In [1]:
# %% [step0-setup]

from __future__ import annotations

# --- Reproduzierbares Setup, Importe, Logging, Pfade ---

import json
import logging
import math
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

# SciPy / Stats für Signifikanztests

try:
    from statsmodels.stats.contingency_tables import mcnemar as sm_mcnemar
except Exception:
    sm_mcnemar = None  # Fallback später
try:
    from scipy import stats
except Exception:
    stats = None  # Fallback später

# YAML für Feature-Set-Config

try:
    import yaml
except Exception:
    yaml = None  # Falls nicht vorhanden, nur Heuristik verwenden

# Sklearn Metriken (nur Funktionen, KEINE seaborn)

from sklearn.metrics import (
    accuracy_score,
    brier_score_loss,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
    roc_auc_score,
)

# --- Seeds (deterministic as far as possible) ---

GLOBAL_SEED: int = 42
np.random.seed(GLOBAL_SEED)  # seeds NumPy's global random state

# --- Configure logging ---

# Format: timestamp | level | message; INFO and above are emitted.
LOG_FMT = "%(asctime)s | %(levelname)s | %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FMT)
logger = logging.getLogger("baselines10")

# --- Artefakt-Pfade einrichten ---

def find_project_root(start: Path) -> Path:
    """Walk upward from ``start`` and return the first directory that looks like the project root.

    The resolved ``start`` is checked first, then each of its parents in turn.
    A directory qualifies when it contains both ``config`` and ``src``, or when
    it contains ``artifacts/data``.

    Raises:
        AssertionError: if no directory on the way up qualifies.
    """
    origin = start.resolve()

    def _looks_like_root(directory: Path) -> bool:
        # Preferred layout: 'config' and 'src' next to each other.
        preferred = (directory / "config").exists() and (directory / "src").exists()
        # Fallback layout: an 'artifacts/data' folder.
        return preferred or (directory / "artifacts" / "data").exists()

    match = next((d for d in (origin, *origin.parents) if _looks_like_root(d)), None)
    if match is None:
        raise AssertionError("Project root not found – expected 'config'+'src' or 'artifacts/data' somewhere above.")
    return match

# Locate the project root from the current working directory and derive all
# artifact directories from it.
ROOT = find_project_root(Path.cwd())
ARTIFACTS = ROOT / "artifacts"
DATA_DIR = ARTIFACTS / "data"
CONF_DIR = ARTIFACTS / "config"
FORECASTS_DIR = ARTIFACTS / "forecasts"
METRICS_DIR = ARTIFACTS / "metrics"
REPORTS_DIR = ARTIFACTS / "reports"

# Create the artifact directories if they do not exist yet.
for p in [DATA_DIR, CONF_DIR, FORECASTS_DIR, METRICS_DIR, REPORTS_DIR]:
    p.mkdir(parents=True, exist_ok=True)

# --- Fixed time windows (train/test) ---

TRAIN_START = pd.Timestamp("2009-02-28")
TRAIN_END = pd.Timestamp("2019-12-31")
TEST_START = pd.Timestamp("2020-01-31")
TEST_END = pd.Timestamp("2025-05-31")

# --- Fixed settings ---

HORIZON_MONTHS = 1  # one step ahead (t -> t+1)
VAL_WINDOW = 12  # validation window of 12 months
EMBARGO = 1  # embargo of 1 month
N_FOLDS = 5  # expanding 5-fold CV

# --- Inputs ---

FEATURES_PARQUET = DATA_DIR / "features_monthly.parquet"
RAW_PARQUET = DATA_DIR / "raw_data.parquet"
FEATURE_GROUPS_YAML = CONF_DIR / "feature_groups.yaml"

# --- Model / feature-set lists ---

BASELINE_MODELS = ["always_up", "majority_class", "persistence", "sma_crossover"]
FEATURE_SETS = ["TECH", "MACRO", "INTEGRATED"]

In [2]:
# %% [step1-daten_und_featuresets]

# Daten laden, Spalten prüfen, Feature-Sets laden/erstellen (YAML oder Heuristik)

def _ensure_datetime_index(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Sichert, dass ein DatetimeIndex vorliegt (sonst Fehler)."""
    if not isinstance(df.index, pd.DatetimeIndex):
        raise TypeError(f"{name} benötigt einen DatetimeIndex.")
    if not df.index.is_monotonic_increasing:
        df = df.sort_index()
    return df

def _read_parquet_safely(path: Path, name: str) -> pd.DataFrame:
    """Robustes Laden von Parquet mit klaren Fehlermeldungen."""
    if not path.exists():
        raise FileNotFoundError(f"Fehlende Datei: {path} ({name})")
    try:
        df = pd.read_parquet(path)
    except Exception as e:
        raise RuntimeError(f"Fehler beim Laden von {path}: {e}") from e
    return _ensure_datetime_index(df, name)

def _infer_price_column(df_raw: pd.DataFrame) -> str:
    """Heuristik zur Wahl der Preis-Spalte (S&P 500 o.ä.), bevorzugt GSPC/SP500/adjclose."""
    candidates_priority = [
        "SP500",
        "^GSPC_adjclose",
        "^GSPC",
        "GSPC",
        "sp500",
        "S&P500",
        "Adj Close",
        "Adj_Close",
        "Close",
        "close",
    ]
    # Direkte Treffer
    for c in candidates_priority:
        if c in df_raw.columns:
            return c
    # Heuristik: größte Autokorrelation (Preis-Level), numerische Spalten
    num_cols = [c for c in df_raw.columns if pd.api.types.is_numeric_dtype(df_raw[c])]
    if not num_cols:
        raise RuntimeError("Keine numerischen Spalten in raw_data gefunden.")
    ac_scores = {}
    for c in num_cols:
        s = df_raw[c].dropna()
        if len(s) < 24:
            continue
        ac = s.autocorr(lag=1)
        ac_scores[c] = ac if not np.isnan(ac) else -999.0
    if not ac_scores:
        raise RuntimeError("Konnte keine geeignete Preis-Spalte heuristisch bestimmen.")
    best = max(ac_scores, key=ac_scores.get)
    logger.warning(f"Preisspalte heuristisch gewählt: {best}")
    return best

def _load_or_infer_feature_sets(df_features: pd.DataFrame) -> Dict[str, List[str]]:
    """Return the TECH / MACRO / INTEGRATED feature-column groups for ``df_features``.

    Resolution order:

    1. If ``FEATURE_GROUPS_YAML`` exists and PyYAML is importable, read the keys
       ``TECH_FEATURES`` and ``MACRO_FEATURES``, keep only columns present in
       ``df_features`` (target columns excluded) and return them when both
       groups are non-empty.
    2. Otherwise, or when the YAML is unreadable, malformed or incomplete,
       classify columns by case-insensitive substring patterns. Columns that
       match no TECH pattern end up in MACRO. The inferred groups are written
       back to ``FEATURE_GROUPS_YAML`` on a best-effort basis.

    Returns:
        Dict with the keys "TECH", "MACRO" and "INTEGRATED" (sorted union of both).
    """
    target_cols = {"y_direction_next", "y_return_next_pct"}
    available = [c for c in df_features.columns if c not in target_cols]

    # Try the YAML configuration first.
    if FEATURE_GROUPS_YAML.exists() and yaml is not None:
        try:
            with open(FEATURE_GROUPS_YAML, "r", encoding="utf-8") as f:
                cfg = yaml.safe_load(f) or {}
            tech = cfg.get("TECH_FEATURES", [])
            macro = cfg.get("MACRO_FEATURES", [])
            # Explicit check instead of `assert`: asserts are removed under
            # `python -O`, and a string value would then be iterated per character.
            if not (isinstance(tech, list) and isinstance(macro, list)):
                raise TypeError("TECH_FEATURES und MACRO_FEATURES müssen Listen sein.")
            # Keep only columns that actually exist; MACRO never repeats a TECH column.
            tech = [c for c in tech if c in available]
            macro = [c for c in macro if c in available and c not in tech]
            logger.info(f"Feature-Sets aus YAML geladen: TECH={len(tech)}, MACRO={len(macro)}")
            if not tech or not macro:
                logger.warning("YAML unvollständig – ergänze per Heuristik.")
                # Raise with a message so the error log below says why the YAML was rejected.
                raise ValueError("TECH oder MACRO ist nach dem Filtern auf vorhandene Spalten leer.")
            return {"TECH": tech, "MACRO": macro, "INTEGRATED": sorted(set(tech + macro))}
        except Exception as e:
            # Deliberate best effort: any YAML problem falls through to the heuristic.
            logger.error(f"Feature-Gruppen YAML ungültig/fehlend, nutze Heuristik: {e}")

    # Heuristic: technical-indicator name fragments vs. macro name fragments.
    # NOTE(review): matching is by substring, so short fragments such as "MA"
    # or "BB" also match unrelated column names - confirm against real columns.
    tech_patterns = ["SMA", "EMA", "MA", "Momentum", "Mom", "Volatility", "Vol", "Return_Lag", "RSI", "MACD", "Bollinger", "BB", "ATR", "Stoch"]
    macro_patterns = ["CPI", "Inflat", "Unemployment", "VIX", "EPU", "FSI", "Fed", "Funds", "Delta", "USD", "EUR", "WTI", "Gold", "oil", "Brent", "DGS", "Yield", "Rate"]
    tech_upper = [pat.upper() for pat in tech_patterns]
    macro_upper = [pat.upper() for pat in macro_patterns]

    tech: List[str] = []
    macro: List[str] = []
    for c in available:
        uc = c.upper()
        if any(pat in uc for pat in tech_upper):
            tech.append(c)
        elif any(pat in uc for pat in macro_upper):
            macro.append(c)
        else:
            # Unclassified columns are assigned to MACRO (conservative choice).
            macro.append(c)

    # De-duplicate and sanity-check.
    tech = sorted(set(tech))
    macro = [c for c in sorted(set(macro)) if c not in tech]
    if not tech:
        # Last resort: up to five columns whose name contains one of these
        # fragments (case-sensitive here, unlike the pattern match above).
        tech = [c for c in available if "SMA" in c or "Mom" in c or "Vol" in c][:5]
    if not macro:
        macro = [c for c in available if c not in tech]

    integ = sorted(set(tech + macro))
    logger.info(f"Feature-Sets heuristisch bestimmt: TECH={len(tech)}, MACRO={len(macro)}, INTEGRATED={len(integ)}")

    # Persist the inferred groups for reproducibility (best effort).
    if yaml is not None:
        try:
            with open(FEATURE_GROUPS_YAML, "w", encoding="utf-8") as f:
                yaml.safe_dump(
                    {"TECH_FEATURES": tech, "MACRO_FEATURES": macro},
                    f,
                    sort_keys=False,
                    allow_unicode=True,
                )
            logger.info(f"Feature-Gruppen gespeichert: {FEATURE_GROUPS_YAML}")
        except Exception as e:
            logger.error(f"Konnte Feature-Gruppen nicht speichern: {e}")

    return {"TECH": tech, "MACRO": macro, "INTEGRATED": integ}

# Load the artifacts

df_features: pd.DataFrame = _read_parquet_safely(FEATURES_PARQUET, "features_monthly")
df_raw: pd.DataFrame = _read_parquet_safely(RAW_PARQUET, "raw_data")

# Check that the target columns exist

required_targets = ["y_direction_next", "y_return_next_pct"]
missing_targets = [c for c in required_targets if c not in df_features.columns]
if missing_targets:
    raise KeyError(f"Zielspalten fehlen in features_monthly: {missing_targets}")

# Determine the price series

price_col = _infer_price_column(df_raw)
s_price = df_raw[price_col].astype(float)

# Restrict features and prices to the dates they have in common

common_idx = df_features.index.intersection(s_price.index)
df_features = df_features.loc[common_idx].copy()
s_price = s_price.loc[common_idx].copy()

# Load / infer the feature sets

FEATURE_GROUPS = _load_or_infer_feature_sets(df_features)

# Slice train/test by date (both bounds inclusive)

df_train = df_features.loc[(df_features.index >= TRAIN_START) & (df_features.index <= TRAIN_END)].copy()
df_test = df_features.loc[(df_features.index >= TEST_START) & (df_features.index <= TEST_END)].copy()
s_price_train = s_price.loc[df_train.index]
s_price_test = s_price.loc[df_test.index]

# Log the effective date range and row count of each split.
logger.info(f"Train {df_train.index.min().date()} → {df_train.index.max().date()} | n={len(df_train)}")
logger.info(f"Test  {df_test.index.min().date()} → {df_test.index.max().date()} | n={len(df_test)}")


2025-08-24 13:59:10,662 | INFO | Feature-Sets heuristisch bestimmt: TECH=5, MACRO=9, INTEGRATED=14
2025-08-24 13:59:10,664 | INFO | Feature-Gruppen gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\config\feature_groups.yaml
2025-08-24 13:59:10,666 | INFO | Train 2009-02-28 → 2019-12-31 | n=131
2025-08-24 13:59:10,666 | INFO | Test  2020-01-31 → 2025-05-31 | n=65


In [3]:
# %% [step2-tscv_splitter]

# Expanding TSCV mit Validierungsfenster=12 und Embargo=1

def generate_expanding_cv_indices(
    idx: pd.DatetimeIndex,
    n_folds: int = 5,
    val_len: int = 12,
    embargo: int = 1,
) -> List[Tuple[np.ndarray, np.ndarray]]:
    """Build expanding-window CV folds with non-overlapping validation blocks.

    Folds are planned backwards from the end of ``idx``:

    - the last fold validates on the final ``val_len`` positions;
    - each earlier validation block ends ``embargo`` positions before the
      training end of the following fold;
    - training always starts at position 0 and stops ``embargo`` positions
      before the validation block begins;
    - planning stops early once fewer than 12 training positions remain.

    Only ``len(idx)`` is used; the returned arrays are integer positions.

    Returns:
        List of ``(train_positions, val_positions)`` in chronological order.
    """
    n_obs = len(idx)
    comfortable_size = n_folds * val_len + (n_folds - 1) * embargo + 24
    if n_obs < comfortable_size:
        logger.warning("Trainingsfenster ist knapp; Folds könnten sehr kurze Train-Segmente ergeben.")

    planned: List[Tuple[np.ndarray, np.ndarray]] = []
    last_val = n_obs - 1
    # Each pass either appends exactly one fold or stops the planning.
    while len(planned) < n_folds:
        first_val = last_val - val_len + 1
        if first_val < 0:
            break
        last_train = first_val - embargo - 1
        if last_train + 1 < 12:  # require at least 12 training months
            break
        planned.append(
            (
                np.arange(0, last_train + 1, dtype=int),
                np.arange(first_val, last_val + 1, dtype=int),
            )
        )
        # The previous validation block ends where this fold's training ends.
        last_val = last_train
        if last_val < val_len - 1:
            break

    folds = planned[::-1]  # return in ascending time order
    if len(folds) < n_folds:
        logger.warning(f"Nur {len(folds)} Folds erzeugt (gewünscht: {n_folds}).")
    return folds

# Build the CV folds on the training index (integer positions only)

TRAIN_IDX = df_train.index
CV_FOLDS = generate_expanding_cv_indices(TRAIN_IDX, n_folds=N_FOLDS, val_len=VAL_WINDOW, embargo=EMBARGO)
# Log the train/validation length of every fold as "train/val".
logger.info("CV-Folds (Train/Val Längen): " + ", ".join([f"{len(tr)}/{len(v)}" for tr, v in CV_FOLDS]))


2025-08-24 13:59:10,673 | INFO | CV-Folds (Train/Val Längen): 66/12, 79/12, 92/12, 105/12, 118/12


In [4]:
# %% [step3-utils_metrics_tests]

# Utils: Renditen, Buy&Hold, Metriken, DM/McNemar, Bootstrap (optional)

def monthly_returns_pct(price: pd.Series) -> pd.Series:
    """Simple row-over-row return of ``price`` in percent.

    Each value is compared with the immediately preceding row, so the first
    entry of the result is NaN.
    """
    previous = price.shift(1)
    growth = price / previous
    return (growth - 1.0) * 100.0

def strategy_returns_long_cash(
    pred_dir: pd.Series,  # 0/1 per row (decision taken at row t)
    next_month_returns_pct: pd.Series,  # return of the following period in percent
) -> pd.Series:
    """Long/cash strategy return: the next-period return when the signal is 1, else 0.

    The signal is aligned to the index of ``next_month_returns_pct``; rows
    without a signal are treated as cash (0). No transaction costs are applied.
    """
    target_index = next_month_returns_pct.index
    position = pred_dir.astype(float).reindex(target_index).fillna(0.0)
    return position.mul(next_month_returns_pct).astype(float)

def safe_std(a: np.ndarray, ddof: int = 1) -> float:
    """Standard deviation of ``a`` as a float, or 0.0 when ``a`` has fewer than two elements."""
    return float(np.std(a, ddof=ddof)) if a.size >= 2 else 0.0

def sharpe_ratio(returns_pct: pd.Series) -> float:
    """Per-period Sharpe ratio (mean / sample std) with the risk-free rate taken as 0.

    NaN values are dropped first. Returns NaN when the standard deviation is 0,
    which includes series with fewer than two observations.
    """
    values = returns_pct.dropna().values
    dispersion = safe_std(values, ddof=1)
    return np.nan if dispersion == 0.0 else float(np.mean(values) / dispersion)

def sortino_ratio(returns_pct: pd.Series) -> float:
    """Sortino ratio of a return series with a target return of 0.

    Computed as ``mean(r) / downside_deviation``, where the downside deviation
    is the root mean square of the below-target returns taken over ALL
    observations: ``sqrt(mean(min(r, 0) ** 2))``.

    The previous implementation used the sample standard deviation of the
    negative returns around their own mean. That is not the downside
    deviation: equally sized losses gave a denominator of 0 (result NaN), and
    fewer than two losses always gave NaN.

    Args:
        returns_pct: Period returns; NaN values are dropped.

    Returns:
        The ratio as a float, or NaN when there are no observations or no
        negative return (downside deviation of 0).
    """
    r = returns_pct.dropna().to_numpy(dtype=float)
    if r.size == 0:
        return np.nan
    # Shortfall below the target (0): positive returns contribute 0.
    shortfall = np.minimum(r, 0.0)
    downside_dev = float(np.sqrt(np.mean(shortfall ** 2)))
    if downside_dev == 0.0:
        return np.nan
    return float(np.mean(r) / downside_dev)

def rachev_ratio(returns_pct: pd.Series, alpha: float = 0.05) -> float:
    """Quantile-based proxy for the Rachev ratio.

    Returns the ``1 - alpha`` quantile of the returns divided by the absolute
    value of the ``alpha`` quantile. NaN values are dropped first.

    Returns NaN when fewer than 10 observations remain or when the lower
    quantile is 0 or undefined.
    """
    values = returns_pct.dropna().values
    if values.size < 10:
        return np.nan
    upper_q = np.quantile(values, 1.0 - alpha)
    lower_q = np.quantile(values, alpha)
    # A zero or undefined loss quantile leaves the ratio undefined.
    if lower_q == 0 or np.isnan(lower_q):
        return np.nan
    return float(upper_q / abs(lower_q))

def diebold_mariano(
    loss_a: np.ndarray,
    loss_b: np.ndarray,
    h: int = 1,
    alternative: str = "two_sided",
) -> Tuple[float, float]:
    """Diebold-Mariano test on the loss differential ``loss_a - loss_b``.

    The long-run variance uses Bartlett (Newey-West) weights with ``h - 1``
    lags, so for ``h = 1`` it reduces to the sample variance. The p-value comes
    from the standard normal distribution.

    Args:
        loss_a, loss_b: Loss series of equal length (e.g. 0/1 loss); any
            array-like is accepted and converted to float.
        h: Forecast horizon.
        alternative: "two_sided" (also accepted: "two-sided"), "greater" or "less".

    Returns:
        ``(statistic, p_value)``. Both are NaN for fewer than 10 observations
        or a non-positive variance estimate; the p-value alone is NaN when
        SciPy is unavailable.

    Raises:
        ValueError: for series of unequal length or an unknown ``alternative``
            (previously any unknown value was silently treated as "less").
    """
    if alternative == "two-sided":
        alternative = "two_sided"
    if alternative not in ("two_sided", "greater", "less"):
        raise ValueError(
            f"DM: unbekannte Alternative {alternative!r} (erlaubt: 'two_sided', 'greater', 'less')."
        )

    # Accept lists / integer / boolean inputs alike.
    loss_a = np.asarray(loss_a, dtype=float)
    loss_b = np.asarray(loss_b, dtype=float)
    if loss_a.shape[0] != loss_b.shape[0]:
        raise ValueError("DM: Verlustreihen ungleicher Länge.")
    d = loss_a - loss_b
    T = d.shape[0]
    if T < 10:
        return (np.nan, np.nan)
    d_bar = float(np.mean(d))

    # Newey-West variance estimator with lag = h - 1 (none for h = 1).
    q = max(h - 1, 0)
    s = float(np.var(d, ddof=1))
    for lag in range(1, q + 1):
        cov = float(np.cov(d[:-lag], d[lag:], ddof=1)[0, 1])
        w = 1.0 - lag / (q + 1)
        s += 2.0 * w * cov
    var_hat = s / T
    if var_hat <= 0:
        return (np.nan, np.nan)
    dm_stat = float(d_bar / math.sqrt(var_hat))

    # p-value (needs SciPy); the survival function avoids the precision loss of 1 - cdf.
    if stats is None:
        return (dm_stat, np.nan)
    if alternative == "two_sided":
        pval = 2.0 * stats.norm.sf(abs(dm_stat))
    elif alternative == "greater":
        pval = stats.norm.sf(dm_stat)
    else:  # "less"
        pval = stats.norm.cdf(dm_stat)
    return (dm_stat, float(pval))

def mcnemar_test(
    y_true: Sequence[int], y_pred_a: Sequence[int], y_pred_b: Sequence[int]
) -> Tuple[float, float]:
    """McNemar test for two paired binary classifiers.

    Only the discordant pairs matter: ``b01`` counts cases where A is wrong
    and B is right, ``b10`` the opposite. The chi-square statistic with
    continuity correction is taken from statsmodels when available, otherwise
    computed directly.

    Returns:
        ``(statistic, p_value)``. Both are NaN when there are no discordant
        pairs, because the test is undefined then. The p-value alone is NaN
        when neither statsmodels nor SciPy can supply it.
    """
    y_true = np.asarray(y_true).astype(int)
    correct_a = np.asarray(y_pred_a).astype(int) == y_true
    correct_b = np.asarray(y_pred_b).astype(int) == y_true
    # Discordant pairs: b01 (A wrong, B right), b10 (A right, B wrong).
    b01 = int((~correct_a & correct_b).sum())
    b10 = int((correct_a & ~correct_b).sum())

    # This guard must run BEFORE the statsmodels call: with zero discordant
    # pairs the corrected statistic is (0 - 1)**2 / 0 = inf, which would be
    # reported as p = 0.0 for two classifiers that never disagree.
    if b01 + b10 == 0:
        return (np.nan, np.nan)

    # Prefer statsmodels; fall back to the direct approximation on failure.
    if sm_mcnemar is not None:
        table = np.array([[0, b01], [b10, 0]], dtype=int)
        try:
            res = sm_mcnemar(table, exact=False, correction=True)
            return float(res.statistic), float(res.pvalue)
        except Exception as exc:
            logger.warning(f"McNemar via statsmodels fehlgeschlagen, nutze Approximation: {exc}")

    # Chi-square approximation with continuity correction.
    stat = (abs(b01 - b10) - 1) ** 2 / (b01 + b10)
    if stats is None:
        return (float(stat), np.nan)
    p = 1.0 - stats.chi2.cdf(stat, df=1)
    return (float(stat), float(p))

def plot_cumulative_returns(curves: Dict[str, pd.Series], out_path: Path) -> None:
    """Plot cumulative growth curves for several return series and save the figure.

    Args:
        curves: Mapping of legend label to a return series in percent;
            missing returns count as 0 %.
        out_path: Target image file (written at 150 dpi).
    """
    fig, ax = plt.subplots(figsize=(10, 5))
    for label, returns_pct in curves.items():
        # Compound the percentage returns into a growth factor.
        growth = returns_pct.fillna(0.0).div(100.0).add(1.0).cumprod()
        ax.plot(growth.index, growth.values, label=label)
    ax.set_title("Kumulierte Renditen – Strategien vs. Buy&Hold")
    ax.set_xlabel("Datum")
    ax.set_ylabel("Wachstumsfaktor")
    ax.legend(loc="best")
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)

def plot_metric_bars(metrics_df: pd.DataFrame, metric_cols: List[str], out_path: Path) -> None:
    """Draw a grouped bar chart of the chosen metrics and save the figure.

    Every row of ``metrics_df`` becomes one group labelled
    ``"<model> (<feature_set>)"``; no aggregation across rows is performed.

    Args:
        metrics_df: Needs the columns "model", "feature_set" and every name
            in ``metric_cols``.
        metric_cols: Metric columns to plot, one bar per metric in each group.
        out_path: Target image file (written at 150 dpi).
    """
    fig, ax = plt.subplots(figsize=(10, 5))
    table = metrics_df.copy()
    table["label"] = table["model"] + " (" + table["feature_set"] + ")"
    positions = np.arange(len(table["label"]))
    n_metrics = len(metric_cols)
    bar_width = 0.8 / n_metrics
    for offset, metric in enumerate(metric_cols):
        heights = table[metric].astype(float).values
        ax.bar(positions + offset * bar_width, heights, width=bar_width, label=metric)
    # Centre the tick under each group of bars.
    ax.set_xticks(positions + bar_width * (n_metrics - 1) / 2)
    ax.set_xticklabels(table["label"], rotation=45, ha="right")
    ax.set_title("Vergleich zentraler Metriken")
    ax.set_ylabel("Wert")
    ax.grid(True, axis="y", alpha=0.3)
    ax.legend(loc="best")
    fig.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)

def plot_confusion_matrix_figure(cm: np.ndarray, out_path: Path, title: str = "Confusion-Matrix") -> None:
    """Render a 2x2 confusion matrix as an annotated heat map and save it.

    Args:
        cm: Confusion matrix; the four cells ``cm[0..1, 0..1]`` are annotated,
            with rows labelled "True" and columns labelled "Pred".
        out_path: Target image file (written at 150 dpi).
        title: Figure title.
    """
    fig, ax = plt.subplots(figsize=(4, 4))
    image = ax.imshow(cm, interpolation="nearest")
    ax.set_title(title)
    fig.colorbar(image, ax=ax)

    class_labels = ["Down(0)", "Up(1)"]
    ticks = np.arange(2)
    ax.set_xticks(ticks)
    ax.set_xticklabels(class_labels)
    ax.set_yticks(ticks)
    ax.set_yticklabels(class_labels)

    # Cells above half of the maximum get white text for contrast.
    cutoff = cm.max() / 2.0 if cm.max() > 0 else 0.5
    for row, col in np.ndindex(2, 2):
        text_color = "white" if cm[row, col] > cutoff else "black"
        ax.text(col, row, f"{cm[row, col]:.0f}", ha="center", va="center", color=text_color)

    ax.set_ylabel("True")
    ax.set_xlabel("Pred")
    fig.tight_layout()
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


In [5]:
# %% [step4-baselines_oof_tuning]

# Baselines + TSCV-Tuning (SMA) + OOF-Threshold/Kalibrierung (falls zutreffend)

@dataclass
class OOFResult:
    """Out-of-fold (OOF) result container for one baseline model."""
    model: str  # baseline name
    feature_set: str  # build_oof_for_baseline stores "INTERNAL" here
    oof_index: List[pd.Timestamp]  # validation dates, concatenated across folds
    y_true: List[int]  # true direction labels for those dates
    y_pred: List[int]  # predicted direction labels for those dates
    proba_up: List[Optional[float]]  # predicted probability of "up"; NaN where the baseline has none
    details_path: Optional[Path] = None  # CSV with per-fold tuning details, if one was written
    threshold: Optional[float] = None  # decision threshold recorded with the result
    oof_f1: Optional[float] = None  # F1 over all OOF predictions
    oof_auc: Optional[float] = None  # ROC AUC over all OOF predictions, NaN if not computable

def majority_label(y: pd.Series) -> int:
    """Return the most frequent label in ``y``; a tie between 0 and 1 resolves to 1."""
    counts = y.value_counts()
    tied = (1 in counts) and (0 in counts) and counts[1] == counts[0]
    return 1 if tied else int(counts.idxmax())

def sma_signal(price: pd.Series, fast: int, slow: int) -> pd.Series:
    """SMA crossover signal per row: 1 where the fast SMA is above the slow SMA, else 0.

    Both averages need a full window, so rows where either average is still
    undefined compare as False and yield 0.
    """
    fast_avg = price.rolling(window=fast, min_periods=fast).mean()
    slow_avg = price.rolling(window=slow, min_periods=slow).mean()
    return fast_avg.gt(slow_avg).astype(int)

def build_oof_for_baseline(
    name: str,
    df_tr: pd.DataFrame,
    fs_cols: List[str],
    s_price_tr: pd.Series,
    folds: List[Tuple[np.ndarray, np.ndarray]],
) -> OOFResult:
    """Create out-of-fold (OOF) predictions on the training period for one baseline.

    Per fold, the validation rows of ``df_tr`` are predicted as follows:

    - "always_up": always 1 (probability 1.0);
    - "majority_class": majority label of the fold's training rows;
    - "persistence": 1 when the current row's price return is >= 0
      (an undefined return counts as 1);
    - "sma_crossover": the (fast, slow) pair with the best validation F1 is
      chosen from a small grid and its signal is used for the fold.

    For "sma_crossover" the per-fold choices are written to
    ``METRICS_DIR / "cv_details_sma_crossover.csv"``.

    Args:
        name: Baseline name (see above).
        df_tr: Training frame with the column "y_direction_next".
        fs_cols: Accepted for a uniform signature; not used by any baseline.
        s_price_tr: Price series on the training index.
        folds: ``(train_positions, val_positions)`` pairs into ``df_tr``.

    Returns:
        OOFResult with concatenated validation dates, labels and predictions.

    Raises:
        ValueError: for an unknown baseline name.
    """
    y_true_all: List[int] = []
    y_pred_all: List[int] = []
    proba_all: List[Optional[float]] = []
    idx_all: List[pd.Timestamp] = []

    # Candidate windows for the SMA crossover.
    sma_fast_grid = [2, 3, 4, 6]
    sma_slow_grid = [9, 12, 18]
    per_fold_params: List[Tuple[int, int, float]] = []  # (fast, slow, f1)

    for tr_idx, val_idx in folds:
        tr_dates = df_tr.index[tr_idx]
        val_dates = df_tr.index[val_idx]

        y_tr = df_tr.loc[tr_dates, "y_direction_next"].astype(int)
        y_val = df_tr.loc[val_dates, "y_direction_next"].astype(int)

        if name == "always_up":
            y_pred = np.ones_like(y_val.values)
            proba = [1.0] * len(y_val)
        elif name == "majority_class":
            maj = majority_label(y_tr)
            y_pred = np.full_like(y_val.values, fill_value=maj)
            proba = [np.nan] * len(y_val)
        elif name == "persistence":
            # The label at row t refers to t+1; the predictor is sign(r_t) at row t.
            r = monthly_returns_pct(s_price_tr)
            # An undefined return predicts "up", matching the walk-forward code.
            sig = (r >= 0).astype(int).where(r.notna(), 1)
            y_pred = sig.reindex(val_dates).fillna(1).astype(int).values
            proba = [np.nan] * len(y_val)
        elif name == "sma_crossover":
            # Hyper-parameter search by F1 on the validation window.
            # NOTE(review): the pair is selected on the same validation window
            # whose predictions are reported as OOF, so the SMA OOF score is
            # optimistic.
            best_f, best_s, best_f1 = None, None, -1.0
            # Use the CONTIGUOUS price history up to the end of the validation
            # window. Taking only train+validation dates would drop the embargo
            # rows, so row-based rolling windows would span a gap and disagree
            # with the signal used for the OOF prediction below.
            price_hist = s_price_tr.loc[s_price_tr.index <= val_dates.max()]
            for f in sma_fast_grid:
                for s in sma_slow_grid:
                    if f >= s:
                        continue
                    sig = sma_signal(price_hist, f, s)
                    yhat_val = sig.reindex(val_dates).fillna(0).astype(int).values
                    f1 = f1_score(y_val.values, yhat_val, zero_division=0)
                    if f1 > best_f1:
                        best_f1 = f1
                        best_f, best_s = f, s
            if best_f is None:
                best_f, best_s = 3, 12
                best_f1 = 0.0
            per_fold_params.append((best_f, best_s, best_f1))
            # OOF prediction with the selected pair.
            sig_best = sma_signal(s_price_tr, best_f, best_s)
            y_pred = sig_best.reindex(val_dates).fillna(0).astype(int).values
            proba = [np.nan] * len(y_val)
        else:
            raise ValueError(f"Unbekanntes Baseline-Modell: {name}")

        # Collect this fold's results.
        y_true_all.extend(y_val.values.tolist())
        y_pred_all.extend(y_pred.tolist())
        proba_all.extend(proba)
        idx_all.extend(val_dates.tolist())

    # OOF metrics.
    oof_f1 = f1_score(y_true_all, y_pred_all, zero_division=0)
    try:
        # AUC needs a valid probability for every row; NaN entries fail the range check.
        if all(isinstance(p, float) and (0.0 <= p <= 1.0) for p in proba_all):
            oof_auc = roc_auc_score(y_true_all, proba_all)
        else:
            oof_auc = np.nan
    except Exception:
        oof_auc = np.nan

    # Persist the CV details (only produced for the SMA baseline).
    details_path = None
    if name == "sma_crossover":
        details_path = METRICS_DIR / f"cv_details_{name}.csv"
        pd.DataFrame(per_fold_params, columns=["fast", "slow", "f1"]).to_csv(details_path, index=False)

    return OOFResult(
        model=name,
        feature_set="INTERNAL",  # OOF does not depend on the feature set
        oof_index=idx_all,
        y_true=y_true_all,
        y_pred=y_pred_all,
        proba_up=proba_all,
        details_path=details_path,
        threshold=0.5,
        oof_f1=float(oof_f1),
        oof_auc=float(oof_auc) if not np.isnan(oof_auc) else np.nan,
    )

# OOF for all baselines (computed once, independent of the feature set)

OOF_RESULTS: Dict[str, OOFResult] = {}
for m in BASELINE_MODELS:
    OOF_RESULTS[m] = build_oof_for_baseline(
        name=m,
        df_tr=df_train,
        fs_cols=FEATURE_GROUPS["INTEGRATED"],
        s_price_tr=s_price_train,
        folds=CV_FOLDS,
    )
    # Log the OOF F1 (3 decimals) and the raw AUC value per baseline.
    logger.info(f"OOF {m}: F1={OOF_RESULTS[m].oof_f1:.3f}, AUC={OOF_RESULTS[m].oof_auc}")

# SMA-Gesamtbestes Paar aus OOF (mittels CV-Details ermitteln)

def select_global_sma_params(s_price_tr: pd.Series) -> Tuple[int, int]:
    """Choose one global (fast, slow) SMA pair from the per-fold CV details.

    Reads the CSV referenced by ``OOF_RESULTS["sma_crossover"].details_path``
    and returns the pair of the fold with the highest F1. Falls back to
    ``(3, 12)`` when the file is missing or empty. ``s_price_tr`` is not used.
    """
    csv_path = OOF_RESULTS["sma_crossover"].details_path
    if csv_path is not None and csv_path.exists():
        fold_table = pd.read_csv(csv_path)
        if not fold_table.empty:
            # Simple rule: take the pair from the single best-scoring fold.
            top_fold = fold_table.iloc[fold_table["f1"].idxmax()]
            return int(top_fold["fast"]), int(top_fold["slow"])
    return 3, 12

# Resolve the global SMA pair once; run_walk_forward_baseline reads these module-level names.
SMA_FAST_BEST, SMA_SLOW_BEST = select_global_sma_params(s_price_train)
logger.info(f"Global bestes SMA-Paar: fast={SMA_FAST_BEST}, slow={SMA_SLOW_BEST}")


2025-08-24 13:59:10,711 | INFO | OOF always_up: F1=0.800, AUC=0.5
2025-08-24 13:59:10,719 | INFO | OOF majority_class: F1=0.800, AUC=nan
2025-08-24 13:59:10,728 | INFO | OOF persistence: F1=0.723, AUC=nan
2025-08-24 13:59:10,864 | INFO | OOF sma_crossover: F1=0.774, AUC=nan
2025-08-24 13:59:10,872 | INFO | Global bestes SMA-Paar: fast=2, slow=9


In [7]:
# %% [step5-walk_forward_test]

# Walk-Forward-Test (expanding origin), Vorhersage je Testmonat (Horizon=1)

def run_walk_forward_baseline(
    name: str,
    feature_set: str,
    df_tr: pd.DataFrame,
    df_te: pd.DataFrame,
    s_price_tr: pd.Series,
    s_price_te: pd.Series,
    horizon: int = 1,
) -> pd.DataFrame:
    """Run an expanding-origin walk-forward evaluation for one baseline model.

    For every test row ``t`` the model may use all rows before ``t``, i.e. the
    training rows plus the test rows that precede ``t``. The recorded
    ``train_end`` is the row immediately before ``t``.

    Args:
        name: "always_up", "majority_class", "persistence" or "sma_crossover".
        feature_set: Label copied into the output (baselines ignore features).
        df_tr, df_te: Train/test frames with the column "y_direction_next";
            ``df_te`` is assumed to follow ``df_tr`` in time.
        s_price_tr, s_price_te: Price series on the train/test index.
        horizon: Forecast horizon copied into the output.

    Returns:
        DataFrame indexed by "date" with the columns
        ["y_true_dir", "y_pred_dir", "proba_up", "model", "model_class",
        "feature_set", "horizon_months", "train_start", "train_end",
        "test_month", "seed"].

    Raises:
        ValueError: for an unknown model name.
    """
    if name not in ("always_up", "majority_class", "persistence", "sma_crossover"):
        raise ValueError(f"Unbekanntes Modell {name}")

    # Labels over train + test so that the history can really expand into
    # already realised test rows (previously only df_tr was ever sliced).
    labels_full = pd.concat([df_tr["y_direction_next"], df_te["y_direction_next"]])
    full_idx = labels_full.index
    n_train = len(df_tr)

    # Loop-invariant series, computed once instead of once per test row.
    price_full = pd.concat([s_price_tr, s_price_te])
    returns_full = monthly_returns_pct(price_full) if name == "persistence" else None
    sma_full = sma_signal(price_full, SMA_FAST_BEST, SMA_SLOW_BEST) if name == "sma_crossover" else None

    records: List[Dict[str, Any]] = []
    for offset, t in enumerate(df_te.index):
        pos = n_train + offset  # position of t in the combined index
        # Without any earlier row there is no training end (avoids index -1 wrap-around).
        train_end = full_idx[pos - 1] if pos > 0 else pd.NaT

        # y_true at row t (direction for t+1).
        y_true = int(df_te.loc[t, "y_direction_next"])

        if name == "always_up":
            y_pred = 1
            proba = 1.0
        elif name == "majority_class":
            history = labels_full.iloc[:pos].dropna().astype(int)
            # With no history at all, fall back to "up" (the tie-break value).
            y_pred = majority_label(history) if not history.empty else 1
            proba = np.nan
        elif name == "persistence":
            # Use the return r_t observed at row t.
            r_t = returns_full.loc[t]
            y_pred = 1 if pd.isna(r_t) else int(r_t >= 0.0)
            proba = np.nan
        else:  # "sma_crossover"
            sig_t = sma_full.loc[t]
            y_pred = int(sig_t) if not pd.isna(sig_t) else 0
            proba = np.nan

        records.append(
            {
                "date": t,
                "y_true_dir": y_true,
                "y_pred_dir": y_pred,
                "proba_up": proba,
                "model": name,
                "model_class": "baseline",
                "feature_set": feature_set,
                "horizon_months": horizon,
                "train_start": df_tr.index.min(),
                "train_end": train_end,
                "test_month": t,
                "seed": GLOBAL_SEED,
            }
        )

    out = pd.DataFrame.from_records(records).set_index("date")
    return out

# Walk-forward for all models x feature sets (baselines do not use features, but the schema requires the label)

ALL_FORECASTS: List[pd.DataFrame] = []
for fs in FEATURE_SETS:
    for m in BASELINE_MODELS:
        fc = run_walk_forward_baseline(
            name=m,
            feature_set=fs,
            df_tr=df_train,
            df_te=df_test,
            s_price_tr=s_price_train,
            s_price_te=s_price_test,
            horizon=HORIZON_MONTHS,
        )
        # Save one CSV per model x feature set
        out_path = FORECASTS_DIR / f"baseline_{m}_{fs}.csv"
        fc.reset_index().to_csv(out_path, index=False)
        logger.info(f"Forecast gespeichert: {out_path} ({len(fc)} Zeilen)")
        ALL_FORECASTS.append(fc)

# Combined forecast table

df_forecasts_all = pd.concat(ALL_FORECASTS, axis=0).sort_index()


2025-08-24 15:06:41,002 | INFO | Forecast gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\forecasts\baseline_always_up_TECH.csv (65 Zeilen)
2025-08-24 15:06:41,038 | INFO | Forecast gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\forecasts\baseline_majority_class_TECH.csv (65 Zeilen)
2025-08-24 15:06:41,076 | INFO | Forecast gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\forecasts\baseline_persistence_TECH.csv (65 Zeilen)
2025-08-24 15:06:41,115 | INFO | Forecast gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\forecasts\baseline_sma_crossover_TECH.csv (65 Zeilen)
2025-08-24 15:06:41,132 | INFO | Forecast gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\forecasts\baseline_always_up_MACRO.csv (65 Zeilen)
2025-08-24 15:06:41,167 | INFO | Forecast gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\forecasts\baseline_majority_class_MACRO.csv (65 Zeilen)
2025-08-24 15:06:41,204 | INFO | Forecast gespeichert: C:\Users\

In [8]:
# %% [step6-metriken_signifikanz_json]

# Metriken & Signifikanz (DM/McNemar) je Modell×FS, JSON speichern

def evaluate_model_from_forecasts(
    fc: pd.DataFrame,
    s_return_next_test: pd.Series,
    ref_model_name: str = "always_up",
    ref_fc: Optional[pd.DataFrame] = None,
) -> Dict[str, Any]:
    """Compute classification, financial and significance metrics for one forecast frame.

    Parameters
    ----------
    fc:
        Forecast frame; the columns ``y_true_dir``, ``y_pred_dir`` and ``proba_up`` are read.
    s_return_next_test:
        Return series that is reindexed to ``fc.index`` and combined with the predicted
        direction via ``strategy_returns_long_cash``.
    ref_model_name:
        Not used inside this function; kept so existing callers keep working.
    ref_fc:
        Reference forecasts (column ``y_pred_dir``) for the Diebold-Mariano and McNemar
        tests. When ``None`` the four test outputs are NaN.

    Returns
    -------
    dict
        Keys ``accuracy``, ``precision``, ``recall``, ``f1``, ``auc``, ``brier``, ``sharpe``,
        ``sortino``, ``rachev``, ``dm_stat``, ``dm_pvalue``, ``mcnemar_stat``, ``mcnemar_p``
        (all floats, NaN when not computable) and ``n_obs`` (int).
    """
    y_true = fc["y_true_dir"].astype(int).values
    y_pred = fc["y_pred_dir"].astype(int).values
    proba = fc["proba_up"].values

    # Classification metrics
    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, zero_division=0)
    rec = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)

    # Probabilistic metrics are only attempted for finite probabilities inside [0, 1].
    auc, brier = np.nan, np.nan
    try:
        proba_ok = bool(
            np.all(np.isfinite(proba)) and np.nanmin(proba) >= 0.0 and np.nanmax(proba) <= 1.0
        )
    except Exception:
        # e.g. empty or non-numeric probability column
        proba_ok = False

    if proba_ok:
        # AUC and Brier are evaluated independently so that a failure of one
        # (e.g. AUC on a window containing a single class) does not discard the other.
        try:
            auc = roc_auc_score(y_true, proba)
        except Exception as exc:
            logger.warning(f"AUC nicht berechenbar: {exc}")
            auc = np.nan
        try:
            brier = brier_score_loss(y_true, proba)
        except Exception as exc:
            logger.warning(f"Brier-Score nicht berechenbar: {exc}")
            brier = np.nan

    # Financial metrics (long/cash: the signal is applied to the aligned return series)
    sr = strategy_returns_long_cash(fc["y_pred_dir"], s_return_next_test.reindex(fc.index))
    sharpe = sharpe_ratio(sr)
    sortino = sortino_ratio(sr)
    rachev = rachev_ratio(sr)

    # Significance tests against the reference forecasts (0/1 loss)
    if ref_fc is None:
        dm_stat = dm_p = mcn_stat = mcn_p = np.nan
    else:
        # reindex() introduces NaN (and a float dtype) for dates missing in the reference;
        # those are filled with 1 and the result is cast back to int so the tests
        # receive integer labels like y_pred.
        y_pred_ref = (
            ref_fc["y_pred_dir"].astype(int).reindex(fc.index).fillna(1).astype(int).values
        )
        loss_model = (y_pred != y_true).astype(int)
        loss_ref = (y_pred_ref != y_true).astype(int)
        # When model and reference predict identically there are no discordant pairs and
        # the statistics degenerate to 0/0. Only the NumPy floating-point warnings are
        # silenced here; the values returned by the helpers are unchanged.
        with np.errstate(divide="ignore", invalid="ignore"):
            dm_stat, dm_p = diebold_mariano(loss_model, loss_ref, h=1, alternative="two_sided")
            mcn_stat, mcn_p = mcnemar_test(y_true, y_pred, y_pred_ref)

    # float() maps NaN to NaN, so no separate NaN branch is required.
    return {
        "accuracy": float(acc),
        "precision": float(prec),
        "recall": float(rec),
        "f1": float(f1),
        "auc": float(auc),
        "brier": float(brier),
        "sharpe": float(sharpe),
        "sortino": float(sortino),
        "rachev": float(rachev),
        "dm_stat": float(dm_stat),
        "dm_pvalue": float(dm_p),
        "mcnemar_stat": float(mcn_stat),
        "mcnemar_p": float(mcn_p),
        "n_obs": int(len(fc)),
    }

# Buy & hold (S&P) returns for the test window

s_ret_test = df_test["y_return_next_pct"].astype(float)

# Always-Up reference forecasts per feature set (used by the significance tests)

REF_FORECASTS: Dict[str, pd.DataFrame] = {}
for fs in FEATURE_SETS:
    REF_FORECASTS[fs] = df_forecasts_all.loc[
        (df_forecasts_all["model"] == "always_up") & (df_forecasts_all["feature_set"] == fs)
    ]

# Compute the metrics for every model × feature set and store one JSON file each

METRICS_ROWS: List[Dict[str, Any]] = []
for fs in FEATURE_SETS:
    ref_fc = REF_FORECASTS[fs]
    for m in BASELINE_MODELS:
        fc = df_forecasts_all.loc[
            (df_forecasts_all["model"] == m) & (df_forecasts_all["feature_set"] == fs)
        ]
        met = evaluate_model_from_forecasts(fc, s_ret_test, ref_model_name="always_up", ref_fc=ref_fc)

        # The out-of-fold results are keyed by model name only (not by feature set).
        oof = OOF_RESULTS[m]

        # Assembled in three steps; the resulting key order matches a single literal
        # of the form {identity..., **met, provenance...}.
        row: Dict[str, Any] = {
            "model": m,
            "model_class": "baseline",
            "feature_set": fs,
            "task": "direction_1m",
        }
        row.update(met)
        row.update(
            {
                "train_start": str(TRAIN_START.date()),
                "train_end": str(TRAIN_END.date()),
                "test_start": str(TEST_START.date()),
                "test_end": str(TEST_END.date()),
                "seed": GLOBAL_SEED,
                "coef_path": None,
                "permimp_path": None,
                "cv_details_path": str(oof.details_path) if oof.details_path else None,
                "threshold": oof.threshold,
                "oof_f1": oof.oof_f1,
                "oof_auc": oof.oof_auc,
            }
        )
        METRICS_ROWS.append(row)

        # One JSON file per model × feature set.
        # NOTE(review): NaN metric values are emitted as the non-standard JSON token `NaN`
        # (json.dump default allow_nan=True) - confirm that downstream readers accept it.
        out_json = METRICS_DIR / f"baseline_{m}_{fs}.json"
        with out_json.open("w", encoding="utf-8") as f:
            json.dump(row, f, indent=2, ensure_ascii=False)
        logger.info(f"Metrics gespeichert: {out_json}")

# Overall table

df_metrics = pd.DataFrame(METRICS_ROWS)


  statistic = (np.abs(n1 - n2) - corr)**2 / (1. * (n1 + n2))
2025-08-24 15:06:51,812 | INFO | Metrics gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\metrics\baseline_always_up_TECH.json
  statistic = (np.abs(n1 - n2) - corr)**2 / (1. * (n1 + n2))
2025-08-24 15:06:51,819 | INFO | Metrics gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\metrics\baseline_majority_class_TECH.json
2025-08-24 15:06:51,825 | INFO | Metrics gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\metrics\baseline_persistence_TECH.json
2025-08-24 15:06:51,832 | INFO | Metrics gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\metrics\baseline_sma_crossover_TECH.json
  statistic = (np.abs(n1 - n2) - corr)**2 / (1. * (n1 + n2))
2025-08-24 15:06:51,840 | INFO | Metrics gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\metrics\baseline_always_up_MACRO.json
  statistic = (np.abs(n1 - n2) - corr)**2 / (1. * (n1 + n2))
2025-08-24 15:06:51,850 | INFO | Metrics gespeich

In [9]:
# %% [step7-plots]

# Plots: (1) cumulative return vs. buy & hold, (2) bar comparison of metrics,
# (3) aggregated confusion matrix (best model)

# (1) Cumulative returns - one figure per feature set, together with buy & hold

for fs in FEATURE_SETS:
    # Buy & hold is always invested, so it uses the raw test return series.
    curves: Dict[str, pd.Series] = {"Buy&Hold": s_ret_test}
    for m in BASELINE_MODELS:
        fc = df_forecasts_all.loc[
            (df_forecasts_all["model"] == m) & (df_forecasts_all["feature_set"] == fs)
        ]
        # Returns are aligned to the forecast dates before the signal is applied.
        curves[m] = strategy_returns_long_cash(fc["y_pred_dir"], s_ret_test.reindex(fc.index))
    out_path = REPORTS_DIR / f"10_cum_returns_{fs}.png"
    plot_cumulative_returns(curves, out_path)

# (2) Bar chart of the central metrics (F1, accuracy, Sharpe) - one figure per feature set

for fs in FEATURE_SETS:
    # Boolean selection followed by sort_values yields a new frame ordered by F1, then accuracy.
    sub = df_metrics.loc[df_metrics["feature_set"] == fs].sort_values(
        by=["f1", "accuracy"], ascending=False
    )
    out_path = REPORTS_DIR / f"10_metric_bars_{fs}.png"
    plot_metric_bars(sub, metric_cols=["f1", "accuracy", "sharpe"], out_path=out_path)

# (3) Confusion matrix for the best model (by F1, ties broken by accuracy) across all feature sets

best_row = df_metrics.sort_values(by=["f1", "accuracy"], ascending=False).iloc[0]
best_model, best_fs = best_row["model"], best_row["feature_set"]
fc_best = df_forecasts_all.loc[
    (df_forecasts_all["model"] == best_model) & (df_forecasts_all["feature_set"] == best_fs)
]
cm = confusion_matrix(
    fc_best["y_true_dir"].astype(int),
    fc_best["y_pred_dir"].astype(int),
    labels=[0, 1],
)
cm_path = REPORTS_DIR / f"10_confusion_matrix_{best_model}_{best_fs}.png"
plot_confusion_matrix_figure(cm, cm_path, title=f"Confusion-Matrix ({best_model}, {best_fs})")


In [10]:
# %% [step8-uebersicht]

# Overview 10: full table baselines × feature sets (CSV + PNG rendering)

overview_csv = REPORTS_DIR / "10_overview_metrics.csv"
overview_png = REPORTS_DIR / "10_overview_metrics.png"

# CSV export of the complete metrics table

df_metrics.to_csv(overview_csv, index=False)

# PNG rendering of a table restricted to selected columns, best F1 (then accuracy) first

cols_to_show = [
    "model", "feature_set",
    "accuracy", "precision", "recall", "f1", "auc", "brier",
    "sharpe", "sortino", "rachev",
    "dm_stat", "dm_pvalue", "mcnemar_stat", "mcnemar_p",
]
table_df = df_metrics[cols_to_show].sort_values(by=["f1", "accuracy"], ascending=False)

# Matplotlib table on a single figure; the axes frame is hidden so only the table is drawn

fig, ax = plt.subplots(figsize=(12, 6))
ax.axis("off")
tbl = ax.table(
    cellText=table_df.round(3).to_numpy().tolist(),
    colLabels=list(table_df.columns),
    loc="center",
)
tbl.auto_set_font_size(False)  # keep the explicit font size set below
tbl.set_fontsize(8)
tbl.scale(1.0, 1.4)
fig.tight_layout()
fig.savefig(overview_png, dpi=150)
plt.close(fig)

logger.info(f"Übersicht gespeichert: {overview_csv}, {overview_png}")


2025-08-24 15:07:00,296 | INFO | Übersicht gespeichert: C:\Users\gamer\Desktop\AktienPrognose\artifacts\reports\10_overview_metrics.csv, C:\Users\gamer\Desktop\AktienPrognose\artifacts\reports\10_overview_metrics.png
