# Project 09 — Pairs Trading: Cointegration + Kalman Hedge Ratio (Interactive)

## Goal
Build a clean, interview-ready pairs trading notebook:

- Generate or load a **pair** \((X_t, Y_t)\)
- Estimate a hedge ratio \(\beta_t\) using:
  - Rolling **OLS** (static within a window)
  - A **Kalman filter** (time-varying \(\beta_t\))
- Define spread \(S_t = Y_t - \beta_t X_t\) and its **z-score**
- Backtest a simple **mean-reversion** strategy with transaction costs

Why it matters: this is a core template for **relative value** / **stat arb**.

In [1]:
from __future__ import annotations

import numpy as np
import pandas as pd
from pathlib import Path

import plotly.express as px
import plotly.graph_objects as go

import ipywidgets as widgets
from IPython.display import display

SEED = 7
rng = np.random.default_rng(SEED)

PROJECT_DIR = Path.cwd()
ASSETS_DIR = PROJECT_DIR / "assets"
ASSETS_DIR.mkdir(parents=True, exist_ok=True)

print("CWD:", PROJECT_DIR)
print("ASSETS_DIR:", ASSETS_DIR.resolve())

CWD: c:\Users\Karim\Desktop\quant-finance-portfolio\projects
ASSETS_DIR: C:\Users\Karim\Desktop\quant-finance-portfolio\projects\assets


## 1) Data: synthetic cointegrated pair (no internet required)

We generate:
- \(X_t\): a random walk (price-like)
- \(\beta_t\): slowly drifting hedge ratio
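- \(Y_t = \beta_t X_t + \varepsilon_t\), where \(\varepsilon_t\) is mean-reverting AR(1) noise: \(\varepsilon_t = (1-\kappa)\,\varepsilon_{t-1} + \xi_t\) with \(\kappa = 0.05\) and Gaussian shocks \(\xi_t\)

This creates a realistic scenario where **cointegration holds** locally (the spread \(Y_t - \beta_t X_t\) is stationary), but the hedge ratio is not perfectly constant.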
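To put a number on "mean-reverting": for an AR(1) process with persistence \(1-\kappa\), the standard formulas give a half-life of \(\ln(0.5)/\ln(1-\kappa)\) steps and a stationary standard deviation of \(\text{noise}/\sqrt{1-(1-\kappa)^2}\). The sketch below evaluates both for the generator's defaults (\(\kappa = 0.05\), noise \(= 0.5\)); the helper name `ar1_properties` is illustrative and not part of the notebook.

```python
import math

def ar1_properties(kappa: float, noise: float) -> tuple[float, float]:
    """Half-life (in steps) and stationary std of eps_t = (1 - kappa) * eps_{t-1} + noise * xi_t."""
    phi = 1.0 - kappa
    half_life = math.log(0.5) / math.log(phi)
    stationary_std = noise / math.sqrt(1.0 - phi**2)
    return half_life, stationary_std

half_life, stationary_std = ar1_properties(kappa=0.05, noise=0.5)
print(f"half-life: {half_life:.1f} steps, stationary std: {stationary_std:.2f}")
# half-life: 13.5 steps, stationary std: 1.60
```

With daily bars, a deviation in the spread is therefore expected to halve in roughly 13 to 14 business days, which is a useful reference when choosing the z-score window.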

In [2]:
def make_synthetic_pair(n: int = 1500, beta0: float = 1.2, beta_drift: float = 0.0002, noise: float = 0.5):
    # X: random walk
    x = 100 + np.cumsum(rng.normal(0, 1.0, size=n))

    # beta: slow drift
    beta = beta0 + np.cumsum(rng.normal(0, beta_drift, size=n))

    # mean-reverting noise for spread
    eps = np.zeros(n)
    kappa = 0.05
    for t in range(1, n):
        eps[t] = (1 - kappa) * eps[t-1] + rng.normal(0, noise)

    y = beta * x + eps
    idx = pd.date_range("2015-01-01", periods=n, freq="B")
    df = pd.DataFrame({"X": x, "Y": y, "beta_true": beta}, index=idx)
    return df

df_pair = make_synthetic_pair()
df_pair.head()

| Date | X | Y | beta_true |
|---|---|---|---|
| 2015-01-01 | 100.00123 | 119.97711 | 1.199756 |
| 2015-01-02 | 100.299976 | 120.506143 | 1.200085 |
| 2015-01-05 | 100.025838 | 120.47873 | 1.200187 |
| 2015-01-06 | 99.135246 | 119.655891 | 1.200544 |
| 2015-01-07 | 98.680575 | 118.533926 | 1.200704 |


## 2) Hedge ratio estimation

### Rolling OLS
Estimate \(Y \approx \alpha + \beta X\) over a rolling window.
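For a single regressor with an intercept, the OLS slope has the closed form \(\beta = \mathrm{cov}(X, Y) / \mathrm{var}(X)\) and \(\alpha = \bar{Y} - \beta \bar{X}\). The sketch below uses that identity with pandas rolling windows as a vectorized alternative to an explicit loop, and cross-checks the last window against `np.polyfit`. The function name `rolling_beta_closed_form` and the toy data are illustrative assumptions.

```python
import numpy as np
import pandas as pd

def rolling_beta_closed_form(x: pd.Series, y: pd.Series, window: int = 60):
    """Rolling OLS intercept and slope via beta = cov(x, y) / var(x)."""
    beta = x.rolling(window).cov(y) / x.rolling(window).var()
    alpha = y.rolling(window).mean() - beta * x.rolling(window).mean()
    return alpha, beta

# Toy data (illustrative only): y = 2 + 1.5 * x + small noise
rng_demo = np.random.default_rng(0)
x_demo = pd.Series(100 + np.cumsum(rng_demo.normal(size=300)))
y_demo = 2.0 + 1.5 * x_demo + rng_demo.normal(scale=0.1, size=300)

alpha_cf, beta_cf = rolling_beta_closed_form(x_demo, y_demo, window=60)

# Cross-check the last window against an explicit least-squares fit
slope, intercept = np.polyfit(x_demo.iloc[-60:], y_demo.iloc[-60:], deg=1)
print(np.isclose(beta_cf.iloc[-1], slope), np.isclose(alpha_cf.iloc[-1], intercept))
```

The first `window - 1` values are `NaN`, matching the warm-up period of a loop-based rolling fit.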

### Kalman filter (time-varying \(\beta_t\))
State model:
\[
\beta_t = \beta_{t-1} + \eta_t
\]
Observation model:
\[
Y_t = \alpha + \beta_t X_t + \varepsilon_t
\]

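We implement a simple 1D Kalman filter that tracks only \(\beta_t\), to keep dependencies minimal. The intercept \(\alpha\) is not estimated as a state; it is removed by demeaning \(X\) and \(Y\) with a rolling mean before filtering.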
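As a rough illustration of the predict/update cycle for a scalar state, here is a minimal sketch that assumes a zero intercept and toy data with a known constant slope of 1.5. The function name `kalman_slope_demo`, the initial values `beta0` and `p0`, and the toy numbers are illustrative assumptions.

```python
import numpy as np

def kalman_slope_demo(x, y, q=1e-5, r=1e-1, beta0=0.0, p0=1.0):
    """Scalar Kalman filter for y_t = beta_t * x_t + noise, with beta_t a random walk."""
    beta, p = beta0, p0
    path = np.empty(len(x))
    for t, (xt, yt) in enumerate(zip(x, y)):
        p_pred = p + q                       # predict: the random walk adds variance q
        s = xt * p_pred * xt + r             # innovation variance
        k = p_pred * xt / s                  # Kalman gain
        beta = beta + k * (yt - xt * beta)   # correct using the prediction error
        p = (1.0 - k * xt) * p_pred          # posterior state variance
        path[t] = beta
    return path

# Toy data (illustrative only): constant true slope of 1.5
rng_demo = np.random.default_rng(0)
x_demo = rng_demo.normal(size=500)
y_demo = 1.5 * x_demo + rng_demo.normal(scale=0.3, size=500)

beta_path = kalman_slope_demo(x_demo, y_demo)
print(f"first estimate: {beta_path[0]:.3f}, final estimate: {beta_path[-1]:.3f}")
```

The ratio `q / r` acts as the responsiveness knob: a larger `q` lets the estimate follow drift faster at the price of a noisier hedge ratio.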

In [3]:
def rolling_ols_beta(x: pd.Series, y: pd.Series, window: int = 60):
    betas = np.full(len(x), np.nan)
    alphas = np.full(len(x), np.nan)

    X = x.values
    Y = y.values

    for i in range(window - 1, len(x)):
        Xw = X[i-window+1:i+1]
        Yw = Y[i-window+1:i+1]
        A = np.vstack([np.ones_like(Xw), Xw]).T
        coeff, *_ = np.linalg.lstsq(A, Yw, rcond=None)
        alphas[i], betas[i] = coeff[0], coeff[1]
    return pd.Series(alphas, index=x.index, name="alpha"), pd.Series(betas, index=x.index, name="beta")

def kalman_beta(x: pd.Series, y: pd.Series, q: float = 1e-5, r: float = 1e-1):
    '''
    1D Kalman filter for beta in: y_t ≈ beta_t * x_t.
    We typically demean x and y first (handled in the interactive runner).
    q: process variance (beta drift)
    r: observation variance
    '''
    x_ = x.values
    y_ = y.values

    beta = 0.0
    P = 1.0  # state variance
    betas = np.zeros(len(x_))

    for t in range(len(x_)):
        # predict
        beta_pred = beta
        P_pred = P + q

        # update
        H = x_[t]
        if np.isfinite(H) and np.isfinite(y_[t]):
            S = H * P_pred * H + r
            K = (P_pred * H) / S
            beta = beta_pred + K * (y_[t] - H * beta_pred)
            P = (1 - K * H) * P_pred
        else:
            beta, P = beta_pred, P_pred

        betas[t] = beta

    return pd.Series(betas, index=x.index, name="beta_kalman")

## 3) Strategy: trade the spread

Define:
\[
\text{spread}_t = Y_t - \beta_t X_t
\]
\[
z_t = \frac{\text{spread}_t - \mu_t}{\sigma_t}
\]

Rule (classic):
- Enter **short spread** when \(z_t > z_{entry}\)
- Enter **long spread** when \(z_t < -z_{entry}\)
- Exit when \(|z_t| < z_{exit}\)

We include transaction costs proportional to turnover: each unit change in the spread position is charged `cost_bps` basis points.
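The entry/exit rule is a small state machine with hysteresis: once a position is open, it stays open until \(|z_t|\) falls below the exit band. The sketch below traces it on a hand-picked toy z-score path and tallies the resulting turnover cost at 1 bp per unit of position change. The function name `positions_from_z` and the toy path are illustrative, and missing z-scores are not handled here.

```python
import numpy as np

def positions_from_z(z, z_entry=2.0, z_exit=0.5):
    """Map a z-score path to spread positions: -1 short, 0 flat, +1 long."""
    state, pos = 0, []
    for zt in z:
        if state == 0:
            if zt > z_entry:
                state = -1      # spread is rich: short it
            elif zt < -z_entry:
                state = 1       # spread is cheap: go long
        elif abs(zt) < z_exit:
            state = 0           # spread has reverted: close the position
        pos.append(state)
    return np.array(pos)

z_toy = [0.0, 2.5, 1.0, 0.3, -2.2, -1.0, 0.1]
pos = positions_from_z(z_toy)
turnover = np.abs(np.diff(pos, prepend=0))
total_cost = 1.0 * 1e-4 * turnover.sum()   # cost_bps = 1.0

print(pos.tolist())          # [0, -1, -1, 0, 1, 1, 0]
print(turnover.tolist())     # [0, 1, 0, 1, 1, 0, 1]
print(f"{total_cost:.4f}")   # 0.0004
```

Note that the position at \(z = 1.0\) stays short even though the z-score is back inside the entry band; only crossing the exit band closes the trade.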

In [4]:
def compute_zscore(spread: pd.Series, window: int = 60):
    mu = spread.rolling(window).mean()
    sig = spread.rolling(window).std(ddof=1)
    z = (spread - mu) / (sig + 1e-12)
    return z, mu, sig

def backtest_pairs(x: pd.Series, y: pd.Series, beta: pd.Series, z: pd.Series,
                   z_entry: float = 2.0, z_exit: float = 0.5, cost_bps: float = 1.0):
    '''
    Positions:
    - Spread = long Y - beta*X
    If long spread: +1 unit Y and -beta units X
    If short spread: -1 unit Y and +beta units X
    '''
    pos = np.zeros(len(z))
    state = 0  # -1 short spread, 0 flat, +1 long spread

    for t in range(len(z)):
        zt = z.iloc[t]
        if not np.isfinite(zt):
            pos[t] = state
            continue

        if state == 0:
            if zt > z_entry:
                state = -1
            elif zt < -z_entry:
                state = +1
        else:
            if abs(zt) < z_exit:
                state = 0

        pos[t] = state

    pos = pd.Series(pos, index=z.index, name="spread_pos")

    rx = x.pct_change().fillna(0.0)
    ry = y.pct_change().fillna(0.0)

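    beta_aligned = beta.reindex(z.index).ffill().fillna(0.0)
    # Hold yesterday's hedge ratio over today's move to avoid look-ahead.
    beta_lag = beta_aligned.shift(1).fillna(0.0)
    # P&L of +1 unit Y and -beta units X, scaled by yesterday's Y price.
    pnl = y.diff().fillna(0.0) - beta_lag * x.diff().fillna(0.0)
    port_ret = pos.shift(1).fillna(0.0) * pnl / y.shift(1).bfill()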

    turnover = pos.diff().abs().fillna(0.0)
    cost = (cost_bps * 1e-4) * turnover
    port_ret_net = port_ret - cost

    equity = (1.0 + port_ret_net).cumprod()
    dd = equity / equity.cummax() - 1.0

    out = pd.DataFrame({
        "rx": rx, "ry": ry, "beta": beta_aligned, "z": z,
        "pos": pos, "ret": port_ret_net, "equity": equity, "drawdown": dd,
        "turnover": turnover, "cost": cost
    }, index=z.index)
    return out

## 4) Interactive demo

In [5]:
method_w = widgets.Dropdown(options=["Kalman", "Rolling OLS"], value="Kalman", description="beta")
win_w = widgets.IntSlider(value=60, min=20, max=250, step=5, description="window")
q_w = widgets.FloatLogSlider(value=1e-5, base=10, min=-8, max=-2, step=0.25, description="Kalman q")
r_w = widgets.FloatLogSlider(value=1e-1, base=10, min=-4, max=1, step=0.25, description="Kalman r")

z_entry_w = widgets.FloatSlider(value=2.0, min=0.5, max=4.0, step=0.1, description="z_entry")
z_exit_w  = widgets.FloatSlider(value=0.5, min=0.0, max=2.0, step=0.1, description="z_exit")
cost_w    = widgets.FloatSlider(value=1.0, min=0.0, max=20.0, step=0.5, description="cost (bps)")

btn = widgets.Button(description="Run pairs backtest", button_style="success")
out = widgets.Output()

display(widgets.VBox([
    widgets.HBox([method_w, win_w]),
    widgets.HBox([q_w, r_w]),
    widgets.HBox([z_entry_w, z_exit_w, cost_w]),
    btn,
    out
]))

In [6]:
def perf_stats(ret: pd.Series, periods_per_year: int = 252):
    ret = ret.dropna()
    if len(ret) < 2:
        return {}
    ann_ret = (1 + ret).prod() ** (periods_per_year / len(ret)) - 1
    ann_vol = ret.std(ddof=1) * np.sqrt(periods_per_year)
    sharpe = ret.mean() * periods_per_year / (ann_vol + 1e-12)
    return {"ann_return": ann_ret, "ann_vol": ann_vol, "sharpe": sharpe}

def run(_=None):
    with out:
        out.clear_output()

        df = df_pair.copy()
        x = df["X"]
        y = df["Y"]

        # rolling demean removes the intercept, so the 1D Kalman filter only has to track beta
        x_dm = x - x.rolling(win_w.value).mean()
        y_dm = y - y.rolling(win_w.value).mean()

        if method_w.value == "Rolling OLS":
            alpha, beta = rolling_ols_beta(x, y, window=win_w.value)
        else:
            beta = kalman_beta(x_dm.fillna(0.0), y_dm.fillna(0.0), q=float(q_w.value), r=float(r_w.value))
            alpha = pd.Series(0.0, index=beta.index, name="alpha")

        spread = y - beta * x - alpha
        z, mu, sig = compute_zscore(spread, window=win_w.value)
        bt = backtest_pairs(
            x, y, beta, z,
            z_entry=float(z_entry_w.value),
            z_exit=float(z_exit_w.value),
            cost_bps=float(cost_w.value),
        )

        stats = perf_stats(bt["ret"])
        stats_df = pd.DataFrame([stats])
        # Sharpe is a ratio, not a percentage, so it gets its own format
        display(stats_df.style.format({"ann_return": "{:.4%}", "ann_vol": "{:.4%}", "sharpe": "{:.2f}"}))

        fig1 = go.Figure()
        fig1.add_trace(go.Scatter(x=df.index, y=df["X"], name="X", mode="lines"))
        fig1.add_trace(go.Scatter(x=df.index, y=df["Y"], name="Y", mode="lines"))
        fig1.update_layout(template="plotly_dark", title="Synthetic pair prices", xaxis_title="Date", yaxis_title="Price")
        fig1.show()
        fig1.write_html(ASSETS_DIR / "pair_prices.html")

        figb = px.line(beta, template="plotly_dark", title=f"Hedge ratio beta(t) — {method_w.value}")
        figb.update_layout(xaxis_title="Date", yaxis_title="beta")
        figb.show()
        figb.write_html(ASSETS_DIR / "beta_series.html")

        figz = go.Figure()
        figz.add_trace(go.Scatter(x=z.index, y=z.values, name="z", mode="lines"))
        figz.add_hline(y=float(z_entry_w.value), line_dash="dash")
        figz.add_hline(y=-float(z_entry_w.value), line_dash="dash")
        figz.add_hline(y=float(z_exit_w.value), line_dash="dot")
        figz.add_hline(y=-float(z_exit_w.value), line_dash="dot")
        figz.update_layout(template="plotly_dark", title="Z-score with entry/exit bands", xaxis_title="Date", yaxis_title="z")
        figz.show()
        figz.write_html(ASSETS_DIR / "zscore.html")

        figE = go.Figure()
        figE.add_trace(go.Scatter(x=bt.index, y=bt["equity"], name="equity", mode="lines"))
        figE.update_layout(template="plotly_dark", title="Equity curve (net of costs)", xaxis_title="Date", yaxis_title="Equity")
        figE.show()
        figE.write_html(ASSETS_DIR / "equity.html")

        figD = px.area(bt, x=bt.index, y="drawdown", template="plotly_dark", title="Drawdown")
        figD.update_layout(xaxis_title="Date", yaxis_title="Drawdown")
        figD.show()
        figD.write_html(ASSETS_DIR / "drawdown.html")

        print("\nHow to interpret:")
        print("- If beta drifts, Kalman usually stabilizes the spread vs a fixed/rolling beta.")
        print("- Entry/exit thresholds control trade frequency: higher entry → fewer trades but stronger signals.")
        print("- Costs matter: mean-reversion edges can disappear with high turnover.")

btn.on_click(run)
run()

## Bonus ideas (optional)
- Add real-data CSV loader and a scanner for cointegrated pairs  
- Estimate spread half-life (OU fit) to set holding horizons  
- Walk-forward tuning for \(z_{entry}, z_{exit}\) to reduce overfitting
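
For the half-life idea, one common starting point is the discrete-time analogue of an OU fit: regress \(s_t\) on \(s_{t-1}\) to get the AR(1) coefficient \(\phi\), then take \(\ln(0.5)/\ln(\phi)\). The sketch below tries this on a simulated spread with \(\phi = 0.95\) (a true half-life of about 13.5 steps). The function name `estimate_half_life` and the toy series are illustrative assumptions, not part of the notebook.

```python
import numpy as np

def estimate_half_life(spread: np.ndarray) -> float:
    """Half-life from the AR(1) fit s_t = c + phi * s_{t-1} + e_t (needs 0 < phi < 1)."""
    lagged, current = spread[:-1], spread[1:]
    phi, _ = np.polyfit(lagged, current, deg=1)
    if not 0.0 < phi < 1.0:
        return float("nan")   # no mean reversion detected
    return float(np.log(0.5) / np.log(phi))

# Toy spread (illustrative only): AR(1) with phi = 0.95
rng_demo = np.random.default_rng(0)
s = np.zeros(5000)
for t in range(1, len(s)):
    s[t] = 0.95 * s[t - 1] + rng_demo.normal(scale=0.5)

print(f"estimated half-life: {estimate_half_life(s):.1f} steps")
```

On real spreads the estimate is noisy for short samples, so it is better treated as a rough guide for the holding horizon than as a precise parameter.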