In [15]:
import numpy as np
import pandas as pd
from scipy.stats import qmc # Latin Hypercube !
from scipy.interpolate import interp1d 
from tqdm import tqdm
import time

import sys, os, pathlib

NOTEBOOK_DIR = pathlib.Path(os.getcwd())  
ROOT = NOTEBOOK_DIR.parents[0]  
sys.path.insert(0, str(ROOT))
from src.Heston_FFT import heston_price_fft


n_samples = 200_000

S0 = 105.0
K  = 105.0
T  = 1.0
r  = 0.05
q  = 0.0

alpha = 1.5
n = 4096
eta = 0.1
lam = 2.0*np.pi / (n * eta)  


def sample_heston_lhs(n_samples: int, seed: int = 42) -> pd.DataFrame:
    """
    Genera n_samples parametri Heston verosimili con Latin Hypercube Sampling.
    Vincoli:
      - v0, theta in [0.01, 0.09]  ~ vol 10%-30%
      - kappa in [0.5, 5.0]        ~ mean reversion ragionevole
      - sigma_v in [0.1, 1.0]      ~ vol-of-vol realistica
      - rho in [-0.95, -0.1]       ~ correlazione negativa tipica
      - Feller: 2*kappa*theta >= sigma_v^2
    """
    # BOUNDS (l_bounds, u_bounds) nell’ordine: v0, kappa, theta, sigma_v, rho
    l_bounds = np.array([0.01, 0.5, 0.01, 0.10, -0.95], dtype=float)
    u_bounds = np.array([0.09, 5.0, 0.09, 1.00, -0.10], dtype=float)

    collected = []
    attempt = 0
    rng = np.random.default_rng(seed)

    # Ripeti in batch finché non raccogli abbastanza campioni validi
    while len(collected) < n_samples and attempt < 20:
        # Genera più punti del necessario per compensare il filtraggio
        batch_size = int(np.ceil((n_samples - len(collected)) * 2.0))
        sampler = qmc.LatinHypercube(d=5, seed=seed + attempt)
        U = sampler.random(batch_size)
        X = qmc.scale(U, l_bounds, u_bounds)  # [batch_size, 5]

        v0, kappa, theta, sigma_v, rho = (
            X[:, 0], X[:, 1], X[:, 2], X[:, 3], X[:, 4]
        )

        # Vincolo di Feller
        feller_ok = (2.0 * kappa * theta) >= (sigma_v ** 2)

        # (eventuale) altra logica di pulizia: niente rho ai limiti estremi
        # già garantito dai bounds scelti

        good = feller_ok
        X_ok = X[good]

        for row in X_ok:
            collected.append(row)
            if len(collected) >= n_samples:
                break

        attempt += 1

    if len(collected) < n_samples:
        raise RuntimeError("Non sono riuscito a generare abbastanza campioni che rispettino i vincoli. "
                           "Allarga leggermente i bounds o aumenta i tentativi.")

    params = np.vstack(collected)[:n_samples]
    df = pd.DataFrame(params, columns=["v0", "kappa", "theta", "sigma_v", "rho"])

    # Aggiungi colonne fisse di mercato
    df.insert(0, "S0", S0)
    df.insert(1, "K", K)
    df.insert(2, "T", T)
    df.insert(3, "r", r)
    df.insert(4, "q", q)

    return df

df_first = sample_heston_lhs(n_samples, seed=123)

In [16]:
def price_heston_fft_df(
    df: pd.DataFrame,
    option_type: str = "call",
    alpha: float = 1.5,
    N: int = 4096,
    eta: float = 0.05,
) -> pd.DataFrame:
    """
    Aggiunge la colonna 'price' con il prezzo Heston per ciascuna riga di df,
    utilizzando la funzione heston_price_fft già importata.

    Richiede che df contenga le colonne:
    ['S0','K','T','r','q','v0','kappa','theta','sigma_v','rho'].
    """
    required = ["S0","K","T","r","q","v0","kappa","theta","sigma_v","rho"]
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Mancano colonne richieste nel DataFrame: {missing}")

    prices = []
    for _, row in df.iterrows():
        try:
            p = heston_price_fft(
                S=float(row["S0"]),
                K=float(row["K"]),
                T=float(row["T"]),
                r=float(row["r"]),
                v0=float(row["v0"]),
                kappa=float(row["kappa"]),
                theta=float(row["theta"]),
                sigma_v=float(row["sigma_v"]),
                rho=float(row["rho"]),
                option_type=option_type,
                q=float(row["q"]),
                alpha=alpha,
                N=N,
                eta=eta,
            )
        except Exception:
            # Se qualcosa va storto per una riga (es. instabilità numerica), metti NaN
            p = np.nan
        prices.append(p)

    out = df.copy()
    out["price"] = prices
    return out


df = price_heston_fft_df(df_first)
df.head()

Unnamed: 0,S0,K,T,r,q,v0,kappa,theta,sigma_v,rho,price
0,105.0,105.0,1.0,0.05,0.0,0.076149,3.159715,0.07258,0.301267,-0.945021,13.732014
1,105.0,105.0,1.0,0.05,0.0,0.062508,4.261901,0.085686,0.307886,-0.117184,14.229387
2,105.0,105.0,1.0,0.05,0.0,0.078251,4.291613,0.060049,0.602015,-0.572078,12.888789
3,105.0,105.0,1.0,0.05,0.0,0.074242,1.153721,0.024973,0.151633,-0.909868,12.28598
4,105.0,105.0,1.0,0.05,0.0,0.047958,3.064326,0.054461,0.327791,-0.600293,12.059545


K è sempre uguale a S0! Cambiarlo?

In [None]:
# check
price_fft_example = heston_price_fft(S=105, K=105, T=1.0, r=0.05, v0=0.075529, kappa=3.799022, theta=0.010152, sigma_v=0.123638, rho=-0.352386,
                             option_type="call", q=0.0, alpha=alpha, N=n, eta=eta)

price_fft_example

9.590501113127766

In [None]:
#df.to_csv("heston_lhs_250k_fft_prices.csv", index=False)