# Real Data Analysis: Option Pricing Model Comparison

This notebook calibrates four option pricing models (Heston, Merton, Bates, and SVJJ) to the implied-volatility smile of real SPY call options, using Monte-Carlo simulation, and compares their fit by RMSE.

In [None]:
# =============================================================================
# [Cell 1] Data Loading & Preprocessing
# =============================================================================
import yfinance as yf
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
import torch
import sys
import os

# Add parent directory to path for imports
sys.path.append(os.path.abspath('..'))
from src.physics_engine import MarketSimulator

# ---------------------------------------------------------
# 1. Download SPY Data
# ---------------------------------------------------------
ticker = "SPY"
print(f"[{ticker}] Downloading option chain data...")

spy = yf.Ticker(ticker)

# Current Price
# The latest daily close is used as the spot price S0. Only ordinary errors
# (failed download, empty history -> IndexError, missing column, unusable value)
# trigger the hard-coded fallback; KeyboardInterrupt/SystemExit now propagate
# instead of being swallowed by a bare `except:`.
try:
    current_price = spy.history(period="1d")['Close'].iloc[-1]
    # A NaN or non-positive close would silently empty the strike filter later on.
    if not np.isfinite(current_price) or current_price <= 0:
        raise ValueError(f"unusable close price: {current_price}")
    print(f"Current Price (S0): ${current_price:.2f}")
except Exception as exc:
    current_price = 580.0
    print(f"Failed to load price. Using fallback: ${current_price}")
    print(f"  Reason: {type(exc).__name__}: {exc}")

# Select Expiration Date (30-75 days out)
expirations = spy.options
if not expirations:
    # Without this guard the fallback below would index an empty sequence and
    # die with an uninformative IndexError.
    raise RuntimeError(f"[{ticker}] No option expirations available; cannot select an expiry.")

target_date = None
today = datetime.now()

# Take the first listed expiry that is 30-75 whole days away from now.
for exp_date in expirations:
    exp_dt = datetime.strptime(exp_date, "%Y-%m-%d")
    days_to_expire = (exp_dt - today).days
    if 30 <= days_to_expire <= 75:
        target_date = exp_date
        print(f"--> Selected Expiration: {target_date} ({days_to_expire} days)")
        break

if target_date is None:
    # Nothing inside the window: use the 4th listed expiry, or the last one
    # when fewer than four are listed.
    target_date = expirations[min(3, len(expirations)-1)]
    print(f"--> Fallback Expiration: {target_date}")

# ---------------------------------------------------------
# 2. Data Cleaning
# ---------------------------------------------------------
opt_chain = spy.option_chain(target_date)
calls = opt_chain.calls

# Filter for liquidity and reasonable strikes
# Keep a contract when it has either traded volume (> 5) or open interest (> 10).
is_liquid = (calls['volume'] > 5) | (calls['openInterest'] > 10)
calls_clean = calls[is_liquid].copy()

market_data = calls_clean[['strike', 'impliedVolatility', 'lastPrice']].sort_values('strike')

# Quoted IV strictly inside (1%, 100%) and strike strictly within +/-20% of spot.
iv_in_range = (market_data['impliedVolatility'] > 0.01) & (market_data['impliedVolatility'] < 1.0)
strike_in_range = (market_data['strike'] > current_price * 0.8) & (market_data['strike'] < current_price * 1.2)
market_data = market_data[iv_in_range & strike_in_range]

print(f"Preprocessed: {len(market_data)} valid data points")

# An empty calibration set would otherwise surface much later as 1e9 losses for
# every model and a NaN strike grid in the plot; stop here with a clear message.
if market_data.empty:
    raise ValueError(
        f"No option quotes survived filtering for {target_date}; "
        "check the spot price and the liquidity/IV/strike filters."
    )

# ---------------------------------------------------------
# 3. Define Calibration Variables
# ---------------------------------------------------------
# Market observations handed to the optimiser as plain NumPy arrays.
calib_strikes = market_data['strike'].to_numpy()
calib_ivs = market_data['impliedVolatility'].to_numpy()

# Time to Maturity
# Calendar days to expiry expressed in years, floored at 0.01.
expiry_date = datetime.strptime(target_date, "%Y-%m-%d")
T_val = max(0.01, (expiry_date - today).days / 365.0)

# Risk-free Rate & Time Step
r_val, dt_val = 0.04, 1 / 252

print(f"[Params] T: {T_val:.4f}, r: {r_val}, dt: {dt_val}")
print(f"[Params] S0: {current_price:.2f}, Strikes: {len(calib_strikes)}")

In [None]:
# =============================================================================
# [Cell 2] Helper Functions & Loss Function
# =============================================================================
from scipy.stats import norm
from scipy.optimize import brentq

# ---------------------------------------------------------
# Black-Scholes & Implied Volatility Solver
# ---------------------------------------------------------
def black_scholes_call_price(S, K, T, r, sigma):
    """Price a European call option with the Black-Scholes formula.

    Parameters
    ----------
    S : float
        Spot price of the underlying.
    K : float
        Strike price.
    T : float
        Time to expiry in years.
    r : float
        Continuously compounded risk-free rate.
    sigma : float
        Volatility (annualised).

    Returns
    -------
    float
        The call price. In the degenerate case ``sigma <= 0`` or ``T <= 0``
        the formula is undefined (division by zero), so the limiting value
        ``max(S - K * exp(-r * max(T, 0)), 0)`` is returned. Previously this
        case returned 0.0, which is wrong for in-the-money options.

    Notes
    -----
    Inputs are treated as scalars (the guard uses plain ``if``).
    """
    if sigma <= 0 or T <= 0:
        # Zero-volatility / at-expiry limit: the discounted intrinsic value.
        tau = max(T, 0.0)
        return float(max(S - K * np.exp(-r * tau), 0.0))

    vol_sqrt_T = sigma * np.sqrt(T)
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / vol_sqrt_T
    d2 = d1 - vol_sqrt_T
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

def implied_vol_solver(market_price, S, K, T, r, sigma_low=0.001, sigma_high=5.0):
    """Solve for the Black-Scholes implied volatility of a call using Brent's method.

    Parameters
    ----------
    market_price : float
        Observed (or simulated) call price to invert.
    S, K, T, r : float
        Spot, strike, time to expiry in years and risk-free rate, passed
        through to ``black_scholes_call_price``.
    sigma_low, sigma_high : float
        Bracket searched for the root.

    Returns
    -------
    float
        The implied volatility, or ``np.nan`` when no valid solution exists:
        a non-finite input, non-positive price/spot/strike/expiry, a price
        below the discounted intrinsic value or at/above spot, a bracket
        that does not straddle the root, or a root-finder failure.
    """
    # NaN fails every comparison below, so without this guard a NaN price
    # would reach the root finder.
    if not np.all(np.isfinite([market_price, S, K, T, r])):
        return np.nan
    if market_price <= 0 or T <= 0 or S <= 0 or K <= 0:
        return np.nan

    # No-arbitrage bounds for a call: intrinsic <= price < S.
    intrinsic = max(S - K * np.exp(-r * T), 0)
    if market_price < intrinsic or market_price >= S:
        return np.nan

    def objective(sigma):
        return black_scholes_call_price(S, K, T, r, sigma) - market_price

    try:
        f_low = objective(sigma_low)
        f_high = objective(sigma_high)
        if f_low * f_high > 0:
            # Same sign at both ends: the bracket does not contain a root.
            return np.nan
        return brentq(objective, sigma_low, sigma_high, maxiter=100)
    except (ValueError, RuntimeError, ArithmeticError):
        # brentq raises ValueError for a bad bracket and RuntimeError when it
        # fails to converge; ArithmeticError covers floating-point errors if
        # NumPy is configured to raise. Anything else (KeyboardInterrupt,
        # NameError, ...) is a real problem and is allowed to propagate.
        return np.nan

# ---------------------------------------------------------
# Calibration Loss Function (Memory Optimized)
# ---------------------------------------------------------
def calibration_loss(params, model_name, market_strikes, market_ivs, S0, T, r, dt, num_paths):
    """Monte-Carlo calibration objective: RMSE between model and market implied vols.

    Parameters
    ----------
    params : sequence of float
        Flat parameter vector whose layout depends on ``model_name``:

        * ``'heston'``: ``kappa, theta, xi, rho``
        * ``'merton'``: ``sigma, jump_lambda, jump_mean, jump_std``
          (run through the ``'bates'`` simulator with ``theta = sigma**2``,
          ``kappa = 1``, ``xi = 0.001``, ``rho = 0``)
        * ``'bates'``: ``kappa, theta, xi, rho, jump_lambda, jump_mean, jump_std``
        * ``'svjj'``: the Bates parameters followed by ``vol_jump_mean``
    model_name : str
        One of the four names above.
    market_strikes, market_ivs : sequence of float
        Strikes and their market implied volatilities, index-aligned.
    S0, T, r, dt : float
        Spot, maturity in years, risk-free rate and simulation time step.
    num_paths : int
        Number of Monte-Carlo paths per evaluation.

    Returns
    -------
    float
        The RMSE over all strikes. A strike whose model implied vol cannot be
        recovered contributes a fixed squared error of 0.1. The penalty value
        ``1e9`` is returned for an unknown model, a malformed or out-of-range
        parameter vector, an empty strike set, or any simulation failure.

    Notes
    -----
    Only ``Exception`` subclasses are converted into the penalty, so
    KeyboardInterrupt can stop a running optimisation. The previous bare
    ``except:`` clauses made the optimiser uninterruptible.
    """
    # Parameter Unpacking
    try:
        if model_name == 'heston':
            kappa, theta, xi, rho = params
            jump_params = {'jump_lambda': 0.0, 'jump_mean': 0.0, 'jump_std': 0.0}
            val_type = 'heston'
        elif model_name == 'merton':
            sigma, jump_lambda, jump_mean, jump_std = params
            # Near-constant variance at sigma**2 so the Bates simulator can be reused.
            kappa, theta, xi, rho = 1.0, sigma**2, 0.001, 0.0
            jump_params = {'jump_lambda': jump_lambda, 'jump_mean': jump_mean, 'jump_std': jump_std}
            val_type = 'bates'
        elif model_name == 'bates':
            kappa, theta, xi, rho, jump_lambda, jump_mean, jump_std = params
            jump_params = {'jump_lambda': jump_lambda, 'jump_mean': jump_mean, 'jump_std': jump_std}
            val_type = 'bates'
        elif model_name == 'svjj':
            kappa, theta, xi, rho, jump_lambda, jump_mean, jump_std, vol_jump_mean = params
            jump_params = {'jump_lambda': jump_lambda, 'jump_mean': jump_mean, 'jump_std': jump_std, 'vol_jump_mean': vol_jump_mean}
            val_type = 'svjj'
            if vol_jump_mean < 0:
                return 1e9
        else:
            return 1e9

        # Safety Checks
        if kappa < 0 or theta < 0 or xi < 0 or abs(rho) > 0.99:
            return 1e9
        if jump_params['jump_lambda'] < 0 or jump_params['jump_std'] < 0:
            return 1e9
    except (TypeError, ValueError):
        # Wrong number of parameters or non-numeric entries.
        return 1e9

    # Simulation with Memory Optimization
    try:
        # Explicit guard instead of relying on a swallowed ZeroDivisionError.
        if len(market_strikes) == 0:
            return 1e9

        device = 'cuda' if torch.cuda.is_available() else 'cpu'

        with torch.no_grad():
            sim = MarketSimulator(mu=0.0, kappa=kappa, theta=theta, xi=xi, rho=rho,
                                  device=device, **jump_params)
            # theta is passed as the second argument, presumably the initial
            # variance -- TODO confirm against MarketSimulator.simulate.
            S_paths, _ = sim.simulate(S0, theta, T, dt, num_paths, model_type=val_type)

            if torch.isnan(S_paths).any():
                return 1e9

            # Last column taken as terminal prices; assumes S_paths is
            # (paths, time steps) -- TODO confirm.
            S_final = S_paths[:, -1]
            mean_final = S_final.mean()
            # Reject NaN, +/-inf and collapsed means before dividing by them.
            if not torch.isfinite(mean_final) or mean_final < 1e-3:
                return 1e9

            # Martingale Correction
            # Rescale so the sample mean of the terminal prices equals S0.
            S_corr = S_final * (S0 / mean_final)
            S_corr_np = S_corr.cpu().numpy()

            del S_paths, S_final, sim

        # Calculate RMSE
        discount = np.exp(-r * T)
        error_sum = 0.0
        for i, K in enumerate(market_strikes):
            payoff = np.maximum(S_corr_np - K, 0)
            model_price = np.mean(payoff) * discount
            m_iv = implied_vol_solver(model_price, S0, K, T, r)
            target_iv = market_ivs[i]

            if np.isnan(m_iv) or m_iv < 0.001 or m_iv > 5.0:
                # Model price could not be inverted: fixed penalty.
                error_sum += 0.1
            else:
                error_sum += (m_iv - target_iv) ** 2

        return np.sqrt(error_sum / len(market_strikes))

    except Exception:
        # Best effort: any simulator or numeric failure scores as "infeasible"
        # so the optimiser can move on. KeyboardInterrupt is not caught.
        return 1e9

# Progress marker: printed once every definition in this cell has executed.
print("Functions Defined Successfully")

In [None]:
# =============================================================================
# [Cell 3] Model Calibration
# =============================================================================
from scipy.optimize import differential_evolution

# Optimization Settings
# Every loss evaluation simulates N=1000 Monte-Carlo paths (last entry of
# calib_args), so the objective is noisy. Seeding the optimiser and the global
# RNGs makes a Restart & Run All reproduce the same calibration.
CALIB_SEED = 42
# NOTE(review): assumes MarketSimulator draws from torch's (or NumPy's) global
# RNG -- confirm; if it owns a private generator, seed that instead.
torch.manual_seed(CALIB_SEED)
np.random.seed(CALIB_SEED)

# NOTE(review): differential_evolution defaults to polish=True, which runs
# L-BFGS-B on the best member; on a noisy objective that step has little value.
# Left unchanged here.
base_opts = {
    'strategy': 'best1bin',
    'maxiter': 10,
    'popsize': 10,
    'tol': 0.02,
    'mutation': (0.5, 1),
    'recombination': 0.7,
    'seed': CALIB_SEED,
}
# Positional arguments forwarded to calibration_loss after the model name:
# strikes, market IVs, S0, T, r, dt, num_paths.
calib_args = (calib_strikes, calib_ivs, current_price, T_val, r_val, dt_val, 1000)

print("[Calibration] Starting Differential Evolution (N=1000)...\n")

# 1. Heston Model -- bounds: kappa, theta, xi, rho
print("[1/4] Calibrating Heston...")
bounds_heston = [(0.1, 5.0), (0.01, 0.2), (0.1, 2.0), (-0.9, 0.0)]
res_heston = differential_evolution(calibration_loss, bounds_heston, args=('heston', *calib_args), **base_opts)
print(f"Heston RMSE: {res_heston.fun:.4f}\n")

# 2. Merton Model -- bounds: sigma, jump_lambda, jump_mean, jump_std
print("[2/4] Calibrating Merton...")
bounds_merton = [(0.05, 0.5), (0.1, 5.0), (-0.4, 0.1), (0.01, 0.3)]
res_merton = differential_evolution(calibration_loss, bounds_merton, args=('merton', *calib_args), **base_opts)
print(f"Merton RMSE: {res_merton.fun:.4f}\n")

# 3. Bates Model -- bounds: kappa, theta, xi, rho, jump_lambda, jump_mean, jump_std
print("[3/4] Calibrating Bates...")
bounds_bates = [(0.1, 5.0), (0.01, 0.2), (0.1, 2.0), (-0.95, -0.3), (0.01, 5.0), (-0.4, 0.1), (0.01, 0.3)]
res_bates = differential_evolution(calibration_loss, bounds_bates, args=('bates', *calib_args), **base_opts)
print(f"Bates RMSE: {res_bates.fun:.4f}\n")

# 4. SVJJ Model -- Bates bounds plus vol_jump_mean
print("[4/4] Calibrating SVJJ...")
bounds_svjj = bounds_bates + [(0.001, 0.15)]
res_svjj = differential_evolution(calibration_loss, bounds_svjj, args=('svjj', *calib_args), **base_opts)
print(f"SVJJ RMSE: {res_svjj.fun:.4f}\n")

print("="*50)
print("Calibration Complete!")
print("="*50)

In [None]:
# =============================================================================
# [Cell 4] Visualization & Leaderboard
# =============================================================================
# Progress marker printed before the model IV curves are generated below.
print("\n[Visualizing] Generating Model Curves...")

def generate_model_ivs(params, model_name, strikes, S0, T, r, dt, N):
    """Simulate a model and return its implied volatility at each strike.

    Parameters
    ----------
    params : sequence of float
        Parameter vector; layout depends on ``model_name``:
        ``'heston'`` -> ``kappa, theta, xi, rho``;
        ``'merton'`` -> ``sigma, jump_lambda, jump_mean, jump_std``;
        ``'bates'``  -> ``kappa, theta, xi, rho, jump_lambda, jump_mean, jump_std``;
        ``'svjj'``   -> the Bates parameters followed by ``vol_jump_mean``.
    model_name : str
        One of ``'heston'``, ``'merton'``, ``'bates'``, ``'svjj'``.
    strikes : iterable of float
        Strikes at which to evaluate the implied volatility.
    S0, T, r, dt : float
        Spot, maturity in years, risk-free rate and simulation time step.
    N : int
        Number of Monte-Carlo paths.

    Returns
    -------
    numpy.ndarray
        One implied volatility per strike; ``np.nan`` where it cannot be solved.

    Raises
    ------
    ValueError
        If ``model_name`` is not recognised (previously this surfaced as an
        UnboundLocalError further down).
    """
    if model_name == 'heston':
        kappa, theta, xi, rho = params
        jump_params = {'jump_lambda': 0.0, 'jump_mean': 0.0, 'jump_std': 0.0}
        val_type = 'heston'
    elif model_name == 'merton':
        sigma, jump_lambda, jump_mean, jump_std = params
        # Near-constant variance at sigma**2 so the Bates simulator can be reused.
        kappa, theta, xi, rho = 1.0, sigma**2, 0.001, 0.0
        jump_params = {'jump_lambda': jump_lambda, 'jump_mean': jump_mean, 'jump_std': jump_std}
        val_type = 'bates'
    elif model_name == 'bates':
        kappa, theta, xi, rho, jump_lambda, jump_mean, jump_std = params
        jump_params = {'jump_lambda': jump_lambda, 'jump_mean': jump_mean, 'jump_std': jump_std}
        val_type = 'bates'
    elif model_name == 'svjj':
        kappa, theta, xi, rho, jump_lambda, jump_mean, jump_std, vol_jump_mean = params
        jump_params = {'jump_lambda': jump_lambda, 'jump_mean': jump_mean, 'jump_std': jump_std, 'vol_jump_mean': vol_jump_mean}
        val_type = 'svjj'
    else:
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected 'heston', 'merton', 'bates' or 'svjj'."
        )

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    with torch.no_grad():
        sim = MarketSimulator(mu=0.0, kappa=kappa, theta=theta, xi=xi, rho=rho, device=device, **jump_params)
        S_paths, _ = sim.simulate(S0, theta, T, dt, N, model_type=val_type)

        # Last column taken as terminal prices; assumes S_paths is
        # (paths, time steps) -- TODO confirm.
        S_final = S_paths[:, -1]
        mean_final = S_final.mean()
        # Martingale correction: rescale so the sample mean equals S0. It is
        # skipped when the mean is NaN, infinite or collapsed, because
        # dividing by it would corrupt every path.
        if torch.isfinite(mean_final) and mean_final > 1e-3:
            S_corr = S_final * (S0 / mean_final)
        else:
            S_corr = S_final
        S_corr_np = S_corr.cpu().numpy()
        del S_paths, S_final, sim
        if device == 'cuda':
            torch.cuda.empty_cache()

    discount = np.exp(-r * T)
    ivs = []
    for K in strikes:
        payoff = np.maximum(S_corr_np - K, 0)
        model_price = np.mean(payoff) * discount
        try:
            iv = implied_vol_solver(model_price, S0, K, T, r)
        except Exception:
            # Keep the curve plottable if a single strike fails; unlike a bare
            # `except:`, this does not swallow KeyboardInterrupt.
            iv = np.nan
        ivs.append(iv)
    return np.array(ivs)

# Generate IV Curves
# Each model is re-simulated with N_plot paths on an evenly spaced 50-point
# strike grid spanning the filtered market strikes.
N_plot = 5000
strikes_plot = np.linspace(market_data['strike'].min(), market_data['strike'].max(), 50)

# Arguments common to all four calls: strike grid, spot, maturity, rate, step, paths.
curve_args = (strikes_plot, current_price, T_val, r_val, dt_val, N_plot)

print("  Generating Heston...")
iv_heston = generate_model_ivs(res_heston.x, 'heston', *curve_args)

print("  Generating Merton...")
iv_merton = generate_model_ivs(res_merton.x, 'merton', *curve_args)

print("  Generating Bates...")
iv_bates = generate_model_ivs(res_bates.x, 'bates', *curve_args)

print("  Generating SVJJ...")
iv_svjj = generate_model_ivs(res_svjj.x, 'svjj', *curve_args)

# ---------------------------------------------------------
# Plotting
# ---------------------------------------------------------
plt.figure(figsize=(14, 8))

# Market quotes as points, drawn above the model lines (zorder=5).
plt.scatter(market_data['strike'], market_data['impliedVolatility'], c='black', s=40, alpha=0.5, label='Market Data (SPY)', zorder=5)

# One entry per model, in drawing order: (curve, format string, line kwargs, legend label).
model_curves = [
    (iv_heston, 'g:', {'linewidth': 2}, f'Heston (RMSE={res_heston.fun:.4f})'),
    (iv_merton, 'c-.', {'linewidth': 2}, f'Merton (RMSE={res_merton.fun:.4f})'),
    (iv_bates, 'b-', {'linewidth': 2.5, 'alpha': 0.8}, f'Bates (RMSE={res_bates.fun:.4f})'),
    (iv_svjj, 'r--', {'linewidth': 3}, f'SVJJ (RMSE={res_svjj.fun:.4f})'),
]
for curve, fmt, line_kwargs, legend_label in model_curves:
    plt.plot(strikes_plot, curve, fmt, label=legend_label, **line_kwargs)

# Vertical marker at the spot price.
plt.axvline(current_price, color='gray', linestyle=':', alpha=0.7, label='Spot Price')

plt.xlabel('Strike Price', fontsize=12)
plt.ylabel('Implied Volatility', fontsize=12)
plt.title('The Evolution of Option Pricing Models: Real Data Calibration', fontsize=16)
plt.legend(loc='upper right')
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

# ---------------------------------------------------------
# Final Leaderboard
# ---------------------------------------------------------
# Final calibration RMSE per model, ranked from best (lowest) to worst.
results = dict(zip(
    ('Heston', 'Merton', 'Bates', 'SVJJ'),
    (res_heston.fun, res_merton.fun, res_bates.fun, res_svjj.fun),
))
sorted_res = sorted(results.items(), key=lambda entry: entry[1])

divider = "=" * 40
ordinals = {1: "1st", 2: "2nd", 3: "3rd"}

print("\n" + divider)
print("     FINAL LEADERBOARD")
print(divider)
for rank, (name, score) in enumerate(sorted_res, 1):
    # Ranks beyond third fall back to the generic "<n>th" form.
    medal = ordinals.get(rank, str(rank) + "th")
    print(f"{medal}. {name:<10} | RMSE: {score:.5f}")
print(divider)