#### 08: Event-Driven Backtest (T+1 execution, ATR sizing, stops, kill-switch)


This module introduces a deterministic, daily-bar backtester for long/short signals.  
It consumes model signals (by ticker & date) and OHLC prices, converts probabilities into risk-based position sizes, manages trades with targets/stops/trailing, and enforces a global kill-switch on deep drawdowns.  
It returns an equity curve and a full trade ledger suitable for evaluation and reporting.

---

## What the backtester does
- Executes signals on **T+1** (next day) using the open price (falls back to close if open is missing).
- Filters entries by confidence, ATR bounds, and a short cool-down per ticker.
- Sizes positions by risk (equity × risk%) divided by a dollar stop distance derived from ATR.
- Manages exits daily via:
  - Hard stop proxy  
  - Trailing stop  
  - Fixed stop/target  
  - Time exit (5 days)  
- Marks to market at each close and applies commissions + slippage on both entry and exit.
- **Kill-switch**: if equity drawdown from its running peak ≤ `kill_dd_pct` (default −30%), all positions are closed and a reset is flagged.

---

## Required inputs

### 1. `merged_data` (signals; one row per potential trade)

**Required columns**  
- `ticker` — string  
- `entry_date` — date when the signal becomes eligible to trade; the engine executes on the next calendar day (`entry_date + 1 day`) and skips the signal if no price bar exists for that day  
- `signal` — +1 for long, −1 for short  

**Optional (recommended)**  
- `prob_up` — model probability for the “up” class (used for confidence-weighted sizing)  
- `atr` — recent ATR (either in % of price or $ absolute, see notes)

---

### 2. `prices` (daily OHLC)

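**Expected columns**: `date`, `ticker`, `open`, `high`, `low`, `close`. The OHLC columns may be lower-case or title-case (e.g. `Open`), and the date column may be named `date`, `Date`, `datetime`, or `timestamp`.  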
If `open` is missing, the engine uses `close`.
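
As an illustration of the two input schemas, the sketch below builds toy `merged_data` and `prices` frames and checks that the required columns are present. The tickers, dates, and prices are made up, and the `required_*` helper sets are hypothetical names used only for this check.

```python
import pandas as pd

# Hypothetical toy inputs; tickers, dates, and prices are invented for illustration.
merged_data = pd.DataFrame({
    "ticker":     ["AAA", "BBB"],
    "entry_date": pd.to_datetime(["2024-01-02", "2024-01-03"]),
    "signal":     [1, -1],          # +1 long, -1 short
    "prob_up":    [0.71, 0.30],     # optional: drives confidence-weighted sizing
    "atr":        [0.25, 0.30],     # optional: here expressed as a fraction of price
})

prices = pd.DataFrame({
    "date":   pd.to_datetime(["2024-01-03", "2024-01-03", "2024-01-04", "2024-01-04"]),
    "ticker": ["AAA", "BBB", "AAA", "BBB"],
    "open":   [10.0, 20.0, 10.2, 19.5],
    "high":   [10.5, 20.4, 10.6, 19.9],
    "low":    [9.8, 19.6, 10.0, 19.1],
    "close":  [10.3, 19.8, 10.4, 19.3],
})

# Schema check: both sets should come out empty.
required_signal_cols = {"ticker", "entry_date", "signal"}
required_price_cols = {"date", "ticker", "open", "high", "low", "close"}
missing_signal = required_signal_cols - set(merged_data.columns)
missing_price = required_price_cols - {c.lower() for c in prices.columns}
```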

---

## Key parameters (defaults)

### Capital & risk
- `initial_capital = 100_000`
- Per-trade risk grows with confidence:  
  \[
  \text{risk\_pct} \;=\; \text{base\_risk\_pct} + \text{conf}\,\big(\text{max\_risk\_pct} - \text{base\_risk\_pct}\big)
  \]  
  where  
  \[
  \text{conf} = \text{clip}\!\left(\frac{\text{prob\_up}-\text{prob\_long\_min}}{1-\text{prob\_long\_min}},\,0,\,1\right)
  \]  
- `base_risk_pct = 0.004` (0.4%)  
- `max_risk_pct = 0.08` (8%)
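
The two formulas above can be sketched as a small pure-Python function. The function name `risk_pct` is chosen for illustration; the defaults are the ones listed in this section.

```python
def risk_pct(prob_up, prob_long_min=0.63, base_risk_pct=0.004, max_risk_pct=0.08):
    """Confidence-scaled per-trade risk, following the two formulas above."""
    conf = (prob_up - prob_long_min) / (1.0 - prob_long_min)
    conf = max(0.0, min(1.0, conf))  # clip to [0, 1]
    return base_risk_pct + conf * (max_risk_pct - base_risk_pct)

low = risk_pct(0.63)    # at the confidence floor: base risk (0.4%)
mid = risk_pct(0.815)   # halfway between the floor and 1.0: about 4.2%
high = risk_pct(1.00)   # full confidence: max risk (8%)
```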

### Trade management
- Target: `target_pct = 0.07` (+7%)  
- Stop: `stop_loss_pct = 0.04` (−4%)  
- Trailing stop: `trail_mult = 3.5 × ATR` (see notes on units)  
- Time exit: force close once a position has been held for 5 calendar days  
- Hard stop proxy: exit if the close is 9% or more against the entry price (direction-aware)  

### Global risk
- Kill-switch: `kill_dd_pct = -0.30`  
  - If equity drawdown ≤ −30% from peak, close everything and flag `reset=True`
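
A minimal sketch of the drawdown check, assuming a plain list of end-of-day equity values. The helper name `kill_switch_days` is hypothetical. As in `run_backtest`, the running peak is updated after the check and is not reset once the switch fires.

```python
def kill_switch_days(equity, kill_dd_pct=-0.30):
    """Return the indices where drawdown from the running peak breaches kill_dd_pct."""
    peak = equity[0]
    triggered = []
    for i, eq in enumerate(equity):
        dd = (eq - peak) / peak      # <= 0 whenever equity is below the peak
        if dd <= kill_dd_pct:
            triggered.append(i)
        peak = max(peak, eq)         # peak updated after the check
    return triggered

equity = [100_000, 110_000, 90_000, 76_000, 80_000]
days = kill_switch_days(equity)      # only 76_000 is more than 30% below 110_000
```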

### Costs
- `commission_pct = 0.001` (10 bps per leg)  
- `slippage_pct = 0.005` (50 bps per fill)  

**Trade return formula**:  
\[
r = \frac{P_{\text{exit}}-P_{\text{entry}}}{P_{\text{entry}}}\cdot \text{direction} \;-\; 2\,(\text{commission}+\text{slippage})
\]
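
As a worked example of the formula, the sketch below evaluates one winning long and one losing short with the default costs. The function name `trade_return` is illustrative.

```python
def trade_return(entry_price, exit_price, direction, commission=0.001, slippage=0.005):
    """Net per-trade return: directional price move minus round-trip costs."""
    gross = (exit_price - entry_price) / entry_price * direction
    return gross - 2 * (commission + slippage)

long_win = trade_return(100.0, 107.0, +1)    # gross +7%, costs 1.2% -> about +5.8%
short_loss = trade_return(100.0, 104.0, -1)  # gross -4%, costs 1.2% -> about -5.2%
```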

### Signal & market filters
- Minimum confidence: `prob_long_min = 0.63`, `prob_short_min = 0.63`  
- ATR bounds (eligibility): `atr_min = 0.20`, `atr_max = 4.00`  
- Per-ticker cool-down: ignore a ticker for 5 days after any exit  

---

## Sizing & execution details
- **Stop distance (dollars)** derived from ATR:  
  - If `atr < 3`: treated as a percentage → `$ = atr × entry_price`  
  - Else: ATR is taken as-is in dollars  
- **Shares**:  
  \[
  \text{shares} = \frac{\text{equity\_start}\times \text{risk\_pct}}{\text{atr\_abs}}\times (1-\text{commission}-\text{slippage})
  \]
- Trades with notional below $5 are skipped.  
- Trades that don’t fit available cash are skipped.  
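
The sizing rules above can be sketched as a standalone function. `position_size` is a hypothetical helper, not part of the backtester; it returns `None` where the engine would skip the trade.

```python
def position_size(equity, risk_pct, atr, entry_price,
                  commission=0.001, slippage=0.005, cash=None):
    """Return the share count, or None when the trade would be skipped."""
    # ATR below 3 is read as a fraction of price, otherwise as dollars.
    atr_abs = atr * entry_price if atr < 3 else atr
    shares = equity * risk_pct / atr_abs * (1 - commission - slippage)
    notional = shares * entry_price
    if notional < 5:
        return None                      # too small to bother
    cost = notional * (1 + commission + slippage)
    if cash is not None and cost > cash:
        return None                      # does not fit in available cash
    return shares

# Risk $1,000 against a $12.50 stop distance (25% of a $50 entry).
shares = position_size(equity=100_000, risk_pct=0.01, atr=0.25, entry_price=50.0)
skipped = position_size(equity=100_000, risk_pct=0.01, atr=0.25,
                        entry_price=50.0, cash=100.0)
```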

**Execution timeline (per day)**  
1. Manage open positions on today’s close (hard stop, trailing, stop/target, time exit).  
2. Compute equity and check kill-switch; close all if triggered.  
3. Open new positions for signals with `entry_date == today`, executed at next day open (T+1) if available.  

---

## Outputs

- **portfolio** (DataFrame, daily index)  
  Columns: `cash`, `positions` (MTM value), `equity`, `reset` (bool kill-switch flag)  

- **trades_df** (DataFrame, one row per completed trade)  
  Columns:  
  `position_id`, `ticker`, `direction`, `entry_date`, `entry_price`, `exit_date`,  
  `exit_price`, `return`, `days_held`, `exit_reason`, `shares`, `notional`  

- **missing_df**  
  Placeholder; can be extended to log missing price events.

---

## Practical notes & assumptions
- **Daily bars only**: “intraday” hard stop is approximated by daily checks (no intraday simulation).  
- **Single position per ticker**: no pyramiding or partial exits (for clarity).  
- **ATR units**:  
  - If in %, convert to $ via `atr × entry_price`  
  - Else use as-is  
  - Best practice: standardize ATR to a single convention.  
- **No look-ahead**: `entry_date` is when the signal becomes eligible; fills use the next calendar day's bar, and a signal is skipped if that bar is missing.  
- **Costs**: applied on both legs; returns in `trades_df['return']` are net of round-trip commission+slippage. 

In [3]:
import os, random, warnings
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm
from xgboost import XGBClassifier
from sklearn.compose import make_column_transformer, ColumnTransformer
from sklearn.ensemble import StackingClassifier, RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, brier_score_loss, accuracy_score, matthews_corrcoef
from sklearn.model_selection import GroupKFold, TimeSeriesSplit, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Resolve the project root both when run as a script and inside a notebook
ROOT = Path(__file__).resolve().parents[0] if "__file__" in globals() else Path.cwd()
DATA_DIR = Path(os.getenv("DATA_DIR", ROOT / "data"))
def p(file): return DATA_DIR / file

In [4]:
def run_backtest(
    merged_data      : pd.DataFrame,
    prices           : pd.DataFrame,
    *,
   
    initial_capital  : float = 100_000,
    base_risk_pct    : float = 0.004,   
    max_risk_pct     : float = 0.08,
    target_pct       : float = 0.07,   
    stop_loss_pct    : float = 0.04,    
    trail_mult       : float = 3.5,     
    kill_dd_pct      : float = -0.30,   
    
    commission_pct   : float = 0.001,   
    slippage_pct     : float = 0.005,   
   
    prob_long_min    : float = 0.63,
    prob_short_min   : float = 0.63,
    
    atr_min          : float = 0.20,
    atr_max          : float = 4.00,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    
   
    TARGET       = target_pct
    STOP_LOSS    = stop_loss_pct
    COMMISSION   = commission_pct
    SLIPPAGE     = slippage_pct
    BASE_RISK_PCT, MAX_RISK_PCT = base_risk_pct, max_risk_pct
    MAX_DD_PCT   = kill_dd_pct
    MIN_ATR      = atr_min

   
    prices = prices.reset_index()
    merged_data = merged_data.copy()   # avoid mutating the caller's DataFrame
    merged_data["entry_date"] = pd.to_datetime(merged_data["entry_date"]).dt.normalize()

   
    date_col = next((c for c in ["date", "Date", "datetime", "timestamp"] if c in prices.columns), None)
    if date_col is None:
        raise KeyError("Date column not found in prices!")

    prices["date_normalized"] = pd.to_datetime(prices[date_col]).dt.normalize()

    price_cols = {c.lower(): (c if c in prices.columns else c.title())
                  for c in ["open", "close", "high", "low"]}

   
    price_map = {
        (row["date_normalized"].date(), row["ticker"]): {
            "open" : row[price_cols["open"]],
            "close": row[price_cols["close"]],
            "high" : row[price_cols["high"]],
            "low"  : row[price_cols["low"]],
        }
        for _, row in prices.iterrows()
    }

    all_dates = pd.date_range(
        start=merged_data["entry_date"].min(),
        end  =merged_data["entry_date"].max() + pd.Timedelta(days=10),
        freq ="D",
    )

   
    portfolio = pd.DataFrame(index=all_dates, dtype=float)
    portfolio["cash"]      = float(initial_capital)
    portfolio["positions"] = 0.0
    portfolio["equity"]    = float(initial_capital)
    portfolio["reset"]     = False        

    
    def get_price(ts, ticker, pt):
        key = (pd.Timestamp(ts).date(), ticker)
        return price_map.get(key, {}).get(pt)

   
    open_positions  = {}
    trades          = []
    missing_log     = []
    last_exit       = {}
    ma50_cache      = {}
    position_id     = 1

    global_peak = initial_capital  

    # MAIN LOOP
    for current_date in tqdm(all_dates, desc="Back-test"):
        today = current_date.date()

        # 1) START OF THE DAY
        if current_date == all_dates[0]:
            cash = initial_capital
        else:
            cash = portfolio.at[current_date - pd.Timedelta(days=1), "cash"]

        positions_value = 0.0                    
        kill_triggered  = False

        # 2) CLOSING POSITIONS
        to_close = []
        for ticker, pos in open_positions.items():
            entry_price  = pos["entry_price"]
            direction    = pos["direction"]      # +1 long, -1 short
            days_held    = (today - pos["entry_date"]).days
            current_px   = get_price(current_date, ticker, "close")

            # MISSING VALUES
            if current_px is None:
                pos["missing"] = pos.get("missing", 0) + 1
                if pos["missing"] < 3:
                    positions_value += pos["shares"] * pos["last_price"]
                    continue                        
                current_px = pos["last_price"]      
                exit_reason = "missing_data"
            else:
                pos["missing"] = 0
                pos["last_price"] = current_px
                exit_reason = None

            # hard-stop intraday 9%
            if exit_reason is None:
                hard_stop = 0.09
                if (direction == 1 and current_px <= entry_price * (1 - hard_stop)) or \
                   (direction == -1 and current_px >= entry_price * (1 + hard_stop)):
                    exit_reason = "hard_stop"

            # dynamic trailing stop (pos["atr"] is already stored in dollars)
            if exit_reason is None:
                trail_dist = trail_mult * pos["atr"]
                if direction == 1:
                    pos["max_price"] = max(pos.get("max_price", entry_price), current_px)
                    if current_px <= pos["max_price"] - trail_dist:
                        exit_reason = "trail"
                else:
                    pos["min_price"] = min(pos.get("min_price", entry_price), current_px)
                    if current_px >= pos["min_price"] + trail_dist:
                        exit_reason = "trail"

            # target / stop 
            if exit_reason is None:
                stop_px   = entry_price * (1 - STOP_LOSS*direction)
                target_px = entry_price * (1 + TARGET*direction)
                if (direction == 1 and current_px <= stop_px) or \
                   (direction == -1 and current_px >= stop_px):
                    exit_reason = "stop"
                elif (direction == 1 and current_px >= target_px) or \
                     (direction == -1 and current_px <= target_px):
                    exit_reason = "target"

            # time-exit 
            if exit_reason is None and days_held >= 5:
                exit_reason = "time"

            # Closing?
            if exit_reason:
                proceeds = pos["shares"] * current_px
                fees     = proceeds * (COMMISSION + SLIPPAGE)
                cash    += proceeds - fees

                ret_pct  = (current_px - entry_price) / entry_price * direction \
                           - 2*(COMMISSION+SLIPPAGE)

                trades.append({
                    "position_id": position_id,
                    "ticker": ticker,
                    "direction": "long" if direction==1 else "short",
                    "entry_date": pos["entry_date"],
                    "entry_price": entry_price,
                    "exit_date": today,
                    "exit_price": current_px,
                    "return": ret_pct,
                    "days_held": days_held,
                    "exit_reason": exit_reason,
                    "shares": pos["shares"],                      
                    "notional": pos["shares"] * entry_price,   
                })
                position_id += 1
                last_exit[ticker] = today
                to_close.append(ticker)
            else:
                positions_value += pos["shares"] * current_px

        for t in to_close:
            del open_positions[t]

       
        equity_start = cash + positions_value

        # 3) kill-switch 
        dd_pct = (equity_start - global_peak) / global_peak
        if dd_pct <= MAX_DD_PCT:
            kill_triggered = True
            portfolio.at[current_date, "reset"] = True

            # Forced closure
            for t, pos in list(open_positions.items()):
                px   = pos["last_price"]
                proceeds = pos["shares"] * px
                fees     = proceeds * (COMMISSION + SLIPPAGE)
                cash    += proceeds - fees
                trades.append({
                    "position_id": position_id,
                    "ticker": t,
                    "direction": "long" if pos["direction"]==1 else "short",
                    "entry_date": pos["entry_date"],
                    "entry_price": pos["entry_price"],
                    "exit_date": today,
                    "exit_price": px,
                    "return": (px - pos["entry_price"])/pos["entry_price"]*pos["direction"]
                              - 2*(COMMISSION+SLIPPAGE),
                    "days_held": (today - pos["entry_date"]).days,
                    "shares": pos["shares"],
                    "notional": pos["shares"] * pos["entry_price"],
                    
                    "exit_reason": "kill_switch",
                })
                position_id += 1
            open_positions.clear()
            positions_value = 0.0       

        # 4) New positions if not kill
        if not kill_triggered:
            todays_signals = merged_data[merged_data["entry_date"].dt.date == today]
            if not todays_signals.empty:
                for _, sig in todays_signals.iterrows():
                    ticker      = sig["ticker"]
                    signal      = sig["signal"]         
                   
                    atr         = max(sig.get("atr", np.nan), MIN_ATR)
                    prob_up     = sig.get("prob_up", 0.5)
                    prob_down   = 1 - prob_up

                    # Filters
                    if ticker in open_positions:               continue
                    if ticker in last_exit and (today - last_exit[ticker]).days < 5: continue
                    if np.isnan(atr) or not (atr_min<= atr <= atr_max):                             continue
                    if signal == 1 and prob_up < prob_long_min :                          continue
                    if signal == -1 and prob_down < prob_short_min:                       continue

                    # Size
                    exec_date = current_date + pd.Timedelta(days=1)
                    if exec_date > all_dates[-1]:
                        continue 
                    entry_price = get_price(exec_date, ticker, "open")
                    if entry_price is None:
                        entry_price = get_price(exec_date, ticker, "close")   
                    if entry_price is None:
                       continue
                    # ATR below 3 is read as a fraction of price, otherwise as dollars
                    atr_abs   = atr * entry_price if atr < 3 else atr
                    stop_dist = atr_abs

                    # confidence-scaled risk budget -> share count
                    conf      = max(0, min(1, (prob_up - prob_long_min)/(1 - prob_long_min)))
                    risk_pct  = base_risk_pct + conf*(max_risk_pct - base_risk_pct)
                    risk_amt  = equity_start * risk_pct
                    shares    = risk_amt / stop_dist * (1-COMMISSION-SLIPPAGE)
                    if shares * entry_price < 5:                    continue
                    cost      = shares * entry_price * (1+COMMISSION+SLIPPAGE)
                    if cost > cash:                                  continue

                    # Open
                    open_positions[ticker] = {
                        "entry_date": exec_date.date(),
                        "entry_price": entry_price,
                        "shares": shares,
                        "direction": 1 if signal>0 else -1,
                        "last_price": entry_price,
                        "atr": atr_abs,
                        "max_price": entry_price,
                        "min_price":entry_price,
                    }
                    cash -= cost

        # 5) End of the day value
        positions_value_end = sum(pos["shares"] * get_price(current_date, t, "close")
                                  if get_price(current_date, t, "close") is not None
                                  else pos["shares"] * pos["last_price"]
                                  for t, pos in open_positions.items())

        equity_end   = cash + positions_value_end
        global_peak  = max(global_peak, equity_end)   # single running peak

        portfolio.loc[current_date, ["cash", "positions", "equity"]] = (
            cash, positions_value_end, equity_end
        )

    portfolio.ffill(inplace=True)
    trades_df       = pd.DataFrame(trades)
    missing_df      = pd.DataFrame(missing_log)
    return portfolio, trades_df, missing_df

# Performance Metrics & Reporting

This cell adds two helpers to summarize a backtest and plot the equity and drawdown curves:

- `calculate_metrics(portfolio, trades_df) -> dict`  
- `report_and_plot(metrics)`  

They are **model-agnostic**: pass the `portfolio` and `trades_df` returned by your backtester and you’ll get a compact, auditable snapshot.

---

## `calculate_metrics(...)` — returns a metrics dictionary

### Inputs
- **portfolio**: DataFrame indexed by date with at least `equity` (and typically `cash`, `positions`, `reset`).  
- **trades_df**: trade ledger with a `return` column (per-trade net return, after costs).  

### Outputs (dict)
- **Equity**: the equity curve (Series) for downstream plotting.  
- **Drawdown**: series of drawdowns relative to the running peak \(P_t = \max_{s \le t} E_s\) (values are ≤ 0), computed as  
  $$
\text{DD}_t = \frac{E_t - P_t}{P_t}
\qquad
\text{MDD} = \min_t \text{DD}_t
$$
- **TotRet**: total, non-annualized return over the backtest:  
  $$
\text{TotRet} = \frac{E_{\text{end}}}{E_{\text{start}}} - 1
$$
- **CAGR**: annualized geometric return assuming calendar days:  
 $$
\text{CAGR} = \left(\frac{E_{\text{end}}}{E_{\text{start}}}\right)^{\tfrac{365.25}{\text{days}}} - 1
$$

  (Note: CAGR can be misleading for short horizons; prefer **TotRet** for 1-6 month tests.)  

- **Sharpe**: daily Sharpe ratio (risk-free = 0), annualized with \(\sqrt{252}\):  
  - Daily returns:  
   $$
r_t = \frac{E_t}{E_{t-1}} - 1
$$
  - Formula:  
   $$
\text{Sharpe} = \frac{\overline{r}}{s_r}\sqrt{252}
$$
- **Max DD**: minimum drawdown (most negative point of the drawdown series).  
- **Win rate**: share of trades with `return > 0`.  
- **Profit factor**: gross gains over gross losses across trades \(i\), where \(r_i\) is the per-trade net return:  
$$
\text{PF} = \frac{\sum_{i} \max(r_i, 0)}{\sum_{i} \max(-r_i, 0)}
$$
- **Num trades**: count of executed trades.  

### Edge handling
- If `portfolio` is empty → returns `{}`.  
- `TotRet` guarded if series length = 0 (returns `NaN`).  
- If `trades_df` is empty → trade stats become `NaN`/0 as appropriate.  
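
To make the definitions concrete, here is a sketch that evaluates drawdown, Sharpe, and profit factor on a toy equity curve and four toy trade returns. All numbers are invented for illustration.

```python
import numpy as np
import pandas as pd

equity = pd.Series(
    [100.0, 105.0, 102.0, 110.0, 99.0, 104.0],
    index=pd.date_range("2024-01-01", periods=6, freq="D"),
)

# Drawdown relative to the running peak (values are <= 0).
drawdown = (equity - equity.cummax()) / equity.cummax()
max_dd = drawdown.min()                  # worst point: 99 against a peak of 110

# Annualized Sharpe from daily returns, risk-free rate assumed to be 0.
rets = equity.pct_change().fillna(0)
sharpe = rets.mean() / rets.std() * np.sqrt(252)

# Profit factor from per-trade net returns.
trade_returns = pd.Series([0.05, -0.02, 0.03, -0.01])
gains = trade_returns[trade_returns > 0].sum()
losses = -trade_returns[trade_returns < 0].sum()
profit_factor = gains / losses           # 0.08 / 0.03
win_rate = (trade_returns > 0).mean()
```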

---

In [5]:
def calculate_metrics(portfolio: pd.DataFrame,
                      trades_df: pd.DataFrame) -> dict:
    
    if portfolio.empty:
        return {}

    eq_series = portfolio["equity"].astype(float)

   
    tot_ret = float(eq_series.iloc[-1] / eq_series.iloc[0] - 1) if len(eq_series) else float("nan")
    days = (eq_series.index[-1] - eq_series.index[0]).days
    # CAGR is undefined for a zero-length window; avoid dividing by zero
    cagr = (eq_series.iloc[-1] / eq_series.iloc[0])**(365.25/days) - 1 if days > 0 else float("nan")

    
    rets   = eq_series.pct_change().fillna(0)
    sharpe = rets.mean() / rets.std() * np.sqrt(252) if rets.std() else 0.0

   
    dd_series = (eq_series - eq_series.cummax()) / eq_series.cummax()
    max_dd    = dd_series.min()

  
    if trades_df.empty:
        win_rate = profit_factor = np.nan
        num_trades = 0
    else:
        win_rate = (trades_df["return"] > 0).mean()
        gains    = trades_df.loc[trades_df["return"] > 0, "return"].sum()
        losses   = -trades_df.loc[trades_df["return"] < 0, "return"].sum()
        profit_factor = gains / losses if losses else np.inf
        num_trades   = len(trades_df)

    return {
        "Equity":           eq_series,
        "Drawdown":         dd_series,
        "TotRet"  :         tot_ret,
        "Max DD":           max_dd,
        "CAGR":             cagr,
        "Sharpe":           sharpe,
        "Win rate":         win_rate,
        "Profit factor":    profit_factor,
        "Num trades":       num_trades,
    }

# Model Pipeline: Forward-Purged CV, Regime-Aware Stacking, and Signal Generation

This cell builds a **leakage-aware classification pipeline** that turns daily cross-sectional features into **probabilistic trade signals** suitable for a T+1 event-driven backtest. It implements **forward-purged cross-validation with per-ticker embargo**, a **stacked base model (XGBoost + Random Forest → Logistic Regression)**, and a **regime-aware meta-gate** that blends predictions based on macro conditions. Finally, it evaluates out-of-fold (OOF) performance, runs a **permutation significance test**, computes **AUC by period**, and converts probabilities into **binary long/short signals** under a participation constraint.

---

## 1) Data setup & cleaning

- Ensures reproducibility (`PYTHONHASHSEED`, Python, NumPy seeds).
- Loads `ML.parquet`, drops rows with `target_hit == 0` (we keep ±1 only), replaces ±∞ with NaN.
- Normalizes `date` to midnight and sorts by `date, ticker`.
- Defines `FEATURES` (technical, options, NLP, macro) and `REGIME_COLS` (macro subset).
- Builds `X` (features) and `y` (labels where `-1→0, 1→1`), keeps only columns with at least one non-NaN value.

**Goal:** produce a clean, time-ordered feature matrix and labels with minimal information loss but no look-ahead.

---

## 2) Forward-purged CV with per-ticker embargo

**Function:** `build_forward_purged_folds_strict(...)`

- Splits the **unique dates** into `n_splits` contiguous test blocks (forward-in-time).
- **Training window:** dates before each test block (optionally limited to a lookback window).
- **Per-ticker embargo:** for each ticker, removes train rows within ±`embargo_days` around that ticker’s test dates.
- Optional global embargo around the test window.
- Enforces **minimum train size** and **minimum history**; discards invalid folds and ensures **no future leakage**.

**Why:** Cross-sectional daily data often have overlapping signals; embargo + forward purging materially reduces leakage and inflation of OOS metrics.
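
A simplified sketch of the per-ticker embargo for a single test block, on toy data. `embargoed_train_mask` is a hypothetical helper and omits the lookback window, global embargo, and size checks of `build_forward_purged_folds_strict`. Because training rows are strictly before the test block, the ± window reduces to dropping a ticker's rows that fall within `embargo_days` before its first test date.

```python
import pandas as pd

def embargoed_train_mask(meta, test_dates, embargo_days=5):
    """Train = rows strictly before the test block, minus each ticker's rows
    within `embargo_days` of that ticker's own test dates."""
    test_dates = pd.DatetimeIndex(test_dates)
    test_mask = meta["date"].isin(test_dates)
    train_mask = meta["date"] < test_dates.min()
    window = pd.Timedelta(days=embargo_days)
    for ticker, grp in meta[test_mask].groupby("ticker"):
        first_test = grp["date"].min()
        too_close = (meta["ticker"] == ticker) & (meta["date"] >= first_test - window)
        train_mask &= ~too_close
    return train_mask, test_mask

dates = pd.date_range("2024-01-01", periods=20, freq="D")
meta = pd.DataFrame({
    "date": list(dates) * 2,
    "ticker": ["AAA"] * 20 + ["BBB"] * 20,
})
# BBB has no rows in the test block, so its training rows are not embargoed.
meta = meta[~((meta["ticker"] == "BBB") & (meta["date"] >= dates[15]))].reset_index(drop=True)
train_mask, test_mask = embargoed_train_mask(meta, dates[15:], embargo_days=5)
```

In this toy case AAA loses its last five pre-test days from the training set, while BBB keeps all fifteen.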

---

## 3) Preprocessing

- For model features: `SimpleImputer(median, add_indicator=True)` + `StandardScaler`.
- For regime (macro) features: `SimpleImputer(median)` + `StandardScaler`.
- Applied inside scikit-learn `Pipeline`/`ColumnTransformer` to avoid data leakage.

---

## 4) Base learner: Stacking (XGB + RF → LR)

**`make_model(spw)`** returns:
- **XGBoost** (hist) with tuned hyperparameters and `scale_pos_weight = neg/pos`.
- **RandomForest** (depth-limited).
- **StackingClassifier** that feeds both base models’ `predict_proba` into a **Logistic Regression** finalizer.

**Rationale:** Combine a strong gradient booster and a bagged tree model; LR stabilizes and calibrates the stacked probabilities.

---

## 5) Inner-OOF on the outer-train to learn a regime gate

**Function:** `build_inner_oof(X_sub, y_sub, meta_sub, spw)`

- Creates **inner forward-purged folds** (with embargo) on the **outer-fold’s training set** to generate **OOF probabilities** `p_tr_oof` **without leakage**.
- Computes **log-loss** of the base model vs. a **null** model (p=0.5).  
  - `y_meta = 1` if base log-loss < null log-loss (i.e., base model is **useful** in that regime), else `0`.
- If strict inner folds fail (too small), it falls back to a 70/30 forward split.

**Outcome:** A **binary target** indicating whether the base model tends to be useful under the contemporaneous macro regime.
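
One way to build `y_meta` is sketched here, assuming the log-loss comparison is made per sample (the text does not say whether it is per row or aggregated over a group). `usefulness_labels` is a hypothetical name.

```python
import numpy as np

def usefulness_labels(y_true, p_oof, eps=1e-12):
    """1 where the base model's per-sample log-loss beats the p=0.5 null, else 0."""
    y_true = np.asarray(y_true, dtype=float)
    p = np.clip(np.asarray(p_oof, dtype=float), eps, 1 - eps)
    base_ll = -(y_true * np.log(p) + (1 - y_true) * np.log(1 - p))
    null_ll = np.log(2.0)            # log-loss of a constant 0.5 forecast
    return (base_ll < null_ll).astype(int)

y_true = np.array([1, 1, 0, 0])
p_oof = np.array([0.8, 0.3, 0.2, 0.6])
y_meta = usefulness_labels(y_true, p_oof)
```

Under this per-sample reading, a row is labeled useful exactly when the OOF probability falls on the correct side of 0.5.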

---

## 6) Outer-fold loop: OOF base predictions + regime-aware blending

For each outer fold:
1. Train the **base stack** on the outer-train; produce **OOF base** probabilities `p_te` on the outer-test → store in `p_base`.
2. Build **inner-OOF** on the outer-train to derive `y_meta`.
3. Train a **regime meta-classifier** (Logistic Regression, class-balanced) on standardized macro features `r_cols` to estimate  
   `g_te = P(base is useful | regime)`.
4. **Blending (soft gate):**  
   \[
   p^{final} = 0.5 + g_{te}\,(p^{base} - 0.5)
   \]
   - If `g_te = 0` → return 0.5 (no edge).
   - If `g_te = 1` → return base probability.
   - (Option exists for a hard on/off gate, but we use soft blending.)

The loop logs per-fold AUC for **base** and **regime-aware** predictions. Aggregating over folds yields **OOF vectors** `p_base` and `p_final`.
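
The soft gate from step 4 is a one-liner in NumPy. The sketch below shows the two limiting cases and an intermediate gate value, using made-up probabilities.

```python
import numpy as np

def soft_gate(p_base, g):
    """Shrink base probabilities toward 0.5 by the gate value g in [0, 1]."""
    return 0.5 + np.asarray(g) * (np.asarray(p_base) - 0.5)

p_base = np.array([0.80, 0.80, 0.20])
g_te = np.array([0.0, 1.0, 0.5])
p_final = soft_gate(p_base, g_te)    # gate closed, gate open, half-open
```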

---

## 7) OOF evaluation

`report(tag, y_true, p)` prints:
- **AUC**, **Brier score**, **Accuracy** (0.5 threshold), **MCC** (robust to class imbalance), and sample size.

Reported for:
- `BASE OOF` (stack only)  
- `REGIME OOF` (stack + meta-gate)

---

## 8) Significance test & stability diagnostics

- **Permutation test (N=5000):** permutes `y`, recomputes AUC each time, and reports the fraction ≥ observed AUC → **p-value**.
- **AUC by period:** computes AUC per **quarter** and **half-year** using `dates_valid`. If a period lacks both classes, AUC=NaN.

**Purpose:** Check that performance is **statistically meaningful** and **stable** over time, not driven by a small window.
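
A minimal sketch of the permutation test on synthetic data, using a NumPy-only pairwise AUC so the example has no scikit-learn dependency (the pipeline itself uses `roc_auc_score`). The function names, the synthetic data, and the reduced `n_perm` are assumptions for illustration.

```python
import numpy as np

def auc_score(y_true, p):
    """AUC as the probability that a positive outranks a negative (ties count 0.5)."""
    y_true = np.asarray(y_true)
    p = np.asarray(p, dtype=float)
    pos, neg = p[y_true == 1], p[y_true == 0]
    diff = pos[:, None] - neg[None, :]
    return float((diff > 0).mean() + 0.5 * (diff == 0).mean())

def permutation_pvalue(y_true, p, n_perm=1000, seed=0):
    """Fraction of label permutations whose AUC is >= the observed AUC."""
    rng = np.random.default_rng(seed)
    observed = auc_score(y_true, p)
    hits = sum(auc_score(rng.permutation(y_true), p) >= observed for _ in range(n_perm))
    return observed, hits / n_perm

# Synthetic labels and probabilities with a genuine signal.
rng = np.random.default_rng(1)
y = rng.integers(0, 2, size=200)
p = np.clip(0.5 + 0.15 * (y - 0.5) + rng.normal(0, 0.1, size=200), 0, 1)
auc_obs, p_value = permutation_pvalue(y, p, n_perm=500)
```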

---

## 9) Threshold selection under participation constraint

We convert probabilities into long/short **signals**:

- Desired **long participation**: `TARGET_SHARE = 0.50` with tolerance `BAND = 0.05`.
- Construct a candidate set of thresholds from a uniform grid and quantiles of the valid probabilities.
- For each threshold `t`:
  - Compute class share `mean(p ≥ t)`.
  - If within the target band, compute **MCC**; keep the best.
- **Fallback:** if no `t` satisfies the band, use the `(1 − TARGET_SHARE)` quantile of `p`.

**Output columns:**
- `prob_up_base` (OOF base; 0.5 where invalid)
- `prob_up_final` (OOF regime-aware; 0.5 where invalid)
- `prob_down_final = 1 − prob_up_final`
- `signal = +1` if `prob_up_final ≥ t*`, else `−1`

This `df_signals` is **ready for the event-driven backtester** (T+1 execution, ATR-based sizing, stops, kill-switch).
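
The threshold search can be sketched as follows on synthetic data. The grid resolution, the quantile levels, and the helper names `mcc` and `pick_threshold` are assumptions; only the band logic, the MCC criterion, and the quantile fallback come from the description above.

```python
import numpy as np

def mcc(y_true, y_pred):
    """Matthews correlation coefficient for binary 0/1 arrays."""
    tp = np.sum((y_true == 1) & (y_pred == 1))
    tn = np.sum((y_true == 0) & (y_pred == 0))
    fp = np.sum((y_true == 0) & (y_pred == 1))
    fn = np.sum((y_true == 1) & (y_pred == 0))
    denom = np.sqrt(float((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)))
    return float(tp * tn - fp * fn) / denom if denom > 0 else 0.0

def pick_threshold(y_true, p, target_share=0.50, band=0.05):
    """Best-MCC threshold whose long share lies within target_share ± band."""
    grid = np.linspace(0.05, 0.95, 19)                      # assumed uniform grid
    quantiles = np.quantile(p, np.linspace(0.05, 0.95, 19)) # assumed quantile levels
    best_t, best_mcc = None, -np.inf
    for t in np.unique(np.concatenate([grid, quantiles])):
        share = np.mean(p >= t)
        if abs(share - target_share) <= band:
            score = mcc(y_true, (p >= t).astype(int))
            if score > best_mcc:
                best_t, best_mcc = float(t), score
    if best_t is None:                                      # fallback quantile
        best_t = float(np.quantile(p, 1 - target_share))
    return best_t

rng = np.random.default_rng(0)
y = rng.integers(0, 2, size=500)
p = np.clip(0.5 + 0.2 * (y - 0.5) + rng.normal(0, 0.1, size=500), 0.01, 0.99)
t_star = pick_threshold(y, p)
signal = np.where(p >= t_star, 1, -1)
long_share = float(np.mean(signal == 1))
```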

---

## Design choices & safeguards

- **Leakage control:** forward-purged folds + per-ticker embargo at both outer and inner levels.
- **Class imbalance:** `scale_pos_weight` in XGB; class-balanced LR for regime gate.
- **Calibration & robustness:** stacking with LR; Brier score; permutation test; AUC by period.
- **Operational readiness:** outputs include probabilities and a **directional signal** constrained to the targeted participation rate.

---


 #### 🚩 Hyperparameter Policy

- **Notebook 07** reports the official **nested forward-purged BayesSearchCV** tuning procedure.  
- **Notebook 08** uses a **frozen configuration** to generate signals for backtesting and paper trading.  

During exploratory phases (not shown in the project) we observed that a simple GroupKFold (by ticker) preferred:  
- slightly **higher** `n_estimators` (XGB/RF),  
- a **lower** logistic regression \(C\),  
- a **lower** XGB \( \alpha \).  

These settings yielded **greater fold-to-fold stability** and more consistent realized P&L.  
The nested Bayes search (Notebook 07) confirmed broadly similar ranges, but delivered slightly lower OOF AUC (~−0.02) and higher variance.  

For **stability in paper trading**, we froze a **hybrid configuration**:  
- anchored on the GroupKFold preferences (for robustness),  
- consistent with the nested Bayes search ranges.  

**Policy to avoid sub-optimization:**
1. No retuning during the paper-trading window.  
2. Nested Bayes will only be re-run if the **feature set changes** or the sample is extended.  
3. All configurations are tracked in versioned YAML with fixed seeds, folds, and embargo settings.

---


In [6]:
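df_ml = pd.read_parquet(p('ML.parquet'))
df_ml = df_ml[df_ml["target_hit"] != 0]              # keep ±1 labels only
df_ml = df_ml.replace([np.inf, -np.inf], np.nan)     # ±inf -> NaN
df_ml["date"] = pd.to_datetime(df_ml["date"]).dt.normalize()
df_ml = df_ml.sort_values(["date","ticker"]).reset_index(drop=True)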

lr_C = 1.6

FEATURES = [
    "atr_10d_Tm1","vol_5d_Tm1","momentum_5d_Tm1","finroberta_compound",
    "opt_total_option_volume","volume_spike_Tm1","cumret_20d_Tm1","maxdd_20d_Tm1",
    "nlp_logit","nlp_margin","ev_fda_pos","ev_fda_neg","volume_5d","nlp_entropy",
    "IBB_ret_20d","finroberta_neg","opt_avg_iv_call_ln",
    "IBB_v_Tm1","spread_3m_10y","VIX_ma5_x","SPY_ma5",
    "HY_OAS_z","HY_OAS_chg_5d","slope_2s10s","DTWEXBGS","XBI_over_IBB_RS20",
]
REGIME_COLS = ["VIX_ma5_x","HY_OAS_z","HY_OAS_chg_5d","slope_2s10s","DTWEXBGS","XBI_over_IBB_RS20"]

X_full = df_ml[FEATURES]
y_full = df_ml["target_hit"].replace({-1:0, 1:1}).astype(int).values

row_mask = X_full.notna().any(axis=1).values
X = X_full.loc[row_mask].reset_index(drop=True)
y = y_full[row_mask]
meta = df_ml.loc[row_mask, ["date","ticker"] + REGIME_COLS].reset_index(drop=True)

good_cols = [c for c in X.columns if X[c].notna().any()]
X = X[good_cols]
# same as in notebook 07
def build_forward_purged_folds_strict(
    meta_df: pd.DataFrame,
    n_splits: int = 5,
    lookback_days: int | None = 252,
    min_train_obs: int = 200,
    min_history_days: int = 5,
    per_ticker_embargo_days: int = 5,
    global_embargo_days: int | None = None,
    verbose: bool = False,
):
    d = meta_df.copy()
    d["date"] = pd.to_datetime(d["date"]).dt.normalize()
    d = d.sort_values(["date","ticker"]).reset_index(drop=True)
    uniq_dates = d["date"].drop_duplicates().sort_values().to_list()
    date_blocks = np.array_split(uniq_dates, n_splits)

    folds = []
    for k, block in enumerate(date_blocks):
        test_dates = pd.to_datetime(pd.Index(block)).sort_values()
        test_start, test_end = test_dates.min(), test_dates.max()
        if (test_start - uniq_dates[0]).days < min_history_days:
            if verbose:
                print(f" Fold {k} skipped: insufficient history (<{min_history_days}d).")
            continue

        test_mask = d["date"].isin(test_dates).values
        t0 = d["date"].min() if lookback_days is None else test_start - pd.Timedelta(days=lookback_days)
        base_train = (d["date"] < test_start) & (d["date"] >= t0)

        if global_embargo_days is not None and global_embargo_days > 0:
            lo_g = test_start - pd.Timedelta(days=global_embargo_days)
            hi_g = test_end   + pd.Timedelta(days=global_embargo_days)
            base_train &= ~d["date"].between(lo_g, hi_g)

        tr = d.loc[base_train, ["ticker","date"]].copy()
        te = d.loc[test_mask, ["ticker","date"]].copy()
        if len(tr) == 0 or len(te) == 0:
            if verbose:
                print(f" Fold {k} skip: train/test empty.")
            continue

        m = tr.merge(te, on="ticker", suffixes=("_tr","_te"))
        lo = m["date_te"] - pd.Timedelta(days=per_ticker_embargo_days)
        hi = m["date_te"] + pd.Timedelta(days=per_ticker_embargo_days)
        clash = m["date_tr"].between(lo, hi)

        keep_train = base_train.copy()
        if clash.any():
            bad = m.loc[clash, ["ticker","date_tr"]].drop_duplicates()
            bad["flag"] = True
            tr2 = tr.merge(bad, left_on=["ticker","date"], right_on=["ticker","date_tr"], how="left")
            # Robust boolean mask without fillna downcasting:
            drop_mask_local = tr2["flag"].eq(True).to_numpy()
            idx_local = np.flatnonzero(base_train)
            keep_train[idx_local[drop_mask_local]] = False

        if pd.to_datetime(d.loc[keep_train, "date"]).max() >= test_start:
            if verbose:
                print(f"  Fold {k} skip: train in the future.")
            continue
        if keep_train.sum() < min_train_obs:
            if verbose:
                print(f"  Fold {k} skip: train too small ({keep_train.sum()} < {min_train_obs}).")
            continue

        folds.append((keep_train.values, test_mask))
    assert len(folds) >= 2, "Too many folds skipped."
    return folds

folds = build_forward_purged_folds_strict(
    meta[["date","ticker"]],
    n_splits=5,
    lookback_days=252,
    min_train_obs=170,
    min_history_days=5,
    per_ticker_embargo_days=5,
    global_embargo_days=None,
    verbose=False,
)


preproc = ColumnTransformer(
    transformers=[("num", Pipeline([
        ("imp", SimpleImputer(strategy="median", add_indicator=True)),
        ("sc",  StandardScaler())
    ]), good_cols)],
    remainder="drop",
    verbose_feature_names_out=False
)


R_FEATURES = ["VIX_ma5_x","HY_OAS_z","slope_2s10s","DTWEXBGS","XBI_over_IBB_RS20","spread_3m_10y"]
r_cols = [c for c in R_FEATURES if c in X.columns]
preproc_regime = Pipeline([
    ("imp", SimpleImputer(strategy="median")),
    ("sc",  StandardScaler())
])


def make_model(spw: float):
    xgb = XGBClassifier(
        n_estimators=500, max_depth=5, learning_rate=0.019001692618319327,
        subsample=0.6170834796766332, colsample_bytree=0.797503119571276, tree_method="hist",min_child_weight = 6,
        gamma = 0.009311515004379921,reg_lambda = 2.3622918650462594, reg_alpha = 0.1,
        eval_metric="auc", random_state=45, n_jobs=-1,
        scale_pos_weight=spw
    )
    rf  = RandomForestClassifier(
        n_estimators=622, max_depth=11, max_features=0.5176158986513577,
        min_samples_leaf=5, min_samples_split=4,
        random_state=45, n_jobs=-1
    )
    stack = StackingClassifier(
        estimators=[('xgb', xgb), ('rf', rf)],
        final_estimator=LogisticRegression(C=lr_C, penalty='l2', solver='lbfgs', max_iter=2000),
        stack_method="predict_proba", cv=3, n_jobs=-1
    )
    return Pipeline([("pre", preproc), ("model", stack)])



def build_inner_oof(X_sub, y_sub, meta_sub, spw):
    def try_inner(n_splits, lookback_days, min_train_obs, min_history_days, embargo_days):
        inner = build_forward_purged_folds_strict(
            meta_sub[["date","ticker"]],
            n_splits=n_splits,
            lookback_days=lookback_days,
            min_train_obs=min_train_obs,
            min_history_days=min_history_days,
            per_ticker_embargo_days=embargo_days,
            global_embargo_days=None,
            verbose=False  # suppress prints during the fallback attempts
        )
        oof_tmp = np.full(len(X_sub), np.nan, dtype=float)
        for (tr_m, te_m) in inner:
            pipe = make_model(spw)
            pipe.fit(X_sub.iloc[tr_m], y_sub[tr_m])
            oof_tmp[te_m] = pipe.predict_proba(X_sub.iloc[te_m])[:, 1]
        return oof_tmp

    n_obs = len(X_sub)
    configs = [
        dict(n_splits=3, lookback_days=None, min_train_obs=max(100, int(0.12*n_obs)), min_history_days=30, embargo_days=5),
        dict(n_splits=2, lookback_days=None, min_train_obs=max(80,  int(0.10*n_obs)), min_history_days=20, embargo_days=3),
    ]
    for cfg in configs:
        try:
            oof_sub = try_inner(**cfg)
            if np.isfinite(oof_sub).sum() > 0:
                return np.where(np.isnan(oof_sub), 0.5, oof_sub)
        except AssertionError:
            pass

    
    d = meta_sub.copy()
    d["date"] = pd.to_datetime(d["date"]).dt.normalize()
    uniq = d["date"].drop_duplicates().sort_values().to_list()
    if len(uniq) < 10:
        return np.full(n_obs, 0.5, dtype=float)
    cut = int(len(uniq)*0.7)
    tr_m = meta_sub["date"].isin(uniq[:cut]).values
    te_m = meta_sub["date"].isin(uniq[cut:]).values
    oof_sub = np.full(n_obs, 0.5, dtype=float)
    if tr_m.sum() >= max(60, int(0.08*n_obs)) and te_m.sum() >= 20:
        pipe = make_model(spw)
        pipe.fit(X_sub.iloc[tr_m], y_sub[tr_m])
        oof_sub[te_m] = pipe.predict_proba(X_sub.iloc[te_m])[:, 1]
    return oof_sub


HARD_GATE = False
META_THR  = 0.60
eps = 1e-9

p_base  = np.full(len(X), np.nan, dtype=float)
p_final = np.full(len(X), np.nan, dtype=float)

for k, (tr_mask, te_mask) in enumerate(folds):
    X_tr, X_te = X.iloc[tr_mask], X.iloc[te_mask]
    y_tr, y_te = y[tr_mask], y[te_mask]
    meta_tr, meta_te = meta.iloc[tr_mask], meta.iloc[te_mask]

    # base OOF 
    pos = (y_tr == 1).sum(); neg = (y_tr == 0).sum()
    spw = neg / max(1, pos)
    base = make_model(spw)
    base.fit(X_tr, y_tr)
    p_te = base.predict_proba(X_te)[:, 1]
    p_base[te_mask] = p_te

    # inner-OOF 
    p_tr_oof = build_inner_oof(X_tr, y_tr, meta_tr, spw)
    p_tr_clip = np.clip(p_tr_oof, eps, 1 - eps)
    ll_base = -(y_tr*np.log(p_tr_clip) + (1 - y_tr)*np.log(1 - p_tr_clip))
    ll_null = -np.log(0.5)
    y_meta  = (ll_base < ll_null).astype(int)

    if len(r_cols) == 0:
        p_final[te_mask] = p_te
        p_te_f = p_te
    else:
        Z_tr = preproc_regime.fit_transform(X_tr[r_cols])
        Z_te = preproc_regime.transform(X_te[r_cols])
        meta_clf = LogisticRegression(C=2.0, solver='lbfgs', max_iter=2000, class_weight='balanced')
        meta_clf.fit(Z_tr, y_meta)
        g_te = meta_clf.predict_proba(Z_te)[:, 1]

        if HARD_GATE:
            use = (g_te >= META_THR).astype(float)
            p_te_f = 0.5 + use*(p_te - 0.5)  # on/off
        else:
            p_te_f = 0.5 + g_te*(p_te - 0.5)  # soft blending
        p_final[te_mask] = p_te_f

    print(f"Fold {k}: train {tr_mask.sum():4d} | test {te_mask.sum():4d} | "
          f"AUC base {roc_auc_score(y_te, p_te):.3f} | AUC regime {roc_auc_score(y_te, p_te_f):.3f}")


def report(tag, y_true, p):
    v = ~np.isnan(p)
    auc   = roc_auc_score(y_true[v], p[v])
    brier = brier_score_loss(y_true[v], p[v])
    acc   = accuracy_score(y_true[v], (p[v] >= 0.5).astype(int))
    mcc   = matthews_corrcoef(y_true[v], (p[v] >= 0.5).astype(int))
    print(f"[{tag}] AUC {auc:.3f} | Brier {brier:.3f} | ACC {acc:.3f} | MCC {mcc:.3f} | n={v.sum()}")
    return auc, v

auc_b, vb = report("BASE OOF",   y, p_base)
auc_f, vf = report("REGIME OOF", y, p_final)


valid_f = vf
y_valid, p_valid = y[valid_f], p_final[valid_f]


N_PERM = 5000
rng = np.random.default_rng(42)
ge = sum(roc_auc_score(rng.permutation(y_valid), p_valid) >= auc_f for _ in range(N_PERM))
print(f"Permutation test p-value (final): {ge/N_PERM:.4f}")

dates_valid = meta.loc[valid_f, "date"].reset_index(drop=True)

def auc_by_period(dates, y, p, freq):
    df = pd.DataFrame({"date": dates, "y": y, "p": p})
    out = []
    for per, g in df.groupby(df["date"].dt.to_period(freq)):
        if g["y"].nunique()==2:
            out.append((str(per), roc_auc_score(g["y"], g["p"]), len(g)))
        else:
            out.append((str(per), np.nan, len(g)))
    return pd.DataFrame(out, columns=["period","auc","n_obs"]).sort_values("period")

print("\nAUC by quarter (final):\n", auc_by_period(dates_valid, y_valid, p_valid, "Q"))


# Signals
TARGET_SHARE, BAND = 0.50, 0.05
ths = np.unique(np.concatenate([
    np.linspace(0.30, 0.70, 81),
    np.quantile(p_valid, np.linspace(0.05, 0.95, 91))
]))
ths = ths[(ths >= 0.01) & (ths <= 0.99)]

def pick_threshold_constrained(p, y, target_share=TARGET_SHARE, band=BAND):
    best = -1; t_star = 0.5
    for t in ths:
        yhat = (p >= t).astype(int)
        share = yhat.mean()
        if abs(share - target_share) <= band:
            mcc = matthews_corrcoef(y, yhat)
            if mcc > best:
                best, t_star = mcc, t
    if best < 0:
        return float(np.quantile(p, 1 - target_share))
    return float(t_star)

t_star = pick_threshold_constrained(p_valid, y_valid)
df_signals = df_ml.loc[row_mask].reset_index(drop=True).copy()
df_signals["prob_up_base"]  = 0.5; df_signals.loc[vb, "prob_up_base"]  = p_base[vb]
df_signals["prob_up_final"] = 0.5; df_signals.loc[vf, "prob_up_final"] = p_final[vf]
df_signals["prob_down_final"] = 1.0 - df_signals["prob_up_final"]
df_signals["signal"] = np.where(df_signals["prob_up_final"] >= t_star, 1, -1)
print(f"\n Choice t*={t_star:.3f} | long share={(df_signals['signal']>0).mean():.2%}")
# df_signals.to_csv("trade_signals_OOF_regimeaware.csv", index=False)

Fold 0: train  171 | test  356 | AUC base 0.634 | AUC regime 0.367
Fold 1: train  526 | test  700 | AUC base 0.533 | AUC regime 0.536
Fold 2: train 1101 | test  663 | AUC base 0.517 | AUC regime 0.516
Fold 3: train 1619 | test 1074 | AUC base 0.612 | AUC regime 0.595
[BASE OOF] AUC 0.560 | Brier 0.247 | ACC 0.546 | MCC 0.084 | n=2793
[REGIME OOF] AUC 0.549 | Brier 0.248 | ACC 0.546 | MCC 0.084 | n=2793
Permutation test p-value (final): 0.0000

AUC by quarter (final):
    period       auc  n_obs
0  2024Q2  0.517747     78
1  2024Q3  0.446487    405
2  2024Q4  0.499288    663
3  2025Q1  0.472474    698
4  2025Q2  0.595620    949

 Choice t*=0.480 | long share=56.69%


#### Embargo Sensitivity Check

We stress-tested the **per-ticker embargo window** by varying it from **1 day** up to **10 days**.  
The resulting OOF AUC differed by only about **0.01 to 0.02** on average compared with the 5-day baseline.  

This stability suggests that the results are **not materially driven by look-ahead bias or leakage** between nearby training and test samples of the same ticker.
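
To make the mechanics of this check concrete, the sketch below applies a per-ticker embargo to a toy panel and counts how many training rows survive as the window widens. The toy data, the helper name `purge_per_ticker`, and the chosen windows are assumptions for illustration; it mirrors the idea of the fold builder, not its exact code.

```python
import numpy as np
import pandas as pd

def purge_per_ticker(train: pd.DataFrame, test: pd.DataFrame, embargo_days: int) -> pd.DataFrame:
    """Drop train rows within +/- embargo_days of any test row of the same ticker
    (hypothetical helper, for illustration only)."""
    m = train.reset_index().merge(test, on="ticker", suffixes=("_tr", "_te"))
    lo = m["date_te"] - pd.Timedelta(days=embargo_days)
    hi = m["date_te"] + pd.Timedelta(days=embargo_days)
    clash_idx = m.loc[m["date_tr"].between(lo, hi), "index"].unique()
    return train.drop(index=clash_idx)

# Toy panel: 2 tickers x 40 calendar days; the last 10 days form the test block
dates = pd.date_range("2024-01-01", periods=40, freq="D")
panel = pd.DataFrame({
    "date": np.tile(dates, 2),
    "ticker": np.repeat(["AAA", "BBB"], len(dates)),
})
test_start = dates[30]
train = panel[panel["date"] < test_start]
test = panel[panel["date"] >= test_start]

# Training rows kept for each embargo window
kept = {k: len(purge_per_ticker(train, test, k)) for k in (1, 3, 5, 10)}
print(kept)
```

In a real sensitivity run, each window would feed a full refit of the outer folds, and the resulting OOF AUCs would be compared against the 5-day baseline.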


# Adding `meta_prob` to the signals DataFrame

In addition to the base model probability (`prob_up_base`) and the regime-aware blended probability (`prob_up_final`),  
we also store the **raw output of the meta-classifier** (the regime gate).  

---

## How it works
- We initialize a vector `meta_prob` filled with NaN.  
- Inside each outer fold, when we compute `g_te = P(base model is useful | regime features)`,  
  we assign these values into the appropriate test slice of `meta_prob`.  
- After the loop, when constructing `df_signals`, we add a column:
  ```python
  df_signals["meta_prob"] = 0.5
  df_signals.loc[vf, "meta_prob"] = meta_prob[vf]
  ```

In [7]:
meta_prob = np.full(len(X), np.nan, dtype=float)
meta_prob[te_mask] = g_te
df_signals["meta_prob"] = 0.5
df_signals.loc[vf, "meta_prob"] = meta_prob[vf]
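
The per-fold assignment described above can be sketched on toy data as follows. The fold masks and the random stand-in for `meta_clf.predict_proba(Z_te)[:, 1]` are assumptions for illustration; only the allocate-once, fill-per-fold pattern is the point.

```python
import numpy as np

n = 10
idx = np.arange(n)
# Two toy forward folds: (train_mask, test_mask)
toy_folds = [
    (idx < 4, (idx >= 4) & (idx < 7)),
    (idx < 7, idx >= 7),
]

rng = np.random.default_rng(0)
meta_prob = np.full(n, np.nan, dtype=float)    # allocate once, before the loop
for tr_mask, te_mask in toy_folds:
    # Stand-in for the gate output meta_clf.predict_proba(Z_te)[:, 1]
    g_te = rng.uniform(0.0, 1.0, size=te_mask.sum())
    meta_prob[te_mask] = g_te                   # fill this fold's test slice

covered = ~np.isnan(meta_prob)
print(covered.sum(), "of", n, "rows have a gate probability")
```

If the assignment ran only once after the loop, `te_mask` and `g_te` would refer to the last fold, and only that slice would be filled; rows that stay NaN never pass a `>= TAU` gate.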

# `make_merged(...)` Helper

### Inputs
- **df_signals**: DataFrame with model outputs, containing at least  
  `['date', 'ticker', prob_col]`.
- **df_features**: Feature dataset with  
  `['date', 'ticker', atr_col]`, so we can bring in ATR values.
- **prob_col**: The column in `df_signals` that stores the probability we want to trade on  
  (e.g., `prob_up_final` or `meta_prob`).
- **t_star**: Decision threshold chosen earlier (constrained to desired long/short share).
- **gate_mask** (optional): Boolean mask  
  (e.g., `df_signals["meta_prob"] >= τ`) to restrict trades to certain regimes.
- **atr_col**: The ATR feature to use for risk sizing (default = `"atr_10d_Tm1"`).

---

### Processing Steps

1. **Optional filter**  
   - If a `gate_mask` is provided, keep only rows where the condition is `True`  
     (for example: restrict to trusted regimes).

2. **Rename & normalize**  
   - `date → entry_date` (normalized to midnight).  
   - The chosen probability column → `prob_up`.

3. **Generate directional signal**  
   ```python
   df["signal"] = np.where(df["prob_up"] >= t_star, 1, -1)
   ```

4. **Merge ATR values**  
   - From `df_features`, bring in `atr_col` aligned by `(entry_date, ticker)`.  
   - Standardize the column name to `"atr"`.

5. **Finalize**  
   - Sort by `(entry_date, ticker)`.  
   - Remove duplicates.  
   - Keep only the expected backtest columns.

---

### Output

A clean DataFrame with exactly the columns required by the backtester:

- **entry_date**: the day the signal becomes eligible (execution is T+1).  
- **ticker**: asset identifier.  
- **prob_up**: model probability used for sizing.  
- **signal**: trade direction (+1 long, −1 short).  
- **atr**: ATR value (used for stop distance and risk sizing).

---

### Why this helper matters

- **Standardization**: ensures consistency between modeling outputs and the backtesting engine.  
- **Flexibility**: by changing `prob_col` or passing a `gate_mask`, we can backtest base, final, or gated signals easily.  
- **Risk-ready**: adds the ATR column directly, so the backtester can compute stop distances and position sizing.
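
Because the backtester filters and sizes on ATR, it can help to check how many signals actually found an ATR value after the merge. The sketch below assumes the ATR merge is a left join on `(entry_date, ticker)` and uses made-up toy frames; `indicator=True` is a standard pandas `merge` option that labels unmatched rows.

```python
import pandas as pd

# Toy signals and features (illustrative values only)
signals = pd.DataFrame({
    "entry_date": pd.to_datetime(["2024-01-02", "2024-01-02", "2024-01-03"]),
    "ticker": ["AAA", "BBB", "AAA"],
    "prob_up": [0.55, 0.42, 0.61],
})
features = pd.DataFrame({
    "entry_date": pd.to_datetime(["2024-01-02", "2024-01-03"]),
    "ticker": ["AAA", "AAA"],
    "atr": [0.12, 0.15],
})

# Left join keeps every signal; "_merge" marks rows with no feature match
merged = signals.merge(features, on=["entry_date", "ticker"], how="left", indicator=True)
n_missing = int((merged["_merge"] == "left_only").sum())
print(f"{n_missing} of {len(merged)} signals have no ATR match")
```

Rows flagged `left_only` carry `atr = NaN`, so any ATR-based filter or sizing rule downstream has to decide explicitly how to treat them.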


In [8]:

def make_merged(df_signals: pd.DataFrame,
                df_features: pd.DataFrame,
                prob_col: str,
                t_star: float,
                gate_mask: pd.Series | None = None,
                atr_col: str = "atr_10d_Tm1") -> pd.DataFrame:
   
    df = df_signals.copy()
    if gate_mask is not None:
        df = df.loc[gate_mask].copy()

   
    df = (df.rename(columns={"date": "entry_date"})
            .loc[:, ["entry_date", "ticker", prob_col]])
    df["entry_date"] = pd.to_datetime(df["entry_date"]).dt.normalize()
    df = df.rename(columns={prob_col: "prob_up"})
    df["prob_up"] = df["prob_up"].astype(float)

    
    df["signal"] = np.where(df["prob_up"] >= t_star, 1, -1)

    # ATR (T-1 in the set)
    feat = df_features.loc[:, ["date", "ticker", atr_col]].copy()
    feat["date"] = pd.to_datetime(feat["date"]).dt.normalize()
    feat = feat.rename(columns={"date": "entry_date", atr_col: "atr"})

    merged = (df.merge(feat, on=["entry_date", "ticker"], how="left")
                .drop_duplicates(subset=["entry_date", "ticker"])
                .sort_values(["entry_date", "ticker"])
                .reset_index(drop=True))

    
    return merged[["entry_date", "ticker", "prob_up", "signal", "atr"]]

# Creating Different Versions of `merged_data`

At this stage we have `df_signals` containing the model outputs:
`['date', 'ticker', 'prob_up_base', 'prob_up_final', 'meta_prob', 'signal', …]`
and a decision threshold `t_star` that was chosen on the OOF (out-of-fold) probability curve.  
We now use the helper `make_merged(...)` to prepare three different flavors of `merged_data` for backtesting.

---

## Variants

- **`merged_base`**  
  ```python
  merged_base = make_merged(df_signals, df_ml, prob_col="prob_up_base", t_star=t_star)
  ```
  Uses the raw base stacker probabilities (`prob_up_base`).
  This is the benchmark without any regime adjustment.
- **`merged_final`**  
  ```python
  merged_final = make_merged(df_signals, df_ml, prob_col="prob_up_final", t_star=t_star)
  ```
  Uses the regime-aware blended probabilities (`prob_up_final`).
  This incorporates the meta-classifier's soft gating.
- **`merged_gated`**  
  ```python
  merged_gated = make_merged(df_signals, df_ml,
                             prob_col="prob_up_final",
                             t_star=t_star,
                             gate_mask=(df_signals["meta_prob"] >= TAU))
  ```
  Same as `merged_final`, but restricted to rows where the meta-classifier's trust score `meta_prob` is at or above a threshold (`TAU = 0.60`).
  Effectively this is a hard-gated variant of the regime-aware strategy.

## Why this is useful

By generating **parallel versions** of `merged_data`, we can run the same backtest engine under different modeling assumptions:

- **BASE**: pure model stack (no regime).  
- **FINAL**: regime-aware soft blend.  
- **GATED**: regime-aware but only active when the gate is confident.  

This makes it easy to compare performance and evaluate whether the regime gate is truly adding value.

In [9]:
TAU = 0.60 
merged_base  = make_merged(df_signals, df_ml, prob_col="prob_up_base",  t_star=t_star)
merged_final = make_merged(df_signals, df_ml, prob_col="prob_up_final", t_star=t_star)
merged_gated = make_merged(df_signals, df_ml, prob_col="prob_up_final",
                           t_star=t_star, gate_mask=(df_signals["meta_prob"] >= TAU))

# sanity check
for name, dfm in [("BASE", merged_base), ("FINAL", merged_final), ("GATED", merged_gated)]:
    print(name, dfm.head(2))
    assert {"entry_date","ticker","prob_up","signal","atr"} <= set(dfm.columns)

BASE   entry_date ticker  prob_up  signal  atr
0 2024-01-02   VYGR      0.5       1  NaN
1 2024-01-04   ADCT      0.5       1  NaN
FINAL   entry_date ticker  prob_up  signal  atr
0 2024-01-02   VYGR      0.5       1  NaN
1 2024-01-04   ADCT      0.5       1  NaN
GATED   entry_date ticker   prob_up  signal      atr
0 2025-03-26   ACTU  0.457329      -1  0.39513
1 2025-03-26   ATAI  0.506686       1  0.10906


# Probabilistic Signal → Portfolio Weights Helper

### **Inputs**
- **df**: `DataFrame` with at least `date`, `ticker`, and the probability column.
- **prob_col** *(str)*: name of the probability column (e.g., `"p"`, `"prob_up_final"`).
- **t** *(float, default = 0.0)*: confidence threshold around 0.5.  
  Signals with \(|prob - 0.5| < t\) are set to 0 (ignored).
- **long_only** *(bool, default = False)*:
  - `False` → market-neutral normalization (longs + shorts).
  - `True` → long-only normalization (shorts removed).

---

### **Output**
`DataFrame` with columns: `['date', 'ticker', 'weight']`

One row per asset-date.

---

### **Scoring and Normalization Logic**

#### 1. **Score (conviction relative to 0.5)**
$$
\text{score}_i = \text{prob}_i - 0.5
$$

- Positive scores → bullish convictions (long side).  
- Negative scores → bearish convictions (short side).  

---

#### 2. **Optional long-only constraint**
If `long_only=True`, negative scores are clipped:
$$
\text{score}_i \leftarrow \max(\text{score}_i, 0)
$$

---

#### 3. **Confidence threshold \(t\)**
Weak signals are set to zero:
$$
\text{score}_i =
\begin{cases}
0 & \text{if } \lvert \text{score}_i \rvert < t,\\
\text{score}_i & \text{otherwise}.
\end{cases}
$$

---

#### 4. **Daily normalization (per date)**

- **Market-neutral (default, `long_only=False`)**  
  Sum of absolute weights = 1:
  $$
w_i=\frac{\mathrm{score}_i}{\sum_j \lvert \mathrm{score}_j\rvert+\varepsilon}
$$

- **Long-only (`long_only=True`)**  
  Sum of positive weights = 1:
  $$
w_i=\frac{\max\!\big(\mathrm{score}_i,0\big)}{\sum_j \max\!\big(\mathrm{score}_j,0\big)+\varepsilon}
$$

- Small $\varepsilon=10^{-12}$  ensures numerical safety when all scores = 0.

---

### **Behavior & Edge Cases**
- **All scores zero** (e.g., tight threshold):  
  Denominator ≈ \(\varepsilon\), numerators = 0 → all weights = 0 (flat book).
- **Single large conviction**:  
  That asset ≈ weight ±1 (MN) or 1 (LO), others ≈ 0.
- **Market-neutral**: negative weights represent short exposure.
- **Long-only**: weights are non-negative and sum to 1 if any positive score; else all 0.

---

### **Why This Helper is Practical**
- Consistent **portfolio sizing** from probabilistic signals.  
- Flexible posture:  
  - **Market-neutral** (pairs-style) or **long-only** (benchmark-constrained) with one switch.  
- **Confidence thresholding** suppresses noise & turnover.  
- **Drop-in ready** for backtests: daily normalized weights simplify P&L attribution and risk checks.
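
As a worked example of the scoring and normalization rules above, the sketch below computes weights for a single date with four made-up probabilities. The tickers and values are illustrative assumptions; the arithmetic follows the formulas in this section.

```python
import numpy as np
import pandas as pd

# One date, four hypothetical tickers
probs = pd.Series([0.62, 0.51, 0.40, 0.47], index=["AAA", "BBB", "CCC", "DDD"])
t, eps = 0.02, 1e-12

score = probs - 0.5
score = score.where(score.abs() >= t, 0.0)   # BBB (|score| < t) is zeroed

w_mn = score / (score.abs().sum() + eps)     # market-neutral: sum |w| = 1
pos = score.clip(lower=0.0)
w_lo = pos / (pos.sum() + eps)               # long-only: weights sum to 1

print(pd.DataFrame({"score": score, "w_mn": w_mn, "w_lo": w_lo}).round(3))
```

In the market-neutral case the single long (AAA) and the two shorts (CCC, DDD) share a gross exposure of 1; in the long-only case AAA takes the full weight because it is the only positive score.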
    

In [10]:
def to_weights(df, prob_col, t=0.0, long_only=False):
    
    s = (df[prob_col] - 0.5)
    if long_only:
        s = s.clip(lower=0.0)
    s = s.where(s.abs() >= t, 0.0)  

    df = df.assign(score=s)

    def _normalize(g):
        if long_only:
            pos = g["score"].clip(lower=0.0)
            w = pos / (pos.sum() + 1e-12)
        else:
            w = g["score"] / (g["score"].abs().sum() + 1e-12)
        return pd.DataFrame({
            "date": g["date"].values,
            "ticker": g["ticker"].values,
            "weight": w.values
        })

   
    out = df.groupby("date", group_keys=False)[["date","ticker","score"]].apply(_normalize)
    return out.reset_index(drop=True)



signals_base   = to_weights(df_signals.rename(columns={"prob_up_base":"p"}),
                            prob_col="p", t=0.02, long_only=False)

signals_final  = to_weights(df_signals.rename(columns={"prob_up_final":"p"}),
                            prob_col="p", t=0.02, long_only=False)


TAU = 0.60
mask_gate = df_signals["meta_prob"] >= TAU
signals_base_gated = to_weights(
    df_signals.loc[mask_gate].rename(columns={"prob_up_base":"p"}),
    prob_col="p", t=0.02, long_only=False
)

In [11]:
def load_prices_ohlc(csv_path: str) -> pd.DataFrame:
    # Read first, then normalize column names, so that headers such as
    # "Date" or "DATE" are accepted before the date column is parsed.
    px = pd.read_csv(csv_path)
    px = px.rename(columns={
        "Date":"date","DATE":"date",
        "Ticker":"ticker","TICKER":"ticker",
        "Open":"open","High":"high","Low":"low","Close":"close",
        "Adj Close":"adj_close"
    })
    px["date"] = pd.to_datetime(px["date"]).dt.normalize()
    px = px.sort_values(["ticker","date"]).reset_index(drop=True)

    have_open = "open" in px.columns
    have_high = "high" in px.columns
    have_low  = "low"  in px.columns
    have_close= "close" in px.columns

    if not have_close:
        raise RuntimeError("Price file has no 'close' column; at least (date, ticker, close) are required.")

    
    if not have_open:
        px["open"] = px.groupby("ticker")["close"].shift(1)
        px["open"] = px["open"].fillna(px["close"])

    
    if not have_high:
        px["high"] = px[["open","close"]].max(axis=1)
    if not have_low:
        px["low"]  = px[["open","close"]].min(axis=1)

    
    prices = px[["date","ticker","open","high","low","close"]].copy()
    prices = prices.dropna(subset=["date","ticker","open","high","low","close"])
    prices = prices.drop_duplicates(subset=["date","ticker"])
    return prices


In [12]:
prices = load_prices_ohlc(p("underlying_prices_complete.csv"))
print(prices.head())

        date ticker   open   high     low  close
0 2025-02-13   AARD  15.18  15.18  11.550  14.31
1 2025-02-14   AARD  14.34  14.43  12.905  13.30
2 2025-02-18   AARD  13.39  14.40  12.580  13.56
3 2025-02-19   AARD  13.54  16.48  13.425  14.90
4 2025-02-20   AARD  15.89  19.58  15.630  17.25


In [13]:

from pathlib import Path
import json
import numpy as np
import pandas as pd

OUTDIR = Path("bt_runs")
OUTDIR.mkdir(parents=True, exist_ok=True)

def save_bt(name: str,
            port: pd.DataFrame,
            trades: pd.DataFrame,
            miss: pd.DataFrame | None,
            metrics: dict,
            args: dict,
            outdir: Path = OUTDIR) -> None:
   
    
    eq = metrics["Equity"]
   
    port_to_save = port.copy()
    if port_to_save.index.name is None:
        port_to_save.index.name = "date"
    port_to_save.reset_index().to_csv(outdir / f"portfolio_equity_{name}.csv", index=False)

    
    (trades if trades is not None else pd.DataFrame()).to_csv(
        outdir / f"trading_history_{name}.csv", index=False
    )

   
    if miss is not None and len(miss):
        miss.to_csv(outdir / f"missing_prices_{name}.csv", index=False)

   
    row = {
        "run": name,
        "cagr": float(metrics.get("CAGR", np.nan)),
        "sharpe": float(metrics.get("Sharpe", np.nan)),
        "max_dd": float(metrics.get("Max DD", np.nan)),
        "win_rate": float(metrics.get("Win rate", np.nan)) if metrics.get("Num trades", 0) > 0 else np.nan,
        "profit_factor": float(metrics.get("Profit factor", np.nan)) if metrics.get("Num trades", 0) > 0 else np.nan,
        "num_trades": int(metrics.get("Num trades", 0)),
        "tot_return": float(eq.iloc[-1] / eq.iloc[0] - 1) if len(eq) else np.nan,
    }
    summ_path = outdir / "metrics_summary.csv"
    if summ_path.exists():
        summary = pd.read_csv(summ_path)
        summary = pd.concat([summary, pd.DataFrame([row])], ignore_index=True)
    else:
        summary = pd.DataFrame([row])
    summary.to_csv(summ_path, index=False)

   
    (outdir / "last_bt_args.json").write_text(json.dumps(args, indent=2, default=str))

In [14]:
from tqdm import tqdm
bt_args = dict(
    prices=prices,
    prob_long_min = float(round(0.489, 3)),  
    prob_short_min= float(round(0.489, 3)),   
    commission_pct=0.001,                      
    slippage_pct =0.004,                       
    initial_capital=100000
)
# backtest 
print(">>> BASE …")
port_b, trades_b, miss_b = run_backtest(merged_base,  **bt_args)

print(">>> FINAL …")
port_f, trades_f, miss_f = run_backtest(merged_final, **bt_args)

print(">>> GATED …")
port_g, trades_g, miss_g = run_backtest(merged_gated, **bt_args)


def expectancy(trades_df: pd.DataFrame) -> float:
    if trades_df.empty:
        return float("nan")
    wr = (trades_df["return"] > 0).mean()
    avg_win  = trades_df.loc[trades_df["return"] > 0, "return"].mean()
    avg_loss = -trades_df.loc[trades_df["return"] < 0, "return"].mean()
    return wr*avg_win - (1-wr)*avg_loss

m_b = calculate_metrics(port_b, trades_b)
m_f = calculate_metrics(port_f, trades_f)
m_g = calculate_metrics(port_g, trades_g)

print("\n=== SUMMARY TABLE ===")
import pandas as pd, numpy as np
def row(name, m):
    eq = m["Equity"]
    tot_ret = float(eq.iloc[-1]/eq.iloc[0] - 1) if len(eq) else np.nan
    return {
        "run": name,
        "CAGR": m["CAGR"], "Sharpe(ann)": m["Sharpe"], "Max DD": m["Max DD"],
        "Win rate": m["Win rate"], "Profit factor": m["Profit factor"],
        "#trades": m["Num trades"], "TotRet": tot_ret
    }
summary_df = pd.DataFrame([row("BASE", m_b), row("FINAL", m_f), row("GATED", m_g)])
print(summary_df.to_string(index=False))

print(f"\nExpectancy per trade (BASE) : {expectancy(trades_b):.4%}")
print(f"Expectancy per trade (FINAL): {expectancy(trades_f):.4%}")
print(f"Expectancy per trade (GATED): {expectancy(trades_g):.4%}")
save_bt("BASE",  port_b, trades_b, miss_b, m_b, bt_args)
save_bt("FINAL", port_f, trades_f, miss_f, m_f, bt_args)
save_bt("GATED", port_g, trades_g, miss_g, m_g, bt_args)

>>> BASE …


Back-test: 100%|████████████████████████████| 551/551 [00:00<00:00, 1701.88it/s]


>>> FINAL …


Back-test: 100%|████████████████████████████| 551/551 [00:00<00:00, 1437.74it/s]


>>> GATED …


Back-test: 100%|████████████████████████████| 102/102 [00:00<00:00, 1463.47it/s]


=== SUMMARY TABLE ===
  run      CAGR  Sharpe(ann)    Max DD  Win rate  Profit factor  #trades    TotRet
 BASE -0.194505    -1.074737 -0.302458  0.447900       0.861867      643 -0.277982
FINAL -0.193466    -1.720422 -0.301102  0.471983       0.930198      928 -0.276579
GATED  0.450552     1.206264 -0.092459  0.530864       1.236173      405  0.108326

Expectancy per trade (BASE) : -0.6563%
Expectancy per trade (FINAL): -0.3203%
Expectancy per trade (GATED): 0.9820%





### Backtest results: BASE vs FINAL vs GATED 
**Costs (net): commission = 10 bps per side, slippage = 40 bps per side → ~50 bps per side, ~100 bps round-trip. Applied on both entry and exit.**

### Key takeaways (updated)

**BASE and FINAL** (always-on models) are negative over the sample:
- **TotRet:** ≈ **−27.8%** (BASE) / **−27.6%** (FINAL)  
- **Sharpe:** ≈ **−1.07** / **−1.72**  
- **MaxDD:** ≈ **−30.2%** / **−30.1%**  
- **Activity:** **643** / **928** trades with negative **expectancy per trade** (**−0.66%** / **−0.32%**)

**GATED** (regime-aware, hard filter) is materially better:
- **TotRet:** ≈ **+10.8%**  
- **Sharpe:** ≈ **1.21**  
- **MaxDD:** ≈ **−9.2%**  
- **Win rate:** ≈ **53%** · **Profit factor:** ≈ **1.24**  
- **#trades:** **405** (the gate blocks low-quality regimes and concentrates exposure when the meta-signal is confident)  
- **Expectancy per trade:** ≈ **+0.982%**

---

## Interpretation

- The **gate** reduces the number of trades and **avoids the deepest drawdowns** seen in BASE/FINAL.  
- **Expectancy per trade** turns positive at ~**+0.98%** (vs **−0.66%** BASE / **−0.32%** FINAL), suggesting that the regime filter **selects higher-quality opportunities** and improves risk efficiency.
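
For readers unfamiliar with the expectancy figure, here is a minimal worked example using the same formula as the notebook's `expectancy` helper (win rate × average win minus loss rate × average loss). The four per-trade returns are made up for illustration.

```python
import pandas as pd

def expectancy(returns: pd.Series) -> float:
    """Win-rate-weighted average win minus loss-rate-weighted average loss."""
    wr = (returns > 0).mean()
    avg_win = returns[returns > 0].mean()
    avg_loss = -returns[returns < 0].mean()
    return wr * avg_win - (1 - wr) * avg_loss

toy_returns = pd.Series([0.04, -0.02, 0.03, -0.01])   # hypothetical trade returns
e = expectancy(toy_returns)
print(f"expectancy = {e:.4%}, mean return = {toy_returns.mean():.4%}")
```

When no trade has exactly zero return, this expectancy equals the plain mean return per trade, which is why it can be compared directly with the `Expect/trade` column of the slippage stress test.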

--- 
## Disclosures (brief)

Costs model: constant % slippage per side (40 bps) + commission (10 bps per side). This is a simplification suitable for sensitivity analysis; real costs depend on spread, depth, volatility, and participation.
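
To show what a constant per-side cost means for a single trade, the sketch below applies 50 bps (10 bps commission + 40 bps slippage) to both fills of a hypothetical long trade. How `run_backtest` applies costs internally is not shown in this section, so the multiplicative fill adjustment and the function name `net_long_return` are assumptions.

```python
COMMISSION = 0.001   # 10 bps per side
SLIPPAGE = 0.004     # 40 bps per side
PER_SIDE = COMMISSION + SLIPPAGE

def net_long_return(entry_px: float, exit_px: float, per_side: float = PER_SIDE) -> float:
    """Return of a long trade after paying per_side on entry and on exit
    (assumed cost model, for illustration)."""
    entry_fill = entry_px * (1 + per_side)   # buy slightly higher
    exit_fill = exit_px * (1 - per_side)     # sell slightly lower
    return exit_fill / entry_fill - 1

gross = 10.30 / 10.00 - 1
net = net_long_return(10.00, 10.30)
print(f"gross {gross:.2%} -> net {net:.2%}")
```

Under this assumption a 3% gross move keeps roughly 2% after costs, and a flat trade loses about 1%, in line with the ~100 bps round-trip figure quoted above.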

> 🚩 **Note — Regime features update**
>
> **Regime gate change:** added `IBB_v_Tm1`, removed `spread_3m_10y`, and modified the principal feature list.  
> **AUC:** **~0.57–0.58 OOF** and **~0.626 in 2025Q2**.  
> **P&L impact:** worse results, attributed to **under-calibrated probabilities**.  
> **Next steps:** apply **Platt scaling** and **isotonic regression** to fix calibration & thresholding.  
> **For this report:** highlight the **earlier configuration** (stronger realized **P&L**) while calibration upgrades are finalized.


---

#### STRESS-TEST (COSTS - SLIPPAGE)

In [15]:
grid = [0.005, 0.006,0.007, 0.009,0.01]
rows = []
for s in grid:
    bt_args2 = {**bt_args, "slippage_pct": s}
    port, trades, miss = run_backtest(merged_gated, **bt_args2)  #  GATED
    m = calculate_metrics(port, trades)
    eq = m["Equity"]
    tot_ret = float(eq.iloc[-1]/eq.iloc[0]-1) if len(eq) else float("nan")
    rows.append({
        "slippage": s,
        "CAGR": m["CAGR"],
        "Sharpe": m["Sharpe"],
        "MaxDD": m["Max DD"],
        "PF": m["Profit factor"],
        "WinRate": m["Win rate"],
        "#trades": m["Num trades"],
       
        "Expect/trade": (trades["return"].mean() if len(trades) else float("nan")),
        "TotRet": tot_ret
    })
pd.DataFrame(rows)

Back-test: 100%|████████████████████████████| 102/102 [00:00<00:00, 1372.07it/s]
Back-test: 100%|████████████████████████████| 102/102 [00:00<00:00, 1437.19it/s]
Back-test: 100%|████████████████████████████| 102/102 [00:00<00:00, 1540.26it/s]
Back-test: 100%|████████████████████████████| 102/102 [00:00<00:00, 1482.12it/s]
Back-test: 100%|████████████████████████████| 102/102 [00:00<00:00, 1508.59it/s]


   slippage      CAGR    Sharpe      MaxDD        PF   WinRate  #trades  Expect/trade    TotRet
0     0.005  0.365363  1.028827  -0.099453   1.18392  0.530864      405       0.00782  0.089931
1     0.006  0.285254  0.851048  -0.106384   1.13392  0.528395      405       0.00582   0.07186
2     0.007   0.20992   0.67307  -0.113254  1.086014  0.520988      405       0.00382  0.054105
3     0.009  0.072435  0.317093  -0.126813  0.996105  0.516049      405      -0.00018  0.019526
4      0.01   0.00976  0.139381  -0.133503  0.953915   0.51358      405      -0.00218  0.002689


### Slippage sensitivity (per side: entry and exit)

### How to read it 

- Increasing slippage per side from **50 bps → 100 bps** steadily erodes **CAGR, Sharpe, Profit Factor,** and **expectancy per trade**.  
- **Break-even zone:** around **0.9% per side** the Profit Factor ≈ 1.0 and expectancy ~ 0%, i.e., the edge is essentially consumed.  
- Win rate drifts down slightly; **#trades stays constant** (same signals; only costs change).

> These slippage levels are pitched at a small-cap biotech universe; a short note on **liquidity filters** and **participation limits** would help justify the base and stress levels.

### Scenario labels used in the study (small-cap biotech universe)
- **Base – positive (off-table):** 0.004 per side (Tot ret = 10.8%, Max DD = −9.2%)
- **Base – conservative:** 0.005 and 0.006 per side (Tot ret = 9.0% | 7.2%, Max DD = −9.9% | −10.6%)
- **Stress:** 0.007 per side (Tot ret = 5.4%, Max DD = −11.3%)
- **Stress – high:** 0.009 per side (Tot ret = 2.0%, Max DD = −12.7%)
- **Stress – very high:** 0.010 per side (Tot ret = 0.27%, Max DD = −13.4%)

These tiers reflect realistic-to-conservative execution assumptions for the segment. As slippage rises, **CAGR/Sharpe/PF decline**, and **expectancy approaches or breaches zero** at roughly **0.009–0.010 per side**.

**All scenarios include commission = 10 bps per side; the table varies slippage per side only. Results are net of costs. Per side = entry or exit; round-trip ≈ 2× per-side.**

#### Why higher slippage hurts more when you trade more

- P&L erosion is roughly **proportional to the number of trades** (and traded notional).  
- More trades ⇒ more times you “pay” the per-side cost ⇒ **CAGR, Sharpe, and Profit Factor decline** as slippage rises.  
- This matches the **monotonic drop** observed in the table.
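
The table also lets us back out the break-even slippage directly. In the reported numbers, mean return per trade falls by about 0.002 for every extra 0.001 of per-side slippage, i.e. twice the per-side cost, as expected for a round trip. The sketch below fits a line through the `Expect/trade` column (values copied from the stress-test table) and solves for the zero crossing; the linear fit is an illustrative choice, not part of the original analysis.

```python
import numpy as np

# Values copied from the stress-test table (GATED run)
slippage = np.array([0.005, 0.006, 0.007, 0.009, 0.010])
expect_per_trade = np.array([0.00782, 0.00582, 0.00382, -0.00018, -0.00218])

# Straight-line fit: expectancy ~ intercept + slope * slippage
slope, intercept = np.polyfit(slippage, expect_per_trade, 1)
break_even = -intercept / slope

print(f"slope = {slope:.2f} per unit of per-side slippage")
print(f"break-even slippage = {break_even:.4%} per side")
```

The fitted break-even sits just below 0.9% per side, consistent with the Profit Factor crossing 1.0 between the 0.007 and 0.009 rows.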

#### Model-development outlook (reducing slippage sensitivity)

We expect similar baseline performance but **lower sensitivity to slippage** by trading **less but higher-quality**:

- **Probability calibration:** Platt / Isotonic → better-calibrated scores → tighter, confidence-based filters → **fewer marginal trades**.  
- **Stacking upgrade:** add **ElasticNet** in the meta-learner → improved generalization and **sparser decision boundaries**.  
- **"Shadow model" with higher AUC:** use it for **meta-gating or consensus** → engage only when agreement/confidence is high.

**Net effect:** fewer, more selective trades, **higher average edge per trade**, and **less performance decay** as slippage increases.
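
As a rough illustration of the calibration step mentioned above, the sketch below applies Platt scaling and isotonic regression to synthetic, deliberately over-confident scores. The data are simulated and the split is a simple half/half holdout; none of this uses the notebook's probabilities, and in practice calibration would be fitted inside the forward folds to avoid leakage.

```python
import numpy as np
from sklearn.isotonic import IsotonicRegression
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import brier_score_loss

rng = np.random.default_rng(0)
n = 4000
true_p = rng.uniform(0.3, 0.7, n)                      # synthetic "true" probabilities
y = (rng.uniform(size=n) < true_p).astype(int)
raw = np.clip(0.5 + 2.0 * (true_p - 0.5), 0.01, 0.99)  # over-confident scores

fit, hold = slice(0, n // 2), slice(n // 2, n)         # calibrate on one half, check on the other

# Platt scaling: logistic regression on the logit of the raw score
logit = np.log(raw / (1 - raw)).reshape(-1, 1)
platt = LogisticRegression(C=1e6, max_iter=1000).fit(logit[fit], y[fit])
p_platt = platt.predict_proba(logit[hold])[:, 1]

# Isotonic regression: monotone, non-parametric mapping
iso = IsotonicRegression(y_min=0.0, y_max=1.0, out_of_bounds="clip").fit(raw[fit], y[fit])
p_iso = iso.predict(raw[hold])

for name, p in [("raw", raw[hold]), ("platt", p_platt), ("isotonic", p_iso)]:
    print(f"{name:9s} Brier {brier_score_loss(y[hold], p):.4f}")
```

A lower Brier score on the holdout half indicates better-calibrated probabilities; thresholds such as `t_star` or `TAU` would then be re-selected on the calibrated scores.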