In [None]:
# Your standard convention
data_path = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"

# Point this to your M5 raw data folder (one CSV per symbol)
RAW_M5_DIR = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"  # <-- CHANGE IF NEEDED


In [2]:
import os
import glob
import pandas as pd
import numpy as np

# ============================================================
# CONFIG (EDIT ONLY THIS SECTION)
# ============================================================

# Your standard convention
data_path = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"

# Point this to your M5 raw data folder (one CSV per symbol)
RAW_M5_DIR = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"  # <-- CHANGE IF NEEDED

# Where QC report should be saved
OUT_DIR = os.path.join(data_path, "QC Reports")
os.makedirs(OUT_DIR, exist_ok=True)

# Expected columns (case-insensitive mapping handled below)
REQUIRED_COLS = ["symbol", "date", "open", "high", "low", "close", "volume"]

# Column names used after standardization
DT_COL = "date"
SYMBOL_COL = "symbol"

# Frequency diagnostics (we don't enforce, we report + optionally flag)
EXPECTED_FREQ_MIN = 5

# If pct of diffs == 5m is below this, we mark status as CHECK (set to 0 to disable)
FLAG_IRREGULAR_IF_PCT_LT = 0.80

# Count gaps >= this many minutes
GAP_FLAG_MIN = 60

# Close-to-close absolute pct move regarded as outlier (report only, also flags CHECK if >0)
OUTLIER_PCT_MOVE = 0.20

# ============================================================
# UTILS
# ============================================================

def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Make column names lower-case and strip spaces.
    Also handles common variations like 'Date', 'datetime', 'Open', etc.
    """
    df = df.copy()
    df.columns = [c.strip().lower() for c in df.columns]

    # Common aliases mapping -> standard names
    alias_map = {
        "datetime": "date",
        "timestamp": "date",
        "time": "date",
        "dt": "date",
        "ticker": "symbol",
        "sym": "symbol",
        "vol": "volume",
    }

    df = df.rename(columns={k: v for k, v in alias_map.items() if k in df.columns})
    return df


def safe_to_numeric(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    df = df.copy()
    for c in cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    return df


# ============================================================
# QC LOGIC
# ============================================================

def qc_single_file(fp: str) -> tuple[dict, pd.DataFrame]:
    """
    Returns (summary_dict, issues_df) for a single CSV file.
    Never raises; always returns something useful.
    """
    issues = []
    file_name = os.path.basename(fp)

    # -------------------------
    # Read file
    # -------------------------
    try:
        df = pd.read_csv(fp)
    except Exception as e:
        summary = {
            "file": file_name,
            "status": "READ_FAIL",
            "rows": 0,
            "cols": 0,
            "error": str(e),
        }
        issues_df = pd.DataFrame([{"file": file_name, "issue": "READ_FAIL", "detail": str(e)}])
        return summary, issues_df

    df = standardize_columns(df)

    summary = {
        "file": file_name,
        "status": "OK",
        "rows": int(len(df)),
        "cols": int(len(df.columns)),
        "error": "",
    }

    # -------------------------
    # Column checks
    # -------------------------
    missing_cols = [c for c in REQUIRED_COLS if c not in df.columns]
    extra_cols = [c for c in df.columns if c not in REQUIRED_COLS]

    summary["missing_cols"] = ",".join(missing_cols) if missing_cols else ""
    summary["extra_cols"] = ",".join(extra_cols) if extra_cols else ""

    if missing_cols:
        issues.append({"file": file_name, "issue": "MISSING_COLUMNS", "detail": ",".join(missing_cols)})
        summary["status"] = "CHECK"

    if extra_cols:
        issues.append({"file": file_name, "issue": "EXTRA_COLUMNS", "detail": ",".join(extra_cols)})

    # If no datetime column, stop here (still return)
    if DT_COL not in df.columns:
        issues_df = pd.DataFrame(issues)
        return summary, issues_df

    # -------------------------
    # Datetime parsing
    # -------------------------
    df["_dt"] = pd.to_datetime(df[DT_COL], errors="coerce")
    bad_dt = int(df["_dt"].isna().sum())
    summary["bad_datetime_rows"] = bad_dt
    if bad_dt > 0:
        issues.append({"file": file_name, "issue": "BAD_DATETIME", "detail": f"{bad_dt} rows"})
        summary["status"] = "CHECK"

    # Drop rows where datetime is invalid for subsequent checks
    df_valid = df.dropna(subset=["_dt"]).copy()

    if len(df_valid) == 0:
        issues.append({"file": file_name, "issue": "NO_VALID_DATETIME_ROWS", "detail": "all datetimes invalid"})
        summary["status"] = "CHECK"
        issues_df = pd.DataFrame(issues)
        return summary, issues_df

    # -------------------------
    # Missingness summary
    # -------------------------
    for c in REQUIRED_COLS:
        if c in df.columns:
            nmiss = int(df[c].isna().sum())
            summary[f"na_{c}"] = nmiss
            if nmiss > 0:
                issues.append({"file": file_name, "issue": "MISSING_VALUES", "detail": f"{c}: {nmiss}"})
                summary["status"] = "CHECK"
        else:
            summary[f"na_{c}"] = np.nan

    # -------------------------
    # Sorting, duplicates, monotonic
    # -------------------------
    df_valid = df_valid.sort_values("_dt")

    if SYMBOL_COL in df_valid.columns:
        dup = int(df_valid.duplicated(subset=[SYMBOL_COL, "_dt"]).sum())
        summary["duplicate_symbol_dt"] = dup
        if dup > 0:
            issues.append({"file": file_name, "issue": "DUPLICATE_SYMBOL_DATETIME", "detail": f"{dup} rows"})
            summary["status"] = "CHECK"
    else:
        dup = int(df_valid.duplicated(subset=["_dt"]).sum())
        summary["duplicate_dt"] = dup
        if dup > 0:
            issues.append({"file": file_name, "issue": "DUPLICATE_DATETIME", "detail": f"{dup} rows"})
            summary["status"] = "CHECK"

    summary["monotonic_time"] = bool(df_valid["_dt"].is_monotonic_increasing)
    if not summary["monotonic_time"]:
        issues.append({"file": file_name, "issue": "NON_MONOTONIC_TIME", "detail": "time not increasing"})
        summary["status"] = "CHECK"

    # -------------------------
    # Numeric sanity checks (OHLCV)
    # -------------------------
    df_valid = safe_to_numeric(df_valid, ["open", "high", "low", "close", "volume"])

    # High < Low
    if "high" in df_valid.columns and "low" in df_valid.columns:
        bad_hl = int((df_valid["high"] < df_valid["low"]).sum())
        summary["bad_high_lt_low"] = bad_hl
        if bad_hl > 0:
            issues.append({"file": file_name, "issue": "HIGH_LT_LOW", "detail": f"{bad_hl} rows"})
            summary["status"] = "CHECK"
    else:
        summary["bad_high_lt_low"] = np.nan

    # Open outside [low, high]
    if {"open", "low", "high"}.issubset(df_valid.columns):
        bad_open = int(((df_valid["open"] < df_valid["low"]) | (df_valid["open"] > df_valid["high"])).sum())
        summary["bad_open_outside_range"] = bad_open
        if bad_open > 0:
            issues.append({"file": file_name, "issue": "OPEN_OUTSIDE_RANGE", "detail": f"{bad_open} rows"})
            summary["status"] = "CHECK"
    else:
        summary["bad_open_outside_range"] = np.nan

    # Close outside [low, high]
    if {"close", "low", "high"}.issubset(df_valid.columns):
        bad_close = int(((df_valid["close"] < df_valid["low"]) | (df_valid["close"] > df_valid["high"])).sum())
        summary["bad_close_outside_range"] = bad_close
        if bad_close > 0:
            issues.append({"file": file_name, "issue": "CLOSE_OUTSIDE_RANGE", "detail": f"{bad_close} rows"})
            summary["status"] = "CHECK"
    else:
        summary["bad_close_outside_range"] = np.nan

    # Non-positive close
    if "close" in df_valid.columns:
        nonpos = int((df_valid["close"] <= 0).sum())
        summary["non_positive_close"] = nonpos
        if nonpos > 0:
            issues.append({"file": file_name, "issue": "NON_POSITIVE_PRICE", "detail": f"{nonpos} rows (close<=0)"})
            summary["status"] = "CHECK"
    else:
        summary["non_positive_close"] = np.nan

    # Negative volume
    if "volume" in df_valid.columns:
        neg_vol = int((df_valid["volume"] < 0).sum())
        summary["negative_volume"] = neg_vol
        if neg_vol > 0:
            issues.append({"file": file_name, "issue": "NEGATIVE_VOLUME", "detail": f"{neg_vol} rows"})
            summary["status"] = "CHECK"
    else:
        summary["negative_volume"] = np.nan

    # -------------------------
    # Frequency diagnostics (REPORT + optional flag)
    # -------------------------
    if len(df_valid) >= 2:
        diffs_min = df_valid["_dt"].diff().dt.total_seconds().div(60).dropna()

        if len(diffs_min) > 0:
            mode_series = diffs_min.mode()
            mode_val = float(mode_series.iloc[0]) if len(mode_series) else np.nan

            pct_eq_5m = float((diffs_min == EXPECTED_FREQ_MIN).mean())
            max_gap = float(diffs_min.max())
            gaps_ge_flag = int((diffs_min >= GAP_FLAG_MIN).sum())

            summary["mode_diff_min"] = mode_val
            summary["pct_diff_eq_5m"] = pct_eq_5m
            summary["max_gap_min"] = max_gap
            summary["gaps_ge_flag"] = gaps_ge_flag

            # Flag irregular only if enabled
            if FLAG_IRREGULAR_IF_PCT_LT is not None and FLAG_IRREGULAR_IF_PCT_LT > 0:
                if pct_eq_5m < FLAG_IRREGULAR_IF_PCT_LT:
                    issues.append({
                        "file": file_name,
                        "issue": "IRREGULAR_FREQUENCY",
                        "detail": f"pct_eq_5m={pct_eq_5m:.2%}, mode={mode_val}"
                    })
                    summary["status"] = "CHECK"
        else:
            summary["mode_diff_min"] = np.nan
            summary["pct_diff_eq_5m"] = np.nan
            summary["max_gap_min"] = np.nan
            summary["gaps_ge_flag"] = np.nan
    else:
        summary["mode_diff_min"] = np.nan
        summary["pct_diff_eq_5m"] = np.nan
        summary["max_gap_min"] = np.nan
        summary["gaps_ge_flag"] = np.nan

    # -------------------------
    # Outlier close moves (REPORT + flags CHECK if any)
    # -------------------------
    if "close" in df_valid.columns and len(df_valid) >= 2:
        ret_abs = df_valid["close"].pct_change().abs()
        outlier = int((ret_abs > OUTLIER_PCT_MOVE).sum())
        summary["outlier_close_moves"] = outlier
        if outlier > 0:
            issues.append({
                "file": file_name,
                "issue": "OUTLIER_CLOSE_MOVE",
                "detail": f">{OUTLIER_PCT_MOVE:.0%}: {outlier} bars"
            })
            summary["status"] = "CHECK"
    else:
        summary["outlier_close_moves"] = np.nan

    # -------------------------
    # Coverage
    # -------------------------
    summary["dt_start"] = df_valid["_dt"].min()
    summary["dt_end"] = df_valid["_dt"].max()
    summary["unique_days"] = int(df_valid["_dt"].dt.date.nunique())

    issues_df = pd.DataFrame(issues)
    return summary, issues_df


def qc_folder(folder: str, pattern: str = "*.csv") -> tuple[pd.DataFrame, pd.DataFrame]:
    files = sorted(glob.glob(os.path.join(folder, pattern)))

    if len(files) == 0:
        print(f"[QC] WARNING: No CSV files found in: {folder}")
        # Return empty but structured DFs
        summary_df = pd.DataFrame(columns=["file", "status", "rows", "cols", "error"])
        issues_df = pd.DataFrame(columns=["file", "issue", "detail"])
        return summary_df, issues_df

    summaries = []
    all_issues = []

    for i, fp in enumerate(files, 1):
        s, iss = qc_single_file(fp)
        summaries.append(s)
        if iss is not None and len(iss) > 0:
            all_issues.append(iss)

        # Progress print
        if i % 50 == 0 or i == len(files):
            print(f"[QC] Processed {i}/{len(files)} files...")

    summary_df = pd.DataFrame(summaries)
    issues_df = pd.concat(all_issues, ignore_index=True) if len(all_issues) > 0 else pd.DataFrame(columns=["file", "issue", "detail"])
    return summary_df, issues_df


def save_qc_report(summary_df: pd.DataFrame, issues_df: pd.DataFrame, out_xlsx: str) -> None:
    """
    Always writes at least one visible sheet to avoid openpyxl 'no visible sheet' errors.
    """
    # Ensure expected columns exist
    if summary_df is None or len(summary_df) == 0:
        summary_df = pd.DataFrame(columns=[
            "file", "status", "rows", "cols", "error",
            "missing_cols", "extra_cols", "bad_datetime_rows",
            "duplicate_symbol_dt", "duplicate_dt",
            "monotonic_time",
            "mode_diff_min", "pct_diff_eq_5m", "max_gap_min", "gaps_ge_flag",
            "outlier_close_moves", "dt_start", "dt_end", "unique_days"
        ])

    if "status" not in summary_df.columns:
        summary_df["status"] = "CHECK"
    if "file" not in summary_df.columns:
        summary_df["file"] = ""

    sort_cols = [c for c in ["status", "file"] if c in summary_df.columns]
    if sort_cols:
        summary_out = summary_df.sort_values(sort_cols)
    else:
        summary_out = summary_df.copy()

    # Guarantee issues_df exists
    if issues_df is None:
        issues_df = pd.DataFrame(columns=["file", "issue", "detail"])
    if len(issues_df) == 0:
        issues_df = pd.DataFrame(columns=["file", "issue", "detail"])

    with pd.ExcelWriter(out_xlsx, engine="openpyxl") as writer:
        summary_out.to_excel(writer, index=False, sheet_name="summary")
        issues_df.to_excel(writer, index=False, sheet_name="issues")


# ============================================================
# RUN
# ============================================================

print("RAW_M5_DIR =", RAW_M5_DIR)
files = glob.glob(os.path.join(RAW_M5_DIR, "*.csv"))
print("CSV count:", len(files))
print("Sample files:", files[:5])

summary_df, issues_df = qc_folder(RAW_M5_DIR)

out_xlsx = os.path.join(OUT_DIR, "m5_qc_report.xlsx")
save_qc_report(summary_df, issues_df, out_xlsx)

print("Saved QC report:", out_xlsx)

# Quick peek
display(summary_df.head(10))
display(issues_df.head(20))


RAW_M5_DIR = D:/work/Client/Maatra/Trade Level Data/M5 Raw data
CSV count: 131
Sample files: ['D:/work/Client/Maatra/Trade Level Data/M5 Raw data\\ACN.csv', 'D:/work/Client/Maatra/Trade Level Data/M5 Raw data\\ADBE.csv', 'D:/work/Client/Maatra/Trade Level Data/M5 Raw data\\ADI.csv', 'D:/work/Client/Maatra/Trade Level Data/M5 Raw data\\ADP.csv', 'D:/work/Client/Maatra/Trade Level Data/M5 Raw data\\ADSK.csv']
[QC] Processed 50/131 files...
[QC] Processed 100/131 files...
[QC] Processed 131/131 files...
Saved QC report: D:/work/Client/Maatra/Trade Level Data/M5 Raw data\QC Reports\m5_qc_report.xlsx


Unnamed: 0,file,status,rows,cols,error,missing_cols,extra_cols,bad_datetime_rows,na_symbol,na_date,...,non_positive_close,negative_volume,mode_diff_min,pct_diff_eq_5m,max_gap_min,gaps_ge_flag,outlier_close_moves,dt_start,dt_end,unique_days
0,ACN.csv,OK,29666,7,,,,0,0,0,...,0,0,5.0,0.829294,4990.0,456,0,2025-01-01 00:00:00,2025-12-27 00:55:00,257
1,ADBE.csv,OK,38143,7,,,,0,0,0,...,0,0,5.0,0.893372,4985.0,277,0,2025-01-01 00:00:00,2025-12-27 00:55:00,259
2,ADI.csv,OK,30437,7,,,,0,0,0,...,0,0,5.0,0.840189,5040.0,460,0,2025-01-01 00:20:00,2025-12-27 00:55:00,258
3,ADP.csv,CHECK,26815,7,,,,0,0,0,...,0,0,5.0,0.821362,5595.0,567,15,2025-01-01 00:35:00,2025-12-27 00:55:00,255
4,ADSK.csv,OK,27555,7,,,,0,0,0,...,0,0,5.0,0.81088,5125.0,525,0,2025-01-02 09:00:00,2025-12-27 00:55:00,255
5,AEO.csv,OK,29541,7,,,,0,0,0,...,0,0,5.0,0.854028,5035.0,495,0,2025-01-07 00:30:00,2025-12-27 00:55:00,251
6,AG.csv,OK,38643,7,,,,0,0,0,...,0,0,5.0,0.905983,4995.0,269,0,2025-01-01 00:05:00,2025-12-27 00:55:00,259
7,AJG.csv,CHECK,22970,7,,,,0,0,0,...,0,0,5.0,0.825765,5040.0,668,46,2025-01-01 00:25:00,2025-12-27 00:55:00,248
8,AKAM.csv,OK,25873,7,,,,0,0,0,...,0,0,5.0,0.820153,5160.0,621,0,2025-01-01 00:50:00,2025-12-27 00:55:00,256
9,AMAT.csv,OK,39376,7,,,,0,0,0,...,0,0,5.0,0.910984,5005.0,263,0,2025-01-01 00:15:00,2025-12-27 00:55:00,259


Unnamed: 0,file,issue,detail
0,ADP.csv,OUTLIER_CLOSE_MOVE,>20%: 15 bars
1,AJG.csv,OUTLIER_CLOSE_MOVE,>20%: 46 bars
2,AMP.csv,OUTLIER_CLOSE_MOVE,>20%: 65 bars
3,ATI.csv,OUTLIER_CLOSE_MOVE,>20%: 40 bars
4,BKNG.csv,IRREGULAR_FREQUENCY,"pct_eq_5m=77.99%, mode=5.0"
5,BKNG.csv,OUTLIER_CLOSE_MOVE,>20%: 15 bars
6,BR.csv,OUTLIER_CLOSE_MOVE,>20%: 124 bars
7,CDNS.csv,OUTLIER_CLOSE_MOVE,>20%: 3 bars
8,CHIQ.csv,IRREGULAR_FREQUENCY,"pct_eq_5m=74.38%, mode=5.0"
9,CHIQ.csv,OUTLIER_CLOSE_MOVE,>20%: 83 bars


In [5]:
import os
import pandas as pd

M5_DIR = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"

files = []
for f in os.listdir(M5_DIR):
    if f.lower().endswith(".csv"):
        ticker = os.path.splitext(f)[0].strip().upper()
        files.append({
            "filename": f,
            "ticker": ticker
        })

df = pd.DataFrame(files).sort_values("ticker").reset_index(drop=True)

out_path = os.path.join(M5_DIR, "m5_file_list.csv")
df.to_csv(out_path, index=False)

print("Total M5 files:", len(df))
print("Saved to:", out_path)
print("\nSample:")
print(df.head(20))


Total M5 files: 131
Saved to: D:/work/Client/Maatra/Trade Level Data/M5 Raw data\m5_file_list.csv

Sample:
    filename ticker
0    ACN.csv    ACN
1   ADBE.csv   ADBE
2    ADI.csv    ADI
3    ADP.csv    ADP
4   ADSK.csv   ADSK
5    AEO.csv    AEO
6     AG.csv     AG
7    AJG.csv    AJG
8   AKAM.csv   AKAM
9   AMAT.csv   AMAT
10   AMP.csv    AMP
11  ASML.csv   ASML
12   ATI.csv    ATI
13    BA.csv     BA
14  BKNG.csv   BKNG
15    BR.csv     BR
16   BSX.csv    BSX
17  CDNS.csv   CDNS
18   CFG.csv    CFG
19  CHIQ.csv   CHIQ


In [None]:
TRADE_XLSX_PATH = r"D:/work/Client/Maatra/Trade Level Data/3 Month vol 14 data with sharpe prob Fnl.xlsx"   # <-- your file

M5_DIR = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"          # <-- your M5 folder

In [None]:
import os
import numpy as np
import pandas as pd
from dataclasses import dataclass
from datetime import timedelta

# ============================================================
# CONFIG (EDIT THESE)
# ============================================================

TRADE_XLSX_PATH   = r"D:/work/Client/Maatra/Trade Level Data/3 Month vol 14 data with sharpe prob Fnl.xlsx"
M5_DIR            = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"
M5_FILE_LIST_PATH = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data/m5_file_list.csv"

OUT_DIR = os.path.join(os.path.dirname(TRADE_XLSX_PATH), "TrailingSL_TeamPack")
os.makedirs(OUT_DIR, exist_ok=True)

# Run size: set to None to run ALL trades
N_TRADES = None
RANDOM_SEED = 42

# Trailing stop parameters
ATR_WINDOW = 14
ATR_MULT = 3.0

SLOPE_WINDOW = 12
SLOPE_METHOD = "regression"   # "regression" or "polyfit"
USE_TIME_AWARE_SLOPE = True

# Regime thresholds on ATR-normalised slope
SLOPE_UPPER = 0.30
SLOPE_LOWER = 0.10

# Tightening / acceleration controls
ACCEL_FACTOR = 1.0
MIN_MULT = 0.5

# Entry is at EOD on EntryDate, so trailing starts from next day
APPLY_FROM_NEXT_DAY = True

# Uncapped mode safety: stop after X calendar days if still not hit
UNCAPPED_MAX_DAYS = 120  # set 90/120/180 as you like


# ============================================================
# Utilities: tickers + M5 map
# ============================================================

def normalize_ticker(x: str) -> str:
    """Uppercase alnum only. Matches your M5 filenames like ADBE.csv, ADP.csv etc."""
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return ""
    x = str(x).strip().upper()
    return "".join([ch for ch in x if ch.isalnum()])

def ticker_from_currency(currency: str) -> str:
    """Trade file stores Currency like 'ADBE/USD'. We take base and normalize."""
    if currency is None or (isinstance(currency, float) and np.isnan(currency)):
        return ""
    s = str(currency).strip().upper()
    base = s.split("/")[0].strip() if "/" in s else s
    return normalize_ticker(base)

def load_m5_file_map(m5_dir: str, file_list_path: str) -> dict:
    """Build ticker_norm -> absolute filepath from m5_file_list.csv."""
    fl = pd.read_csv(file_list_path)
    fl["ticker_norm"] = fl["ticker"].apply(normalize_ticker)
    fl["path"] = fl["filename"].apply(lambda f: os.path.join(m5_dir, f))
    return dict(zip(fl["ticker_norm"], fl["path"]))

M5_MAP = load_m5_file_map(M5_DIR, M5_FILE_LIST_PATH)

# Cache to avoid re-reading same M5 file repeatedly
_m5_cache = {}

def load_m5(ticker_norm: str) -> pd.DataFrame | None:
    """Load M5 OHLCV data for ticker_norm. Returns df sorted by datetime with _dt column."""
    if ticker_norm in _m5_cache:
        return _m5_cache[ticker_norm]

    fp = M5_MAP.get(ticker_norm)
    if not fp or (not os.path.exists(fp)):
        _m5_cache[ticker_norm] = None
        return None

    df = pd.read_csv(fp)
    df.columns = [c.strip().lower() for c in df.columns]

    needed = ["date", "open", "high", "low", "close"]
    if not all(c in df.columns for c in needed):
        _m5_cache[ticker_norm] = None
        return None

    df["_dt"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["_dt"]).sort_values("_dt").reset_index(drop=True)

    for c in ["open", "high", "low", "close", "volume"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # basic cleanup
    df = df.dropna(subset=["open", "high", "low", "close"])
    _m5_cache[ticker_norm] = df
    return df


# ============================================================
# Indicators
# ============================================================

def compute_atr(df: pd.DataFrame, window: int) -> pd.Series:
    """ATR computed from True Range on M5 bars."""
    high = df["high"]
    low = df["low"]
    close = df["close"]
    tr = pd.concat([
        high - low,
        (high - close.shift(1)).abs(),
        (low - close.shift(1)).abs()
    ], axis=1).max(axis=1)
    return tr.rolling(window, min_periods=window).mean()

def rolling_slope(y: pd.Series, tmin: pd.Series | None, method: str) -> float:
    """Slope of y over time. If tmin is provided, slope is per-minute scaled; otherwise per-index."""
    yy = y.values.astype(float)
    if len(yy) < 2 or np.all(np.isnan(yy)):
        return np.nan

    if tmin is None:
        xx = np.arange(len(yy), dtype=float)
    else:
        xx = tmin.values.astype(float)
        xx = xx - xx[0]

    mask = ~np.isnan(xx) & ~np.isnan(yy)
    xx = xx[mask]; yy = yy[mask]
    if len(yy) < 2:
        return np.nan

    if method == "polyfit":
        return np.polyfit(xx, yy, 1)[0]

    xm = xx.mean(); ym = yy.mean()
    denom = ((xx - xm) ** 2).sum()
    if denom == 0:
        return np.nan
    return (((xx - xm) * (yy - ym)).sum()) / denom


# ============================================================
# Build trade list from Excel
# ============================================================

def build_trade_summary(daily: pd.DataFrame) -> pd.DataFrame:
    """
    Converts your daily/row-level trade file into one row per TradeID.

    We use:
      - Entry: DayStatus=OPEN and TradeRole=OPEN_POSITION (price = entry price)
      - Scheduled exit price: TradeRole=EXIT_TRADE if present else last price in trade
      - EntryDate = Opening Date
      - ExitDate_Scheduled = Closing Date
    """
    df = daily.copy()
    df.columns = [c.strip() for c in df.columns]

    required = ["TradeID", "Currency", "Direction", "date", "price",
                "Opening Date", "Closing Date", "DayStatus", "TradeRole"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Trade file missing required columns: {missing}")

    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["Opening Date"] = pd.to_datetime(df["Opening Date"], errors="coerce")
    df["Closing Date"] = pd.to_datetime(df["Closing Date"], errors="coerce")
    df["price"] = pd.to_numeric(df["price"], errors="coerce")

    df["DayStatusU"] = df["DayStatus"].astype(str).str.upper().str.strip()
    df["TradeRoleU"] = df["TradeRole"].astype(str).str.upper().str.strip()
    df["TickerNorm"] = df["Currency"].apply(ticker_from_currency)

    open_rows = df[(df["DayStatusU"] == "OPEN") & (df["TradeRoleU"] == "OPEN_POSITION")].copy()
    exit_rows = df[df["TradeRoleU"] == "EXIT_TRADE"].copy()

    entry = open_rows.sort_values("date").groupby("TradeID", as_index=False).first()
    entry = entry[["TradeID", "TickerNorm", "Currency", "Direction", "Opening Date", "Closing Date", "price"]].rename(columns={
        "Opening Date": "EntryDate",
        "Closing Date": "ExitDate_Scheduled",
        "price": "EntryPrice"
    })

    if len(exit_rows) > 0:
        ex = exit_rows.sort_values("date").groupby("TradeID", as_index=False).last()
        ex = ex[["TradeID", "price"]].rename(columns={"price": "ExitPrice_Scheduled"})
        out = entry.merge(ex, on="TradeID", how="left")
    else:
        out = entry.copy()
        out["ExitPrice_Scheduled"] = np.nan

    # fallback scheduled exit price if EXIT_TRADE missing
    last_px = df.sort_values("date").groupby("TradeID", as_index=False).last()[["TradeID", "price"]].rename(columns={"price": "ExitPrice_Fallback"})
    out = out.merge(last_px, on="TradeID", how="left")
    out["ExitPrice_Scheduled"] = out["ExitPrice_Scheduled"].fillna(out["ExitPrice_Fallback"])
    out = out.drop(columns=["ExitPrice_Fallback"])

    out = out.dropna(subset=["TradeID","TickerNorm","Direction","EntryDate","ExitDate_Scheduled","EntryPrice","ExitPrice_Scheduled"])
    return out


# ============================================================
# Trailing stop simulation (two modes)
# ============================================================

@dataclass
class TrailResult:
    exit_time: pd.Timestamp
    exit_price: float
    exit_reason: str  # TrailingSL / Scheduled / MaxDays / NoData
    best_favorable: float
    max_high: float
    min_low: float
    stop_last: float
    mult_last: float

def simulate_trailing(tr: pd.Series, m5: pd.DataFrame, mode: str) -> TrailResult:
    """
    mode:
      - 'capped': trailing active only until scheduled exit date (T+3 cap)
      - 'uncapped': trailing runs beyond scheduled exit until stop hit (or max days / no data)
    """
    entry_date = pd.to_datetime(tr["EntryDate"])
    sched_exit_date = pd.to_datetime(tr["ExitDate_Scheduled"])
    entry_price = float(tr["EntryPrice"])
    sched_exit_price = float(tr["ExitPrice_Scheduled"])
    direction = str(tr["Direction"]).strip().upper()
    is_long = (direction == "LONG")

    start_dt = (entry_date + timedelta(days=1)) if APPLY_FROM_NEXT_DAY else entry_date

    if mode == "capped":
        end_dt = sched_exit_date + timedelta(days=1)  # include that day
    else:
        end_dt = start_dt + timedelta(days=UNCAPPED_MAX_DAYS)

    df = m5[(m5["_dt"] >= start_dt) & (m5["_dt"] < end_dt)].copy()
    if len(df) == 0:
        # no intraday bars -> fallback to scheduled
        return TrailResult(
            exit_time=sched_exit_date,
            exit_price=sched_exit_price,
            exit_reason="NoData",
            best_favorable=entry_price,
            max_high=np.nan,
            min_low=np.nan,
            stop_last=np.nan,
            mult_last=np.nan
        )

    # Indicators
    df["ATR"] = compute_atr(df, ATR_WINDOW)

    if USE_TIME_AWARE_SLOPE:
        df["_tmin"] = (df["_dt"] - df["_dt"].iloc[0]).dt.total_seconds() / 60.0
        df["slope"] = df["close"].rolling(SLOPE_WINDOW, min_periods=SLOPE_WINDOW).apply(
            lambda s: rolling_slope(s, tmin=df.loc[s.index, "_tmin"], method=SLOPE_METHOD),
            raw=False
        )
    else:
        df["slope"] = df["close"].rolling(SLOPE_WINDOW, min_periods=SLOPE_WINDOW).apply(
            lambda s: rolling_slope(s, tmin=None, method=SLOPE_METHOD),
            raw=False
        )

    df["slope_norm"] = df["slope"] / df["ATR"]

    # Track extremes over the whole evaluation window
    max_high = float(df["high"].max())
    min_low = float(df["low"].min())

    # "Best favorable price" for the trade direction
    best_favorable = entry_price
    stop = -np.inf if is_long else np.inf
    stop_last = np.nan
    mult_last = np.nan

    for i in range(len(df)):
        row = df.iloc[i]
        dt = row["_dt"]
        atr = row["ATR"]
        if pd.isna(atr) or atr <= 0:
            continue

        # update favorable extreme using bar extremes
        if is_long:
            best_favorable = max(best_favorable, row["high"])
        else:
            best_favorable = min(best_favorable, row["low"])

        slope_norm = row["slope_norm"]

        mult = ATR_MULT
        if not pd.isna(slope_norm) and slope_norm > SLOPE_UPPER:
            mult = max(MIN_MULT, ATR_MULT - (slope_norm * ACCEL_FACTOR))
        elif not pd.isna(slope_norm) and slope_norm < SLOPE_LOWER:
            mult = ATR_MULT

        # chandelier-style trailing stop (anchored to best favorable extreme)
        if is_long:
            stop = max(stop, best_favorable - mult * atr)
            stop_last = float(stop)
            mult_last = float(mult)
            if row["low"] <= stop:
                return TrailResult(dt, float(stop), "TrailingSL", float(best_favorable), max_high, min_low, stop_last, mult_last)
        else:
            stop = min(stop, best_favorable + mult * atr)  # best_favorable is the lowest low for shorts
            stop_last = float(stop)
            mult_last = float(mult)
            if row["high"] >= stop:
                return TrailResult(dt, float(stop), "TrailingSL", float(best_favorable), max_high, min_low, stop_last, mult_last)

    # If not hit:
    if mode == "capped":
        return TrailResult(sched_exit_date, sched_exit_price, "Scheduled",
                           float(best_favorable), max_high, min_low, stop_last, mult_last)
    else:
        # uncapped: if not hit by max days, mark as MaxDays exit at last close
        last_close = float(df["close"].iloc[-1])
        return TrailResult(df["_dt"].iloc[-1], last_close, "MaxDays",
                           float(best_favorable), max_high, min_low, stop_last, mult_last)


# ============================================================
# Metrics + summary helpers
# ============================================================

def direction_aware_better(direction: str, new_exit: float, old_exit: float) -> float:
    """Returns improvement in *price units* where >0 is better for the trade."""
    d = str(direction).strip().upper()
    if pd.isna(new_exit) or pd.isna(old_exit):
        return np.nan
    if d == "LONG":
        return float(new_exit - old_exit)
    else:
        return float(old_exit - new_exit)

def summarize(trades_out: pd.DataFrame) -> pd.DataFrame:
    """Builds a compact summary table overall and split by direction."""
    df = trades_out.copy()
    df["ImprovedFlag"] = df["PriceImprovement"] > 0
    df["WorseFlag"] = df["PriceImprovement"] < 0
    df["UnchangedFlag"] = df["PriceImprovement"] == 0
    df["TriggeredFlag"] = df["ExitReason"].astype(str).eq("TrailingSL")

    def agg_block(g: pd.DataFrame) -> dict:
        n = len(g)
        trig = int(g["TriggeredFlag"].sum())
        notrig = int((~g["TriggeredFlag"]).sum())
        improved = int(g["ImprovedFlag"].sum())
        worse = int(g["WorseFlag"].sum())
        unchanged = int(g["UnchangedFlag"].sum())
        improved_trig = int((g["TriggeredFlag"] & g["ImprovedFlag"]).sum())
        worse_trig = int((g["TriggeredFlag"] & g["WorseFlag"]).sum())
        unchanged_trig = int((g["TriggeredFlag"] & g["UnchangedFlag"]).sum())

        return {
            "Trades": n,
            "TrailingTriggered": trig,
            "TrailingNotTriggered": notrig,
            "PctTriggered": trig / n if n else np.nan,

            "Improved": improved,
            "Worse": worse,
            "Unchanged": unchanged,
            "WinRate_Improved": improved / n if n else np.nan,

            "Improved_Triggered": improved_trig,
            "Worse_Triggered": worse_trig,
            "Unchanged_Triggered": unchanged_trig,

            "SumPriceImprovement": float(g["PriceImprovement"].sum(skipna=True)),
            "AvgPriceImprovement": float(g["PriceImprovement"].mean(skipna=True)),
            "MedianPriceImprovement": float(g["PriceImprovement"].median(skipna=True)),
        }

    rows = []
    rows.append({"Group": "ALL", **agg_block(df)})
    for d, g in df.groupby(df["Direction"].astype(str).str.upper().str.strip()):
        rows.append({"Group": f"Direction={d}", **agg_block(g)})

    return pd.DataFrame(rows)


# ============================================================
# Main runner: produce both capped + uncapped outputs
# ============================================================

def run(mode: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    daily = pd.read_excel(TRADE_XLSX_PATH)
    trades = build_trade_summary(daily)

    if N_TRADES is None:
        sample = trades.reset_index(drop=True)
    else:
        sample = trades.sample(n=min(N_TRADES, len(trades)), random_state=RANDOM_SEED).reset_index(drop=True)

    out_rows = []
    missing_m5 = 0

    for i in range(len(sample)):
        tr = sample.iloc[i]
        t = tr["TickerNorm"]
        m5 = load_m5(t)
        if m5 is None:
            missing_m5 += 1
            continue

        res = simulate_trailing(tr, m5, mode=mode)

        old_exit_price = float(tr["ExitPrice_Scheduled"])
        new_exit_price = float(res.exit_price)

        # direction-aware improvement in price units
        price_impr = direction_aware_better(tr["Direction"], new_exit_price, old_exit_price)

        out_rows.append({
            "TradeID": tr["TradeID"],
            "Currency": tr["Currency"],
            "TickerNorm": tr["TickerNorm"],
            "Direction": tr["Direction"],
            "EntryDate": tr["EntryDate"],
            "EntryPrice": tr["EntryPrice"],

            # Original exit (your existing logic)
            "ExitDate_Original": tr["ExitDate_Scheduled"],
            "ExitPrice_Original": old_exit_price,

            # Trailing exit
            "ExitTime_Trailing": res.exit_time,
            "ExitDate_Trailing": pd.to_datetime(res.exit_time).date() if pd.notna(res.exit_time) else np.nan,
            "ExitPrice_Trailing": new_exit_price,
            "ExitReason": res.exit_reason,

            # Extremes / excursions
            "MaxHigh_InWindow": res.max_high,
            "MinLow_InWindow": res.min_low,
            "BestFavorable_InWindow": res.best_favorable,  # long=max high seen; short=min low seen

            # Improvement metric
            "PriceImprovement": price_impr,
            "PctImprovement_vs_OriginalExit": (price_impr / old_exit_price) if old_exit_price != 0 else np.nan,

            # For quick review
            "ClosedEarlierOrSameDay": (pd.to_datetime(res.exit_time).date() <= pd.to_datetime(tr["ExitDate_Scheduled"]).date())
                                      if (pd.notna(res.exit_time) and pd.notna(tr["ExitDate_Scheduled"])) else np.nan,
        })

        if (i + 1) % 500 == 0:
            print(f"[{mode}] Processed {i+1}/{len(sample)} trades...")

    out = pd.DataFrame(out_rows)
    summary = summarize(out)

    print(f"\n[{mode}] Done. Trades in output: {len(out)} | Missing M5 skipped: {missing_m5}")
    return out, summary


# ============================================================
# Execute both modes + save
# ============================================================

capped_trades, capped_summary = run(mode="capped")
uncapped_trades, uncapped_summary = run(mode="uncapped")

capped_trades_csv = os.path.join(OUT_DIR, "trades_trailing_capped.csv")
capped_summary_csv = os.path.join(OUT_DIR, "summary_trailing_capped.csv")

uncapped_trades_csv = os.path.join(OUT_DIR, "trades_trailing_uncapped.csv")
uncapped_summary_csv = os.path.join(OUT_DIR, "summary_trailing_uncapped.csv")

capped_trades.to_csv(capped_trades_csv, index=False)
capped_summary.to_csv(capped_summary_csv, index=False)

uncapped_trades.to_csv(uncapped_trades_csv, index=False)
uncapped_summary.to_csv(uncapped_summary_csv, index=False)

print("\nSaved files:")
print(capped_trades_csv)
print(capped_summary_csv)
print(uncapped_trades_csv)
print(uncapped_summary_csv)

print("\n=== CAPPED SUMMARY (T+3 cap) ===")
print(capped_summary)

print("\n=== UNCAPED SUMMARY (trail-only, max-days safety) ===")
print(uncapped_summary)


Trade file: D:/work/Client/Maatra/Trade Level Data/3 Month vol 14 data with sharpe prob Fnl.xlsx
M5 dir: D:/work/Client/Maatra/Trade Level Data/M5 Raw data
M5 tickers available: 131
Output dir: D:/work/Client/Maatra/Trade Level Data\TrailingSL_FullRun
Unique trades with clean entry/exit: 12815
Running ALL trades: 12815
Processed 1250/12815 trades...
Processed 1750/12815 trades...
Processed 2250/12815 trades...
Processed 4500/12815 trades...
Processed 7500/12815 trades...
Processed 7750/12815 trades...
Processed 8000/12815 trades...
Processed 8750/12815 trades...
Processed 9000/12815 trades...
Processed 9250/12815 trades...
Processed 9500/12815 trades...
Processed 11000/12815 trades...
Processed 11750/12815 trades...
Processed 12250/12815 trades...
Processed 12750/12815 trades...

Done.
Saved results: D:/work/Client/Maatra/Trade Level Data\TrailingSL_FullRun\all_trades_trailing_sl_results.csv
Saved missing list: D:/work/Client/Maatra/Trade Level Data\TrailingSL_FullRun\missing_m5_trades

In [1]:
import os
import numpy as np
import pandas as pd

# ============================================================
# CONFIG (EDIT THESE 3 PATHS)
# ============================================================

RESULTS_CSV = r"D:/work/Client/Maatra/Trade Level Data/TrailingSL_FullRun/all_trades_trailing_sl_results.csv"
M5_DIR = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"
M5_FILE_LIST_PATH = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data/m5_file_list.csv"

OUT_DIR = os.path.join(os.path.dirname(RESULTS_CSV), "PostProcess_BaselineM5")
os.makedirs(OUT_DIR, exist_ok=True)

# If True: baseline is last candle close on ExitDate.
# If no candle exists for that calendar date, fallback to last candle BEFORE end of that day.
USE_FALLBACK_LAST_BEFORE_DAY_END = True

# ============================================================
# HELPERS
# ============================================================

def normalize_ticker(x: str) -> str:
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return ""
    x = str(x).strip().upper()
    return "".join([ch for ch in x if ch.isalnum()])

def load_m5_file_map(m5_dir: str, file_list_path: str) -> dict:
    fl = pd.read_csv(file_list_path)
    fl["ticker_norm"] = fl["ticker"].apply(normalize_ticker)
    fl["path"] = fl["filename"].apply(lambda f: os.path.join(m5_dir, f))
    return dict(zip(fl["ticker_norm"], fl["path"]))

M5_MAP = load_m5_file_map(M5_DIR, M5_FILE_LIST_PATH)

_m5_daily_last_close_cache = {}

def build_daily_last_close_for_ticker(ticker_norm: str) -> pd.Series | None:
    """
    Returns a Series indexed by 'date' (datetime.date) containing the last M5 close of that date.
    Cached per ticker.
    """
    if ticker_norm in _m5_daily_last_close_cache:
        return _m5_daily_last_close_cache[ticker_norm]

    fp = M5_MAP.get(ticker_norm)
    if not fp or (not os.path.exists(fp)):
        _m5_daily_last_close_cache[ticker_norm] = None
        return None

    df = pd.read_csv(fp)
    df.columns = [c.strip().lower() for c in df.columns]
    needed = ["date", "close"]
    if not all(c in df.columns for c in needed):
        _m5_daily_last_close_cache[ticker_norm] = None
        return None

    df["_dt"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["_dt"]).sort_values("_dt")
    df["close"] = pd.to_numeric(df["close"], errors="coerce")
    df = df.dropna(subset=["close"])

    if len(df) == 0:
        _m5_daily_last_close_cache[ticker_norm] = None
        return None

    df["_d"] = df["_dt"].dt.date
    # last close per day
    last_close = df.groupby("_d")["close"].last()
    _m5_daily_last_close_cache[ticker_norm] = last_close
    return last_close

def baseline_exit_price_m5(ticker_norm: str, exit_date: pd.Timestamp) -> float:
    """
    Baseline exit = last M5 close on exit_date.
    If missing and fallback enabled, uses last close before end-of-day.
    """
    s = build_daily_last_close_for_ticker(ticker_norm)
    if s is None or pd.isna(exit_date):
        return np.nan

    d = exit_date.date()
    if d in s.index:
        return float(s.loc[d])

    if not USE_FALLBACK_LAST_BEFORE_DAY_END:
        return np.nan

    # Fallback: find nearest previous day that exists (still consistent "exit around that day end")
    # This is a pragmatic fallback in case of holiday / missing data.
    prev_days = s.index[s.index < d]
    if len(prev_days) == 0:
        return np.nan
    return float(s.loc[prev_days[-1]])

def direction_aware_improvement(direction: str, trailing_exit: float, baseline_exit: float) -> float:
    """
    PriceImprovement > 0 means trailing exit is better than baseline exit.
    """
    if pd.isna(trailing_exit) or pd.isna(baseline_exit):
        return np.nan
    d = str(direction).strip().upper()
    if d == "LONG":
        return float(trailing_exit - baseline_exit)
    else:
        # SHORT
        return float(baseline_exit - trailing_exit)

def build_summary(df: pd.DataFrame) -> dict:
    n = len(df)
    if n == 0:
        return {"n_trades": 0}

    triggered = df["ExitReason"].astype(str).eq("TrailingSL")
    n_trig = int(triggered.sum())
    n_not = int((~triggered).sum())

    # use PriceImprovement (direction-aware)
    imp = df["PriceImprovement"].copy()

    def bucket(sub: pd.Series):
        sub = sub.dropna()
        if len(sub) == 0:
            return {"count": 0, "improved": 0, "worse": 0, "unchanged": 0,
                    "win_rate": np.nan, "avg": np.nan, "median": np.nan, "sum": np.nan}
        return {
            "count": int(len(sub)),
            "improved": int((sub > 0).sum()),
            "worse": int((sub < 0).sum()),
            "unchanged": int((sub == 0).sum()),
            "win_rate": float((sub > 0).mean()),
            "avg": float(sub.mean()),
            "median": float(sub.median()),
            "sum": float(sub.sum()),
        }

    overall_b = bucket(imp)
    trig_b = bucket(df.loc[triggered, "PriceImprovement"])
    not_b = bucket(df.loc[~triggered, "PriceImprovement"])

    # Also % improvement stats
    pct_imp = df["PctImprovement"].dropna()
    pct_avg = float(pct_imp.mean()) if len(pct_imp) else np.nan
    pct_median = float(pct_imp.median()) if len(pct_imp) else np.nan

    return {
        "n_trades": n,
        "n_trailing_triggered": n_trig,
        "n_trailing_not_triggered": n_not,
        "pct_trailing_triggered": n_trig / n if n else np.nan,

        "overall_improved": overall_b["improved"],
        "overall_worse": overall_b["worse"],
        "overall_unchanged": overall_b["unchanged"],
        "overall_win_rate": overall_b["win_rate"],
        "overall_avg_price_improvement": overall_b["avg"],
        "overall_median_price_improvement": overall_b["median"],
        "overall_sum_price_improvement": overall_b["sum"],

        "triggered_improved": trig_b["improved"],
        "triggered_worse": trig_b["worse"],
        "triggered_unchanged": trig_b["unchanged"],
        "triggered_win_rate": trig_b["win_rate"],
        "triggered_avg_price_improvement": trig_b["avg"],
        "triggered_median_price_improvement": trig_b["median"],
        "triggered_sum_price_improvement": trig_b["sum"],

        "not_triggered_improved": not_b["improved"],
        "not_triggered_worse": not_b["worse"],
        "not_triggered_unchanged": not_b["unchanged"],

        "overall_avg_pct_improvement": pct_avg,
        "overall_median_pct_improvement": pct_median,
    }

def group_summary(df: pd.DataFrame, group_cols: list[str]) -> pd.DataFrame:
    rows = []
    for keys, g in df.groupby(group_cols):
        if not isinstance(keys, tuple):
            keys = (keys,)
        s = build_summary(g)
        row = {col: val for col, val in zip(group_cols, keys)}
        row.update(s)
        rows.append(row)
    out = pd.DataFrame(rows)
    if "overall_sum_price_improvement" in out.columns:
        out = out.sort_values("overall_sum_price_improvement", ascending=False)
    return out

# ============================================================
# RUN
# ============================================================

df = pd.read_csv(RESULTS_CSV)

# Required columns from your trailing run output
required_cols = ["TradeID", "TickerNorm", "Direction", "ExitDate_Scheduled", "ExitPrice_TrailingSL", "ExitReason"]
missing = [c for c in required_cols if c not in df.columns]
if missing:
    raise ValueError(f"Results CSV missing required columns: {missing}")

df["ExitDate_Scheduled"] = pd.to_datetime(df["ExitDate_Scheduled"], errors="coerce")

# Compute baseline exit price from M5 (last candle close on exit date)
baseline_prices = []
missing_baseline = 0

unique_tickers = df["TickerNorm"].dropna().astype(str).unique()
print("Tickers in results:", len(unique_tickers))

for i, row in df.iterrows():
    t = str(row["TickerNorm"]).strip()
    ed = row["ExitDate_Scheduled"]
    px = baseline_exit_price_m5(t, ed)
    if pd.isna(px):
        missing_baseline += 1
    baseline_prices.append(px)

df["ExitPrice_Baseline_M5Close"] = baseline_prices

# Direction-aware improvement (this is your core metric)
df["PriceImprovement"] = df.apply(
    lambda r: direction_aware_improvement(r["Direction"], r["ExitPrice_TrailingSL"], r["ExitPrice_Baseline_M5Close"]),
    axis=1
)

# % improvement (normalize by baseline exit price)
df["PctImprovement"] = df["PriceImprovement"] / df["ExitPrice_Baseline_M5Close"]

# Save enriched results
out_results = os.path.join(OUT_DIR, "results_with_baseline_m5.csv")
df.to_csv(out_results, index=False)

# Overall summary + group summaries
overall = build_summary(df)
overall_df = pd.DataFrame([overall])

summary_overall_csv = os.path.join(OUT_DIR, "summary_overall_m5exit.csv")
overall_df.to_csv(summary_overall_csv, index=False)

by_symbol = group_summary(df, ["TickerNorm"])
by_dir = group_summary(df, ["Direction"])
by_symbol_dir = group_summary(df, ["TickerNorm", "Direction"])

by_symbol_csv = os.path.join(OUT_DIR, "summary_by_symbol_m5exit.csv")
by_dir_csv = os.path.join(OUT_DIR, "summary_by_direction_m5exit.csv")
by_symbol_dir_csv = os.path.join(OUT_DIR, "summary_by_symbol_direction_m5exit.csv")

by_symbol.to_csv(by_symbol_csv, index=False)
by_dir.to_csv(by_dir_csv, index=False)
by_symbol_dir.to_csv(by_symbol_dir_csv, index=False)

# Print what you care about
print("\n================ RESULTS (Baseline = ClosingDate last M5 close) ================")
print(f"Trades processed: {len(df)}")
print(f"Baseline M5 close missing: {missing_baseline} ({missing_baseline / max(len(df),1):.2%})")

print("\n=== Summary (what you asked) ===")
print(f"Total trades: {overall['n_trades']}")
print(f"Trailing triggered: {overall['n_trailing_triggered']} ({overall['pct_trailing_triggered']:.2%})")
print(f"Trailing NOT triggered: {overall['n_trailing_not_triggered']} ({(overall['n_trailing_not_triggered']/overall['n_trades']):.2%})")

print("\nWithin TRIGGERED trades:")
print(f"  Improved: {overall['triggered_improved']}")
print(f"  Worse: {overall['triggered_worse']}")
print(f"  Unchanged: {overall['triggered_unchanged']}")
print(f"  Win-rate (improved): {overall['triggered_win_rate']:.2%}" if pd.notna(overall['triggered_win_rate']) else "  Win-rate: NA")

print("\nOverall impact (price units):")
print(f"  Total price improvement (sum): {overall['overall_sum_price_improvement']}")
print(f"  Avg price improvement: {overall['overall_avg_price_improvement']}")
print(f"  Median price improvement: {overall['overall_median_price_improvement']}")

print("\nOverall impact (%):")
print(f"  Avg % improvement: {overall['overall_avg_pct_improvement']:.4%}" if pd.notna(overall['overall_avg_pct_improvement']) else "  Avg % improvement: NA")
print(f"  Median % improvement: {overall['overall_median_pct_improvement']:.4%}" if pd.notna(overall['overall_median_pct_improvement']) else "  Median % improvement: NA")

print("\nSaved outputs:")
print(out_results)
print(summary_overall_csv)
print(by_symbol_csv)
print(by_dir_csv)
print(by_symbol_dir_csv)


Tickers in results: 62

Trades processed: 4765
Baseline M5 close missing: 0 (0.00%)

=== Summary (what you asked) ===
Total trades: 4765
Trailing triggered: 4602 (96.58%)
Trailing NOT triggered: 163 (3.42%)

Within TRIGGERED trades:
  Improved: 2656
  Worse: 1946
  Unchanged: 0
  Win-rate (improved): 57.71%

Overall impact (price units):
  Total price improvement (sum): 4046.4170228571293
  Avg price improvement: 0.8491955976615172
  Median price improvement: 0.3274499999999989

Overall impact (%):
  Avg % improvement: 0.2459%
  Median % improvement: 0.2733%

Saved outputs:
D:/work/Client/Maatra/Trade Level Data/TrailingSL_FullRun\PostProcess_BaselineM5\results_with_baseline_m5.csv
D:/work/Client/Maatra/Trade Level Data/TrailingSL_FullRun\PostProcess_BaselineM5\summary_overall_m5exit.csv
D:/work/Client/Maatra/Trade Level Data/TrailingSL_FullRun\PostProcess_BaselineM5\summary_by_symbol_m5exit.csv
D:/work/Client/Maatra/Trade Level Data/TrailingSL_FullRun\PostProcess_BaselineM5\summary_by

In [5]:
import os
import numpy as np
import pandas as pd
from dataclasses import dataclass
from datetime import timedelta

# ============================================================
# CONFIG (EDIT THESE PATHS)
# ============================================================

TRADE_XLSX_PATH      = r"D:/work/Client/Maatra/Trade Level Data/3 Month vol 14 data with sharpe prob Fnl.xlsx"
M5_DIR               = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"
M5_FILE_LIST_PATH    = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data/m5_file_list.csv"
MAPPING_XLSX_PATH    = r"D:/work/Client/Maatra/Trade Level Data/Instrument Mapping.xlsx"

OUT_DIR = os.path.join(os.path.dirname(TRADE_XLSX_PATH), "TrailingSL_CAPPED_ONLY_MKT1430_2100")
os.makedirs(OUT_DIR, exist_ok=True)

RUN_ALL_TRADES = True
N_SAMPLE_TRADES = 2000
RANDOM_SEED = 42

# ============================================================
# MARKET HOURS FILTER (ONLY USE 14:30 to 21:00)
# ============================================================
MARKET_START = "14:30"
MARKET_END   = "21:00"   # inclusive

# ============================================================
# Trailing Stop Parameters (your approach)
# ============================================================

ATR_WINDOW = 14
ATR_MULT = 3.0

SLOPE_WINDOW = 12
SLOPE_METHOD = "regression"     # "regression" or "polyfit"
USE_TIME_AWARE_SLOPE = True

SLOPE_UPPER = 0.30
SLOPE_LOWER = 0.10

ACCEL_FACTOR = 1.0
MIN_MULT = 0.5

APPLY_FROM_NEXT_DAY = True

# ============================================================
# Utilities: normalization + mapping
# ============================================================

def normalize_ticker(x: str) -> str:
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return ""
    x = str(x).strip().upper()
    return "".join([ch for ch in x if ch.isalnum()])

def trade_base_from_currency(currency: str) -> str:
    if currency is None or (isinstance(currency, float) and np.isnan(currency)):
        return ""
    s = str(currency).strip().upper()
    base = s.split("/")[0].strip() if "/" in s else s
    return normalize_ticker(base)

def load_trade_to_m5_mapping(mapping_xlsx_path: str) -> dict:
    mp = pd.read_excel(mapping_xlsx_path)
    mp.columns = [c.strip() for c in mp.columns]
    if "org_symbol" not in mp.columns or "Symbol" not in mp.columns:
        raise ValueError("Mapping file must contain columns: org_symbol, Symbol")

    mp["m5_ticker"] = mp["org_symbol"].apply(normalize_ticker)
    mp["trade_base"] = mp["Symbol"].astype(str).str.upper().str.strip().str.split("/").str[0].apply(normalize_ticker)
    mp = mp[(mp["trade_base"] != "") & (mp["m5_ticker"] != "")]
    mp = mp.drop_duplicates(subset=["trade_base"], keep="first")
    return dict(zip(mp["trade_base"], mp["m5_ticker"]))

TRADE_TO_M5 = load_trade_to_m5_mapping(MAPPING_XLSX_PATH)

def map_trade_base_to_m5(trade_base: str) -> str:
    t = normalize_ticker(trade_base)
    return TRADE_TO_M5.get(t, t)

# ============================================================
# M5 map
# ============================================================

def load_m5_file_map(m5_dir: str, file_list_path: str) -> dict:
    fl = pd.read_csv(file_list_path)
    fl["ticker_norm"] = fl["ticker"].apply(normalize_ticker)
    fl["path"] = fl["filename"].apply(lambda f: os.path.join(m5_dir, f))
    return dict(zip(fl["ticker_norm"], fl["path"]))

M5_MAP = load_m5_file_map(M5_DIR, M5_FILE_LIST_PATH)
M5_TICKERS = set(M5_MAP.keys())

# ============================================================
# Indicators
# ============================================================

def compute_atr(df: pd.DataFrame, window: int) -> pd.Series:
    high = df["high"]
    low = df["low"]
    close = df["close"]
    tr = pd.concat([
        high - low,
        (high - close.shift(1)).abs(),
        (low - close.shift(1)).abs()
    ], axis=1).max(axis=1)
    return tr.rolling(window, min_periods=window).mean()

def rolling_slope(y: pd.Series, tmin: pd.Series | None, method: str) -> float:
    yy = y.values.astype(float)
    if len(yy) < 2 or np.all(np.isnan(yy)):
        return np.nan

    if tmin is None:
        xx = np.arange(len(yy), dtype=float)
    else:
        xx = tmin.values.astype(float)
        xx = xx - xx[0]

    mask = ~np.isnan(xx) & ~np.isnan(yy)
    xx = xx[mask]; yy = yy[mask]
    if len(yy) < 2:
        return np.nan

    if method == "polyfit":
        return np.polyfit(xx, yy, 1)[0]

    xm = xx.mean(); ym = yy.mean()
    denom = ((xx - xm) ** 2).sum()
    if denom == 0:
        return np.nan
    return (((xx - xm) * (yy - ym)).sum()) / denom

# ============================================================
# Market-hours filter
# ============================================================

def filter_market_hours(df: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only rows with time between MARKET_START and MARKET_END (inclusive),
    based on df['_dt'] (naive timestamps as present in your M5 files).
    """
    if df.empty:
        return df

    # Use between_time on a DatetimeIndex (fast and correct)
    dfi = df.set_index("_dt", drop=False).sort_index()

    try:
        dfi = dfi.between_time(MARKET_START, MARKET_END, inclusive="both")
    except TypeError:
        # older pandas fallback (should not happen in most setups)
        idx = dfi.index.indexer_between_time(
            pd.to_datetime(MARKET_START).time(),
            pd.to_datetime(MARKET_END).time(),
            include_start=True,
            include_end=True
        )
        dfi = dfi.iloc[idx]

    return dfi.reset_index(drop=True)

# ============================================================
# Trade extraction (1 row per TradeID)
# ============================================================

def build_trade_summary(raw: pd.DataFrame):
    df = raw.copy()
    df.columns = [c.strip() for c in df.columns]

    required = ["TradeID", "Currency", "Direction", "date", "price", "Closing Date", "DayStatus"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Trade file missing required columns: {missing}")

    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["Closing Date"] = pd.to_datetime(df["Closing Date"], errors="coerce")
    df["price"] = pd.to_numeric(df["price"], errors="coerce")
    df["DayStatusU"] = df["DayStatus"].astype(str).str.upper().str.strip()

    df["TradeBase"] = df["Currency"].apply(trade_base_from_currency)
    df["TickerNorm"] = df["TradeBase"].apply(map_trade_base_to_m5)

    rows = []
    reasons = []

    for tid, g in df.groupby("TradeID", dropna=True):
        g = g.sort_values("date")

        currency = g["Currency"].dropna().iloc[0] if g["Currency"].notna().any() else np.nan
        direction = g["Direction"].dropna().iloc[0] if g["Direction"].notna().any() else np.nan
        trade_base = g["TradeBase"].dropna().iloc[0] if g["TradeBase"].notna().any() else ""
        ticker = g["TickerNorm"].dropna().iloc[0] if g["TickerNorm"].notna().any() else ""

        open_mask = g["DayStatusU"].eq("OPEN")
        entry_date = g.loc[open_mask, "date"].dropna().min() if open_mask.any() else g["date"].dropna().min()

        exit_date = g["Closing Date"].dropna().max()

        entry_px = np.nan
        if pd.notna(entry_date):
            day = entry_date.normalize()
            day_rows = g[g["date"].dt.normalize() == day]
            if day_rows["price"].notna().any():
                entry_px = float(day_rows["price"].dropna().iloc[-1])

        exit_px = np.nan
        if pd.notna(exit_date):
            day = exit_date.normalize()
            day_rows = g[g["date"].dt.normalize() == day]
            if day_rows["price"].notna().any():
                exit_px = float(day_rows["price"].dropna().iloc[-1])

        miss = []
        if pd.isna(entry_date): miss.append("MISSING_ENTRY_DATE")
        if pd.isna(exit_date): miss.append("MISSING_EXIT_DATE")
        if pd.isna(entry_px): miss.append("MISSING_ENTRY_PRICE")
        if pd.isna(exit_px): miss.append("MISSING_EXIT_PRICE")
        if not ticker: miss.append("MISSING_TICKER")
        if ticker and (ticker not in M5_TICKERS): miss.append("TICKER_NOT_IN_M5_LIST")

        if miss:
            reasons.append({
                "TradeID": tid, "Currency": currency, "TradeBase": trade_base, "TickerNorm": ticker,
                "MissingReasons": ";".join(miss)
            })

        rows.append({
            "TradeID": tid,
            "Currency": currency,
            "TradeBase": trade_base,
            "TickerNorm": ticker,
            "Direction": direction,
            "EntryDate": entry_date,
            "ExitDate_Original": exit_date,
            "EntryPrice": entry_px,
            "ExitPrice_Original": exit_px,
            "HasAllCoreFields": (len(miss) == 0),
            "MissingReasons": ";".join(miss) if miss else ""
        })

    trade_summary = pd.DataFrame(rows)
    missing_df = pd.DataFrame(reasons)

    coverage = pd.DataFrame([{
        "Raw_Unique_TradeIDs": int(df["TradeID"].nunique()),
        "Extracted_Unique_TradeIDs": int(trade_summary["TradeID"].nunique()),
        "Trades_With_All_Core_Fields": int(trade_summary["HasAllCoreFields"].sum()),
        "Trades_Missing_Something": int((~trade_summary["HasAllCoreFields"]).sum()),
        "Trades_Missing_M5": int(trade_summary["MissingReasons"].astype(str).str.contains("TICKER_NOT_IN_M5_LIST", na=False).sum()),
        "Unique_M5_Tickers_Available": int(len(M5_TICKERS)),
        "Unique_M5_Tickers_Used": int(trade_summary["TickerNorm"].nunique())
    }])

    return trade_summary, missing_df, coverage

# ============================================================
# Trailing logic (CAPPED ONLY)
# ============================================================

@dataclass
class TrailResult:
    exit_time: pd.Timestamp
    exit_price: float
    exit_reason: str
    best_favorable: float
    max_high: float
    min_low: float

def direction_aware_improvement(direction: str, new_exit: float, old_exit: float) -> float:
    d = str(direction).strip().upper()
    if pd.isna(new_exit) or pd.isna(old_exit):
        return np.nan
    return float(new_exit - old_exit) if d == "LONG" else float(old_exit - new_exit)

def prepare_m5_with_indicators_market_only(ticker: str) -> tuple[pd.DataFrame | None, dict]:
    fp = M5_MAP.get(ticker)
    if not fp or not os.path.exists(fp):
        return None, {"ticker": ticker, "status": "missing_file"}

    df = pd.read_csv(fp)
    df.columns = [c.strip().lower() for c in df.columns]
    needed = ["date", "open", "high", "low", "close"]
    if not all(c in df.columns for c in needed):
        return None, {"ticker": ticker, "status": "bad_columns"}

    df["_dt"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["_dt"]).sort_values("_dt").reset_index(drop=True)

    for c in ["open", "high", "low", "close"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=["open", "high", "low", "close"])

    total_rows = len(df)

    # FILTER MARKET HOURS ONLY
    df = filter_market_hours(df)
    kept_rows = len(df)

    if kept_rows == 0:
        return None, {"ticker": ticker, "status": "no_rows_in_market_hours", "total_rows": total_rows, "kept_rows": 0}

    # Indicators on market-only data
    df["ATR"] = compute_atr(df, ATR_WINDOW)

    if USE_TIME_AWARE_SLOPE:
        df["_tmin"] = (df["_dt"] - df["_dt"].iloc[0]).dt.total_seconds() / 60.0
        df["slope"] = df["close"].rolling(SLOPE_WINDOW, min_periods=SLOPE_WINDOW).apply(
            lambda s: rolling_slope(s, tmin=df.loc[s.index, "_tmin"], method=SLOPE_METHOD),
            raw=False
        )
    else:
        df["slope"] = df["close"].rolling(SLOPE_WINDOW, min_periods=SLOPE_WINDOW).apply(
            lambda s: rolling_slope(s, tmin=None, method=SLOPE_METHOD),
            raw=False
        )

    df["slope_norm"] = df["slope"] / df["ATR"]

    info = {
        "ticker": ticker,
        "status": "ok",
        "total_rows": total_rows,
        "kept_rows": kept_rows,
        "kept_pct": (kept_rows / total_rows) if total_rows else np.nan
    }
    return df, info

def simulate_trailing_capped(tr: pd.Series, m5: pd.DataFrame) -> TrailResult:
    entry_date = pd.to_datetime(tr["EntryDate"])
    exit_date = pd.to_datetime(tr["ExitDate_Original"])
    entry_price = float(tr["EntryPrice"])
    orig_exit_price = float(tr["ExitPrice_Original"])

    direction = str(tr["Direction"]).strip().upper()
    is_long = (direction == "LONG")

    start_dt = (entry_date + timedelta(days=1)) if APPLY_FROM_NEXT_DAY else entry_date
    end_dt = exit_date + timedelta(days=1)

    window = m5[(m5["_dt"] >= start_dt) & (m5["_dt"] < end_dt)].copy()
    if len(window) == 0:
        return TrailResult(exit_date, orig_exit_price, "NoM5DataInMarketHours", entry_price, np.nan, np.nan)

    max_high = float(window["high"].max())
    min_low = float(window["low"].min())

    best_fav = entry_price
    stop = -np.inf if is_long else np.inf

    for _, row in window.iterrows():
        atr = row["ATR"]
        if pd.isna(atr) or atr <= 0:
            continue

        if is_long:
            best_fav = max(best_fav, row["high"])
        else:
            best_fav = min(best_fav, row["low"])

        slope_norm = row["slope_norm"]
        mult = ATR_MULT
        if not pd.isna(slope_norm) and slope_norm > SLOPE_UPPER:
            mult = max(MIN_MULT, ATR_MULT - slope_norm * ACCEL_FACTOR)
        elif not pd.isna(slope_norm) and slope_norm < SLOPE_LOWER:
            mult = ATR_MULT

        if is_long:
            stop = max(stop, best_fav - mult * atr)
            if row["low"] <= stop:
                return TrailResult(row["_dt"], float(stop), "TrailingSL", float(best_fav), max_high, min_low)
        else:
            stop = min(stop, best_fav + mult * atr)
            if row["high"] >= stop:
                return TrailResult(row["_dt"], float(stop), "TrailingSL", float(best_fav), max_high, min_low)

    return TrailResult(exit_date, orig_exit_price, "OriginalExit", float(best_fav), max_high, min_low)

def summarize(out: pd.DataFrame) -> pd.DataFrame:
    df = out.copy()
    df["TriggeredFlag"] = df["ExitReason"].astype(str).eq("TrailingSL")
    df["ImprovedFlag"] = df["PriceImprovement"] > 0
    df["WorseFlag"] = df["PriceImprovement"] < 0

    def agg(g: pd.DataFrame) -> dict:
        n = len(g)
        trig = int(g["TriggeredFlag"].sum())
        return {
            "Trades": n,
            "TrailingTriggered": trig,
            "TrailingNotTriggered": int((~g["TriggeredFlag"]).sum()),
            "PctTriggered": trig / n if n else np.nan,
            "Improved": int(g["ImprovedFlag"].sum()),
            "Worse": int(g["WorseFlag"].sum()),
            "Unchanged": int(((g["PriceImprovement"] == 0) | g["PriceImprovement"].isna()).sum()),
            "SumPriceImprovement": float(g["PriceImprovement"].sum(skipna=True)),
            "AvgPriceImprovement": float(g["PriceImprovement"].mean(skipna=True)),
            "MedianPriceImprovement": float(g["PriceImprovement"].median(skipna=True)),
            "AvgPctImprovement": float(g["PctImprovement_vs_OriginalExit"].mean(skipna=True)),
            "MedianPctImprovement": float(g["PctImprovement_vs_OriginalExit"].median(skipna=True)),
        }

    rows = [{"Group": "ALL", **agg(df)}]
    for d, g in df.groupby(df["Direction"].astype(str).str.upper().str.strip()):
        rows.append({"Group": f"Direction={d}", **agg(g)})

    return pd.DataFrame(rows)

# ============================================================
# MAIN (CAPPED ONLY, MARKET HOURS ONLY)
# ============================================================

def main():
    print("Running CAPPED ONLY + MARKET HOURS ONLY")
    print("Market hours:", MARKET_START, "to", MARKET_END)
    print("Trade file:", TRADE_XLSX_PATH)
    print("Output:", OUT_DIR)

    raw = pd.read_excel(TRADE_XLSX_PATH)
    trade_summary, missing_df, coverage_df = build_trade_summary(raw)

    trade_summary.to_csv(os.path.join(OUT_DIR, "trade_summary_extracted.csv"), index=False)
    missing_df.to_csv(os.path.join(OUT_DIR, "trade_summary_missing_reasons.csv"), index=False)
    coverage_df.to_csv(os.path.join(OUT_DIR, "coverage_report.csv"), index=False)

    print("\nCoverage report:")
    print(coverage_df.to_string(index=False))

    if RUN_ALL_TRADES:
        sample = trade_summary.copy()
    else:
        sample = trade_summary.sample(n=min(N_SAMPLE_TRADES, len(trade_summary)), random_state=RANDOM_SEED).copy()

    sample = sample[sample["HasAllCoreFields"]].reset_index(drop=True)
    print("\nTrades to evaluate (with core fields):", len(sample))

    out_rows = []
    total = len(sample)

    m5_stats = []

    for tkr, grp in sample.groupby("TickerNorm"):
        if tkr not in M5_TICKERS:
            continue

        m5, info = prepare_m5_with_indicators_market_only(tkr)
        m5_stats.append(info)

        if m5 is None:
            continue

        grp = grp.sort_values("EntryDate")
        for _, tr in grp.iterrows():
            res = simulate_trailing_capped(tr, m5)

            old_exit = float(tr["ExitPrice_Original"])
            new_exit = float(res.exit_price)
            impr = direction_aware_improvement(tr["Direction"], new_exit, old_exit)

            orig_exit_date = pd.to_datetime(tr["ExitDate_Original"])
            new_exit_time = pd.to_datetime(res.exit_time)
            new_exit_date = new_exit_time.normalize() if pd.notna(new_exit_time) else pd.NaT

            out_rows.append({
                "TradeID": tr["TradeID"],
                "Currency": tr["Currency"],
                "TradeBase": tr["TradeBase"],
                "TickerNorm": tr["TickerNorm"],
                "Direction": tr["Direction"],

                "EntryDate": tr["EntryDate"],
                "EntryPrice": float(tr["EntryPrice"]),

                "ExitDate_Original": orig_exit_date,
                "ExitPrice_Original": old_exit,

                "ExitTime_Trailing": new_exit_time,
                "ExitDate_Trailing": new_exit_date,
                "ExitPrice_Trailing": new_exit,
                "ExitReason": res.exit_reason,

                "MaxHigh_InWindow": res.max_high,
                "MinLow_InWindow": res.min_low,
                "BestFavorable_InWindow": res.best_favorable,

                "PriceImprovement": impr,
                "PctImprovement_vs_OriginalExit": (impr / old_exit) if old_exit != 0 else np.nan,

                "ClosedEarlierOrSameDay": (new_exit_date.date() <= orig_exit_date.date())
                                          if (pd.notna(new_exit_date) and pd.notna(orig_exit_date)) else np.nan
            })

        print(f"Processed ticker {tkr} | trades: {len(grp)} | total done so far: {len(out_rows)}/{total}")

    out = pd.DataFrame(out_rows)
    summ = summarize(out)

    out_path = os.path.join(OUT_DIR, "trades_trailing_capped_market_only.csv")
    summ_path = os.path.join(OUT_DIR, "summary_trailing_capped_market_only.csv")
    m5_stats_path = os.path.join(OUT_DIR, "m5_market_hours_filter_stats.csv")

    out.to_csv(out_path, index=False)
    summ.to_csv(summ_path, index=False)
    pd.DataFrame(m5_stats).to_csv(m5_stats_path, index=False)

    print("\nSaved:")
    print(out_path)
    print(summ_path)
    print(m5_stats_path)

    print("\n=== SUMMARY (CAPPED, MARKET HOURS ONLY) ===")
    print(summ.to_string(index=False))

if __name__ == "__main__":
    main()


Running CAPPED ONLY + MARKET HOURS ONLY
Market hours: 14:30 to 21:00
Trade file: D:/work/Client/Maatra/Trade Level Data/3 Month vol 14 data with sharpe prob Fnl.xlsx
Output: D:/work/Client/Maatra/Trade Level Data\TrailingSL_CAPPED_ONLY_MKT1430_2100

Coverage report:
 Raw_Unique_TradeIDs  Extracted_Unique_TradeIDs  Trades_With_All_Core_Fields  Trades_Missing_Something  Trades_Missing_M5  Unique_M5_Tickers_Available  Unique_M5_Tickers_Used
               14002                      14002                        11578                      2424               2174                          131                     160

Trades to evaluate (with core fields): 11578
Processed ticker ACN | trades: 7 | total done so far: 7/11578
Processed ticker ADBE | trades: 83 | total done so far: 90/11578
Processed ticker ADI | trades: 273 | total done so far: 363/11578
Processed ticker ADP | trades: 124 | total done so far: 487/11578
Processed ticker ADSK | trades: 94 | total done so far: 581/11578
Processed ti

In [6]:
import os
import numpy as np
import pandas as pd
from dataclasses import dataclass
from datetime import timedelta

# ============================================================
# PURPOSE
# ============================================================
"""
CAPPED trailing stop-loss backtest using M5 candles.

Key assumptions:
1) Trades are entered at end-of-day (EOD) on EntryDate, so trailing logic starts from next day (configurable).
2) Trades are NOT allowed to extend beyond the original exit date (CAPPED).
   - If the trailing stop triggers before the original exit date => we exit early at the stop.
   - If it never triggers within the window => we exit at the original scheduled exit date price.
3) Only market-session M5 bars are used (pre/post market excluded).
   - Market session filter: 14:30 to 21:00 (inclusive), based on M5 timestamps.
4) Direction handling:
   - Direction may be "LONG"/"SHORT" or numeric (1, -1). We normalize it robustly.

Outputs:
- trade_summary_extracted.csv: 1 row per TradeID (entry/exit pulled from trade file)
- trades_trailing_capped_market_only.csv: per-trade trailing results (exit time/price/reason + analytics)
- summary_trailing_capped_market_only.csv: high-level summary
- coverage_report.csv: coverage checks
- m5_market_hours_filter_stats.csv: per-ticker M5 filtering stats
"""

# ============================================================
# CONFIG (EDIT THESE PATHS)
# ============================================================

TRADE_XLSX_PATH      = r"D:/work/Client/Maatra/Trade Level Data/3 Month vol 14 data with sharpe prob Fnl.xlsx"
M5_DIR               = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data"
M5_FILE_LIST_PATH    = r"D:/work/Client/Maatra/Trade Level Data/M5 Raw data/m5_file_list.csv"
MAPPING_XLSX_PATH    = r"D:/work/Client/Maatra/Trade Level Data/Instrument Mapping.xlsx"

OUT_DIR = os.path.join(os.path.dirname(TRADE_XLSX_PATH), "TrailingSL_CAPPED_ONLY_MKT1430_2100_FIXEDDIR")
os.makedirs(OUT_DIR, exist_ok=True)

RUN_ALL_TRADES = True
N_SAMPLE_TRADES = 2000
RANDOM_SEED = 42

# ============================================================
# MARKET HOURS FILTER (ONLY USE 14:30 to 21:00)
# ============================================================
MARKET_START = "14:30"
MARKET_END   = "21:00"   # inclusive

# ============================================================
# TRAILING STOP PARAMETERS (your approach)
# ============================================================
ATR_WINDOW = 14
ATR_MULT = 3.0

SLOPE_WINDOW = 12
SLOPE_METHOD = "regression"   # "regression" or "polyfit"
USE_TIME_AWARE_SLOPE = True

# slope_norm = slope / ATR
SLOPE_UPPER = 0.30
SLOPE_LOWER = 0.10

ACCEL_FACTOR = 1.0
MIN_MULT = 0.5

# Since entry is at EOD, apply trailing from next day
APPLY_FROM_NEXT_DAY = True

# ============================================================
# HELPERS: normalization + direction parsing + mapping
# ============================================================

def normalize_ticker(x: str) -> str:
    """Uppercase alnum-only ticker key."""
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return ""
    x = str(x).strip().upper()
    return "".join([ch for ch in x if ch.isalnum()])

def trade_base_from_currency(currency: str) -> str:
    """
    Trade file may have Currency like 'AAPL/USD' or just 'AAPL'.
    We extract the base and normalize.
    """
    if currency is None or (isinstance(currency, float) and np.isnan(currency)):
        return ""
    s = str(currency).strip().upper()
    base = s.split("/")[0].strip() if "/" in s else s
    return normalize_ticker(base)

def normalize_direction(x) -> str:
    """
    Normalizes direction to 'LONG' or 'SHORT'.

    Handles cases:
      - 'LONG'/'SHORT'
      - 'BUY'/'SELL'
      - 1 / -1 (or '1', '-1', 1.0, -1.0)
    """
    if x is None or (isinstance(x, float) and np.isnan(x)):
        return "UNKNOWN"

    s = str(x).strip().upper()

    if s in ["LONG", "BUY", "B", "1", "1.0", "+1", "+1.0"]:
        return "LONG"
    if s in ["SHORT", "SELL", "S", "-1", "-1.0"]:
        return "SHORT"

    # Try numeric parsing
    try:
        v = float(s)
        if v > 0:
            return "LONG"
        if v < 0:
            return "SHORT"
    except Exception:
        pass

    return "UNKNOWN"

def load_trade_to_m5_mapping(mapping_xlsx_path: str) -> dict:
    """
    Uses Instrument Mapping.xlsx to map trade instrument -> M5 file ticker.
    Expected columns: org_symbol (M5 ticker), Symbol (trade symbol like AAPL/USD).
    """
    mp = pd.read_excel(mapping_xlsx_path)
    mp.columns = [c.strip() for c in mp.columns]
    if "org_symbol" not in mp.columns or "Symbol" not in mp.columns:
        raise ValueError("Mapping file must contain columns: org_symbol, Symbol")

    mp["m5_ticker"] = mp["org_symbol"].apply(normalize_ticker)
    mp["trade_base"] = mp["Symbol"].astype(str).str.upper().str.strip().str.split("/").str[0].apply(normalize_ticker)
    mp = mp[(mp["trade_base"] != "") & (mp["m5_ticker"] != "")]
    mp = mp.drop_duplicates(subset=["trade_base"], keep="first")
    return dict(zip(mp["trade_base"], mp["m5_ticker"]))

TRADE_TO_M5 = load_trade_to_m5_mapping(MAPPING_XLSX_PATH)

def map_trade_base_to_m5(trade_base: str) -> str:
    """Map trade base ticker to M5 ticker using mapping. If not found, fallback to itself."""
    t = normalize_ticker(trade_base)
    return TRADE_TO_M5.get(t, t)

# ============================================================
# M5 FILE MAP
# ============================================================

def load_m5_file_map(m5_dir: str, file_list_path: str) -> dict:
    """
    Reads m5_file_list.csv, expected columns: ticker, filename
    and returns dict: ticker_norm -> full filepath
    """
    fl = pd.read_csv(file_list_path)
    fl["ticker_norm"] = fl["ticker"].apply(normalize_ticker)
    fl["path"] = fl["filename"].apply(lambda f: os.path.join(m5_dir, f))
    return dict(zip(fl["ticker_norm"], fl["path"]))

M5_MAP = load_m5_file_map(M5_DIR, M5_FILE_LIST_PATH)
M5_TICKERS = set(M5_MAP.keys())

# ============================================================
# INDICATORS: ATR + SLOPE
# ============================================================

def compute_atr(df: pd.DataFrame, window: int) -> pd.Series:
    """
    ATR using simple rolling mean of True Range.
    True Range = max(high-low, abs(high-prev_close), abs(low-prev_close))
    """
    high = df["high"]
    low = df["low"]
    close = df["close"]
    tr = pd.concat([
        high - low,
        (high - close.shift(1)).abs(),
        (low - close.shift(1)).abs()
    ], axis=1).max(axis=1)
    return tr.rolling(window, min_periods=window).mean()

def rolling_slope(y: pd.Series, tmin: pd.Series | None, method: str) -> float:
    """
    Computes slope of y vs x where x is time (minutes) or simple index.
    - regression: OLS slope
    - polyfit: np.polyfit slope
    """
    yy = y.values.astype(float)
    if len(yy) < 2 or np.all(np.isnan(yy)):
        return np.nan

    if tmin is None:
        xx = np.arange(len(yy), dtype=float)
    else:
        xx = tmin.values.astype(float)
        xx = xx - xx[0]

    mask = ~np.isnan(xx) & ~np.isnan(yy)
    xx = xx[mask]; yy = yy[mask]
    if len(yy) < 2:
        return np.nan

    if method == "polyfit":
        return np.polyfit(xx, yy, 1)[0]

    xm = xx.mean(); ym = yy.mean()
    denom = ((xx - xm) ** 2).sum()
    if denom == 0:
        return np.nan
    return (((xx - xm) * (yy - ym)).sum()) / denom

# ============================================================
# MARKET HOURS FILTER
# ============================================================

def filter_market_hours(df: pd.DataFrame) -> pd.DataFrame:
    """
    Keep only rows with time between MARKET_START and MARKET_END (inclusive),
    based on df['_dt'].
    """
    if df.empty:
        return df

    dfi = df.set_index("_dt", drop=False).sort_index()
    try:
        dfi = dfi.between_time(MARKET_START, MARKET_END, inclusive="both")
    except TypeError:
        # older pandas fallback
        idx = dfi.index.indexer_between_time(
            pd.to_datetime(MARKET_START).time(),
            pd.to_datetime(MARKET_END).time(),
            include_start=True,
            include_end=True
        )
        dfi = dfi.iloc[idx]
    return dfi.reset_index(drop=True)

# ============================================================
# TRADE EXTRACTION: 1 ROW PER TRADEID
# ============================================================

def build_trade_summary(raw: pd.DataFrame):
    """
    From the trade-level file (multiple rows per TradeID across days),
    we extract:
      - EntryDate: DayStatus == OPEN else earliest date
      - ExitDate_Original: Closing Date (scheduled exit date)
      - EntryPrice: last trade-file price on EntryDate
      - ExitPrice_Original: last trade-file price on ExitDate
      - Direction normalized to LONG/SHORT
      - Currency -> TradeBase -> TickerNorm (using mapping)
    """
    df = raw.copy()
    df.columns = [c.strip() for c in df.columns]

    required = ["TradeID", "Currency", "Direction", "date", "price", "Closing Date", "DayStatus"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Trade file missing required columns: {missing}")

    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df["Closing Date"] = pd.to_datetime(df["Closing Date"], errors="coerce")
    df["price"] = pd.to_numeric(df["price"], errors="coerce")
    df["DayStatusU"] = df["DayStatus"].astype(str).str.upper().str.strip()

    df["TradeBase"] = df["Currency"].apply(trade_base_from_currency)
    df["TickerNorm"] = df["TradeBase"].apply(map_trade_base_to_m5)
    df["DirectionNorm"] = df["Direction"].apply(normalize_direction)

    rows = []
    reasons = []

    for tid, g in df.groupby("TradeID", dropna=True):
        g = g.sort_values("date")

        currency = g["Currency"].dropna().iloc[0] if g["Currency"].notna().any() else np.nan
        direction = g["DirectionNorm"].dropna().iloc[0] if g["DirectionNorm"].notna().any() else "UNKNOWN"
        trade_base = g["TradeBase"].dropna().iloc[0] if g["TradeBase"].notna().any() else ""
        ticker = g["TickerNorm"].dropna().iloc[0] if g["TickerNorm"].notna().any() else ""

        open_mask = g["DayStatusU"].eq("OPEN")
        entry_date = g.loc[open_mask, "date"].dropna().min() if open_mask.any() else g["date"].dropna().min()
        exit_date = g["Closing Date"].dropna().max()

        # Entry price: last trade-file price on entry date
        entry_px = np.nan
        if pd.notna(entry_date):
            day = entry_date.normalize()
            day_rows = g[g["date"].dt.normalize() == day]
            if day_rows["price"].notna().any():
                entry_px = float(day_rows["price"].dropna().iloc[-1])

        # Exit price: last trade-file price on exit date
        exit_px = np.nan
        if pd.notna(exit_date):
            day = exit_date.normalize()
            day_rows = g[g["date"].dt.normalize() == day]
            if day_rows["price"].notna().any():
                exit_px = float(day_rows["price"].dropna().iloc[-1])

        miss = []
        if pd.isna(entry_date): miss.append("MISSING_ENTRY_DATE")
        if pd.isna(exit_date): miss.append("MISSING_EXIT_DATE")
        if pd.isna(entry_px): miss.append("MISSING_ENTRY_PRICE")
        if pd.isna(exit_px): miss.append("MISSING_EXIT_PRICE")
        if not ticker: miss.append("MISSING_TICKER")
        if ticker and (ticker not in M5_TICKERS): miss.append("TICKER_NOT_IN_M5_LIST")
        if direction == "UNKNOWN": miss.append("UNKNOWN_DIRECTION")

        if miss:
            reasons.append({
                "TradeID": tid, "Currency": currency, "TradeBase": trade_base, "TickerNorm": ticker,
                "DirectionNorm": direction, "MissingReasons": ";".join(miss)
            })

        rows.append({
            "TradeID": tid,
            "Currency": currency,
            "TradeBase": trade_base,
            "TickerNorm": ticker,
            "Direction": direction,
            "EntryDate": entry_date,
            "ExitDate_Original": exit_date,
            "EntryPrice": entry_px,
            "ExitPrice_Original": exit_px,
            "HasAllCoreFields": (len(miss) == 0),
            "MissingReasons": ";".join(miss) if miss else ""
        })

    trade_summary = pd.DataFrame(rows)
    missing_df = pd.DataFrame(reasons)

    coverage = pd.DataFrame([{
        "Raw_Unique_TradeIDs": int(df["TradeID"].nunique()),
        "Extracted_Unique_TradeIDs": int(trade_summary["TradeID"].nunique()),
        "Trades_With_All_Core_Fields": int(trade_summary["HasAllCoreFields"].sum()),
        "Trades_Missing_Something": int((~trade_summary["HasAllCoreFields"]).sum()),
        "Trades_Missing_M5": int(trade_summary["MissingReasons"].astype(str).str.contains("TICKER_NOT_IN_M5_LIST", na=False).sum()),
        "Trades_Unknown_Direction": int(trade_summary["MissingReasons"].astype(str).str.contains("UNKNOWN_DIRECTION", na=False).sum()),
        "Unique_M5_Tickers_Available": int(len(M5_TICKERS)),
        "Unique_M5_Tickers_Used": int(trade_summary["TickerNorm"].nunique())
    }])

    return trade_summary, missing_df, coverage

# ============================================================
# TRAILING LOGIC (CAPPED)
# ============================================================

@dataclass
class TrailResult:
    exit_time: pd.Timestamp
    exit_price: float
    exit_reason: str
    best_favorable: float
    max_high: float
    min_low: float

def direction_aware_improvement(direction: str, new_exit: float, old_exit: float) -> float:
    """
    Returns positive if trailing is better.
    LONG: new - old
    SHORT: old - new
    """
    d = normalize_direction(direction)
    if pd.isna(new_exit) or pd.isna(old_exit):
        return np.nan
    return float(new_exit - old_exit) if d == "LONG" else float(old_exit - new_exit)

def prepare_m5_with_indicators_market_only(ticker: str) -> tuple[pd.DataFrame | None, dict]:
    """
    Loads one ticker M5 file, filters market hours only, and computes:
      - ATR
      - slope (rolling)
      - slope_norm = slope / ATR

    Returns (df, stats_dict)
    """
    fp = M5_MAP.get(ticker)
    if not fp or not os.path.exists(fp):
        return None, {"ticker": ticker, "status": "missing_file"}

    df = pd.read_csv(fp)
    df.columns = [c.strip().lower() for c in df.columns]
    needed = ["date", "open", "high", "low", "close"]
    if not all(c in df.columns for c in needed):
        return None, {"ticker": ticker, "status": "bad_columns"}

    df["_dt"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["_dt"]).sort_values("_dt").reset_index(drop=True)

    for c in ["open", "high", "low", "close"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")
    df = df.dropna(subset=["open", "high", "low", "close"])

    total_rows = len(df)

    # Market hours only (remove pre/post market)
    df = filter_market_hours(df)
    kept_rows = len(df)

    if kept_rows == 0:
        return None, {"ticker": ticker, "status": "no_rows_in_market_hours", "total_rows": total_rows, "kept_rows": 0}

    # Indicators on market-only data
    df["ATR"] = compute_atr(df, ATR_WINDOW)

    if USE_TIME_AWARE_SLOPE:
        df["_tmin"] = (df["_dt"] - df["_dt"].iloc[0]).dt.total_seconds() / 60.0
        df["slope"] = df["close"].rolling(SLOPE_WINDOW, min_periods=SLOPE_WINDOW).apply(
            lambda s: rolling_slope(s, tmin=df.loc[s.index, "_tmin"], method=SLOPE_METHOD),
            raw=False
        )
    else:
        df["slope"] = df["close"].rolling(SLOPE_WINDOW, min_periods=SLOPE_WINDOW).apply(
            lambda s: rolling_slope(s, tmin=None, method=SLOPE_METHOD),
            raw=False
        )

    df["slope_norm"] = df["slope"] / df["ATR"]

    info = {
        "ticker": ticker,
        "status": "ok",
        "total_rows": total_rows,
        "kept_rows": kept_rows,
        "kept_pct": (kept_rows / total_rows) if total_rows else np.nan
    }
    return df, info

def simulate_trailing_capped(tr: pd.Series, m5: pd.DataFrame) -> TrailResult:
    """
    Simulates trailing stop from (EntryDate+1 day) to ExitDate_Original (inclusive day).
    If stop triggers: exit at stop price at that bar timestamp.
    Else: exit at original scheduled exit price.

    Stop logic:
      - Start with trailing distance = ATR_MULT * ATR
      - Track best favorable excursion:
          LONG  -> running max of high
          SHORT -> running min of low
      - Accelerate (tighten) when slope_norm > SLOPE_UPPER by reducing ATR_MULT
      - If slope_norm < SLOPE_LOWER, keep stop at baseline ATR_MULT
    """
    entry_date = pd.to_datetime(tr["EntryDate"])
    exit_date = pd.to_datetime(tr["ExitDate_Original"])
    entry_price = float(tr["EntryPrice"])
    orig_exit_price = float(tr["ExitPrice_Original"])

    direction = normalize_direction(tr["Direction"])
    is_long = (direction == "LONG")

    start_dt = (entry_date + timedelta(days=1)) if APPLY_FROM_NEXT_DAY else entry_date
    end_dt = exit_date + timedelta(days=1)  # include exit date's session

    window = m5[(m5["_dt"] >= start_dt) & (m5["_dt"] < end_dt)].copy()
    if len(window) == 0:
        return TrailResult(exit_date, orig_exit_price, "NoM5DataInMarketHours", entry_price, np.nan, np.nan)

    max_high = float(window["high"].max())
    min_low = float(window["low"].min())

    best_fav = entry_price
    stop = -np.inf if is_long else np.inf

    for _, row in window.iterrows():
        atr = row["ATR"]
        if pd.isna(atr) or atr <= 0:
            continue

        # Update favorable excursion
        if is_long:
            best_fav = max(best_fav, row["high"])
        else:
            best_fav = min(best_fav, row["low"])

        slope_norm = row["slope_norm"]
        mult = ATR_MULT

        # Acceleration / tightening logic
        if not pd.isna(slope_norm) and slope_norm > SLOPE_UPPER:
            mult = max(MIN_MULT, ATR_MULT - slope_norm * ACCEL_FACTOR)
        elif not pd.isna(slope_norm) and slope_norm < SLOPE_LOWER:
            mult = ATR_MULT

        # Compute / update stop and check hit
        if is_long:
            stop = max(stop, best_fav - mult * atr)
            if row["low"] <= stop:
                return TrailResult(row["_dt"], float(stop), "TrailingSL", float(best_fav), max_high, min_low)
        else:
            stop = min(stop, best_fav + mult * atr)
            if row["high"] >= stop:
                return TrailResult(row["_dt"], float(stop), "TrailingSL", float(best_fav), max_high, min_low)

    # If never triggered, exit at scheduled exit
    return TrailResult(exit_date, orig_exit_price, "OriginalExit", float(best_fav), max_high, min_low)

def summarize(out: pd.DataFrame) -> pd.DataFrame:
    """
    Summarizes:
      - trigger rate
      - improved vs worse vs unchanged
      - sum/avg/median improvement
    """
    df = out.copy()
    df["TriggeredFlag"] = df["ExitReason"].astype(str).eq("TrailingSL")
    df["ImprovedFlag"] = df["PriceImprovement"] > 0
    df["WorseFlag"] = df["PriceImprovement"] < 0
    df["UnchangedFlag"] = (df["PriceImprovement"] == 0) | df["PriceImprovement"].isna()

    def agg(g: pd.DataFrame) -> dict:
        n = len(g)
        trig = int(g["TriggeredFlag"].sum())
        return {
            "Trades": n,
            "TrailingTriggered": trig,
            "TrailingNotTriggered": int((~g["TriggeredFlag"]).sum()),
            "PctTriggered": trig / n if n else np.nan,
            "Improved": int(g["ImprovedFlag"].sum()),
            "Worse": int(g["WorseFlag"].sum()),
            "Unchanged": int(g["UnchangedFlag"].sum()),
            "SumPriceImprovement": float(g["PriceImprovement"].sum(skipna=True)),
            "AvgPriceImprovement": float(g["PriceImprovement"].mean(skipna=True)),
            "MedianPriceImprovement": float(g["PriceImprovement"].median(skipna=True)),
            "AvgPctImprovement": float(g["PctImprovement_vs_OriginalExit"].mean(skipna=True)),
            "MedianPctImprovement": float(g["PctImprovement_vs_OriginalExit"].median(skipna=True)),
        }

    rows = [{"Group": "ALL", **agg(df)}]
    for d, g in df.groupby(df["Direction"].astype(str).str.upper().str.strip()):
        rows.append({"Group": f"Direction={d}", **agg(g)})

    return pd.DataFrame(rows)

# ============================================================
# MAIN
# ============================================================

def main():
    print("Running: CAPPED ONLY + MARKET HOURS ONLY")
    print("Market session:", MARKET_START, "to", MARKET_END)
    print("Output:", OUT_DIR)

    raw = pd.read_excel(TRADE_XLSX_PATH)
    trade_summary, missing_df, coverage_df = build_trade_summary(raw)

    # Save audits / coverage
    trade_summary.to_csv(os.path.join(OUT_DIR, "trade_summary_extracted.csv"), index=False)
    missing_df.to_csv(os.path.join(OUT_DIR, "trade_summary_missing_reasons.csv"), index=False)
    coverage_df.to_csv(os.path.join(OUT_DIR, "coverage_report.csv"), index=False)

    print("\nCoverage:")
    print(coverage_df.to_string(index=False))

    # Trade selection
    if RUN_ALL_TRADES:
        sample = trade_summary.copy()
    else:
        sample = trade_summary.sample(n=min(N_SAMPLE_TRADES, len(trade_summary)), random_state=RANDOM_SEED).copy()

    # Use only fully valid trades
    sample = sample[sample["HasAllCoreFields"]].reset_index(drop=True)
    print("\nTrades to evaluate:", len(sample))

    # Speed: group by ticker -> load M5 once per ticker
    out_rows = []
    m5_stats = []
    total = len(sample)

    for tkr, grp in sample.groupby("TickerNorm"):
        if tkr not in M5_TICKERS:
            continue

        m5, info = prepare_m5_with_indicators_market_only(tkr)
        m5_stats.append(info)

        if m5 is None:
            continue

        grp = grp.sort_values("EntryDate")
        for _, tr in grp.iterrows():
            res = simulate_trailing_capped(tr, m5)

            old_exit = float(tr["ExitPrice_Original"])
            new_exit = float(res.exit_price)
            impr = direction_aware_improvement(tr["Direction"], new_exit, old_exit)

            orig_exit_date = pd.to_datetime(tr["ExitDate_Original"])
            new_exit_time = pd.to_datetime(res.exit_time)
            new_exit_date = new_exit_time.normalize() if pd.notna(new_exit_time) else pd.NaT

            out_rows.append({
                "TradeID": tr["TradeID"],
                "Currency": tr["Currency"],
                "TradeBase": tr["TradeBase"],
                "TickerNorm": tr["TickerNorm"],
                "Direction": tr["Direction"],

                "EntryDate": tr["EntryDate"],
                "EntryPrice": float(tr["EntryPrice"]),

                "ExitDate_Original": orig_exit_date,
                "ExitPrice_Original": old_exit,

                "ExitTime_Trailing": new_exit_time,
                "ExitDate_Trailing": new_exit_date,
                "ExitPrice_Trailing": new_exit,
                "ExitReason": res.exit_reason,

                "MaxHigh_InWindow": res.max_high,
                "MinLow_InWindow": res.min_low,
                "BestFavorable_InWindow": res.best_favorable,

                "PriceImprovement": impr,
                "PctImprovement_vs_OriginalExit": (impr / old_exit) if old_exit != 0 else np.nan,

                "ClosedEarlier": (new_exit_date < orig_exit_date) if (pd.notna(new_exit_date) and pd.notna(orig_exit_date)) else np.nan
            })

        print(f"Processed ticker {tkr} | trades: {len(grp)} | total done: {len(out_rows)}/{total}")

    out = pd.DataFrame(out_rows)
    summ = summarize(out)

    # Save outputs
    out_path = os.path.join(OUT_DIR, "trades_trailing_capped_market_only.csv")
    summ_path = os.path.join(OUT_DIR, "summary_trailing_capped_market_only.csv")
    m5_stats_path = os.path.join(OUT_DIR, "m5_market_hours_filter_stats.csv")

    out.to_csv(out_path, index=False)
    summ.to_csv(summ_path, index=False)
    pd.DataFrame(m5_stats).to_csv(m5_stats_path, index=False)

    print("\nSaved:")
    print(out_path)
    print(summ_path)
    print(m5_stats_path)

    print("\n=== SUMMARY ===")
    print(summ.to_string(index=False))

if __name__ == "__main__":
    main()


Running: CAPPED ONLY + MARKET HOURS ONLY
Market session: 14:30 to 21:00
Output: D:/work/Client/Maatra/Trade Level Data\TrailingSL_CAPPED_ONLY_MKT1430_2100_FIXEDDIR

Coverage:
 Raw_Unique_TradeIDs  Extracted_Unique_TradeIDs  Trades_With_All_Core_Fields  Trades_Missing_Something  Trades_Missing_M5  Trades_Unknown_Direction  Unique_M5_Tickers_Available  Unique_M5_Tickers_Used
               14002                      14002                        11578                      2424               2174                         0                          131                     160

Trades to evaluate: 11578
Processed ticker ACN | trades: 7 | total done: 7/11578
Processed ticker ADBE | trades: 83 | total done: 90/11578
Processed ticker ADI | trades: 273 | total done: 363/11578
Processed ticker ADP | trades: 124 | total done: 487/11578
Processed ticker ADSK | trades: 94 | total done: 581/11578
Processed ticker AG | trades: 11 | total done: 592/11578
Processed ticker AJG | trades: 230 | total done: 

In [8]:
import pandas as pd
import numpy as np

FILE = r"D:/work/Client/Maatra/Trade Level Data/TrailingSL_CAPPED_ONLY_MKT1430_2100_FIXEDDIR/trades_trailing_capped_market_only.csv"

df = pd.read_csv(FILE)

# keep valid percentage rows
df = df[df["PctImprovement_vs_OriginalExit"].notna()]

improved = df[df["PctImprovement_vs_OriginalExit"] > 0]
worse    = df[df["PctImprovement_vs_OriginalExit"] < 0]

summary = {
    "Total_Trades": len(df),

    "Improved_Trades": len(improved),
    "Worse_Trades": len(worse),

    "Pct_Improved": len(improved) / len(df),
    "Pct_Worse": len(worse) / len(df),

    # headline number
    "Avg_Pct_Impact_All_Trades": df["PctImprovement_vs_OriginalExit"].mean(),

    # payoff asymmetry
    "Avg_Pct_Gain_Improved_Trades": improved["PctImprovement_vs_OriginalExit"].mean(),
    "Avg_Pct_Loss_Worse_Trades": worse["PctImprovement_vs_OriginalExit"].mean(),

    # robustness check
    "Median_Pct_Impact_All_Trades": df["PctImprovement_vs_OriginalExit"].median(),

    "Payoff_Ratio_(AvgGain/AvgLoss)": (
        improved["PctImprovement_vs_OriginalExit"].mean()
        / abs(worse["PctImprovement_vs_OriginalExit"].mean())
        if len(worse) > 0 else np.nan
    )
}

pd.DataFrame([summary]).T


Unnamed: 0,0
Total_Trades,11578.0
Improved_Trades,5212.0
Worse_Trades,5386.0
Pct_Improved,0.450164
Pct_Worse,0.465193
Avg_Pct_Impact_All_Trades,0.00085
Avg_Pct_Gain_Improved_Trades,0.012044
Avg_Pct_Loss_Worse_Trades,-0.009827
Median_Pct_Impact_All_Trades,0.0
Payoff_Ratio_(AvgGain/AvgLoss),1.225528
