<a href="https://colab.research.google.com/github/geoffroy-h-e/derivatives_p2/blob/main/q6v3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
#### Comments for Geoffroy ####
### pour Q6 ###
# On simule un Bates process
# Price options via fonctions de Q3
# Construction de model-based VIX
# Comparer avec VIX
# Retourne RMSE, corr, stats, etc.
#
##### CHANGEMENTS ####
# 1) mu_hat_J = exp(mu_J + 0.5 * sigma_J^2) - 1 (comme dans Bates)
# 2) Ajout d'un bloc HESTON:
#    pricer pour calls
#    calibration aux options (OptionMetrics)
#    mapping Heston Bates


# q6v3.py

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.integrate import quad
from scipy.optimize import minimize


# RMSE

def rmse(y_true, y_pred):
    diff = y_pred - y_true
    return np.sqrt(np.mean(diff**2))


# Bates dynamic (under Q measure)

def simulate_bates_paths_Q(
    S0,
    v0,
    rf_d,
    y_d,
    params,
    n_days=21,
    n_paths=20_000,
    random_state=None,
):

    ##### On simule Bates sous  Q pour le modèle (Heston + sauts lognormaux)



    kappa   = params["kappa"]
    theta   = params["theta"]
    sigma_v = params["sigma_v"]
    rho     = params["rho"]
    mu_J    = params["mu_J"]
    sig_J   = params["sigma_J"]

    lambda_annual = params["lambdaJ_annual"]
    lambda_daily  = lambda_annual / 252.0

    mu_hat_J = np.exp(mu_J + 0.5 * sig_J**2) - 1.0

    rng = np.random.default_rng(random_state)

    S = np.full(n_paths, float(S0))
    v = np.full(n_paths, float(v0))

    dt = 1.0
    for _ in range(n_days):
        z1 = rng.standard_normal(n_paths)
        z2_indep = rng.standard_normal(n_paths)
        z2 = rho * z1 + np.sqrt(1.0 - rho**2) * z2_indep

        N_jump = rng.poisson(lambda_daily * dt, size=n_paths)

        J_sum = np.zeros(n_paths)
        has_jump = N_jump > 0
        if np.any(has_jump):
            m = N_jump[has_jump]
            J_sum[has_jump] = rng.normal(
                loc=mu_J * m,
                scale=sig_J * np.sqrt(m),
            )

        v_clipped = np.clip(v, 1e-10, None)

        # Drift + diffusion + sauts
        drift = (rf_d - y_d - lambda_daily * mu_hat_J - 0.5 * v_clipped) * dt
        diff  = np.sqrt(v_clipped * dt) * z1
        S *= np.exp(drift + diff + J_sum)

        # var dynamic  Heston style
        v = v + kappa * (theta - v) * dt + sigma_v * np.sqrt(v_clipped * dt) * z2
        v = np.clip(v, 1e-10, None)

    return S


## Serie sous Bates

def run_bates_vix_series(
    spx,
    rf_daily,
    y_daily,
    h,
    r,
    vix_close,
    bates_params,
    vix_fhs=None,
    vix_gauss=None,
    n_paths=20_000,
    horizon_days=21,
    make_plot=True,
    make_error_plot=True,
):


    h_series = pd.Series(h, index=r.index, name="h")
    duan_vol_ann = np.sqrt(252.0 * h_series) * 100.0  # en %

    vix_start = vix_close.index.min()
    vix_end   = vix_close.index.max()
    common_dates = r.index[(r.index >= vix_start) & (r.index <= vix_end)]

    wed_dates = common_dates[common_dates.weekday == 2]

    from q3 import (
        build_m_grid,
        forward_and_discount,
        price_options_from_paths,
        vix_from_option_curve,
    )

    m_grid = build_m_grid()
    vix_bates = pd.Series(index=wed_dates, dtype=float)

    for d in wed_dates:
        S0   = spx.loc[d, "spindx"]
        v0   = h_series.loc[d]
        rf_d = rf_daily.loc[d]
        y_d  = y_daily.loc[d]

        # simulations sous Q pour Bates
        S_T = simulate_bates_paths_Q(
            S0=S0,
            v0=v0,
            rf_d=rf_d,
            y_d=y_d,
            params=bates_params,
            n_days=horizon_days,
            n_paths=n_paths,
            random_state=None,
        )

        # Curve Options, VIX CBOE
        F, df, R_f, T = forward_and_discount(S0, rf_d, y_d, T_days=30)
        strikes, C, P = price_options_from_paths(S_T, F, v0, m_grid, df)
        VIX_b, _      = vix_from_option_curve(strikes, C, P, F, R_f, T)

        vix_bates.loc[d] = VIX_b

    plot_df = pd.concat(
        [
            vix_bates.rename("VIX_Bates"),
            duan_vol_ann.rename("Vol_phys"),
            vix_close.rename("VIX_actual"),
        ],
        axis=1,
        join="inner",
    ).dropna()

    if vix_fhs is not None:
        plot_df = plot_df.join(vix_fhs.rename("VIX_FHS"), how="inner")

    if vix_gauss is not None:
        plot_df = plot_df.join(vix_gauss.rename("VIX_Gauss"), how="inner")

    ## PLOT
    if make_plot:
        fig, ax = plt.subplots(figsize=(12, 5))
        ax.plot(plot_df.index, plot_df["VIX_Bates"],
                label="VIX_Bates", color="steelblue")
        ax.plot(plot_df.index, plot_df["Vol_phys"],
                label="Vol_phys (Duan NGARCH)", color="red")
        ax.plot(plot_df.index, plot_df["VIX_actual"],
                label="VIX_actual", color="black")
        ax.set_ylabel("Volatility index level")
        ax.set_xlabel("Date")
        ax.set_title("Bates (2000) model VIX vs physical volatility and market VIX")
        ax.legend(loc="upper left")
        plt.tight_layout()
        plt.show()

    print("\nCorrelation matrix (Bates, phys, actual, FHS, Gaussian):")
    print(plot_df.corr())

    err_bates = plot_df["VIX_Bates"] - plot_df["VIX_actual"]
    err_bates.name = "Bates_VIX_error"

    if make_error_plot:
        fig, ax = plt.subplots(figsize=(12, 4))
        err_bates.plot(ax=ax, color="steelblue", linewidth=1,
                       label="Bates_VIX_error")
        ax.axhline(0.0, color="black", linestyle="--", linewidth=1,
                   label="Zero error")
        ax.axhline(err_bates.mean(), color="red", linestyle="-", linewidth=1.5,
                   label="Mean error")
        ax.set_title("Bates VIX pricing error (Model − Market)")
        ax.set_ylabel("Error (index points)")
        ax.set_xlabel("Date")
        ax.legend()
        plt.tight_layout()
        plt.show()

        print("\nBates pricing error summary:")
        print(err_bates.describe())

    print("\nDescriptive statistics (Bates vs phys vs actual vs others):")
    print(plot_df.describe())

    # RMSE vs VIX réel
    rmse_bates = rmse(plot_df["VIX_actual"], plot_df["VIX_Bates"])
    print("\nRMSE vs VIX_actual:")
    print(f"  Bates VIX   : {rmse_bates:.4f}")

    rmse_fhs = None
    if "VIX_FHS" in plot_df.columns:
        rmse_fhs = rmse(plot_df["VIX_actual"], plot_df["VIX_FHS"])
        print(f"  FHS VIX     : {rmse_fhs:.4f}")

    rmse_gauss = None
    if "VIX_Gauss" in plot_df.columns:
        rmse_gauss = rmse(plot_df["VIX_actual"], plot_df["VIX_Gauss"])
        print(f"  Gaussian VIX: {rmse_gauss:.4f}")

    return {
        "plot_df": plot_df,
        "vix_bates": vix_bates,
        "duan_vol_ann": duan_vol_ann,
        "errors_bates": err_bates,
        "rmse_bates": rmse_bates,
        "rmse_fhs": rmse_fhs,
        "rmse_gauss": rmse_gauss,
    }


#  Bates params (défaut, non calibrés)
# please note that these parameters were found in various papers (Bates (2000), Duffie(2000), Agazzotti (2025), Cape (2014))
# not optimization routine was ran to find those.

bates_params = {
    "kappa": 0.3 / 252.0,
    "theta": 0.04 / 252.0,
    "sigma_v": 0.05 / np.sqrt(252),
    "rho": -0.7,
    "lambdaJ_annual": 0.3,
    "mu_J": -0.1,
    "sigma_J": 0.15,
}


#####  HESTON LAYER

def heston_call_price(
    S0,
    K,
    T,
    r,
    q,
    v0,
    kappa,
    theta,
    sigma_v,
    rho,
    u_max=100.0,
):

    x0 = np.log(S0)
    i = 1j

    def _phi(u, j):
        # j = 1 ou 2
        if j == 1:
            b = kappa - rho * sigma_v
        else:
            b = kappa

        a = kappa * theta

        d = np.sqrt(
            (rho * sigma_v * i * u - b) ** 2
            + (sigma_v ** 2) * (u ** 2 + i * u)
        )
        g = (b - rho * sigma_v * i * u - d) / (b - rho * sigma_v * i * u + d)
        exp_dT = np.exp(-d * T)

        C = (
            (r - q) * i * u * T
            + (a / (sigma_v ** 2))
            * ((b - rho * sigma_v * i * u - d) * T
               - 2.0 * np.log((1 - g * exp_dT) / (1 - g)))
        )

        D = (
            (b - rho * sigma_v * i * u - d)
            / (sigma_v ** 2)
            * ((1.0 - exp_dT) / (1.0 - g * exp_dT))
        )

        return np.exp(C + D * v0 + i * u * x0)

    def _P(j):
        def integrand(u):
            u = float(u)
            cf = _phi(u, j)
            return np.real(np.exp(-1j * u * np.log(K)) * cf / (1j * u))

        val, _ = quad(integrand, 0.0, u_max)
        return 0.5 + val / np.pi

    P1 = _P(1)
    P2 = _P(2)
    call_price = S0 * np.exp(-q * T) * P1 - K * np.exp(-r * T) * P2
    return np.real(call_price)


def calibrate_heston_to_calls(
    option_df,
    S0,
    r,
    q,
    initial_guess=None,
    bounds=None,
    u_max=100.0,
):

    Ks = option_df["K"].values
    Ts = option_df["T"].values
    C_mkt = option_df["price"].values

    if "weight" in option_df.columns:
        w = option_df["weight"].values
    else:
        w = np.ones_like(C_mkt)

    if initial_guess is None:
        # [kappa, theta, sigma_v, rho, v0]
        initial_guess = np.array([1.5, 0.04, 0.5, -0.5, 0.04], dtype=float)

    if bounds is None:
        bounds = [
            (1e-4, 10.0),
            (1e-4, 2.0),
            (1e-4, 2.0),
            (-0.999, 0.999),
            (1e-6, 2.0),
        ]

    def objective(x):
        kappa, theta, sigma_v, rho, v0 = x

        if kappa <= 0 or theta <= 0 or sigma_v <= 0 or v0 <= 0:
            return 1e9
        if not (-0.999 < rho < 0.999):
            return 1e9

        # pénalité Feller (2 κ θ > σ^2)
        penalty = 0.0
        if 2.0 * kappa * theta <= sigma_v ** 2:
            penalty = 1e3 * (sigma_v ** 2 - 2.0 * kappa * theta) ** 2

        C_model = []
        for K, T in zip(Ks, Ts):
            C_model.append(
                heston_call_price(
                    S0=S0,
                    K=K,
                    T=T,
                    r=r,
                    q=q,
                    v0=v0,
                    kappa=kappa,
                    theta=theta,
                    sigma_v=sigma_v,
                    rho=rho,
                    u_max=u_max,
                )
            )

        C_model = np.array(C_model)
        diff = (C_model - C_mkt) * w
        mse = np.mean(diff ** 2)
        return mse + penalty

    res = minimize(
        objective,
        x0=initial_guess,
        bounds=bounds,
        method="L-BFGS-B",
    )

    if not res.success:
        print("WARNING: Heston calibration did not fully converge.")
        print(res.message)

    kappa_hat, theta_hat, sigma_hat, rho_hat, v0_hat = res.x
    params_hat = {
        "kappa": kappa_hat,
        "theta": theta_hat,
        "sigma_v": sigma_hat,
        "rho": rho_hat,
        "v0": v0_hat,
    }

    return params_hat, res


def build_bates_params_from_heston(
    heston_params,
    lambdaJ_annual=0.3,
    mu_J=-0.1,
    sigma_J=0.15,
):
    ###Construit un set de params Bates à partir de params Heston calibrés
    #We keep exogenous jump params  (lambdaJ_annual, mu_J, sigma_J)
    return {
        "kappa": heston_params["kappa"],
        "theta": heston_params["theta"],
        "sigma_v": heston_params["sigma_v"],
        "rho": heston_params["rho"],
        "lambdaJ_annual": lambdaJ_annual,
        "mu_J": mu_J,
        "sigma_J": sigma_J,
    }


#  DATA (options_30dte_91dte.csv)

def prepare_optionmetrics_option_df(
    csv_path,
    calib_date,
    cp_flag="C",
    min_mid_price=0.25,
    use_volume_filter=False,
    min_volume=1,
):


    df = pd.read_csv(
        csv_path,
        parse_dates=["date", "exdate"],
    )

    calib_date = pd.to_datetime(calib_date)
    df = df[df["date"] == calib_date]

    df = df[df["cp_flag"] == cp_flag]     # Calls only

    df["mid"] = 0.5 * (df["best_bid"] + df["best_offer"])
    df = df[np.isfinite(df["mid"])]

    df = df[df["mid"] >= min_mid_price]

    if use_volume_filter and "volume" in df.columns:
        df = df[df["volume"] >= min_volume]

    df["K"] = df["strike_price"] / 1000.0

    df["T"] = (df["exdate"] - df["date"]).dt.days / 365.0
    df = df[df["T"] > 0]

    option_df = df[["K", "T", "mid"]].copy()
    option_df.rename(columns={"mid": "price"}, inplace=True)

    option_df.sort_values(["T", "K"], inplace=True)
    option_df.reset_index(drop=True, inplace=True)

    return option_df


def calibrate_bates_from_optionmetrics_csv(
    calib_date,
    S0,
    rf_annual,
    q_annual,
    csv_path="/content/PD_TP2/options_30dte_91dte.csv",
    lambdaJ_annual=0.3,
    mu_J=-0.1,
    sigma_J=0.15,
    cp_flag="C",
):



    option_df = prepare_optionmetrics_option_df(
        csv_path=csv_path,
        calib_date=calib_date,
        cp_flag=cp_flag,
    )

    heston_params_hat, calib_res = calibrate_heston_to_calls(
        option_df=option_df,
        S0=S0,
        r=rf_annual,
        q=q_annual,
    )

    bates_params_calib = build_bates_params_from_heston(
        heston_params=heston_params_hat,
        lambdaJ_annual=lambdaJ_annual,
        mu_J=mu_J,
        sigma_J=sigma_J,
    )

    return bates_params_calib, heston_params_hat, calib_res, option_df

