Rolling-window Walk-Forward Backtest

In [5]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.vector_ar.vecm import coint_johansen
import statsmodels.api as sm
import os

In [6]:
os.makedirs("../results", exist_ok=True)

prices = pd.read_csv("../data/basket_prices.csv", index_col=0, parse_dates=True).ffill()
tickers = list(prices.columns)

Helper Functions 

In [7]:
def johansen_weights(price_df, det_order=0, k_ar_diff=1):
    """
    Compute first Johansen cointegrating vector and normalize it so sum(abs)=1.
    Returns numpy array of weights (length = number of series).
    """
    arr = price_df.values
    jres = coint_johansen(arr, det_order=det_order, k_ar_diff=k_ar_diff)
    vec = jres.evec[:, 0]
    w = vec / np.sum(np.abs(vec))
    return w

def construct_spread(price_df, weights):
    """Linear spread series = prices dot weights (returns pd.Series aligned with price_df.index)."""
    s = price_df.dot(weights)
    return pd.Series(s, index=price_df.index)

def compute_zscore(series, lookback=20):
    """Rolling z-score (series - rolling_mean)/rolling_std."""
    mu = series.rolling(window=lookback, min_periods=5).mean()
    sigma = series.rolling(window=lookback, min_periods=5).std()
    return (series - mu) / sigma

def half_life(series):
    """Estimate half-life of mean reversion via AR(1) on series."""
    s = series.dropna()
    if len(s) < 10:
        return np.nan
    s_lag = s.shift(1).dropna()
    delta = s.diff().dropna()
    s_lag = s_lag.loc[delta.index]
    X = sm.add_constant(s_lag.values)
    model = sm.OLS(delta.values, X).fit()
    b = model.params[1]
    return -np.log(2) / b if b < 0 else np.nan

def simple_strategy_pnl(spread, entry_z=2.0, exit_z=0.5, zlook=20, notional=1_000_000, tc=0.0):
    """
    Simple zscore mean-reversion strategy on a spread series.
    positions: 1 long spread, -1 short spread, 0 flat.
    Returns DataFrame with position, daily_pnl, cum_pnl, daily_returns (pnl/notional).
    tc = transaction cost per trade as fraction of notional (applied on entry & exit).
    """
    z = compute_zscore(spread, lookback=zlook).dropna()
    positions = pd.Series(0.0, index=z.index)
    pos = 0.0
    last_pos = 0.0
    trades = 0

    for t in z.index:
        if pos == 0:
            if z.loc[t] > entry_z:
                pos = -1.0
            elif z.loc[t] < -entry_z:
                pos = 1.0
        elif pos == 1.0:
            if z.loc[t] >= -exit_z:
                pos = 0.0
        elif pos == -1.0:
            if z.loc[t] <= exit_z:
                pos = 0.0
        positions.loc[t] = pos
        if pos != last_pos:
            trades += 1
            last_pos = pos

    positions = positions.ffill().fillna(0.0)
    # Spread diff as absolute price change (P/L per unit of spread)
    spread_diff = spread.reindex(positions.index).diff().fillna(0.0)
    # pnl per day = position(t-1) * spread_diff(t)
    pnl = positions.shift(1).fillna(0.0) * spread_diff

    # Apply transaction cost: approximate cost = tc * notional on each trade (divided across day of trade)
    # Here we subtract tc on days where position changes (entry or exit)
    pos_changes = (positions != positions.shift(1)).astype(int)
    pnl = pnl - pos_changes * (tc * notional)

    cum_pnl = pnl.cumsum()
    daily_returns = pnl / notional
    return pd.DataFrame({
        "spread": spread.reindex(positions.index),
        "zscore": compute_zscore(spread, lookback=zlook).reindex(positions.index),
        "position": positions,
        "pnl": pnl,
        "cum_pnl": cum_pnl,
        "daily_return": daily_returns
    })

def performance_metrics(daily_returns, pnl_series=None):
    """Return dict of key metrics: annualized return, annualized volatility, Sharpe, max drawdown, total pnl."""
    if daily_returns.dropna().empty:
        return {"annual_return": np.nan, "annual_vol": np.nan, "sharpe": np.nan, "max_dd": np.nan, "total_pnl": np.nan}
    avg = daily_returns.mean()
    std = daily_returns.std()
    ann_ret = (1 + avg) ** 252 - 1 if avg > -1 else avg * 252  # approx if very small
    ann_vol = std * np.sqrt(252)
    sharpe = (avg / std) * np.sqrt(252) if std != 0 else np.nan
    if pnl_series is not None:
        running_max = pnl_series.cummax()
        drawdown = (pnl_series - running_max)
        max_dd = drawdown.min()
        total_pnl = pnl_series.iloc[-1]
    else:
        max_dd = np.nan
        total_pnl = np.nan
    return {"annual_return": ann_ret, "annual_vol": ann_vol, "sharpe": sharpe, "max_dd": max_dd, "total_pnl": total_pnl}


In [8]:
# rolling / walk-forward parameters

train_window_days = 252 * 3    # 3 years training
test_window_days = 252         # 1 year test
step_days = 63                 # roll forward every 3 months
zlook = 20
entry_z = 2.0
exit_z = 0.5
transaction_cost = 0.0005      # 5 bps per trade on notional
notional = 1_000_000           # scaling for pnl -> later use realistic sizing
det_order = 0
k_ar_diff = 1

# build date index list for windows
dates = prices.index
start_idx = 0
results_list = []
window_idx = 0

In [9]:
# walk-forward loop

while True:
    train_start = start_idx
    train_end = train_start + train_window_days
    test_end = train_end + test_window_days
    # stop if test_end beyond data
    if test_end >= len(dates):
        break

    train_slice = prices.iloc[train_start:train_end]
    test_slice = prices.iloc[train_end:test_end]

    # compute johansen weights on train slice
    try:
        w = johansen_weights(train_slice, det_order=det_order, k_ar_diff=k_ar_diff)
    except Exception as e:
        # fallback: equal weights if Johansen fails
        print(f"Johansen failed at window {window_idx}: {e}. Using equal weights.")
        w = np.ones(len(tickers)) / len(tickers)

    # construct spread on test slice using train weights
    spread_test = construct_spread(test_slice, w)

    # run simple strategy on spread_test
    strat_df = simple_strategy_pnl(spread_test, entry_z=entry_z, exit_z=exit_z,
                                  zlook=zlook, notional=notional, tc=transaction_cost)

    # metrics
    metrics = performance_metrics(strat_df["daily_return"], pnl_series=strat_df["cum_pnl"])
    metrics.update({
        "window_idx": window_idx,
        "train_start": dates[train_start].strftime("%Y-%m-%d"),
        "train_end": dates[train_end-1].strftime("%Y-%m-%d"),
        "test_start": dates[train_end].strftime("%Y-%m-%d"),
        "test_end": dates[test_end-1].strftime("%Y-%m-%d"),
        "weights": w
    })

    results_list.append(metrics)

    # optional: save test period pnl to file for later aggregation/plotting
    out_file = f"../results/test_pnl_window_{window_idx}.csv"
    strat_df.to_csv(out_file)

    # advance window
    start_idx += step_days
    window_idx += 1

In [10]:
# aggregate results
results_df = pd.DataFrame(results_list)
results_df.to_csv("../results/walkforward_summary.csv", index=False)
print("Walk-forward summary saved to ../results/walkforward_summary.csv")

Walk-forward summary saved to ../results/walkforward_summary.csv


In [None]:
# inspect aggregated results & plots
results_df = pd.read_csv("../results/walkforward_summary.csv")
display(results_df[["window_idx","train_start","train_end","test_start","test_end","sharpe","annual_return","annual_vol","max_dd","total_pnl"]])

# Quick summary stats across windows
print("Average Sharpe across test windows:", results_df["sharpe"].mean())
print("Median Sharpe across test windows:", results_df["sharpe"].median())

# Plot total_pnl across windows
plt.figure(figsize=(10,4))
plt.bar(results_df["window_idx"], results_df["total_pnl"])
plt.xlabel("Window #")
plt.ylabel("Total PnL (test period)")
plt.title("Total PnL per Test Window")
plt.grid(True)
plt.show()
