In [1]:
import os, json, math
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
# -------------------- Defaults & scenarios --------------------
# All cost figures are in basis points; the backtests divide them by 1e4.
SLIPPAGE_BPS_PER_LEG = 2.0     # slippage added to each leg (entry and exit)
ROUNDTRIP_BPS_DEFAULT = 15.0   # round-trip cost used for the main scenario
SENSI_BPS = [float(bps) for bps in (5, 10, 15, 25, 50, 100)]  # round-trip costs of the sensitivity sweep

In [3]:
# -------------------- Load artifacts --------------------
# Project-level config is read from the parent of the current working directory.
ROOT = Path("..")
with open(ROOT/"config.json","r") as f:
    C = json.load(f)

# Select the most recently modified "*_lstm" entry below the results directory.
# NOTE(review): glob() also matches plain files named "*_lstm" - presumably only
# run folders exist there; confirm.
RESULTS_DIR = Path(C.get("results_dir","../results"))
runs = sorted(RESULTS_DIR.glob("*_lstm"), key=lambda p: p.stat().st_mtime, reverse=True)
assert runs, "Kein *_lstm Run-Ordner gefunden."
RUN_DIR = runs[0]

# Run-specific config: training CSV path, horizon H and lookback.
with open(RUN_DIR/"config.json","r") as f:
    RCFG = json.load(f)

TRAIN_CSV = Path(RCFG["train_csv"])
H = int(RCFG["horizon"])
LOOKBACK = int(RCFG["lookback"])

# Test predictions indexed by their "timestamp" column; price data indexed by
# its first CSV column (parsed as dates). Both are sorted by index.
preds = pd.read_csv(RUN_DIR/"preds_test.csv", parse_dates=["timestamp"]).set_index("timestamp").sort_index()
df = pd.read_csv(TRAIN_CSV, index_col=0, parse_dates=True).sort_index()

# Align close prices to the prediction timestamps; every timestamp needs a price.
close = df["close"].reindex(preds.index)
assert close.notna().all(), "Close-Preise fehlen auf Test-Index."

# Decision threshold is taken from the run's evaluation.json.
with open(RUN_DIR/"evaluation.json","r") as f:
    EVAL = json.load(f)
thr = float(EVAL["threshold_selection"]["threshold"])

# Binary signal per row: 1 where the probability reaches the threshold.
proba_used = preds["y_proba_used"].values
signals_t  = (proba_used >= thr).astype(int)
# Same signals delayed by one row; the first row becomes 0.
signals_t1 = pd.Series(signals_t, index=preds.index).shift(1).fillna(0).astype(int).values  # tradable at T+1

In [4]:
# -------------------- Costs & backtest (no overlap) --------------------
# CHANGE: new, more realistic backtest logic (T+1, non-overlapping positions)
def backtest_t1_no_overlap(close: pd.Series, signals, H: int,
                           rt_bps: float = 15.0, slip_bps_per_leg: float = 2.0):
    """
    Long-only backtest that enters one bar after a signal and never overlaps trades.

    A signal at position ``i`` opens a position on bars ``i+1 .. i+H`` (clipped to
    the end of the series). Scanning resumes at the bar after the position ends,
    so signals raised while a position is held are ignored. The one-bar delay is
    applied here; callers that pre-shift ``signals`` get an additional bar of lag.

    Costs: each leg costs ``rt_bps / 2 + slip_bps_per_leg`` basis points, applied
    as a log discount. The entry cost is booked on the first held bar, the exit
    cost on the first flat bar after the position. A position still open on the
    last bar is therefore never charged an exit cost.

    Parameters
    ----------
    close : pd.Series
        Close prices; log differences of this series are the bar returns.
    signals : array-like of 0/1
        One signal per bar, positionally aligned with ``close``. Lists, numpy
        arrays and pandas Series are accepted (Series are read positionally).
    H : int
        Maximum holding period in bars, must be >= 1.
    rt_bps : float
        Round-trip cost in basis points.
    slip_bps_per_leg : float
        Extra slippage per leg in basis points.

    Returns
    -------
    (equity, net_log_returns, position) : tuple of pd.Series indexed like ``close``.

    Raises
    ------
    ValueError
        If ``H < 1`` or ``signals`` is shorter than ``close``.
    """
    # H <= -1 would stop the scan index from advancing (infinite loop);
    # H == 0 would silently never trade.
    if H < 1:
        raise ValueError(f"H must be >= 1, got {H}")

    # Positional access regardless of container type / index labels.
    sig = np.asarray(signals)
    r = np.log(close).diff().fillna(0.0).values
    n = len(r)
    if sig.ndim != 1 or len(sig) < n:
        raise ValueError("signals must be 1-D with at least one entry per close price")

    entry_cost = (rt_bps/2.0 + slip_bps_per_leg) / 1e4
    exit_cost  = entry_cost

    pos = np.zeros(n, dtype=int)
    i = 0
    while i < n - 1:
        if sig[i] == 1:
            start = i + 1               # entry one bar after the signal
            end   = min(i + H, n - 1)   # hold through `end` inclusive (at most H bars)
            pos[start:end+1] = 1
            i = end + 1                 # no overlap: skip signals raised while invested
        else:
            i += 1

    net = pos * r
    pos_prev = np.r_[0, pos[:-1]]       # flat before the first bar
    entries = (pos == 1) & (pos_prev == 0)
    exits   = (pos == 0) & (pos_prev == 1)
    # Costs enter as (negative) log returns.
    net = net + entries * np.log(1 - entry_cost) + exits * np.log(1 - exit_cost)
    eq = np.exp(np.cumsum(net))
    return pd.Series(eq, index=close.index), pd.Series(net, index=close.index), pd.Series(pos, index=close.index)

# CHANGE: optional upper bound (not tradable): entry on the signal bar itself (reference line only)
def backtest_t0_upper_bound(close: pd.Series, signals, H: int,
                            rt_bps: float = 15.0, slip_bps_per_leg: float = 2.0):
    """
    Reference backtest that is already invested on the signal bar itself.

    A signal at position ``i`` opens a position on bars ``i .. i+H-1`` (clipped to
    the end of the series); scanning resumes at the bar after the position ends.
    A signal on that very next bar extends the position without a flat bar in
    between, so such back-to-back trades merge and pay no exit/entry cost between
    them.

    Costs: each leg costs ``rt_bps / 2 + slip_bps_per_leg`` basis points, applied
    as a log discount. The entry cost is booked on the first held bar, the exit
    cost on the first flat bar after the position; a position still open on the
    last bar is never charged an exit cost.

    Parameters
    ----------
    close : pd.Series
        Close prices; log differences of this series are the bar returns.
    signals : array-like of 0/1
        One signal per bar, positionally aligned with ``close``.
    H : int
        Maximum holding period in bars, must be >= 1.
    rt_bps : float
        Round-trip cost in basis points.
    slip_bps_per_leg : float
        Extra slippage per leg in basis points.

    Returns
    -------
    (equity, net_log_returns, position) : tuple of pd.Series indexed like ``close``.

    Raises
    ------
    ValueError
        If ``H < 1`` or ``signals`` is shorter than ``close``.
    """
    # With H <= 0 the scan index would never move past a signal (end + 1 <= i),
    # i.e. the loop below would never terminate.
    if H < 1:
        raise ValueError(f"H must be >= 1, got {H}")

    # Positional access regardless of container type / index labels.
    sig = np.asarray(signals)
    r = np.log(close).diff().fillna(0.0).values
    n = len(r)
    if sig.ndim != 1 or len(sig) < n:
        raise ValueError("signals must be 1-D with at least one entry per close price")

    entry_cost = (rt_bps/2.0 + slip_bps_per_leg) / 1e4
    exit_cost  = entry_cost

    pos = np.zeros(n, dtype=int)
    i = 0
    while i < n:
        if sig[i] == 1:
            start = i                       # unrealistic: invested on the signal bar
            end   = min(i + H - 1, n - 1)   # hold through `end` inclusive (at most H bars)
            pos[start:end+1] = 1
            i = end + 1
        else:
            i += 1

    net = pos * r
    pos_prev = np.r_[0, pos[:-1]]           # flat before the first bar
    entries = (pos == 1) & (pos_prev == 0)
    exits   = (pos == 0) & (pos_prev == 1)
    # Costs enter as (negative) log returns.
    net = net + entries * np.log(1 - entry_cost) + exits * np.log(1 - exit_cost)
    eq = np.exp(np.cumsum(net))
    return pd.Series(eq, index=close.index), pd.Series(net, index=close.index), pd.Series(pos, index=close.index)

In [5]:
# -------------------- Kennzahlen & Auswertung --------------------
def _cagr(eq: pd.Series, periods_per_year=252):
    eq = eq.dropna()
    if len(eq) < 2:
        return 0.0
    T = len(eq) / periods_per_year
    return float((eq.iloc[-1] / eq.iloc[0])**(1.0/max(T,1e-12)) - 1.0)

def _sharpe(net_logrets: pd.Series, periods_per_year=252):
    lr = pd.Series(net_logrets).dropna()
    if len(lr) < 2:
        return 0.0
    mu = lr.mean() * periods_per_year
    sd = lr.std(ddof=1) * math.sqrt(periods_per_year)
    return float(mu / (sd + 1e-12))

def _max_dd(eq: pd.Series):
    cum = np.log(eq.values)
    peak = np.maximum.accumulate(cum)
    dd = np.exp(cum - peak) - 1.0
    return float(dd.min())

# CHANGE: zusätzliche Kennzahlen
def _exposure(pos: pd.Series):
    return float((pos > 0).mean())

def _turnover(pos: pd.Series):
    d = pos.diff().fillna(0).abs()
    return float(d.sum())   # ≈ #entries + #exits

def _trade_stats(net_logrets: pd.Series, pos: pd.Series):
    # Trades via Pos-Wechsel identifizieren
    p = pos.astype(int).values
    entries = np.where((p == 1) & (np.r_[0, p[:-1]] == 0))[0]
    exits   = np.where((p == 0) & (np.r_[0, p[:-1]] == 1))[0] - 1
    # Falls Position bis zum Ende offen ist
    if len(exits) < len(entries):
        exits = np.r_[exits, len(p) - 1]
    # Trade-PnLs summieren (Netto-Logs)
    pnls = []
    for s, e in zip(entries, exits):
        if e >= s:
            pnls.append(float(net_logrets.iloc[s:e+1].sum()))
    if not pnls:
        return dict(n_trades=0, hit_rate=None, median=None, iqr=None)
    pnls = np.array(pnls)
    hit_rate = float((pnls > 0).mean())
    q25, q50, q75 = np.percentile(pnls, [25, 50, 75])
    return dict(n_trades=int(len(pnls)), hit_rate=hit_rate, median=float(q50), iqr=float(q75 - q25))

# CHANGE: Bootstrap-CIs (Block-Bootstrap) für CAGR/Sharpe
def _block_bootstrap_stats(net_logrets: pd.Series, block: int, n=500, seed=42, periods_per_year=252):
    rng = np.random.default_rng(seed)
    lr = net_logrets.dropna().values
    if len(lr) == 0:
        return {"CAGR_CI": [0,0,0], "Sharpe_CI": [0,0,0]}
    idx = np.arange(len(lr))
    cagr_vals, sharpe_vals = [], []
    for _ in range(n):
        starts = rng.integers(0, max(1, len(idx)-block+1), size=max(1, len(idx)//block))
        bs_idx = np.concatenate([np.arange(s, min(s+block, len(idx))) for s in starts])
        lr_bs = lr[bs_idx]
        eq_bs = np.exp(np.cumsum(lr_bs))
        # CAGR/Sharpe auf Bootstrapped Sequenz
        T = len(lr_bs) / periods_per_year
        cagr = (eq_bs[-1] / eq_bs[0])**(1.0/max(T,1e-12)) - 1.0 if len(eq_bs) > 1 else 0.0
        mu = lr_bs.mean() * periods_per_year
        sd = lr_bs.std(ddof=1) * math.sqrt(periods_per_year)
        sh = mu / (sd + 1e-12) if len(lr_bs) > 1 else 0.0
        cagr_vals.append(float(cagr)); sharpe_vals.append(float(sh))
    return {
        "CAGR_CI": list(np.percentile(cagr_vals, [2.5, 50, 97.5]).astype(float)),
        "Sharpe_CI": list(np.percentile(sharpe_vals, [2.5, 50, 97.5]).astype(float))
    }

In [6]:
# -------------------- Main scenario (T+1 is the only KPI) --------------------
def _kpi_summary(eq, net, pos):
    """Bundle headline metrics and per-trade statistics of one backtest into a dict."""
    summary = {
        "CAGR": _cagr(eq),
        "Sharpe": _sharpe(net),
        "MaxDD": _max_dd(eq),
        "final_equity": float(eq.iloc[-1]),
        "exposure": _exposure(pos),
        "turnover": _turnover(pos),
    }
    summary.update(_trade_stats(net, pos))
    return summary


main_rt = ROUNDTRIP_BPS_DEFAULT

# CHANGE: realistic KPI run.
# NOTE(review): signals_t1 is already delayed by one row and backtest_t1_no_overlap
# itself enters one bar after the signal index - confirm that this combined lag is intended.
eq_t1, net_t1, pos_t1 = backtest_t1_no_overlap(
    close, signals_t1, H, main_rt, SLIPPAGE_BPS_PER_LEG
)
# CHANGE: upper bound / not tradable, fed with the undelayed signals.
eq_t0, net_t0, pos_t0 = backtest_t0_upper_bound(
    close, signals_t, H, main_rt, SLIPPAGE_BPS_PER_LEG
)

# KPIs (T+1 = final)
stats_t1 = _kpi_summary(eq_t1, net_t1, pos_t1)

# Reference (entry at t = upper bound) - for illustration only
stats_t0 = _kpi_summary(eq_t0, net_t0, pos_t0)

# Bootstrap CIs on the T+1 returns; block length equals the run's lookback.
bootstrap_seed = int(C.get("seed",42))
cis = _block_bootstrap_stats(net_t1, block=LOOKBACK, n=400, seed=bootstrap_seed)

print(f"[Block 6] Hauptszenario RT={main_rt:.0f} bps | Slippage/Leg={SLIPPAGE_BPS_PER_LEG:.1f} bps")
print("Entry@t (Upper bound / nicht handelbar) :", stats_t0)
print("Entry@t+1 (KPI, realistisch, No-Overlap):", stats_t1)
print("Bootstrap CIs (T+1):", cis)

[Block 6] Hauptszenario RT=15 bps | Slippage/Leg=2.0 bps
Entry@t (Upper bound / nicht handelbar) : {'CAGR': -0.19250383943113258, 'Sharpe': -1.2077426885658606, 'MaxDD': -0.39524290881273394, 'final_equity': 0.6808158334182854, 'exposure': 0.4424778761061947, 'turnover': 58.0, 'n_trades': 30, 'hit_rate': 0.36666666666666664, 'median': -0.009358170914656593, 'iqr': 0.04526691195369392}
Entry@t+1 (KPI, realistisch, No-Overlap): {'CAGR': -0.26856835112502364, 'Sharpe': -2.5388277543788846, 'MaxDD': -0.4296133004104955, 'final_equity': 0.5706569826237805, 'exposure': 0.23893805309734514, 'turnover': 215.0, 'n_trades': 108, 'hit_rate': 0.48148148148148145, 'median': -0.000492573691746233, 'iqr': 0.015144935334914011}
Bootstrap CIs (T+1): {'CAGR_CI': [np.float64(-0.47245000766429585), np.float64(-0.2639489928966192), np.float64(-0.08125472732298347)], 'Sharpe_CI': [np.float64(-3.8513543794216427), np.float64(-2.502157355190083), np.float64(-0.7598554589299138)]}


In [7]:
# -------------------- Cost sensitivity (bps), T+1 only --------------------
# Re-run the T+1 backtest once per round-trip cost level and collect one row each.
rows = []
for rt in SENSI_BPS:
    eqB, netB, posB = backtest_t1_no_overlap(close, signals_t1, H, rt, SLIPPAGE_BPS_PER_LEG)
    tr_stats = _trade_stats(netB, posB)
    rows.append({
        "model": "Entry@t+1 (No-Overlap)",
        "roundtrip_bps": rt,
        "trades": tr_stats["n_trades"],
        "exposure": _exposure(posB),
        "turnover": _turnover(posB),
        "CAGR": _cagr(eqB),
        "Sharpe": _sharpe(netB),
        "MaxDD": _max_dd(eqB),
        "final_equity": float(eqB.iloc[-1]),
    })

# Persist the table next to the other run artifacts.
sensi_path = RUN_DIR/"cost_sensitivity.csv"
sensi = pd.DataFrame(rows).sort_values(["roundtrip_bps"])
sensi.to_csv(sensi_path, index=False)

print("\nSensitivität (bps) – T+1, No-Overlap:")
print(sensi.to_string(index=False, float_format=lambda x: f"{x:,.4f}"))


Sensitivität (bps) – T+1, No-Overlap:
                 model  roundtrip_bps  trades  exposure  turnover    CAGR  Sharpe   MaxDD  final_equity
Entry@t+1 (No-Overlap)         5.0000     108    0.2389  215.0000 -0.2234 -2.0621 -0.3658        0.6355
Entry@t+1 (No-Overlap)        10.0000     108    0.2389  215.0000 -0.2463 -2.3013 -0.3985        0.6022
Entry@t+1 (No-Overlap)        15.0000     108    0.2389  215.0000 -0.2686 -2.5388 -0.4296        0.5707
Entry@t+1 (No-Overlap)        25.0000     108    0.2389  215.0000 -0.3112 -3.0081 -0.4876        0.5124
Entry@t+1 (No-Overlap)        50.0000     108    0.2389  215.0000 -0.4072 -4.1410 -0.6086        0.3914
Entry@t+1 (No-Overlap)       100.0000     108    0.2389  215.0000 -0.5612 -6.1880 -0.7718        0.2282


In [8]:
# -------------------- Plots --------------------
figdir = RUN_DIR/"figures"
figdir.mkdir(parents=True, exist_ok=True)

# Buy & hold reference: price growth relative to the first test bar.
bh_log = (np.log(close) - np.log(close.iloc[0])).fillna(0.0)
bh_eq  = np.exp(bh_log)

# Explicit figure/axes instead of the implicit pyplot state machine.
fig, ax = plt.subplots(figsize=(9,4))
ax.plot(eq_t0.index, eq_t0.values,
        label=f"Entry@t (Upper bound / nicht handelbar, {main_rt:.0f}bps, slip {SLIPPAGE_BPS_PER_LEG:.0f}/leg)")
ax.plot(eq_t1.index, eq_t1.values,
        label=f"Entry@t+1 (KPI, No-Overlap, {main_rt:.0f}bps, slip {SLIPPAGE_BPS_PER_LEG:.0f}/leg)")
ax.plot(eq_t1.index, bh_eq.reindex(eq_t1.index), label="Buy & Hold", linestyle="--")
ax.set_title(f"Equity (H={H}) – Upper bound vs. KPI (realistisch)")
ax.legend()
fig.tight_layout()
fig.savefig(figdir/"equity_costed.png", dpi=160)
plt.close(fig)   # the figure is only written to disk, not displayed

In [9]:
# -------------------- Enrich evaluation.json --------------------
cost_block = {
    "roundtrip_bps_default": main_rt,
    "slippage_bps_per_leg": SLIPPAGE_BPS_PER_LEG,
    # CHANGE: upper bound stored separately and labelled unambiguously
    "upper_bound_entry_t":  {"note": "nicht handelbar", **stats_t0},
    # CHANGE: final KPI is exclusively T+1 no-overlap
    "final_kpi_entry_t1":   {"note": "realistisch, T+1, No-Overlap", **stats_t1, **cis},
    "sensitivity_csv": str(sensi_path.as_posix()),
    "equity_costed_png": str((figdir/"equity_costed.png").as_posix())
}
# Keep previously stored fields, but state explicitly which result is 'final'.
EVAL["backtest_costs"] = cost_block
EVAL.setdefault("report_notes", {})
EVAL["report_notes"].update({
    "kpi_basis": "entry_t1_no_overlap",    # CHANGE: explicit
    "entry_t_is_upper_bound": True
})

# Serialize before opening the file: open(..., "w") truncates immediately, so a
# serialization error inside json.dump would leave evaluation.json empty or
# partial - and this file is also an input of the loading cell above.
eval_json = json.dumps(EVAL, indent=2)
with open(RUN_DIR/"evaluation.json","w") as f:
    f.write(eval_json)

print("\nBlock 6 abgeschlossen →")
print(" - figures/equity_costed.png")
print(" - cost_sensitivity.csv")
print(" - evaluation.json (mit backtest_costs, KPI=T+1 No-Overlap)")


Block 6 abgeschlossen →
 - figures/equity_costed.png
 - cost_sensitivity.csv
 - evaluation.json (mit backtest_costs, KPI=T+1 No-Overlap)
