In [None]:
"""
Volatility-Integrated LSTM and GRU Models for Heston-Nandi and Component Heston-Nandi
"""

"""
Volatility-Integrated GRU Model for Heston-Nandi

Minimal TensorFlow 2 implementation:
- Single-layer GRU with hidden size 1
- Heston–Nandi variance enters as a dynamic bias in the GRU update gate
- Joint MLE over HN parameters and GRU weights using the HN-style likelihood

This is designed as a compact starting point; extensions (CHN, LSTM, multi-layer, options) can be added later.
"""

from __future__ import annotations

from typing import Optional

import tensorflow as tf


class VolIntegratedGRU(tf.keras.Model):
    """Single-asset, single-layer GRU with scalar hidden state.

    The Heston–Nandi variance h_t enters the GRU update gate as a dynamic bias.
    The hidden state η_t is interpreted as the conditional variance used in the
    return likelihood:

        R_t | η_t ~ N(λ * η_t, η_t)

    The Heston–Nandi recursion updates h_t using the same shock that drives
    returns, but with η_t in the inversion as in the proposal's NN-HN variant.
    """

    def __init__(self, input_dim: int, dtype: tf.dtypes.DType = tf.float32):
        super().__init__(dtype=dtype)

        # === Heston–Nandi parameters (trainable scalars) ===
        # Re-parameterised form: h_{t+1} = ω + ϕ(h_t - ω) + α(z_t^2 - 1 - 2 γ sqrt(h_t) z_t)
        self.omega = self.add_weight(
            name="omega", shape=(), initializer="zeros", dtype=dtype
        )
        self.alpha = self.add_weight(
            name="alpha", shape=(), initializer="zeros", dtype=dtype
        )
        self.phi = self.add_weight(
            name="phi", shape=(), initializer="zeros", dtype=dtype
        )
        self.lam = self.add_weight(
            name="lam", shape=(), initializer="zeros", dtype=dtype
        )
        self.gam = self.add_weight(
            name="gam", shape=(), initializer="zeros", dtype=dtype
        )

        # === GRU parameters (hidden size = 1) ===
        self.input_dim = int(input_dim)

        # reset gate r_t
        self.W_r = self.add_weight(
            name="W_r", shape=(self.input_dim, 1), initializer="glorot_uniform", dtype=dtype
        )
        self.U_r = self.add_weight(
            name="U_r", shape=(1, 1), initializer="glorot_uniform", dtype=dtype
        )
        self.b_r = self.add_weight(
            name="b_r", shape=(1,), initializer="zeros", dtype=dtype
        )

        # update gate u_t with dynamic bias b_t = gamma_u * h_t
        self.W_u = self.add_weight(
            name="W_u", shape=(self.input_dim, 1), initializer="glorot_uniform", dtype=dtype
        )
        self.U_u = self.add_weight(
            name="U_u", shape=(1, 1), initializer="glorot_uniform", dtype=dtype
        )
        self.b_u = self.add_weight(
            name="b_u", shape=(1,), initializer="zeros", dtype=dtype
        )
        self.gamma_u = self.add_weight(  # scalar scale for dynamic bias
            name="gamma_u", shape=(), initializer="zeros", dtype=dtype
        )

        # candidate state n_t
        self.W_n = self.add_weight(
            name="W_n", shape=(self.input_dim, 1), initializer="glorot_uniform", dtype=dtype
        )
        self.U_n = self.add_weight(
            name="U_n", shape=(1, 1), initializer="glorot_uniform", dtype=dtype
        )
        self.b_n = self.add_weight(
            name="b_n", shape=(1,), initializer="zeros", dtype=dtype
        )

    @property
    def eps(self) -> tf.Tensor:
        return tf.constant(1e-8, dtype=self.dtype)

    def gru_step(self, x_t: tf.Tensor, h_vol_t: tf.Tensor, h_prev: tf.Tensor) -> tf.Tensor:
        """One GRU step with volatility-integrated update gate.

        Shapes (single asset):
            x_t:      [1, input_dim]
            h_vol_t:  [1, 1]   (Heston–Nandi variance h_t)
            h_prev:   [1, 1]   (previous hidden state)
        Returns:
            eta_t:    [1, 1]   (interpreted as variance, forced positive)
        """

        # reset gate r_t
        r_t = tf.sigmoid(
            tf.matmul(x_t, self.W_r) + self.b_r + tf.matmul(h_prev, self.U_r)
        )

        # update gate u_t with dynamic bias b_t = gamma_u * h_vol_t
        b_t = self.gamma_u * h_vol_t
        u_t = tf.sigmoid(
            tf.matmul(x_t, self.W_u) + self.b_u + tf.matmul(h_prev, self.U_u) + b_t
        )

        # candidate state n_t
        n_t = tf.tanh(
            tf.matmul(x_t, self.W_n) + self.b_n + r_t * (tf.matmul(h_prev, self.U_n))
        )

        h_t = (1.0 - u_t) * n_t + u_t * h_prev

        # Interpret hidden state as conditional variance η_t, enforce positivity via softplus
        eta_t = tf.nn.softplus(h_t)
        return eta_t

    def forward(self, y: tf.Tensor, rv: tf.Tensor) -> tuple[tf.Tensor, tf.Tensor]:
        """Run joint HN–GRU recursion and return standardized shocks and variances.

        Args
        -----
        y  : shape [T]
            Excess returns time series.
        rv : shape [T]
            Realized variance time series.

        Returns
        -------
        z_val : shape [T]
            Standardized shocks z_t = (y_t - λ η_t) / sqrt(η_t).
        h_val : shape [T]
            Variance path η_t used in the likelihood.
        """

        y = tf.convert_to_tensor(y, dtype=self.dtype)
        rv = tf.convert_to_tensor(rv, dtype=self.dtype)

        T = tf.shape(y)[0]
        z_array = tf.TensorArray(self.dtype, size=T)
        h_array = tf.TensorArray(self.dtype, size=T)

        # Initial variance: simple empirical variance of returns
        h_vol_t = tf.math.reduce_variance(y)
        eta_t = h_vol_t
        h_prev = tf.reshape(eta_t, (1, 1))

        # t = 0 step
        z_t = (y[0] - self.lam * eta_t) / tf.sqrt(eta_t)
        z_array = z_array.write(0, z_t)
        h_array = h_array.write(0, h_vol_t)

        # Iterate for t = 1, ..., T-1
        for t in tf.range(1, T):
            y_t = y[t]
            rv_t = rv[t]

            # Heston–Nandi variance update for h_{t}
            term = tf.square(z_t) - 1.0 - 2.0 * self.gam * tf.sqrt(h_vol_t + self.eps) * z_t
            h_vol_t = self.omega + self.phi * (h_vol_t - self.omega) + self.alpha * term
            h_vol_t = tf.nn.softplus(h_vol_t)

            # Build GRU input x_t = [η_{t-1}, y_t, rv_t, h_vol_t]
            x_t = tf.stack([eta_t, y_t, rv_t, h_vol_t])
            x_t = tf.reshape(x_t, (1, self.input_dim))

            # One GRU step → η_t (variance), using dynamic bias from h_vol_t
            h_vol_mat = tf.reshape(h_vol_t, (1, 1))
            eta_mat = self.gru_step(x_t, h_vol_mat, h_prev)
            eta_t = tf.reshape(eta_mat, ())
            h_prev = eta_mat

            # Standardized shock using η_t
            z_t = (y_t - self.lam * eta_t) / tf.sqrt(eta_t + self.eps)

            z_array = z_array.write(t, z_t)
            h_array = h_array.write(t, eta_t)

        z_val = z_array.stack()
        h_val = h_array.stack()
        return z_val, h_val

    def neg_loglike(
        self,
        z_val: tf.Tensor,
        h_val: tf.Tensor,
    ) -> tf.Tensor:
        """Negative log-likelihood given standardized shocks and variances.

        We assume return dynamics of the form

            R_t = λ η_t + sqrt(η_t) z_t,   z_t ~ N(0, 1).

        The log-likelihood per observation is

            -1/2 [ log(2π) + log(η_t) + z_t^2 ].
        """

        z_val = tf.convert_to_tensor(z_val, dtype=self.dtype)
        h_val = tf.convert_to_tensor(h_val, dtype=self.dtype)

        log2pi = tf.math.log(2.0 * tf.constant(3.141592653589793, dtype=self.dtype))
        nll = 0.5 * tf.reduce_sum(log2pi + tf.math.log(h_val + self.eps) + tf.square(z_val))
        return nll


@tf.function
def train_epoch(
    model: VolIntegratedGRU,
    optimizer: tf.keras.optimizers.Optimizer,
    returns: tf.Tensor,
    rv: tf.Tensor,
) -> tf.Tensor:
    """One full-sequence gradient step (no batching) for SPX.

    This does MLE over the entire available history each call using the
    joint HN–GRU recursion.
    """

    with tf.GradientTape() as tape:
        z_val, h_val = model.forward(returns, rv)
        loss = model.neg_loglike(z_val, h_val)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss


__all__ = ["VolIntegratedGRU", "train_epoch"]

In [None]:
import pandas as pd
import tensorflow as tf
from vol_integrate_gru import VolIntegratedGRU, train_epoch

# Load returns
df_ret = pd.read_csv("returns.csv")
df_rv  = pd.read_csv("RV5.csv")

returns_tf = tf.convert_to_tensor(df_ret["exret"].values, dtype=tf.float32)
rv_tf      = tf.convert_to_tensor(df_rv["rv5"].values, dtype=tf.float32)

model = VolIntegratedGRU(input_dim=4, dtype=tf.float32)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

for epoch in range(500):
    loss = train_epoch(model, optimizer, returns_tf, rv_tf)
    if epoch % 10 == 0:
        print(f"Epoch {epoch} | Loss: {loss.numpy():.6f}")

2025-11-13 20:42:36.061631: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M1 Pro
2025-11-13 20:42:36.061661: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2025-11-13 20:42:36.061668: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
I0000 00:00:1763091756.061685 1415250 pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
I0000 00:00:1763091756.061708 1415250 pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Epoch 0 | Loss: 11879.858398


In [2]:
print(returns_tf.shape)
print(rv_tf.shape)

(7559,)
(5017,)


In [None]:


def forward(self, y: tf.Tensor, rv: tf.Tensor) -> tf.Tensor:
        """Heston–Nandi variance recursion update with softplus positivity enforcement.

        h_vol_t: current variance (shape [1,1])
        z_t: shock at time t (shape [1,1])
        y: returns data
        rv: realized variance data
        """
        T = tf.size(y)  # returns data is a 1D vector if 5000
        h_vol, z_val = tf.zeros(T), tf.zeros(T)
        
        h_vol_t = tf.var(y)  # starting variance
        eta_t = h_vol_t   # start with the GARCH-type volatility
        z_t = (y[0] - self.lam * eta_t) / tf.sqrt(eta_t)
        h_vol[t] = h_vol_t
        z_val[t] = z_t
        
        for t in tf.range(1, T):
            X_t = tf.stack(eta_t, y[t], rv[t], h_vol_t)
            h_vol_t = self.omega + self.phi * (h_vol_t - self.omega) + self.alpha * (tf.square(z_t) - 1.0 - 2.0 * self.gam * tf.sqrt(h_vol_t) * z_t)
            eta_t = self.gru_step(X_t, h_vol_t)  # call vol integrated GRU to get eta_t
            z_t = (y[t] - self.lam * eta_t) / tf.sqrt(eta_t)
            h_vol[t] = h_vol_t
            z_val[t] = z_t
            
        return z_val, h_vol




Saved P-eval artifacts to artifacts/benchmark_eval/20250919-193730/
Heston (portfolio) MV RMSE=0.036396  |  LRM RMSE=0.006590
Bates  (portfolio) MV RMSE=0.050755  |  LRM RMSE=0.007781


In [None]:
"""
Minimal, readable quadratic-hedging benchmark (Heston & Bates)

What this file does (only):
1) Load calibrated params (oneDheston.json, oneDbates.json).
2) Build ONE real-data case: S0 from JSON, T=59/252, daily hedging (n_steps=59),
   strikes = [3225, 2600] with types ['call','put'].
3) Train time-step hedge ratios θ_k by sequential projection
   - MVH: simulate under VOMM
   - LRM: simulate under MMM
   (all in *discounted* units X_t = e^{-(r-q)t} S_t)
4) Evaluate under P (discounted)
   - MVH: V̄_T = v0 + ∑ θ_k ΔX_k with v0 := E[H~ - gains]  (same rule for Heston/Bates)
   - LRM: V̄_T = C_T + ∑ θ_k ΔX_k where C_T = H0 + L_T and
           L_T := H~ - H0 - ∑ θ_k ΔM_k with ΔM_k = ΔX_k - E[ΔX_k] (batch-wise de-drift)
5) Save only portfolio-level metrics & thetas in artifacts/benchmark_eval/<ts>/

Notes:
- Uses powers-of-two path counts (Sobol-friendly). We still call the model’s own RNG;
  if your SV_models uses Sobol for QMC, it will benefit directly.
- No jump-impact analysis, no multi-asset scaffolding, no ODE/Riccati code here.
- Keeps names short and descriptive.

Author: refactor for clarity/accuracy
"""

from __future__ import annotations

import json, os, time
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import numpy as np

from SV_models import HestonModel, BatesModel  # must provide: simulate_mmm, simulate_vomm, simulate_under_P


# ------------------------------- helpers -------------------------------------

def _next_pow2(n: int) -> int:
    n = int(max(1, n))
    return 1 << (n - 1).bit_length()

def _disc_grid(r: float, q: float, T: float, n_steps: int) -> Tuple[np.ndarray, np.ndarray]:
    t = np.linspace(0.0, T, n_steps + 1)
    disc = np.exp(-(r - q) * t)
    return t, disc

def _option_payoff(S_T: np.ndarray, K: float, kind: str) -> np.ndarray:
    if kind.lower() == "call":
        return np.maximum(S_T - K, 0.0)
    elif kind.lower() == "put":
        return np.maximum(K - S_T, 0.0)
    raise ValueError("kind must be 'call' or 'put'")

def _batch_discounted_payoffs(S_T: np.ndarray, Ks: np.ndarray, kinds: List[str], disc_T: float, S0: float) -> np.ndarray:
    # Normalize strikes by S0 for discounted/normalized payoff computation
    Ks_norm = Ks / S0
    out = np.zeros((len(Ks), S_T.shape[0]), dtype=np.float64)
    for i, (K, knd) in enumerate(zip(Ks_norm, kinds)):
        out[i] = disc_T * _option_payoff(S_T, K, knd)
    return out


# --------------------------- data / test case --------------------------------

def _load_json(path: str) -> dict:
    with open(path, "r") as f:
        return json.load(f)

def _first(d: dict, key: str, alt: str = "", default=None):
    if key in d:
        v = d[key]
        return v[0] if isinstance(v, list) else v
    if alt and alt in d:
        v = d[alt]
        return v[0] if isinstance(v, list) else v
    return default

def _normalize_heston(d: dict) -> dict:
    return {
        "S0":    _first(d, "S0", "S0_list"),
        "v0":    _first(d, "v0", "v0_list"),
        "r":     _first(d, "r", "r_list"),
        "q":     _first(d, "q", "q_list"),
        "kappa": _first(d, "kappa", "kappa_list"),
        "theta": _first(d, "theta", "theta_list"),
        "sigma": _first(d, "sigma", "sigma_list"),
        "rho":   _first(d, "rho", "rho_list"),
        "xi_s":  _first(d, "xi_s", "xi_s_list", 0.0),
    }

def _normalize_bates(d: dict) -> dict:
    p = _normalize_heston(d)
    p.update({
        "lambda_j": _first(d, "lambda_j", default=0.0),
        "mu_j": _first(d, "mu_j", default=0.0),
        "sigma_j": _first(d, "sigma_j", default=0.0),
        "E_J_minus_1": _first(d, "E_J_minus_1", default=0.0),
        "drift_adjustment": _first(d, "drift_adjustment", default=0.0),
    })
    return p

def build_real_case(heston_json="oneDheston.json", bates_json="oneDbates.json"):
    h_raw = _load_json(heston_json) if os.path.exists(heston_json) else {}
    b_raw = _load_json(bates_json) if os.path.exists(bates_json) else {}
    h = _normalize_heston(h_raw) if h_raw else None
    b = _normalize_bates(b_raw) if b_raw else None
    S0 = (h or b)["S0"]
    V0 = (h or b)["v0"]
    r, q = (h or b)["r"], (h or b)["q"]
    # real-data config
    T = 59.0 / 252.0
    n_steps = 59
    Ks = np.array([3225.0, 2600.0], dtype=float)
    kinds = ["call", "put"]
    return (h, b, dict(S0=S0, V0=V0, r=r, q=q, T=T, n_steps=n_steps, Ks=Ks, kinds=kinds))


# ----------------------------- core hedger -----------------------------------

@dataclass
class SimpleBench:
    name: str
    model: object  # HestonModel or BatesModel
    r: float
    q: float

    # ---------- simulation wrapper (returns discounted X, dX and discounted payoffs) ----------
    def _sim_discounted(self, measure: str, S0: float, V0: float, T: float, n_steps: int,
                        n_paths: int, Ks: np.ndarray, kinds: List[str]) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        dt = T / n_steps
        if measure == "mmm":
            S, V, _ = self.model.simulate_mmm(S0, V0, T, dt, n_paths, _show_progress=False)
        elif measure == "vomm":
            S, V, _ = self.model.simulate_vomm(S0, V0, T, dt, n_paths, _show_progress=False)
        elif measure == "p":
            S, V, _ = self.model.simulate_under_P(S0, V0, T, dt, n_paths, _show_progress=False)
        else:
            raise ValueError("measure must be 'mmm', 'vomm', or 'p'")
        # Normalize simulated prices by S0
        S = S / S0
        t, disc = _disc_grid(self.r, self.q, T, n_steps)
        X = (S * disc[None, :]).astype(np.float64)          # discounted price paths
        dX = X[:, 1:] - X[:, :-1]                           # discounted increments
        H_tilde = _batch_discounted_payoffs(S[:, -1], Ks, kinds, disc[-1], S0)
        return X, dX, H_tilde

    # ---------- LSM helpers (basis + continuation values) ----------
    def _poly_features(self, Xk: np.ndarray, Vk: Optional[np.ndarray], tk: float) -> np.ndarray:
        """
        Build simple polynomial basis in discounted (normalized) state at time k.
        Xk: (n_paths,) discounted normalized price; Vk: (n_paths,) variance (can be None); tk: scalar in [0, T].
        """
        if Vk is None:
            return np.column_stack([
                np.ones_like(Xk),
                Xk, Xk**2,
                tk*np.ones_like(Xk),
                tk*Xk
            ]).astype(np.float64)
        return np.column_stack([
            np.ones_like(Xk),
            Xk, Vk,
            Xk**2, Vk**2, Xk*Vk,
            tk*np.ones_like(Xk),
            tk*Xk, tk*Vk
        ]).astype(np.float64)

    def _discount_normalize_from_S(self, S: np.ndarray, S0: float, T: float, n_steps: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Given raw S paths (n_paths, n_steps+1), return (X, dX, disc) with
        X_t = e^{-(r-q)t} S_t / S0 and dX = ΔX."""
        t, disc = _disc_grid(self.r, self.q, T, n_steps)
        S_norm = S / S0
        X = (S_norm * disc[None, :]).astype(np.float64)
        dX = X[:, 1:] - X[:, :-1]
        return X, dX, disc
    
    def _lsm_continuation_values(self, X: np.ndarray, V: Optional[np.ndarray],
                                 H_tilde_port: np.ndarray, t_grid: np.ndarray) -> np.ndarray:
        """
        Least–squares Monte Carlo estimate of continuation value E[H~ | F_k] at each time k.
        Returns Vhat with shape (n_paths, n_steps+1).
        """
        n_paths, T_plus_1 = X.shape
        Vhat = np.zeros((n_paths, T_plus_1), dtype=np.float64)

        # terminal anchor
        Vhat[:, -1] = H_tilde_port

        # ridge for numerical stability
        ridge = 1e-10

        # backward regressions of H~ on basis at each time k
        for k in range(T_plus_1 - 1):
            # forward (independent) fits at time k; we regress H~ directly on current state
            Xk = X[:, k]
            Vk = None if V is None else V[:, k]
            Phi = self._poly_features(Xk, Vk, t_grid[k])
            # solve (Phi^T Phi + ridge I) beta = Phi^T H
            A = Phi.T @ Phi
            A.flat[::A.shape[0]+1] += ridge
            beta = np.linalg.solve(A, Phi.T @ H_tilde_port)
            Vhat[:, k] = Phi @ beta

        return Vhat

    # ---------- LRM theta under MMM (local projection on ΔX without drift-removal) ----------
    def train_theta_lrm(self, S0: float, V0: float, T: float, n_steps: int,
                        n_paths: int, Ks: np.ndarray, kinds: List[str]) -> np.ndarray:
        """
        Discrete-time LRM theta under MMM: θ_{k+1}^{LRM} = Cov(ΔV_{k+1}, ΔX_{k+1} | F_k) / Var(ΔX_{k+1} | F_k)
        where V_k = E^{MMM}[H~ | F_k]. NO drift-removal here; drift is removed only at P-evaluation.
        Returns a single portfolio-level theta of shape (T-1,).
        """
        # simulate discounted normalized paths under MMM
        X, dX, H_tilde_mat = self._sim_discounted("mmm", S0, V0, T, n_steps, n_paths, Ks, kinds)
        # portfolio payoff (call + put sum) in discounted units
        H_tilde_port = H_tilde_mat.sum(axis=0)
        # continuation values by LSM
        t_grid, _disc = _disc_grid(self.r, self.q, T, n_steps)
        Vhat = self._lsm_continuation_values(X, None, H_tilde_port, t_grid)
        # ∆V and θ via covariance/variance ratio (cross-sectional, NO drift-removal)
        dV = Vhat[:, 1:] - Vhat[:, :-1]            # (n_paths, T-1)
        theta = np.zeros(dX.shape[1], dtype=np.float64)
        eps = 1e-12
        for k in range(dX.shape[1]):
            x = dX[:, k]
            y = dV[:, k]
            x_mean = x.mean()
            y_mean = y.mean()
            cov = ( (x - x_mean) * (y - y_mean) ).mean()
            var = ( (x - x_mean)**2 ).mean() + eps
            theta[k] = cov / var
        return theta

    # ---------- MVH theta under VOMM (GKW integrand via LSM; v0 handled at evaluation) ----------
    def train_theta_mvh(self, S0: float, V0: float, T: float, n_steps: int,
                        n_paths: int, Ks: np.ndarray, kinds: List[str]) -> np.ndarray:
        """
        Discrete-time MVH theta under VOMM: GKW integrand (ΔV-on-ΔX regression) plus Schweizer feedback term if available.
        If the model does not expose a density martingale, falls back to ξ_k.
        Returns portfolio-level θ with shape (T-1,).
        """
        # (1) Simulate under VOMM; take density if available
        have_with_density = hasattr(self.model, 'simulate_vomm_with_density')
        if have_with_density:
            S, V, Z, comp_ratio = self.model.simulate_vomm_with_density(S0, V0, T, T/n_steps, n_paths, _show_progress=False)
        else:
            S, V, _ = self.model.simulate_vomm(S0, V0, T, T/n_steps, n_paths, _show_progress=False)
            Z = None
            comp_ratio = np.zeros(n_steps, dtype=float)
        # (2) Discount/normalize and build portfolio payoff H~
        X, dX, disc = self._discount_normalize_from_S(S, S0, T, n_steps)
        S_T_norm = (S / S0)[:, -1]
        H_tilde_mat = _batch_discounted_payoffs(S_T_norm, Ks, kinds, disc[-1], S0=S0)
        H_port = H_tilde_mat.sum(axis=0)
        # (3) LSM continuation values V^H_k = E[H~ | F_k]
        t_grid, _ = _disc_grid(self.r, self.q, T, n_steps)
        Vhat = self._lsm_continuation_values(X, None, H_port, t_grid)
        dV = Vhat[:, 1:] - Vhat[:, :-1]
        # (4) First the GKW integrand ξ_k by ΔV-on-ΔX regression (cross-sectional)
        Tm1 = dX.shape[1]
        xi = np.zeros(Tm1, dtype=np.float64)
        eps = 1e-12
        for k in range(Tm1):
            x = dX[:, k]
            y = dV[:, k]
            x_mean = x.mean(); y_mean = y.mean()
            cov = ((x - x_mean) * (y - y_mean)).mean()
            var = ((x - x_mean)**2).mean() + eps
            xi[k] = cov / var
        # (5) Schweizer feedback using comp_ratio (if provided):
        #     fb_k = comp_ratio[k] * ( E[V^H_k] - E[H~] - E[Σ_{i<k} θ_i ΔX_i] ), θ_k = ξ_k - fb_k
        EH = float(np.mean(H_port))
        theta = xi.copy()
        if comp_ratio is not None and np.any(comp_ratio != 0.0):
            cum_gains_mean = 0.0
            for k in range(Tm1):
                if k > 0:
                    cum_gains_mean += float(np.mean(dX[:, k-1] * theta[k-1]))
                Vk_mean = float(np.mean(Vhat[:, k]))
                fb_k = comp_ratio[k] * (Vk_mean - EH - cum_gains_mean)
                theta[k] = xi[k] - fb_k
        return theta

    # ---------- evaluate under P (portfolio = CALL+PUT) ----------
    def evaluate_P(self, theta_mvh: np.ndarray, theta_lrm: np.ndarray,
                   S0: float, V0: float, T: float, n_steps: int, n_paths: int,
                   Ks: np.ndarray, kinds: List[str]) -> Dict:
        """
        P-measure evaluation in discounted units.
        Metrics are computed from hedging residuals (L_T), not by adding L_T back into the portfolio value.
        All in discounted, normalized units.
        """
        # simulate discounted/normalized paths under P
        X, dX, H_tilde_mat = self._sim_discounted("p", S0, V0, T, n_steps, n_paths, Ks, kinds)
        # portfolio = CALL + PUT (sum over listed options)
        payoff_port = H_tilde_mat.sum(axis=0)                      # (n_paths,)
        # thetas are already portfolio-level (shape (T-1,))
        T_al = min(dX.shape[1], theta_mvh.shape[0], theta_lrm.shape[0])
        dX = dX[:, :T_al]
        th_mvh = theta_mvh[:T_al]
        th_lrm = theta_lrm[:T_al]
        dX_mean = dX.mean(axis=0, keepdims=True)
        # ----------- MVH: hedging error is residual L_T (do not add L_T to value) -----------
        gains_mvh = (dX * th_mvh[None, :]).sum(axis=1)             # Σ θ ΔX
        v0_mvh = float(np.mean(payoff_port - gains_mvh))           # center under P for evaluation
        err_mvh = payoff_port - (v0_mvh + gains_mvh)               # residual ≈ L_T
        # ----------- LRM: error is L_T = H - H0 - Σ θ ΔM -----------
        dX_mean = dX.mean(axis=0, keepdims=True)                   # (1, T_al)
        dM = dX - dX_mean                                          # ΔM via cross-sectional mean
        H0 = float(np.mean(payoff_port))                           # E_P[H~]
        gains_M = (dM * th_lrm[None, :]).sum(axis=1)               # Σ θ ΔM
        err_lrm = payoff_port - H0 - gains_M                       # = L_T
        # ---------------- metrics (portfolio-level only) ----------------
        def _metrics(e: np.ndarray, unhedged: np.ndarray) -> Dict:
            mean = float(np.mean(e))
            mse = float(np.mean(e**2))
            var = max(mse - mean**2, 0.0)
            rmse = float(np.sqrt(mse))
            std = float(np.sqrt(var))
            heff = 1.0 - (var / max(float(np.var(unhedged)), 1e-12))
            return dict(MSE=mse, RMSE=rmse, Mean=mean, Std=std, HedgeEffPct=100.0*heff)
        mv = _metrics(err_mvh, payoff_port)
        lr = _metrics(err_lrm, payoff_port)
        return dict(
            measure="P",
            v0_mvh_disc=v0_mvh,
            H0_lrm_disc=H0,
            mv_metrics=mv,
            lr_metrics=lr,
            theta_mvh_port=th_mvh.tolist(),
            theta_lrm_port=th_lrm.tolist(),
        )

# ------------------------------- runner --------------------------------------

def run_realdata():
    # load calibrated params and real case
    h_p, b_p, case = build_real_case()
    S0, V0, r, q, T, n_steps, Ks, kinds = case["S0"], case["V0"], case["r"], case["q"], case["T"], case["n_steps"], case["Ks"], case["kinds"]


    # models
    heston = HestonModel({'kappa': h_p['kappa'], 'theta': h_p['theta'], 'sigma': h_p['sigma'], 'rho': h_p['rho'],
                          'v0': h_p['v0'], 'r': h_p['r'], 'q': h_p['q'], 'xi_s': h_p.get('xi_s', 0.0)})
    if b_p is not None and all(k in b_p for k in ['lambda_j','mu_j','sigma_j']):
        bates = BatesModel({'kappa': b_p['kappa'], 'theta': b_p['theta'], 'sigma': b_p['sigma'], 'rho': b_p['rho'],
                            'v0': b_p['v0'], 'r': b_p['r'], 'q': b_p['q'], 'xi_s': b_p.get('xi_s', 0.0),
                            'lambda_j': b_p['lambda_j'], 'mu_j': b_p['mu_j'], 'sigma_j': b_p['sigma_j'],
                            'E_J_minus_1': b_p.get('E_J_minus_1', 0.0), 'drift_adjustment': b_p.get('drift_adjustment', 0.0)})
    else:
        bates = BatesModel({'kappa': h_p['kappa'], 'theta': h_p['theta'], 'sigma': h_p['sigma'], 'rho': h_p['rho'],
                            'v0': h_p['v0'], 'r': h_p['r'], 'q': h_p['q'], 'xi_s': h_p.get('xi_s', 0.0),
                            'lambda_j': 0.0, 'mu_j': 0.0, 'sigma_j': 0.0, 'E_J_minus_1': 0.0, 'drift_adjustment': 0.0})

    # simple benches
    hb = SimpleBench("Heston", heston, r, q)
    bb = SimpleBench("Bates", bates, r, q)

    # path budgets (powers of two)
    n_train = _next_pow2(2**18)        # 262,144 (validation-speed)
    n_eval  = _next_pow2(2**17)        # 131,072 (evaluation under P)

    # train thetas (portfolio-level)
    theta_lrm_h = hb.train_theta_lrm(S0, V0, T, n_steps, n_train, Ks, kinds)
    theta_mvh_h = hb.train_theta_mvh(S0, V0, T, n_steps, n_train, Ks, kinds)

    theta_lrm_b = bb.train_theta_lrm(S0, V0, T, n_steps, n_train, Ks, kinds)
    theta_mvh_b = bb.train_theta_mvh(S0, V0, T, n_steps, n_train, Ks, kinds)

    # evaluate under P
    res_h = hb.evaluate_P(theta_mvh_h, theta_lrm_h, S0, V0, T, n_steps, n_eval, Ks, kinds)
    res_b = bb.evaluate_P(theta_mvh_b, theta_lrm_b, S0, V0, T, n_steps, n_eval, Ks, kinds)

    # save artifacts
    ts = time.strftime("%Y%m%d-%H%M%S")
    outdir = f"artifacts/benchmark_eval/{ts}/"
    os.makedirs(outdir, exist_ok=True)
    with open(os.path.join(outdir, "heston_eval_P.json"), "w") as f:
        json.dump(res_h, f, indent=2)
    with open(os.path.join(outdir, "bates_eval_P.json"), "w") as f:
        json.dump(res_b, f, indent=2)

    print(f"\nSaved P-eval artifacts to {outdir}")
    print(f"Heston (portfolio) MV RMSE={res_h['mv_metrics']['RMSE']:.6f}  |  LRM RMSE={res_h['lr_metrics']['RMSE']:.6f}")
    print(f"Bates  (portfolio) MV RMSE={res_b['mv_metrics']['RMSE']:.6f}  |  LRM RMSE={res_b['lr_metrics']['RMSE']:.6f}")

    return dict(heston=res_h, bates=res_b, outdir=outdir)


# ------------------------------ entry point ----------------------------------

def runner():
    return run_realdata()

if __name__ == "__main__":
    runner()




Saved P-eval artifacts to artifacts/benchmark_eval/20250919-211159/
Heston (portfolio) MV RMSE=0.036155  |  LRM RMSE=0.035266
Bates  (portfolio) MV RMSE=0.050477  |  LRM RMSE=0.047679


In [14]:
"""
Minimal, readable quadratic-hedging benchmark (Heston & Bates)

What this file does (only):
1) Load calibrated params (oneDheston.json, oneDbates.json).
2) Build ONE real-data case: S0 from JSON, T=59/252, daily hedging (n_steps=59),
   strikes = [3225, 2600] with types ['call','put'].
3) Train time-step hedge ratios θ_k by sequential projection
   - MVH: simulate under VOMM
   - LRM: simulate under MMM
   (all in *discounted* units X_t = e^{-(r-q)t} S_t)
4) Evaluate under P (discounted)
   - MVH: V̄_T = v0 + ∑ θ_k ΔX_k with v0 := E[H~ - gains]  (same rule for Heston/Bates)
   - LRM: V̄_T = C_T + ∑ θ_k ΔX_k where C_T = H0 + L_T and
           L_T := H~ - H0 - ∑ θ_k ΔM_k with ΔM_k = ΔX_k - E[ΔX_k] (batch-wise de-drift)
5) Save only portfolio-level metrics & thetas in artifacts/benchmark_eval/<ts>/

Notes:
- Uses powers-of-two path counts (Sobol-friendly). We still call the model’s own RNG;
  if your SV_models uses Sobol for QMC, it will benefit directly.
- No jump-impact analysis, no multi-asset scaffolding, no ODE/Riccati code here.
- Keeps names short and descriptive.

Author: refactor for clarity/accuracy
"""

from __future__ import annotations

import json, os, time
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import numpy as np

from SV_models import HestonModel, BatesModel  # must provide: simulate_mmm, simulate_vomm, simulate_under_P


# ------------------------------- helpers -------------------------------------

def _next_pow2(n: int) -> int:
    n = int(max(1, n))
    return 1 << (n - 1).bit_length()

def _disc_grid(r: float, q: float, T: float, n_steps: int) -> Tuple[np.ndarray, np.ndarray]:
    t = np.linspace(0.0, T, n_steps + 1)
    disc = np.exp(-(r - q) * t)
    return t, disc

def _option_payoff(S_T: np.ndarray, K: float, kind: str) -> np.ndarray:
    if kind.lower() == "call":
        return np.maximum(S_T - K, 0.0)
    elif kind.lower() == "put":
        return np.maximum(K - S_T, 0.0)
    raise ValueError("kind must be 'call' or 'put'")

def _batch_discounted_payoffs(S_T: np.ndarray, Ks: np.ndarray, kinds: List[str], disc_T: float, S0: float) -> np.ndarray:
    # Normalize strikes by S0 for discounted/normalized payoff computation
    Ks_norm = Ks / S0
    out = np.zeros((len(Ks), S_T.shape[0]), dtype=np.float64)
    for i, (K, knd) in enumerate(zip(Ks_norm, kinds)):
        out[i] = disc_T * _option_payoff(S_T, K, knd)
    return out


# --------------------------- data / test case --------------------------------

def _load_json(path: str) -> dict:
    with open(path, "r") as f:
        return json.load(f)

def _first(d: dict, key: str, alt: str = "", default=None):
    if key in d:
        v = d[key]
        return v[0] if isinstance(v, list) else v
    if alt and alt in d:
        v = d[alt]
        return v[0] if isinstance(v, list) else v
    return default

def _normalize_heston(d: dict) -> dict:
    return {
        "S0":    _first(d, "S0", "S0_list"),
        "v0":    _first(d, "v0", "v0_list"),
        "r":     _first(d, "r", "r_list"),
        "q":     _first(d, "q", "q_list"),
        "kappa": _first(d, "kappa", "kappa_list"),
        "theta": _first(d, "theta", "theta_list"),
        "sigma": _first(d, "sigma", "sigma_list"),
        "rho":   _first(d, "rho", "rho_list"),
        "xi_s":  _first(d, "xi_s", "xi_s_list", 0.0),
    }

def _normalize_bates(d: dict) -> dict:
    p = _normalize_heston(d)
    p.update({
        "lambda_j": _first(d, "lambda_j", default=0.0),
        "mu_j": _first(d, "mu_j", default=0.0),
        "sigma_j": _first(d, "sigma_j", default=0.0),
        "E_J_minus_1": _first(d, "E_J_minus_1", default=0.0),
        "drift_adjustment": _first(d, "drift_adjustment", default=0.0),
    })
    return p

def build_real_case(heston_json="oneDheston.json", bates_json="oneDbates.json"):
    h_raw = _load_json(heston_json) if os.path.exists(heston_json) else {}
    b_raw = _load_json(bates_json) if os.path.exists(bates_json) else {}
    h = _normalize_heston(h_raw) if h_raw else None
    b = _normalize_bates(b_raw) if b_raw else None
    S0 = (h or b)["S0"]
    V0 = (h or b)["v0"]
    r, q = (h or b)["r"], (h or b)["q"]
    # real-data config
    T = 59.0 / 252.0
    n_steps = 59
    Ks = np.array([3225.0, 2600.0], dtype=float)
    kinds = ["call", "put"]
    return (h, b, dict(S0=S0, V0=V0, r=r, q=q, T=T, n_steps=n_steps, Ks=Ks, kinds=kinds))


# ----------------------------- core hedger -----------------------------------

@dataclass
class SimpleBench:
    name: str
    model: object  # HestonModel or BatesModel
    r: float
    q: float

    # ---------- simulation wrapper (returns discounted X, dX and discounted payoffs) ----------
    def _sim_discounted(self, measure: str, S0: float, V0: float, T: float, n_steps: int,
                        n_paths: int, Ks: np.ndarray, kinds: List[str]) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        dt = T / n_steps
        if measure == "mmm":
            S, V, _ = self.model.simulate_mmm(S0, V0, T, dt, n_paths, _show_progress=False)
        elif measure == "vomm":
            S, V, _ = self.model.simulate_vomm(S0, V0, T, dt, n_paths, _show_progress=False)
        elif measure == "p":
            S, V, _ = self.model.simulate_under_P(S0, V0, T, dt, n_paths, _show_progress=False)
        else:
            raise ValueError("measure must be 'mmm', 'vomm', or 'p'")
        # Normalize simulated prices by S0
        S = S / S0
        t, disc = _disc_grid(self.r, self.q, T, n_steps)
        X = (S * disc[None, :]).astype(np.float64)          # discounted price paths
        dX = X[:, 1:] - X[:, :-1]                           # discounted increments
        H_tilde = _batch_discounted_payoffs(S[:, -1], Ks, kinds, disc[-1], S0)
        return X, dX, H_tilde

    # ---------- LSM helpers (basis + continuation values) ----------
    def _poly_features(self, Xk: np.ndarray, Vk: Optional[np.ndarray], tk: float) -> np.ndarray:
        """
        Build simple polynomial basis in discounted (normalized) state at time k.
        Xk: (n_paths,) discounted normalized price; Vk: (n_paths,) variance (can be None); tk: scalar in [0, T].
        """
        if Vk is None:
            return np.column_stack([
                np.ones_like(Xk),
                Xk, Xk**2,
                tk*np.ones_like(Xk),
                tk*Xk
            ]).astype(np.float64)
        return np.column_stack([
            np.ones_like(Xk),
            Xk, Vk,
            Xk**2, Vk**2, Xk*Vk,
            tk*np.ones_like(Xk),
            tk*Xk, tk*Vk
        ]).astype(np.float64)

    def _cond_mean_var(self, dX_col: np.ndarray, Xk_col: np.ndarray, Vk_col: Optional[np.ndarray], tk: float) -> Tuple[np.ndarray, float]:
        """
        Conditional mean/variance of ΔX_k given F_k via polynomial regression.
        Returns:
          mu_hat: pathwise conditional mean (n_paths,)
          var_hat: scalar conditional variance estimate
        """
        Phi = self._poly_features(Xk_col, (Vk_col if Vk_col is not None else None), tk)
        A = Phi.T @ Phi
        A.flat[::A.shape[0]+1] += 1e-10
        beta = np.linalg.solve(A, Phi.T @ dX_col)
        mu_hat = Phi @ beta
        resid = dX_col - mu_hat
        var_hat = float(np.mean(resid * resid))
        return mu_hat, var_hat

    def _discount_normalize_from_S(self, S: np.ndarray, S0: float, T: float, n_steps: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Given raw S paths (n_paths, n_steps+1), return (X, dX, disc) with
        X_t = e^{-(r-q)t} S_t / S0 and dX = ΔX."""
        t, disc = _disc_grid(self.r, self.q, T, n_steps)
        S_norm = S / S0
        X = (S_norm * disc[None, :]).astype(np.float64)
        dX = X[:, 1:] - X[:, :-1]
        return X, dX, disc
     
    def _apply_betas(self, X: np.ndarray, V: Optional[np.ndarray], t_grid: np.ndarray, betas: List[np.ndarray]) -> np.ndarray:
      n_paths, T_plus_1 = X.shape
      Vhat = np.zeros((n_paths, T_plus_1), dtype=np.float64)
      for k in range(T_plus_1 - 1):
         Xk = X[:, k]
         Vk = None if V is None else V[:, k]
         Phi = self._poly_features(Xk, Vk, t_grid[k])
         Vhat[:, k] = Phi @ betas[k]
      # terminal left at 0; we only need k<T values for dV etc.
      return Vhat
    
    def _lsm_continuation_values(self, X: np.ndarray, V: Optional[np.ndarray],
                             H_tilde_port: np.ndarray, t_grid: np.ndarray) -> Tuple[np.ndarray, List[np.ndarray]]:
         n_paths, T_plus_1 = X.shape
         Vhat = np.zeros((n_paths, T_plus_1), dtype=np.float64)
         betas: List[np.ndarray] = [None]*(T_plus_1)  # store beta per time

         Vhat[:, -1] = H_tilde_port
         ridge = 1e-10

         for k in range(T_plus_1 - 1):
            Xk = X[:, k]
            Vk = None if V is None else V[:, k]
            Phi = self._poly_features(Xk, Vk, t_grid[k])
            A = Phi.T @ Phi
            A.flat[::A.shape[0]+1] += ridge
            beta = np.linalg.solve(A, Phi.T @ H_tilde_port)
            Vhat[:, k] = Phi @ beta
            betas[k] = beta

         betas[-1] = np.zeros_like(betas[0])  # terminal dummy
         return Vhat, betas

    # ---------- LRM theta under MMM (local projection on ΔX without drift-removal) ----------
    def train_theta_lrm(self, S0, V0, T, n_steps, n_paths, Ks, kinds):
         X, dX, H_tilde_mat = self._sim_discounted("mmm", S0, V0, T, n_steps, n_paths, Ks, kinds)
         H_port = H_tilde_mat.sum(axis=0)
         t_grid, _ = _disc_grid(self.r, self.q, T, n_steps)
         Vhat, betas = self._lsm_continuation_values(X, None, H_port, t_grid)

         dV = Vhat[:, 1:] - Vhat[:, :-1]
         theta = np.zeros(dX.shape[1], dtype=np.float64)
         eps = 1e-12
         for k in range(dX.shape[1]):
            x, y = dX[:, k], dV[:, k]
            cov = ((x - x.mean()) * (y - y.mean())).mean()
            var = ((x - x.mean())**2).mean() + eps
            theta[k] = cov / var

         H0_lrm = float(np.mean(H_port))  # E^{MMM}[H~]
         return theta, betas, H0_lrm

    # ---------- MVH theta under VOMM (GKW integrand via LSM; v0 handled at evaluation) ----------
    def train_theta_mvh(self, S0, V0, T, n_steps, n_paths, Ks, kinds):
         have_with_density = hasattr(self.model, 'simulate_vomm_with_density')
         if have_with_density:
            S, V, Z, comp_ratio = self.model.simulate_vomm_with_density(S0, V0, T, T/n_steps, n_paths, _show_progress=False)
         else:
            S, V, _ = self.model.simulate_vomm(S0, V0, T, T/n_steps, n_paths, _show_progress=False)
            Z = None; comp_ratio = None

         X, dX, disc = self._discount_normalize_from_S(S, S0, T, n_steps)
         S_T_norm = (S / S0)[:, -1]
         H_tilde_mat = _batch_discounted_payoffs(S_T_norm, Ks, kinds, disc[-1], S0=S0)
         H_port = H_tilde_mat.sum(axis=0)

         t_grid, _ = _disc_grid(self.r, self.q, T, n_steps)
         Vhat, betas = self._lsm_continuation_values(X, None, H_port, t_grid)
         dV = Vhat[:, 1:] - Vhat[:, :-1]

         Tm1 = dX.shape[1]
         xi = np.zeros(Tm1, dtype=np.float64)
         eps = 1e-12
         for k in range(Tm1):
            x, y = dX[:, k], dV[:, k]
            cov = ((x - x.mean()) * (y - y.mean())).mean()
            var = ((x - x.mean())**2).mean() + eps
            xi[k] = cov / var

         H0_mvh = float(np.mean(H_port))  # E^{VOMM}[H~]
         return xi, betas, H0_mvh, comp_ratio

    # ---------- evaluate under P (portfolio = CALL+PUT) ----------
    def evaluate_P(self,
               xi_mvh: np.ndarray, betas_vomm: List[np.ndarray], H0_mvh: float,
               theta_lrm: np.ndarray, betas_mmm: List[np.ndarray], H0_lrm: float,
               S0: float, V0: float, T: float, n_steps: int, n_paths: int,
               Ks: np.ndarray, kinds: List[str]) -> Dict:
        """
        Paper-faithful P-evaluation (discounted, normalized).
        - MVH error:  H~ - (H0_mvh + sum θ^{MVH}_k(path) ΔX_k)  with feedback (Eq. 4.24)
        - LRM error:  H~ - (H0_lrm + sum θ^{LRM}_k ΔM_k)        with ΔM_k = ΔX_k - E[ΔX_k|F_k]
        Returns orthogonality metrics for both MVH and LRM.
        """
        dt = T / n_steps

        # Simulate under P and build discounted normalized state
        S, V, _ = self.model.simulate_under_P(S0, V0, T, dt, n_paths, _show_progress=False)
        X, dX, disc = self._discount_normalize_from_S(S, S0, T, n_steps)

        # Discounted payoff (portfolio = sum over options), already normalized by S0 in payoff helper
        S_T_norm = (S / S0)[:, -1]
        H_tilde_mat = _batch_discounted_payoffs(S_T_norm, Ks, kinds, disc[-1], S0=S0)
        payoff_port = H_tilde_mat.sum(axis=0)  # (n_paths,)

        T_al = min(dX.shape[1], xi_mvh.shape[0], theta_lrm.shape[0])
        dX = dX[:, :T_al]
        Xk = X[:, :T_al]              # state X at begins of steps
        Vk = V[:, :T_al] if V is not None else None
        t_grid, _ = _disc_grid(self.r, self.q, T, n_steps)

        # ---------- LRM: build ΔM_k via conditional mean regression per time ----------
        Phi_list = []
        for k in range(T_al):
            phi = np.column_stack([
                np.ones_like(Xk[:, k]), Xk[:, k],
                (Vk[:, k] if Vk is not None else Xk[:, k]*0.0),
                Xk[:, k]**2,
                (Xk[:, k]*(Vk[:, k] if Vk is not None else 0.0)),
                (Vk[:, k] if Vk is not None else 0.0)**2
            ]).astype(np.float64)
            Phi_list.append(phi)
        dM = np.empty_like(dX)
        mu_cond = np.empty_like(dX)
        for k in range(T_al):
            A = Phi_list[k].T @ Phi_list[k]
            A.flat[::A.shape[0]+1] += 1e-10
            beta_mu = np.linalg.solve(A, Phi_list[k].T @ dX[:, k])
            mu_hat = Phi_list[k] @ beta_mu                  # ≈ E_P[ΔX_k | F_k]
            mu_cond[:, k] = mu_hat
            dM[:, k] = dX[:, k] - mu_hat

        # LRM: pathwise gains and error
        gains_lrm = (dM * theta_lrm[:T_al][None, :]).sum(axis=1)
        err_lrm = payoff_port - (H0_lrm + gains_lrm)       # equals L_T (discrete)

        # ---------- LRM orthogonality: E[L_T * M_T] with M_T = Σ θ_k ΔM_k (should be ~0) ----------
        # For LRM, orthogonality: E[L_T * M_T] where M_T = sum θ_k dM_k
        M_T_lrm = gains_lrm
        L_T_lrm = err_lrm
        ortho_lrm = float(np.mean(L_T_lrm * M_T_lrm))

        # ---------- MVH: feedback using conditional regressions (stable scalar α_k) ----------
        # Reconstruct V^{H,~P}_k along P-paths using VOMM betas
        Vhat_vomm = self._apply_betas(X[:, :T_al+1], None, t_grid, betas_vomm)

        # Compute scalar α_k = E[μ_k] / Var_k with μ_k, Var_k from conditional regressions at each time
        alpha_scalar = np.zeros(T_al, dtype=np.float64)
        for k in range(T_al):
            mu_hat_k, var_hat_k = self._cond_mean_var(dX[:, k], Xk[:, k], (Vk[:, k] if Vk is not None else None), t_grid[k])
            num = float(np.mean(mu_hat_k))
            den = max(var_hat_k, 1e-12)
            a = num / den
            # conservative clamp to avoid runaway recursion; keeps unbiasedness for reasonable ranges
            alpha_scalar[k] = np.clip(a, -100.0, 100.0)

        # Recursion: θ^{MVH}_k(path) = ξ_k + α_k * ( Vhat_k - H0_mvh - sum_{j<k} θ_j ΔX_j )
        theta_path = np.zeros_like(dX)
        cum = np.zeros(dX.shape[0], dtype=np.float64)   # cumulative gains up to k-1
        theta_clip = 50.0
        cum_clip = 10.0
        for k in range(T_al):
            Vk_meanless = Vhat_vomm[:, k] - H0_mvh
            raw_theta = xi_mvh[k] + alpha_scalar[k] * (Vk_meanless - cum)
            raw_theta = np.clip(raw_theta, -theta_clip, theta_clip)
            theta_path[:, k] = raw_theta
            cum = np.clip(cum + raw_theta * dX[:, k], -cum_clip, cum_clip)

        gains_mvh = (theta_path * dX).sum(axis=1)
        # MVH error equals cost martingale \tilde L_T under P (HPS Eq.4.24 discrete analogue)
        err_mvh = payoff_port - (H0_mvh + gains_mvh)

        # ---------- metrics ----------
        def _metrics(e: np.ndarray, unhedged: np.ndarray) -> Dict:
            mean = float(np.mean(e))
            mse = float(np.mean(e**2))
            var = max(mse - mean**2, 0.0)
            rmse = float(np.sqrt(mse))
            std = float(np.sqrt(var))
            heff = 1.0 - (var / max(float(np.var(unhedged)), 1e-12))
            return dict(MSE=mse, RMSE=rmse, Mean=mean, Std=std, HedgeEffPct=100.0*heff)

        mv = _metrics(err_mvh, payoff_port)
        lr = _metrics(err_lrm, payoff_port)

        return dict(
            measure="P",
            v0_mvh_disc=float(H0_mvh),
            H0_lrm_disc=float(H0_lrm),
            mv_metrics=mv,
            lr_metrics=lr,
            theta_mvh_port=None,  # pathwise; too big to dump by default
            theta_lrm_port=theta_lrm[:T_al].tolist(),
            # orthogonality_mv=float(ortho_mvh),
            orthogonality_lrm=float(ortho_lrm),
        )

# ------------------------------- runner --------------------------------------

def run_realdata():
    # load calibrated params and real case
    h_p, b_p, case = build_real_case()
    S0, V0, r, q, T, n_steps, Ks, kinds = case["S0"], case["V0"], case["r"], case["q"], case["T"], case["n_steps"], case["Ks"], case["kinds"]


    # models
    heston = HestonModel({'kappa': h_p['kappa'], 'theta': h_p['theta'], 'sigma': h_p['sigma'], 'rho': h_p['rho'],
                          'v0': h_p['v0'], 'r': h_p['r'], 'q': h_p['q'], 'xi_s': h_p.get('xi_s', 0.0)})
    if b_p is not None and all(k in b_p for k in ['lambda_j','mu_j','sigma_j']):
        bates = BatesModel({'kappa': b_p['kappa'], 'theta': b_p['theta'], 'sigma': b_p['sigma'], 'rho': b_p['rho'],
                            'v0': b_p['v0'], 'r': b_p['r'], 'q': b_p['q'], 'xi_s': b_p.get('xi_s', 0.0),
                            'lambda_j': b_p['lambda_j'], 'mu_j': b_p['mu_j'], 'sigma_j': b_p['sigma_j'],
                            'E_J_minus_1': b_p.get('E_J_minus_1', 0.0), 'drift_adjustment': b_p.get('drift_adjustment', 0.0)})
    else:
        bates = BatesModel({'kappa': h_p['kappa'], 'theta': h_p['theta'], 'sigma': h_p['sigma'], 'rho': h_p['rho'],
                            'v0': h_p['v0'], 'r': h_p['r'], 'q': h_p['q'], 'xi_s': h_p.get('xi_s', 0.0),
                            'lambda_j': 0.0, 'mu_j': 0.0, 'sigma_j': 0.0, 'E_J_minus_1': 0.0, 'drift_adjustment': 0.0})

    # simple benches
    hb = SimpleBench("Heston", heston, r, q)
    bb = SimpleBench("Bates", bates, r, q)

    # path budgets (powers of two)
    n_train = _next_pow2(2**18)        # 262,144 (validation-speed)
    n_eval  = _next_pow2(2**17)        # 131,072 (evaluation under P)

    # train thetas (portfolio-level)
    xi_lrm_h, betas_mmm_h, H0_lrm_h = hb.train_theta_lrm(S0, V0, T, n_steps, n_train, Ks, kinds)
    xi_mvh_h, betas_vomm_h, H0_mvh_h, comp_h = hb.train_theta_mvh(S0, V0, T, n_steps, n_train, Ks, kinds)

    xi_lrm_b, betas_mmm_b, H0_lrm_b = bb.train_theta_lrm(S0, V0, T, n_steps, n_train, Ks, kinds)
    xi_mvh_b, betas_vomm_b, H0_mvh_b, comp_b = bb.train_theta_mvh(S0, V0, T, n_steps, n_train, Ks, kinds)

    # evaluate under P
    res_h = hb.evaluate_P(xi_mvh_h, betas_vomm_h, H0_mvh_h,
                      xi_lrm_h, betas_mmm_h, H0_lrm_h,
                      S0, V0, T, n_steps, n_eval, Ks, kinds)
    res_b = bb.evaluate_P(xi_mvh_b, betas_vomm_b, H0_mvh_b,
                      xi_lrm_b, betas_mmm_b, H0_lrm_b,
                      S0, V0, T, n_steps, n_eval, Ks, kinds)

    # save artifacts
    ts = time.strftime("%Y%m%d-%H%M%S")
    outdir = f"artifacts/benchmark_eval/{ts}/"
    os.makedirs(outdir, exist_ok=True)
    with open(os.path.join(outdir, "heston_eval_P.json"), "w") as f:
        json.dump(res_h, f, indent=2)
    with open(os.path.join(outdir, "bates_eval_P.json"), "w") as f:
        json.dump(res_b, f, indent=2)

    print(f"\nSaved P-eval artifacts to {outdir}")
    print(f"Heston (portfolio) MV RMSE={res_h['mv_metrics']['RMSE']:.6f}  |  LRM RMSE={res_h['lr_metrics']['RMSE']:.6f}")
    print(f"Bates  (portfolio) MV RMSE={res_b['mv_metrics']['RMSE']:.6f}  |  LRM RMSE={res_b['lr_metrics']['RMSE']:.6f}")

    return dict(heston=res_h, bates=res_b, outdir=outdir)


# ------------------------------ entry point ----------------------------------

def runner():
    return run_realdata()

if __name__ == "__main__":
    runner()


Saved P-eval artifacts to artifacts/benchmark_eval/20250919-212340/
Heston (portfolio) MV RMSE=0.034057  |  LRM RMSE=0.038170
Bates  (portfolio) MV RMSE=0.050461  |  LRM RMSE=0.048618


In [20]:
"""
Minimal, readable quadratic-hedging benchmark (Heston & Bates)

What this file does (only):
1) Load calibrated params (oneDheston.json, oneDbates.json).
2) Build ONE real-data case: S0 from JSON, T=59/252, daily hedging (n_steps=59),
   strikes = [3225, 2600] with types ['call','put'].
3) Train time-step hedge ratios θ_k by sequential projection
   - MVH: simulate under VOMM
   - LRM: simulate under MMM
   (all in *discounted* units X_t = e^{-(r-q)t} S_t)
4) Evaluate under P (discounted)
   - MVH: V̄_T = v0 + ∑ θ_k ΔX_k with v0 := E[H~ - gains]  (same rule for Heston/Bates)
   - LRM: V̄_T = C_T + ∑ θ_k ΔX_k where C_T = H0 + L_T and
           L_T := H~ - H0 - ∑ θ_k ΔM_k with ΔM_k = ΔX_k - E[ΔX_k] (batch-wise de-drift)
5) Save only portfolio-level metrics & thetas in artifacts/benchmark_eval/<ts>/

Notes:
- Uses powers-of-two path counts (Sobol-friendly). We still call the model’s own RNG;
  if your SV_models uses Sobol for QMC, it will benefit directly.
- No jump-impact analysis, no multi-asset scaffolding, no ODE/Riccati code here.
- Keeps names short and descriptive.

Author: refactor for clarity/accuracy
"""

from __future__ import annotations

import json, os, time
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import numpy as np

from SV_models import HestonModel, BatesModel  # must provide: simulate_mmm, simulate_vomm, simulate_under_P


# ------------------------------- helpers -------------------------------------

def _next_pow2(n: int) -> int:
    n = int(max(1, n))
    return 1 << (n - 1).bit_length()

def _disc_grid(r: float, q: float, T: float, n_steps: int) -> Tuple[np.ndarray, np.ndarray]:
    t = np.linspace(0.0, T, n_steps + 1)
    disc = np.exp(-(r - q) * t)
    return t, disc

def _option_payoff(S_T: np.ndarray, K: float, kind: str) -> np.ndarray:
    if kind.lower() == "call":
        return np.maximum(S_T - K, 0.0)
    elif kind.lower() == "put":
        return np.maximum(K - S_T, 0.0)
    raise ValueError("kind must be 'call' or 'put'")

def _batch_discounted_payoffs(S_T: np.ndarray, Ks: np.ndarray, kinds: List[str], disc_T: float, S0: float) -> np.ndarray:
    # Normalize strikes by S0 for discounted/normalized payoff computation
    Ks_norm = Ks / S0
    out = np.zeros((len(Ks), S_T.shape[0]), dtype=np.float64)
    for i, (K, knd) in enumerate(zip(Ks_norm, kinds)):
        out[i] = disc_T * _option_payoff(S_T, K, knd)
    return out


# --------------------------- data / test case --------------------------------

def _load_json(path: str) -> dict:
    with open(path, "r") as f:
        return json.load(f)

def _first(d: dict, key: str, alt: str = "", default=None):
    if key in d:
        v = d[key]
        return v[0] if isinstance(v, list) else v
    if alt and alt in d:
        v = d[alt]
        return v[0] if isinstance(v, list) else v
    return default

def _normalize_heston(d: dict) -> dict:
    return {
        "S0":    _first(d, "S0", "S0_list"),
        "v0":    _first(d, "v0", "v0_list"),
        "r":     _first(d, "r", "r_list"),
        "q":     _first(d, "q", "q_list"),
        "kappa": _first(d, "kappa", "kappa_list"),
        "theta": _first(d, "theta", "theta_list"),
        "sigma": _first(d, "sigma", "sigma_list"),
        "rho":   _first(d, "rho", "rho_list"),
        "xi_s":  _first(d, "xi_s", "xi_s_list", 0.0),
    }

def _normalize_bates(d: dict) -> dict:
    p = _normalize_heston(d)
    p.update({
        "lambda_j": _first(d, "lambda_j", default=0.0),
        "mu_j": _first(d, "mu_j", default=0.0),
        "sigma_j": _first(d, "sigma_j", default=0.0),
        "E_J_minus_1": _first(d, "E_J_minus_1", default=0.0),
        "drift_adjustment": _first(d, "drift_adjustment", default=0.0),
    })
    return p

def build_real_case(heston_json="oneDheston.json", bates_json="oneDbates.json"):
    h_raw = _load_json(heston_json) if os.path.exists(heston_json) else {}
    b_raw = _load_json(bates_json) if os.path.exists(bates_json) else {}
    h = _normalize_heston(h_raw) if h_raw else None
    b = _normalize_bates(b_raw) if b_raw else None
    S0 = (h or b)["S0"]
    V0 = (h or b)["v0"]
    r, q = (h or b)["r"], (h or b)["q"]
    # real-data config
    T = 59.0 / 252.0
    n_steps = 59
    Ks = np.array([3225.0, 2600.0], dtype=float)
    kinds = ["call", "put"]
    return (h, b, dict(S0=S0, V0=V0, r=r, q=q, T=T, n_steps=n_steps, Ks=Ks, kinds=kinds))


# ----------------------------- core hedger -----------------------------------

@dataclass
class SimpleBench:
    name: str
    model: object  # HestonModel or BatesModel
    r: float
    q: float

    # ---------- simulation wrapper (returns discounted X, dX and discounted payoffs) ----------
    def _sim_discounted(self, measure: str, S0: float, V0: float, T: float, n_steps: int,
                        n_paths: int, Ks: np.ndarray, kinds: List[str]) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        dt = T / n_steps
        if measure == "mmm":
            S, V, _ = self.model.simulate_mmm(S0, V0, T, dt, n_paths, _show_progress=False)
        elif measure == "vomm":
            S, V, _ = self.model.simulate_vomm(S0, V0, T, dt, n_paths, _show_progress=False)
        elif measure == "p":
            S, V, _ = self.model.simulate_under_P(S0, V0, T, dt, n_paths, _show_progress=False)
        else:
            raise ValueError("measure must be 'mmm', 'vomm', or 'p'")
        # Normalize simulated prices by S0
        S = S / S0
        t, disc = _disc_grid(self.r, self.q, T, n_steps)
        X = (S * disc[None, :]).astype(np.float64)          # discounted price paths
        dX = X[:, 1:] - X[:, :-1]                           # discounted increments
        H_tilde = _batch_discounted_payoffs(S[:, -1], Ks, kinds, disc[-1], S0)
        return X, dX, H_tilde

    # ---------- LSM helpers (basis + continuation values) ----------
    def _poly_features(self, Xk: np.ndarray, Vk: Optional[np.ndarray], tk: float) -> np.ndarray:
        """
        Build simple polynomial basis in discounted (normalized) state at time k.
        Xk: (n_paths,) discounted normalized price; Vk: (n_paths,) variance (can be None); tk: scalar in [0, T].
        """
        if Vk is None:
            return np.column_stack([
                np.ones_like(Xk),
                Xk, Xk**2,
                tk*np.ones_like(Xk),
                tk*Xk
            ]).astype(np.float64)
        return np.column_stack([
            np.ones_like(Xk),
            Xk, Vk,
            Xk**2, Vk**2, Xk*Vk,
            tk*np.ones_like(Xk),
            tk*Xk, tk*Vk
        ]).astype(np.float64)

    def _cond_mean_var(self, dX_col: np.ndarray, Xk_col: np.ndarray, Vk_col: Optional[np.ndarray], tk: float) -> Tuple[np.ndarray, float]:
        """
        Conditional mean/variance of ΔX_k given F_k via polynomial regression.
        Returns:
          mu_hat: pathwise conditional mean (n_paths,)
          var_hat: scalar conditional variance estimate
        """
        Phi = self._poly_features(Xk_col, (Vk_col if Vk_col is not None else None), tk)
        A = Phi.T @ Phi
        A.flat[::A.shape[0]+1] += 1e-10
        beta = np.linalg.solve(A, Phi.T @ dX_col)
        mu_hat = Phi @ beta
        resid = dX_col - mu_hat
        var_hat = float(np.mean(resid * resid))
        return mu_hat, var_hat

    def _discount_normalize_from_S(self, S: np.ndarray, S0: float, T: float, n_steps: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Given raw S paths (n_paths, n_steps+1), return (X, dX, disc) with
        X_t = e^{-(r-q)t} S_t / S0 and dX = ΔX."""
        t, disc = _disc_grid(self.r, self.q, T, n_steps)
        S_norm = S / S0
        X = (S_norm * disc[None, :]).astype(np.float64)
        dX = X[:, 1:] - X[:, :-1]
        return X, dX, disc
     
    def _apply_betas(self, X: np.ndarray, V: Optional[np.ndarray], t_grid: np.ndarray, betas: List[np.ndarray]) -> np.ndarray:
      n_paths, T_plus_1 = X.shape
      Vhat = np.zeros((n_paths, T_plus_1), dtype=np.float64)
      for k in range(T_plus_1 - 1):
         Xk = X[:, k]
         Vk = None if V is None else V[:, k]
         Phi = self._poly_features(Xk, Vk, t_grid[k])
         Vhat[:, k] = Phi @ betas[k]
      # terminal left at 0; we only need k<T values for dV etc.
      return Vhat
    
    def _lsm_continuation_values(self, X: np.ndarray, V: Optional[np.ndarray],
                             H_tilde_port: np.ndarray, t_grid: np.ndarray) -> Tuple[np.ndarray, List[np.ndarray]]:
         n_paths, T_plus_1 = X.shape
         Vhat = np.zeros((n_paths, T_plus_1), dtype=np.float64)
         betas: List[np.ndarray] = [None]*(T_plus_1)  # store beta per time

         Vhat[:, -1] = H_tilde_port
         ridge = 1e-10

         for k in range(T_plus_1 - 1):
            Xk = X[:, k]
            Vk = None if V is None else V[:, k]
            Phi = self._poly_features(Xk, Vk, t_grid[k])
            A = Phi.T @ Phi
            A.flat[::A.shape[0]+1] += ridge
            beta = np.linalg.solve(A, Phi.T @ H_tilde_port)
            Vhat[:, k] = Phi @ beta
            betas[k] = beta

         betas[-1] = np.zeros_like(betas[0])  # terminal dummy
         return Vhat, betas

    # ---------- LRM theta under MMM (local projection on ΔX without drift-removal) ----------
    def train_theta_lrm(self, S0, V0, T, n_steps, n_paths, Ks, kinds):
         X, dX, H_tilde_mat = self._sim_discounted("mmm", S0, V0, T, n_steps, n_paths, Ks, kinds)
         H_port = H_tilde_mat.sum(axis=0)
         t_grid, _ = _disc_grid(self.r, self.q, T, n_steps)
         Vhat, betas = self._lsm_continuation_values(X, None, H_port, t_grid)

         dV = Vhat[:, 1:] - Vhat[:, :-1]
         theta = np.zeros(dX.shape[1], dtype=np.float64)
         eps = 1e-12
         for k in range(dX.shape[1]):
            x, y = dX[:, k], dV[:, k]
            cov = ((x - x.mean()) * (y - y.mean())).mean()
            var = ((x - x.mean())**2).mean() + eps
            theta[k] = cov / var

         H0_lrm = float(np.mean(H_port))  # E^{MMM}[H~]
         return theta, betas, H0_lrm

    # ---------- MVH theta under VOMM (GKW integrand via LSM; v0 handled at evaluation) ----------
    def train_theta_mvh(self, S0, V0, T, n_steps, n_paths, Ks, kinds):
         have_with_density = hasattr(self.model, 'simulate_vomm_with_density')
         if have_with_density:
            S, V, Z, comp_ratio = self.model.simulate_vomm_with_density(S0, V0, T, T/n_steps, n_paths, _show_progress=False)
         else:
            S, V, _ = self.model.simulate_vomm(S0, V0, T, T/n_steps, n_paths, _show_progress=False)
            Z = None; comp_ratio = None

         X, dX, disc = self._discount_normalize_from_S(S, S0, T, n_steps)
         S_T_norm = (S / S0)[:, -1]
         H_tilde_mat = _batch_discounted_payoffs(S_T_norm, Ks, kinds, disc[-1], S0=S0)
         H_port = H_tilde_mat.sum(axis=0)

         t_grid, _ = _disc_grid(self.r, self.q, T, n_steps)
         Vhat, betas = self._lsm_continuation_values(X, None, H_port, t_grid)
         dV = Vhat[:, 1:] - Vhat[:, :-1]

         Tm1 = dX.shape[1]
         xi = np.zeros(Tm1, dtype=np.float64)
         eps = 1e-12
         for k in range(Tm1):
            x, y = dX[:, k], dV[:, k]
            cov = ((x - x.mean()) * (y - y.mean())).mean()
            var = ((x - x.mean())**2).mean() + eps
            xi[k] = cov / var

         H0_mvh = float(np.mean(H_port))  # E^{VOMM}[H~]
         return xi, betas, H0_mvh, comp_ratio

    # ---------- evaluate under P (portfolio = CALL+PUT) ----------
    def evaluate_P(self,
               xi_mvh: np.ndarray, betas_vomm: List[np.ndarray], H0_mvh: float,
               theta_lrm: np.ndarray, betas_mmm: List[np.ndarray], H0_lrm: float,
               S0: float, V0: float, T: float, n_steps: int, n_paths: int,
               Ks: np.ndarray, kinds: List[str]) -> Dict:
        """
        Paper-faithful P-evaluation (discounted, normalized).
        - MVH error:  H~ - (H0_mvh + sum θ^{MVH}_k(path) ΔX_k)  with feedback (Eq. 4.24)
        - LRM error:  H~ - (H0_lrm + sum θ^{LRM}_k ΔM_k)        with ΔM_k = ΔX_k - E[ΔX_k|F_k]
        Returns orthogonality metrics for both MVH and LRM.
        """
        dt = T / n_steps

        # Simulate under P and build discounted normalized state
        S, V, _ = self.model.simulate_under_P(S0, V0, T, dt, n_paths, _show_progress=False)
        X, dX, disc = self._discount_normalize_from_S(S, S0, T, n_steps)

        # Discounted payoff (portfolio = sum over options), already normalized by S0 in payoff helper
        S_T_norm = (S / S0)[:, -1]
        H_tilde_mat = _batch_discounted_payoffs(S_T_norm, Ks, kinds, disc[-1], S0=S0)
        payoff_port = H_tilde_mat.sum(axis=0)  # (n_paths,)

        T_al = min(dX.shape[1], xi_mvh.shape[0], theta_lrm.shape[0])
        dX = dX[:, :T_al]
        Xk = X[:, :T_al]              # state X at begins of steps
        Vk = V[:, :T_al] if V is not None else None
        t_grid, _ = _disc_grid(self.r, self.q, T, n_steps)

        # ---------- LRM: build ΔM_k via conditional mean regression per time ----------
        Phi_list = []
        for k in range(T_al):
            phi = np.column_stack([
                np.ones_like(Xk[:, k]), Xk[:, k],
                (Vk[:, k] if Vk is not None else Xk[:, k]*0.0),
                Xk[:, k]**2,
                (Xk[:, k]*(Vk[:, k] if Vk is not None else 0.0)),
                (Vk[:, k] if Vk is not None else 0.0)**2
            ]).astype(np.float64)
            Phi_list.append(phi)
        dM = np.empty_like(dX)
        mu_cond = np.empty_like(dX)
        for k in range(T_al):
            A = Phi_list[k].T @ Phi_list[k]
            A.flat[::A.shape[0]+1] += 1e-10
            beta_mu = np.linalg.solve(A, Phi_list[k].T @ dX[:, k])
            mu_hat = Phi_list[k] @ beta_mu                  # ≈ E_P[ΔX_k | F_k]
            mu_cond[:, k] = mu_hat
            dM[:, k] = dX[:, k] - mu_hat

        # LRM: pathwise gains and error
        gains_lrm = (dM * theta_lrm[:T_al][None, :]).sum(axis=1)
        err_lrm = payoff_port - (H0_lrm + gains_lrm)       # equals L_T (discrete)

        # ---------- LRM orthogonality: E[L_T * M_T] with M_T = Σ θ_k ΔM_k (should be ~0) ----------
        # For LRM, orthogonality: E[L_T * M_T] where M_T = sum θ_k dM_k
        M_T_lrm = gains_lrm
        L_T_lrm = err_lrm
        ortho_lrm = float(np.mean(L_T_lrm * M_T_lrm))

        # ---------- MVH: feedback using conditional regressions (stable scalar α_k) ----------
        # Reconstruct V^{H,~P}_k along P-paths using VOMM betas
        Vhat_vomm = self._apply_betas(X[:, :T_al+1], None, t_grid, betas_vomm)

        # Compute scalar α_k = E[μ_k] / Var_k with μ_k, Var_k from conditional regressions at each time
        alpha_scalar = np.zeros(T_al, dtype=np.float64)
        for k in range(T_al):
            mu_hat_k, var_hat_k = self._cond_mean_var(dX[:, k], Xk[:, k], (Vk[:, k] if Vk is not None else None), t_grid[k])
            # baseline conditional mean (diffusion part)
            num = float(np.mean(mu_hat_k))

            # --- Bates jump drift correction under P: add λ * E[J-1] * E[X_k] * Δt ---
            if isinstance(self.model, BatesModel):
                # Prefer explicit E[J-1] if provided; else compute from (mu_j, sigma_j)
                EJm1 = getattr(self.model, 'E_J_minus_1', None)
                if EJm1 is None:
                    mu_j = getattr(self.model, 'mu_j', 0.0)
                    sigma_j = getattr(self.model, 'sigma_j', 0.0)
                    EJm1 = np.exp(mu_j + 0.5 * sigma_j * sigma_j) - 1.0
                lam = getattr(self.model, 'lambda_j', 0.0)
                jd = lam * EJm1 * float(np.mean(Xk[:, k])) * dt
                num += jd

            den = max(var_hat_k, 1e-12)

            a = num / den
            # Clamp α_k conservatively; tighter for Bates to avoid recursion blow-up due to sparse jumps
            if isinstance(self.model, BatesModel):
                alpha_scalar[k] = np.clip(a, -10.0, 10.0)
            else:
                alpha_scalar[k] = np.clip(a, -100.0, 100.0)

        # Recursion: θ^{MVH}_k(path) = ξ_k + α_k * ( Vhat_k - H0_mvh - sum_{j<k} θ_j ΔX_j )
        theta_path = np.zeros_like(dX)
        cum = np.zeros(dX.shape[0], dtype=np.float64)   # cumulative gains up to k-1
        # Tighter clips for Bates (jumps); looser for Heston
        if isinstance(self.model, BatesModel):
            theta_clip = 10.0
            cum_clip = 2.0
        else:
            theta_clip = 50.0
            cum_clip = 10.0
        for k in range(T_al):
            Vk_meanless = Vhat_vomm[:, k] - H0_mvh
            raw_theta = xi_mvh[k] + alpha_scalar[k] * (Vk_meanless - cum)
            raw_theta = np.clip(raw_theta, -theta_clip, theta_clip)
            theta_path[:, k] = raw_theta
            cum = np.clip(cum + raw_theta * dX[:, k], -cum_clip, cum_clip)

        gains_mvh = (theta_path * dX).sum(axis=1)
        # MVH error equals cost martingale \tilde L_T under P (HPS Eq.4.24 discrete analogue)
        err_mvh = payoff_port - (H0_mvh + gains_mvh)

        # MVH orthogonality w.r.t. martingale part M_T = Σ θ_k ΔM_k under P
        M_T_mvh = (dM * theta_path).sum(axis=1)  # reuse dM from LRM construction above
        L_T_mvh = err_mvh
        ortho_mvh = float(np.mean(L_T_mvh * M_T_mvh))

        # ---------- metrics ----------
        def _metrics(e: np.ndarray, unhedged: np.ndarray) -> Dict:
            mean = float(np.mean(e))
            mse = float(np.mean(e**2))
            var = max(mse - mean**2, 0.0)
            rmse = float(np.sqrt(mse))
            std = float(np.sqrt(var))
            heff = 1.0 - (var / max(float(np.var(unhedged)), 1e-12))
            return dict(MSE=mse, RMSE=rmse, Mean=mean, Std=std, HedgeEffPct=100.0*heff)

        mv = _metrics(err_mvh, payoff_port)
        lr = _metrics(err_lrm, payoff_port)

        return dict(
            measure="P",
            v0_mvh_disc=float(H0_mvh),
            H0_lrm_disc=float(H0_lrm),
            mv_metrics=mv,
            lr_metrics=lr,
            theta_mvh_port=None,  # pathwise; too big to dump by default
            theta_lrm_port=theta_lrm[:T_al].tolist(),
            # orthogonality_mv=float(ortho_mvh),
            orthogonality_lrm=float(ortho_lrm),
        )

# ------------------------------- runner --------------------------------------

def run_realdata():
    # load calibrated params and real case
    h_p, b_p, case = build_real_case()
    S0, V0, r, q, T, n_steps, Ks, kinds = case["S0"], case["V0"], case["r"], case["q"], case["T"], case["n_steps"], case["Ks"], case["kinds"]


    # models
    heston = HestonModel({'kappa': h_p['kappa'], 'theta': h_p['theta'], 'sigma': h_p['sigma'], 'rho': h_p['rho'],
                          'v0': h_p['v0'], 'r': h_p['r'], 'q': h_p['q'], 'xi_s': h_p.get('xi_s', 0.0)})
    if b_p is not None and all(k in b_p for k in ['lambda_j','mu_j','sigma_j']):
        bates = BatesModel({'kappa': b_p['kappa'], 'theta': b_p['theta'], 'sigma': b_p['sigma'], 'rho': b_p['rho'],
                            'v0': b_p['v0'], 'r': b_p['r'], 'q': b_p['q'], 'xi_s': b_p.get('xi_s', 0.0),
                            'lambda_j': b_p['lambda_j'], 'mu_j': b_p['mu_j'], 'sigma_j': b_p['sigma_j'],
                            'E_J_minus_1': b_p.get('E_J_minus_1', 0.0), 'drift_adjustment': b_p.get('drift_adjustment', 0.0)})
    else:
        bates = BatesModel({'kappa': h_p['kappa'], 'theta': h_p['theta'], 'sigma': h_p['sigma'], 'rho': h_p['rho'],
                            'v0': h_p['v0'], 'r': h_p['r'], 'q': h_p['q'], 'xi_s': h_p.get('xi_s', 0.0),
                            'lambda_j': 0.0, 'mu_j': 0.0, 'sigma_j': 0.0, 'E_J_minus_1': 0.0, 'drift_adjustment': 0.0})

    # simple benches
    hb = SimpleBench("Heston", heston, r, q)
    bb = SimpleBench("Bates", bates, r, q)

    # path budgets (powers of two)
    n_train = _next_pow2(2**18)        # 262,144 (validation-speed)
    n_eval  = _next_pow2(2**17)        # 131,072 (evaluation under P)

    # train thetas (portfolio-level)
    xi_lrm_h, betas_mmm_h, H0_lrm_h = hb.train_theta_lrm(S0, V0, T, n_steps, n_train, Ks, kinds)
    xi_mvh_h, betas_vomm_h, H0_mvh_h, comp_h = hb.train_theta_mvh(S0, V0, T, n_steps, n_train, Ks, kinds)

    xi_lrm_b, betas_mmm_b, H0_lrm_b = bb.train_theta_lrm(S0, V0, T, n_steps, n_train, Ks, kinds)
    xi_mvh_b, betas_vomm_b, H0_mvh_b, comp_b = bb.train_theta_mvh(S0, V0, T, n_steps, n_train, Ks, kinds)

    # evaluate under P
    res_h = hb.evaluate_P(xi_mvh_h, betas_vomm_h, H0_mvh_h,
                      xi_lrm_h, betas_mmm_h, H0_lrm_h,
                      S0, V0, T, n_steps, n_eval, Ks, kinds)
    res_b = bb.evaluate_P(xi_mvh_b, betas_vomm_b, H0_mvh_b,
                      xi_lrm_b, betas_mmm_b, H0_lrm_b,
                      S0, V0, T, n_steps, n_eval, Ks, kinds)

    # save artifacts
    ts = time.strftime("%Y%m%d-%H%M%S")
    outdir = f"artifacts/benchmark_eval/{ts}/"
    os.makedirs(outdir, exist_ok=True)
    with open(os.path.join(outdir, "heston_eval_P.json"), "w") as f:
        json.dump(res_h, f, indent=2)
    with open(os.path.join(outdir, "bates_eval_P.json"), "w") as f:
        json.dump(res_b, f, indent=2)

    print(f"\nSaved P-eval artifacts to {outdir}")
    print(f"Heston (portfolio) MV RMSE={res_h['mv_metrics']['RMSE']:.6f}  |  LRM RMSE={res_h['lr_metrics']['RMSE']:.6f}")
    print(f"Bates  (portfolio) MV RMSE={res_b['mv_metrics']['RMSE']:.6f}  |  LRM RMSE={res_b['lr_metrics']['RMSE']:.6f}")

    return dict(heston=res_h, bates=res_b, outdir=outdir)


# ------------------------------ entry point ----------------------------------

def runner():
    return run_realdata()

if __name__ == "__main__":
    runner()


Saved P-eval artifacts to artifacts/benchmark_eval/20250919-214921/
Heston (portfolio) MV RMSE=0.034136  |  LRM RMSE=0.037951
Bates  (portfolio) MV RMSE=0.050596  |  LRM RMSE=0.048986
