In [1]:
# level97_var_es_backtesting.py
# Level-97: VaR Backtesting (Kupiec + Christoffersen) and ES Backtest (simple robust score)
#
# Outputs:
#   - level97_var_backtest_series.csv
#   - level97_var_backtest_summary.json
#
# Run:
#   python level97_var_es_backtesting.py
#   python level97_var_es_backtesting.py --symbols SPY QQQ IWM TLT GLD --weights 0.4 0.25 0.15 0.15 0.05
#   python level97_var_es_backtesting.py --alpha 0.01 --window 750 --method hist
#   python level97_var_es_backtesting.py --alpha 0.05 --method normal

import os
import json
import math
import argparse
from dataclasses import dataclass, asdict
from typing import Tuple, Optional, Dict, List

import numpy as np
import pandas as pd
import yfinance as yf


# ----------------------------- Config -----------------------------
@dataclass
class Config:
    """Run settings for the VaR/ES backtest.

    Every field has a default, so ``Config()`` describes a complete run.
    ``parse_args`` fills an instance from the command line and ``run_pipeline``
    stores a copy of it (via ``asdict``) in the JSON summary.
    """

    # Tickers handed to load_prices; also defines the order of the weights.
    symbols: Tuple[str, ...] = ("SPY", "QQQ", "IWM", "EFA", "EEM", "TLT", "LQD", "GLD")
    # Start date string forwarded to the price download.
    start: str = "2010-01-01"

    # Portfolio
    # One weight per symbol; portfolio_weights rescales them by their sum.
    weights: Optional[Tuple[float, ...]] = None  # None -> equal weight

    # Risk
    # Left-tail probability used for VaR/ES and as the expected exceedance rate.
    alpha: float = 0.05
    # Number of past returns in each rolling estimation window.
    window: int = 756  # ~3 years trading days

    # VaR/ES method: "hist" or "normal"
    method: str = "hist"

    # True -> log-price differences, False -> simple percentage changes.
    use_log_returns: bool = True
    # True -> run_pipeline drops every row that has a NaN in any symbol.
    dropna: bool = True
    # Passed to np.random.seed at the start of run_pipeline.
    seed: int = 42

    # Destination of the per-date series (returns, VaR, ES, exceedance flag).
    out_csv: str = "level97_var_backtest_series.csv"
    # Destination of the JSON summary with the test statistics.
    out_json: str = "level97_var_backtest_summary.json"


# ----------------------------- Robust yfinance loader -----------------------------
def _extract_close_series(px: pd.DataFrame, symbol: str) -> pd.Series:
    if px is None or px.empty:
        raise RuntimeError(f"No data returned for {symbol}")

    if isinstance(px.columns, pd.MultiIndex):
        for key in [("Adj Close", symbol), ("Close", symbol), (symbol, "Adj Close"), (symbol, "Close")]:
            if key in px.columns:
                s = px[key].copy()
                if isinstance(s, pd.DataFrame):
                    s = s.iloc[:, 0]
                s.name = symbol
                return s
        raise RuntimeError(f"Could not extract Close/Adj Close for {symbol} from MultiIndex columns.")

    for col in ["Adj Close", "Close"]:
        if col in px.columns:
            s = px[col].copy()
            if isinstance(s, pd.DataFrame):
                s = s.iloc[:, 0]
            s.name = symbol
            return s

    raise RuntimeError(f"Missing Close/Adj Close for {symbol}. Columns={list(px.columns)}")


def load_prices(symbols: Tuple[str, ...], start: str) -> pd.DataFrame:
    """Download closing prices for ``symbols`` into one date-indexed frame.

    All symbols are first requested in a single ``yf.download`` call. If that
    call raises, returns nothing, lacks a close column for some symbol, or
    yields a column with no observations at all, the reason is printed and
    each symbol is downloaded on its own instead.

    Args:
        symbols: Tickers to download; one output column per ticker, same order.
        start: Start date string forwarded to ``yf.download``.

    Returns:
        Frame sorted by index with one close-price column per symbol.

    Raises:
        RuntimeError: If ``symbols`` is empty, or (from the per-symbol
            fallback) if a symbol yields no usable close prices.
    """
    symbols = tuple(symbols)
    if not symbols:
        # pd.concat([]) would otherwise fail later with an unrelated message.
        raise RuntimeError("No symbols provided.")

    # batch attempt (best effort: any failure falls through to the fallback)
    reason = "empty result"
    try:
        px_all = yf.download(list(symbols), start=start, progress=False, group_by="column", auto_adjust=False)
        if px_all is not None and not px_all.empty:
            series = [_extract_close_series(px_all, s) for s in symbols]
            # A column without a single observation is treated as a failed
            # download so the fallback can report which symbol is at fault.
            if all(col.notna().any() for col in series):
                return pd.concat(series, axis=1).sort_index()
            reason = "at least one symbol has no observations"
    except Exception as exc:
        reason = repr(exc)
    print(f"[WARN] Batch download unusable ({reason}); falling back to per-symbol downloads.")

    # fallback single symbol
    series = []
    for s in symbols:
        px = yf.download(s, start=start, progress=False, auto_adjust=False)
        series.append(_extract_close_series(px, s))
    return pd.concat(series, axis=1).sort_index()


def compute_returns(prices: pd.DataFrame, use_log: bool) -> pd.DataFrame:
    """Turn a price frame into per-period returns.

    Args:
        prices: Price levels, one column per asset.
        use_log: True for log returns (difference of log prices), False for
            simple returns (percentage change).

    Returns:
        Returns with infinities replaced by NaN and rows that are NaN in
        every column removed. A missing price yields NaN for the return into
        and out of the gap in both modes.
    """
    prices = prices.replace([np.inf, -np.inf], np.nan)
    if use_log:
        # Non-positive prices have no logarithm; masking them up front gives
        # the same NaN returns without the numpy runtime warnings.
        rets = np.log(prices.where(prices > 0)).diff()
    else:
        # fill_method=None disables the deprecated implicit forward-fill,
        # which would otherwise report a 0% return on a missing day and lump
        # the whole gap move into the next one (unlike the log branch).
        rets = prices.pct_change(fill_method=None)
    rets = rets.replace([np.inf, -np.inf], np.nan)
    return rets.dropna(how="all")


def portfolio_weights(symbols: Tuple[str, ...], weights: Optional[Tuple[float, ...]]) -> np.ndarray:
    """Build the weight vector for the portfolio.

    Args:
        symbols: Portfolio constituents; only their count is used.
        weights: Raw weights, one per symbol, or None for equal weighting.

    Returns:
        Array of weights divided by their sum (so they add up to 1).

    Raises:
        RuntimeError: If the number of weights differs from the number of
            symbols, or if the weights sum to zero or a non-finite value.
    """
    count = len(symbols)

    if weights is not None:
        if len(weights) != count:
            raise RuntimeError(f"--weights length {len(weights)} must match symbols length {count}")
        raw = np.array(weights, dtype=float)
        total = float(raw.sum())
        # A zero or non-finite total cannot be normalised.
        if total == 0.0 or not np.isfinite(total):
            raise RuntimeError("Weights sum is invalid/zero.")
        return raw / total

    return np.ones(count) / count


# ----------------------------- Stats helpers (NO SciPy) -----------------------------
def _safe_log(x: float) -> float:
    return math.log(max(x, 1e-15))


def kupiec_pof(exceed: np.ndarray, alpha: float) -> Dict[str, float]:
    """
    Kupiec (1995) Proportion-of-Failures test of unconditional coverage.

    Compares the observed exceedance rate with the nominal rate ``alpha`` via
    the likelihood ratio LR = -2 ln(L0 / L1), where
        L0 = (1 - alpha)^(T - x) * alpha^x      (nominal rate)
        L1 = (1 - phat)^(T - x)  * phat^x       (observed rate)
    LR_pof ~ chi2(1) asymptotically; only the statistic is reported.

    Args:
        exceed: 0/1 array, 1 where the return breached VaR.
        alpha: Nominal exceedance probability.

    Returns:
        Dict with "T" (observations), "x" (exceedances), "phat" (observed
        rate, 0.0 when T is 0) and "LR_pof".
    """
    n_obs = int(exceed.size)
    n_hits = int(exceed.sum())
    hit_rate = n_hits / n_obs if n_obs > 0 else 0.0

    # log_ratio holds ln(L0) - ln(L1).
    if n_hits == 0:
        # phat = 0 makes L1 = 1, so only ln(L0) = T ln(1 - alpha) remains.
        log_ratio = n_obs * _safe_log(1.0 - alpha)
    elif n_hits == n_obs:
        # phat = 1 makes L1 = 1, so only ln(L0) = T ln(alpha) remains.
        log_ratio = n_obs * _safe_log(alpha)
    else:
        n_quiet = n_obs - n_hits
        ll_nominal = n_quiet * _safe_log(1.0 - alpha) + n_hits * _safe_log(alpha)
        ll_observed = n_quiet * _safe_log(1.0 - hit_rate) + n_hits * _safe_log(hit_rate)
        log_ratio = ll_nominal - ll_observed

    return {
        "T": float(n_obs),
        "x": float(n_hits),
        "phat": float(hit_rate),
        "LR_pof": float(-2.0 * log_ratio),
    }


def christoffersen_ind(exceed: np.ndarray) -> Dict[str, float]:
    """
    Christoffersen (1998) Independence test for exceedance clustering.
    LR_ind ~ chi2(1) asymptotically.

    Counts the four one-step transitions between "no exceedance" (0) and
    "exceedance" (1) and compares a first-order Markov model (separate
    exceedance probability after a 0 and after a 1) with an iid model.

    Args:
        exceed: 0/1 array of exceedance indicators in time order.

    Returns:
        Dict with "LR_ind", the transition counts "n00", "n01", "n10", "n11"
        and the probabilities "pi" (iid), "pi0" (after a 0) and "pi1" (after
        a 1). With fewer than two observations there are no transitions: the
        counts are 0.0 and the statistic and probabilities are NaN. The same
        keys are returned in every case so callers can index them safely.
    """
    e = exceed.astype(int)
    if e.size < 2:
        nan = float("nan")
        return {
            "LR_ind": nan,
            "n00": 0.0, "n01": 0.0, "n10": 0.0, "n11": 0.0,
            "pi": nan, "pi0": nan, "pi1": nan,
        }

    # Pair every observation with its successor.
    e0 = e[:-1]
    e1 = e[1:]

    n00 = int(((e0 == 0) & (e1 == 0)).sum())
    n01 = int(((e0 == 0) & (e1 == 1)).sum())
    n10 = int(((e0 == 1) & (e1 == 0)).sum())
    n11 = int(((e0 == 1) & (e1 == 1)).sum())

    # Transition probabilities (0.0 when the conditioning state never occurs)
    pi0 = n01 / (n00 + n01) if (n00 + n01) > 0 else 0.0
    pi1 = n11 / (n10 + n11) if (n10 + n11) > 0 else 0.0
    pi = (n01 + n11) / (n00 + n01 + n10 + n11) if (n00 + n01 + n10 + n11) > 0 else 0.0

    # Log-likelihoods
    # L1: Markov
    lnL1 = 0.0
    lnL1 += n00 * _safe_log(1.0 - pi0) + n01 * _safe_log(pi0)
    lnL1 += n10 * _safe_log(1.0 - pi1) + n11 * _safe_log(pi1)

    # L0: iid
    lnL0 = 0.0
    lnL0 += (n00 + n10) * _safe_log(1.0 - pi) + (n01 + n11) * _safe_log(pi)

    lr = -2.0 * (lnL0 - lnL1)
    return {
        "LR_ind": float(lr),
        "n00": float(n00), "n01": float(n01), "n10": float(n10), "n11": float(n11),
        "pi": float(pi), "pi0": float(pi0), "pi1": float(pi1)
    }


def christoffersen_cc(exceed: np.ndarray, alpha: float) -> Dict[str, float]:
    """
    Conditional coverage test: LR_cc = LR_pof + LR_ind ~ chi2(2).

    Args:
        exceed: 0/1 array of exceedance indicators in time order.
        alpha: Nominal exceedance probability.

    Returns:
        Dict with "LR_cc" (NaN when LR_ind is not finite) followed by every
        Kupiec entry prefixed with "pof_" and every independence entry
        prefixed with "ind_".
    """
    coverage = kupiec_pof(exceed, alpha)
    independence = christoffersen_ind(exceed)

    lr_ind = independence["LR_ind"]
    if np.isfinite(lr_ind):
        combined = float(coverage["LR_pof"] + lr_ind)
    else:
        combined = float("nan")

    report = {"LR_cc": combined}
    for key, value in coverage.items():
        report[f"pof_{key}"] = value
    for key, value in independence.items():
        report[f"ind_{key}"] = value
    return report


def es_backtest_score(returns: np.ndarray, var: np.ndarray, es: np.ndarray, alpha: float) -> Dict[str, float]:
    """
    Simple ES backtest score (no SciPy).

    Looks only at the observations where the return is at or below VaR and
    compares the realized return with the ES forecast on those days:

        score = mean( (r - ES_t) | r <= VaR_t )

    A calibrated ES gives a score near 0; a negative score means realized
    tail returns were worse than forecast (ES too optimistic).

    Args:
        returns: Realized returns.
        var: VaR forecasts aligned with ``returns``.
        es: ES forecasts aligned with ``returns``.
        alpha: Tail probability; accepted for interface symmetry, not used.

    Returns:
        Dict with "n_tail", "tail_mean_r", "tail_mean_es" and "score". When
        no return breaches VaR, "n_tail" is 0.0 and the other values are NaN.
    """
    breached = returns <= var
    n_breach = int(breached.sum())

    if n_breach == 0:
        nan = float("nan")
        return {"n_tail": 0.0, "tail_mean_r": nan, "tail_mean_es": nan, "score": nan}

    realized = returns[breached]
    forecast = es[breached]
    gap = realized - forecast
    return {
        "n_tail": float(n_breach),
        "tail_mean_r": float(realized.mean()),
        "tail_mean_es": float(forecast.mean()),
        "score": float(gap.mean()),
    }


# ----------------------------- Rolling VaR/ES -----------------------------
def rolling_hist_var_es(r: np.ndarray, alpha: float, window: int) -> Tuple[np.ndarray, np.ndarray]:
    """Historical-simulation VaR and ES from a trailing window of returns.

    The forecast at position t uses only the ``window`` returns before t
    (r[t - window:t]), so it never sees the return it is compared with.

    Args:
        r: Return series.
        alpha: Left-tail probability.
        window: Number of past observations per estimate.

    Returns:
        (var, es) float arrays shaped like ``r``. VaR is the alpha-quantile
        of the window and ES is the mean of the window values at or below
        that quantile. The first ``window`` entries are NaN.
    """
    var_path = np.full(r.shape, np.nan, dtype=float)
    es_path = np.full(r.shape, np.nan, dtype=float)

    for end in range(window, r.size):
        sample = r[end - window:end]
        cutoff = float(np.quantile(sample, alpha))
        tail = sample[sample <= cutoff]
        var_path[end] = cutoff
        es_path[end] = float(tail.mean())

    return var_path, es_path


def rolling_normal_var_es(r: np.ndarray, alpha: float, window: int) -> Tuple[np.ndarray, np.ndarray]:
    """
    Parametric Normal VaR/ES from a rolling mean and sample standard deviation.

    The alpha-quantile of the standard normal comes from ``inv_norm_cdf`` so
    no SciPy is needed. The forecast at position t uses r[t - window:t] only.

    Args:
        r: Return series.
        alpha: Left-tail probability.
        window: Number of past observations per estimate.

    Returns:
        (var, es) float arrays shaped like ``r``:
            var = mu + sigma * z_alpha
            es  = mu - sigma * pdf(z_alpha) / alpha
        Entries stay NaN for the first ``window`` positions and wherever the
        window's standard deviation is non-positive or not finite.
    """
    var_path = np.full(r.shape, np.nan, dtype=float)
    es_path = np.full(r.shape, np.nan, dtype=float)

    z_alpha = inv_norm_cdf(alpha)  # negative for alpha < 0.5
    density = (1.0 / math.sqrt(2.0 * math.pi)) * math.exp(-0.5 * z_alpha * z_alpha)
    # ES multiplier for the left tail; constant across windows.
    tail_factor = density / alpha

    for end in range(window, r.size):
        sample = r[end - window:end]
        center = float(sample.mean())
        spread = float(sample.std(ddof=1))
        if spread > 0 and np.isfinite(spread):
            var_path[end] = center + spread * z_alpha
            es_path[end] = center - spread * tail_factor

    return var_path, es_path


def inv_norm_cdf(p: float) -> float:
    """
    Approximate inverse of the standard normal CDF for a scalar probability.

    Rational approximation (Acklam-style): one rational function of
    (p - 0.5)^2 in the central region and another, in sqrt(-2 ln(tail prob)),
    in each tail; the upper tail mirrors the lower one. No SciPy.

    Args:
        p: Probability strictly between 0 and 1.

    Returns:
        z such that the standard normal CDF at z is approximately p.

    Raises:
        ValueError: If p is not in the open interval (0, 1).
    """
    if p <= 0.0 or p >= 1.0:
        raise ValueError("p must be in (0,1)")

    # Polynomial coefficients, highest power first. The denominators end in
    # the constant term 1.
    central_num = (-3.969683028665376e+01, 2.209460984245205e+02, -2.759285104469687e+02,
                   1.383577518672690e+02, -3.066479806614716e+01, 2.506628277459239e+00)
    central_den = (-5.447609879822406e+01, 1.615858368580409e+02, -1.556989798598866e+02,
                   6.680131188771972e+01, -1.328068155288572e+01, 1.0)
    tail_num = (-7.784894002430293e-03, -3.223964580411365e-01, -2.400758277161838e+00,
                -2.549732539343734e+00, 4.374664141464968e+00, 2.938163982698783e+00)
    tail_den = (7.784695709041462e-03, 3.224671290700398e-01, 2.445134137142996e+00,
                3.754408661907416e+00, 1.0)

    def horner(coeffs, x):
        # Evaluate the polynomial as (((c0*x + c1)*x + c2)*x + ...).
        acc = coeffs[0]
        for coef in coeffs[1:]:
            acc = acc * x + coef
        return acc

    lower_cut = 0.02425
    upper_cut = 1 - lower_cut

    if p < lower_cut:
        t = math.sqrt(-2 * math.log(p))
        return horner(tail_num, t) / horner(tail_den, t)

    if p > upper_cut:
        t = math.sqrt(-2 * math.log(1 - p))
        return -(horner(tail_num, t) / horner(tail_den, t))

    offset = p - 0.5
    squared = offset * offset
    return horner(central_num, squared) * offset / horner(central_den, squared)


# ----------------------------- Pipeline -----------------------------
def run_pipeline(cfg: Config) -> Dict[str, object]:
    """Run the full backtest: download, build the portfolio, forecast, test.

    Steps: validate the configuration, download prices, compute returns,
    combine them into one portfolio return series, compute rolling VaR/ES
    with the chosen method, then run the Kupiec, Christoffersen and ES checks
    on the dates where both forecasts are available.

    Args:
        cfg: Run configuration.

    Returns:
        Dict with "series" (DataFrame indexed by date with port_ret, VaR, ES
        and exceed columns) and "summary" (JSON-serialisable dict with the
        configuration, data window, weights, test statistics and notes).

    Raises:
        RuntimeError: If the method, alpha, window or weights are invalid,
            or if too few return rows remain after cleaning.
    """
    np.random.seed(cfg.seed)

    # Check everything that does not depend on market data first, so a bad
    # option fails immediately instead of after the network download.
    method = cfg.method.lower().strip()
    if method not in ("hist", "normal"):
        raise RuntimeError("--method must be 'hist' or 'normal'")
    if not 0.0 < cfg.alpha < 1.0:
        raise RuntimeError(f"--alpha must be strictly between 0 and 1, got {cfg.alpha}")
    # The normal method needs a sample standard deviation (ddof=1), hence
    # at least two observations per window.
    min_window = 2 if method == "normal" else 1
    if cfg.window < min_window:
        raise RuntimeError(f"--window must be >= {min_window} for method '{method}', got {cfg.window}")
    w = portfolio_weights(cfg.symbols, cfg.weights)

    print(f"[INFO] Downloading prices for {cfg.symbols} from {cfg.start} ...")
    prices = load_prices(cfg.symbols, cfg.start)
    rets = compute_returns(prices, cfg.use_log_returns)
    if cfg.dropna:
        rets = rets.dropna(how="any")

    if rets.empty or len(rets) <= cfg.window + 5:
        raise RuntimeError(f"Not enough data after cleaning. rows={len(rets)} window={cfg.window}")

    # Portfolio return per date: weighted sum across the symbol columns.
    port = rets.values @ w
    idx = rets.index

    print(f"[INFO] Computing rolling {method} VaR/ES: alpha={cfg.alpha} window={cfg.window} ...")
    if method == "hist":
        var, es = rolling_hist_var_es(port, cfg.alpha, cfg.window)
    else:
        var, es = rolling_normal_var_es(port, cfg.alpha, cfg.window)

    # Backtest region (where both forecasts are defined)
    valid = np.isfinite(var) & np.isfinite(es)
    r_bt = port[valid]
    var_bt = var[valid]
    es_bt = es[valid]

    exceed = (r_bt <= var_bt).astype(int)

    pof = kupiec_pof(exceed, cfg.alpha)
    ind = christoffersen_ind(exceed)
    cc = christoffersen_cc(exceed, cfg.alpha)
    es_score = es_backtest_score(r_bt, var_bt, es_bt, cfg.alpha)

    out = pd.DataFrame({
        "date": idx,
        "port_ret": port,
        "VaR": var,
        "ES": es,
    })
    # Dates without a VaR forecast are flagged 0 rather than left missing.
    out["exceed"] = ((out["port_ret"] <= out["VaR"]) & np.isfinite(out["VaR"])).astype(int)
    out = out.set_index("date")

    summary = {
        "config": asdict(cfg),
        "data_window": {
            "start": str(idx.min().date()),
            "end": str(idx.max().date()),
            "n_returns": int(len(rets)),
            "window": int(cfg.window),
            "n_backtest": int(r_bt.size),
        },
        "portfolio": {
            "symbols": list(cfg.symbols),
            "weights": [float(x) for x in w],
        },
        "var_backtests": {
            "kupiec_pof": pof,
            "christoffersen_ind": ind,
            "christoffersen_cc": cc,
        },
        "es_backtest": es_score,
        "notes": [
            "LR statistics are reported without p-values (chi-square CDF would require SciPy).",
            "Rules of thumb: LR_pof (df=1), LR_ind (df=1), LR_cc (df=2).",
            "ES score ~ 0 is desirable; negative means ES too optimistic (underestimates tail loss)."
        ],
    }

    return {"series": out, "summary": summary}


def save_outputs(result: Dict[str, object], cfg: Config) -> None:
    """Write the backtest series and summary to disk and print the headline numbers.

    Args:
        result: Output of ``run_pipeline``; needs a "series" DataFrame and a
            "summary" dict.
        cfg: Run configuration; ``out_csv`` and ``out_json`` give the targets.
    """
    frame: pd.DataFrame = result["series"]  # type: ignore
    report: Dict = result["summary"]  # type: ignore

    # Create the parent folder of each target (the current directory when
    # the path has no folder part).
    for target in (cfg.out_csv, cfg.out_json):
        os.makedirs(os.path.dirname(target) or ".", exist_ok=True)

    frame.to_csv(cfg.out_csv)
    with open(cfg.out_json, "w", encoding="utf-8") as handle:
        json.dump(report, handle, indent=2)

    var_tests = report["var_backtests"]
    pof = var_tests["kupiec_pof"]
    ind = var_tests["christoffersen_ind"]
    cc = var_tests["christoffersen_cc"]
    esb = report["es_backtest"]

    print(f"[OK] Saved series → {cfg.out_csv}")
    print(f"[OK] Saved summary → {cfg.out_json}")
    print(f"[POF] T={int(pof['T'])} x={int(pof['x'])} phat={pof['phat']:.4f} LR={pof['LR_pof']:.3f}")
    print(f"[IND] LR={ind['LR_ind']:.3f}  n01={int(ind['n01'])} n11={int(ind['n11'])}  pi0={ind['pi0']:.3f} pi1={ind['pi1']:.3f}")
    print(f"[CC ] LR={cc['LR_cc']:.3f}")
    print(f"[ES ] tail_n={int(esb['n_tail'])} score={esb['score']:.6f} (near 0 is good)")


# ----------------------------- CLI -----------------------------
def parse_args() -> Config:
    """Read the command line and return the matching ``Config``.

    Every option defaults to the corresponding ``Config`` default.
    ``--simple-returns`` switches off log returns and ``--no-dropna``
    switches off dropping rows that contain NaN.
    """
    parser = argparse.ArgumentParser(description="Level-97: VaR/ES backtesting (Kupiec + Christoffersen)")

    # Data and portfolio
    parser.add_argument("--start", type=str, default=Config.start)
    parser.add_argument("--symbols", nargs="+", default=list(Config.symbols))
    parser.add_argument("--weights", nargs="+", type=float, default=None)

    # Risk model
    parser.add_argument("--alpha", type=float, default=Config.alpha)
    parser.add_argument("--window", type=int, default=Config.window)
    parser.add_argument("--method", type=str, default=Config.method, choices=["hist", "normal"])

    # Negative flags: each one turns off a Config boolean that defaults to True.
    parser.add_argument("--simple-returns", action="store_true")
    parser.add_argument("--no-dropna", action="store_true")

    parser.add_argument("--seed", type=int, default=Config.seed)

    # Output files
    parser.add_argument("--csv", type=str, default=Config.out_csv)
    parser.add_argument("--json", type=str, default=Config.out_json)

    ns = parser.parse_args()

    return Config(
        symbols=tuple(ns.symbols),
        start=ns.start,
        weights=None if ns.weights is None else tuple(ns.weights),
        alpha=float(ns.alpha),
        window=int(ns.window),
        method=str(ns.method),
        use_log_returns=not ns.simple_returns,
        dropna=not ns.no_dropna,
        seed=int(ns.seed),
        out_csv=ns.csv,
        out_json=ns.json,
    )


def main() -> None:
    """Script entry point: parse options, run the backtest, save the results."""
    settings = parse_args()
    save_outputs(run_pipeline(settings), settings)


if __name__ == "__main__":
    # Jupyter/PyCharm shim: remove the "-f" flag and any argument that looks
    # like a kernel connection file (ends in ".json" and contains "kernel")
    # so argparse only sees options meant for this script.
    import sys

    kept = [sys.argv[0]]
    for token in sys.argv[1:]:
        if token == "-f":
            continue
        if token.endswith(".json") and "kernel" in token:
            continue
        kept.append(token)
    sys.argv = kept

    main()


[INFO] Downloading prices for ('SPY', 'QQQ', 'IWM', 'EFA', 'EEM', 'TLT', 'LQD', 'GLD') from 2010-01-01 ...
[INFO] Computing rolling hist VaR/ES: alpha=0.05 window=756 ...
[OK] Saved series → level97_var_backtest_series.csv
[OK] Saved summary → level97_var_backtest_summary.json
[POF] T=3264 x=159 phat=0.0487 LR=0.115
[IND] LR=11.300  n01=141 n11=18  pi0=0.045 pi1=0.113
[CC ] LR=11.415
[ES ] tail_n=159 score=0.000146 (near 0 is good)
