In [4]:
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import pytz
from datetime import datetime
import pandas_ta as ta

# --- Extraction configuration (read by process_mfe_symbol / run_extraction) ---
SYMBOLS = ["EURUSD", "GBPUSD", "USDJPY"] # Starting with top 3 for speed
# Bar timeframe requested from MT5 for every symbol.
TIMEFRAME_M15 = mt5.TIMEFRAME_M15
# Span of the exponential moving average behind the 'ema_50' / 'dist_to_ema' features.
EMA_PERIOD = 50
INVALIDATION_PIPS = 15.0 # The 'ruin' level that stops the MFE measurement
# Maximum number of future bars scanned per setup when measuring MFE
# (48 bars of 15 minutes = 12 hours).
TIMEOUT_BARS = 48

def fetch_data(symbol, timeframe, utc_from, utc_to):
    """Load bars for one symbol from MT5 into a DataFrame.

    Args:
        symbol: Symbol name passed through to ``mt5.copy_rates_range``.
        timeframe: MT5 timeframe constant passed through unchanged.
        utc_from: Start of the requested range.
        utc_to: End of the requested range.

    Returns:
        None when MT5 returns nothing (``None`` or an empty result); otherwise
        a DataFrame built from the returned rates whose 'time' column has been
        converted from epoch seconds to pandas datetimes.
    """
    raw_bars = mt5.copy_rates_range(symbol, timeframe, utc_from, utc_to)

    has_bars = raw_bars is not None and len(raw_bars) > 0
    if not has_bars:
        return None

    bars = pd.DataFrame(raw_bars)
    # MT5 delivers bar open times as integer seconds since the epoch.
    bars['time'] = pd.to_datetime(bars['time'], unit='s')
    return bars

def _max_favorable_excursion(f_highs, f_lows, entry, ruin_distance, is_bull):
    """Return the largest favorable move (price units) inside one look-ahead window.

    Bars are consumed in order up to AND including the first bar that touches
    the invalidation ("ruin") price; everything after it is ignored.

    Args:
        f_highs: 1-D array of future bar highs, oldest first.
        f_lows: 1-D array of future bar lows, aligned with ``f_highs``.
        entry: Reference price the excursion is measured from.
        ruin_distance: Adverse distance (price units) that invalidates the setup.
        is_bull: True to measure upward excursion, False for downward.

    Returns:
        A non-negative float; 0.0 for an empty window or when price never
        trades beyond ``entry`` in the favorable direction.
    """
    if len(f_highs) == 0:
        return 0.0

    if is_bull:
        favorable = f_highs - entry
        invalidated = f_lows <= entry - ruin_distance
    else:
        favorable = entry - f_lows
        invalidated = f_highs >= entry + ruin_distance

    hits = np.flatnonzero(invalidated)
    # The invalidating bar itself still counts towards the excursion.
    stop = hits[0] + 1 if hits.size else len(favorable)
    return max(0.0, float(favorable[:stop].max()))


def process_mfe_symbol(symbol_name, utc_from, utc_to):
    """Build the feature rows and MFE regression target for one symbol.

    Steps:
      1. Resolve the broker's symbol name (an exact match is preferred over the
         first wildcard hit) and download M15 bars for the requested range.
      2. Compute the base features (EMA distance, hour, ATR, RSI) and drop rows
         where any of them is missing.
      3. Flag 3-bar fair value gaps (FVG): a bullish gap when the current low is
         above the high two bars back, a bearish gap when the current high is
         below the low two bars back, each confirmed by the direction of the
         two preceding closes.
      4. For every FVG bar, measure the Maximum Favorable Excursion over the
         next ``TIMEOUT_BARS`` bars, stopping at the first bar that moves
         ``INVALIDATION_PIPS`` against the entry.

    Setups closer than ``TIMEOUT_BARS`` bars to the end of the data are measured
    over a shorter window; a setup on the very last bar gets a target of 0.0.

    Args:
        symbol_name: Generic symbol (e.g. "EURUSD"); also written to the
            'symbol' column of the result.
        utc_from: Start of the history range, forwarded to ``fetch_data``.
        utc_to: End of the history range, forwarded to ``fetch_data``.

    Returns:
        A DataFrame with one row per detected FVG and the columns consumed by
        the training step, or None when the symbol, its bars, or its symbol
        info cannot be obtained.
    """
    print(f"\n>>> Extracting Regression Targets for {symbol_name}...")

    symbols_found = mt5.symbols_get(f"*{symbol_name}*")
    if not symbols_found:
        return None
    # The wildcard can match unrelated or suffixed instruments; only fall back
    # to the first hit when the exact name is not offered by the broker.
    broker_names = [s.name for s in symbols_found]
    actual_symbol = symbol_name if symbol_name in broker_names else broker_names[0]

    df = fetch_data(actual_symbol, TIMEFRAME_M15, utc_from, utc_to)
    if df is None:
        return None

    info = mt5.symbol_info(actual_symbol)
    if info is None:
        # Previously this crashed with AttributeError on `info.point`.
        print(f"No symbol info available for {actual_symbol}; skipping.")
        return None
    # On 3- and 5-digit quotes one pip is ten points.
    pip_val = info.point * (10.0 if info.digits in [3, 5] else 1.0)

    # The shift-based features and the look-ahead scan both rely on bars being
    # in chronological order, so enforce it explicitly.
    df = df.sort_values('time', kind='stable').reset_index(drop=True)

    # Base Features (Simplified for the Regressor data generation example)
    df['ema_50'] = df['close'].ewm(span=EMA_PERIOD, adjust=False).mean()
    df['dist_to_ema'] = (df['close'] - df['ema_50']) / pip_val
    df['hour_of_day'] = df['time'].dt.hour
    df['atr_14'] = ta.atr(df['high'], df['low'], df['close'], length=14)
    df['rsi_14'] = ta.rsi(df['close'], length=14)
    df = df.dropna().reset_index(drop=True)

    df['prev_high'] = df['high'].shift(2)
    df['prev_low'] = df['low'].shift(2)

    df['bull_gap'] = df['low'] - df['prev_high']
    df['is_bull_fvg'] = (df['bull_gap'] > 0) & (df['close'].shift(1) > df['close'].shift(2))
    df['bull_fvg_size'] = np.where(df['is_bull_fvg'], df['bull_gap'] / pip_val, 0.0)

    df['bear_gap'] = df['prev_low'] - df['high']
    df['is_bear_fvg'] = (df['bear_gap'] > 0) & (df['close'].shift(1) < df['close'].shift(2))
    df['bear_fvg_size'] = np.where(df['is_bear_fvg'], df['bear_gap'] / pip_val, 0.0)

    df['bull_fvg_atr_ratio'] = np.where(df['is_bull_fvg'], df['bull_gap'] / df['atr_14'], 0.0)
    df['bear_fvg_atr_ratio'] = np.where(df['is_bear_fvg'], df['bear_gap'] / df['atr_14'], 0.0)

    fvg_mask = df['is_bull_fvg'] | df['is_bear_fvg']
    fvg_df = df[fvg_mask].copy()

    # --- MFE REGRESSION LABELING ---
    print(f"Calculating Maximum Favorable Excursion (MFE) for {actual_symbol}...")
    invalidation_points = INVALIDATION_PIPS * pip_val

    times = df['time'].to_numpy()
    highs = df['high'].to_numpy()
    lows = df['low'].to_numpy()
    bull_flags = df['is_bull_fvg'].to_numpy()

    fvg_pos = np.flatnonzero(fvg_mask.to_numpy())
    # First bar strictly later than each setup bar, found with one binary
    # search per setup instead of a full scan of the time array.
    window_starts = np.searchsorted(times, times[fvg_pos], side='right')

    mfe_labels = []
    for pos, start in zip(fvg_pos, window_starts):
        stop = start + TIMEOUT_BARS
        is_bull = bool(bull_flags[pos])
        # Entry is the gap edge of the setup bar: its low for a bullish gap,
        # its high for a bearish gap.
        entry = lows[pos] if is_bull else highs[pos]
        excursion = _max_favorable_excursion(
            highs[start:stop], lows[start:stop], entry, invalidation_points, is_bull
        )
        # Convert maximum excursion distance to pips
        mfe_labels.append(excursion / pip_val)

    fvg_df['target_mfe_pips'] = mfe_labels
    fvg_df['symbol'] = symbol_name

    return fvg_df[['time', 'symbol', 'tick_volume', 'hour_of_day', 'rsi_14', 'atr_14',
                   'dist_to_ema', 'bull_fvg_size', 'bear_fvg_size',
                   'bull_fvg_atr_ratio', 'bear_fvg_atr_ratio', 'target_mfe_pips']]

def run_extraction(min_mfe_pips=2.0, output_path="MFE_REGRESSION_MATRIX.csv"):
    """Extract MFE regression rows for every symbol in SYMBOLS and save them as CSV.

    Connects to the MT5 terminal, processes each symbol over the fixed range
    2024-05-01 .. 2025-05-01 (UTC), concatenates the per-symbol frames, keeps
    only rows whose target exceeds ``min_mfe_pips`` and writes the result to
    ``output_path``.

    Args:
        min_mfe_pips: Rows with ``target_mfe_pips`` at or below this value are
            discarded (we only want to train on setups that actually moved).
        output_path: Destination CSV file.

    Returns:
        None. Prints a message and returns early when MT5 cannot be initialized
        or when no symbol produced any data.
    """
    if not mt5.initialize():
        print("MT5 initialize() failed:", mt5.last_error())
        return

    try:
        timezone = pytz.timezone("Etc/UTC")
        utc_from = datetime(2024, 5, 1, tzinfo=timezone)
        utc_to = datetime(2025, 5, 1, tzinfo=timezone)

        all_data = []
        for sym in SYMBOLS:
            res = process_mfe_symbol(sym, utc_from, utc_to)
            if res is not None:
                all_data.append(res)
    finally:
        # Always release the terminal connection, even if a symbol fails.
        mt5.shutdown()

    if not all_data:
        print("No data extracted; nothing was written.")
        return

    master_df = pd.concat(all_data, ignore_index=True)
    # Keep only setups whose favorable run exceeded the minimum threshold.
    master_df = master_df[master_df['target_mfe_pips'] > min_mfe_pips]
    master_df.to_csv(output_path, index=False)
    print(f"\nSUCCESS! Regression Matrix Saved: {output_path}")

# Entry point: run the extraction only when this code is executed as the main
# module (not when it is imported).
if __name__ == "__main__":
    run_extraction()


>>> Extracting Regression Targets for EURUSD...
Calculating Maximum Favorable Excursion (MFE) for EURUSD...

>>> Extracting Regression Targets for GBPUSD...
Calculating Maximum Favorable Excursion (MFE) for GBPUSD...

>>> Extracting Regression Targets for USDJPY...
Calculating Maximum Favorable Excursion (MFE) for USDJPY...

SUCCESS! Regression Matrix Saved: MFE_REGRESSION_MATRIX.csv


In [5]:
import numpy as np
import pandas as pd
import xgboost as xgb

from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import TimeSeriesSplit

import onnx
from skl2onnx import convert_sklearn, update_registered_converter
from skl2onnx.common.data_types import FloatTensorType
from skl2onnx.common.shape_calculator import calculate_linear_regressor_output_shapes
from onnxmltools.convert.xgboost.operator_converters.XGBoost import convert_xgboost

# REGISTER REGRESSOR CONVERTER
# Registers the onnxmltools XGBoost converter with skl2onnx under the alias
# "XGBoostXGBRegressor", so that convert_sklearn (called in train_regressor)
# is given a converter and an output-shape calculator for xgb.XGBRegressor.
# Argument order: model class, alias, shape calculator, converter function.
update_registered_converter(
    xgb.XGBRegressor,
    "XGBoostXGBRegressor",
    calculate_linear_regressor_output_shapes,
    convert_xgboost
)

# ---- CONFIG ----
TIME_COL = "time"         # <-- change if your column is named differently (e.g. 'timestamp', 'datetime')
N_SPLITS = 5              # walk-forward folds
GAP_BARS = 0              # optional embargo gap between train and test (set >0 to reduce leakage)
MIN_ROWS = 500            # skip symbols with too little history

# Input columns, in the exact order they are stacked into the model's input
# matrix (and therefore the order of the exported ONNX input vector).
FEATURES = [
    "tick_volume", "hour_of_day", "rsi_14", "atr_14",
    "dist_to_ema", "bull_fvg_size", "bear_fvg_size",
    "bull_fvg_atr_ratio", "bear_fvg_atr_ratio"
]
# Regression target column read from the CSV.
TARGET = "target_mfe_pips"


def _ensure_sorted(sym_df: pd.DataFrame) -> pd.DataFrame:
    """Return the rows in chronological order with a fresh 0..n-1 index.

    When the frame has a ``TIME_COL`` column it is parsed to UTC datetimes,
    rows whose timestamp cannot be parsed are discarded, and the remainder is
    sorted ascending by time. The input frame is not modified.

    When the column is absent the original row order is kept and only the
    index is renumbered. Without timestamps chronological order cannot be
    verified, so any later time-based split is only as good as the incoming
    order.
    """
    if TIME_COL not in sym_df.columns:
        return sym_df.reset_index(drop=True)

    parsed_times = pd.to_datetime(sym_df[TIME_COL], errors="coerce", utc=True)
    stamped = sym_df.assign(**{TIME_COL: parsed_times})
    # Unparseable timestamps were coerced to NaT above; remove those rows.
    valid = stamped[stamped[TIME_COL].notna()]
    return valid.sort_values(TIME_COL).reset_index(drop=True)


def _regressor() -> xgb.XGBRegressor:
    """Create a fresh, unfitted XGBoost regressor with the project's fixed hyperparameters.

    A new instance is returned on every call so that each CV fold and the
    final fit start from an untrained model.
    """
    hyperparams = {
        "objective": "reg:squarederror",
        "n_estimators": 300,
        "learning_rate": 0.05,
        "max_depth": 4,
        "subsample": 0.9,
        "colsample_bytree": 0.9,
        "random_state": 42,
        "n_jobs": -1,
    }
    return xgb.XGBRegressor(**hyperparams)


def _walkforward_cv_mae(X: np.ndarray, y: np.ndarray, n_splits: int, gap: int) -> float:
    """
    Walk-forward validation using TimeSeriesSplit.

    Each fold trains a fresh regressor on the earlier part of the data and
    scores it on the following block. When ``gap`` > 0, the last ``gap``
    training samples immediately before the test block are removed (embargo),
    so ``gap=1`` drops exactly one sample.

    Args:
        X: Feature matrix with rows in chronological order.
        y: Target vector aligned with ``X``.
        n_splits: Number of walk-forward folds.
        gap: Number of samples to exclude between train and test; 0 disables it.

    Returns:
        Mean of the per-fold mean absolute errors, or NaN when every fold was
        skipped because too little training data remained after the embargo.
    """
    min_train_rows = 50  # folds with fewer training rows after the embargo are skipped
    tss = TimeSeriesSplit(n_splits=n_splits)
    maes = []

    for train_idx, test_idx in tss.split(X):
        if gap > 0:
            # Keep only samples at least `gap` positions before the first test
            # sample. (The previous `<=` comparison kept one sample too many.)
            train_idx = train_idx[train_idx < test_idx[0] - gap]
            if len(train_idx) < min_train_rows:
                # too small after gap
                continue

        model = _regressor()
        model.fit(X[train_idx], y[train_idx])

        preds = model.predict(X[test_idx])
        maes.append(mean_absolute_error(y[test_idx], preds))

    return float(np.mean(maes)) if maes else float("nan")


def train_regressor():
    """Train one MFE regressor per symbol and export each as an ONNX file.

    Reads ``MFE_REGRESSION_MATRIX.csv``, discards rows containing missing or
    infinite values, and for every symbol with at least ``MIN_ROWS`` rows:

      1. sorts the rows chronologically,
      2. reports the walk-forward cross-validated MAE,
      3. fits a final model on the symbol's full history,
      4. writes ``fvg_mfe_regressor_<symbol>_v5.onnx``.

    Returns:
        None. Problems with the input file or its columns are reported with a
        printed message and end the function early.
    """
    try:
        df = pd.read_csv("MFE_REGRESSION_MATRIX.csv")
    except Exception as e:
        print("Run the data extraction script first. Error:", e)
        return

    # dropna() alone lets +/-inf through (e.g. a ratio feature divided by a
    # zero ATR); treat infinities as missing so such rows are removed as well.
    df = df.replace([np.inf, -np.inf], np.nan).dropna()

    if "symbol" not in df.columns:
        print("ERROR: CSV must contain a 'symbol' column.")
        return

    missing = [c for c in FEATURES + [TARGET] if c not in df.columns]
    if missing:
        print("ERROR: Missing columns:", missing)
        return

    symbols = df["symbol"].unique()

    for sym in symbols:
        print(f"\n--- Training MFE Regressor for {sym} ---")
        sym_df = df[df["symbol"] == sym].copy()
        sym_df = _ensure_sorted(sym_df)

        if len(sym_df) < MIN_ROWS:
            print(f"Skipping {sym}: only {len(sym_df)} rows (need >= {MIN_ROWS}).")
            continue

        X = sym_df[FEATURES].to_numpy(dtype=np.float32)
        y = sym_df[TARGET].to_numpy(dtype=np.float32)

        # ---- Walk-forward CV (no leakage) ----
        cv_mae = _walkforward_cv_mae(X, y, n_splits=N_SPLITS, gap=GAP_BARS)
        if np.isnan(cv_mae):
            print("WARNING: CV failed (too little data after gap/splits).")
        else:
            print(f"Walk-forward CV MAE (pips): ±{cv_mae:.2f}")

        # ---- Final model: train on ALL history ----
        model = _regressor()
        model.fit(X, y)

        # Optional sanity: in-sample mean predicted run (not a metric)
        preds_all = model.predict(X)
        print(f"Avg predicted run (in-sample): {preds_all.mean():.2f} pips")

        # ---- Export ONNX ----
        # The input width follows FEATURES so that editing the feature list
        # cannot silently desynchronise the exported model's input shape.
        initial_type = [("float_input", FloatTensorType([None, len(FEATURES)]))]
        onx = convert_sklearn(
            model,
            initial_types=initial_type,
            target_opset={"": 12, "ai.onnx.ml": 3}
        )

        filename = f"fvg_mfe_regressor_{sym}_v5.onnx"
        with open(filename, "wb") as f:
            f.write(onx.SerializeToString())

        print(f"Saved Regressor Brain: {filename}")


# Entry point: run the training only when this code is executed as the main
# module (not when it is imported).
if __name__ == "__main__":
    train_regressor()


--- Training MFE Regressor for EURUSD ---
Walk-forward CV MAE (pips): ±18.81
Avg predicted run (in-sample): 27.03 pips
Saved Regressor Brain: fvg_mfe_regressor_EURUSD_v5.onnx

--- Training MFE Regressor for GBPUSD ---
Walk-forward CV MAE (pips): ±21.43
Avg predicted run (in-sample): 30.34 pips
Saved Regressor Brain: fvg_mfe_regressor_GBPUSD_v5.onnx

--- Training MFE Regressor for USDJPY ---
Walk-forward CV MAE (pips): ±39.23
Avg predicted run (in-sample): 48.94 pips
Saved Regressor Brain: fvg_mfe_regressor_USDJPY_v5.onnx
