In [1]:
# === SYSTEM & IMPORTS ===
# Block 6: Der Realitäts-Check (Costed Backtest)
#
# Hintergrund: Viele Trading-Strategien sehen "auf dem Papier" (ohne Kosten) gut aus.
# Sobald man Transaktionskosten (Gebühren + Slippage) abzieht, verschwinden die Gewinne oft.
# Dieses Notebook simuliert den Handel unter realistischen Bedingungen.

import os, json, math
from pathlib import Path

# Datenanalyse
import numpy as np
import pandas as pd

# Visualisierung
import matplotlib.pyplot as plt

In [2]:
# === KOSTEN-MODELLE DEFINIEREN ===
# Wir rechnen in "Basispunkten" (bps). 1 bp = 0.01%.

# 1. Roundtrip-Kosten: Gebühren für Kauf + Verkauf zusammen.
# 15 bps = 0.15%. Das ist typisch für günstige Retail-Broker (Spread + Commission).
ROUNDTRIP_BPS_DEFAULT = 15.0

# 2. Slippage: Der Markt bewegt sich oft gegen uns, während wir kaufen.
# "2.0 bps per leg" bedeutet, wir zahlen beim Einstieg 0.02% mehr und beim Ausstieg kriegen wir 0.02% weniger.
SLIPPAGE_BPS_PER_LEG = 2.0

# Sensitivitäts-Analyse: Wir testen verschiedene Kosten-Szenarien, um zu sehen, wie robust die Strategie ist.
SENSI_BPS = [5.0, 10.0, 15.0, 25.0, 50.0, 100.0]

In [3]:
# === ARTEFAKTE LADEN ===
# Wir holen uns die Ergebnisse des letzten Runs.

ROOT = Path("..")
with open(ROOT/"config.json","r") as f:
    C = json.load(f)

# Neuesten LSTM-Run suchen
RESULTS_DIR = Path(C.get("results_dir","../results"))
runs = sorted(RESULTS_DIR.glob("*_lstm"), key=lambda p: p.stat().st_mtime, reverse=True)
assert runs, "Kein *_lstm Run-Ordner gefunden. Bitte erst trainieren!"
RUN_DIR = runs[0]

# Run-Config laden
with open(RUN_DIR/"config.json","r") as f:
    RCFG = json.load(f)

# Metadaten wiederherstellen
TRAIN_CSV = Path(RCFG["train_csv"])
H = int(RCFG["horizon"])    # Haltedauer
LOOKBACK = int(RCFG["lookback"])

# Vorhersagen (Test-Set) laden, die in Block 4 gespeichert wurden
preds = pd.read_csv(RUN_DIR/"preds_test.csv", parse_dates=["timestamp"]).set_index("timestamp").sort_index()

# Original-Marktdaten laden (Close-Preise)
df = pd.read_csv(TRAIN_CSV, index_col=0, parse_dates=True).sort_index()
close = df["close"].reindex(preds.index)
assert close.notna().all(), "Fehlende Close-Preise im Test-Zeitraum!"

# Den optimierten Threshold aus Block 4 laden
with open(RUN_DIR/"evaluation.json","r") as f:
    EVAL = json.load(f)
thr = float(EVAL["threshold_selection"]["threshold"])

# Signale generieren: Alles über Threshold ist ein Kauf-Signal (1)
proba_used = preds["y_proba_used"].values
signals_t  = (proba_used >= thr).astype(int)

# WICHTIG: T+1 Logik!
# Wenn wir heute Abend (t) das Signal berechnen (auf Basis des Schlusskurses),
# können wir frühestens morgen früh (t+1) kaufen.
signals_t1 = pd.Series(signals_t, index=preds.index).shift(1).fillna(0).astype(int).values

In [4]:
# === BACKTEST ENGINE ===
# Wir implementieren zwei Backtest-Logicen:
# A) Realistisch: T+1 Entry, Kosten, Keine Positions-Überlappung.
# B) Theoretisch: T0 Entry (Referenz, "Upper Bound").

def backtest_t1_no_overlap(close: pd.Series, signals, H: int,
                           rt_bps: float = 15.0, slip_bps_per_leg: float = 2.0):
    """
    Simuliert realistischen Handel:
    - Wir steigen bei Open von Tag t+1 ein (approximiert durch Close t bis Close t+1 Return).
    - Wir halten H Tage.
    - 'No Overlap': Wenn wir schon investiert sind, ignorieren wir neue Signale, bis wir verkauft haben.
    """
    # Log-Returns des Marktes
    r = np.log(close).diff().fillna(0.0).values
    
    # Gesamtkosten pro Trade (Entry + Exit) als dezimaler Abschlag
    # RT-BPS wird halbiert (halbe Gebühr bei Kauf, halbe bei Verkauf) + Slippage jedes Mal
    entry_cost = (rt_bps/2.0 + slip_bps_per_leg) / 1e4
    exit_cost  = entry_cost

    pos = np.zeros_like(r, dtype=int) # Array für Positionen (1=Investiert, 0=Cash)
    i = 0
    while i < len(r) - 1:
        if signals[i] == 1:
            # Signal ist da -> Trade startet morgen (i+1)
            start = i + 1
            # Trade endet nach H Tagen (oder am Ende der Daten)
            end   = min(i + H, len(r) - 1)
            
            # Position markieren
            pos[start:end+1] = 1 
            
            # Wir springen direkt zum Ende des Trades (keine neuen Signale währenddessen)
            i = end + 1
        else:
            i += 1

    # Rendite berechnen: Markt-Return * Position
    net = pos * r
    
    # Kosten abziehen: Immer wenn sich pos von 0 auf 1 ändert (Entry) oder 1 auf 0 (Exit)
    pos_prev = np.r_[0, pos[:-1]]
    entries = (pos == 1) & (pos_prev == 0)
    exits   = (pos == 0) & (pos_prev == 1)
    
    # Kosten mindern die Log-Rendite (log(1-cost) ist approx -cost)
    net = net + entries * np.log(1 - entry_cost) + exits * np.log(1 - exit_cost)
    
    # Equity Curve (Kumulierte Rendite)
    eq = np.exp(np.cumsum(net))
    return pd.Series(eq, index=close.index), pd.Series(net, index=close.index), pd.Series(pos, index=close.index)

def backtest_t0_upper_bound(close: pd.Series, signals, H: int,
                            rt_bps: float = 15.0, slip_bps_per_leg: float = 2.0):
    """
    Optimistisches Szenario:
    - Wir handeln SOFORT zum Schlusskurs (t=0), wenn das Signal kommt.
    - Das ist in der Praxis fast unmöglich, zeigt aber das theoretische Potential.
    """
    r = np.log(close).diff().fillna(0.0).values
    entry_cost = (rt_bps/2.0 + slip_bps_per_leg) / 1e4
    exit_cost  = entry_cost

    pos = np.zeros_like(r, dtype=int)
    i = 0
    while i < len(r):
        if signals[i] == 1:
            start = i  # Sofort rein
            end   = min(i + H - 1, len(r) - 1)
            pos[start:end+1] = 1
            i = end + 1
        else:
            i += 1

    net = pos * r
    pos_prev = np.r_[0, pos[:-1]]
    entries = (pos == 1) & (pos_prev == 0)
    exits   = (pos == 0) & (pos_prev == 1)
    net = net + entries * np.log(1 - entry_cost) + exits * np.log(1 - exit_cost)
    eq = np.exp(np.cumsum(net))
    return pd.Series(eq, index=close.index), pd.Series(net, index=close.index), pd.Series(pos, index=close.index)

In [5]:
# === KPI BERECHNUNG ===
# Standard-Funktionen für Finanz-Kennzahlen

def _cagr(eq: pd.Series, periods_per_year=252):
    # Compound Annual Growth Rate: Jährliche Wachstumsrate
    eq = eq.dropna()
    if len(eq) < 2: return 0.0
    T = len(eq) / periods_per_year
    if T < 1e-12: return 0.0
    return float((eq.iloc[-1] / eq.iloc[0])**(1.0/T) - 1.0)

def _sharpe(net_logrets: pd.Series, periods_per_year=252):
    # Sharpe Ratio: Rendite / Volatilität
    lr = pd.Series(net_logrets).dropna()
    if len(lr) < 2: return 0.0
    mu = lr.mean() * periods_per_year
    sd = lr.std(ddof=1) * math.sqrt(periods_per_year)
    return float(mu / (sd + 1e-12))

def _max_dd(eq: pd.Series):
    # Maximum Drawdown: Größter Verlust von einem Höchststand
    cum = np.log(eq.values)
    peak = np.maximum.accumulate(cum)
    dd = np.exp(cum - peak) - 1.0
    return float(dd.min())

def _exposure(pos: pd.Series):
    # Zeitanteil im Markt
    return float((pos > 0).mean())

def _turnover(pos: pd.Series):
    # Umschlagshäufigkeit (Wie oft kaufen/verkaufen wir?)
    d = pos.diff().fillna(0).abs()
    return float(d.sum())

def _trade_stats(net_logrets: pd.Series, pos: pd.Series):
    # Detaillierte Analyse pro Trade (Gewinn/Verlust p. Trade)
    p = pos.astype(int).values
    entries = np.where((p == 1) & (np.r_[0, p[:-1]] == 0))[0]
    exits   = np.where((p == 0) & (np.r_[0, p[:-1]] == 1))[0] - 1
    
    # Falls letzter Trade noch offen ist
    if len(exits) < len(entries):
        exits = np.r_[exits, len(p) - 1]
        
    pnls = []
    for s, e in zip(entries, exits):
        if e >= s:
            pnls.append(float(net_logrets.iloc[s:e+1].sum()))
            
    if not pnls:
        return dict(n_trades=0, hit_rate=None, median=None, iqr=None)
        
    pnls = np.array(pnls)
    hit_rate = float((pnls > 0).mean())
    q25, q50, q75 = np.percentile(pnls, [25, 50, 75]) # Quartile
    return dict(n_trades=int(len(pnls)), hit_rate=hit_rate, median=float(q50), iqr=float(q75 - q25))

# Bootstrapping für Signifikanz-Tests
def _block_bootstrap_stats(net_logrets: pd.Series, block: int, n=500, seed=42, periods_per_year=252):
    rng = np.random.default_rng(seed)
    lr = net_logrets.dropna().values
    if len(lr) == 0: return {"CAGR_CI": [0,0,0], "Sharpe_CI": [0,0,0]}
    
    cagr_vals, sharpe_vals = [], []
    idx = np.arange(len(lr))
    for _ in range(n):
        # Block-Sampling
        starts = rng.integers(0, max(1, len(idx)-block+1), size=max(1, len(idx)//block))
        bs_idx = np.concatenate([np.arange(s, min(s+block, len(idx))) for s in starts])
        
        lr_bs = lr[bs_idx]
        eq_bs = np.exp(np.cumsum(lr_bs))
        
        # Metrics für diesen Bootstrap-Sample
        T = len(lr_bs) / periods_per_year
        if T > 1e-12:
             cagr = (eq_bs[-1] / eq_bs[0])**(1.0/T) - 1.0
        else:
             cagr = 0.0
             
        mu = lr_bs.mean() * periods_per_year
        sd = lr_bs.std(ddof=1) * math.sqrt(periods_per_year)
        sh = mu / (sd + 1e-12)
        
        cagr_vals.append(float(cagr))
        sharpe_vals.append(float(sh))
        
    # Konfidenzintervalle (2.5% - 97.5%)
    return {
        "CAGR_CI": list(np.percentile(cagr_vals, [2.5, 50, 97.5]).astype(float)),
        "Sharpe_CI": list(np.percentile(sharpe_vals, [2.5, 50, 97.5]).astype(float))
    }

In [6]:
# === 1) HAUPT-SZENARIO AUSWERTEN ===
main_rt = ROUNDTRIP_BPS_DEFAULT

# Backtests laufen lassen
eq_t1, net_t1, pos_t1 = backtest_t1_no_overlap(close, signals_t1, H, main_rt, SLIPPAGE_BPS_PER_LEG)
eq_t0, net_t0, pos_t0 = backtest_t0_upper_bound(close, signals_t,  H, main_rt, SLIPPAGE_BPS_PER_LEG)

# Statistiken berechnen
stats_t1 = dict(
    CAGR=_cagr(eq_t1), Sharpe=_sharpe(net_t1), MaxDD=_max_dd(eq_t1),
    final_equity=float(eq_t1.iloc[-1]),
    exposure=_exposure(pos_t1), turnover=_turnover(pos_t1),
    **_trade_stats(net_t1, pos_t1)
)

stats_t0 = dict(
    CAGR=_cagr(eq_t0), Sharpe=_sharpe(net_t0), MaxDD=_max_dd(eq_t0),
    final_equity=float(eq_t0.iloc[-1]),
    exposure=_exposure(pos_t0), turnover=_turnover(pos_t0),
    **_trade_stats(net_t0, pos_t0)
)

# Bootstrap (nur für T+1 wichtig)
cis = _block_bootstrap_stats(net_t1, block=LOOKBACK, n=400, seed=int(C.get("seed",42)))

print(f"[Block 6] Hauptszenario RT={main_rt:.0f} bps | Slippage/Leg={SLIPPAGE_BPS_PER_LEG:.1f} bps")
print("Entry@t (Idealisiert/Upper Bound):", stats_t0)
print("Entry@t+1 (Realistisch):          ", stats_t1)
print("Bootstrap CIs (T+1):              ", cis)

[Block 6] Hauptszenario RT=15 bps | Slippage/Leg=2.0 bps
Entry@t (Idealisiert/Upper Bound): {'CAGR': -0.07346389873754433, 'Sharpe': -0.36552054075039964, 'MaxDD': -0.32908351184333384, 'final_equity': 0.848098834338252, 'exposure': 0.48613678373382624, 'turnover': 55.0, 'n_trades': 28, 'hit_rate': 0.39285714285714285, 'median': -0.007701402910465137, 'iqr': 0.03451749510365243}
Entry@t+1 (Realistisch):           {'CAGR': -0.15901528235344375, 'Sharpe': -1.3231675065259005, 'MaxDD': -0.34251476083971655, 'final_equity': 0.6894982865603037, 'exposure': 0.2587800369685767, 'turnover': 279.0, 'n_trades': 140, 'hit_rate': 0.5142857142857142, 'median': 0.00023581386747410066, 'iqr': 0.012839448484275096}
Bootstrap CIs (T+1):               {'CAGR_CI': [-0.35107020824173774, -0.1664297065604906, 0.025432763789283445], 'Sharpe_CI': [-2.768300865377776, -1.39743039605241, 0.18425905923048091]}


In [7]:
# === 2) SENSITIVITÄT ÜBER KOSTEN ===
# Was passiert, wenn die Kosten höher/niedriger sind?

rows = []
for rt in SENSI_BPS:
    eqB, netB, posB = backtest_t1_no_overlap(close, signals_t1, H, rt, SLIPPAGE_BPS_PER_LEG)
    tr_stats = _trade_stats(netB, posB)
    rows.append(dict(
        model="Entry@t+1 (No-Overlap)", 
        roundtrip_bps=rt,
        trades=tr_stats["n_trades"], 
        exposure=_exposure(posB), 
        turnover=_turnover(posB),
        CAGR=_cagr(eqB), 
        Sharpe=_sharpe(netB), 
        MaxDD=_max_dd(eqB), 
        final_equity=float(eqB.iloc[-1])
    ))

sensi = pd.DataFrame(rows).sort_values(["roundtrip_bps"])
sensi_path = RUN_DIR/"cost_sensitivity.csv"
sensi.to_csv(sensi_path, index=False)

print("\nSensitivität (bps) – T+1, No-Overlap:")
print(sensi.to_string(index=False, float_format=lambda x: f"{x:,.4f}"))


Sensitivität (bps) – T+1, No-Overlap:
                 model  roundtrip_bps  trades  exposure  turnover    CAGR  Sharpe   MaxDD  final_equity
Entry@t+1 (No-Overlap)         5.0000     140    0.2588  279.0000 -0.1025 -0.8280 -0.2567        0.7928
Entry@t+1 (No-Overlap)        10.0000     140    0.2588  279.0000 -0.1312 -1.0759 -0.2979        0.7394
Entry@t+1 (No-Overlap)        15.0000     140    0.2588  279.0000 -0.1590 -1.3232 -0.3425        0.6895
Entry@t+1 (No-Overlap)        25.0000     140    0.2588  279.0000 -0.2120 -1.8149 -0.4248        0.5996
Entry@t+1 (No-Overlap)        50.0000     140    0.2588  279.0000 -0.3304 -3.0212 -0.5883        0.4228
Entry@t+1 (No-Overlap)       100.0000     140    0.2588  279.0000 -0.5167 -5.2767 -0.7920        0.2099


In [8]:
# === PLOTS: EQUITY CURVES ===
figdir = RUN_DIR/"figures"
figdir.mkdir(parents=True, exist_ok=True)

plt.figure(figsize=(9,4))
plt.plot(eq_t0.index, eq_t0.values,  label=f"Entry@t (Ideal, {main_rt:.0f}bps)", alpha=0.6)
plt.plot(eq_t1.index, eq_t1.values,  label=f"Entry@t+1 (Real, {main_rt:.0f}bps)", linewidth=2)

# Buy & Hold als Benchmark (ohne Kosten simuliert)
bh_log = (np.log(close) - np.log(close.iloc[0])).fillna(0.0)
bh_eq  = np.exp(bh_log)
plt.plot(eq_t1.index, bh_eq.reindex(eq_t1.index), label="Buy & Hold", linestyle="--", color="gray")

plt.title(f"Equity Curve (H={H}) – Real vs. Ideal")
plt.ylabel("Wertentwicklung (Start=1.0)")
plt.legend()
plt.tight_layout()
plt.savefig(figdir/"equity_costed.png", dpi=160)
plt.close()

In [9]:
# === ERGEBNIS SPEICHERN ===
# Wir hängen die neuen Kostenergebnisse an das evaluation.json an.

cost_block = {
    "roundtrip_bps_default": main_rt,
    "slippage_bps_per_leg": SLIPPAGE_BPS_PER_LEG,
    "upper_bound_entry_t":  {"note": "nicht handelbar", **stats_t0},
    "final_kpi_entry_t1":   {"note": "realistisch, T+1, No-Overlap", **stats_t1, **cis},
    "sensitivity_csv": str(sensi_path.as_posix()),
    "equity_costed_png": str((figdir/"equity_costed.png").as_posix())
}

EVAL["backtest_costs"] = cost_block
EVAL.setdefault("report_notes", {})
EVAL["report_notes"].update({
    "kpi_basis": "entry_t1_no_overlap",
    "entry_t_is_upper_bound": True
})

with open(RUN_DIR/"evaluation.json","w") as f:
    json.dump(EVAL, f, indent=2)

print("\nBlock 6 abgeschlossen →")
print(" - figures/equity_costed.png")
print(" - cost_sensitivity.csv")
print(" - evaluation.json (aktualisiert)")


Block 6 abgeschlossen →
 - figures/equity_costed.png
 - cost_sensitivity.csv
 - evaluation.json (aktualisiert)
