In [1]:
# level84_var_es_backtest.py
# Level-84: VaR + Expected Shortfall (ES / CVaR) + Backtesting (Kupiec + Christoffersen)
#
# Free-data, end-to-end script:
# - Pull daily prices from yfinance
# - Build portfolio returns (equal-weight default or user-specified weights)
# - Compute rolling 1-day VaR and ES using:
#     (A) Historical Simulation (HS)
#     (B) Filtered Historical Simulation (FHS) with EWMA volatility scaling
# - Backtest:
#     - Kupiec POF (unconditional coverage) test
#     - Christoffersen independence test
#     - Christoffersen conditional coverage test (POF + IND)
#
# Outputs:
#   - level84_var_es_panel.csv
#   - level84_var_es_summary.json
#
# Examples:
#   python level84_var_es_backtest.py
#   python level84_var_es_backtest.py --symbols SPY QQQ IWM TLT GLD --alpha 0.01 --window 750 --method fhs
#   python level84_var_es_backtest.py --symbols SPY QQQ --weights 0.6 0.4 --alpha 0.05 --method hs
#
# Notes:
# - Loss is defined as L = -return (positive = loss).
# - VaR/ES are reported as POSITIVE loss numbers (e.g., 0.02 = 2% loss).
# - Breach happens when loss > VaR.

import os
import json
import math
import argparse
from dataclasses import dataclass, asdict
from typing import Tuple, Optional, Dict, List

import numpy as np
import pandas as pd
import yfinance as yf

# Optional (for p-values). Script still runs without SciPy.
try:
    from scipy.stats import chi2  # type: ignore
    _HAVE_SCIPY = True
except Exception:
    _HAVE_SCIPY = False


# ----------------------------- Config -----------------------------
@dataclass
class Config:
    symbols: Tuple[str, ...] = ("SPY", "QQQ", "IWM", "EFA", "EEM", "TLT", "LQD", "GLD")
    weights: Optional[Tuple[float, ...]] = None  # if None => equal-weight
    start: str = "2010-01-01"

    alpha: float = 0.01            # tail probability (1% default)
    window: int = 750              # rolling lookback (~3 years)
    method: str = "fhs"            # "hs" or "fhs"

    # EWMA for FHS
    ewma_lambda: float = 0.94      # RiskMetrics-style daily lambda
    vol_floor: float = 1e-8

    seed: int = 42

    out_csv: str = "level84_var_es_panel.csv"
    out_json: str = "level84_var_es_summary.json"


# ----------------------------- Data loader -----------------------------
def load_prices(symbols: Tuple[str, ...], start: str) -> pd.DataFrame:
    px = yf.download(list(symbols), start=start, auto_adjust=True, progress=False)

    if px is None or len(px) == 0:
        raise RuntimeError("No data returned from yfinance (check symbols/start).")

    # yfinance often returns MultiIndex columns for multiple tickers
    if isinstance(px.columns, pd.MultiIndex):
        if ("Close" in px.columns.get_level_values(0)) is False:
            raise RuntimeError(f"Expected 'Close' in MultiIndex columns. Got levels: {px.columns.levels}")
        close = px["Close"].copy()
    else:
        if "Close" not in px.columns:
            raise RuntimeError(f"Expected 'Close' column. Got: {list(px.columns)}")
        close = px[["Close"]].copy()
        close.columns = [symbols[0]]

    close = close.dropna(how="any")
    close = close.sort_index()
    close.columns = [str(c) for c in close.columns]
    return close


def compute_log_returns(prices: pd.DataFrame) -> pd.DataFrame:
    rets = np.log(prices).diff().dropna()
    rets = rets.replace([np.inf, -np.inf], np.nan).dropna(how="any")
    return rets


def normalize_weights(symbols: Tuple[str, ...], weights: Optional[Tuple[float, ...]]) -> np.ndarray:
    n = len(symbols)
    if weights is None:
        w = np.ones(n) / n
        return w
    if len(weights) != n:
        raise ValueError(f"--weights length ({len(weights)}) must match --symbols length ({n}).")
    w = np.array(weights, dtype=float)
    if not np.isfinite(w).all():
        raise ValueError("Weights must be finite numbers.")
    s = float(w.sum())
    if abs(s) < 1e-12:
        raise ValueError("Weights sum to ~0; cannot normalize.")
    w = w / s
    return w


# ----------------------------- Risk metrics: HS + FHS -----------------------------
def hs_var_es(losses_window: np.ndarray, alpha: float) -> Dict[str, float]:
    """
    Historical simulation VaR/ES on a window of losses.
    Returns positive numbers (loss units).
    """
    q = float(np.quantile(losses_window, 1.0 - alpha))  # VaR at (1-alpha) quantile of losses
    tail = losses_window[losses_window >= q]
    es = float(tail.mean()) if tail.size else float(q)
    return {"VaR": q, "ES": es}


def ewma_vol(returns: np.ndarray, lam: float, vol_floor: float) -> np.ndarray:
    """
    EWMA volatility estimate (sigma_t) for each t, using past information.
    sigma_t^2 = lam * sigma_{t-1}^2 + (1-lam) * r_{t-1}^2
    """
    n = returns.size
    sig2 = np.zeros(n, dtype=float)

    # initialize with sample var of first ~50 points (or fewer)
    m = min(50, n)
    init = float(np.var(returns[:m], ddof=1)) if m >= 2 else float(returns[0] ** 2)
    sig2[0] = max(init, vol_floor)

    for t in range(1, n):
        r_prev = float(returns[t - 1])
        sig2[t] = lam * sig2[t - 1] + (1.0 - lam) * (r_prev * r_prev)
        if sig2[t] < vol_floor:
            sig2[t] = vol_floor

    return np.sqrt(sig2)


def fhs_var_es(returns_window: np.ndarray, alpha: float, lam: float, vol_floor: float) -> Dict[str, float]:
    """
    Filtered Historical Simulation:
    - Estimate EWMA vol series within the window
    - Standardize returns -> residuals
    - Compute residual VaR/ES
    - Scale back using *last* sigma in window for next-day forecast
    """
    sig = ewma_vol(returns_window, lam=lam, vol_floor=vol_floor)
    # Avoid divide-by-zero
    z = returns_window / np.maximum(sig, vol_floor)

    # Convert to losses: L = -(sigma_next * z)
    # For next-day (t+1) forecast, we use sigma_last as sigma_next
    sigma_next = float(sig[-1])

    z_losses = -(z)  # because loss = -return, and return ~ sigma_next * z
    # VaR/ES on standardized losses, then scale by sigma_next
    z_q = float(np.quantile(z_losses, 1.0 - alpha))
    z_tail = z_losses[z_losses >= z_q]
    z_es = float(z_tail.mean()) if z_tail.size else float(z_q)

    return {"VaR": sigma_next * z_q, "ES": sigma_next * z_es, "sigma_next": sigma_next}


# ----------------------------- Backtests -----------------------------
def kupiec_pof_test(breaches: np.ndarray, alpha: float) -> Dict[str, float]:
    """
    Kupiec POF test for unconditional coverage.
    breaches: 0/1 array where 1 indicates VaR breach.
    """
    n = int(breaches.size)
    x = int(breaches.sum())
    p = float(alpha)

    # handle edge cases safely
    if n == 0:
        return {"LR_pof": float("nan"), "p_value": float("nan"), "n": 0, "x": 0}

    pi_hat = x / n
    # log-likelihoods with safe clamps
    def _log(a: float) -> float:
        return math.log(max(a, 1e-15))

    ll0 = (n - x) * _log(1.0 - p) + x * _log(p)
    ll1 = (n - x) * _log(1.0 - pi_hat) + x * _log(pi_hat)

    lr = -2.0 * (ll0 - ll1)
    pv = float(chi2.sf(lr, df=1)) if _HAVE_SCIPY else float("nan")

    return {"LR_pof": float(lr), "p_value": pv, "n": n, "x": x, "pi_hat": float(pi_hat)}


def christoffersen_ind_test(breaches: np.ndarray) -> Dict[str, float]:
    """
    Christoffersen independence test using 2x2 transition counts.
    """
    b = breaches.astype(int)
    if b.size < 2:
        return {"LR_ind": float("nan"), "p_value": float("nan"), "n00": 0, "n01": 0, "n10": 0, "n11": 0}

    b0 = b[:-1]
    b1 = b[1:]

    n00 = int(((b0 == 0) & (b1 == 0)).sum())
    n01 = int(((b0 == 0) & (b1 == 1)).sum())
    n10 = int(((b0 == 1) & (b1 == 0)).sum())
    n11 = int(((b0 == 1) & (b1 == 1)).sum())

    # transition probabilities
    pi01 = n01 / max(n00 + n01, 1)
    pi11 = n11 / max(n10 + n11, 1)
    pi = (n01 + n11) / max(n00 + n01 + n10 + n11, 1)

    def _log(a: float) -> float:
        return math.log(max(a, 1e-15))

    # likelihood under independence
    ll_ind = (n00 + n10) * _log(1.0 - pi) + (n01 + n11) * _log(pi)
    # likelihood under Markov
    ll_mkv = (n00) * _log(1.0 - pi01) + (n01) * _log(pi01) + (n10) * _log(1.0 - pi11) + (n11) * _log(pi11)

    lr = -2.0 * (ll_ind - ll_mkv)
    pv = float(chi2.sf(lr, df=1)) if _HAVE_SCIPY else float("nan")

    return {
        "LR_ind": float(lr),
        "p_value": pv,
        "n00": n00, "n01": n01, "n10": n10, "n11": n11,
        "pi01": float(pi01), "pi11": float(pi11), "pi": float(pi)
    }


def christoffersen_cc_test(pof: Dict[str, float], ind: Dict[str, float]) -> Dict[str, float]:
    """
    Conditional coverage: LR_cc = LR_pof + LR_ind (df=2)
    """
    lr_pof = float(pof.get("LR_pof", float("nan")))
    lr_ind = float(ind.get("LR_ind", float("nan")))
    lr_cc = lr_pof + lr_ind
    pv = float(chi2.sf(lr_cc, df=2)) if _HAVE_SCIPY else float("nan")
    return {"LR_cc": float(lr_cc), "p_value": pv}


# ----------------------------- Pipeline -----------------------------
def run_pipeline(cfg: Config) -> Dict[str, object]:
    np.random.seed(cfg.seed)

    print(f"[INFO] Downloading prices for {cfg.symbols} from {cfg.start} ...")
    prices = load_prices(cfg.symbols, cfg.start)
    rets = compute_log_returns(prices)
    print(f"[INFO] Got {len(prices)} price rows, {len(rets)} return rows, assets={rets.shape[1]}")

    w = normalize_weights(cfg.symbols, cfg.weights)
    port_ret = rets.values @ w
    port_ret = pd.Series(port_ret, index=rets.index, name="port_ret")

    # Loss series
    loss = -port_ret

    # Rolling VaR/ES
    out = pd.DataFrame(index=rets.index)
    out["port_ret"] = port_ret
    out["loss"] = loss

    VaR = np.full(len(out), np.nan, dtype=float)
    ES = np.full(len(out), np.nan, dtype=float)
    sigma_next = np.full(len(out), np.nan, dtype=float)

    method = cfg.method.lower().strip()
    if method not in ("hs", "fhs"):
        raise ValueError("--method must be 'hs' or 'fhs'")

    for t in range(cfg.window, len(out)):
        # Use trailing window ending at t-1 for a forecast at t
        win = out["port_ret"].iloc[t - cfg.window:t].values.astype(float)
        loss_win = -win

        if method == "hs":
            m = hs_var_es(loss_win, cfg.alpha)
            VaR[t] = m["VaR"]
            ES[t] = m["ES"]
        else:
            m = fhs_var_es(win, cfg.alpha, lam=cfg.ewma_lambda, vol_floor=cfg.vol_floor)
            VaR[t] = m["VaR"]
            ES[t] = m["ES"]
            sigma_next[t] = float(m["sigma_next"])

    out["VaR"] = VaR
    out["ES"] = ES
    if method == "fhs":
        out["sigma_next"] = sigma_next

    # Breaches: realized loss > forecast VaR
    out["breach"] = ((out["loss"] > out["VaR"]) & out["VaR"].notna()).astype(int)

    # Backtest window (only where VaR exists)
    bt = out.dropna(subset=["VaR"]).copy()
    breaches = bt["breach"].values.astype(int)

    pof = kupiec_pof_test(breaches, alpha=cfg.alpha)
    ind = christoffersen_ind_test(breaches)
    cc = christoffersen_cc_test(pof, ind)

    # Basic performance stats (returns, not risk forecast accuracy)
    ann_ret = float(bt["port_ret"].mean() * 252.0)
    ann_vol = float(bt["port_ret"].std(ddof=1) * math.sqrt(252.0))
    sharpe = float(ann_ret / ann_vol) if ann_vol > 0 else float("nan")

    # Save summary
    summary = {
        "config": asdict(cfg),
        "data_window": {
            "start": str(rets.index.min().date()),
            "end": str(rets.index.max().date()),
            "n_returns": int(len(rets)),
            "n_backtest": int(len(bt)),
        },
        "portfolio": {
            "symbols": list(cfg.symbols),
            "weights": [float(x) for x in w.tolist()],
        },
        "performance": {
            "ann_ret": ann_ret,
            "ann_vol": ann_vol,
            "sharpe": sharpe,
        },
        "risk": {
            "alpha": float(cfg.alpha),
            "method": method,
            "avg_VaR": float(bt["VaR"].mean()),
            "avg_ES": float(bt["ES"].mean()),
        },
        "backtests": {
            "kupiec_pof": pof,
            "christoffersen_ind": ind,
            "christoffersen_cc": cc,
            "scipy_available_for_pvalues": bool(_HAVE_SCIPY),
        }
    }

    return {"out": out, "summary": summary}


def save_outputs(result: Dict[str, object], cfg: Config) -> None:
    out: pd.DataFrame = result["out"]  # type: ignore
    summary: Dict = result["summary"]  # type: ignore

    os.makedirs(os.path.dirname(cfg.out_csv) or ".", exist_ok=True)
    os.makedirs(os.path.dirname(cfg.out_json) or ".", exist_ok=True)

    out.to_csv(cfg.out_csv)
    with open(cfg.out_json, "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)

    # Console summary
    bt = out.dropna(subset=["VaR"])
    n = int(len(bt))
    x = int(bt["breach"].sum())
    rate = x / n if n > 0 else float("nan")

    print(f"[OK] Saved panel → {cfg.out_csv}")
    print(f"[OK] Saved summary → {cfg.out_json}")
    print(f"[INFO] Backtest points: {n}, Breaches: {x}, Breach rate: {rate:.4f}, Expected: {cfg.alpha:.4f}")
    print(f"[INFO] Avg VaR={bt['VaR'].mean():.5f}  Avg ES={bt['ES'].mean():.5f}  (loss units)")
    pof = summary["backtests"]["kupiec_pof"]
    ind = summary["backtests"]["christoffersen_ind"]
    cc = summary["backtests"]["christoffersen_cc"]
    print(f"[TEST] Kupiec LR={pof['LR_pof']:.3f}  p={pof['p_value']}")
    print(f"[TEST] Ind   LR={ind['LR_ind']:.3f}  p={ind['p_value']}")
    print(f"[TEST] CC    LR={cc['LR_cc']:.3f}  p={cc['p_value']}")


# ----------------------------- CLI -----------------------------
def parse_args() -> Config:
    p = argparse.ArgumentParser(description="Level-84: VaR+ES with Kupiec/Christoffersen backtests")

    p.add_argument("--start", type=str, default="2010-01-01")
    p.add_argument("--symbols", nargs="+", default=list(Config.symbols))

    p.add_argument("--weights", nargs="*", type=float, default=None)
    p.add_argument("--alpha", type=float, default=0.01)
    p.add_argument("--window", type=int, default=750)
    p.add_argument("--method", type=str, default="fhs", choices=["hs", "fhs"])

    p.add_argument("--ewma-lambda", type=float, default=0.94)
    p.add_argument("--vol-floor", type=float, default=1e-8)

    p.add_argument("--seed", type=int, default=42)

    p.add_argument("--csv", type=str, default="level84_var_es_panel.csv")
    p.add_argument("--json", type=str, default="level84_var_es_summary.json")

    a = p.parse_args()

    weights_tuple = tuple(a.weights) if a.weights is not None and len(a.weights) > 0 else None

    return Config(
        symbols=tuple(a.symbols),
        weights=weights_tuple,
        start=a.start,
        alpha=float(a.alpha),
        window=int(a.window),
        method=str(a.method),
        ewma_lambda=float(a.ewma_lambda),
        vol_floor=float(a.vol_floor),
        seed=int(a.seed),
        out_csv=a.csv,
        out_json=a.json,
    )


def main() -> None:
    cfg = parse_args()
    result = run_pipeline(cfg)
    save_outputs(result, cfg)


if __name__ == "__main__":
    # Jupyter/PyCharm cell shim: strip "-f kernel.json" etc.
    import sys
    sys.argv = [sys.argv[0]] + [
        arg for arg in sys.argv[1:]
        if arg != "-f" and not (arg.endswith(".json") and "kernel" in arg)
    ]
    main()


[INFO] Downloading prices for ('SPY', 'QQQ', 'IWM', 'EFA', 'EEM', 'TLT', 'LQD', 'GLD') from 2010-01-01 ...
[INFO] Got 4021 price rows, 4020 return rows, assets=8
[OK] Saved panel → level84_var_es_panel.csv
[OK] Saved summary → level84_var_es_summary.json
[INFO] Backtest points: 3270, Breaches: 38, Breach rate: 0.0116, Expected: 0.0100
[INFO] Avg VaR=0.01935  Avg ES=0.02518  (loss units)
[TEST] Kupiec LR=0.825  p=0.3638026067475352
[TEST] Ind   LR=11.211  p=0.0008131871473722776
[TEST] CC    LR=12.036  p=0.0024350039072983027
