# Pair Trading Strategy Research

Armaan Gandhara | agandhara243@gmail.com | armaangandhara.me

09/2025

## Config & Utils

*Purpose* : Centralized parameters, imports, styles, and small helpers reused across the notebook. This keeps later sections focused on research and backtesting logic, not boilerplate.

What's inside:
- Project config (Config dataclass): dates, universe, paths, risk-free, frequency
- Reproducibility: seed setter
- Plot style: consistent figures 
- Helpers: annualization factor, returns, rolling z-score, drawdown and risk metrics, alignment utilities
- Lightweight disk cache utility for later data ingest

### Usage Example

In [3]:
# Build the run configuration; fields not passed here keep their dataclass defaults.
cfg = Config(
    start="2017-01-01",
    end="2025-09-01",
    tickers=["MSFT", "AAPL", "GOOGL", "AMZN", "META"],
    data_dir="data",
)

# Seed the RNGs, then apply the shared matplotlib defaults.
set_seed(42)
set_plot_style()

### Code

In [3]:
# =======================
# Config & Utils
# =======================


from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import os
import json
import hashlib
import warnings
from typing import Iterable, Tuple, Optional, Dict
import random

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Ignore every warning category for the rest of the session (presumably to keep
# notebook output uncluttered).
# NOTE(review): this also hides pandas/numpy deprecation and runtime warnings;
# narrow the filter when debugging numerical issues.
warnings.filterwarnings("ignore")

# ---------- Config ----------

@dataclass
class Config:
    """Run-wide settings shared by the notebook.

    Attributes:
        start: First date of the sample, as a string.
        end: Last date of the sample, as a string.
        tickers: Symbols that make up the universe.
        data_dir: Root directory for on-disk artifacts (see ``path``).
        freq: Frequency alias; defaults to ``"D"``.
        trading_days: Defaults to 252. Not read by the helpers in this cell;
            presumably consumed by later sections.
        rf_annual: Annual risk-free rate; defaults to 0.0. Not read by the
            helpers in this cell; presumably consumed by later sections.
    """

    start: str
    end: str
    tickers: Iterable[str]
    data_dir: str = "data"
    freq: str = "D"
    trading_days: int = 252
    rf_annual: float = 0.00

    def path(self):
        """Create ``data_dir`` and its ``cache`` subfolder if needed.

        Returns:
            ``Path(data_dir)`` (the root, not the cache folder).
        """
        root = Path(self.data_dir)
        for folder in (root, root / "cache"):
            folder.mkdir(parents=True, exist_ok=True)
        return root
    
# ---------- Repro/Style ----------

def set_seed(seed: int = 42):
    """Seed NumPy's global RNG and Python's ``random`` module with ``seed``."""
    for seeder in (np.random.seed, random.seed):
        seeder(seed)

def set_plot_style():
    """Apply the notebook's shared matplotlib defaults through ``plt.rcParams``."""
    overrides = {
        # Canvas and text
        "figure.figsize": (10, 5),
        "font.size": 11,
        # Light grid
        "axes.grid": True,
        "grid.alpha": 0.3,
        # Drop the top and right frame lines
        "axes.spines.top": False,
        "axes.spines.right": False,
    }
    plt.rcParams.update(overrides)

# ---------- Frequencies/Annualization ----------

# Periods per year, keyed by upper-cased frequency alias.
_ANNUALIZE = {
    "D": 252,
    "B": 252,
    "W": 52,
    "M": 12,
    "ME": 12,
    "Q": 4,
    "QE": 4,
    "A": 1,
    "Y": 1,
    "YE": 1,
}


def annualization_factor(freq: str):
    """Return the number of periods per year for a frequency alias.

    The alias is matched case-insensitively and any anchor suffix is ignored,
    so ``"W-FRI"`` resolves like ``"W"`` and ``"Q-DEC"`` like ``"Q"``.

    Args:
        freq: Frequency alias such as ``"D"``, ``"B"``, ``"W"``, ``"M"``,
            ``"Q"`` or ``"A"``.

    Returns:
        Periods per year as an int. Unrecognised aliases fall back to 252.
    """
    alias = freq.strip().upper()
    # Keep only the base alias: "W-FRI" -> "W".
    base = alias.split("-", 1)[0]
    return _ANNUALIZE.get(base, 252)

# ---------- Returns & Z-Score ----------

def compute_returns(prices: pd.DataFrame, method: str = "log"):
    """Compute log or simple returns from price levels.

    Prices are sorted by index first. Missing prices are never forward-filled:
    a gap yields NaN returns for the gap and for the first observation after
    it, under both methods.

    Args:
        prices: Price levels, one column per instrument.
        method: ``"log"`` for log-price differences, ``"simple"`` for
            percentage changes.

    Returns:
        Returns with the same shape as ``prices``; infinite values are
        replaced by NaN.

    Raises:
        ValueError: If ``method`` is neither ``"log"`` nor ``"simple"``.
    """
    if method not in {"log", "simple"}:
        raise ValueError("method must be 'log' or 'simple'")
    px = prices.sort_index()
    if method == "log":
        rets = np.log(px).diff()
    else:
        # fill_method=None disables pct_change's implicit forward-fill, which
        # is deprecated and would report a 0% return across a missing price.
        rets = px.pct_change(fill_method=None)
    return rets.replace([np.inf, -np.inf], np.nan)

def zscore_rolling(x: pd.Series, window: int):
    """Rolling z-score of ``x`` over a trailing window.

    Each point is standardised with the mean and population standard deviation
    (``ddof=0``) of the trailing ``window`` observations, the current one
    included.

    Args:
        x: Input series.
        window: Number of observations per window.

    Returns:
        Series aligned with ``x``. Values are NaN until a full window is
        available and wherever the window's standard deviation is zero.
    """
    roll = x.rolling(window)
    mu = roll.mean()
    sigma = roll.std(ddof=0)
    # A flat window has zero dispersion; mask it so the division yields NaN
    # rather than +/-inf when rounding leaves a tiny non-zero numerator.
    sigma = sigma.where(sigma > 0)
    return (x - mu) / sigma
    

# ---------- Drawdowns & Risk Metrics ----------

def equity_to_drawdown(equity: pd.Series):
    """Return the drawdown path: equity relative to its running peak, minus one.

    Values are 0 at a new high and negative below it.
    """
    running_peak = equity.cummax()
    return equity.div(running_peak).sub(1.0)

def sharpe_ratio(returns: pd.Series, freq: str = "D", rf_annual: float = 0.0):
    """Annualised Sharpe ratio of per-period returns.

    The annual risk-free rate is converted to a per-period rate by compounding,
    subtracted from ``returns``, and the mean and population standard deviation
    (``ddof=0``) of the excess returns are annualised with the factor for
    ``freq``.

    Returns:
        The ratio, or NaN when the annualised volatility is zero or NaN.
    """
    periods = annualization_factor(freq)
    excess = returns - ((1 + rf_annual) ** (1 / periods) - 1)
    ann_vol = excess.std(ddof=0) * np.sqrt(periods)
    if ann_vol == 0 or np.isnan(ann_vol):
        return np.nan
    return excess.mean() * periods / ann_vol

def sortino_ratio(returns: pd.Series, freq: str = "D", rf_annual: float = 0.0):
    """Annualised Sortino ratio of per-period returns.

    Excess returns are measured against the per-period compounded risk-free
    rate. Downside deviation is the root-mean-square of the negative excess
    returns, ``sqrt(mean(min(ex, 0) ** 2))``, taken over all observations and
    measured from the zero-excess target rather than from the mean of the
    losses.

    Args:
        returns: Per-period returns.
        freq: Frequency alias used to look up the annualisation factor.
        rf_annual: Annual risk-free rate.

    Returns:
        Annualised mean excess return divided by annualised downside
        deviation, or NaN when the downside deviation is zero or NaN.
    """
    af = annualization_factor(freq)
    rf_per_step = (1 + rf_annual) ** (1 / af) - 1
    ex = returns - rf_per_step
    # Gains count as zero shortfall, so they stay in the denominator's sample.
    downside = ex.clip(upper=0)
    # RMS about the target; std() would re-centre on the mean shortfall and
    # understate downside risk.
    dd_sigma = np.sqrt((downside ** 2).mean()) * np.sqrt(af)
    if dd_sigma == 0 or np.isnan(dd_sigma):
        return np.nan
    return ex.mean() * af / dd_sigma
    
def calmar_ratio(equity: pd.Series, freq: str = "D"):
    """Calmar ratio: annualised growth rate divided by maximum drawdown.

    CAGR is computed from the first and last non-missing equity values, with
    the number of periods taken as the count of non-missing observations
    minus one.

    Args:
        equity: Equity curve (levels, not returns).
        freq: Frequency alias used to look up the annualisation factor.

    Returns:
        ``cagr / max_drawdown``, or NaN when fewer than two observations are
        available, the starting equity is not positive, or the maximum
        drawdown is zero or NaN.
    """
    af = annualization_factor(freq)
    clean = equity.dropna()
    n_periods = len(clean) - 1
    # Fewer than two points gives no period to compound over.
    if n_periods < 1:
        return np.nan
    start, end = clean.iloc[0], clean.iloc[-1]
    # A growth rate from a non-positive base is undefined.
    if start <= 0:
        return np.nan
    cagr = (end / start) ** (af / n_periods) - 1
    dd = equity_to_drawdown(equity).min()
    max_dd = abs(dd) if pd.notna(dd) else np.nan
    if pd.isna(max_dd) or max_dd == 0:
        return np.nan
    return cagr / max_dd

def max_drawdown(equity: pd.Series):
    """Return the deepest drawdown of ``equity`` as a non-negative magnitude."""
    trough = equity_to_drawdown(equity).min()
    return abs(trough)

# ---------- Alignment/Cleaning ----------

def ensure_datetime_index(df: pd.DataFrame):
    """Return ``df`` sorted by a ``DatetimeIndex``.

    A non-datetime index is converted with ``pd.to_datetime`` on a copy, so
    the caller's frame is left untouched.
    """
    if isinstance(df.index, pd.DatetimeIndex):
        return df.sort_index()
    converted = df.copy()
    converted.index = pd.to_datetime(converted.index)
    return converted.sort_index()

def align_panels(*dfs: pd.DataFrame, dropna: bool = True):
    """Align several DataFrames on their common dates and common columns.

    Each frame is restricted to the columns shared by all inputs (in sorted
    order) and to the intersection of their datetime indexes. With
    ``dropna=True``, any date on which any frame has a missing value is
    removed from every frame.

    Returns:
        Tuple of aligned frames, in the same order as the inputs.
    """
    # Columns present in every input.
    shared = set(dfs[0].columns)
    for other in dfs[1:]:
        shared = shared.intersection(other.columns)
    ordered_cols = sorted(shared)
    panels = [ensure_datetime_index(frame)[ordered_cols] for frame in dfs]

    # Dates present in every input.
    common_idx = panels[0].index
    for panel in panels[1:]:
        common_idx = common_idx.intersection(panel.index)
    panels = [panel.loc[common_idx] for panel in panels]

    if dropna:
        # One boolean column per panel marking rows that contain a NaN.
        row_flags = pd.concat([panel.isna().any(axis=1) for panel in panels], axis=1)
        complete = ~row_flags.any(axis=1)
        panels = [panel.loc[complete] for panel in panels]
    return tuple(panels)

# ---------- Lightweight Disk Cache (CSV version) ----------

def _hash_key(key: Dict):
    """Return the MD5 hex digest of ``key`` serialised as key-sorted JSON."""
    canonical = json.dumps(key, sort_keys=True)
    digest = hashlib.md5()
    digest.update(canonical.encode())
    return digest.hexdigest()

def cache_path(base_dir: Path, namespace: str, key: Dict):
    """Return the CSV cache file path for ``key`` inside ``namespace``.

    The folder ``base_dir/cache/namespace`` is created if missing; the file
    name is the hash of ``key`` with a ``.csv`` suffix.
    """
    digest = _hash_key(key)
    folder = base_dir / "cache" / namespace
    folder.mkdir(parents=True, exist_ok=True)
    filename = f"{digest}.csv"
    return folder / filename

def cache_save(df: pd.DataFrame, path: Path):
    """Write ``df`` to ``path`` as CSV, index included."""
    df.to_csv(path_or_buf=path, index=True)

def cache_load(path: Path):
    """Load a cached CSV, or return ``None`` on a miss.

    The first column becomes the index and is parsed as dates. A missing or
    unreadable file both count as a miss.
    """
    if not path.exists():
        return None
    try:
        frame = pd.read_csv(path, index_col=0, parse_dates=True)
    except Exception:
        # Best-effort cache: any read failure counts as a miss.
        return None
    return frame

# --- IBFetcher integration (file-based import) ---

import importlib.util, sys

def _load_ibfetcher_module(root: str = "external/IBFetcher", filename: str = "ibfetcher.py"):
    """Import the IBFetcher source file by path and register it as ``ibfetcher``.

    Args:
        root: Directory containing the file.
        filename: File name of the module inside ``root``.

    Returns:
        The executed module object, also stored in ``sys.modules["ibfetcher"]``.

    Raises:
        FileNotFoundError: If ``root/filename`` does not exist.
        ImportError: If no import spec or loader can be built for the file.
    """
    path = Path(root) / filename
    if not path.exists():
        raise FileNotFoundError(f"IBFetcher not found at {path.resolve()}")
    spec = importlib.util.spec_from_file_location("ibfetcher", path)
    # spec_from_file_location returns None for unsupported file types.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot build an import spec for {path.resolve()}")
    mod = importlib.util.module_from_spec(spec)
    previous = sys.modules.get("ibfetcher")
    # Register before executing so the module can resolve itself during import.
    sys.modules["ibfetcher"] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module behind; restore the old entry.
        if previous is None:
            sys.modules.pop("ibfetcher", None)
        else:
            sys.modules["ibfetcher"] = previous
        raise
    return mod

def _ib_duration_from_dates(start: str, end: str) -> str:
    """Build an IB duration string covering the span from ``start`` to ``end``.

    Spans of up to 365 days are expressed in days (``"300 D"``). Longer spans
    are rounded up to whole years (``"7 Y"``), because IB rejects
    day-denominated historical requests longer than 365 days.

    Args:
        start: Start date, parseable by ``pd.to_datetime``.
        end: End date, parseable by ``pd.to_datetime``.

    Returns:
        Duration string ``"<n> D"`` or ``"<n> Y"``; never shorter than one day.
    """
    days = (pd.to_datetime(end) - pd.to_datetime(start)).days
    days = max(1, int(days))
    if days > 365:
        # Ceiling division: n calendar years always cover n * 365 days.
        years = -(-days // 365)
        return f"{years} Y"
    return f"{days} D"

def fetch_prices_ib(tickers: Iterable[str],
                    start: str,
                    end: str,
                    ib_root: str = "external/IBFetcher",
                    bar_size: str = "1 day",
                    host: str = "127.0.0.1",
                    port: int = 7496,
                    client_id: int = 1) -> pd.DataFrame:
    """Pull close and volume bars for each ticker through the IBFetcher class.

    Requires a running TWS or IB Gateway session reachable at ``host:port``.

    Args:
        tickers: Symbols to fetch; upper-cased, stripped, ``.`` replaced by
            ``-``, de-duplicated and sorted before use.
        start: First date to keep (inclusive).
        end: Last date to keep (inclusive).
        ib_root: Directory containing ``ibfetcher.py``.
        bar_size: Bar size string forwarded to ``fetch_stock_data``.
        host: Host of the TWS / IB Gateway API socket.
        port: API socket port.
        client_id: API client id for this connection.

    Returns:
        DataFrame with MultiIndex columns ``{'close', 'volume'} x tickers`` and
        a sorted ``DatetimeIndex`` restricted to ``[start, end]``.
        NOTE(review): timestamps are used as returned by IBFetcher and no
        timezone conversion is applied here; confirm they are tz-naive.

    Raises:
        ValueError: If ``tickers`` is empty, a ticker returns no data, or a
            ticker's frame lacks ``close`` or ``volume``.
    """
    tks = sorted({t.upper().replace(".", "-").strip() for t in tickers})
    if not tks:
        raise ValueError("tickers must contain at least one symbol")

    start_ts = pd.to_datetime(start)
    end_ts = pd.to_datetime(end)

    # The request below passes endDateTime="", which presumably anchors the
    # window at "now" (verify against IBFetcher). The duration must therefore
    # reach from `start` to the later of `end` and today, or a historical
    # `end` would produce a window that never covers `start`.
    today = pd.Timestamp.now(tz=end_ts.tz).normalize()
    duration = _ib_duration_from_dates(start_ts, max(end_ts, today))

    ibm = _load_ibfetcher_module(ib_root, "ibfetcher.py")
    ib = ibm.IBFetcher()

    # Connect to TWS or IB Gateway first.
    ib.connect_app(host=host, port=port, client_id=client_id)

    fields = ["close", "volume"]
    frames = []
    try:
        for t in tks:
            df = ib.fetch_stock_data(
                symbol=t, duration=duration, bar_size=bar_size, endDateTime=""
            )
            if df is None or df.empty:
                raise ValueError(f"No data returned for {t}")
            # Expected columns: ['datetime','open','high','low','close','volume'].
            df = df.copy()
            # Normalise to a sorted datetime index.
            if "datetime" in df.columns:
                df["datetime"] = pd.to_datetime(df["datetime"])
                df = df.set_index("datetime")
            else:
                df.index = pd.to_datetime(df.index)
            df = df.sort_index()
            # Keep close and volume only.
            if any(col not in df.columns for col in fields):
                raise ValueError(f"{t}: expected close and volume. Got {list(df.columns)}")
            per_ticker = df[fields].copy()
            per_ticker.columns = pd.MultiIndex.from_product([fields, [t]])
            frames.append(per_ticker)
    finally:
        # Release the API connection even when a ticker fails.
        ib.disconnect_app()

    out = pd.concat(frames, axis=1).sort_index()
    # Enforce the full (field, ticker) column grid, then clip to the requested dates.
    full_cols = pd.MultiIndex.from_product([fields, tks])
    out = out.reindex(columns=full_cols)
    out = out.loc[(out.index >= start_ts) & (out.index <= end_ts)]
    return ensure_datetime_index(out)

# --- Cache-first loader using IB, with CSV cache in data/cache/prices ---

def _norm_tickers(tickers: Iterable[str]):
    """Return the sorted, de-duplicated tickers after normalisation.

    Each symbol is upper-cased, has ``.`` replaced by ``-``, and is stripped
    of surrounding whitespace.
    """
    unique = set()
    for raw in tickers:
        unique.add(raw.upper().replace(".", "-").strip())
    return sorted(unique)

def _prices_cache_key(tickers: Iterable[str], start: str, end: str) -> Dict:
    """Build the dict that identifies a price request for caching.

    It holds the normalised tickers plus the start and end bounds as strings.
    """
    key = dict(
        tickers=_norm_tickers(tickers),
        start=str(start),
        end=str(end),
    )
    return key

def load_prices(cfg: Config,
                ib_root: str = "external/IBFetcher",
                bar_size: str = "1 day") -> pd.DataFrame:
    """Load prices for ``cfg`` from the CSV cache, falling back to IBFetcher.

    1) Try the project CSV cache (``<data_dir>/cache/prices/<hash>.csv``).
    2) On a miss, call IBFetcher and save the result to the cache.

    The cache key covers the normalised tickers, the start and end dates, and
    ``bar_size``, so requests at different bar sizes do not share a file.

    Args:
        cfg: Run configuration supplying tickers, dates and the data directory.
        ib_root: Directory containing ``ibfetcher.py``.
        bar_size: Bar size string forwarded to the fetcher.

    Returns:
        DataFrame with two-level ``(field, ticker)`` columns and a sorted
        ``DatetimeIndex``.
    """
    key = {**_prices_cache_key(cfg.tickers, cfg.start, cfg.end), "bar_size": bar_size}
    fpath = cache_path(cfg.path(), "prices", key)

    cached = None
    if fpath.exists():
        try:
            # The frame is saved with two column levels, so both header rows
            # must be read back. A single-header read never yields a
            # MultiIndex, which would make every lookup a cache miss.
            cached = pd.read_csv(fpath, index_col=0, header=[0, 1], parse_dates=True)
        except (OSError, ValueError):
            # Best-effort cache: an unreadable or malformed file is a miss.
            # pandas parser errors are ValueError subclasses.
            cached = None
    if (
        cached is not None
        and isinstance(cached.columns, pd.MultiIndex)
        and not cached.empty
    ):
        return ensure_datetime_index(cached)

    df = fetch_prices_ib(cfg.tickers, cfg.start, cfg.end, ib_root=ib_root, bar_size=bar_size)
    cache_save(df, fpath)
    return ensure_datetime_index(df)



In [None]:
# Three-name universe for a first end-to-end load.
cfg = Config(
    start="2019-01-01",
    end="2025-09-01",
    tickers=["MSFT", "AAPL", "GOOGL"],
    data_dir="data",
)
set_seed(42)
set_plot_style()

# Cache-first load; on a miss this needs a reachable TWS / IB Gateway session.
prices = load_prices(cfg, ib_root="external/IBFetcher", bar_size="1 day")
prices.head()


Connecting to IB...
Error 502: Couldn't connect to TWS. Confirm that "Enable ActiveX and Socket EClients" 
is enabled and connection port is the same as "Socket Port" on the 
TWS "Edit->Global Configuration...->API->Settings" menu. Live Trading ports: 
TWS: 7496; IB Gateway: 4001. Simulated Trading ports for new installations 
of version 954.1 or newer:  TWS: 7497; IB Gateway: 4002
Connected.
Error 504: Not connected


## Data Ingest

## Pair Selection

## Hedge Ratio & Spread

## OU Check

## Signals & Sizing

## Cost and Execution

## Walk-Forward Backtest

## Results

## Factor Neutrality

## Sensitivity Sweeps

## Regime Splits

## OOS Holdout

## Beta Stability

## Structural Breaks

## Cost Stress Test

## ML Ranker for Pairs

## Intraday Extensions