In [1]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Sequence, Dict, Tuple

import numpy as np
import polars as pl

pl.enable_string_cache()
pl.Config.set_tbl_rows(20)
pl.Config.set_tbl_cols(20)

class MacroError(RuntimeError):
    pass


In [2]:
%pip install requests --quiet

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.0.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
import requests
import polars as pl

def fred_series_to_df(series_id: str, api_key: str, start: str = "2019-01-01") -> pl.DataFrame:
    url = "https://api.stlouisfed.org/fred/series/observations"
    params = {
        "series_id": series_id,
        "api_key": api_key,
        "file_type": "json",
        "observation_start": start,
    }
    r = requests.get(url, params=params, timeout=30)
    r.raise_for_status()
    obs = r.json()["observations"]
    df = pl.DataFrame({
        "date": [o["date"] for o in obs],
        series_id: [None if o["value"] == "." else float(o["value"]) for o in obs],
    }).with_columns(pl.col("date").str.strptime(pl.Date, "%Y-%m-%d"))
    return df.drop_nulls()

def yoy_from_index(df: pl.DataFrame, col: str) -> pl.DataFrame:
    # mensual -> YoY = (x / x.shift(12) - 1)*100
    return (
        df.sort("date")
          .with_columns(((pl.col(col) / pl.col(col).shift(12) - 1.0) * 100.0).alias(f"{col}_yoy"))
          .drop_nulls()
    )

FRED_API_KEY = "6e425839b9859cd25c0b054cd074fb4d"

cpi = fred_series_to_df("CPIAUCSL", FRED_API_KEY)
indpro = fred_series_to_df("INDPRO", FRED_API_KEY)

infl = yoy_from_index(cpi, "CPIAUCSL").select(["date", pl.col("CPIAUCSL_yoy").alias("inflation_yoy")])
grow = yoy_from_index(indpro, "INDPRO").select(["date", pl.col("INDPRO_yoy").alias("growth_yoy")])

macro = infl.join(grow, on="date", how="inner").sort("date")
macro.write_csv("macro_data.csv")
macro.head(5)


date,inflation_yoy,growth_yoy
date,f64,f64
2020-01-01,2.599768,-2.290379
2020-02-01,2.341317,-1.423222
2020-03-01,1.49404,-5.315055
2020-04-01,0.313047,-17.318282
2020-05-01,0.198201,-16.047738


In [None]:
@dataclass(frozen=True)
class MacroConfig:
    cache_dir: Path = Path("data/cache")

    # Inputs creados previamente
    algos_features_good: Path = Path("data/cache/algos_features_good.parquet")
    benchmark_trades_clean: Path = Path("data/cache/benchmark_trades_clean.parquet")
    # INPUT EXTERNO (tuyo)
    macro_csv: Path = Path("data/datos_competicion/macro_data.csv")  # <-- pon aquí tu fichero macro

    # Columnas mínimas
    date_col: str = "date"
    inflation_col: str = "inflation_yoy"
    growth_col: str = "growth_yoy"

    # Investment Clock: cómo definimos "alto/bajo" sin mirar el futuro
    # rolling_median: usa mediana móvil histórica (shifted 1) para thresholding
    regime_method: str = "rolling_median"   # "rolling_median" | "expanding_median" | "zscore"
    window_months: int = 36                 # para rolling_median (si data mensual)
    min_months: int = 12

    # Evitar look-ahead por retraso de publicación (si quieres)
    # Ejemplo: si macro es mensual y se publica con lag, pon lag_months=1
    lag_months: int = 0

    # Outputs
    macro_states_parquet: Path = Path("data/cache/macro_states.parquet")
    algos_features_macro_parquet: Path = Path("data/cache/algos_features_macro.parquet")

    regime_profiles_long: Path = Path("data/cache/algo_regime_profiles_long.parquet")
    regime_profiles_wide: Path = Path("data/cache/algo_regime_profiles_wide.parquet")
    regime_rankings: Path = Path("data/cache/algo_regime_rankings.parquet")

    clusters_parquet: Path = Path("data/cache/algo_regime_clusters.parquet")

def _ensure_cache(cfg: MacroConfig) -> None:
    cfg.cache_dir.mkdir(parents=True, exist_ok=True)
    (cfg.cache_dir / "checks").mkdir(parents=True, exist_ok=True)


In [5]:
def _parse_date_any(x: pl.Expr) -> pl.Expr:
    # Soporta YYYY-MM-DD y YYYY-MM
    return pl.coalesce([
        x.str.strptime(pl.Date, "%Y-%m-%d", strict=False),
        x.str.strptime(pl.Date, "%Y-%m", strict=False),
    ])

def load_macro(cfg: MacroConfig) -> pl.LazyFrame:
    _ensure_cache(cfg)
    if not cfg.macro_csv.exists():
        raise MacroError(f"No existe macro_csv: {cfg.macro_csv} (pon tu fichero macro externo ahí).")

    lf = pl.scan_csv(str(cfg.macro_csv), infer_schema_length=0)

    cols = lf.columns
    for c in (cfg.date_col, cfg.inflation_col, cfg.growth_col):
        if c not in cols:
            raise MacroError(f"macro_csv debe tener columna '{c}'. Columnas: {cols}")

    lf = (
        lf
        .with_columns([
            pl.col(cfg.date_col).cast(pl.Utf8).alias("date_str"),
            _parse_date_any(pl.col(cfg.date_col).cast(pl.Utf8)).alias("date"),
            pl.col(cfg.inflation_col).cast(pl.Float64, strict=False).alias("inflation_yoy"),
            pl.col(cfg.growth_col).cast(pl.Float64, strict=False).alias("growth_yoy"),
        ])
        .filter(pl.col("date").is_not_null())
        .filter(pl.col("inflation_yoy").is_not_null())
        .filter(pl.col("growth_yoy").is_not_null())
        .sort("date")
    )

    # Normaliza % vs decimal (si viene tipo 3.2 => 3.2%, si viene 0.032 => 3.2%)
    # Heurística: si abs(media) < 0.5 asumimos decimal.
    lf_stats = lf.select([
        pl.col("inflation_yoy").mean().alias("infl_mu"),
        pl.col("growth_yoy").mean().alias("grow_mu"),
    ])
    stats = lf_stats.collect()
    infl_mu = float(stats["infl_mu"][0]) if stats["infl_mu"][0] is not None else 0.0
    grow_mu = float(stats["grow_mu"][0]) if stats["grow_mu"][0] is not None else 0.0

    infl_scale = 100.0 if abs(infl_mu) < 0.5 else 1.0
    grow_scale = 100.0 if abs(grow_mu) < 0.5 else 1.0

    lf = lf.with_columns([
        (pl.col("inflation_yoy") * infl_scale).alias("inflation_yoy"),
        (pl.col("growth_yoy") * grow_scale).alias("growth_yoy"),
    ])

    # Opcional: lag por publicación (evita look-ahead si el macro se publica con retraso)
    if cfg.lag_months > 0:
        lf = lf.with_columns([
            pl.col("inflation_yoy").shift(cfg.lag_months).alias("inflation_yoy"),
            pl.col("growth_yoy").shift(cfg.lag_months).alias("growth_yoy"),
        ]).filter(pl.col("inflation_yoy").is_not_null()).filter(pl.col("growth_yoy").is_not_null())

    return lf

def compute_investment_clock_states(cfg: MacroConfig, macro_lf: pl.LazyFrame) -> pl.LazyFrame:
    """
    Devuelve: date, inflation_yoy, growth_yoy, regime_id, regime_name, one-hot, + macro continuas.
    Sin fuga temporal: thresholds shifted (t-1).
    """
    # Detectar frecuencia aproximada (mensual vs diaria) -> para window en filas, asumimos mensual
    # Si tu macro es diaria, puedes bajar window_months o cambiar a "expanding_median".
    W = int(cfg.window_months)
    MINP = int(cfg.min_months)

    if cfg.regime_method == "rolling_median":
        infl_thr = pl.col("inflation_yoy").rolling_median(W, min_periods=MINP).shift(1)
        grow_thr = pl.col("growth_yoy").rolling_median(W, min_periods=MINP).shift(1)
    elif cfg.regime_method == "expanding_median":
        # expanding median no está directo en polars; aproximamos con quantile expanding usando cumcount+window grande
        # aquí usamos rolling con ventana enorme como proxy
        BIG = 10_000
        infl_thr = pl.col("inflation_yoy").rolling_median(BIG, min_periods=MINP).shift(1)
        grow_thr = pl.col("growth_yoy").rolling_median(BIG, min_periods=MINP).shift(1)
    elif cfg.regime_method == "zscore":
        # zscore rolling: high si z > 0
        infl_mu = pl.col("inflation_yoy").rolling_mean(W, min_periods=MINP).shift(1)
        infl_sd = pl.col("inflation_yoy").rolling_std(W, min_periods=MINP).shift(1)
        grow_mu = pl.col("growth_yoy").rolling_mean(W, min_periods=MINP).shift(1)
        grow_sd = pl.col("growth_yoy").rolling_std(W, min_periods=MINP).shift(1)
        infl_thr = infl_mu
        grow_thr = grow_mu
        # high = value > mean (z>0); sd se usa si quieres z explícito
    else:
        raise MacroError(f"regime_method inválido: {cfg.regime_method}")

    base = macro_lf.with_columns([
        infl_thr.alias("infl_thr"),
        grow_thr.alias("grow_thr"),
    ]).filter(pl.col("infl_thr").is_not_null()).filter(pl.col("grow_thr").is_not_null())

    base = base.with_columns([
        (pl.col("inflation_yoy") > pl.col("infl_thr")).alias("infl_high"),
        (pl.col("growth_yoy") > pl.col("grow_thr")).alias("grow_high"),
    ])

    # Regime id mapping:
    # 0 Reflation   (grow_low, infl_low)
    # 1 Recovery    (grow_high, infl_low)
    # 2 Overheat    (grow_high, infl_high)
    # 3 Stagflation (grow_low, infl_high)
    base = base.with_columns([
        pl.when(pl.col("grow_high") & pl.col("infl_high")).then(pl.lit(2))
         .when(pl.col("grow_high") & ~pl.col("infl_high")).then(pl.lit(1))
         .when(~pl.col("grow_high") & pl.col("infl_high")).then(pl.lit(3))
         .otherwise(pl.lit(0))
         .alias("regime_id"),
    ]).with_columns([
        pl.when(pl.col("regime_id") == 0).then(pl.lit("Reflation"))
         .when(pl.col("regime_id") == 1).then(pl.lit("Recovery"))
         .when(pl.col("regime_id") == 2).then(pl.lit("Overheat"))
         .otherwise(pl.lit("Stagflation"))
         .alias("regime_name"),
    ])

    # One-hot
    base = base.with_columns([
        (pl.col("regime_id") == 0).cast(pl.Int8).alias("reg_reflation"),
        (pl.col("regime_id") == 1).cast(pl.Int8).alias("reg_recovery"),
        (pl.col("regime_id") == 2).cast(pl.Int8).alias("reg_overheat"),
        (pl.col("regime_id") == 3).cast(pl.Int8).alias("reg_stagflation"),
    ])

    # Añade automáticamente otras columnas macro numéricas (si existen)
    # (y excluye las internas)
    exclude = {"date_str", cfg.date_col, "date", "inflation_yoy", "growth_yoy", "infl_thr", "grow_thr",
               "infl_high", "grow_high", "regime_id", "regime_name",
               "reg_reflation", "reg_recovery", "reg_overheat", "reg_stagflation"}
    num_cols = [c for c in macro_lf.columns if c not in exclude]

    # casteamos numéricos extra a Float64 cuando sea posible
    extra_exprs = []
    for c in num_cols:
        extra_exprs.append(pl.col(c).cast(pl.Float64, strict=False).alias(c))

    out = base
    if extra_exprs:
        out = out.with_columns(extra_exprs)

    keep = ["date", "inflation_yoy", "growth_yoy",
            "regime_id", "regime_name",
            "reg_reflation", "reg_recovery", "reg_overheat", "reg_stagflation"] + num_cols

    return out.select([c for c in keep if c in out.columns])

def write_macro_states(cfg: MacroConfig, states_lf: pl.LazyFrame, overwrite: bool = False) -> Path:
    _ensure_cache(cfg)
    out = cfg.macro_states_parquet
    if out.exists() and not overwrite:
        return out
    states_lf.sink_parquet(str(out), compression="zstd", statistics=True)
    return out


In [6]:
def join_macro_to_algo_features(cfg: MacroConfig, macro_states_path: Path, overwrite: bool = False) -> Path:
    _ensure_cache(cfg)
    if not cfg.algos_features_good.exists():
        raise MacroError(f"No existe {cfg.algos_features_good}. Ejecuta el preprocesado base primero.")

    out = cfg.algos_features_macro_parquet
    if out.exists() and not overwrite:
        return out

    feats = pl.scan_parquet(str(cfg.algos_features_good))
    macro = pl.scan_parquet(str(macro_states_path))

    # Join por date (algos es diario; macro puede ser mensual -> si tu macro es mensual,
    # conviene que macro.date sea el 1er día del mes y tú expandas a diario.
    # Aquí hacemos forward-fill mensual->diario con un asof join.
    # Polars: join_asof requiere sorted.
    feats_s = feats.sort("date")
    macro_s = macro.sort("date")

    joined = (
        feats_s.join_asof(
            macro_s,
            on="date",
            strategy="backward"  # usa el último macro conocido en esa fecha
        )
        .filter(pl.col("regime_id").is_not_null())
    )

    joined.sink_parquet(str(out), compression="zstd", statistics=True)
    return out


In [11]:
def compute_algo_regime_profiles(cfg: MacroConfig, features_macro_path: Path, overwrite: bool = False) -> Dict[str, Path]:
    _ensure_cache(cfg)

    out_long = cfg.regime_profiles_long
    out_wide = cfg.regime_profiles_wide
    out_rank = cfg.regime_rankings

    if out_long.exists() and out_wide.exists() and out_rank.exists() and not overwrite:
        return {"long": out_long, "wide": out_wide, "rankings": out_rank}

    lf = pl.scan_parquet(str(features_macro_path))

    # Métricas por (algo, régimen): mean, std, sharpe, max drawdown aproximado
    # Nota: el drawdown aquí se calcula sobre "close" dentro del conjunto de fechas del régimen.
    # Es una proxy útil para clustering; si quieres drawdown por segmentos contiguos, se puede refinar después.
    annual = 252.0

    prof = (
        lf.group_by(["algo_id", "regime_id"])
          .agg([
              pl.len().alias("n_days"),
              pl.col("ret_1d").mean().alias("ret_mean"),
              pl.col("ret_1d").std().alias("ret_std"),
              (pl.col("close") / pl.col("close").cum_max() - 1.0).min().alias("max_drawdown"),
          ])
          .with_columns([
              (pl.col("ret_std") * np.sqrt(annual)).alias("vol_ann"),
              pl.when(pl.col("ret_std") > 0)
                .then((pl.col("ret_mean") / pl.col("ret_std")) * np.sqrt(annual))
                .otherwise(None)
                .alias("sharpe_ann"),
          ])
          .with_columns([
              pl.when(pl.col("regime_id") == 0).then(pl.lit("Reflation"))
               .when(pl.col("regime_id") == 1).then(pl.lit("Recovery"))
               .when(pl.col("regime_id") == 2).then(pl.lit("Overheat"))
               .otherwise(pl.lit("Stagflation"))
               .alias("regime_name")
          ])
    )

    prof.sink_parquet(str(out_long), compression="zstd", statistics=True)

    # Vector 4D por algoritmo (p.ej. Sharpe por régimen). Esto es lo que sugiere el PDF para clustering.
    # pivot() requiere DataFrame (eager), no LazyFrame
    wide_df = (
        pl.read_parquet(str(out_long))
        .select(["algo_id", "regime_id", "sharpe_ann", "ret_mean", "max_drawdown", "n_days"])
        .pivot(
            on="regime_id",
            index="algo_id",
            values="sharpe_ann",
            aggregate_function="first",
        )
        .rename({
            "0": "sharpe_reflation",
            "1": "sharpe_recovery",
            "2": "sharpe_overheat",
            "3": "sharpe_stagflation",
        })
        .with_columns([
            pl.all().exclude("algo_id").fill_null(0.0)
        ])
    )
    wide_df.write_parquet(str(out_wide), compression="zstd", statistics=True)

    # Rankings por régimen (para priors / action-masking contextual).
    rankings = (
        pl.scan_parquet(str(out_long))
        .filter(pl.col("sharpe_ann").is_not_null())
        .with_columns([
            pl.col("sharpe_ann").rank(method="dense", descending=True).over("regime_id").alias("rank"),
        ])
        .select(["regime_id", "regime_name", "algo_id", "n_days", "sharpe_ann", "ret_mean", "vol_ann", "max_drawdown", "rank"])
        .sort(["regime_id", "rank"])
    )
    rankings.sink_parquet(str(out_rank), compression="zstd", statistics=True)

    return {"long": out_long, "wide": out_wide, "rankings": out_rank}

In [8]:
def kmeans_numpy(X: np.ndarray, k: int, seed: int = 42, iters: int = 50) -> Tuple[np.ndarray, np.ndarray]:
    """
    KMeans simple (kmeans++ init), devuelve (labels, centers).
    X: (n, d)
    """
    rng = np.random.default_rng(seed)
    n, d = X.shape
    if k <= 1 or k > n:
        raise ValueError(f"k inválido: {k} para n={n}")

    # kmeans++ init
    centers = np.empty((k, d), dtype=np.float64)
    idx0 = rng.integers(0, n)
    centers[0] = X[idx0]
    dist2 = np.sum((X - centers[0]) ** 2, axis=1)

    for i in range(1, k):
        probs = dist2 / (dist2.sum() + 1e-12)
        idx = rng.choice(n, p=probs)
        centers[i] = X[idx]
        new_dist2 = np.sum((X - centers[i]) ** 2, axis=1)
        dist2 = np.minimum(dist2, new_dist2)

    labels = np.zeros(n, dtype=np.int32)

    for _ in range(iters):
        # assign
        dists = np.sum((X[:, None, :] - centers[None, :, :]) ** 2, axis=2)  # (n,k)
        new_labels = np.argmin(dists, axis=1).astype(np.int32)

        if np.array_equal(new_labels, labels):
            break
        labels = new_labels

        # update
        for j in range(k):
            mask = labels == j
            if mask.any():
                centers[j] = X[mask].mean(axis=0)
            else:
                # cluster vacío: re-seed
                centers[j] = X[rng.integers(0, n)]

    return labels, centers

def cluster_algos_by_regime_profile(cfg: MacroConfig, profiles_wide_path: Path, k: int = 50, overwrite: bool = False) -> Path:
    _ensure_cache(cfg)
    out = cfg.clusters_parquet
    if out.exists() and not overwrite:
        return out

    df = pl.read_parquet(str(profiles_wide_path))
    if df.height == 0:
        raise MacroError("profiles_wide está vacío.")

    algo_ids = df["algo_id"].to_list()
    X = df.select(["sharpe_reflation", "sharpe_recovery", "sharpe_overheat", "sharpe_stagflation"]).to_numpy()

    # Normalización robusta (para clustering estable)
    mu = np.nanmean(X, axis=0)
    sd = np.nanstd(X, axis=0) + 1e-9
    Xn = (X - mu) / sd

    labels, centers = kmeans_numpy(Xn, k=k, seed=42, iters=60)

    out_df = pl.DataFrame({
        "algo_id": algo_ids,
        "cluster_id": labels.astype(np.int32),
    })

    out_df.write_parquet(str(out), compression="zstd", statistics=True)
    return out


In [9]:
def benchmark_forensics_by_regime(cfg: MacroConfig, macro_states_path: Path, overwrite: bool = False) -> Path:
    _ensure_cache(cfg)
    out = cfg.cache_dir / "benchmark_forensics_by_regime.parquet"
    if out.exists() and not overwrite:
        return out

    if not cfg.benchmark_trades_clean.exists():
        raise MacroError(f"No existe {cfg.benchmark_trades_clean}. Ejecuta benchmark preprocessing base primero.")

    trades = pl.scan_parquet(str(cfg.benchmark_trades_clean)).select([
        pl.col("productname").alias("algo_id"),
        "volume", "open_date", "close_date", "holding_days",
        "equity_EOD", "AUM", "equity_normalized"
    ]).filter(pl.col("open_date").is_not_null())

    macro = pl.scan_parquet(str(macro_states_path)).select(["date", "regime_id", "regime_name"]).sort("date")

    # asignamos a cada trade el régimen macro vigente en la fecha de apertura (backward asof)
    trades_s = trades.sort("open_date").rename({"open_date": "date"})
    joined = trades_s.join_asof(macro, on="date", strategy="backward").filter(pl.col("regime_id").is_not_null())

    # stats por régimen
    stats = (
        joined.group_by(["regime_id", "regime_name"])
              .agg([
                  pl.len().alias("n_trades_open"),
                  pl.col("algo_id").n_unique().alias("unique_algos_traded"),
                  pl.col("volume").sum().alias("sum_volume"),
                  pl.col("volume").mean().alias("avg_volume"),
                  pl.col("holding_days").mean().alias("avg_holding_days"),
                  pl.col("equity_normalized").mean().alias("avg_equity_norm"),
                  pl.col("AUM").mean().alias("avg_AUM"),
              ])
              .sort("regime_id")
    )

    stats.sink_parquet(str(out), compression="zstd", statistics=True)
    return out


In [None]:
def run_macro_pipeline(cfg: MacroConfig, overwrite: bool = False, k_clusters: int = 50) -> Dict[str, Path]:
    _ensure_cache(cfg)

    macro_lf = load_macro(cfg)
    states_lf = compute_investment_clock_states(cfg, macro_lf)
    macro_states_path = write_macro_states(cfg, states_lf, overwrite=overwrite)

    features_macro_path = join_macro_to_algo_features(cfg, macro_states_path, overwrite=overwrite)

    prof_paths = compute_algo_regime_profiles(cfg, features_macro_path, overwrite=overwrite)
    clusters_path = cluster_algos_by_regime_profile(cfg, prof_paths["wide"], k=k_clusters, overwrite=overwrite)

    bench_forensics_path = benchmark_forensics_by_regime(cfg, macro_states_path, overwrite=overwrite)

    return {
        "macro_states": macro_states_path,
        "algos_features_macro": features_macro_path,
        "algo_regime_profiles_long": prof_paths["long"],
        "algo_regime_profiles_wide": prof_paths["wide"],
        "algo_regime_rankings": prof_paths["rankings"],
        "algo_regime_clusters": clusters_path,
        "benchmark_forensics_by_regime": bench_forensics_path,
    }

cfg = MacroConfig(
    cache_dir=Path("data/cache"),
    algos_features_good=Path("data/cache/algos_features_good.parquet"),
    benchmark_trades_clean=Path("data/cache/benchmark_trades_clean.parquet"),
    macro_csv=Path("data/datos_competicion/macro_data.csv"),  # <-- pon tu CSV aquí
    regime_method="rolling_median",
    window_months=36,
    min_months=12,
    lag_months=0,     # si quieres evitar look-ahead por publicación, pon 1
)

paths = run_macro_pipeline(cfg, overwrite=False, k_clusters=50)
paths


  cols = lf.columns
  infl_thr = pl.col("inflation_yoy").rolling_median(W, min_periods=MINP).shift(1)
  grow_thr = pl.col("growth_yoy").rolling_median(W, min_periods=MINP).shift(1)
  num_cols = [c for c in macro_lf.columns if c not in exclude]
  return out.select([c for c in keep if c in out.columns])


{'macro_states': WindowsPath('cache/macro_states.parquet'),
 'algos_features_macro': WindowsPath('cache/algos_features_macro.parquet'),
 'algo_regime_profiles_long': WindowsPath('cache/algo_regime_profiles_long.parquet'),
 'algo_regime_profiles_wide': WindowsPath('cache/algo_regime_profiles_wide.parquet'),
 'algo_regime_rankings': WindowsPath('cache/algo_regime_rankings.parquet'),
 'algo_regime_clusters': WindowsPath('cache/algo_regime_clusters.parquet'),
 'benchmark_forensics_by_regime': WindowsPath('cache/benchmark_forensics_by_regime.parquet')}