# Test Heston Calibration

This notebook tests the Heston calibration module with:
1. Synthetic data (known true params)
2. QuantLib examples from documentation
3. Edge cases (extreme params, sparse data)
4. Performance benchmarks

In [16]:
import sys
from pathlib import Path
import numpy as np
import time

sys.path.insert(0, str(Path("../..").resolve()))

import QuantLib as ql
print(f"QuantLib version: {ql.__version__}")

QuantLib version: 1.41


In [17]:
from src.models.heston import (
    HestonParams,
    calibrate_heston,
    CalibrationResult,
    heston_price,
    heston_iv,
)
from src.models.heston.bs import bs_iv, strike_from_delta

print("Imports OK")

Imports OK


## Test 1: Basic Heston Pricing

Verify that `heston_price` and `heston_iv` work correctly.

In [18]:
# Standard test case
S0 = 100.0
K = 100.0
T = 1.0
r = 0.05
q = 0.02

# Typical Heston parameters
params = HestonParams(
    v0=0.04,      # 20% initial vol
    kappa=2.0,    # Mean reversion speed
    theta=0.04,   # Long-run variance (20% vol)
    sigma=0.3,    # Vol of vol
    rho=-0.7,     # Spot-vol correlation
)

print(f"Feller condition: {params.feller_condition}")
print(f"2*kappa*theta = {2*params.kappa*params.theta:.4f}, sigma^2 = {params.sigma**2:.4f}")

Feller condition: True
2*kappa*theta = 0.1600, sigma^2 = 0.0900


In [19]:
# Test pricing
t0 = time.time()
call_price = heston_price(S0, K, T, r, q, params, "call")
t1 = time.time()
put_price = heston_price(S0, K, T, r, q, params, "put")
t2 = time.time()

print(f"Call price: ${call_price:.4f} ({(t1-t0)*1000:.1f}ms)")
print(f"Put price:  ${put_price:.4f} ({(t2-t1)*1000:.1f}ms)")

# Put-call parity check
parity_lhs = call_price - put_price
parity_rhs = S0 * np.exp(-q*T) - K * np.exp(-r*T)
print(f"\nPut-call parity check:")
print(f"  C - P = {parity_lhs:.4f}")
print(f"  S*e^(-qT) - K*e^(-rT) = {parity_rhs:.4f}")
print(f"  Difference: {abs(parity_lhs - parity_rhs):.6f}")

Call price: $9.0595 (1.6ms)
Put price:  $6.1626 (1.0ms)

Put-call parity check:
  C - P = 2.8969
  S*e^(-qT) - K*e^(-rT) = 2.8969
  Difference: 0.000000


In [20]:
# Test IV extraction
t0 = time.time()
call_iv = heston_iv(S0, K, T, r, q, params, "call")
t1 = time.time()
put_iv = heston_iv(S0, K, T, r, q, params, "put")
t2 = time.time()

print(f"Call IV: {call_iv*100:.2f}% ({(t1-t0)*1000:.1f}ms)")
print(f"Put IV:  {put_iv*100:.2f}% ({(t2-t1)*1000:.1f}ms)")

# Should be close to sqrt(v0) = 20% for ATM
print(f"\nExpected ~{np.sqrt(params.v0)*100:.1f}% (ATM, short-term)")

Call IV: 19.56% (3.2ms)
Put IV:  19.56% (2.0ms)

Expected ~20.0% (ATM, short-term)


## Test 2: IV Surface Generation Speed

Check how long it takes to compute a full IV surface.

In [21]:
# Test strikes and maturities
strikes = np.linspace(80, 120, 9)  # 9 strikes
maturities = np.array([0.25, 0.5, 1.0])  # 3 maturities

print(f"Computing {len(strikes) * len(maturities) * 2} IVs (calls + puts)...")

t0 = time.time()
ivs = []
for T in maturities:
    for K in strikes:
        for cp in ["call", "put"]:
            iv = heston_iv(S0, K, T, r, q, params, cp)
            ivs.append(iv)
t1 = time.time()

print(f"Time: {t1-t0:.2f}s ({len(ivs)} IVs)")
print(f"Per IV: {(t1-t0)/len(ivs)*1000:.1f}ms")
print(f"\nIV range: [{min(ivs)*100:.1f}%, {max(ivs)*100:.1f}%]")

Computing 54 IVs (calls + puts)...
Time: 0.15s (54 IVs)
Per IV: 2.8ms

IV range: [15.9%, 24.3%]


In [22]:
# Full VAE grid size test
# 2 cp × 11 maturities × 13 deltas = 286 IVs per surface
n_full_grid = 2 * 11 * 13
estimated_time = (t1-t0) / len(ivs) * n_full_grid

print(f"Full grid ({n_full_grid} IVs) estimated time: {estimated_time:.1f}s")
print(f"For 2500 dates: {estimated_time * 2500 / 3600:.1f} hours")

Full grid (286 IVs) estimated time: 0.8s
For 2500 dates: 0.5 hours


## Test 3: Calibration with Synthetic Data

Generate market IVs from known Heston params, then calibrate back.

In [23]:
# True parameters
true_params = HestonParams(
    v0=0.04,
    kappa=1.5,
    theta=0.04,
    sigma=0.3,
    rho=-0.7,
)

# Generate market data
S0 = 100.0
r = 0.05
q = 0.02

# Use OTM options for calibration (typical practice)
test_maturities = []
test_strikes = []
test_ivs = []
test_cp_flags = []

for T in [0.25, 0.5, 1.0]:
    for moneyness in [0.9, 0.95, 1.0, 1.05, 1.1]:
        K = S0 * moneyness
        # Use OTM: puts for K < S0, calls for K >= S0
        cp = "put" if K < S0 else "call"
        
        iv = heston_iv(S0, K, T, r, q, true_params, cp)
        
        test_maturities.append(T)
        test_strikes.append(K)
        test_ivs.append(iv)
        test_cp_flags.append("P" if cp == "put" else "C")

test_maturities = np.array(test_maturities)
test_strikes = np.array(test_strikes)
test_ivs = np.array(test_ivs)
test_cp_flags = np.array(test_cp_flags)

print(f"Generated {len(test_ivs)} synthetic market IVs")
print(f"IV range: [{test_ivs.min()*100:.2f}%, {test_ivs.max()*100:.2f}%]")

Generated 15 synthetic market IVs
IV range: [17.42%, 22.08%]


In [24]:
# Calibrate
print("Calibrating...")
t0 = time.time()
result = calibrate_heston(
    S0=S0,
    r=r,
    q=q,
    maturities=test_maturities,
    strikes=test_strikes,
    market_ivs=test_ivs,
    cp_flags=test_cp_flags,
    max_iterations=500,
)
t1 = time.time()

print(f"\nCalibration time: {t1-t0:.2f}s")
print(result)

Calibrating...

Calibration time: 0.04s
CalibrationResult(SUCCESS, error=0.000036)
  v0=0.040047, kappa=0.9492, theta=0.042310, sigma=0.2980, rho=-0.6960


In [25]:
# Compare recovered vs true
print("\nParameter comparison:")
print(f"{'Param':<8} {'True':>10} {'Calibrated':>12} {'Error %':>10}")
print("-" * 45)

for name in ['v0', 'kappa', 'theta', 'sigma', 'rho']:
    true_val = getattr(true_params, name)
    calib_val = getattr(result.params, name)
    error_pct = abs(calib_val - true_val) / abs(true_val) * 100 if true_val != 0 else 0
    print(f"{name:<8} {true_val:>10.4f} {calib_val:>12.4f} {error_pct:>10.2f}%")


Parameter comparison:
Param          True   Calibrated    Error %
---------------------------------------------
v0           0.0400       0.0400       0.12%
kappa        1.5000       0.9492      36.72%
theta        0.0400       0.0423       5.78%
sigma        0.3000       0.2980       0.67%
rho         -0.7000      -0.6960       0.58%


## Test 4: Edge Cases

In [26]:
# Edge case 1: Very short maturity
print("Edge Case 1: Very short maturity (7 days)")
try:
    iv_short = heston_iv(S0, 100, 7/365, r, q, params, "call")
    print(f"  IV: {iv_short*100:.2f}%")
except Exception as e:
    print(f"  FAILED: {e}")

Edge Case 1: Very short maturity (7 days)
  IV: 19.97%


In [27]:
# Edge case 2: Deep ITM/OTM
print("Edge Case 2: Deep ITM/OTM options")
for K in [50, 70, 130, 150]:
    try:
        t0 = time.time()
        iv = heston_iv(S0, K, 1.0, r, q, params, "call")
        t1 = time.time()
        print(f"  K={K}: IV={iv*100:.2f}% ({(t1-t0)*1000:.0f}ms)")
    except Exception as e:
        print(f"  K={K}: FAILED - {e}")

Edge Case 2: Deep ITM/OTM options
  K=50: IV=28.27% (4ms)
  K=70: IV=24.42% (5ms)
  K=130: IV=15.97% (4ms)
  K=150: IV=14.83% (4ms)


In [28]:
# Edge case 3: Very long maturity
print("Edge Case 3: Very long maturity (5 years)")
try:
    t0 = time.time()
    iv_long = heston_iv(S0, 100, 5.0, r, q, params, "call")
    t1 = time.time()
    print(f"  IV: {iv_long*100:.2f}% ({(t1-t0)*1000:.0f}ms)")
except Exception as e:
    print(f"  FAILED: {e}")

Edge Case 3: Very long maturity (5 years)
  IV: 19.91% (3ms)


In [29]:
# Edge case 4: Extreme Heston parameters
print("Edge Case 4: Extreme parameters")

extreme_cases = [
    ("High vol-of-vol", HestonParams(v0=0.04, kappa=2.0, theta=0.04, sigma=1.0, rho=-0.5)),
    ("Near-zero kappa", HestonParams(v0=0.04, kappa=0.01, theta=0.04, sigma=0.3, rho=-0.5)),
    ("Extreme rho", HestonParams(v0=0.04, kappa=2.0, theta=0.04, sigma=0.3, rho=-0.95)),
    ("High variance", HestonParams(v0=0.25, kappa=2.0, theta=0.25, sigma=0.5, rho=-0.7)),
]

for name, p in extreme_cases:
    try:
        t0 = time.time()
        iv = heston_iv(S0, 100, 0.5, r, q, p, "call")
        t1 = time.time()
        feller = "✓" if p.feller_condition else "✗"
        print(f"  {name}: IV={iv*100:.2f}% Feller={feller} ({(t1-t0)*1000:.0f}ms)")
    except Exception as e:
        print(f"  {name}: FAILED - {e}")

Edge Case 4: Extreme parameters
  High vol-of-vol: IV=16.59% Feller=✗ (3ms)
  Near-zero kappa: IV=19.20% Feller=✗ (3ms)
  Extreme rho: IV=19.60% Feller=✓ (3ms)
  High variance: IV=48.79% Feller=✓ (2ms)


In [30]:
# Edge case 5: Sparse calibration data
print("Edge Case 5: Sparse calibration (only 5 points)")

sparse_T = np.array([0.5, 0.5, 0.5, 1.0, 1.0], dtype=float)
sparse_K = np.array([95.0, 100.0, 105.0, 95.0, 105.0], dtype=float)
sparse_cp = np.array(["P", "C", "C", "P", "C"])

sparse_ivs = np.array([
    heston_iv(S0, float(K), float(T), r, q, true_params, "call" if cp=="C" else "put")
    for T, K, cp in zip(sparse_T, sparse_K, sparse_cp)
])

t0 = time.time()
result_sparse = calibrate_heston(
    S0=S0, r=r, q=q,
    maturities=sparse_T,
    strikes=sparse_K,
    market_ivs=sparse_ivs,
    cp_flags=sparse_cp,
    max_iterations=200,
)
t1 = time.time()

print(f"  Time: {t1-t0:.2f}s")
print(f"  Result: {result_sparse}")

Edge Case 5: Sparse calibration (only 5 points)
  Time: 0.01s
  Result: CalibrationResult(SUCCESS, error=0.000000)
  v0=0.040229, kappa=0.9276, theta=0.042807, sigma=0.3095, rho=-0.6658


## Test 5: Strike-from-Delta Conversion

In [31]:
# Test delta → strike conversion
print("Delta to Strike conversion tests:")
print(f"S0={S0}, T=0.5, r={r}, q={q}, σ=20%")
print()

sigma = 0.20
T = 0.5

for delta in [0.25, 0.50, 0.75]:
    K_call = strike_from_delta(S=S0, T=T, r=r, q=q, sigma=sigma, delta=delta, cp_flag="call")
    K_put = strike_from_delta(S=S0, T=T, r=r, q=q, sigma=sigma, delta=delta, cp_flag="put")
    print(f"Delta={delta:.2f}: Call K={K_call:.2f}, Put K={K_put:.2f}")

Delta to Strike conversion tests:
S0=100.0, T=0.5, r=0.05, q=0.02, σ=20%

Delta=0.25: Call K=112.67, Put K=92.89
Delta=0.50: Call K=102.35, Put K=102.35
Delta=0.75: Call K=92.89, Put K=112.67


In [32]:
# Edge cases for delta conversion
print("\nDelta edge cases:")

edge_deltas = [0.05, 0.10, 0.90, 0.95]
for delta in edge_deltas:
    try:
        K = strike_from_delta(S=S0, T=T, r=r, q=q, sigma=sigma, delta=delta, cp_flag="call")
        if K is None or np.isnan(K):
            print(f"  Call delta={delta}: None/NaN")
        else:
            print(f"  Call delta={delta}: K={K:.2f}")
    except Exception as e:
        print(f"  Call delta={delta}: FAILED - {e}")


Delta edge cases:
  Call delta=0.05: K=129.30
  Call delta=0.1: K=122.81
  Call delta=0.9: K=84.89
  Call delta=0.95: K=80.10


## Test 6: QuantLib Direct Calibration Example

Reference implementation from QuantLib documentation to verify our wrapper works correctly.

In [33]:
# Direct QuantLib calibration (from QuantLib examples)
# This bypasses our wrapper to check if the issue is in our code

today = ql.Date.todaysDate()
ql.Settings.instance().evaluationDate = today

# Market data
spot = ql.QuoteHandle(ql.SimpleQuote(100.0))
rate = ql.YieldTermStructureHandle(
    ql.FlatForward(today, 0.05, ql.Actual365Fixed())
)
div = ql.YieldTermStructureHandle(
    ql.FlatForward(today, 0.02, ql.Actual365Fixed())
)

# Initial Heston params
v0 = 0.04
kappa = 1.0
theta = 0.04
sigma = 0.3
rho = -0.5

process = ql.HestonProcess(rate, div, spot, v0, kappa, theta, sigma, rho)
model = ql.HestonModel(process)
engine = ql.AnalyticHestonEngine(model)

print("Initial Heston model created")

Initial Heston model created


In [34]:
# Create calibration helpers
helpers = []

# Sample market data: (expiry_days, strike, IV)
market_data = [
    (90, 95, 0.22),
    (90, 100, 0.20),
    (90, 105, 0.21),
    (180, 95, 0.23),
    (180, 100, 0.21),
    (180, 105, 0.22),
    (365, 95, 0.24),
    (365, 100, 0.22),
    (365, 105, 0.23),
]

for days, K, iv in market_data:
    period = ql.Period(days, ql.Days)
    vol_quote = ql.QuoteHandle(ql.SimpleQuote(iv))
    
    helper = ql.HestonModelHelper(
        period,
        ql.TARGET(),
        100.0,  # S0
        K,
        vol_quote,
        rate,
        div,
        ql.BlackCalibrationHelper.ImpliedVolError,
    )
    helper.setPricingEngine(engine)
    helpers.append(helper)

print(f"Created {len(helpers)} calibration helpers")

Created 9 calibration helpers


In [35]:
# Run calibration
lm = ql.LevenbergMarquardt()
end_criteria = ql.EndCriteria(500, 100, 1e-8, 1e-8, 1e-8)

print("Starting direct QuantLib calibration...")
t0 = time.time()
model.calibrate(helpers, lm, end_criteria)
t1 = time.time()

print(f"\nCalibration time: {t1-t0:.2f}s")
print(f"\nCalibrated parameters:")
print(f"  v0 = {model.v0():.6f}")
print(f"  kappa = {model.kappa():.6f}")
print(f"  theta = {model.theta():.6f}")
print(f"  sigma = {model.sigma():.6f}")
print(f"  rho = {model.rho():.6f}")

Starting direct QuantLib calibration...

Calibration time: 0.33s

Calibrated parameters:
  v0 = 96.551885
  kappa = 8852.529433
  theta = 0.093828
  sigma = 8782.627546
  rho = 0.030262


In [36]:
# Check calibration errors
print("\nCalibration errors:")
total_error = 0
for i, ((days, K, mkt_iv), helper) in enumerate(zip(market_data, helpers)):
    err = helper.calibrationError()
    total_error += err**2
    print(f"  {days}d K={K}: error = {err:.6f}")

rmse = np.sqrt(total_error / len(helpers))
print(f"\nRMSE: {rmse:.6f}")


Calibration errors:
  90d K=95: error = 0.002523
  90d K=100: error = 0.000188
  90d K=105: error = 0.001463
  180d K=95: error = -0.006083
  180d K=100: error = 0.001964
  180d K=105: error = -0.005912
  365d K=95: error = -0.003838
  365d K=100: error = 0.010201
  365d K=105: error = -0.000911

RMSE: 0.004761


## Test 7: Identify Bottleneck

Profile where time is being spent in surface generation.

In [37]:
# Profile a single surface generation
from src.data.volsurface_grid import GridSpec

GRID_SPEC = GridSpec(
    days_grid=np.array([30, 60, 90, 120, 150, 180, 252, 365, 547, 730, 1095]),
    delta_grid=np.arange(0.20, 0.81, 0.05).round(2),
    cp_order=("C", "P"),
)

print(f"Grid size: {len(GRID_SPEC.cp_order)} × {len(GRID_SPEC.days_grid)} × {len(GRID_SPEC.delta_grid)}")
print(f"Total points: {len(GRID_SPEC.cp_order) * len(GRID_SPEC.days_grid) * len(GRID_SPEC.delta_grid)}")

Grid size: 2 × 11 × 13
Total points: 286


In [None]:
# Time each step
params = HestonParams(v0=0.04, kappa=2.0, theta=0.04, sigma=0.3, rho=-0.7)
S0 = 100.0
r = 0.05
q = 0.02

# Time delta → strike conversion
delta_times = []
iv_times = []
sigma_atm = np.sqrt(params.v0)

for cp in GRID_SPEC.cp_order:
    for days in GRID_SPEC.days_grid:
        T = days / 365.0
        for delta in GRID_SPEC.delta_grid:
            # Delta to strike
            t0 = time.time()
            K = strike_from_delta(S=S0, T=T, r=r, q=q, sigma=sigma_atm, delta=delta, cp_flag=cp)
            t1 = time.time()
            delta_times.append(t1 - t0)
            
            if K is not None and not np.isnan(K) and K > 0:
                # Heston IV
                t0 = time.time()
                try:
                    iv = heston_iv(S0, K, T, r, q, params, "call" if cp=="C" else "put")
                except:
                    iv = np.nan
                t1 = time.time()
                iv_times.append(t1 - t0)

print(f"Delta→Strike: {len(delta_times)} calls, total={sum(delta_times):.3f}s, mean={np.mean(delta_times)*1000:.2f}ms")
print(f"Heston IV:    {len(iv_times)} calls, total={sum(iv_times):.3f}s, mean={np.mean(iv_times)*1000:.2f}ms")
print(f"\nTotal time: {sum(delta_times) + sum(iv_times):.2f}s per surface")

Delta→Strike: 286 calls, total=0.019s, mean=0.07ms
Heston IV:    0 calls, total=0.000s, mean=nanms

Total time: 0.02s per surface


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


In [39]:
# Check for slow individual IV computations
print("\nSlowest IV computations:")
slow_threshold = 0.1  # 100ms
slow_count = sum(1 for t in iv_times if t > slow_threshold)
print(f"  {slow_count} / {len(iv_times)} took > {slow_threshold*1000:.0f}ms")

if iv_times:
    print(f"  Max: {max(iv_times)*1000:.0f}ms")
    print(f"  95th percentile: {np.percentile(iv_times, 95)*1000:.0f}ms")


Slowest IV computations:
  0 / 0 took > 100ms


## Summary

In [40]:
print("="*60)
print("HESTON CALIBRATION TEST SUMMARY")
print("="*60)
print("\nIf you see this cell, all tests completed without hanging.")
print("\nKey findings:")
print(f"  • Single IV computation: ~{np.mean(iv_times)*1000:.0f}ms avg")
print(f"  • Full surface ({len(iv_times)} IVs): ~{sum(iv_times):.1f}s")
print(f"  • Calibration (15 points): ~{t1-t0:.1f}s")
print("\nIf generating surfaces is slow, consider:")
print("  1. Reducing grid resolution")
print("  2. Parallelizing IV computations")
print("  3. Using vectorized Heston pricing")

HESTON CALIBRATION TEST SUMMARY

If you see this cell, all tests completed without hanging.

Key findings:
  • Single IV computation: ~nanms avg
  • Full surface (0 IVs): ~0.0s
  • Calibration (15 points): ~0.0s

If generating surfaces is slow, consider:
  1. Reducing grid resolution
  2. Parallelizing IV computations
  3. Using vectorized Heston pricing
