## Notebook 04 – Walk Forward Validation

This notebook performs walk-forward validation of long-short momentum returns, conditioned on volatility and dispersion regimes, and then exports the results for downstream analysis.

### Step 0 - Import packages and functions

In [1]:
import sys, os
sys.path.append(os.path.abspath("../src"))
import pandas as pd
import numpy as np

# Helper functions to perform walk-forward testing
from backtest_engine import model_decile_long_short, walk_forward_model_apply



### Step 1 - Import log returns, momentum, volatility regime, and dispersion regime datasets

In [2]:
# Regime construction methods; each one has its own regime file and result set
methods = ["static", "rolling", "expanding"]

# Volatility: regime series keyed by method, walk-forward results keyed by "<label>_<method>"
vol_regimes, vol_wf_results = {}, {}

# Dispersion: same layout as the volatility containers
disp_regimes, disp_wf_results = {}, {}

In [3]:
# Load the return panel and the momentum signal
log_returns = pd.read_parquet("../data/processed/log_returns.parquet")
momentum = pd.read_parquet("../data/processed/momentum.parquet")

# Each entry: (target dict, file-name stem, column holding the regime label).
# Volatility files are read first for every method, then dispersion files.
regime_sources = [
    (vol_regimes, "regime_market_vol", "vol_regime"),
    (disp_regimes, "regime_cross_dispersion", "disp_regime"),
]

for regime_store, file_stem, regime_column in regime_sources:
    for method in methods:
        regime_frame = pd.read_parquet(f"../data/processed/{file_stem}_{method}.parquet")
        # Keep only the regime label column, keyed by construction method
        regime_store[method] = regime_frame[regime_column]

### Step 2 - Perform walk-forward validation by volatility and dispersion regime

In [4]:
# One entry per regime family: (regime series by method, result dict to fill, regime labels).
# Volatility is processed first, then dispersion.
regime_runs = [
    (vol_regimes, vol_wf_results, ("low_vol", "high_vol")),
    (disp_regimes, disp_wf_results, ("low_disp", "high_disp")),
]

# Run the walk-forward helper once per (method, regime label) pair and
# store each result under "<label>_<method>", e.g. "low_vol_static".
for regimes_by_method, wf_store, labels in regime_runs:
    for method in methods:
        regime_series = regimes_by_method[method]
        for regime_label in labels:
            wf_store[f"{regime_label}_{method}"] = walk_forward_model_apply(
                momentum,
                log_returns,
                regime_series,
                regime_label,
                model_fn=model_decile_long_short,
            )

### Step 3 - Print Results

In [5]:
# Result groups to summarise, in print order: volatility first, then dispersion
summary_groups = [
    (vol_wf_results, ("low_vol", "high_vol")),
    (disp_wf_results, ("low_disp", "high_disp")),
]

for group_idx, (wf_results, labels) in enumerate(summary_groups):
    # Blank lines separate the volatility summary from the dispersion summary
    if group_idx > 0:
        print("\n")

    for method in methods:
        for regime_label in labels:
            wf = wf_results[f"{regime_label}_{method}"]
            # Non-null entries in "returns" versus total rows in the result
            n_valid = wf["returns"].notna().sum()
            n_total = len(wf)
            print(f"{regime_label.capitalize()} regime ({method}): {n_valid}/{n_total} valid days")

Low_vol regime (static): 1131/3843 valid days
High_vol regime (static): 999/3843 valid days
Low_vol regime (rolling): 1280/3843 valid days
High_vol regime (rolling): 938/3843 valid days
Low_vol regime (expanding): 1269/3843 valid days
High_vol regime (expanding): 918/3843 valid days


Low_disp regime (static): 1129/3843 valid days
High_disp regime (static): 1077/3843 valid days
Low_disp regime (rolling): 1191/3843 valid days
High_disp regime (rolling): 1072/3843 valid days
Low_disp regime (expanding): 939/3843 valid days
High_disp regime (expanding): 1373/3843 valid days


### Step 4 - Export

In [6]:
# Walk-forward results grouped by regime family
wf_result_map = dict(vol=vol_wf_results, disp=disp_wf_results)

# Regime labels that belong to each regime family
regime_type_to_labels = dict(
    vol=["low_vol", "high_vol"],
    disp=["low_disp", "high_disp"],
)

# Write one parquet file per (regime family, method, regime label) combination
for regime_type in wf_result_map:
    wf_by_key = wf_result_map[regime_type]
    for method in methods:
        for regime_label in regime_type_to_labels[regime_type]:
            key = f"{regime_label}_{method}"  # e.g. "low_vol_static", "high_disp_expanding"

            # The column name carries the regime label only (e.g. "wf_mom_low_vol");
            # the method is encoded in the file name, not in the column.
            exported = wf_by_key[key].rename(columns={"returns": f"wf_mom_{regime_label}"})
            exported.to_parquet(f"../data/processed/wf_mom_{key}.parquet")