
# LeanTrader — Walk-Forward Validation

This notebook runs **rolling train/test splits** (walk-forward) to evaluate robustness.
- Symbols: XAUUSD (default). You can add EURUSD, BTCUSDT.
- For each window: tune a small parameter grid on the **train** slice, then evaluate on the **test** slice.
- Metrics: Sharpe, Max Drawdown, and a stitched equity curve across test windows.
- Saves a YAML summary to `data/tuned/walkforward_summary.yaml`.

> Tip: Run the Sweep notebook first if you want a larger grid; this uses a tiny grid for speed.


In [None]:

import os, yaml
import pandas as pd
from pathlib import Path
from datetime import timedelta

from leantrader.data.feeds import get_ohlc_csv, resample_frames
from leantrader.features.ta import rsi, adx, fvg_score
from leantrader.dsl.compiler import compile_strategy, load_strategy
from leantrader.backtest.engine import backtest
from leantrader.backtest.metrics import sharpe, max_drawdown

REPO_ROOT = Path.cwd().resolve()
DATA_DIR = REPO_ROOT / "data" / "ohlc"
TUNE_DIR = REPO_ROOT / "data" / "tuned"
TUNE_DIR.mkdir(parents=True, exist_ok=True)

PAIR = "XAUUSD"
M15_PATH = DATA_DIR / f"{PAIR}_M15.csv"
assert M15_PATH.exists(), f"Missing {M15_PATH}"
df_m15 = get_ohlc_csv(str(M15_PATH))

# Helper: engineer features with paramizable FVG lookback
def engineer_param(df: pd.DataFrame, fvg_lb: int = 3) -> pd.DataFrame:
    out = df.copy()
    from leantrader.features.ta import adx as _adx, rsi as _rsi, fvg_score as _fvg
    out["rsi_14"] = _rsi(out["close"], 14)
    out["adx_14"] = _adx(out, 14)
    out["fvg_score"] = _fvg(out, fvg_lb)
    out["ms_state"] = "flat"
    swing_up = (out["high"] > out["high"].shift(1)) & (out["low"] > out["low"].shift(1))
    swing_dn = (out["high"] < out["high"].shift(1)) & (out["low"] < out["low"].shift(1))
    out.loc[swing_up, "ms_state"] = "bull"
    out.loc[swing_dn, "ms_state"] = "bear"
    out["rsi_div"] = (out["close"].diff() < 0).astype(int) * ((out["rsi_14"].diff() > 0).astype(int)) -                      (out["close"].diff() > 0).astype(int) * ((out["rsi_14"].diff() < 0).astype(int))
    return out

# Load base strategy
spec_path = REPO_ROOT / "src" / "leantrader" / "dsl" / "examples" / "xauusd_master.yaml"
assert spec_path.exists(), f"Missing {spec_path}"
base_spec = load_strategy(str(spec_path))

def spec_with_params(spec, adx_thr: int, atr_stop: float, r_mult: float):
    import copy
    s = copy.deepcopy(spec)
    for sig in s.signals:
        for cond in sig.entry:
            if cond.feature.startswith("adx_14"):
                cond.feature = f"adx_14 > {adx_thr}"
        sig.stop = f"atr_14 * {atr_stop}"
        sig.take = f"R:{r_mult}"
    return s

# Small grid (expand if needed)
GRID = {
    "adx_thr": [18, 20, 22],
    "atr_stop": [1.4, 1.6],
    "r_mult": [2.0, 2.2],
    "fvg_lb": [2, 3]
}


In [None]:

from itertools import product
from leantrader.backtest.metrics import sharpe, max_drawdown

def score_equity(eq: pd.Series) -> float:
    return float(sharpe(eq)) - abs(float(max_drawdown(eq)))

def fit_on_train(frames: dict) -> dict:
    # Try all param combos on the train slice and pick the best by score_equity
    best = None
    for adx_thr, atr_stop, r_mult, fvg_lb in product(GRID["adx_thr"], GRID["atr_stop"], GRID["r_mult"], GRID["fvg_lb"]):
        eng = {k: engineer_param(v, fvg_lb=fvg_lb) for k,v in frames.items()}
        spec = spec_with_params(base_spec, adx_thr=adx_thr, atr_stop=atr_stop, r_mult=r_mult)
        strat = compile_strategy(spec)
        sigs = strat(eng)
        m15 = eng["M15"]
        sigs["price"] = m15["close"].reindex(sigs.index, method="ffill")
        eq = backtest(m15, sigs, risk_cfg=None)
        if len(eq)==0: 
            continue
        sc = score_equity(eq)
        row = {"adx_thr": adx_thr, "atr_stop": atr_stop, "r_mult": r_mult, "fvg_lb": fvg_lb, "score": sc}
        if best is None or sc > best["score"]:
            best = row
    return best


In [None]:

# Build walk-forward windows (dates based on M15 index)
idx = df_m15.index.sort_values()
if len(idx) < 500:
    print("Note: The sample CSV is tiny; for real WFV use larger histories.")
start = idx[0]
end = idx[-1]

# Define a few windows: e.g., 60 days train, 15 days test (adjust as needed)
def make_windows(index, train_days=60, test_days=15, step_days=15):
    windows = []
    i0 = index[0]
    while True:
        train_start = i0
        train_end = train_start + pd.Timedelta(days=train_days)
        test_end = train_end + pd.Timedelta(days=test_days)
        if test_end >= index[-1]:
            break
        windows.append((train_start, train_end, train_end, test_end))
        i0 = i0 + pd.Timedelta(days=step_days)
    return windows

windows = make_windows(idx, train_days=60, test_days=15, step_days=15)
len(windows), windows[:2]


In [None]:

from leantrader.data.feeds import resample_frames

summary = []
stitched_eq = []

for (tr_s, tr_e, te_s, te_e) in windows:
    # Slice M15 dataframe
    train_df = df_m15.loc[(df_m15.index>=tr_s) & (df_m15.index<tr_e)]
    test_df  = df_m15.loc[(df_m15.index>=te_s) & (df_m15.index<te_e)]
    if len(train_df)<50 or len(test_df)<10:
        continue

    # Build frames
    tr_frames = resample_frames(train_df)
    te_frames = resample_frames(test_df)

    # Fit best params on train
    best = fit_on_train(tr_frames)
    if not best:
        continue

    # Evaluate on test with best params
    eng = {k: engineer_param(v, fvg_lb=best["fvg_lb"]) for k,v in te_frames.items()}
    spec = spec_with_params(base_spec, adx_thr=best["adx_thr"], atr_stop=best["atr_stop"], r_mult=best["r_mult"])
    strat = compile_strategy(spec)
    sigs = strat(eng)
    m15 = eng["M15"]
    sigs["price"] = m15["close"].reindex(sigs.index, method="ffill")
    eq = backtest(m15, sigs, risk_cfg=None)
    if len(eq)==0:
        continue

    row = {
        "train_start": str(tr_s), "train_end": str(tr_e),
        "test_start": str(te_s),  "test_end": str(te_e),
        "score": float(sharpe(eq)) - abs(float(max_drawdown(eq))),
        "sharpe": float(sharpe(eq)),
        "maxdd": float(max_drawdown(eq)),
        "best_params": best
    }
    summary.append(row)
    stitched_eq.append(eq)

len(summary)


In [None]:

# Combine equity curves across windows (simple concat; in practice, scale appropriately)
import pandas as pd
stitched = None
for eq in stitched_eq:
    stitched = eq if stitched is None else pd.concat([stitched, eq]).sort_index()
stitched = stitched if stitched is not None else pd.Series(dtype=float)
stitched.tail()


In [None]:

# Report table & plot stitched equity
import pandas as pd
import matplotlib.pyplot as plt

df_sum = pd.DataFrame(summary)
df_sum.sort_values("test_start", inplace=True)
df_sum


In [None]:

import matplotlib.pyplot as plt

if len(stitched) > 0:
    plt.figure()
    stitched.plot(title="Walk-Forward Stitched Equity — XAUUSD")
    plt.xlabel("Time")
    plt.ylabel("Equity (relative)")
    plt.show()
else:
    print("No stitched equity to plot (insufficient data/windows).")


In [None]:

# Save YAML summary
out_path = TUNE_DIR / "walkforward_summary.yaml"
with open(out_path, "w", encoding="utf-8") as f:
    yaml.safe_dump({"pair": PAIR, "windows": summary}, f, sort_keys=False)
out_path
