# Random Forest Cointegration (Pairs Trading) - Time Split (All 100 Assets)

This notebook builds a **pairs arbitrage** strategy:
1) Use the full 100-asset universe to find cointegrated pairs on the **training window**.
2) Build a supervised dataset of spread mean-reversion events.
3) Train a `RandomForestClassifier` to predict whether a spread will revert within a horizon.
4) On the **test window**, rank pairs weekly, construct a long/short portfolio, and run the repo's **regular backtester** with the existing **Bokeh** output.


## Math: Cointegration, Spread, Z-Score, and Labels

### Cointegration (Engle-Granger)
For two (log) price series $y_t$ and $x_t$, estimate the static hedge ratio via OLS:

$$y_t = a + b x_t + \epsilon_t.$$

The **spread** is the residual:

$$s_t = y_t - (a + b x_t).$$

If $s_t$ is stationary (unit-root rejected), the pair is treated as cointegrated.
We use the Engle-Granger test (`statsmodels.tsa.stattools.coint`) on the training window.
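
As a rough illustration of the first Engle-Granger step (not the notebook's own code), the sketch below fits the hedge ratio by OLS on synthetic data and forms the residual spread. The data-generating parameters (`0.5`, `0.8`, the AR(1) coefficient `0.9`) are assumptions chosen for the example, and the lag-1 autocorrelation printed at the end is only an informal persistence indicator, not a substitute for the unit-root test that `coint` performs.

```python
import numpy as np

rng = np.random.default_rng(0)
n = 2000

# Synthetic log-prices (assumed for illustration): x is a random walk,
# y tracks 0.5 + 0.8 * x plus stationary AR(1) noise.
x = np.cumsum(rng.normal(0.0, 0.01, size=n))
noise = np.zeros(n)
for t in range(1, n):
    noise[t] = 0.9 * noise[t - 1] + rng.normal(0.0, 0.005)
y = 0.5 + 0.8 * x + noise

# OLS of y on [1, x] gives the intercept a and the hedge ratio b.
X = np.column_stack([np.ones(n), x])
(a_hat, b_hat), *_ = np.linalg.lstsq(X, y, rcond=None)

# The spread is the OLS residual; with an intercept it has zero in-sample mean.
spread = y - (a_hat + b_hat * x)

# Informal persistence check only: lag-1 autocorrelation of the spread.
rho = np.corrcoef(spread[:-1], spread[1:])[0, 1]
print(f"a={a_hat:.3f} b={b_hat:.3f} rho={rho:.3f}")
```

A lag-1 autocorrelation clearly below 1 is what a mean-reverting spread tends to look like, but the decision in this notebook rests on the Engle-Granger p-value.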

### Z-score (rolling)
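We compute a rolling z-score of the spread over a trailing window of $W$ observations (`ZSCORE_WINDOW = 60` in the code below), so $z_t$ uses only data up to and including day $t$: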

$$z_t = \frac{s_t - \mu_t}{\sigma_t},\quad \mu_t = \text{MA}_W(s_t),\ \sigma_t = \text{Std}_W(s_t).$$
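
The formula above can be sketched with pandas as follows. The helper name `rolling_zscore` and the toy series are assumptions for illustration. The notebook itself guards against a zero standard deviation by adding `1e-12`, whereas this sketch maps a zero standard deviation to NaN.

```python
import numpy as np
import pandas as pd


def rolling_zscore(spread: pd.Series, window: int) -> pd.Series:
    """Trailing z-score: (s_t - rolling mean) / rolling sample std."""
    mu = spread.rolling(window).mean()
    sigma = spread.rolling(window).std(ddof=1)
    # A flat window has zero std; return NaN there instead of dividing by zero.
    return (spread - mu) / sigma.replace(0.0, np.nan)


s = pd.Series([0.0, 1.0, 2.0, 3.0, 10.0])
z = rolling_zscore(s, window=3)
print(z)
```

The first `window - 1` values are NaN because `rolling` requires a full window by default, which matters whenever the z-score is computed on a freshly sliced series.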

### Mean-reversion trade direction
- If $z_t$ is **high** (spread rich), we short the spread: short $y$, long $x$ (scaled by $b$).
- If $z_t$ is **low** (spread cheap), we long the spread: long $y$, short $x$.
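
A minimal sketch of the direction rule, assuming an entry threshold of 2.0 (the notebook's `ENTRY_Z`). The function names `spread_position` and `pair_legs` are hypothetical, and the legs are left unscaled here, with one unit of $y$ against $b$ units of $x$.

```python
def spread_position(z: float, entry_z: float = 2.0) -> int:
    """Return -1 (short the spread), +1 (long the spread), or 0 (no trade)."""
    if z >= entry_z:
        return -1
    if z <= -entry_z:
        return 1
    return 0


def pair_legs(z: float, hedge_ratio: float, entry_z: float = 2.0) -> dict[str, float]:
    """Unscaled leg exposures: the y leg follows the spread position, the x leg offsets it by b."""
    pos = spread_position(z, entry_z)
    return {"y": float(pos), "x": -pos * hedge_ratio}


print(pair_legs(2.5, 0.8))   # rich spread: short y, long x
print(pair_legs(-2.5, 0.8))  # cheap spread: long y, short x
```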

### Supervised label
We create training examples only when the spread is far from equilibrium: $|z_t| \ge z_{\text{entry}}$.
Label $\ell_t = 1$ if the spread **reverts** within the next $H$ days:

$$\ell_t = \mathbb{1}\left[\min_{1\le k\le H} |z_{t+k}| \le z_{\text{exit}}\right]$$

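The implementation below additionally requires that the spread does not blow out: the label is forced to 0 if $|z_{t+k}| \ge z_{\text{stop}}$ for any $1 \le k \le H$.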


In [1]:
from __future__ import annotations

from pathlib import Path
import sys
import itertools

import numpy as np
import pandas as pd

from bokeh.io import output_notebook, show

from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score

from statsmodels.tsa.stattools import coint


In [2]:
# Resolve project root robustly by walking parents.
# We require BOTH `dataset/` and `src/` to exist at the same level.
CWD = Path.cwd().resolve()
PROJECT_ROOT = None
for p in [CWD, *CWD.parents]:
    if (p / 'dataset').exists() and (p / 'src').exists():
        PROJECT_ROOT = p
        break

if PROJECT_ROOT is None:
    raise RuntimeError(f'Could not locate project root from CWD={CWD}')

if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

print('CWD:', CWD)
print('PROJECT_ROOT:', PROJECT_ROOT)


CWD: /home/anivarth/college/quant-task/notebooks/random forest models
PROJECT_ROOT: /home/anivarth/college/quant-task


In [3]:
from src.backtester.data import load_cleaned_assets, align_close_prices

# Load all cleaned assets (expects Asset_001.csv ... Asset_100.csv)
assets_ohlcv = load_cleaned_assets(cleaned_dir=str(PROJECT_ROOT / 'dataset' / 'cleaned'))
close_prices_full = align_close_prices(assets_ohlcv).sort_index()

# Use log-prices for cointegration/spread calculations
logp_full = np.log(close_prices_full.replace(0.0, np.nan)).dropna(how='all')

symbols = sorted(logp_full.columns.tolist())
print('n_assets:', len(symbols), 'first/last:', symbols[:3], symbols[-3:])
print('date range:', logp_full.index.min().date(), '->', logp_full.index.max().date())


n_assets: 100 first/last: ['Asset_001', 'Asset_002', 'Asset_003'] ['Asset_098', 'Asset_099', 'Asset_100']
date range: 2016-01-25 -> 2026-01-16


In [4]:
# Time split (deterministic)
# - Train: first 7 years
# - Validation: middle period
# - Test: last 1.5 years (18 months)

start = pd.Timestamp(logp_full.index.min())
end = pd.Timestamp(logp_full.index.max())

train_end = start + pd.DateOffset(years=7)
test_start = end - pd.DateOffset(months=18)

if train_end >= test_start:
    raise ValueError(
        f'Not enough history for requested split: start={start.date()} train_end={train_end.date()} test_start={test_start.date()} end={end.date()}'
    )

train_idx = logp_full.index[logp_full.index < train_end]
val_idx = logp_full.index[(logp_full.index >= train_end) & (logp_full.index < test_start)]
test_idx = logp_full.index[logp_full.index >= test_start]

print('train:', train_idx.min().date(), '->', train_idx.max().date(), 'n:', len(train_idx))
print('val  :', val_idx.min().date(), '->', val_idx.max().date(), 'n:', len(val_idx))
print('test :', test_idx.min().date(), '->', test_idx.max().date(), 'n:', len(test_idx))


train: 2016-01-25 -> 2023-01-24 n: 1763
val  : 2023-01-25 -> 2024-07-15 n: 369
test : 2024-07-16 -> 2026-01-16 n: 379


In [5]:
# Pair scan on training window: cointegration p-values for ALL pairs.
# Note: This is compute-heavy (100 choose 2 = 4950 tests). It is intentionally exhaustive.

MAX_PAIRS_TO_TRADE = 10          # portfolio uses top-N pairs per rebalance
N_PAIRS_CANDIDATES = 50          # keep best-K cointegrated pairs for ML/backtest universe
ENTRY_Z = 2.0
EXIT_Z = 0.5
STOP_Z = 5.0
ZSCORE_WINDOW = 60
HORIZON_DAYS = 10

train_logp = logp_full.reindex(train_idx)

pairs = list(itertools.combinations(symbols, 2))
print('n_pairs_total:', len(pairs))

rows = []
for k, (y_sym, x_sym) in enumerate(pairs, start=1):
    y = train_logp[y_sym].dropna()
    x = train_logp[x_sym].dropna()
    idx = y.index.intersection(x.index)
    if len(idx) < max(ZSCORE_WINDOW + 5, 200):
        continue
    yv = y.loc[idx].to_numpy(dtype=float)
    xv = x.loc[idx].to_numpy(dtype=float)

    # Engle-Granger cointegration test on training window
    try:
        _score, pval, _crit = coint(yv, xv)
    except Exception:
        continue

    rows.append({'Y': y_sym, 'X': x_sym, 'pvalue': float(pval), 'n': int(len(idx))})

    if k % 250 == 0:
        print('tested', k, '/', len(pairs), 'pairs; kept', len(rows))

coint_df = pd.DataFrame(rows).sort_values('pvalue', ascending=True).reset_index(drop=True)
print('cointegration rows:', coint_df.shape)
display(coint_df.head(10))

# Candidate universe for the model/backtest
cand = coint_df.head(N_PAIRS_CANDIDATES).copy()
print('candidate pairs:', cand.shape[0])


n_pairs_total: 4950
tested 250 / 4950 pairs; kept 250
tested 500 / 4950 pairs; kept 500
tested 750 / 4950 pairs; kept 750
tested 1000 / 4950 pairs; kept 1000
tested 1250 / 4950 pairs; kept 1250
tested 1500 / 4950 pairs; kept 1500
tested 1750 / 4950 pairs; kept 1750
tested 2000 / 4950 pairs; kept 2000
tested 2250 / 4950 pairs; kept 2250
tested 2500 / 4950 pairs; kept 2500
tested 2750 / 4950 pairs; kept 2750
tested 3000 / 4950 pairs; kept 3000
tested 3250 / 4950 pairs; kept 3250
tested 3500 / 4950 pairs; kept 3500
tested 3750 / 4950 pairs; kept 3750
tested 4000 / 4950 pairs; kept 4000
tested 4250 / 4950 pairs; kept 4250
tested 4500 / 4950 pairs; kept 4500
tested 4750 / 4950 pairs; kept 4750
cointegration rows: (4950, 4)


| | Y | X | pvalue | n |
|---|---|---|---|---|
| 0 | Asset_092 | Asset_093 | 3.3e-05 | 1763 |
| 1 | Asset_038 | Asset_051 | 7.9e-05 | 1763 |
| 2 | Asset_054 | Asset_087 | 0.000155 | 1763 |
| 3 | Asset_031 | Asset_035 | 0.000176 | 1763 |
| 4 | Asset_031 | Asset_046 | 0.000186 | 1763 |
| 5 | Asset_010 | Asset_016 | 0.000215 | 1763 |
| 6 | Asset_020 | Asset_035 | 0.000277 | 1763 |
| 7 | Asset_018 | Asset_019 | 0.000278 | 1763 |
| 8 | Asset_081 | Asset_096 | 0.000343 | 1763 |
| 9 | Asset_015 | Asset_038 | 0.000366 | 1763 |


candidate pairs: 50


In [6]:
# Build supervised dataset from candidate pairs.
# The hedge ratio is fit on the training window only, and the rolling z-score is trailing,
# so the features use no future data. Labels look HORIZON_DAYS ahead by construction.

from src.backtester.stat_arb import compute_pair_diagnostics


def fit_ols_params(y: pd.Series, x: pd.Series) -> tuple[float, float]:
    # y = a + b x
    df = pd.concat([y.rename('y'), x.rename('x')], axis=1).dropna()
    y_np = df['y'].to_numpy(dtype=float)
    x_np = df['x'].to_numpy(dtype=float)
    X = np.column_stack([np.ones_like(x_np), x_np])
    coeff, *_ = np.linalg.lstsq(X, y_np, rcond=None)
    a = float(coeff[0])
    b = float(coeff[1])
    return a, b


def compute_spread_z(
    *,
    y_sym: str,
    x_sym: str,
    a: float,
    b: float,
    window: int,
) -> tuple[pd.Series, pd.Series]:
    y = logp_full[y_sym]
    x = logp_full[x_sym]
    df = pd.concat([y.rename('y'), x.rename('x')], axis=1).dropna()
    spread = df['y'] - (a + b * df['x'])
    mu = spread.rolling(window).mean()
    sig = spread.rolling(window).std(ddof=1)
    z = (spread - mu) / (sig + 1e-12)
    return spread.rename('spread'), z.rename('z')


def build_events_for_pair(y_sym: str, x_sym: str, *, a: float, b: float) -> pd.DataFrame:
    spread, z = compute_spread_z(y_sym=y_sym, x_sym=x_sym, a=a, b=b, window=ZSCORE_WINDOW)

    # Events only where |z| >= ENTRY_Z
    z_ev = z.loc[z.index.intersection(train_idx.union(val_idx))]
    ev_mask = z_ev.abs() >= ENTRY_Z
    ev_times = z_ev.index[ev_mask]

    z_lag = z.shift(1)  # previous-day z, computed once rather than per event

    rows: list[dict[str, object]] = []
    for t in ev_times:
        # label computed with future z in [t+1, t+H]
        fut = z.loc[t:].iloc[1 : 1 + HORIZON_DAYS]
        if fut.empty:
            continue
        reverted = bool((fut.abs() <= EXIT_Z).any())
        blown = bool((fut.abs() >= STOP_Z).any())
        label = int(reverted and (not blown))

        zt = float(z.loc[t])
        s0 = float(spread.loc[t])
        z_prev = float(z_lag.loc[t])  # NaN when there is no prior z; handled via dz below

        rows.append(
            {
                'Date': pd.Timestamp(t),
                'Y': y_sym,
                'X': x_sym,
                'z': zt,
                'abs_z': float(abs(zt)),
                'dz': float(zt - z_prev) if np.isfinite(z_prev) else 0.0,
                'spread': s0,
                'hedge_ratio': float(b),
                'label': label,
            }
        )

    return pd.DataFrame(rows)


# Precompute OLS params on training, then create event dataset.
event_frames = []
params = {}

for i, row in cand.iterrows():
    y_sym = str(row['Y'])
    x_sym = str(row['X'])

    y_tr = train_logp[y_sym]
    x_tr = train_logp[x_sym]
    a, b = fit_ols_params(y_tr, x_tr)
    params[(y_sym, x_sym)] = (a, b)

    ev = build_events_for_pair(y_sym, x_sym, a=a, b=b)
    if not ev.empty:
        event_frames.append(ev)

    if (i + 1) % 10 == 0:
        print('pairs processed:', i + 1, '/', len(cand), 'events:', sum(len(x) for x in event_frames))

pairs_ds = pd.concat(event_frames, axis=0, ignore_index=True)
pairs_ds = pairs_ds.sort_values('Date').reset_index(drop=True)
print('pairs_ds shape:', pairs_ds.shape)
display(pairs_ds.head(5))


pairs processed: 10 / 50 events: 2389
pairs processed: 20 / 50 events: 4893
pairs processed: 30 / 50 events: 7549
pairs processed: 40 / 50 events: 10240
pairs processed: 50 / 50 events: 12770
pairs_ds shape: (12770, 9)


| | Date | Y | X | z | abs_z | dz | spread | hedge_ratio | label |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2016-04-19 | Asset_018 | Asset_019 | 2.160419 | 2.160419 | 0.0 | -0.008631 | 0.386274 | 0 |
| 1 | 2016-04-20 | Asset_046 | Asset_059 | -2.18098 | 2.18098 | -0.416061 | -0.141412 | 1.750055 | 1 |
| 2 | 2016-04-20 | Asset_035 | Asset_046 | 2.481208 | 2.481208 | 0.486867 | 0.0852 | 0.929414 | 1 |
| 3 | 2016-04-20 | Asset_018 | Asset_066 | 2.375963 | 2.375963 | 0.769525 | -0.032575 | 0.424741 | 0 |
| 4 | 2016-04-20 | Asset_015 | Asset_046 | 2.082672 | 2.082672 | 0.480957 | -0.012076 | 0.719829 | 0 |


In [7]:
# Train RandomForestClassifier on event dataset (time split).

df_train_ev = pairs_ds[pairs_ds['Date'].isin(train_idx)].copy()
df_val_ev = pairs_ds[pairs_ds['Date'].isin(val_idx)].copy()

feature_cols = ['z', 'abs_z', 'dz', 'spread', 'hedge_ratio']

X_tr = df_train_ev[feature_cols].replace([np.inf, -np.inf], np.nan)
y_tr = df_train_ev['label'].astype(int)

X_va = df_val_ev[feature_cols].replace([np.inf, -np.inf], np.nan)
y_va = df_val_ev['label'].astype(int)

rf = RandomForestClassifier(
    n_estimators=300,
    random_state=42,
    n_jobs=1,
    max_depth=10,
    min_samples_leaf=50,
)

pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('model', rf),
])

pipe.fit(X_tr, y_tr)

if len(df_val_ev) > 0 and y_va.nunique() > 1:
    proba = pipe.predict_proba(X_va)[:, 1]
    auc = float(roc_auc_score(y_va, proba))
    print('val_auc:', auc)
else:
    print('val_auc: n/a (no validation events or single-class labels)')


val_auc: 0.5632999171758528
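
To help read this number: ROC AUC equals the probability that a randomly chosen positive (reverting) event receives a higher score than a randomly chosen negative one, with ties counted as one half. A value near 0.56 is therefore only modestly above the 0.5 of a random ranking. The sketch below computes AUC directly from that pairwise definition. The function name `pairwise_auc` and the toy inputs are assumptions for illustration, and the notebook itself uses `roc_auc_score`.

```python
def pairwise_auc(y_true, scores) -> float:
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for y, s in zip(y_true, scores) if y == 1]
    neg = [s for y, s in zip(y_true, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("AUC needs at least one positive and one negative label")
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in pos
        for n in neg
    )
    return wins / (len(pos) * len(neg))


# Three of the four (positive, negative) pairs are ordered correctly.
print(pairwise_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))
```

This quadratic-time version is meant for intuition on small inputs only.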


In [8]:
# Backtest on TEST window only: build weekly portfolio weights from pair scores.

from src.backtester.engine import BacktestConfig, run_backtest
from src.backtester.report import compute_backtest_report
from src.backtester.bokeh_plots import build_interactive_portfolio_layout

# Slice backtest price window to TEST to prevent stats starting at 2016
bt_start = pd.Timestamp(test_idx.min())
bt_end = pd.Timestamp(test_idx.max())

close_prices = close_prices_full.loc[bt_start:bt_end].copy()
logp = logp_full.loc[bt_start:bt_end].copy()

# Market proxy OHLCV for the Bokeh dashboard (same slice)
assets_ohlcv_slice = {k: v.loc[bt_start:bt_end] for k, v in assets_ohlcv.items()}
market_df = pd.DataFrame({
    'Open': pd.concat([d['Open'] for d in assets_ohlcv_slice.values()], axis=1).mean(axis=1),
    'High': pd.concat([d['High'] for d in assets_ohlcv_slice.values()], axis=1).mean(axis=1),
    'Low': pd.concat([d['Low'] for d in assets_ohlcv_slice.values()], axis=1).mean(axis=1),
    'Close': pd.concat([d['Close'] for d in assets_ohlcv_slice.values()], axis=1).mean(axis=1),
    'Volume': pd.concat([d['Volume'] for d in assets_ohlcv_slice.values()], axis=1).sum(axis=1),
}).sort_index()

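# Precompute spread/z on the backtest window for candidate pairs using TRAIN-fitted (a,b).
# Note: z is computed on the test slice only, so its first ZSCORE_WINDOW - 1 rows are NaN
# and no pair can be scored until the rolling window fills. This is likely why the
# report below starts later than the first test date.
spread_z = {}
for (y_sym, x_sym), (a, b) in params.items():
    s = (logp[y_sym] - (a + b * logp[x_sym])).rename('spread')
    # rolling z-score on spread
    mu = s.rolling(ZSCORE_WINDOW).mean()
    sig = s.rolling(ZSCORE_WINDOW).std(ddof=1)
    z = (s - mu) / (sig + 1e-12)
    spread_z[(y_sym, x_sym)] = (s, z)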

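# Rebalance dates (weekly): the last trading day of each calendar week.
# Stored as a set so the per-day membership test in the loop below is O(1).
rebal_dates = pd.Series(close_prices.index, index=close_prices.index).resample('W').last().dropna().tolist()
rebal_dates = {d for d in rebal_dates if d in close_prices.index}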

symbols_bt = close_prices.columns.tolist()
weights_rows = []

for dt in close_prices.index:
    if dt not in rebal_dates:
        # hold last weights
        if weights_rows:
            weights_rows.append(weights_rows[-1])
        else:
            weights_rows.append(pd.Series(0.0, index=symbols_bt))
        continue

    # Score all candidate pairs at dt
    pair_rows = []
    for (y_sym, x_sym), (s, z) in spread_z.items():
        if dt not in z.index:
            continue
        zt = float(z.loc[dt])
        if not np.isfinite(zt) or abs(zt) < ENTRY_Z:
            continue
        st = float(s.loc[dt])
        # dz uses previous day
        zprev = float(z.shift(1).loc[dt]) if dt in z.index else float('nan')
        dz = float(zt - zprev) if np.isfinite(zprev) else 0.0
        a, b = params[(y_sym, x_sym)]

        X_one = pd.DataFrame([
            {
                'z': zt,
                'abs_z': float(abs(zt)),
                'dz': dz,
                'spread': st,
                'hedge_ratio': float(b),
            }
        ])
        prob = float(pipe.predict_proba(X_one)[0, 1])
        pair_rows.append({'Y': y_sym, 'X': x_sym, 'z': zt, 'prob': prob, 'hedge_ratio': float(b)})

    if not pair_rows:
        weights_rows.append(pd.Series(0.0, index=symbols_bt))
        continue

    pair_df = pd.DataFrame(pair_rows).sort_values('prob', ascending=False)
    top = pair_df.head(MAX_PAIRS_TO_TRADE)

    # Build long/short weights across all assets
    w = pd.Series(0.0, index=symbols_bt)
    n_sel = len(top)
    if n_sel == 0:
        weights_rows.append(w)
        continue

    per_pair_gross = 1.0 / n_sel

    for _, r in top.iterrows():
        y_sym = str(r['Y'])
        x_sym = str(r['X'])
        zt = float(r['z'])
        b = float(r['hedge_ratio'])

        # Position direction: short spread if z >= ENTRY_Z, long spread if z <= -ENTRY_Z
        pos = -1.0 if zt >= ENTRY_Z else (1.0 if zt <= -ENTRY_Z else 0.0)
        if pos == 0.0:
            continue

        # Unnormalized legs in ratio 1 : b (gross equals per_pair_gross only when |b| = 1)
        wy = pos * (per_pair_gross / 2.0)
        wx = -pos * b * (per_pair_gross / 2.0)

        # Normalize within pair to target per_pair_gross gross
        g = abs(wy) + abs(wx)
        if g > 0:
            wy = wy * (per_pair_gross / g)
            wx = wx * (per_pair_gross / g)

        w[y_sym] += float(wy)
        w[x_sym] += float(wx)

    weights_rows.append(w)

weights = pd.DataFrame(weights_rows, index=close_prices.index, columns=symbols_bt).fillna(0.0)

output_notebook()

cfg = BacktestConfig(
    initial_equity=1_000_000.0,
    transaction_cost_bps=5.0,
    mode='vectorized',
    allow_leverage=True,
)

res = run_backtest(close_prices, weights, config=cfg)
report = compute_backtest_report(result=res, close_prices=close_prices)
display(report.to_frame('RF Cointegration Pairs - Backtest Report'))

layout = build_interactive_portfolio_layout(
    market_ohlcv=market_df,
    equity=res.equity,
    returns=res.returns,
    weights=res.weights,
    turnover=res.turnover,
    costs=res.costs,
    close_prices=close_prices,
    title='RF Cointegration Pairs (Time Split, Top Pairs Weekly)',
)
show(layout)


| | RF Cointegration Pairs - Backtest Report |
|---|---|
| Start | 2024-10-11 00:00:00 |
| End | 2026-01-16 00:00:00 |
| Duration | 462 days 00:00:00 |
| Initial Equity | 999500.0 |
| Final Equity | 1025525.466274 |
| Equity Peak | 1068130.55984 |
| Total Return [%] | 2.603849 |
| CAGR [%] | 2.071068 |
| Volatility (ann) [%] | 10.041391 |
| Sharpe | 0.249845 |
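
The per-pair sizing inside the weight-construction loop can be isolated as a small function. This is a sketch that mirrors the arithmetic of that loop under a hypothetical name (`pair_weights`): the legs keep the ratio 1 : b and are rescaled so their absolute weights sum to the pair's gross budget.

```python
def pair_weights(pos: float, hedge_ratio: float, per_pair_gross: float) -> tuple[float, float]:
    """Return (w_y, w_x) with |w_y| + |w_x| == per_pair_gross and w_x / w_y == -hedge_ratio."""
    wy = pos * 1.0
    wx = -pos * hedge_ratio
    gross = abs(wy) + abs(wx)
    if gross == 0.0:
        return 0.0, 0.0
    scale = per_pair_gross / gross
    return wy * scale, wx * scale


# Short the spread (pos = -1) with b = 0.5 and a 10% gross budget for the pair.
wy, wx = pair_weights(pos=-1.0, hedge_ratio=0.5, per_pair_gross=0.10)
print(wy, wx)
```

With ten selected pairs each pair receives a gross budget of 0.10, so total gross exposure is at most 1.0 (less if legs of different pairs net against each other). If a fitted hedge ratio is negative, both legs take the same sign and the pair is no longer dollar-offsetting, which may be worth filtering out before trading.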
