# forex_signal_v28 — PF-first Top-down Multi‑TF Ensemble (4H/1H → 15m → 1m)
Энэ notebook нь Profit Factor (PF)-ийг өсгөхөд чиглэсэн **шаталсан (hierarchical) ensemble** архитектуртай.

**Гол зарчим:**
- **Decision:** 15m candle close(t) дээр шийдвэр гаргана
- **Execution:** 15m дараагийн candle open(t+1) дээр орно (lookahead хамгаалалт)
- **Macro gate:** 4H/1H дээр зөвхөн *зөв орчинд* trade зөвшөөрнө
- **Edge model:** 15m дээр XGB (эсвэл fallback)
- **Entry filter:** 1m aggregated дээр Logistic Regression
- **Backtest:** spread/slippage/commission + 1% risk sizing + TP/SL intrabar worst-case


In [3]:

# ===== 0) Imports & Config =====
import os, glob, math, warnings
import numpy as np
import pandas as pd
from dataclasses import dataclass

from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import roc_auc_score

warnings.filterwarnings("ignore")

# XGBoost (requested). If not installed, we'll fallback to sklearn HistGradientBoosting.
try:
    from xgboost import XGBClassifier
    HAS_XGB = True
except Exception:
    HAS_XGB = False
    from sklearn.ensemble import HistGradientBoostingClassifier

@dataclass
class CFG:
    # directories (stepped up from ml_models_train/)
    train_dir: str = "../data/train"
    test_dir: str  = "../data/test"

    # timeframes (PF-first selection)
    master_tf: str = "15m"      # trade decision timeframe
    entry_tf: str  = "1m"       # entry quality timeframe
    macro_tfs: tuple = ("1h","4h")

    # costs / execution
    pip_size: float = 0.0001            # EURUSD
    usd_per_pip_per_lot: float = 10.0   # 1.0 lot ~ $10/pip for EURUSD
    starting_equity: float = 10_000.0

    # realistic costs (tune to your broker)
    spread_pips: float = 1.0
    slippage_pips: float = 0.2
    commission_per_lot_usd: float = 0.0 # set if your broker charges commission (round-trip below)

    # risk & exits (we will tune TP/SL multipliers on validation)
    risk_per_trade: float = 0.01
    atr_period: int = 14
    tp_atr: float = 2.0
    sl_atr: float = 1.5
    max_hold_bars: int = 16   # 16*15m = 4h

    # thresholds (to tune)
    edge_th: float = 0.62
    entry_th: float = 0.55

cfg = CFG()
print("HAS_XGB:", HAS_XGB)
print("Config loaded.", cfg)


HAS_XGB: True
Config loaded. CFG(train_dir='../data/train', test_dir='../data/test', master_tf='15m', entry_tf='1m', macro_tfs=('1h', '4h'), pip_size=0.0001, usd_per_pip_per_lot=10.0, starting_equity=10000.0, spread_pips=1.0, slippage_pips=0.2, commission_per_lot_usd=0.0, risk_per_trade=0.01, atr_period=14, tp_atr=2.0, sl_atr=1.5, max_hold_bars=16, edge_th=0.62, entry_th=0.55)


In [4]:

# ===== 1) Robust OHLCV loader (handles filename/column variations) =====
import re

def _find_csv_by_tf(folder: str, tf: str) -> str:
    # Handle m15 vs 15m
    patterns = [
        f"*{tf}*.csv",
        f"*{tf.upper()}*.csv",
        f"*{tf.lower()}*.csv",
    ]
    # If tf="15m", also look for "m15"
    if len(tf) > 1 and tf[0].isdigit() and tf[-1].isalpha():
        rev = f"{tf[-1]}{tf[:-1]}" # 15m -> m15
        patterns.append(f"*{rev}*.csv")
        patterns.append(f"*{rev.upper()}*.csv")
        patterns.append(f"*{rev.lower()}*.csv")
        
    files = []
    for pat in patterns:
        files.extend(glob.glob(os.path.join(folder, pat)))
    files = sorted(set(files))
    
    if not files:
        # Fallback: fuzzy match using regex if nothing found
        # (e.g. if file is EURUSD-2020-2023-15m.csv)
        all_csvs = glob.glob(os.path.join(folder, "*.csv"))
        for f in all_csvs:
            bn = os.path.basename(f).lower()
            # check if both number and unit present (simple check)
            if tf in bn:
                files.append(f)
                
    if not files:
        raise FileNotFoundError(f"No CSV found for tf='{tf}' in {folder}. Tried {patterns}")
        
    # prefer shortest name (often cleanest)
    return sorted(files, key=lambda x: len(os.path.basename(x)))[0]

def load_ohlcv(folder: str, tf: str) -> pd.DataFrame:
    path = _find_csv_by_tf(folder, tf)
    df = pd.read_csv(path)

    cols = {c.lower(): c for c in df.columns}
    def pick(cands):
        for c in cands:
            if c in cols: return cols[c]
        return None

    ts = pick(["timestamp","time","date","datetime"])
    o  = pick(["open","o"])
    h  = pick(["high","h"])
    l  = pick(["low","l"])
    c  = pick(["close","c"])
    v  = pick(["volume","vol","tick_volume"])

    if any(x is None for x in [ts,o,h,l,c]):
        # Fallback: if 'time' is missing but we have 5 cols, assume they are T,O,H,L,C
        if len(df.columns) >= 5 and ts is None:
            # blind guess
            col_list = df.columns.tolist()
            print(f"Warning: guessing columns for {path}: {col_list[:5]}")
            ts, o, h, l, c = col_list[0], col_list[1], col_list[2], col_list[3], col_list[4]
        else:
            raise ValueError(f"Missing required columns in {path}. Have: {df.columns.tolist()}")

    out = df[[ts,o,h,l,c] + ([v] if v else [])].copy()
    out.columns = ["timestamp","open","high","low","close"] + (["volume"] if v else [])
    
    # Auto-detect format if possible, else standard mixed
    out["timestamp"] = pd.to_datetime(out["timestamp"], utc=True, errors="coerce")
    out = out.dropna(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True)

    for col in ["open","high","low","close"] + (["volume"] if "volume" in out.columns else []):
        out[col] = pd.to_numeric(out[col], errors="coerce")
    out = out.dropna(subset=["open","high","low","close"]).reset_index(drop=True)

    out.attrs["source_path"] = path
    return out

def load_bundle(folder: str):
    bundle = {}
    for tf in set([cfg.master_tf, cfg.entry_tf, *cfg.macro_tfs]):
        try:
            bundle[tf] = load_ohlcv(folder, tf)
            print(f"{tf:>3}  rows={len(bundle[tf]):,}  file={os.path.basename(bundle[tf].attrs['source_path'])}")
        except FileNotFoundError:
            print(f"Warning: {tf} not found in {folder}. Strategy might fail if this is essential.")
    return bundle

train_raw = load_bundle(cfg.train_dir)
test_raw  = load_bundle(cfg.test_dir)


15m  rows=224,382  file=EURUSD_m15.csv
 4h  rows=14,498  file=EURUSD_h4.csv
 1m  rows=3,354,904  file=EURUSD_m1.csv
 1h  rows=56,098  file=EURUSD_h1.csv
15m  rows=49,807  file=EURUSD_m15.csv
 4h  rows=3,220  file=EURUSD_h4.csv
 1m  rows=743,476  file=EURUSD_m1.csv
 1h  rows=12,454  file=EURUSD_h1.csv


In [5]:

# ===== 2) Technical indicators (fast, dependency-free) =====
def ema(s: pd.Series, span: int) -> pd.Series:
    return s.ewm(span=span, adjust=False).mean()

def rsi(close: pd.Series, period: int = 14) -> pd.Series:
    delta = close.diff()
    up = delta.clip(lower=0)
    dn = (-delta).clip(lower=0)
    rs = up.ewm(alpha=1/period, adjust=False).mean() / (dn.ewm(alpha=1/period, adjust=False).mean() + 1e-12)
    return 100 - (100/(1+rs))

def true_range(df: pd.DataFrame) -> pd.Series:
    prev = df["close"].shift(1)
    tr = pd.concat([
        (df["high"] - df["low"]),
        (df["high"] - prev).abs(),
        (df["low"] - prev).abs()
    ], axis=1).max(axis=1)
    return tr

def atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
    return true_range(df).ewm(alpha=1/period, adjust=False).mean()

def adx(df: pd.DataFrame, period: int = 14) -> pd.Series:
    up = df["high"].diff()
    dn = -df["low"].diff()

    plus_dm = np.where((up > dn) & (up > 0), up, 0.0)
    minus_dm = np.where((dn > up) & (dn > 0), dn, 0.0)

    tr = true_range(df)
    atrn = tr.ewm(alpha=1/period, adjust=False).mean()

    plus_di = 100 * pd.Series(plus_dm, index=df.index).ewm(alpha=1/period, adjust=False).mean() / (atrn + 1e-12)
    minus_di = 100 * pd.Series(minus_dm, index=df.index).ewm(alpha=1/period, adjust=False).mean() / (atrn + 1e-12)

    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-12)
    return dx.ewm(alpha=1/period, adjust=False).mean()

def macd_hist(close: pd.Series, fast=12, slow=26, signal=9) -> pd.Series:
    macd = ema(close, fast) - ema(close, slow)
    sig  = ema(macd, signal)
    return macd - sig

def bb_width(close: pd.Series, period=20, k=2.0) -> pd.Series:
    m = close.rolling(period).mean()
    s = close.rolling(period).std()
    upper = m + k*s
    lower = m - k*s
    width = (upper - lower) / (m.abs() + 1e-12)
    return width


In [6]:

# ===== 3) Feature building & multi‑TF alignment (NO lookahead) =====
def add_tf_features(df: pd.DataFrame, prefix: str) -> pd.DataFrame:
    out = df.copy()
    out[f"{prefix}_ema200"] = ema(out["close"], 200)
    out[f"{prefix}_ema50"]  = ema(out["close"], 50)
    out[f"{prefix}_ema20"]  = ema(out["close"], 20)
    out[f"{prefix}_ema200_slope"] = out[f"{prefix}_ema200"].diff(3)
    out[f"{prefix}_rsi14"] = rsi(out["close"], 14)
    out[f"{prefix}_adx14"] = adx(out, 14)
    out[f"{prefix}_atr14"] = atr(out, cfg.atr_period)
    out[f"{prefix}_macd_h"] = macd_hist(out["close"])
    out[f"{prefix}_bb_width"] = bb_width(out["close"])

    # ATR-scaled distances (robust across years)
    atrv = out[f"{prefix}_atr14"].replace(0, np.nan)
    out[f"{prefix}_dist_ema200_atr"] = (out["close"] - out[f"{prefix}_ema200"]) / atrv
    out[f"{prefix}_dist_ema50_atr"]  = (out["close"] - out[f"{prefix}_ema50"]) / atrv
    return out

def merge_asof_backward(left: pd.DataFrame, right: pd.DataFrame, suffix: str) -> pd.DataFrame:
    # assumes timestamps represent candle OPEN time; uses last known candle whose open <= left timestamp
    L = left.sort_values("timestamp").copy()
    R = right.sort_values("timestamp").copy()
    m = pd.merge_asof(L, R, on="timestamp", direction="backward", suffixes=("", f"_{suffix}"))
    return m

def agg_entry_1m_to_15m(one_m: pd.DataFrame) -> pd.DataFrame:
    d = one_m.copy()
    d = d.sort_values("timestamp").set_index("timestamp")

    ret1 = np.log(d["close"]).diff()
    rng = (d["high"] - d["low"])
    body = (d["close"] - d["open"]).abs()
    wick_down = (d[["open","close"]].min(axis=1) - d["low"]).clip(lower=0)
    wick_up   = (d["high"] - d[["open","close"]].max(axis=1)).clip(lower=0)
    rng_safe = rng.replace(0, np.nan)

    agg = pd.DataFrame({
        "m1_ret_mean": ret1.resample("15min").mean(),
        "m1_ret_std":  ret1.resample("15min").std(),
        "m1_rng_mean": rng.resample("15min").mean(),
        "m1_rng_max":  rng.resample("15min").max(),
        "m1_body_mean": body.resample("15min").mean(),
        "m1_wick_down_mean": (wick_down / rng_safe).resample("15min").mean(),
        "m1_wick_up_mean":   (wick_up   / rng_safe).resample("15min").mean(),
        "m1_count": rng.resample("15min").count(),
    }).reset_index()

    # shift by 1 bucket: master row at time t uses fully completed previous 15m window
    for c in agg.columns:
        if c != "timestamp":
            agg[c] = agg[c].shift(1)
    return agg

def build_master_table(bundle: dict) -> pd.DataFrame:
    # master (15m) with its own features
    master = bundle[cfg.master_tf].copy()
    master = add_tf_features(master, "m15")
    master["atr14"] = master["m15_atr14"]

    # macro features (1h, 4h)
    for tf in cfg.macro_tfs:
        mac = add_tf_features(bundle[tf], tf)
        keep = ["timestamp"] + [c for c in mac.columns if c.startswith(tf+"_")]
        master = merge_asof_backward(master, mac[keep], suffix=tf)

    # entry agg (1m -> 15m)
    entry = agg_entry_1m_to_15m(bundle[cfg.entry_tf])
    master = pd.merge(master, entry, on="timestamp", how="left")

    # clean
    master = master.replace([np.inf, -np.inf], np.nan).sort_values("timestamp")
    master = master.fillna(method="ffill").dropna().reset_index(drop=True)
    return master

train_df = build_master_table(train_raw)
test_df  = build_master_table(test_raw)

print("train_df:", train_df.shape, "test_df:", test_df.shape)
train_df.head(3)


train_df: (224102, 48) test_df: (49511, 48)


Unnamed: 0,timestamp,open,high,low,close,volume,m15_ema200,m15_ema50,m15_ema20,m15_ema200_slope,...,4h_dist_ema200_atr,4h_dist_ema50_atr,m1_ret_mean,m1_ret_std,m1_rng_mean,m1_rng_max,m1_body_mean,m1_wick_down_mean,m1_wick_up_mean,m1_count
0,2015-01-06 20:00:00+00:00,1.19129,1.19147,1.18956,1.1898,2881.09,1.195015,1.192249,1.192217,-0.000114,...,-4.630483,-3.415172,-8.3e-05,0.000155,0.000213,0.00063,0.000147,0.140234,0.231039,15.0
1,2015-01-06 20:15:00+00:00,1.1898,1.19038,1.18887,1.19019,2310.13,1.194967,1.192168,1.192024,-0.000138,...,-4.630483,-3.415172,-8.3e-05,0.000198,0.000292,0.00091,0.000192,0.184871,0.193857,15.0
2,2015-01-06 20:30:00+00:00,1.19019,1.19043,1.1896,1.1896,1716.93,1.194913,1.192067,1.191793,-0.000154,...,-4.630483,-3.415172,2.2e-05,0.000215,0.000319,0.00093,0.000195,0.243291,0.187859,15.0


In [7]:

# ===== 4) Macro gate (PF-first) =====
def macro_gate(df: pd.DataFrame) -> pd.Series:
    # conservative default; we'll optionally tune later
    ok = (
        (df["4h_dist_ema200_atr"] > 0.0) &
        (df["4h_ema200_slope"] > 0.0) &
        (df["4h_adx14"] > 18) &
        (df["1h_dist_ema200_atr"] > -0.25) &
        (df["1h_adx14"] > 16) &
        (df["4h_bb_width"] > 0.001)  # avoid ultra-compressed dead market
    )
    return ok.astype(int)

train_df["macro_ok"] = macro_gate(train_df)
test_df["macro_ok"]  = macro_gate(test_df)

train_df["macro_ok"].value_counts().to_dict(), test_df["macro_ok"].value_counts().to_dict()


({0: 163897, 1: 60205}, {0: 34811, 1: 14700})

In [8]:

# ===== 5) Triple‑barrier label on 15m (for EDGE model) =====
def make_triple_barrier_label(df: pd.DataFrame, tp_atr: float, sl_atr: float, max_hold: int) -> pd.Series:
    tp = df["open"].shift(-1) + tp_atr * df["atr14"]  # TP/SL set at decision time (t), entry at t+1 open
    sl = df["open"].shift(-1) - sl_atr * df["atr14"]

    y = np.zeros(len(df), dtype=int)
    for i in range(len(df) - max_hold - 2):
        entry_i = i + 1
        if not np.isfinite(tp.iloc[i]) or not np.isfinite(sl.iloc[i]):
            continue
        hi = df["high"].iloc[entry_i: entry_i + max_hold]
        lo = df["low"].iloc[entry_i: entry_i + max_hold]

        # earliest barrier hit
        hit_tp = np.where(hi.values >= tp.iloc[i])[0]
        hit_sl = np.where(lo.values <= sl.iloc[i])[0]

        t_tp = hit_tp[0] if len(hit_tp) else None
        t_sl = hit_sl[0] if len(hit_sl) else None

        if t_tp is not None and (t_sl is None or t_tp < t_sl):
            y[i] = 1
        else:
            y[i] = 0

    return pd.Series(y, index=df.index)

# default label params (tuned later)
train_df["y_edge"] = make_triple_barrier_label(train_df, cfg.tp_atr, cfg.sl_atr, cfg.max_hold_bars)
test_df["y_edge"]  = make_triple_barrier_label(test_df,  cfg.tp_atr, cfg.sl_atr, cfg.max_hold_bars)

train_df["y_edge"].value_counts().to_dict()


{0: 152397, 1: 71705}

In [9]:

# ===== 6) Entry label (1m aggregated): avoid immediate adverse move after entry =====
# We label "good entry" if within next 3 master bars (45m) we do NOT dip more than X*ATR below entry.
def make_entry_label(df: pd.DataFrame, adverse_atr: float = 0.5, horizon: int = 3) -> pd.Series:
    y = np.zeros(len(df), dtype=int)
    for i in range(len(df) - horizon - 2):
        entry_i = i + 1
        entry = df.loc[entry_i, "open"]
        atr_i = df.loc[i, "atr14"]
        if not np.isfinite(entry) or not np.isfinite(atr_i) or atr_i <= 0:
            continue
        floor = entry - adverse_atr * atr_i
        lo = df["low"].iloc[entry_i:entry_i+horizon].min()
        y[i] = 1 if lo > floor else 0
    return pd.Series(y, index=df.index)

train_df["y_entry"] = make_entry_label(train_df, adverse_atr=0.5, horizon=3)


In [10]:

# ===== 7) Train/Validation split inside train (2015–2023) =====
# We split by time using timestamp years if available; fallback to last 20% as validation.
train_df["year"] = train_df["timestamp"].dt.year

core = train_df[train_df["year"] <= 2021].copy()
val  = train_df[(train_df["year"] >= 2022) & (train_df["year"] <= 2023)].copy()

if len(val) < 1000:
    cut = int(len(train_df)*0.8)
    core = train_df.iloc[:cut].copy()
    val  = train_df.iloc[cut:].copy()

print("core:", core.shape, "val:", val.shape)


core: (174243, 52) val: (49859, 52)


In [11]:

# ===== 8) Feature sets =====
# Edge model features: 15m + macro context + entry agg (but NOT future)
EDGE_FEATURES = [
    # master 15m
    "m15_dist_ema200_atr","m15_dist_ema50_atr","m15_ema200_slope","m15_rsi14","m15_adx14","m15_macd_h","m15_bb_width",
    "m15_atr14",
    # macro context
    "1h_dist_ema200_atr","1h_ema200_slope","1h_adx14","1h_bb_width",
    "4h_dist_ema200_atr","4h_ema200_slope","4h_adx14","4h_bb_width",
]

# Entry features (from 1m aggregation) + a couple of local context
ENTRY_FEATURES = [
    "m1_ret_mean","m1_ret_std","m1_rng_mean","m1_rng_max","m1_body_mean","m1_wick_down_mean","m1_wick_up_mean","m1_count",
    "m15_atr14","m15_rsi14"
]

# sanity
missing_edge = [c for c in EDGE_FEATURES if c not in train_df.columns]
missing_entry = [c for c in ENTRY_FEATURES if c not in train_df.columns]
print("missing_edge:", missing_edge)
print("missing_entry:", missing_entry)


missing_edge: []
missing_entry: []


In [12]:

# ===== 9) Models =====
def make_edge_model():
    if HAS_XGB:
        return XGBClassifier(
            n_estimators=600,
            max_depth=4,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            reg_lambda=2.0,
            min_child_weight=2,
            random_state=42,
            eval_metric="logloss",
            n_jobs=4
        )
    else:
        return HistGradientBoostingClassifier(
            max_depth=4,
            learning_rate=0.05,
            max_iter=600,
            random_state=42
        )

edge_model = make_edge_model()

# Calibrate probabilities for stable thresholding (PF tuning)
edge_clf = CalibratedClassifierCV(edge_model, method="sigmoid", cv=3)

entry_clf = Pipeline([
    ("scaler", StandardScaler(with_mean=False)),
    ("lr", LogisticRegression(max_iter=2000))
])

# train edge on macro_ok==1 only (PF-first selectivity)
core_edge = core[core["macro_ok"]==1].copy()
val_edge  = val[val["macro_ok"]==1].copy()

X_core_edge = core_edge[EDGE_FEATURES]
y_core_edge = core_edge["y_edge"]

X_val_edge  = val_edge[EDGE_FEATURES]
y_val_edge  = val_edge["y_edge"]

edge_clf.fit(X_core_edge, y_core_edge)

p_val_edge = edge_clf.predict_proba(X_val_edge)[:,1]
print("Edge AUC (val, gated):", roc_auc_score(y_val_edge, p_val_edge))

# train entry filter on all macro_ok==1 rows (still ok)
core_entry = core[core["macro_ok"]==1].copy()
val_entry  = val[val["macro_ok"]==1].copy()

X_core_entry = core_entry[ENTRY_FEATURES]
y_core_entry = core_entry["y_entry"]

X_val_entry  = val_entry[ENTRY_FEATURES]
y_val_entry  = val_entry["y_entry"]

entry_clf.fit(X_core_entry, y_core_entry)
p_val_entry = entry_clf.predict_proba(X_val_entry)[:,1]
print("Entry AUC (val, gated):", roc_auc_score(y_val_entry, p_val_entry))


Edge AUC (val, gated): 0.6288569294141332
Entry AUC (val, gated): 0.6251427957062523


In [13]:

# ===== 10) Backtest engine (truthful): next-open, costs, 1% risk, intrabar worst-case =====
@dataclass
class Trade:
    entry_time: pd.Timestamp
    exit_time: pd.Timestamp
    entry_price: float
    exit_price: float
    sl: float
    tp: float
    lots: float
    pnl_usd: float
    pnl_pips: float
    outcome: str

def pip_to_price(pips: float) -> float:
    return pips * cfg.pip_size

def price_to_pips(price_delta: float) -> float:
    return price_delta / cfg.pip_size

def calc_lots_for_risk(equity: float, stop_pips: float) -> float:
    if stop_pips <= 0:
        return 0.0
    risk_usd = equity * cfg.risk_per_trade
    lots = risk_usd / (stop_pips * cfg.usd_per_pip_per_lot + 1e-12)
    return max(0.0, float(lots))

def run_backtest(df: pd.DataFrame, p_edge: np.ndarray, p_entry: np.ndarray,
                edge_th: float, entry_th: float, tp_atr: float, sl_atr: float, max_hold: int):
    df = df.reset_index(drop=True).copy()
    spread = pip_to_price(cfg.spread_pips)
    slip   = pip_to_price(cfg.slippage_pips)

    equity = cfg.starting_equity
    eq_curve = []
    trades = []
    in_pos = False
    entry_i = None
    entry_price = sl = tp = lots = None
    bars_in_trade = 0

    for i in range(len(df)-2):
        t = df.loc[i, "timestamp"]
        eq_curve.append((t, equity))

        if in_pos:
            bars_in_trade += 1
            hi = df.loc[i, "high"]
            lo = df.loc[i, "low"]
            hit_sl = lo <= sl
            hit_tp = hi >= tp

            if hit_sl or hit_tp or bars_in_trade >= max_hold:
                if hit_sl and hit_tp:
                    exit_price = sl   # worst-case for long
                    outcome = "SL(amb)"
                elif hit_sl:
                    exit_price = sl
                    outcome = "SL"
                elif hit_tp:
                    exit_price = tp
                    outcome = "TP"
                else:
                    # time exit at close(i) on bid
                    exit_price = df.loc[i, "close"]
                    outcome = "TIME"

                # long exit on bid (worse): -spread/2 - slip
                exit_price = exit_price - spread/2 - slip

                pnl_price = exit_price - entry_price
                pnl_pips  = price_to_pips(pnl_price)
                pnl_usd   = pnl_pips * cfg.usd_per_pip_per_lot * lots
                pnl_usd  -= cfg.commission_per_lot_usd * lots  # round-trip

                equity += pnl_usd
                trades.append(Trade(
                    entry_time=df.loc[entry_i, "timestamp"],
                    exit_time=t,
                    entry_price=float(entry_price),
                    exit_price=float(exit_price),
                    sl=float(sl), tp=float(tp),
                    lots=float(lots),
                    pnl_usd=float(pnl_usd),
                    pnl_pips=float(pnl_pips),
                    outcome=outcome
                ))
                in_pos = False
                entry_i = None
                bars_in_trade = 0
            continue

        # no position: decide at close(i), enter at open(i+1)
        if df.loc[i, "macro_ok"] != 1:
            continue
        if not (p_edge[i] > edge_th and p_entry[i] > entry_th):
            continue

        entry_i = i + 1
        raw_entry = df.loc[entry_i, "open"]
        entry_price = raw_entry + spread/2 + slip  # long entry on ask

        atr_i = df.loc[i, "atr14"]
        if not np.isfinite(atr_i) or atr_i <= 0:
            entry_i = None
            continue

        tp = entry_price + tp_atr * atr_i
        sl = entry_price - sl_atr * atr_i

        stop_pips = price_to_pips(entry_price - sl)
        lots = calc_lots_for_risk(equity, stop_pips)
        if lots <= 0:
            entry_i = None
            continue

        in_pos = True
        bars_in_trade = 0

    eq = pd.DataFrame(eq_curve, columns=["timestamp","equity"]).drop_duplicates("timestamp")
    eq["peak"] = eq["equity"].cummax()
    eq["dd"] = (eq["equity"] - eq["peak"]) / (eq["peak"] + 1e-12)
    return trades, eq

def summarize(trades, eq):
    if len(trades) == 0:
        return {"trades":0, "profit_factor":0.0, "max_drawdown": float(eq["dd"].min()) if len(eq) else 0.0}
    pnl = np.array([t.pnl_usd for t in trades])
    gross_profit = pnl[pnl>0].sum()
    gross_loss = -pnl[pnl<0].sum()
    pf = gross_profit / gross_loss if gross_loss > 0 else float("inf")
    return {
        "trades": int(len(trades)),
        "profit_factor": float(pf),
        "net_profit_usd": float(pnl.sum()),
        "winrate": float((pnl>0).mean()),
        "avg_trade_usd": float(pnl.mean()),
        "max_drawdown": float(eq["dd"].min()) if len(eq) else 0.0
    }


In [20]:

# ===== 11) Validation tuning (PF-first): thresholds + TP/SL grid =====
def compute_probs(df: pd.DataFrame):
    df_use = df.copy()
    X_edge = df_use[EDGE_FEATURES]
    X_ent  = df_use[ENTRY_FEATURES]
    p_edge = edge_clf.predict_proba(X_edge)[:,1]
    p_ent  = entry_clf.predict_proba(X_ent)[:,1]
    return p_edge, p_ent

p_val_edge_all, p_val_entry_all = compute_probs(val)

# Narrower grid to save time, but broader scope for thresholds
grid_edge_th  = [0.55, 0.60, 0.65]
grid_entry_th = [0.50, 0.53, 0.55]
grid_tp = [2.0, 2.5]
grid_sl = [1.5, 2.0]

best = None
rows = []

import time
t0 = time.time()

for et in grid_edge_th:
    for it in grid_entry_th:
        for tp_atr in grid_tp:
            for sl_atr in grid_sl:
                trades, eq = run_backtest(val, p_val_edge_all, p_val_entry_all, et, it, tp_atr, sl_atr, cfg.max_hold_bars)
                st = summarize(trades, eq)
                
                # constraints (PF-first but keep it reasonable)
                # Soften min trades to 10 just to find SOMETHING, in real life you want 50+
                if st["trades"] < 10:
                    continue
                if st["max_drawdown"] < -0.30: # relax DD
                    continue
                    
                score = st["profit_factor"]  # primary objective
                
                rows.append((score, et, it, tp_atr, sl_atr, st["trades"], st["max_drawdown"], st["net_profit_usd"]))
                
                # track best
                if best is None or score > best[0]:
                    best = (score, et, it, tp_atr, sl_atr, st)

print(f"Grid search done in {time.time()-t0:.1f}s. Candidates: {len(rows)}")
if best:
    print("Best PF:", best[0])
else:
    print("No config passed constraints.")


Grid search done in 18.6s. Candidates: 0
No config passed constraints.


In [21]:

# Show top validation configs
top = sorted(rows, key=lambda x: x[0], reverse=True)[:20]
pd.DataFrame(top, columns=["PF","edge_th","entry_th","tp_atr","sl_atr","trades","max_dd","net_usd"])


Unnamed: 0,PF,edge_th,entry_th,tp_atr,sl_atr,trades,max_dd,net_usd


In [22]:

# Apply best config
if best is None:
    print("WARNING: No valid config found during tuning. Using defaults.")
    # Default fallback
    PF_best = 0.0
    EDGE_TH = cfg.edge_th
    ENTRY_TH = cfg.entry_th
    TP_ATR = cfg.tp_atr
    SL_ATR = cfg.sl_atr
    st_best = {}
else:
    PF_best, EDGE_TH, ENTRY_TH, TP_ATR, SL_ATR, st_best = best
    
print("BEST on VAL:", st_best)
print("Chosen:", {"EDGE_TH":EDGE_TH,"ENTRY_TH":ENTRY_TH,"TP_ATR":TP_ATR,"SL_ATR":SL_ATR})


BEST on VAL: {}
Chosen: {'EDGE_TH': 0.62, 'ENTRY_TH': 0.55, 'TP_ATR': 2.0, 'SL_ATR': 1.5}


In [23]:

# ===== 12) Final TEST backtest (2024–2025) with tuned params =====
p_test_edge, p_test_entry = compute_probs(test_df)

trades_test, eq_test = run_backtest(test_df, p_test_edge, p_test_entry, EDGE_TH, ENTRY_TH, TP_ATR, SL_ATR, cfg.max_hold_bars)
stats_test = summarize(trades_test, eq_test)
stats_test


{'trades': 0, 'profit_factor': 0.0, 'max_drawdown': 0.0}

In [24]:

# Quick peek: outcomes
from collections import Counter
Counter([t.outcome for t in trades_test])


Counter()

In [25]:

# Equity curve preview
eq_test.tail()


Unnamed: 0,timestamp,equity,peak,dd
49504,2025-12-30 22:15:00+00:00,10000.0,10000.0,0.0
49505,2025-12-30 22:30:00+00:00,10000.0,10000.0,0.0
49506,2025-12-30 22:45:00+00:00,10000.0,10000.0,0.0
49507,2025-12-30 23:00:00+00:00,10000.0,10000.0,0.0
49508,2025-12-30 23:15:00+00:00,10000.0,10000.0,0.0
