
# Research Project: Adaptive ML for Behavioral Trading Signals
## Daily S&P 500 Equities (2014–2024) 
### X-HEC Double Degree Data & Finance
<hr>
Auguste PIROMALLI (auguste.piromalli@hec.edu)
<br>
Nadiy ABDEL KARIM DIT ARAMOUNI (nadiy.abdel-karim-dit-aramouni@hec.edu)
<br>
<hr>

In this notebook we perform the following:

- Data ingest & QC (features + close for 50+ long-term constituents).
- Target engineering for multi-horizon **forward returns** (default: 5D and 21D).
- Feature pre-lag (as-of)
- Cross-sectional constant feature filter
- Walk-forward CV (expanding window, monthly steps)
- Cross-sectional portfolio construction (long-only or long–short) with turnover & costs.
- Rank IC (+ sanity probes)
- Backtest analytics (CAGR, Sharpe, Sortino, max DD, turnover, coverage).
<hr>

***
***
### 1 - Imports & parameters

This section sets the foundations for data handling, modeling, and portfolio simulation.  


In [None]:

# --- Setup ---
import os, re, math, json, warnings
import random
from pathlib import Path
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Tuple, Iterable, Dict
import matplotlib.pyplot as plt
import itertools

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, QuantileTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.base import clone
from sklearn.linear_model import Ridge, ElasticNet, HuberRegressor
from sklearn.ensemble import HistGradientBoostingRegressor


warnings.filterwarnings('ignore')

# ----- Paths & dates -----
DATA_ROOT = Path('data/tickers')
CACHE_PARQUET = Path('data/equity_equitydata.parquet')
START_DATE = '2014-01-01'
END_DATE   = '2024-12-31'


# ---- Signals / targets ----
# Modeling target(s): forward log returns horizons (in trading days)
TARGET_HORIZONS = [5, 21]      # trading days, 1-week and ~1-month
MAX_H = max(TARGET_HORIZONS)   # used to embargo tail when training


# Walk-forward CV settings
TRAIN_YEARS   = 3      # expanding window starts at 3 years
STEP_MONTHS   = 1      # move forward one month between folds
EMBARGO_DAYS  =  MAX_H     #  gap between train and test windows

# Portfolio & costs
REBALANCE_FREQ   = 'W-FRI'  # 'W-FRI' or 'M' for monthly
LONG_SHORT       = True    # True -> long top q & short bottom q; False -> long-only
TOP_QUANTILE     = 0.2      # cross-sectional top quantile to hold
HOLDING_PERIOD   = 5        # days to hold each signal
TCOST_BPS        = 20        # transaction cost in basis points per turnover

# Master toggle: run multi-variant sweep? 
USE_VARIANT_SWEEP = False   # True only when exploring

#  Production model used by the whole notebook
PROD_MODEL_KIND   = 'ridge' #{"ridge","elasticnet","huber","hgbt"}
PROD_MODEL_PARAMS = {}

# Single source of truth for all earlier sections (_fitpredict_one_horizon, etc.)
MODEL_KIND, MODEL_PARAMS = PROD_MODEL_KIND, PROD_MODEL_PARAMS

# ---- Misc ----
DTYPE = np.float32
RANDOM_STATE = 42
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)
os.environ["PYTHONHASHSEED"] = str(RANDOM_STATE)

os.chdir(Path().resolve()) 


***
### Risk Control Parameters

Define constraints and toggles for portfolio construction: liquidity filters, beta-neutralization, position caps, and leverage settings.  
These guard against illiquid names, unintended factor exposures, and concentration risks.  


In [2]:
# --- Risk-control toggles & params ---
USE_ADV_ELIGIBILITY = True    # True -> drop illiquid names by ADV percentile
ADV_LOOKBACK_DAYS   = 60       # rolling days for ADV
ADV_PCT             = 0.20     # keep names above this cross-sec percentile each date

USE_BETA_NEUTRAL    = False    # True -> neutralize scores vs rolling beta each date
BETA_LOOKBACK_DAYS  = 252      # rolling days for beta estimation

MIN_XS_NAMES        = 10       # guard for per-date regressions
RIDGE_EPS           = 1e-12    # small ridge for numerical safety

USE_PER_NAME_CAP   = True     # clip per-name weights at abs cap, then renormalize
PER_NAME_CAP       = 0.02     # e.g.: 2% max per name (long-only)

BETA_NEUTRAL_STRENGTH = 1.0   # 1.0 full, 0.5 half, 0 off = skip overlay

# target gross exposure (sum |w|). With LONG_SHORT=True, make_positions already splits 50/50 long & short.
GROSS_LEVERAGE   = 1.0       # set 2.0 for “1 long + 1 short” gross



***
### Plotting Helpers

This section defines reusable plotting functions to visualize model performance and signal quality.  

#### Functions Overview

- **`plot_equity_curve(eq, title)`**  
  Plots the cumulative equity curve of a single strategy over time.  
  Useful to quickly visualize performance evolution.  

- **`plot_rolling_ic(ic, window, title)`**  
  Plots the rolling mean of the **Information Coefficient (IC)**, which measures the rank correlation between model predictions and realized returns.  
  Indicates whether the signal has stable predictive power through time.  

- **`lead_lag_ic_profile_plotly(pred_df, df_prices, horizons, title, compute_rank_ic_fn)`**  
  Computes and plots IC across different lead/lag horizons.  
  - Positive horizons → predictive power for future returns.  
  - Negative horizons → check for look-ahead bias.  
  This is a key diagnostic for signal validity.  

- **`bar_ic_means(ic_dict, title)`**  
  Plots average IC values for each horizon or lag, making it easy to compare signals.  

- **`multi_equity_plot(eq_dict, title)`**  
  Compares equity curves from multiple strategies in one chart (e.g., baseline ML vs RL).  
  Helps assess relative robustness and performance.  

In [3]:
# --- Plotly helpers ---
import plotly.graph_objects as go
import plotly.io as pio
from IPython.display import display, HTML, clear_output

pd.options.display.float_format = '{:.4f}'.format
pio.templates.default = "simple_white"  

# Toggle: False to fall back to Matplotlib for any plot
USE_PLOTLY = True

def _fig_size(w=900, h=350): return dict(width=w, height=h)

def plot_equity_curve(eq: pd.Series, title="Strategy Equity Curve"):
    if not USE_PLOTLY:
        ax = eq.plot(figsize=(10,4), title=title); ax.set_ylabel("Equity"); return
    fig = go.Figure([go.Scatter(x=eq.index, y=eq.values, mode="lines")])
    fig.update_layout(title=title, yaxis_title="Equity", showlegend=False, **_fig_size())
    fig.update_xaxes(showgrid=False)
    fig.show()

def plot_rolling_ic(ic: pd.Series, window=60, title=None):
    roll = ic.rolling(window).mean()
    title = title or f"{window}D Rolling Rank IC"
    if not USE_PLOTLY:
        ax = roll.plot(figsize=(10,3), title=title); ax.axhline(0, lw=1); ax.set_ylabel("IC"); return
    fig = go.Figure([go.Scatter(x=roll.index, y=roll.values, mode="lines")])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size())
    fig.show()

def lead_lag_ic_profile_plotly(
    pred_df,
    df_prices,
    horizons=range(-15,16),
    title="Lead/Lag IC profile (H in days)",
    compute_rank_ic_fn=None,   
):
    
    if compute_rank_ic_fn is None:
        compute_rank_ic_fn = globals().get("compute_rank_ic", None)
    if compute_rank_ic_fn is None:
        raise NameError("compute_rank_ic is not defined yet; define it or pass as compute_rank_ic_fn=...")

    prof = {}
    for h in progress(list(horizons), total=len(list(horizons)),
                      desc="Lead/Lag IC"):
        ic_h = compute_rank_ic_fn(pred_df, df_prices, horizon=h, min_names=10, demean=True)
        prof[h] = float(ic_h.mean()) if len(ic_h) else np.nan

    s = pd.Series(prof, name="IC")
    if not USE_PLOTLY:
        ax = s.plot(kind="bar", figsize=(10,3), title=title); ax.axhline(0, lw=1); ax.set_ylabel("IC"); return s
    fig = go.Figure([go.Bar(x=[int(i) for i in s.index], y=s.values)])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size(950,300))
    fig.show()
    return s


def bar_ic_means(ic_dict: dict, title="Per-lag mean IC"):
    s = pd.Series(ic_dict).sort_index()
    if not USE_PLOTLY:
        ax = s.plot(kind="bar", figsize=(10,3), title=title); ax.axhline(0, lw=1); ax.set_ylabel("IC"); return s
    fig = go.Figure([go.Bar(x=[str(k) for k in s.index], y=s.values)])
    fig.add_hline(y=0, line_width=1)
    fig.update_layout(title=title, yaxis_title="IC", showlegend=False, **_fig_size(700,300))
    fig.show()
    return s

def multi_equity_plot(eq_dict: dict, title="Equity curves"):
    if not USE_PLOTLY:
        ax = pd.concat(eq_dict, axis=1).plot(figsize=(10,4), title=title); ax.set_ylabel("Equity"); return
    fig = go.Figure()
    for name, s in eq_dict.items():
        fig.add_trace(go.Scatter(x=s.index, y=s.values, mode="lines", name=name))
    fig.update_layout(title=title, yaxis_title="Equity", **_fig_size(950,380))
    fig.show()


***
### Progress Bars

To monitor long-running loops and model training, we enable **tqdm progress bars** and create two helper functions


In [4]:
# --- Progress bars -----------------------
from contextlib import nullcontext

USE_TQDM = True  # master toggle for all progress bars

try:
    from tqdm.auto import tqdm as _tqdm        
    try:
        from tqdm.contrib import tqdm_joblib as _tqdm_joblib  
    except Exception:
        _tqdm_joblib = None
except Exception:
    _tqdm = None
    _tqdm_joblib = None

def progress(iterable, **kwargs):
    """
    Serial loops: wrap any iterable with a tqdm bar when available and enabled.
    Falls back to identity iterator (no bar).
    """
    if USE_TQDM and _tqdm is not None:
        kwargs.setdefault("dynamic_ncols", True)
        kwargs.setdefault("leave", False)
        return _tqdm(iterable, **kwargs)
    return iterable

def joblib_progress(total: int, desc: str = "Working"):
    """
    Parallel loops (joblib.Parallel): return a context manager that
    shows a single tqdm bar while tasks run. Falls back to a no-op.
    Usage:
        with joblib_progress(len(tasks), "Model variants"):
            results = Parallel(...)(...)
    """
    if USE_TQDM and (_tqdm is not None) and (_tqdm_joblib is not None):
        return _tqdm_joblib(_tqdm(total=total, desc=desc, dynamic_ncols=True, leave=True))
    return nullcontext()


***
### Data Loader Utilities

This section defines helper functions for **loading and standardizing raw ticker data**.  
The goal is to normalize column names, ensure consistent formatting (dates, prices, volumes), and efficiently cache results.  

#### Functions

- **`sanitize_columns(cols)`**  
  Cleans messy column names by removing invisible characters, replacing special symbols (`%`, `/`, `-`, etc.), and enforcing unique, safe identifiers.  
  Example: `"Adj Close (%)"` → `"Adj_Close_pct"`  

- **`_infer_ticker_from_path(path)`**  
  Extracts the ticker symbol from a file name (e.g., `"ABT_technical_cleaned.csv"` → `"ABT"`).  
  Fallback: uses the parent folder name if the filename is ambiguous.  

- **`load_csv(path)`**  
  Loads a single ticker CSV into a standardized `DataFrame`.  
  Steps:
  1. Sanitize column names.  
  2. Parse and normalize dates (`Date` or `Datetime`).  
  3. Standardize price/volume column names (`close`, `volume`).  
  4. Convert all numeric columns to consistent type (`DTYPE`, here `float32`).  
  5. Set the date as index, drop duplicates, and attach the ticker symbol.  

- **`load_all(root, force=False)`**  
  Recursively loads **all CSVs** from the given root folder:  
  - If a cached parquet file (`CACHE_PARQUET`) exists and `force=False`, load from it directly for speed.  
  - Otherwise, load each CSV with `load_csv()`, concatenate, and save to parquet.  
  - Returns a single time-indexed `DataFrame` covering all tickers.  


In [5]:
# --- Loader utilities ---

def sanitize_columns(cols):
    """Normalize weird characters and make safe, unique column names."""
    out = []
    for c in cols:
        c2 = re.sub(r'[\u200b-\u200f\u202a-\u202e]', '', c)  # zero-width & bidi
        c2 = (c2.replace('%', 'pct').replace('/', '_').replace('\\', '_')
                  .replace('(', '_').replace(')', '_').replace(',', '_')
                  .replace(' ', '_').replace('-', '_'))
        c2 = re.sub(r'__+', '_', c2)
        c2 = re.sub(r'[^0-9A-Za-z_]', '', c2).strip('_')
        out.append(c2)
    # de-duplicate while preserving order
    seen, uniq = {}, []
    for c in out:
        if c not in seen:
            seen[c] = 0; uniq.append(c)
        else:
            seen[c] += 1; uniq.append(f"{c}_{seen[c]}")
    return uniq


def _infer_ticker_from_path(path: Path) -> str:
    """ABT_technical_cleaned.csv -> ABT; fallback to parent folder name."""
    stem = path.stem
    tkr = stem.split('_', 1)[0].upper() if '_' in stem else path.parent.name.upper()
    return tkr


def load_csv(path: Path) -> pd.DataFrame:
    """Load one ticker CSV with a consistent schema."""
    df = pd.read_csv(path)
    df.columns = sanitize_columns(df.columns)

    # date parse & normalize (all files share the same layout; expect 'Date' or 'date')
    date_col_candidates = [c for c in df.columns if c.lower() in ('date', 'datetime')]
    assert date_col_candidates, f"No date column found in {path.name}"
    dcol = date_col_candidates[0]

    df[dcol] = pd.to_datetime(df[dcol], errors='coerce', utc=True)
    df = df.dropna(subset=[dcol]).sort_values(dcol)
    df[dcol] = df[dcol].dt.tz_localize(None).dt.normalize()

    # rename canonical price/volume (files already cleaned, so this is just standardization)
    if 'Close' in df.columns:   # after sanitize, original 'Close' stays 'Close'
        df = df.rename(columns={'Close': 'close'})
    if 'close' not in df.columns and 'Close_1' in df.columns:
        df = df.rename(columns={'Close_1': 'close'})
    if 'Volume' in df.columns:
        df = df.rename(columns={'Volume': 'volume'})

    # numeric coercion for all non-date columns, then cast to DTYPE
    for c in df.columns:
        if c == dcol:
            continue
        if df[c].dtype == 'object':
            df[c] = df[c].astype(str).str.replace(',', '', regex=False)
        df[c] = pd.to_numeric(df[c], errors='coerce').astype(DTYPE, copy=False)

    # index & dedup
    df = df.set_index(dcol)
    df = df[~df.index.duplicated(keep='last')]

    # attach ticker
    df['ticker'] = _infer_ticker_from_path(path)
    return df


def load_all(root: Path, force: bool = False) -> pd.DataFrame:
    """
    Recursively load all CSVs under DATA_ROOT into a single DataFrame,
    cache to parquet for fast reloads.
    """
    if CACHE_PARQUET.exists() and not force:
        return pd.read_parquet(CACHE_PARQUET)

    csvs = sorted(root.rglob('*.csv'))
    if not csvs:
        raise FileNotFoundError(f"No CSV files found under {root.resolve()}")

    frames = [load_csv(p) for p in progress(csvs, desc="Load CSVs")]
    out = pd.concat(frames, axis=0).sort_index()
    out.to_parquet(CACHE_PARQUET)
    return out


***
***
### 2 - Data Loading & Validation

Now that our setup is complete and all helper functions are defined, we can load all raw ticker data into a unified `DataFrame`, restrict to the analysis window, and run basic quality checks.  

#### Steps
1. **Load data**  
   - `load_all(DATA_ROOT, force=True)` loads all CSVs and forces a fresh parquet build.  
   - In normal runs, `force=False` reuses the parquet cache for faster reload.  

2. **Restrict to analysis window**  
   - Trim dataset to `START_DATE : END_DATE` (2014–2024).  
   - Ensure index is timezone-naive and normalized to daily frequency.  
   - Cast `ticker` to a categorical type (saves memory, safer group ops).  

3. **Define feature list**  
   - `base_features`: all columns except `ticker` and `close`.  
   - `tickers`: list of all available tickers.  

4. **Sanity checks & coverage**  
   - Report total rows, number of tickers, number of raw features, and date span.  
   - Print missing value percentages for the worst 10 features.  
   - Compute per-ticker coverage (first/last date, row count).  

5. **Winsorization helper**  
   - `winsorize_clip()` will be used later to clip feature distributions at quantiles (reduce outlier impact).  


In [6]:
# Load (force=False in normal runs to use the parquet cache)
df = load_all(DATA_ROOT, force=True)

# Restrict to analysis window and normalize/index guarantees
df = df.loc[START_DATE:END_DATE].sort_index()
assert 'close' in df.columns, "'close' column missing after load"
assert getattr(df.index, "tz", None) is None, "Index should be tz-naive"
df.index = pd.to_datetime(df.index).normalize()   # idempotent
df.index.name = "date" 
df['ticker'] = df['ticker'].astype('category')    # memory + safer ops

# Feature list BEFORE targets are added (keeps naming clear later)
base_features = [c for c in df.columns if c not in ['ticker', 'close']]
tickers = df['ticker'].cat.categories.tolist()

print(f"Rows: {len(df):,} | Tickers: {len(tickers)} | Raw features: {len(base_features)}")
print("Span:", df.index.min(), "→", df.index.max())


miss = df[base_features].isna().mean().sort_values(ascending=False)
print("NaN% (worst 10)\n", miss.head(10))

# coverage sanity per ticker
cov = (df.reset_index()                           
         .groupby('ticker', as_index=False)
         .agg(first_date=('date', 'min'),
              last_date =('date', 'max'),
              rows      =('date', 'size'))
         .sort_values('rows', ascending=False))

print("\nPer-ticker coverage (top 10 by rows):")
print(cov.head(10))


# Winsorization helper (used later inside pipelines)
def winsorize_clip(X: pd.DataFrame, lo=0.005, hi=0.995):
    ql, qh = X.quantile(lo), X.quantile(hi)
    return X.clip(lower=ql, upper=qh, axis=1)


                                                          

Rows: 155,008 | Tickers: 56 | Raw features: 80
Span: 2014-01-01 00:00:00 → 2024-12-30 00:00:00
NaN% (worst 10)
 MA_Neg_Vol_255_ma                0.0183
KlingerSignal_Klinger_13_34_55   0.0023
Klinger_Klinger_13_34_55         0.0022
Klinger_13_34_55_hist            0.0022
Result_Detrended_14_ma           0.0022
Result_Schaff_10_23_50_ema       0.0007
Result_Prime_Number_5            0.0007
PMOSignal_PMO_35_20_10           0.0007
PMO_PMO_35_20_10                 0.0006
Result_Mass_Idx_25_27            0.0005
dtype: float64

Per-ticker coverage (top 10 by rows):
   ticker first_date  last_date  rows
0       A 2014-01-01 2024-12-30  2768
1    AAPL 2014-01-01 2024-12-30  2768
30    APA 2014-01-01 2024-12-30  2768
31    APD 2014-01-01 2024-12-30  2768
32    APH 2014-01-01 2024-12-30  2768
33    APO 2014-01-01 2024-12-30  2768
34   APTV 2014-01-01 2024-12-30  2768
35    ARE 2014-01-01 2024-12-30  2768
36    ATO 2014-01-01 2024-12-30  2768
37    AVB 2014-01-01 2024-12-30  2768


***
***
### 3 - Target Engineering (Forward Log Returns)

We build forward-return targets per ticker for horizons in `TARGET_HORIZONS`, then keep only rows where at least one target exists. Feature/target separation and basic guards included.


In [7]:
# --- Target engineering ---
def add_targets(d: pd.DataFrame, horizons: List[int]) -> pd.DataFrame:
    d = d.copy()
    d["log_close"] = np.log(d["close"])
    out = []
    n_tkr = d['ticker'].nunique()
    for tkr, g in progress(d.groupby("ticker", group_keys=False),
                           total=n_tkr, desc="Build forward targets"):
        for h in horizons:
            g[f"fwd_ret_{h}d"] = g["log_close"].shift(-h) - g["log_close"]
        out.append(g)

    return pd.concat(out)

# Build forward targets
df = add_targets(df, TARGET_HORIZONS)

# Columns
target_cols  = [f"fwd_ret_{h}d" for h in TARGET_HORIZONS]
exclude_cols = set(["ticker", "close", "log_close"] + target_cols)
feature_cols = [c for c in df.columns if c not in exclude_cols]

# Basic guards
assert len(feature_cols) > 0, "No features left after exclusion!"
assert not (set(feature_cols) & exclude_cols), "Feature/target column overlap."

# Keep rows where at least one target exists (drop only tail per ticker)
df = df[df[target_cols].notna().any(axis=1)].copy()

print("Features after target add:", len(feature_cols))
print("Shape after dropping all-NA target rows:", df.shape)


                                                             

Features after target add: 80
Shape after dropping all-NA target rows: (154657, 85)




We enforce an **as-of principle**: features used at time *t* must be known strictly before the prediction target.  

### Steps
1. **Lag features**  
   - Apply a global 1-day lag (`ASOF_LAG_DAYS=1`) to all features within each ticker.  
   - Prevents lookahead bias (using information from the same day as the return).  

2. **Drop incomplete rows**  
   - Remove any row missing the target or any feature.  

3. **Remove unstable features**  
   - Compute how often features are **constant across all tickers per day**.  
   - If a feature is constant in >80% of days, drop it (little/no cross-sectional info).  

4. **Finalize types**  
   - Cast all features to the chosen numeric dtype (`float32`).  

### Output
- `df_asof`: fully lagged and cleaned dataset.  
- Reports shape, number of features kept, and how many dropped.  


In [8]:
# As-of feature freeze (global 1D lag)
ASOF_LAG_DAYS = 1
TARGET = f"fwd_ret_{TARGET_HORIZONS[0]}d"

df_asof = df.copy()
df_asof[feature_cols] = df_asof.groupby('ticker', group_keys=False)[feature_cols].shift(ASOF_LAG_DAYS)

# Drop rows missing either target or any feature
df_asof = df_asof.dropna(subset=[TARGET] + feature_cols, how='any')

# Optional: drop cross-sectionally constant-by-day features (speed + stability)
cs_nuniq = df_asof.groupby(level=0)[feature_cols].nunique()
share_const = (cs_nuniq <= 1).mean(axis=0).sort_values(ascending=False)
CONST_THRESH = 0.80
const_like = share_const[share_const > CONST_THRESH].index.tolist()
feature_cols = [c for c in feature_cols if c not in const_like]
df_asof = df_asof.dropna(subset=feature_cols + [TARGET]).copy()

# dtype
for c in feature_cols:
    df_asof[c] = df_asof[c].astype(DTYPE)

print(f"[asof] df_asof: {df_asof.shape} | kept features: {len(feature_cols)} (dropped {len(const_like)})")


[asof] df_asof: (151310, 85) | kept features: 80 (dropped 0)


### Eligibility Lookback & Leakage Safeguards

We impose two safeguards before modeling:

1) **Minimum history requirement**  
   A name must have at least `MIN_HIST_DAYS` (=252) days of history **after the as-of lag** to be eligible for scoring.  
   This stabilizes rolling stats (e.g., z-scores, betas) and avoids cold-start noise.

2) **Leakage drop-list**  
   Certain features are known/assumed to leak future information (directly or via look-ahead preprocessing).  
   We drop them from both the feature list and the training view to prevent optimistic bias.

**Outputs**
- `df_asof`: filtered to names with sufficient history and without leaky fields.
- `feature_cols`: updated to exclude leaky features.
- Assertions ensure no NAs remain in the modeling view and no dropped feature survives in `feature_cols`.


In [9]:
# Require a minimum lookback before a name can be scored
MIN_HIST_DAYS = 252
tmp = df_asof.reset_index().rename(columns={"index":"date"}) if df_asof.index.name is None else df_asof.reset_index()
tmp["first_date"] = tmp.groupby("ticker")["date"].transform("min")
tmp["days_live"]  = (tmp["date"] - tmp["first_date"]).dt.days
df_asof = tmp[tmp["days_live"] >= MIN_HIST_DAYS].drop(columns=["first_date","days_live"]).set_index("date")

In [10]:
# --- leakage drop-list (8 features) ---
LEAKY_FEATURES = [
    "Result_Detrended_14_ma",
    "MA_Neg_Vol_255_ma",
    "STARC_Bands_Bottom_STARC_Bands_15_5_13_y",
    "STARC_Bands_Median_STARC_Bands_15_5_13_y",
    "STARC_Bands_Top_STARC_Bands_15_5_13_y",
    "Result_Hist_Vol_10_252_1",
    "PMOSignal_PMO_35_20_10",
    "Gator_13_8_8_5_5_3_hist1",
]

# Drop safely from feature list and training view
existing = [c for c in LEAKY_FEATURES if c in df_asof.columns]
missing  = sorted(set(LEAKY_FEATURES) - set(existing))

feature_cols = [c for c in feature_cols if c not in LEAKY_FEATURES]
df_asof = df_asof.drop(columns=LEAKY_FEATURES, errors="ignore").copy()

print(f"[leak-drop] dropped {len(existing)}/{len(LEAKY_FEATURES)} present → {existing}")
if missing:
    print("[leak-drop] not present (already absent):", missing)

# Safety
assert not (set(feature_cols) & set(LEAKY_FEATURES))
assert df_asof[feature_cols + [TARGET]].notna().all().all(), "Found NA after drop."

print(f"[asof] df_asof now: {df_asof.shape} | kept features: {len(feature_cols)}")


[leak-drop] dropped 8/8 present → ['Result_Detrended_14_ma', 'MA_Neg_Vol_255_ma', 'STARC_Bands_Bottom_STARC_Bands_15_5_13_y', 'STARC_Bands_Median_STARC_Bands_15_5_13_y', 'STARC_Bands_Top_STARC_Bands_15_5_13_y', 'Result_Hist_Vol_10_252_1', 'PMOSignal_PMO_35_20_10', 'Gator_13_8_8_5_5_3_hist1']
[asof] df_asof now: (141572, 77) | kept features: 72


***
***
### 4 - Walk-Forward Cross-Validation Windows

We implement **walk-forward (expanding window) cross-validation**, which is more realistic for time series than random splits.  

#### Key points
- **Training window** starts with a minimum of `TRAIN_YEARS` (default = 3 years).  
- **Expands forward** as time progresses (no shrinking).  
- **Testing window** = the next calendar month after the embargo period.  
- **Embargo** = `EMBARGO_DAYS` ensures no overlap between train and test (avoids leakage from target horizons).  
- **Step size** = `STEP_MONTHS` controls how far the training window advances each iteration (default = 1 month).  

#### Outputs
- `WFWindow` dataclass: stores train/test start and end dates.  
- `wf_windows`: full list of folds.  
- `wf_windows_run`: subset of folds (optionally truncated by `N_FOLDS_LIMIT`).  


In [11]:
# Walk-forward windows
from dataclasses import dataclass
from dateutil.relativedelta import relativedelta

@dataclass
class WFWindow:
    train_start: pd.Timestamp
    train_end:   pd.Timestamp
    test_start:  pd.Timestamp
    test_end:    pd.Timestamp

def generate_wf_windows(dates: pd.DatetimeIndex,
                        train_years: int = TRAIN_YEARS,
                        step_months: int = STEP_MONTHS,
                        embargo_days: int = EMBARGO_DAYS):
    """
    Expanding training window; test on the next calendar month.
    Embargo between train_end and test_start to avoid look-ahead via target horizon.
    """
    dmin, dmax   = dates.min(), dates.max()
    start_train  = dmin
    start_end    = start_train + relativedelta(years=train_years) - pd.Timedelta(days=1)
    cursor       = start_end + pd.Timedelta(days=embargo_days)

    while cursor <= dmax - pd.tseries.offsets.MonthEnd(0):
        test_start = (cursor + pd.offsets.MonthBegin(0)).normalize()
        test_end   = (test_start + pd.offsets.MonthEnd(0))
        yield WFWindow(start_train, start_end, test_start, test_end)
        start_end += pd.offsets.MonthEnd(step_months)
        cursor     = start_end + pd.Timedelta(days=embargo_days)

 # Build windows on the AS-OF dataset 
unique_dates = pd.DatetimeIndex(sorted(df_asof.index.unique())).normalize()
wf_windows   = list(generate_wf_windows(unique_dates, TRAIN_YEARS, STEP_MONTHS, EMBARGO_DAYS))

N_FOLDS_LIMIT  = None
wf_windows_run = wf_windows if not N_FOLDS_LIMIT else wf_windows[:N_FOLDS_LIMIT]
print("WF folds:", len(wf_windows_run))
wf_windows_run[:3]



WF folds: 88


[WFWindow(train_start=Timestamp('2014-09-17 00:00:00'), train_end=Timestamp('2017-09-16 00:00:00'), test_start=Timestamp('2017-11-01 00:00:00'), test_end=Timestamp('2017-11-30 00:00:00')),
 WFWindow(train_start=Timestamp('2014-09-17 00:00:00'), train_end=Timestamp('2017-09-30 00:00:00'), test_start=Timestamp('2017-11-01 00:00:00'), test_end=Timestamp('2017-11-30 00:00:00')),
 WFWindow(train_start=Timestamp('2014-09-17 00:00:00'), train_end=Timestamp('2017-10-31 00:00:00'), test_start=Timestamp('2017-12-01 00:00:00'), test_end=Timestamp('2017-12-31 00:00:00'))]

### Walk-Forward Training & Prediction (AS-OF)

This section defines the **train/test splitter**, **model pipelines**, and two WF runners:

#### 1) `_split_wf(df_view, w, embargo_bd, asof_lag)`
Creates train/test slices for one walk-forward window `w`:
- **Ceiling guard**: training data stops strictly **before** the first test observation **and** before any info that could bleed due to the AS-OF lag (`asof_lag`) and horizon.
- **Embargo**: removes the last `embargo_bd` business days from the effective training end to avoid leakage via target computation.

#### 2) `build_pipeline(model)`
Builds a preprocessing + estimator pipeline:
- **Preprocessing**: median imputation → standardization.
- **Models**: `"elasticnet"` (with grid for `alpha`, `l1_ratio`) or `"ridge"` (grid for `alpha`).
- Returns `(pipeline, param_grid)` for tuning.

#### 3) `fit_predict_wf_fast_asof(...)`
Fast baseline using **fixed Ridge(alpha)** (no CV). For each fold:
- Split with `_split_wf` → fit → predict on test month.
- Returns a concatenated `DataFrame` with `y_pred`, `ticker`, `wf_fold`, `model`.

#### 4) `fit_predict_wf_asof(...)`
Tuned variant with **RandomizedSearchCV** over the model grid using **TimeSeriesSplit** (3 splits) on the training chunk of each fold.
- Scoring: **neg MSE** (minimizes MSE on the target forward return).
- Progress bar shows `n_iter × CV_splits` tasks per fold.
- Returns predictions with best-found hyperparameters per fold.

**Notes**
- All inputs must already respect **AS-OF lagging** and prior filters (`df_asof`, `feature_cols`, `TARGET`).
- Use `wf_windows_run` created earlier for consistent fold definitions.
- For reproducibility, the seeds are fixed and preprocessing is identical across models.


In [12]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge, ElasticNet
from sklearn.model_selection import RandomizedSearchCV, TimeSeriesSplit
from pandas.tseries.offsets import BDay


def _split_wf(df_view, w, embargo_bd=MAX_H, asof_lag=ASOF_LAG_DAYS):
    # Ceiling: stop training before the first test observation and before target horizon
    ceiling = w.test_start - BDay(asof_lag + 1)         # nothing from (t-1) bleeding into test
    train_end_eff = min(w.train_end, ceiling) - BDay(max(1, embargo_bd))
    tr = df_view.loc[(df_view.index >= w.train_start) & (df_view.index <= train_end_eff)]
    te = df_view.loc[(df_view.index >= w.test_start)  & (df_view.index <= w.test_end)]
    return tr, te, train_end_eff

def build_pipeline(model: str = "elasticnet"):
    pre = Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale",  StandardScaler(with_mean=True, with_std=True)),
    ])
    if model == "elasticnet":
        est = ElasticNet(random_state=RANDOM_STATE, max_iter=2000)
        params = {
            "est__alpha":    np.logspace(-4, 1, 50),
            "est__l1_ratio": np.linspace(0.0, 1.0, 21),
        }
    elif model == "ridge":
        est = Ridge(random_state=RANDOM_STATE)
        params = {"est__alpha": np.logspace(-4, 2, 20)}
    else:
        raise ValueError("model must be 'elasticnet' or 'ridge'")
    return Pipeline([("pre", pre), ("est", est)]), params

def fit_predict_wf_fast_asof(df_view: pd.DataFrame,
                             feature_cols: list,
                             target_col: str,
                             windows=None,
                             alpha: float = 1.0) -> pd.DataFrame:
    if windows is None: windows = wf_windows_run
    preds, pipe = [], Pipeline([
        ("impute", SimpleImputer(strategy="median")),
        ("scale",  StandardScaler()),
        ("est",    Ridge(alpha=alpha, random_state=RANDOM_STATE)),
    ])
    for k, w in enumerate(progress(windows, total=len(windows),
                                   desc=f"WF ridge_fast [{target_col}]"), 1):

        tr, te, tr_end_eff = _split_wf(df_view, w)
        if te.empty: 
            print(f"Fold {k:02d} skipped (te=0)"); 
            continue
        tr = tr.dropna(subset=[target_col])
        X_tr, y_tr = tr[feature_cols], tr[target_col]
        X_te       = te[feature_cols]
        if len(X_tr)==0 or len(X_te)==0:
            print(f"Fold {k:02d} skipped (tr={len(X_tr)}, te={len(X_te)})")
            continue
        pipe.fit(X_tr, y_tr)
        yhat = pipe.predict(X_te)
        out = pd.DataFrame(
            {"ticker": te["ticker"].values, "y_pred": yhat,
             "wf_fold": k, "model": "ridge_fast"},
            index=te.index
        ); out.index.name = "date"
        preds.append(out)
        print(f"Fold {k:02d} | Train {w.train_start.date()}→{pd.Timestamp(tr_end_eff).date()} "
              f"| Test {w.test_start.date()}→{w.test_end.date()} | est={pipe.named_steps['est']}")
    return pd.concat(preds).sort_index()

def fit_predict_wf_asof(df_view: pd.DataFrame,
                        feature_cols: list,
                        target_col: str,
                        model: str = "elasticnet",
                        n_iter: int = 25,
                        windows=None) -> pd.DataFrame:
    if windows is None: windows = wf_windows_run
    pipe, params = build_pipeline(model)
    preds = []
    for k, w in enumerate(progress(windows, total=len(windows),
                                  desc=f"WF {model} [n_iter={n_iter}]"), 1):
        tr, te, tr_end_eff = _split_wf(df_view, w)
        if te.empty:
            print(f"Fold {k:02d} skipped (te=0)"); 
            continue
        tr = tr.dropna(subset=[target_col])
        X_tr, y_tr = tr[feature_cols], tr[target_col]
        X_te       = te[feature_cols]
        cv = TimeSeriesSplit(n_splits=3)
        tuner = RandomizedSearchCV(
        pipe, params, n_iter=n_iter, random_state=RANDOM_STATE,
        n_jobs=-1, cv=cv,
        scoring="neg_mean_squared_error", verbose=0
        )
        
        total_tasks = n_iter * cv.get_n_splits()
        with joblib_progress(total_tasks, desc=f"RS[{model}] fold {k:02d}"):
            tuner.fit(X_tr, y_tr)
        out = te[["ticker"]].copy()
        out["y_pred"]  = tuner.predict(X_te)
        out["wf_fold"] = k
        out["model"]   = model
        out.index.name = "date"
        preds.append(out)
        print(f"Fold {k:02d} | Train {w.train_start.date()}→{pd.Timestamp(tr_end_eff).date()} "
              f"| Test {w.test_start.date()}→{w.test_end.date()} | best={tuner.best_params_}")
    return pd.concat(preds).sort_index()


***
***
### 5 - Portfolio Construction & Backtest Engine

This section converts model predictions into portfolio weights, simulates trades, and evaluates performance.
#### Components

- **`_cap_and_renorm(row, cap, long_short)`**  
  Applies a **per-name weight cap** and renormalizes weights:
  - Long/short: cap on both sides, then rescale each side to 50% gross.  
  - Long-only: cap, then scale down so total gross ≤ 1.  

- **`make_positions(signal_df, ...)`**  
  Converts daily model scores (`y_pred`) into cross-sectional weights:
  1. Normalize scores to a daily matrix.  
  2. Resample at rebalance frequency (weekly/monthly).  
  3. Rank scores → take top `q` fraction (and bottom if long/short).  
  4. Assign equal weights per side, adjusted for leverage, eligibility masks, and per-name caps.  

- **`compute_returns_matrix(d)`**  
  Converts `(date, ticker, close)` into a **date × ticker matrix of simple returns**, de-duplicating per `(date,ticker)`.

- **`backtest(signal_pred, prices_df, ...)`**  
  Simulates trading given predictions and prices:
  - Build rebalance-day positions with `make_positions`.  
  - Forward-fill to daily weights, then **lag one day** (trade at next day’s open).  
  - Approximate multi-day holds with rolling mean of weights.  
  - Compute turnover and apply transaction costs.  
  - Output daily portfolio returns, turnover, and cumulative equity curve.  

- **`perf_stats(ts, freq)`**  
  Computes key performance metrics from a return series:
  - CAGR, annualized volatility, Sharpe ratio, max drawdown, sample size.  

#### Notes
- This design enforces **as-of trading discipline**: trade one day after signal, no lookahead.  
- Turnover-based costs ensure realistic performance attribution.  
- Weights respect liquidity/eligibility and per-name caps from earlier toggles.  


In [13]:
def _cap_and_renorm(row: pd.Series, cap: float, long_short: bool) -> pd.Series:
    if cap is None or cap <= 0:
        return row

    w = row.copy()

    if long_short:
        # 1) hard box cap on both sides
        w = w.clip(lower=-cap, upper=cap)

        # 2) side-wise sums after capping
        pos_sum = w[w > 0].sum()
        neg_sum = -w[w < 0].sum()

        # 3) scale DOWN only, never up (keeps the cap hard)
        target_side = 0.5
        if pos_sum > 0:
            w.loc[w > 0] *= min(target_side / pos_sum, 1.0)
        if neg_sum > 0:
            w.loc[w < 0] *= min(target_side / neg_sum, 1.0)

        return w

    else:
        # long-only: hard cap then scale DOWN to gross=1
        w = w.clip(lower=0.0, upper=cap)
        gross = w.sum()
        if gross > 0:
            w *= min(1.0 / gross, 1.0)
        return w


def make_positions(
    signal_df: pd.DataFrame,
    long_short: bool = LONG_SHORT,
    top_q: float = TOP_QUANTILE,
    freq: str = REBALANCE_FREQ,
    gross_leverage: float = 1.0,
    eligibility: pd.DataFrame | None = None,
    per_name_cap: float | None = None,
    debug: bool = False
) -> pd.DataFrame:
    """
    Build cross-sectional weights from daily scores without backfilling future signals.
    Optionally enforce ADV eligibility per date.
    """
    if per_name_cap is None:
        per_name_cap = (globals().get('PER_NAME_CAP') 
                        if globals().get('USE_PER_NAME_CAP', False) else None)
    
    sig = (signal_df[['ticker', 'y_pred']]
           .rename(columns={'y_pred': 'score'})
           .copy())
    if getattr(sig.index, "tz", None) is not None:
        sig.index = sig.index.tz_localize(None)
    sig.index = pd.to_datetime(sig.index).normalize()
    sig = sig.dropna(subset=['score'])

    if debug:
        print("[make_positions] pred rows:", len(sig),
              "| unique dates:", sig.index.nunique(),
              "| range:", sig.index.min(), "->", sig.index.max(),
              "| tickers:", sig['ticker'].nunique())

    scores_daily = (sig.reset_index(names='date')
                      .sort_values(['date','ticker'])
                      .drop_duplicates(['date','ticker'], keep='last')
                      .pivot(index='date', columns='ticker', values='score')
                      .astype(float)
                      .sort_index())

    scores = scores_daily.resample(freq).last().dropna(how="all")

    if debug:
        valid_days = int((scores.notna().sum(axis=1) > 0).sum())
        print("[make_positions] scores shape:", scores.shape,
              "| valid rebalance days:", valid_days)

    # ranks (1=best); build weights with optional eligibility mask
    w = pd.DataFrame(0.0, index=scores.index, columns=scores.columns)
    for dt, row in scores.iterrows():
        r = row.dropna()
        if eligibility is not None and dt in eligibility.index:
            elig_row = eligibility.loc[dt].reindex(r.index)
            r = r[elig_row.fillna(False)]
        if len(r) == 0:
            continue
        k = max(1, int(round(len(r) * top_q)))
        ranks = r.rank(ascending=False, method='first')
        top = ranks.nsmallest(k).index
        if long_short:
            bot = ranks.nlargest(k).index
            w.loc[dt, top] = +gross_leverage / (2*k)
            w.loc[dt, bot] = -gross_leverage / (2*k)
        else:
            w.loc[dt, top] = gross_leverage / k
            
        if per_name_cap is not None and per_name_cap > 0:
            w.loc[dt] = _cap_and_renorm(w.loc[dt], per_name_cap, long_short)    
    return w




def compute_returns_matrix(d: pd.DataFrame) -> pd.DataFrame:
    """
    Convert (date, ticker, close) rows into a date x ticker matrix of SIMPLE returns.
    De-duplicates (date,ticker) by taking the last row.
    """
    x = (d.reset_index(names='date')
           .sort_values(['date','ticker'])
           .drop_duplicates(['date','ticker'], keep='last'))
    px = x.pivot(index='date', columns='ticker', values='close')
    if getattr(px.index, "tz", None) is not None:
        px.index = px.index.tz_localize(None)
    px.index = pd.to_datetime(px.index).normalize()
    rets = px.pct_change().fillna(0.0)  # simple returns
    return rets.sort_index()


def backtest(
    signal_pred: pd.DataFrame,
    prices_df: pd.DataFrame,
    long_short: bool = LONG_SHORT,
    top_q: float = TOP_QUANTILE,
    freq: str = REBALANCE_FREQ,
    holding_period: int = HOLDING_PERIOD,
    tcost_bps: float = TCOST_BPS,
    eligibility: pd.DataFrame | None = None,
    per_name_cap: float | None = None,
    gross_leverage: float = 1.0, 
    debug: bool = False
):
    """
    Align predictions to daily returns, trade next day, and hold each rebalance for
    `holding_period` days via rolling mean of weights (staggered overlap).
    """
    # positions on rebalance calendar
    w_rb = make_positions(signal_pred, long_short=long_short, top_q=top_q,
                          freq=freq, gross_leverage=gross_leverage, eligibility=eligibility, 
                          per_name_cap=per_name_cap, debug=debug)
    # daily returns matrix
    rets = compute_returns_matrix(prices_df)

    # align: forward-fill positions to daily dates, then select only return dates
    weights = (w_rb
               .reindex(w_rb.index.union(rets.index))
               .sort_index()
               .ffill()
               .reindex(rets.index))

    # common universe
    common = rets.columns.intersection(weights.columns)
    if debug:
        print("[backtest] columns | rets:", len(rets.columns),
              "weights:", len(weights.columns), "common:", len(common))
        print("[backtest] weights_w first date:", w_rb.index.min(),
              "| after-align nonzero rows:",
              int((weights[common].abs().sum(axis=1) > 1e-12).sum()))
        if w_rb.index.min() in weights.index:
            print("[backtest] sum |w| at first rebalance:",
                  float(weights.loc[w_rb.index.min(), common].abs().sum()))

    weights = weights[common].fillna(0.0)
    rets    = rets[common].fillna(0.0)

    # enter next day; approximate H-day hold with rolling mean
    w_held = weights.shift(1).rolling(holding_period).mean().fillna(0.0)

    # mask out pre-deployment days (before first nonzero)
    mask_live = (w_held.abs().sum(axis=1) > 1e-12)
    if mask_live.any():
        first_live = mask_live.idxmax()  # first True
        w_held = w_held.loc[first_live:]
        rets   = rets.loc[first_live:]

    # turnover & costs
    turnover = w_held.sub(w_held.shift(1).fillna(0.0)).abs().sum(axis=1)
    cost = turnover * (tcost_bps / 1e4)

    # portfolio simple return
    port_ret = (w_held * rets).sum(axis=1) - cost
    equity   = (1.0 + port_ret.fillna(0.0)).cumprod()

    out = pd.DataFrame({'ret': port_ret, 'turnover': turnover, 'equity': equity})
    out.index.name = 'date'
    return out, w_held


def perf_stats(ts: pd.Series, freq: int = 252) -> dict:
    r = ts.dropna()
    if len(r) == 0:
        return dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan, N=0)
    cagr   = (1 + r).prod()**(freq/len(r)) - 1
    vol    = r.std() * np.sqrt(freq)
    sharpe = (r.mean() / r.std()) * np.sqrt(freq) if r.std() > 0 else np.nan
    curve  = (1 + r).cumprod()
    maxdd  = (curve / curve.cummax() - 1).min()
    return dict(CAGR=cagr, Vol=vol, Sharpe=sharpe, MaxDD=maxdd, N=len(r))


### Risk-Control Helpers: Liquidity Eligibility & Beta Neutralization

This section builds two overlays that can be applied during portfolio construction.

#### 1) `build_adv_eligibility(df_prices, lookback=60, pct=0.20)`
Computes an **ADV-based eligibility mask** (date × ticker → boolean):
- ADV ≈ rolling mean of `close × volume` over `lookback` days.
- **Leak-safe:** uses `shift(1)` so day-t eligibility never uses day-t information.
- For each date, marks tickers with ADV ≥ cross-sectional `pct` percentile (default top 20% liquid) as eligible.

#### 2) `build_rolling_beta(df_prices, lookback=252)`
Computes **rolling betas** of each ticker to an equal-weight market return:
- Market proxy: cross-sectional mean of simple returns each day.
- Beta per ticker via rolling covariance/variance over `lookback`.
- **Leak-safe:** `shift(1)` to use info only up to t-1.

#### 3) `neutralize_vs_beta(pred_df, betas, min_names, strength)`
Removes unintended **market beta exposure** from daily scores:
- For each date, regress scores (`y_pred`) on betas cross-sectionally and take residuals.  
- `min_names` guards against thin cross-sections; `strength` (0..1) scales the neutralization (1 = full).  
- Demeans both variables per date; ridge term (`RIDGE_EPS`) stabilizes low-variance cases.  
- Output preserves `(date, ticker, y_pred)` with residualized scores.

**Usage notes**
- Apply `eligibility` mask inside `make_positions(...)` to skip illiquid names.
- Apply `neutralize_vs_beta(...)` to `pred_df` **before** turning scores into weights if `USE_BETA_NEUTRAL=True`.


In [14]:
# --- Risk-control helpers (ADV eligibility & beta-neutral overlay) ---

def build_adv_eligibility(df_prices: pd.DataFrame,
                          lookback: int = 60,
                          pct: float = 0.20) -> pd.DataFrame:
    """
    Returns a boolean DataFrame (date x ticker) marking names eligible by ADV percentile.
    Leak-safe: uses rolling mean of (close*volume) shifted by 1 day.
    """
    d = (df_prices.reset_index(names='date')
                  .drop_duplicates(['date','ticker'], keep='last')
                  .pivot(index='date', columns='ticker', values='close'))
    v = (df_prices.reset_index(names='date')
                  .drop_duplicates(['date','ticker'], keep='last')
                  .pivot(index='date', columns='ticker', values='volume'))
    dv  = (d * v).astype(float)
    adv = dv.rolling(lookback).mean().shift(1)  # no day-of info

    # per-date threshold at the requested percentile
    thr = adv.apply(lambda row: np.nanpercentile(row.values, pct*100), axis=1)
    elig = adv.ge(thr, axis=0) & adv.notna()
    elig.index.name = 'date'
    return elig

def build_rolling_beta(df_prices: pd.DataFrame,
                       lookback: int = 252) -> pd.DataFrame:
    """
    Rolling beta vs equal-weight market return. Leak-safe via shift(1).
    Returns DataFrame (date x ticker) of betas.
    """
    px = (df_prices.reset_index(names='date')
                     .drop_duplicates(['date','ticker'], keep='last')
                     .pivot(index='date', columns='ticker', values='close'))
    rets = px.pct_change()
    mkt  = rets.mean(axis=1)  # equal-weight market
    var_m = mkt.rolling(lookback).var()
    betas = pd.DataFrame(index=rets.index, columns=rets.columns, dtype=float)

    # rolling covariance per column vs market
    for c in rets.columns:
        cov_im = rets[c].rolling(lookback).cov(mkt)
        betas[c] = cov_im / (var_m + RIDGE_EPS)

    betas = betas.shift(1)  # use only info up to t-1
    betas.index.name = 'date'
    return betas

def neutralize_vs_beta(pred_df: pd.DataFrame,
                       betas: pd.DataFrame,
                       min_names: int = MIN_XS_NAMES,
                       strength: float = 1.0) -> pd.DataFrame:
    """
    Per-date cross-sectional regression of scores on beta; returns residualized scores.
    Handles duplicate (date,ticker) by keeping the last one.
    """
    s_tidy = (pred_df.reset_index().rename(columns={'index':'date'})
              .sort_values(['date','ticker'])
              .drop_duplicates(['date','ticker'], keep='last'))
    s = s_tidy.pivot_table(index='date', columns='ticker', values='y_pred', aggfunc='last')

    # align betas to score matrix
    b = betas.reindex(s.index).reindex(columns=s.columns)

    def _resid_row(y, x):
        y = y.astype(float).copy()
        x = x.astype(float).copy()
        m = np.isfinite(y) & np.isfinite(x)
        if m.sum() < min_names:
            return y * np.nan
        # demean both; if beta variance is ~0, just return demeaned y
        y0 = y[m] - y[m].mean()
        x0 = x[m] - x[m].mean()
        denom = float(np.dot(x0, x0))
        if denom <= RIDGE_EPS:
            y_res = y.copy()
            y_res[m] = y0
            return y_res
        beta = float(np.dot(x0, y0) / (denom + RIDGE_EPS))
        y_res = y.copy()
        y_res[m] = y0 - strength * beta * x0
        return y_res

    resid = s.combine(b, func=_resid_row, overwrite=False)

    out = (resid.stack(dropna=False)
                 .rename('y_pred')
                 .reset_index()
                 .dropna(subset=['y_pred']))
    return out.set_index('date')[['ticker','y_pred']]


***
***
### 6 - Lag-Ensemble (Train & Test) + Horizon Ensemble

This section **trains models on walk-forward training sets and evaluates them on test folds**, then combines predictions across multiple AS-OF lags (and optionally horizons).

#### Key steps

1. **Train–test loop**
   - For each walk-forward window (`wf_windows_run`):
     - **Train** the model pipeline on the training slice.
     - **Test** by predicting on the next month’s test slice.
   - This is repeated independently for each lag (1D, 3D, 5D, 10D).

2. **Lag ensemble**
   - Collect predictions from all lags per (date, ticker).
   - Weighting:
     - **Train-only IC weights**: average daily IC on the training span only, shrunk toward equal weights by `IC_SHRINK_LAMBDA`.
     - Or fallback: equal weights, or test-set IC weighting if enabled.

3. **(Optional) Horizon ensemble**
   - Repeat the train–test lag-ensemble procedure for each horizon (`5D`, `21D`).
   - Combine horizon-level predictions using train-only IC weights (`HORIZON_IC_SHRINK_LAMBDA`).

4. **Residualization vs 1D return**
   - On the combined **test predictions**, regress out exposure to prior 1-day returns per date.
   - Output: final test-set prediction series `PRED_USE` with `(date, ticker, y_pred)`.

#### Why this matters
- Ensures a **clean separation between training and testing** — no information leakage.
- **Lag ensemble** reduces timing risk; **train-only IC weighting** keeps it out-of-sample.
- Residualization vs 1D return avoids overfitting to trivial short-term patterns.


In [15]:
from functools import reduce

# --- Train & predict: lag-ensemble + neutralization -------------------------
USE_TRAIN_IC_WEIGHTS   = True   # default off
IC_SHRINK_LAMBDA       = 0.6     # 0=pure IC weights, 1=pure equal weights

USE_FAST = True
TARGET   = f"fwd_ret_{TARGET_HORIZONS[0]}d"

# As-of lags in TRADING days (1 = current baseline).
# LAG_ENS = [1, 3, 5, 10]

LAG_ENS = [1, 3]

# weighting scheme for the ensemble: "equal" or "ic"
# ("ic" will try to compute mean IC per-lag; if compute_rank_ic isn't defined yet,
# it will auto-fallback to equal weights)
WEIGHT_METHOD = "equal"

# --- Horizon ensemble toggles ---
USE_HORIZON_ENSEMBLE   = False    # <— blend 5D + 21D if True
HORIZON_LIST           = [5, 21] # must be subset of TARGET_HORIZONS
HORIZON_IC_SHRINK_LAMBDA = 0.6   # 0 = pure train-IC weights, 1 = equal weights

# --- Model variant toggles ----------------------------------------------------
MODEL_KIND   =  "ridge"      # {"ridge","elasticnet","huber","hgbt"}  (hgbt = HistGBR)
MODEL_PARAMS = {}           # per-model overrides; leave {} for defaults


# default params (used if key missing in MODEL_PARAMS)
MODEL_DEFAULTS = {
    "ridge":      {"alpha": 1.0},
    "elasticnet": {"alpha": 0.5, "l1_ratio": 0.2, "max_iter": 5000},
    "huber":      {"epsilon": 1.35, "alpha": 1e-4, "max_iter": 1000},
    "hgbt":       {"max_depth": 3, "learning_rate": 0.05, "max_iter": 250,
                   "l2_regularization": 0.0, "max_leaf_nodes": 31}
}


def _make_pipeline(kind: str, params: dict):
    p = (MODEL_DEFAULTS.get(kind, {}) | (params or {}))
    steps = [("impute", SimpleImputer(strategy="median"))]

    if kind in {"ridge","elasticnet","huber"}:
        steps.append(("scale", StandardScaler()))

    if kind == "ridge":
        est = Ridge(alpha=float(p["alpha"]), random_state=RANDOM_STATE)
    elif kind == "elasticnet":
        est = ElasticNet(alpha=float(p["alpha"]),
                         l1_ratio=float(p["l1_ratio"]),
                         max_iter=int(p["max_iter"]),
                         random_state=RANDOM_STATE)
    elif kind == "huber":
        est = HuberRegressor(epsilon=float(p["epsilon"]),
                             alpha=float(p["alpha"]),
                             max_iter=int(p["max_iter"]))
    elif kind == "hgbt":
        est = HistGradientBoostingRegressor(
            max_depth=int(p["max_depth"]),
            learning_rate=float(p["learning_rate"]),
            max_iter=int(p["max_iter"]),
            l2_regularization=float(p["l2_regularization"]),
            max_leaf_nodes=int(p["max_leaf_nodes"]),
            random_state=RANDOM_STATE)
    else:
        raise ValueError(f"Unknown MODEL_KIND={kind}")

    return Pipeline(steps + [("est", est)])


def fit_predict_wf_with_pipe(df_asof, feature_cols, target_col, windows, pipe) -> pd.DataFrame:
    """
    Run your existing walk-forward windows with a custom sklearn Pipeline.

    This is robust to different _split_wf return signatures by:
    - calling _split_wf(df_asof, w)
    - extracting the first and last DataFrame-like objects as train/test
    """
    out = []

    for w in windows:
        parts = _split_wf(df_asof, w)

        dfs = [p for p in parts if isinstance(p, pd.DataFrame)]
        if not dfs:
            raise ValueError(f"_split_wf returned no DataFrames for window {w!r}: {type(parts)=}")

        # first DF is train, last DF is test
        tr_df = dfs[0]
        te_df = dfs[-1] if len(dfs) >= 2 else None

        if te_df is None or getattr(te_df, "empty", True):
            continue

        pp = clone(pipe)
        pp.fit(tr_df[feature_cols], tr_df[target_col])
        yhat = pp.predict(te_df[feature_cols])

        fold = pd.DataFrame(
            {"ticker": te_df["ticker"].values, "y_pred": yhat},
            index=te_df.index
        )
        fold.index.name = "date"
        out.append(fold)

    if not out:
        return pd.DataFrame(columns=["ticker","y_pred"])

    return (pd.concat(out).sort_index()
            .reset_index().set_index("date")[["ticker","y_pred"]].sort_index())


def _compute_rank_ic_quick(pred_df: pd.DataFrame,
                           df_prices: pd.DataFrame,
                           horizon: int = 5,
                           min_names: int = 10,
                           demean: bool = True) -> pd.Series:
    # predictions → tidy
    p = (pred_df[['ticker','y_pred']]
         .rename(columns={'y_pred':'score'})
         .rename_axis('date').reset_index())
    p['date'] = pd.to_datetime(p['date']).dt.normalize()
    p = p[['date','ticker','score']]

    # prices → wide, normalized index
    px = (df_prices.reset_index(names='date')
            .sort_values(['date','ticker'])
            .drop_duplicates(['date','ticker'], keep='last')
            .pivot(index='date', columns='ticker', values='close')
            .astype(float).sort_index())

    # realized forward simple returns
    if horizon > 0:
        fwd = px.pct_change(horizon).shift(-horizon)
    elif horizon < 0:
        H = abs(horizon); fwd = px.pct_change(H)
    else:
        fwd = px * 0.0

    real = (fwd.stack().rename('fwd')
              .reset_index().rename(columns={'level_1':'ticker'}))

    ic_df = p.merge(real, on=['date','ticker'], how='left')

    if demean:
        ic_df['fwd']   = ic_df.groupby('date')['fwd'].transform(lambda x: x - x.mean())
        ic_df['score'] = ic_df.groupby('date')['score'].transform(lambda x: x - x.mean())

    def _daily_ic(g):
        g = g.dropna(subset=['score','fwd'])
        if len(g) < min_names: return np.nan
        return g[['score','fwd']].corr(method='spearman').iloc[0,1]

    ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
    ic.index.name = 'date'
    ic.name = f'IC_{horizon}d'
    return ic


def _asof_extra_lag(df_base, cols, extra):
    """Apply an extra per-ticker shift to features only (keeps TARGET intact)."""
    if extra <= 0:
        return df_base
    d = df_base.copy()
    d[cols] = d.groupby('ticker', group_keys=False)[cols].shift(extra)
    return d.dropna(subset=cols + [TARGET])


def _neutralize_vs_ret1_quick(pred_df, df_prices):
    ret1 = df_prices[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d')
    ret1 = (df_prices.assign(ret_1d=ret1)
            .assign(date=lambda x: x.index)
            .reset_index(drop=True)[['date','ticker','ret_1d']])
    sc = pred_df.reset_index().rename(columns={'index':'date'})
    m  = sc.merge(ret1, on=['date','ticker'], how='left')
    def _res(g):
        x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
        mask = np.isfinite(x) & np.isfinite(y)
        if mask.sum() < 10: g['y_pred'] = np.nan; return g
        beta = np.dot(x[mask], y[mask]) / (np.dot(x[mask], x[mask]) + 1e-12)
        g.loc[mask, 'y_pred'] = y[mask] - beta * x[mask]
        return g
    return (m.groupby('date', group_keys=False).apply(_res)
              .dropna(subset=['y_pred'])
              .set_index('date')[['ticker','y_pred']])

def compute_train_ic_weights(df_asof, feature_cols, target_col, lags, df_prices,
                             use_fast=True, alpha=1.0,
                             neutralize_ret1=True, shrink_lambda=0.5,
                             model_kind: str = None, model_params: dict | None = None):

    # build train window = expanding start .. first fold's effective train_end
    first_w = wf_windows_run[0]
    tr, _, _ = _split_wf(df_asof, first_w)
    if tr.empty:
        return {L: 1.0/len(lags) for L in lags}

    # fit once per lag on TRAIN ONLY, score train, neutralize, compute daily Spearman IC
    ic_means = {}
    px_wide = (df_prices.reset_index(names='date')
             .sort_values(['date','ticker'])
             .drop_duplicates(['date','ticker'], keep='last')
             .pivot(index='date', columns='ticker', values='close')
             .astype(float)
             .sort_index())

    for L in progress(lags, total=len(lags), desc="Train IC by lag"):
        dL = _asof_extra_lag(df_asof, feature_cols, extra=L-1)
        dL = dL.sort_index()  # safe no-op if already sorted
        tr_start, tr_end = tr.index.min(), tr.index.max()
        trL = dL[(dL.index >= tr_start) & (dL.index <= tr_end)]
        if trL.empty:
            ic_means[L] = 0.0
            continue

        pipe = _make_pipeline(model_kind or MODEL_KIND, model_params or MODEL_PARAMS)


        X_tr, y_tr = trL[feature_cols], trL[target_col]
        pipe.fit(X_tr, y_tr)
        yhat = pipe.predict(X_tr)

        pred = pd.DataFrame({"ticker": trL["ticker"].values, "y_pred": yhat}, index=trL.index)
        pred.index.name = "date"
        if neutralize_ret1:
            pred = _neutralize_vs_ret1_quick(pred, df_prices)

        # realized fwd simple returns on train
        H = int(target_col.split('_')[2].rstrip('d')) 
        if pred.empty:
            ic_means[L] = 0.0
            continue

        # ensure pred dates are normalized
        pred.index = pd.to_datetime(pred.index).normalize()

        start, end = pred.index.min(), pred.index.max()
        px_tr = px_wide[(px_wide.index >= start) & (px_wide.index <= end)]

        fwd = px_tr.pct_change(H).shift(-H)

        # tidy merge
        sc   = pred.reset_index().rename(columns={'index':'date'})
        real = (fwd.stack().rename('fwd')
                    .reset_index()
                    .rename(columns={'level_1': 'ticker'}))
        ic_df = sc.merge(real, on=['date','ticker'], how='left')


        def _daily_ic(g):
            g = g.dropna(subset=['y_pred','fwd'])
            if len(g) < 10: return np.nan
            return g[['y_pred','fwd']].corr(method='spearman').iloc[0,1]

        ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
        ic_means[L] = float(ic.mean()) if len(ic) else 0.0

    w = np.array([max(ic_means[L], 0.0) for L in lags], float)
    if w.sum() == 0: w[:] = 1.0
    w /= w.sum()

    eq = np.ones_like(w) / len(lags)
    w_final = (1 - shrink_lambda)*w + shrink_lambda*eq
    return dict(zip(lags, w_final))


def _cs_zscore_by_date(df_scores: pd.DataFrame) -> pd.DataFrame:
    """
    Cross-sectional z-score per date on column 'y_pred'.
    Input: index=date, cols=['ticker','y_pred']
    Output: same schema with z-scored y_pred.
    """
    def _z(g):
        s = g['y_pred'].astype(float)
        sd = float(s.std(ddof=0))
        if not np.isfinite(sd) or sd <= 1e-12:
            return pd.Series(0.0, index=s.index)  
        return (s - float(s.mean())) / sd
    return df_scores.groupby(level=0, group_keys=False).apply(lambda g: g.assign(y_pred=_z(g)))

def _make_df_asof_for_h(df_asof_base: pd.DataFrame, target_h: int) -> pd.DataFrame:
    """
    Reuse your existing df_asof (features already shifted 1D, leaky features dropped),
    but keep only rows where the H-target exists.
    """
    tcol = f"fwd_ret_{target_h}d"
    assert tcol in df_asof_base.columns, f"Missing target {tcol} in df_asof"
    return df_asof_base.dropna(subset=[tcol])

def _fitpredict_one_horizon(target_h: int,
                            model_kind: str = None,
                            model_params: dict | None = None
                           ) -> tuple[dict, pd.DataFrame, pd.DataFrame]:
    """
    Build the lag-ensemble for a given horizon H (5 or 21):
      - per-lag predictions via your WF runner
      - train-only IC weights across lags (if enabled)
      - per-date neutralization vs 1D return
    Returns:
      pred_by_lag_H : {L: DataFrame(date,[ticker,y_pred_L{L}])}
      pred_ens_raw  : DataFrame(date,[ticker, y_pred])  (pre-neutralization)
      PRED_BASE_H   : DataFrame(date,[ticker, y_pred])  (post 1D-neutralization)
    """
    tcol = f"fwd_ret_{target_h}d"
    dfH  = _make_df_asof_for_h(df_asof, target_h)

    # 1) per-lag predictions
    pred_by_lag_H = {}
    for L in progress(LAG_ENS, total=len(LAG_ENS), desc=f"Lags (H={target_h}d)"):
        dfL = _asof_extra_lag(dfH, feature_cols, extra=L-1)
        pipeH = _make_pipeline(model_kind or MODEL_KIND, model_params or MODEL_PARAMS)
        pL = fit_predict_wf_with_pipe(dfL, feature_cols, tcol, wf_windows_run, pipeH)

        pL = pL.sort_index().rename(columns={'y_pred': f'y_pred_L{L}'})
        pred_by_lag_H[L] = pL[['ticker', f'y_pred_L{L}']]

    # 2) align and weight across lags (train-only IC if enabled)
    dfs = [v.set_index(['ticker'], append=True) for v in pred_by_lag_H.values()]
    pred_ens = reduce(lambda a,b: a.join(b, how='inner'), dfs).sort_index()
    if USE_TRAIN_IC_WEIGHTS:
        W_lag = compute_train_ic_weights(
        df_asof=dfH, feature_cols=feature_cols, target_col=tcol, lags=LAG_ENS, df_prices=df,
        use_fast=True, alpha=1.0, neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
        model_kind=(model_kind or MODEL_KIND), model_params=(model_params or MODEL_PARAMS)
    )

    else:
        W_lag = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}

    pred_ens['y_pred'] = 0.0
    for L in LAG_ENS:
        pred_ens['y_pred'] += float(W_lag[L]) * pred_ens[f'y_pred_L{L}']

    pred_ens_raw = (pred_ens[['y_pred']]
                    .reset_index()
                    .set_index('date')[['ticker','y_pred']]
                    .sort_index())

    # 3) neutralize vs prior 1D return (same as 5D path for consistency)
    predN = (pred_ens_raw.reset_index().rename(columns={'index':'date'})
             .merge(
                 df.assign(ret_1d=df.groupby('ticker')['close'].pct_change(1))\
                   .assign(date=lambda x: x.index)\
                   .reset_index(drop=True)[['date','ticker','ret_1d']],
                 on=['date','ticker'], how='left'))

    def _res(g):
        x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
        m = np.isfinite(x) & np.isfinite(y)
        if m.sum() < 10: g['y_pred'] = np.nan; return g
        beta = np.dot(x[m], y[m]) / (np.dot(x[m], x[m]) + 1e-12)
        g.loc[m, 'y_pred'] = y[m] - beta * x[m]
        return g

    PRED_BASE_H = (predN.groupby('date', group_keys=False).apply(_res)
                   .dropna(subset=['y_pred'])
                   .set_index('date')[['ticker','y_pred']])

    # Dedup safety
    PRED_BASE_H = (PRED_BASE_H.reset_index().sort_values(['date','ticker'])
                   .drop_duplicates(['date','ticker'], keep='last')
                   .set_index('date')[['ticker','y_pred']])

    return pred_by_lag_H, pred_ens_raw, PRED_BASE_H



# 1) fit/predict for each as-of lag
pred_by_lag = {}
for L in progress(LAG_ENS, total=len(LAG_ENS), desc="Lag ensemble (main)"):
    dfL   = _asof_extra_lag(df_asof, feature_cols, extra=L-1)
    pipe0 = _make_pipeline(MODEL_KIND, MODEL_PARAMS)
    pL    = fit_predict_wf_with_pipe(dfL, feature_cols, TARGET, wf_windows_run, pipe0)
    pL    = pL.sort_index().rename(columns={'y_pred': f'y_pred_L{L}'})
    pred_by_lag[L] = pL[['ticker', f'y_pred_L{L}']]
    if USE_TQDM and '_tqdm' in globals() and _tqdm is not None:
        _tqdm.write(f"[ens] L={L} | rows={len(pL)} | dates={pL.index.nunique()} | span={pL.index.min()}→{pL.index.max()}")
    else:
        print(f"[ens] L={L} | rows={len(pL)} | dates={pL.index.nunique()} | span={pL.index.min()}→{pL.index.max()}")


# 2) align all lag predictions on (date, ticker)
dfs = [v.set_index(['ticker'], append=True) for v in pred_by_lag.values()]  # -> MultiIndex (date,ticker)
pred_ens = reduce(lambda a,b: a.join(b, how='inner'), dfs).sort_index()
lag_cols = [f'y_pred_L{L}' for L in LAG_ENS]

# 3) weights
if USE_TRAIN_IC_WEIGHTS:
    WEIGHTS = compute_train_ic_weights(
    df_asof, feature_cols, TARGET, LAG_ENS, df,
    neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
    model_kind=MODEL_KIND, model_params=MODEL_PARAMS
    )
    print("[ens] Train-only IC weights:", {L: round(WEIGHTS[L],4) for L in LAG_ENS})

elif WEIGHT_METHOD == "ic" and 'compute_rank_ic' in globals():
    try:
        # original test-set IC weighting path
        px_tidy = df[['ticker','close']].copy()
        if getattr(px_tidy.index, "tz", None) is not None:
            px_tidy.index = px_tidy.index.tz_localize(None)
        px_tidy.index = pd.to_datetime(px_tidy.index).normalize()
        ic_means = {}
        for L in LAG_ENS:
            tmp = (pred_ens[[f'y_pred_L{L}']]
                   .rename(columns={f'y_pred_L{L}':'y_pred'})
                   .reset_index(level=1)
                   .rename_axis('date'))
            ic = compute_rank_ic(tmp, px_tidy, horizon=TARGET_HORIZONS[0], min_names=10, demean=True)
            ic_means[L] = float(ic.mean()) if len(ic) else 0.0
        w = np.array([max(ic_means[L], 0.0) for L in LAG_ENS], float)
        if w.sum() == 0: w[:] = 1.0
        w = w / w.sum()
        WEIGHTS = dict(zip(LAG_ENS, w))
        print("[ens] IC weights:", {L: round(WEIGHTS[L],4) for L in LAG_ENS})
    except Exception as e:
        print(f"[ens] IC weighting failed ({e}); using equal weights.")
        WEIGHTS = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}

else:
    WEIGHTS = {L: 1.0/len(LAG_ENS) for L in LAG_ENS}

# 4) weighted average prediction
pred_ens['y_pred'] = 0.0
for L in LAG_ENS:
    pred_ens['y_pred'] += WEIGHTS[L] * pred_ens[f'y_pred_L{L}']

# reshape back to expected schema: index=date, columns ['ticker','y_pred']
pred_ens = (pred_ens[['y_pred']]
            .reset_index()
            .set_index('date')[['ticker','y_pred']]
            .sort_index())

print("Pred-ensemble rows:", len(pred_ens),
      "| unique dates:", pred_ens.index.nunique(),
      "| tickers:", pred_ens['ticker'].nunique())

# 5) per-date neutralization vs prior 1D return
ret1 = (df[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d'))
ret1 = (df.assign(ret_1d=ret1)
          .assign(date=lambda x: x.index)
          .reset_index(drop=True)[['date','ticker','ret_1d']])

predN = (pred_ens.reset_index().rename(columns={'index':'date'})
         .merge(ret1, on=['date','ticker'], how='left'))

def _resid(g):
    x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
    m = np.isfinite(x) & np.isfinite(y)
    if m.sum() < 10:
        g['y_pred'] = np.nan
        return g
    beta = np.dot(x[m], y[m]) / (np.dot(x[m], x[m]) + 1e-12)
    g.loc[m, 'y_pred'] = y[m] - beta * x[m]
    return g

PRED_USE = (predN.groupby('date', group_keys=False).apply(_resid)
            .dropna(subset=['y_pred'])
            .set_index('date')[['ticker','y_pred']])


Lag ensemble (main):  50%|█████     | 1/2 [00:49<00:49, 49.77s/it]

[ens] L=1 | rows=99841 | dates=1796 | span=2017-11-01 00:00:00→2024-12-22 00:00:00


                                                                  

[ens] L=3 | rows=99841 | dates=1796 | span=2017-11-01 00:00:00→2024-12-22 00:00:00


                                                              

[ens] Train-only IC weights: {1: 0.4917, 3: 0.5083}
Pred-ensemble rows: 102193 | unique dates: 1796 | tickers: 56


### Horizon Ensemble (Train & Test)

This section extends the lag-ensemble to blend **multiple target horizons** (5D and 21D in this case).  
Again, all weights are learned only on the **training data** of the first fold and applied to the test folds.

#### Key steps

1. **Per-horizon lag-ensembles**
   - Run `_fitpredict_one_horizon(5)` and `_fitpredict_one_horizon(21)`.
   - Each call internally:
     - Fits models on training windows (walk-forward).
     - Predicts on test folds.
     - Produces lag-ensemble predictions (`PRED_BASE_H`) neutralized vs prior 1D returns.

2. **Train-only horizon weights**
   - For each horizon `H ∈ {5, 21}`:
     - Refit on the training span only.
     - Compute mean daily Spearman IC of horizon predictions vs realized returns.
   - Shrink horizon IC weights toward equal (`HORIZON_IC_SHRINK_LAMBDA`) to reduce variance.
   - Example: if 5D has stronger IC than 21D in training, it gets more weight.

3. **Blend across horizons**
   - Convert both horizon predictions to **cross-sectional z-scores** per date (`z5`, `z21`).  
   - Weighted combination:  
     \[
     y\_blend = w_5 \cdot z5 + w_{21} \cdot z21
     \]
   - Output: `PRED_BASE_HENS` = horizon-ensemble prediction series.

4. **Final predictions**
   - Set `PRED_USE = PRED_BASE_HENS` if horizon-ensemble is enabled.
   - Otherwise, fall back to the single-horizon (5D) lag-ensemble result.

#### Why this matters
- **Multiple horizons** capture short-term (5D) and medium-term (21D) dynamics.  
- **Train-only weighting** ensures horizon blending is free of test-set leakage.  
- **Z-scoring before blend** puts different horizons on comparable scales.  


In [16]:
# ----- 21D clone + Horizon ensemble (toggle) -----
if USE_HORIZON_ENSEMBLE:
    assert set(HORIZON_LIST).issubset(set(TARGET_HORIZONS)), "HORIZON_LIST must be in TARGET_HORIZONS"

    # 5D & 21D paths
    pred_by_lag_5,  pred_ens_5_raw,  PRED_BASE_5D  = _fitpredict_one_horizon(5,  MODEL_KIND, MODEL_PARAMS)
    pred_by_lag_21, pred_ens_21_raw, PRED_BASE_21D = _fitpredict_one_horizon(21, MODEL_KIND, MODEL_PARAMS)

    # --- Train-only horizon weights (IC on train; shrink toward equal) ---
    def _train_ic_for_horizon(H: int,
                              model_kind: str = MODEL_KIND,
                              model_params: dict | None = MODEL_PARAMS) -> float:
        """
        Train-time IC for a given horizon H using the SAME model variant as production.
        """
        tcol = f"fwd_ret_{H}d"
        dfH  = _make_df_asof_for_h(df_asof, H)

        # lag weights on TRAIN for this horizon, with same model variant
        Wlag = compute_train_ic_weights(
            df_asof=dfH, feature_cols=feature_cols, target_col=tcol, lags=LAG_ENS, df_prices=df,
            neutralize_ret1=True, shrink_lambda=IC_SHRINK_LAMBDA,
            model_kind=model_kind, model_params=model_params
        )

        # first train DataFrame from the first window
        parts = _split_wf(dfH, wf_windows_run[0])
        dfs   = [p for p in parts if isinstance(p, pd.DataFrame)]
        tr    = dfs[0] if dfs else pd.DataFrame()
        if tr.empty:
            return 0.0

        # template pipeline; clone per-lag
        pipe_tpl = _make_pipeline(model_kind, model_params)

        preds = []
        for L in LAG_ENS:
            dL = _asof_extra_lag(dfH, feature_cols, extra=L-1)
            dL = dL[(dL.index >= tr.index.min()) & (dL.index <= tr.index.max())]
            dL = dL.dropna(subset=[tcol])
            if dL.empty:
                continue

            pp = clone(pipe_tpl)
            X_tr, y_tr = dL[feature_cols], dL[tcol]
            pp.fit(X_tr, y_tr)
            yhat = pp.predict(X_tr)

            out = pd.DataFrame({"ticker": dL["ticker"].values,
                                f"y_pred_L{L}": yhat}, index=dL.index)
            out.index.name = "date"
            preds.append(out)

        if not preds:
            return 0.0

        # weight per-lag train preds with Wlag
        P = reduce(lambda a,b: a.join(b, how='inner'),
                   [p.set_index(['ticker'], append=True).sort_index() for p in preds])
        P['y_pred'] = 0.0
        for L in LAG_ENS:
            col = f'y_pred_L{L}'
            if col in P.columns:
                P['y_pred'] += float(Wlag[L]) * P[col]
        P = (P[['y_pred']].reset_index().set_index('date')[['ticker','y_pred']].sort_index())

        # neutralize vs. 1D return and compute IC on the train span
        Pn = _neutralize_vs_ret1_quick(P, df)
        ic_series = _compute_rank_ic_quick(Pn, df[['ticker','close']], horizon=H, min_names=10, demean=True)
        return float(ic_series.mean()) if len(ic_series) else 0.0

    # compute horizon weights on train
    ic_train_by_H = {}
    for H in progress(HORIZON_LIST, total=len(HORIZON_LIST), desc="H-weights train IC"):
        ic_train_by_H[H] = max(_train_ic_for_horizon(H), 0.0)

    w_raw = np.array([ic_train_by_H[H] for H in HORIZON_LIST], float)
    if w_raw.sum() == 0: 
        w_raw[:] = 1.0
    w_raw /= w_raw.sum()
    w_eq  = np.ones_like(w_raw) / len(HORIZON_LIST)
    wH = (1 - HORIZON_IC_SHRINK_LAMBDA) * w_raw + HORIZON_IC_SHRINK_LAMBDA * w_eq
    H_WEIGHTS = dict(zip(HORIZON_LIST, wH))

    H_W_SUMMARY = {
        'H_w5':  float(H_WEIGHTS.get(5,  np.nan)),
        'H_w21': float(H_WEIGHTS.get(21, np.nan)),
    }
    print("[h-ens] weights detail:", H_W_SUMMARY)
    print("[h-ens] Train-only horizon weights:", {int(k): round(v,4) for k,v in H_WEIGHTS.items()})

    # --- Per-date z-score + blend across horizons (outer join, fill mean=0) ---
    Z5  = _cs_zscore_by_date(PRED_BASE_5D).rename(columns={'y_pred':'z5'})
    Z21 = _cs_zscore_by_date(PRED_BASE_21D).rename(columns={'y_pred':'z21'})
    Z   = (Z5.reset_index().merge(Z21.reset_index(), on=['date','ticker'], how='outer'))
    Z[['z5','z21']] = Z[['z5','z21']].fillna(0.0)

    y_blend = H_WEIGHTS.get(5, 0)*Z['z5'] + H_WEIGHTS.get(21, 0)*Z['z21']
    PRED_BASE_HENS = (pd.DataFrame({'date': Z['date'], 'ticker': Z['ticker'], 'y_pred': y_blend})
                      .set_index('date')[['ticker','y_pred']].sort_index())

    PRED_USE = PRED_BASE_HENS.copy()

else:
    # keep existing 5D path result in PRED_USE 
    pass

***
***
### 7 - Risk-Control Overlays (Train/Test Predictions)

At this stage we apply **risk controls to the test-set predictions** `PRED_USE`.  
The purpose is to align raw model outputs with realistic portfolio constraints.

#### Steps

0. **Deduplication & snapshot**
   - Ensure only one `(date, ticker)` row remains per prediction (keep last if duplicates).  
   - Save `PRED_BASE` = post–1D-return neutralized predictions, pre-beta-neutral overlay (useful for ablation tests).

1. **ADV eligibility filter (if enabled)**
   - Build a per-date × ticker eligibility mask based on rolling average dollar volume (ADV).  
   - Keeps only names above the specified liquidity percentile (`ADV_PCT`).  
   - This prevents illiquid names from entering the portfolio.

2. **Beta-neutral overlay (if enabled)**
   - Compute rolling betas of each ticker to the market.  
   - Cross-sectionally regress scores on betas per date, and replace with residuals.  
   - Strength toggle allows partial neutralization (e.g., 0.5 = half beta neutral, 1.0 = full).

3. **Final check**
   - Deduplicate again after overlays.  
   - Log the count of duplicate `(date, ticker)` rows as a debug sanity check.

#### Outputs
- `PRED_USE`: final **test-set predictions after risk controls**.  
- `PRED_BASE`: snapshot before beta-neutralization, for comparison.

In [17]:
# --- Risk controls (toggled) ---

# 0) Ensure uniqueness before overlays & keep a pre-beta snapshot for ablations
def _dedup_scores(df_scores: pd.DataFrame) -> pd.DataFrame:
    return (df_scores.reset_index().rename(columns={'index':'date'})
            .sort_values(['date','ticker'])
            .drop_duplicates(['date','ticker'], keep='last')
            .set_index('date')[['ticker','y_pred']])

PRED_USE  = _dedup_scores(PRED_USE)   # post 1D-return neutralization
PRED_BASE = PRED_USE.copy()           # <-- snapshot PRE-BETA for ablations

# 1) ADV eligibility
if 'ELIG_ADV' not in globals():
    ELIG_ADV = None
if USE_ADV_ELIGIBILITY:
    ELIG_ADV = build_adv_eligibility(df, lookback=ADV_LOOKBACK_DAYS, pct=ADV_PCT)
    print(f"[elig] ADV lookback={ADV_LOOKBACK_DAYS}d, pct={ADV_PCT:.2f} → shape {ELIG_ADV.shape}")

# 2) Beta-neutral overlay on scores
if USE_BETA_NEUTRAL:
    BETAS = build_rolling_beta(df, lookback=BETA_LOOKBACK_DAYS)
    PRED_USE = neutralize_vs_beta(PRED_BASE, BETAS,
                                  min_names=MIN_XS_NAMES,
                                  strength=BETA_NEUTRAL_STRENGTH)
    PRED_USE = _dedup_scores(PRED_USE)
    print(f"[beta] applied beta-neutral overlay (lookback={BETA_LOOKBACK_DAYS}d, strength={BETA_NEUTRAL_STRENGTH})")

_dups = (PRED_USE.reset_index().rename(columns={'index':'date'})
         .duplicated(['date','ticker']).sum())
print(f"[debug] duplicate (date,ticker) rows in PRED_USE: {_dups}")


[elig] ADV lookback=60d, pct=0.20 → shape (2763, 56)
[debug] duplicate (date,ticker) rows in PRED_USE: 0


***
***
### 8 - Evaluation on Test Predictions (Backtest & Reporting)

We now **evaluate the test-set predictions** by running the execution-side backtest and reporting core portfolio stats.

**Inputs**
- `PRED_USE` – final test predictions (after train→test WF pipeline, lag/horizon ensembles, and post-prediction overlays).
- `px` – prices (`close`) for realized returns.
- Risk controls: ADV eligibility, per-name caps, leverage, turnover costs.

**Backtest logic**
- Rebalance per `REBALANCE_FREQ`, trade **next day** (as-of discipline), hold for `HOLDING_PERIOD` via rolling-mean overlap, apply **transaction costs**.
- Output: daily portfolio return `ret`, `turnover`, and cumulative `equity`.

**Reports**
- `perf_stats`: CAGR, annualized Vol, Sharpe, Max Drawdown, and sample size.
- Diagnostics: average gross exposure (∑|w|), average number of names held, average daily turnover.
- Plot: strategy equity curve (label reflects whether horizon-ensemble is used).
- If ADV eligibility or per-name caps are toggled, summarize their effective constraints.


In [18]:
px = df[['ticker','close']].copy()

bt_main, w_held_main = backtest(
    PRED_USE, px,
    long_short=LONG_SHORT,
    top_q=TOP_QUANTILE,
    freq=REBALANCE_FREQ,
    holding_period=HOLDING_PERIOD,
    tcost_bps=TCOST_BPS,
    eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
    per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
    gross_leverage=GROSS_LEVERAGE,
    debug=False
)


bt_5d     = bt_main
w_held_5d = w_held_main

curve_title = 'Strategy Equity Curve (horizon-ensemble)' if USE_HORIZON_ENSEMBLE else 'Strategy Equity Curve (5D target)'
print_label = 'Perf (h-ens)' if USE_HORIZON_ENSEMBLE else 'Perf (5D target)'

print(f"{print_label}:", perf_stats(bt_main['ret']))
print("Avg |w| per day:", float(w_held_main.abs().sum(axis=1).mean()))
print("Avg names held:", float((w_held_main.abs() > 1e-12).sum(axis=1).mean()))
print("Avg daily turnover:", float(bt_main['turnover'].mean()))
plot_equity_curve(bt_main['equity'], title=curve_title)

if USE_ADV_ELIGIBILITY and ELIG_ADV is not None:
    elig_frac = ELIG_ADV.mean(axis=1)
    print(f"[elig] mean eligible names/day: {float(ELIG_ADV.sum(axis=1).mean()):.1f}")
    print(f"[elig] mean eligible fraction: {elig_frac.mean():.2%}  | min: {elig_frac.min():.2%}")

if USE_PER_NAME_CAP and PER_NAME_CAP:
    print(f"[cap] per-name cap active at {PER_NAME_CAP:.2%} (long_short={LONG_SHORT})")


Perf (5D target): {'CAGR': 0.09456847191697171, 'Vol': 0.03247674852696277, 'Sharpe': 2.799023994309642, 'MaxDD': -0.026883668244567338, 'N': 1793}
Avg |w| per day: 0.43256664807585077
Avg names held: 28.49191299498048
Avg daily turnover: 0.06859118795315118


[elig] mean eligible names/day: 43.4
[elig] mean eligible fraction: 77.46%  | min: 0.00%
[cap] per-name cap active at 2.00% (long_short=True)


In this section, we take the final **test-set predictions** and evaluate execution-side overlays to see how each affects performance.

#### What this does
- **`run_variant()`**: evaluates a labeled variant by optionally applying:
  - **Beta neutralization** on scores (test-time only), with `beta_strength ∈ [0,1]`.
  - **ADV eligibility** mask (test-time only) to skip illiquid names.
  - **Per-name cap** passthrough to the backtest.
- Runs the standard **backtest** on the modified test predictions and returns `CAGR`, `Vol`, `Sharpe`, `MaxDD`, `N`.

#### Variants compared
- **Base (no overlays)** — raw test predictions after 1D-return neutralization.
- **ADV only** — apply liquidity mask.
- **Beta 1.0 only / Beta 0.5 only** — residualize scores vs rolling betas with full/half strength.
- **ADV + Beta** — combine both overlays (using global `BETA_NEUTRAL_STRENGTH`).

### Why this matters
These ablations isolate the **execution-side controls** and quantify their impact on realized performance **without contaminating train/test splits**.


In [19]:
def run_variant(label,
                pred_base: pd.DataFrame,
                adv: pd.DataFrame | None = None,
                betas: pd.DataFrame | None = None,
                beta_strength: float = 1.0,
                per_name_cap: float | None = None):
    """
    Evaluate a variant: optional ADV eligibility and optional beta neutralization (with strength).
    pred_base must be (index=date, cols=['ticker','y_pred']) AFTER 1D-return neutralization.
    """
    P = pred_base
    if betas is not None and beta_strength > 0:
        P = neutralize_vs_beta(P, betas, min_names=MIN_XS_NAMES, strength=beta_strength)
        P = (P.reset_index().sort_values(['date','ticker'])
               .drop_duplicates(['date','ticker'], keep='last')
               .set_index('date')[['ticker','y_pred']])

    bt, _ = backtest(P, df[['ticker','close']],
                     long_short=LONG_SHORT, top_q=TOP_QUANTILE,
                     freq=REBALANCE_FREQ, holding_period=HOLDING_PERIOD,
                     tcost_bps=TCOST_BPS, eligibility=adv,
                     per_name_cap=per_name_cap,  # pass cap through
                     debug=False)
    
    s = perf_stats(bt['ret'])
    s.update(dict(Label=label))
    return s

# small ablation table around PRED_BASE (pre-beta) and BETAS/ELIG_ADV
_base = PRED_BASE
_cap  = (PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None)
_adv  = (ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None)

rows = []
rows.append(run_variant("Base (no overlays)", _base, adv=None, betas=None, per_name_cap=_cap))
rows.append(run_variant("ADV only",          _base, adv=_adv, betas=None, per_name_cap=_cap))
if 'BETAS' in globals():
    rows.append(run_variant("Beta 1.0 only", _base, adv=None, betas=BETAS, beta_strength=1.0, per_name_cap=_cap))
    rows.append(run_variant("Beta 0.5 only", _base, adv=None, betas=BETAS, beta_strength=0.5, per_name_cap=_cap))
    rows.append(run_variant("ADV + Beta",    _base, adv=_adv, betas=BETAS, beta_strength=BETA_NEUTRAL_STRENGTH, per_name_cap=_cap))

abl = (pd.DataFrame(rows)
         .set_index('Label')[['CAGR','Vol','Sharpe','MaxDD','N']]
         .sort_values('Sharpe', ascending=False))
display(abl.round(3))

Unnamed: 0_level_0,CAGR,Vol,Sharpe,MaxDD,N
Label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Base (no overlays),0.095,0.032,2.799,-0.027,1793
ADV only,0.095,0.032,2.799,-0.027,1793


***
***
### 9 - Rank-IC Diagnostics (Test-Set Predictions Only)

We assess the **cross-sectional predictive power** of our final **test-set predictions** `PRED_USE` using Rank-IC (daily Spearman correlation between scores and future returns).

#### What’s computed
- **`compute_rank_ic()`**  
  Tidy-merges predictions with realized returns and computes **daily Spearman IC**, with optional **demeaning per date** to remove market drift.  
  - `horizon > 0`: forward returns over `h` days, shifted to avoid leakage.  
  - `horizon < 0`: backward check (sanity/look-ahead probe).  
  - `min_names`: minimum cross-section size per day; else IC is NaN.

- **True IC on primary horizon (H = `TARGET_HORIZONS[0]`)**  
  Reports mean IC, IC std, **t-stat** \((\bar{IC}/\sigma(IC))\sqrt{N}\), and number of days.

- **Rolling IC**  
  60-day rolling mean IC plot to visualize temporal stability.

- **Cross-sectional dispersion sanity check**  
  Warns if daily score variance is zero (Spearman undefined).

- **+1D lag probe**  
  Shifts predictions by one day **before** evaluation to ensure the signal is not relying on same-day artifacts.

- **Shuffle tests (null baselines)**  
  (a) **Cross-section shuffle per date** — randomizes ranks within each day (IC should ≈ 0).  
  (b) **Calendar shuffle** — permutes the return dates while keeping predictions fixed (IC should ≈ 0).

- **Lead/Lag IC profile**  
  IC across horizons `−15…+15` days:  
  - Negative lags: guard against look-ahead.  
  - Positive lags: profile of true predictive horizon; peak location indicates timing.

#### Why this matters
These diagnostics quantify **out-of-sample** ranking skill, check for leakage or timing artifacts, and reveal where the signal’s horizon is strongest.


In [20]:
# Rank-IC diagnostics

px_tidy = df[['ticker','close']].copy()
if getattr(px_tidy.index, "tz", None) is not None:
    px_tidy.index = px_tidy.index.tz_localize(None)
px_tidy.index = pd.to_datetime(px_tidy.index).normalize()


def compute_rank_ic(
    pred_df: pd.DataFrame,
    df_prices: pd.DataFrame,
    horizon: int = 5,
    min_names: int = 10,
    demean: bool = True
) -> pd.Series:
    # predictions → tidy
    p = (pred_df[['ticker', 'y_pred']]
         .rename(columns={'y_pred': 'score'})
         .rename_axis('date')
         .reset_index())
    p['date'] = pd.to_datetime(p['date']).dt.normalize()
    p = p[['date','ticker','score']]

    # prices → tidy + normalized index
    px = df_prices[['ticker','close']].copy()
    if getattr(px.index, "tz", None) is not None:
        px.index = px.index.tz_localize(None)
    px.index = pd.to_datetime(px.index).normalize()

    # realized returns per ticker
    if horizon > 0:
        fwd = px.groupby('ticker')['close'].pct_change(horizon).shift(-horizon)
    elif horizon < 0:
        H = abs(horizon)
        fwd = px.groupby('ticker')['close'].pct_change(H)
    else:
        fwd = pd.Series(0.0, index=px.index)

    real = (px.assign(fwd=fwd)
              .assign(date=lambda x: x.index)
              .reset_index(drop=True)[['date','ticker','fwd']])

    # merge preds with realized forward returns
    ic_df = p.merge(real, on=['date','ticker'], how='left')

    if demean:
        ic_df['fwd']   = ic_df.groupby('date')['fwd'].transform(lambda x: x - x.mean())
        ic_df['score'] = ic_df.groupby('date')['score'].transform(lambda x: x - x.mean())

    def _daily_ic(g):
        g = g.dropna(subset=['score','fwd'])
        if len(g) < min_names:
            return np.nan
        return g[['score','fwd']].corr(method='spearman').iloc[0,1]

    ic = ic_df.groupby('date', sort=True).apply(_daily_ic).dropna()
    ic.index.name = 'date'
    ic.name = f'IC_{horizon}d'
    return ic


# --- True IC on the primary horizon ---
H = TARGET_HORIZONS[0]
ic_true = compute_rank_ic(PRED_USE, px_tidy, horizon=H, min_names=10, demean=True)

N = len(ic_true)
ic_mean = float(ic_true.mean()) if N else np.nan
ic_std  = float(ic_true.std())  if N else np.nan
t_stat  = (ic_mean / ic_std * np.sqrt(N)) if (N and ic_std > 0) else np.nan

print(f"IC mean: {ic_mean:.4f} | IC std: {ic_std:.4f} | t-stat: {t_stat:.2f} | N days: {N}")
plot_rolling_ic(ic_true, window=60, title=f"60D Rolling Rank IC ({H}D horizon)")

# Sanity: CS dispersion
cs_std = PRED_USE.groupby(level=0)['y_pred'].std()
if (cs_std.fillna(0) == 0).all():
    print("All days have zero cross-sectional std; Spearman IC will be NaN.")

# +1D lag probe
pred_lag1 = (PRED_USE.copy()
             .groupby('ticker', group_keys=False)
             .apply(lambda g: g.assign(y_pred=g['y_pred'].shift(1))))
ic_lag1 = compute_rank_ic(pred_lag1, px_tidy, horizon=H, min_names=10, demean=True)
print("IC (+1D lag) mean:", float(ic_lag1.mean()) if len(ic_lag1) else np.nan)

# Shuffle tests (seeded)
rng = np.random.default_rng(42)

# (a) cross-section shuffle per date
pred_shuf = (PRED_USE.rename(columns={'y_pred':'score'})
             .rename_axis('date').reset_index()
             .groupby('date', group_keys=False)
             .apply(lambda g: g.assign(score=rng.permutation(g['score'].values)))
             .set_index('date')
             .rename(columns={'score':'y_pred'}))
ic_shuf = compute_rank_ic(pred_shuf, px_tidy, horizon=H, min_names=10, demean=True)
print("IC (shuffle xs) mean:", float(ic_shuf.mean()) if len(ic_shuf) else np.nan)

# (b) calendar shuffle: permute dates of returns
u_dates = px_tidy.index.unique()
perm_map = dict(zip(u_dates, rng.permutation(u_dates)))
px_perm = px_tidy.copy()
px_perm.index = px_perm.index.map(perm_map)
ic_cal = compute_rank_ic(PRED_USE, px_perm, horizon=H, min_names=10, demean=True)
print("IC (calendar shuffle) mean:", float(ic_cal.mean()) if len(ic_cal) else np.nan)



_ = lead_lag_ic_profile_plotly(
        PRED_USE, px_tidy, horizons=range(-15, 16),
        compute_rank_ic_fn=compute_rank_ic
)



IC mean: 0.1072 | IC std: 0.1996 | t-stat: 22.54 | N days: 1760


IC (+1D lag) mean: 0.10452232913945084
IC (shuffle xs) mean: -0.0008423353543540391
IC (calendar shuffle) mean: 0.006841639748848141


                                                            

In [21]:
# --- Annual breakdown (perf + IC) ---

# DataFrame of perf stats by calendar year
rows = []
for y, s in bt_5d['ret'].groupby(bt_5d.index.year):
    d = perf_stats(s)         # -> dict
    d['year'] = int(y)
    rows.append(d)
yr_perf = pd.DataFrame(rows).set_index('year').sort_index()

# Mean daily IC per year 
if 'ic_true' in globals() and isinstance(ic_true, pd.Series) and len(ic_true):
    yr_ic = ic_true.groupby(ic_true.index.year).mean()
    yr_ic.name = 'IC_mean'
    yr_ic = yr_ic.to_frame()
    yr_ic.index.name = 'year'
else:
    yr_ic = pd.DataFrame(columns=['IC_mean'])

annual = yr_perf.join(yr_ic, how='left')
display(annual.round(3))


Unnamed: 0_level_0,CAGR,Vol,Sharpe,MaxDD,N,IC_mean
year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2017,-0.025,0.016,-1.553,-0.006,37,0.1
2018,0.121,0.031,3.726,-0.012,251,0.135
2019,0.026,0.027,0.971,-0.023,252,0.078
2020,0.132,0.042,2.973,-0.027,253,0.11
2021,0.073,0.031,2.312,-0.015,252,0.08
2022,0.125,0.032,3.73,-0.016,251,0.113
2023,0.096,0.035,2.667,-0.017,250,0.105
2024,0.111,0.03,3.567,-0.014,247,0.131


### Lag-Ensemble Diagnostics (Test-Only)

We evaluate **each AS-OF lag** separately and compare them to the **final ensemble** on the test folds.

#### What this does
1) **Per-lag IC means**  
   - For each lag `L`, optionally residualize scores vs prior 1D return (`neutralize_single=True`), then compute daily **Spearman Rank-IC** on the **test set** at horizon `H`.  
   - Plot a bar chart of per-lag mean ICs.

2) **Per-lag backtests vs ensemble**  
   - Backtest each single-lag signal and the **already-neutralized ensemble** under the same execution settings (rebalance freq, top-q selection, costs, eligibility, caps).  
   - Overlay equity curves (lags vs ensemble) and print a performance table (CAGR, Vol, Sharpe, MaxDD) with the per-lag IC for context.

#### Inputs / outputs
- **Inputs:**  
  `pred_by_lag = {L: (date,[ticker,y_pred_L{L}])}`, `pred_ensemble = (date,[ticker,y_pred])`, prices `df_prices`, horizon `H`.  
- **Outputs:**  
  `s` (Series of per-lag mean IC), `perf_df` (per-lag perf metrics), `eq_df` (equity curves per lag and ensemble).

**Note:** This is a **test-only** analysis — no retraining occurs here. It helps attribute where the ensemble’s edge comes from and whether any single lag dominates.


In [22]:
# --- Lag-ensemble diagnostics: per-lag ICs + equity curves -------------------

def _px_tidy(df_prices):
    px = df_prices[['ticker','close']].copy()
    if getattr(px.index, "tz", None) is not None:
        px.index = px.index.tz_localize(None)
    px.index = pd.to_datetime(px.index).normalize()
    return px

def _neutralize_vs_ret1(pred_df, df_prices):
    """Per-date residualize y_pred against prior 1D close-to-close return."""
    ret1 = df_prices[['ticker','close']].groupby('ticker')['close'].pct_change(1).rename('ret_1d')
    ret1 = (df_prices.assign(ret_1d=ret1)
            .assign(date=lambda x: x.index)
            .reset_index(drop=True)[['date','ticker','ret_1d']])
    sc = pred_df.reset_index().rename(columns={'index':'date'})
    m  = sc.merge(ret1, on=['date','ticker'], how='left')

    def _resid_per_date(g):
        x, y = g['ret_1d'].to_numpy(), g['y_pred'].to_numpy()
        mask = np.isfinite(x) & np.isfinite(y)
        if mask.sum() < 10:
            g['y_pred'] = np.nan
            return g
        beta = np.dot(x[mask], y[mask]) / (np.dot(x[mask], x[mask]) + 1e-12)
        g.loc[mask, 'y_pred'] = y[mask] - beta * x[mask]
        return g

    out = (m.groupby('date', group_keys=False).apply(_resid_per_date)
             .dropna(subset=['y_pred'])
             .set_index('date')[['ticker','y_pred']])
    return out

def lag_ens_diagnostics(pred_by_lag: dict,
                        pred_ensemble: pd.DataFrame,
                        df_prices: pd.DataFrame,
                        horizon: int,
                        neutralize_single: bool = True,
                        title_suffix: str = ""):
    """
    pred_by_lag: {L: DataFrame(index=date, cols=['ticker', f'y_pred_L{L}'])}
    pred_ensemble: DataFrame(index=date, cols=['ticker','y_pred']) (already neutralized)
    df_prices: wide tidy prices (the 'df' you use everywhere)
    """
    TOP_Q   = globals().get('TOP_QUANTILE', 0.2)
    FREQ    = globals().get('REBALANCE_FREQ', 'W-FRI')
    HP      = globals().get('HOLDING_PERIOD', 5)
    TCOST   = globals().get('TCOST_BPS', 0)

    px = _px_tidy(df_prices)
    lags = sorted(pred_by_lag.keys())

    # --- 1) per-lag IC means
    ic_means = {}
    preds_use = {}
    for L in progress(lags, total=len(lags), desc="Per-lag IC"):
        pL = pred_by_lag[L].copy()
        col = f'y_pred_L{L}'
        if col not in pL.columns:
            col = 'y_pred'
        pL = pL.rename(columns={col: 'y_pred'})
        if neutralize_single:
            pL = _neutralize_vs_ret1(pL, df_prices)
        preds_use[L] = pL
        ic = compute_rank_ic(pL, px, horizon=horizon, min_names=10, demean=True)
        ic_means[L] = float(ic.mean()) if len(ic) else np.nan

    # --- Plot per-lag ICs
    s = bar_ic_means(ic_means, title=f'Per-lag mean IC (H={horizon}d){(" " + title_suffix) if title_suffix else ""}')

    # --- 2) Backtest each lag and the ensemble; plot equity overlay
    eq = {}
    rows = []
    # individual lags
    for L in progress(lags, total=len(lags), desc="Per-lag backtests"):
        bt, _ = backtest(
        preds_use[L], df_prices[['ticker','close']],
        long_short=False, top_q=TOP_Q, freq=FREQ,
        holding_period=HP, tcost_bps=TCOST,
        eligibility=(ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None),
        per_name_cap=(PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None),
        debug=False
        )
        eq[f"L{L}"] = bt['equity'].rename(f"L{L}")
        stats = perf_stats(bt['ret'])
        stats.update(dict(model=f"L{L}", IC_mean=s.loc[L]))
        rows.append(stats)

    # ensemble (already neutralized upstream)
    btE, _ = backtest(
    pred_ensemble, df_prices[['ticker','close']],
    long_short=False, top_q=TOP_Q, freq=FREQ,
    holding_period=HP, tcost_bps=TCOST,
    eligibility=(ELIG_ADV if globals().get('USE_ADV_ELIGIBILITY', False) else None),
    per_name_cap=(PER_NAME_CAP if globals().get('USE_PER_NAME_CAP', False) else None),
    debug=False
    )
    eq["Ens"] = btE['equity'].rename("Ens")

    eq_df = pd.concat(eq, axis=1).dropna(how='all')
    multi_equity_plot(eq, title=f"Equity curves: individual lags vs ensemble{(' ' + title_suffix) if title_suffix else ''}")


    perf_df = pd.DataFrame(rows).set_index('model').sort_values('Sharpe', ascending=False)
    display(perf_df.round(3))

    return s, perf_df, eq_df


In [23]:
ic_by_lag, perf_by_lag, eq_df = lag_ens_diagnostics(
    pred_by_lag,
    PRED_USE,
    df,
    horizon=TARGET_HORIZONS[0],
    neutralize_single=True,
    title_suffix="(h-ens)" if USE_HORIZON_ENSEMBLE else "(5D)"
)


                                                         

                                                                

Unnamed: 0_level_0,CAGR,Vol,Sharpe,MaxDD,N,IC_mean
model,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
L1,0.076,0.051,1.462,-0.089,1793,0.105
L3,0.076,0.051,1.45,-0.083,1793,0.102


### Diagnostic: Residual Beta Exposure (Test Predictions)

We quantify any **remaining market beta exposure** in the **test-set predictions** after overlays.

**Method**
- For each date, regress scores `y_pred` cross-sectionally on rolling betas:
  \[
  \text{beta\_slope}_t \;=\; \frac{\sum_i (x_{it}-\bar x_t)(y_{it}-\bar y_t)}{\sum_i (x_{it}-\bar x_t)^2}
  \]
  where \(x_{it}\) is the rolling beta of ticker \(i\) and \(y_{it}\) is the prediction score.
- Requires at least `MIN_XS_NAMES` names per day; otherwise returns `NaN`.

**Interpretation**
- Mean of `beta_slope` ≈ **0** ⇒ scores are neutral to market beta (as intended).
- Positive (negative) mean ⇒ residual long (short) tilt to high-beta names.
- You can also plot a rolling mean to check stability over time.

**Outputs**
- `beta_slope`: daily series of estimated beta loading of the scores.
- Printed mean value for a quick sanity check.


In [24]:
def beta_exposure_series(pred_df, betas):
    s = (pred_df.reset_index()
         .pivot_table(index='date', columns='ticker', values='y_pred', aggfunc='last'))
    b = betas.reindex(s.index).reindex(columns=s.columns)
    expo = []
    for dt in s.index:
        y = s.loc[dt].values; x = b.loc[dt].values
        m = np.isfinite(y) & np.isfinite(x)
        if m.sum() < MIN_XS_NAMES: expo.append(np.nan); continue
        y0 = y[m] - y[m].mean(); x0 = x[m] - x[m].mean()
        expo.append(float(np.dot(x0, y0) / (np.dot(x0, x0) + 1e-12)))
    return pd.Series(expo, index=s.index, name='beta_slope')

if USE_BETA_NEUTRAL:
    beta_slope = beta_exposure_series(PRED_USE, BETAS)
    print("Beta exposure mean (should be ~0):", float(beta_slope.mean()))


***
***
### 10 - Model Variant Sweep (Train → Test, Parallelized)

This section **trains on walk-forward training folds and evaluates on their test folds** for a set of model variants (e.g., Ridge, ElasticNet, HGBT). For each variant we build horizon-ensemble predictions (5D + 21D), optionally apply beta overlay, and backtest — then compare performance and equity curves.

#### Workflow (per variant)
1) **Train/Test predictions (WF)**
   - `_fitpredict_one_horizon(5, kind, params)` and `_fitpredict_one_horizon(21, kind, params)`:
     - **Train** model on each fold’s training slice.
     - **Test** on the subsequent test month.
     - Internally includes lag-ensemble and 1D-return neutralization.

2) **Train-only horizon weights (cached)**
   - `_train_ic_for_horizon(...)` computes mean daily Spearman IC on the **training span only** for each horizon.
   - Weights are shrunk toward equal by `HORIZON_IC_SHRINK_LAMBDA`.
   - Results cached by `(H, model kind, params)` to avoid recomputation.

3) **Blend horizons on test predictions**
   - Per-date z-score each horizon’s scores, then combine with the train-only weights to get the **horizon-ensemble** test scores.

4) **Optional beta overlay (test-time)**
   - If `USE_BETA_NEUTRAL=True`, residualize scores vs precomputed rolling betas (`BETAS_FOR_SWEEP`), then deduplicate to one `(date, ticker)` score.

5) **Backtest & metrics (test-only)**
   - Run `backtest(...)` with the same execution settings (rebalance, top-q, costs, eligibility, per-name cap, leverage).
   - Collect `CAGR, Vol, Sharpe, MaxDD, N_days, AvgTurnover, wall_sec`.

#### Parallelization & safety
- Up to `N_JOBS_VARIANTS` variants run **in parallel** (threading backend); per-estimator threads limited via `threadpool_limits` to prevent oversubscription.
- Errors are caught per variant and reported in the results table without stopping the sweep.

#### Outputs
- **`variant_tbl`**: sortable comparison of metrics per variant (with params and runtime).
- **Equity overlay plot** across variants for quick visual comparison.

**Note:** All weighting (IC for horizons) uses **training data only**. Backtests and reported metrics are strictly **out-of-sample test results**.


In [25]:
# --- 10. Model variant sweep -------------------
from functools import lru_cache
from joblib import Parallel, delayed
from threadpoolctl import threadpool_limits
import time, json, os


# How many variants to run concurrently (1 to keep serial + tqdm)
N_JOBS_VARIANTS = max(1, min(os.cpu_count() - 1, 4))
# Prevent thread oversubscription inside each worker
LIMIT_THREADS_PER_EST = 1

if USE_VARIANT_SWEEP:
    VARIANTS = [
        ("ridge",      {"alpha": 1.0}),
        ("elasticnet", {"alpha": 0.5, "l1_ratio": 0.2}),
        ("elasticnet", {"alpha": 1.0, "l1_ratio": 0.1}),
        ("hgbt",       {"max_depth": 3, "learning_rate": 0.05, "max_iter": 250}),
    ]
else:
    VARIANTS = [(PROD_MODEL_KIND, PROD_MODEL_PARAMS)]

# precompute betas once for the sweep if overlay is ON
BETAS_FOR_SWEEP = build_rolling_beta(df, lookback=BETA_LOOKBACK_DAYS) if USE_BETA_NEUTRAL else None

def _variant_label(kind: str, params: dict) -> str:
    keys = ("alpha","l1_ratio","epsilon","max_depth","learning_rate","max_iter")
    short = ",".join(f"{k}={params[k]}" for k in keys if k in params)
    return f"{kind}({short})" if short else kind


# Memorize the train-IC by (H, model kind, params)
@lru_cache(maxsize=None)
def _train_ic_for_horizon_cached(H: int, kind: str, params_key: str) -> float:
    params = json.loads(params_key)
    if not USE_HORIZON_ENSEMBLE or '_train_ic_for_horizon' not in globals():
        return 0.0
    val = _train_ic_for_horizon(H, model_kind=kind, model_params=params)
    return float(max(val, 0.0))

def run_variant(kind, params):
    t0 = time.perf_counter()
    label = _variant_label(kind, params)
    params_key = json.dumps(params, sort_keys=True)
    try:
        with threadpool_limits(limits=LIMIT_THREADS_PER_EST):
            # horizon-ensemble predictions with the given model
            _, _, P5  = _fitpredict_one_horizon(5,  kind, params)
            P21 = None
            if USE_HORIZON_ENSEMBLE:
                _, _, P21 = _fitpredict_one_horizon(21, kind, params)

        if USE_HORIZON_ENSEMBLE and P21 is not None:
            # horizon weights on train (uses cached IC)
            ic_train_by_H = {H: _train_ic_for_horizon_cached(H, kind, params_key) for H in HORIZON_LIST}
            w_raw = np.array([ic_train_by_H[H] for H in HORIZON_LIST], float)
            if w_raw.sum()==0: 
                w_raw[:] = 1.0
            w_raw /= w_raw.sum()
            w_eq  = np.ones_like(w_raw)/len(HORIZON_LIST)
            wH    = (1 - HORIZON_IC_SHRINK_LAMBDA)*w_raw + HORIZON_IC_SHRINK_LAMBDA*w_eq
            H_WEIGHTS = dict(zip(HORIZON_LIST, wH))

            # per-date z-score + blend across horizons
            Z5  = _cs_zscore_by_date(P5).rename(columns={'y_pred':'z5'})
            Z21 = _cs_zscore_by_date(P21).rename(columns={'y_pred':'z21'})
            Z   = (Z5.reset_index().merge(Z21.reset_index(), on=['date','ticker'], how='outer'))
            Z[['z5','z21']] = Z[['z5','z21']].fillna(0.0)
            y_blend = H_WEIGHTS.get(5,0)*Z['z5'] + H_WEIGHTS.get(21,0)*Z['z21']
            PRED = (pd.DataFrame({'date':Z['date'],'ticker':Z['ticker'],'y_pred':y_blend})
                    .set_index('date')[['ticker','y_pred']].sort_index())
        else:
             # 5D only path (PROFILE P1) — P5 is already neutralized vs 1D inside _fitpredict_one_horizon
            PRED = P5.copy()
        # de-dup safety
        PRED = (PRED.reset_index().sort_values(['date','ticker'])
            .drop_duplicates(['date','ticker'], keep='last')
            .set_index('date')[['ticker','y_pred']])

        # beta overlay (reuses precomputed betas)
        if USE_BETA_NEUTRAL and BETAS_FOR_SWEEP is not None:
            PRED = neutralize_vs_beta(PRED, BETAS_FOR_SWEEP,
                                      min_names=MIN_XS_NAMES,
                                      strength=BETA_NEUTRAL_STRENGTH)
            PRED = (PRED.reset_index().sort_values(['date','ticker'])
                    .drop_duplicates(['date','ticker'], keep='last')
                    .set_index('date')[['ticker','y_pred']])

        # backtest on current risk toggles
        bt, _ = backtest(
            PRED, df[['ticker','close']],
            long_short=LONG_SHORT,
            top_q=TOP_QUANTILE,
            freq=REBALANCE_FREQ,
            holding_period=HOLDING_PERIOD,
            tcost_bps=TCOST_BPS,
            eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
            per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
            gross_leverage=GROSS_LEVERAGE,
            debug=False
        )
        stats = perf_stats(bt['ret'])
        stats.update(
                label=label,
                kind=kind,
                params=params_key,
                wall_sec=time.perf_counter() - t0,
                N_days=int(bt['ret'].shape[0]),
                AvgTurnover=float(bt['turnover'].mean()),
        )
        return stats, bt['equity'].rename(label)  
    
    except Exception as e:
        import traceback
        tb = traceback.format_exc()
        stats = dict(CAGR=np.nan, Vol=np.nan, Sharpe=np.nan, MaxDD=np.nan, N=0,
                     label=label, kind=kind, params=params_key, wall_sec=np.nan,
                     error=str(e))
        print(f"[variant ERROR] {label}\n{tb}")
        empty_eq = pd.Series(dtype=float, name=label)
        return stats, empty_eq

# --- Execute ----------
rows, curves = [], {}

if N_JOBS_VARIANTS > 1 and len(VARIANTS) > 1:
    print(f"[variants] running {len(VARIANTS)} models in parallel "
          f"(n_jobs={N_JOBS_VARIANTS}, per-task threads={LIMIT_THREADS_PER_EST})")

    tasks = (delayed(run_variant)(kind, params) for kind, params in VARIANTS)
    with joblib_progress(len(VARIANTS), "Model variants"):
        results = Parallel(n_jobs=N_JOBS_VARIANTS, backend="threading")(tasks)

    for s, eq in results:
        rows.append(s); curves[eq.name] = eq
else:
    # Ridge-only (or single variant) path
    for kind, params in VARIANTS:
        s, eq = run_variant(kind, params)
        rows.append(s); curves[eq.name] = eq

variant_tbl = (pd.DataFrame(rows)
               .set_index('label')
               .sort_values('Sharpe', ascending=False)
)
base_cols = ['kind','params','CAGR','Vol','Sharpe','MaxDD','N']
extra_cols = [c for c in ['N_days','AvgTurnover','wall_sec','error'] if c in variant_tbl.columns]
variant_tbl = variant_tbl.reindex(columns=base_cols + extra_cols)
display(variant_tbl.round(3))

title = "Equity by model kind (h-ens)" if len(curves) > 1 else f"Equity: {next(iter(curves))}"
multi_equity_plot(curves, title=title)


                                                              

Unnamed: 0_level_0,kind,params,CAGR,Vol,Sharpe,MaxDD,N,N_days,AvgTurnover,wall_sec
label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
ridge,ridge,{},0.095,0.032,2.799,-0.027,1793,1793,0.069,101.403


### 11 - Parameter sweeps

In [26]:
# 11.Holding Period (HP), Rebalance Freq, Top-Q
import itertools

HP_LIST    = [5, 10]
FREQ_LIST  = ['W-FRI', 'M']
TOPQ_LIST  = [0.10, 0.20, 0.30]
TCOST_FOR_SWEEP = TCOST_BPS   # costs fixed here; cost sweep below

def run_sweep_hpfreq_topq(pred_df, prices_df,
                          hp_list=HP_LIST, freq_list=FREQ_LIST, topq_list=TOPQ_LIST,
                          tcost_bps=TCOST_FOR_SWEEP, long_short=LONG_SHORT):
    rows, eq = [], {}

    grid = list(itertools.product(hp_list, freq_list, topq_list))
    for hp, fq, tq in progress(grid, total=len(grid),
                               desc="Sweep HP/Freq/TopQ"):
        bt, wheld = backtest(
            pred_df, prices_df[['ticker','close']],
            long_short=long_short,
            top_q=tq,
            freq=fq,
            holding_period=hp,
            tcost_bps=tcost_bps,
            eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
            per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
            gross_leverage=GROSS_LEVERAGE,     
            debug=False
        )
        stats = perf_stats(bt['ret'])
        label = f"HP{hp}-{fq}-Q{int(tq*100)}"

        stats.update(dict(
            label=label, HP=hp, Freq=fq, TopQ=tq,
            AvgTurnover=float(bt['turnover'].mean()),
            AvgNames=float((wheld.abs() > 1e-12).sum(axis=1).mean())
        ))

        eq[label] = bt['equity'].rename(label)
        rows.append(stats)

    res = pd.DataFrame(rows).set_index('label').sort_values('Sharpe', ascending=False)
    eq_df = pd.concat(eq, axis=1).dropna(how='all')
    return res, eq_df


sweep_tbl, sweep_eq = run_sweep_hpfreq_topq(PRED_USE, df)
display(sweep_tbl.round(3))

# Sharpe vs Turnover scatter 
import plotly.graph_objects as go
def scatter_sharpe_vs_turnover(sweep_df, title="Sharpe vs Avg Daily Turnover"):
    s = sweep_df.copy()
    s['TopQ%'] = (s['TopQ']*100).astype(int)
    fig = go.Figure()
    for fq in s['Freq'].unique():
        ss = s[s['Freq']==fq]
        fig.add_trace(go.Scatter(
            x=ss['AvgTurnover'], y=ss['Sharpe'],
            mode='markers+text', name=fq,
            text=ss.index, textposition='top center',
            marker=dict(size=10),
            hovertemplate=("Label=%{text}<br>Sharpe=%{y:.2f}"
                           "<br>Turnover=%{x:.3f}<br>HP=%{customdata[0]} "
                           "<br>TopQ=%{customdata[1]}%"),
            customdata=np.c_[ss['HP'], ss['TopQ%']]
        ))
    fig.update_layout(title=title, xaxis_title="Avg daily turnover (|Δw|1)",
                      yaxis_title="Sharpe", width=900, height=380)
    fig.show()

scatter_sharpe_vs_turnover(sweep_tbl)

# Overlay equity curves for the top 3 combos by Sharpe
top_labels = sweep_tbl.head(3).index.tolist()
multi_equity_plot(
    {k: sweep_eq[k] for k in top_labels},
    title=f"Equity: Top 3 sweep configs {'(h-ens)' if USE_HORIZON_ENSEMBLE else '(5D)'}"
)


                                                                   

Unnamed: 0_level_0,CAGR,Vol,Sharpe,MaxDD,N,HP,Freq,TopQ,AvgTurnover,AvgNames
label,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
HP10-W-FRI-Q20,0.11,0.031,3.403,-0.02,1793,10,W-FRI,0.2,0.04,34.206
HP10-W-FRI-Q30,0.136,0.038,3.379,-0.031,1793,10,W-FRI,0.3,0.053,46.838
HP10-M-Q30,0.129,0.039,3.127,-0.046,1775,10,M,0.3,0.027,33.841
HP5-M-Q30,0.13,0.04,3.11,-0.048,1775,5,M,0.3,0.027,31.469
HP10-M-Q20,0.099,0.032,3.01,-0.033,1775,10,M,0.2,0.02,22.858
HP5-M-Q20,0.1,0.032,2.983,-0.035,1775,5,M,0.2,0.02,20.761
HP10-W-FRI-Q10,0.068,0.022,2.959,-0.016,1793,10,W-FRI,0.1,0.025,20.294
HP5-W-FRI-Q30,0.117,0.04,2.802,-0.036,1793,5,W-FRI,0.3,0.09,41.62
HP5-W-FRI-Q20,0.095,0.032,2.799,-0.027,1793,5,W-FRI,0.2,0.069,28.492
HP5-W-FRI-Q10,0.061,0.024,2.517,-0.023,1793,5,W-FRI,0.1,0.043,16.215


In [27]:
# 11.1 Cost sweep (keep HP/Freq/TopQ at chosen baseline from §8)

COSTS = [0, 5, 10, 25]

def cost_sweep(pred_df, prices_df,
               hp=HOLDING_PERIOD, fq=REBALANCE_FREQ, tq=TOP_QUANTILE, long_short=LONG_SHORT):
    rows, eq = [], {}
    for bps in progress(COSTS, total=len(COSTS), desc="Cost sweep"):
        bt, _ = backtest(
            pred_df, prices_df[['ticker','close']],
            long_short=long_short,
            top_q=tq,
            freq=fq,
            holding_period=hp,
            tcost_bps=bps,
            eligibility=(ELIG_ADV if USE_ADV_ELIGIBILITY else None),
            per_name_cap=(PER_NAME_CAP if USE_PER_NAME_CAP else None),
            gross_leverage=GROSS_LEVERAGE,   
            debug=False
        )
        s = perf_stats(bt['ret']); s.update(dict(tcost_bps=bps))
        rows.append(s)
        eq[f"{bps}bps"] = bt['equity'].rename(f"{bps}bps")
    return pd.DataFrame(rows).set_index('tcost_bps').sort_index(), eq


cost_tbl, cost_eq = cost_sweep(PRED_USE, df,
                               hp=HOLDING_PERIOD,
                               fq=REBALANCE_FREQ,
                               tq=TOP_QUANTILE,
                               long_short=LONG_SHORT)
display(cost_tbl.round(3))

# Plot cost equity overlays
multi_equity_plot(cost_eq, title=f"Equity by transaction costs (bps) {'(h-ens)' if USE_HORIZON_ENSEMBLE else '(5D)'}")


import plotly.graph_objects as go
fig = go.Figure([go.Bar(x=[str(i) for i in cost_tbl.index], y=cost_tbl['Sharpe'].values)])
mode = "(h-ens)" if USE_HORIZON_ENSEMBLE else "(5D)"
fig.update_layout(title=f"Sharpe by tcost (bps) {mode}",
                  xaxis_title="bps", yaxis_title="Sharpe",
                  width=650, height=320)
fig.show()

                                                         

Unnamed: 0_level_0,CAGR,Vol,Sharpe,MaxDD,N
tcost_bps,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,0.133,0.032,3.867,-0.024,1793
5,0.123,0.032,3.6,-0.025,1793
10,0.114,0.032,3.333,-0.026,1793
25,0.085,0.032,2.532,-0.028,1793


### 12 - Save artifacts & summary

In [28]:
from datetime import datetime

# --- Checks  ---
assert 'PRED_USE' in locals(), "PRED_USE (neutralized ensemble) not defined yet."
assert 'bt_5d'     in locals(), "bt_5d not defined yet."
assert 'w_held_5d' in locals(), "w_held_5d not defined yet."

RUNS_DIR = Path("results/equity_daily")
RUNS_DIR.mkdir(parents=True, exist_ok=True)

run_id  = datetime.now().strftime("%Y%m%d-%H%M%S")
out_dir = RUNS_DIR / run_id
out_dir.mkdir(parents=True, exist_ok=True)

# --- Save core artifacts ---
PRED_USE.to_parquet(out_dir / "pred_5d.parquet")  
bt_5d.to_parquet(out_dir / "bt_5d.parquet")
w_held_5d.to_parquet(out_dir / "weights_held.parquet")



# --- Save config / metadata for full reproducibility ---
metadata = {
    "run_id": run_id,
    "start_date": str(START_DATE),
    "end_date": str(END_DATE),
    "target_horizons": TARGET_HORIZONS,
    "max_h": MAX_H,
    "train_years": TRAIN_YEARS,
    "step_months": STEP_MONTHS,
    "embargo_days": EMBARGO_DAYS,
    "rebalance_freq": REBALANCE_FREQ,
    "long_short": LONG_SHORT,
    "top_quantile": TOP_QUANTILE,
    "holding_period": HOLDING_PERIOD,
    "tcost_bps": TCOST_BPS,
    "random_state": RANDOM_STATE,
    "n_tickers": int(df['ticker'].nunique()),
    "n_rows_df": int(len(df)),
    "n_features": int(len(feature_cols)),
}
with open(out_dir / "config.json", "w") as f:
    json.dump(metadata, f, indent=2)

# --- Save feature list, tickers, and windows used ---
pd.Series(feature_cols, name="feature").to_csv(out_dir / "features.csv", index=False)
pd.Series(sorted(df['ticker'].unique()), name="ticker").to_csv(out_dir / "universe.csv", index=False)

# Store window boundaries for audit
wf_tbl = pd.DataFrame(
    [(w.train_start, w.train_end, w.test_start, w.test_end) for w in wf_windows_run],
    columns=["train_start","train_end","test_start","test_end"]
)
wf_tbl.to_csv(out_dir / "wf_windows.csv", index=False)

# --- Performance & IC summary ---
perf = perf_stats(bt_5d['ret'])
summary = {
    "CAGR":  float(perf["CAGR"]) if perf["CAGR"]==perf["CAGR"] else np.nan,
    "Vol":   float(perf["Vol"])  if perf["Vol"]==perf["Vol"]   else np.nan,
    "Sharpe":float(perf["Sharpe"]) if perf["Sharpe"]==perf["Sharpe"] else np.nan,
    "MaxDD": float(perf["MaxDD"]) if perf["MaxDD"]==perf["MaxDD"] else np.nan,
    "N_days":int(perf["N"]),
}

# IC
if "ic_true" in locals() and isinstance(ic_true, pd.Series) and len(ic_true):
    summary.update({
        "IC_mean": float(ic_true.mean()),
        "IC_std":  float(ic_true.std()),
        "IC_t":    float(ic_true.mean()/ic_true.std()*np.sqrt(len(ic_true))) if ic_true.std() > 0 else np.nan,
        "IC_N":    int(len(ic_true)),
    })

pd.DataFrame([summary]).to_csv(out_dir / "summary.csv", index=False)

# --- Save an equity curve PNG ---
plt.figure(figsize=(10,4))
title_str = 'Strategy Equity Curve (horizon-ensemble)' if USE_HORIZON_ENSEMBLE else 'Strategy Equity Curve (5D target)'
bt_5d['equity'].plot(title=title_str)
plt.tight_layout()
plt.savefig(out_dir / "equity_curve.png", dpi=150)
plt.close()

# --- update a 'latest' pointer ---
latest_link = RUNS_DIR / "latest"
try:
    if latest_link.exists() or latest_link.is_symlink():
        latest_link.unlink()
    latest_link.symlink_to(out_dir.resolve())
except Exception:
    with open(RUNS_DIR / "LATEST_RUN.txt", "w") as f:
        f.write(str(out_dir.resolve()))

print(f"Artifacts saved → {out_dir}")


Artifacts saved → results/equity_daily/20250902-181229


### 13 - Reload helper

In [29]:
def load_run(run_path: Path):
    preds   = pd.read_parquet(run_path / "pred_5d.parquet")
    bt      = pd.read_parquet(run_path / "bt_5d.parquet")
    weights = pd.read_parquet(run_path / "weights_held.parquet")
    with open(run_path / "config.json") as f:
        cfg = json.load(f)
    return preds, bt, weights, cfg

# Example:
# preds, bt, weights, cfg = load_run(Path("results/equity_daily/latest"))
