In [None]:
import numpy as np

from scipy.optimize import least_squares
from scipy.fft import fft, ifft
from scipy.integrate import quad_vec
from scipy.optimize import root_scalar, newton

## Characteristic Functions and Damping Parameter Calculations

In [None]:
def BS_cf(u, S0, params, T, r):
    sigma = params[0]
    mu = np.log(S0) + (r - 0.5 * sigma**2) * T
    nu = sigma * np.sqrt(T)
    return np.exp(1j * u * mu - 0.5 * (nu * u)**2)

def VG_cf(u, S0, params, T, r):
    sigma, theta, nu = params
    omega = (1.0 / nu) * np.log(1.0 - theta * nu - 0.5 * sigma**2 * nu)
    mu = np.log(S0) + (r + omega) * T
    denom = 1.0 - 1j * theta * nu * u + 0.5 * sigma**2 * nu * u**2
    return np.exp(1j * u * mu) * denom**(-T / nu)

def heston_cf(u, S0, params, tau, r):
    v0, kappa, theta, sigma, rho = params
    a, b = kappa*theta, kappa
    rspi = rho * sigma * 1j * u

    d_raw = np.sqrt((rspi - b)**2 + sigma**2*(1j*u + u*u))
    d = np.where(np.real(d_raw) < 0, -d_raw, d_raw)

    g = (b - rspi - d) / (b - rspi + d)

    exp_neg  = np.exp(-d * tau)
    log_den1 = np.log1p(-g)            # stable for moderate g
    log_den2 = np.log1p(-g * exp_neg)  # stable if |g*exp_neg|<1

    C = 1j*u*r*tau + (a/sigma**2)*((b - rspi - d)*tau - 2*(log_den2 - log_den1))
    D = ((b - rspi - d)/sigma**2) * ((1 - exp_neg)/(1 - g*exp_neg))

    return np.exp(C + D*v0 + 1j*u*np.log(S0))

def compute_psi(v, S0, params, T, r, alpha, cf):
    u = v - 1j * (alpha + 1)
    phi = cf(u, S0, params, T, r)
    return np.exp(-r * T) * phi / (alpha**2 + alpha - v**2 + 1j * (2*alpha + 1) * v)

def compute_gamma(v, S0, params, T, r, alpha, cf):
    def zeta(v_shift):
        phi_shift = cf(v_shift - 1j, S0, params, T, r)
        return np.exp(-r*T) * (
            1/(1 + 1j*v_shift)
            - np.exp(r*T)/(1j*v_shift)
            - phi_shift/(v_shift**2 - 1j*v_shift)
        )

    return 0.5 * (zeta(v - 1j*alpha) - zeta(v + 1j*alpha))

def alpha_optimal_BS(S0, params, r, K_ref, T):
    sigma = params[0]
    d1 = (np.log(S0/K_ref) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    return np.clip(-d1 / (sigma * np.sqrt(T)), 0, 30.0)

def alpha_optimal_vg_core(params, tau, m_tilde):
    sigma, theta, nu = params
    term1 = -theta/sigma**2 - 1.0 + tau/(nu * m_tilde)
    radical = (
        theta**2 / sigma**4
        + 2.0 / (nu * sigma**2)
        + tau**2 / (nu**2 * m_tilde**2)
    )
    term2 = np.sign(m_tilde) * np.sqrt(radical)
    return term1 - term2

def alpha_optimal_VG(S0, params, r, K_ref, T):
    sigma, theta, nu = params
    omega = (1.0 / nu) * np.log(1.0 - theta * nu - 0.5 * sigma**2 * nu)
    tau   = T
    f     = r * tau
    k     = np.log(K_ref / S0)
    m_tilde = f - k + omega * tau
    m_tilde = np.where(np.isclose(m_tilde, 0), 1e-6, m_tilde)

    return np.clip(alpha_optimal_vg_core(params, tau, m_tilde), -30.0, 30.0)

def dlogphi_du(u, cf, S0, params, tau, r, h=1e-6):
    logp = np.log(cf(u + 1j*h, S0, params, tau, r))
    return np.imag(logp) / h

def find_zeta_newton(cf, S0, params, tau, r, zeta1, zeta2,
                     zeta0=None, tol=1e-8, maxiter=10):
    if zeta0 is None:
        zeta = 0.5*(zeta1 + zeta2)
    else:
        zeta = zeta0

    for _ in range(maxiter):
        u = 1j * zeta
        dldu = dlogphi_du(u, cf, S0, params, tau, r)
        psi_p = np.real(1j * dldu)

        if abs(psi_p) < tol:
            break

        hzeta = 1e-6
        u_ph = 1j * (zeta + hzeta)
        dldu_ph = dlogphi_du(u_ph, cf, S0, params, tau, r)
        psi_pp = np.real(1j * dldu_ph)
        zeta -= psi_p / ((psi_pp - psi_p) / hzeta)

        zeta = min(max(zeta, zeta1), zeta2)

    return zeta

def alpha_optimal_heston(S0, params, tau, r):
    v0, kappa, theta, sigma, rho = params

    A = sigma**2*(rho**2 - 1)
    B = -2*kappa*sigma*rho + sigma**2
    C = kappa**2
    roots = np.roots([A, B, C])
    real_roots = roots[np.isreal(roots)].real
    zeta1, zeta2 = np.sort(real_roots)
    zeta2 = min(zeta2, -1.0)
    zeta_star = find_zeta_newton(cf, S0, params, tau, r, zeta1, zeta2)

    # back to α
    return float(-(zeta_star + 1))

def alpha_optimal(S0, params, r, K_ref, t, cf):
    if cf == BS_cf:
        return alpha_optimal_BS(S0, params, r, K_ref, t)
    elif cf == VG_cf:
        return alpha_optimal_VG(S0, params, r, K_ref, t)
    elif cf == heston_cf:
        return alpha_optimal_heston(S0, params, r, t)

## Integration by Standard Quadrature

In [None]:
def integrand(u, S0, K, k, params, tau, r, cf):
    args = (S0, params, tau, r)
    numerator = np.exp(r*tau)*cf(u-1j,*args) - K*cf(u,*args)
    denominator = 1j*u*K**(1j*u)
    return numerator/denominator

def call_price(S0, K, params, tau, r, cf, v_max=200):
    k   = np.log(K / S0)
    args = (S0, K, k, params, tau, r, cf)
    integral, err = quad_vec(integrand, 1e-4, v_max, args=args)
    return (S0 - K*np.exp(-r*tau))/2 + np.real(integral)/np.pi

def price_num(params, S0, K, T, r, v_max, cf):
    prices = np.full_like(K, np.nan)
    for t in np.unique(T):
        mask = np.isclose(T, t, atol=1e-4)
        K_t  = K[mask]
        r_t  = float(r[mask][0])
        C_t = call_price(S0, K_t, params, t, r_t, cf, v_max=v_max)
        prices[mask] = C_t

    return prices

def resid_num(params, S0, P, K, T, r, v_max, cf):
    errs = []
    for t in np.unique(T):
        mask = np.isclose(T, t, atol=1e-4)
        K_t  = K[mask]
        P_t  = P[mask]
        r_t  = float(r[mask][0])
        C_t = call_price(S0, K_t, params, t, r_t, cf, v_max=v_max)
        errs.append((P_t - C_t))

    return np.concatenate(errs)

## FFT Functions

In [None]:
def logStrikePartition(eta = 0.25, N = 4096):
    
    lamb = 2*np.pi/(eta*N)
    b = ((N - 1) * lamb) / 2
    k = -b + lamb*np.arange(0, N)
    return (b, lamb, k)

def price_itm_fft(S0, Kmax, T, r, params, alpha, N, cf, eta = 0.25):
    
    v = np.arange(0, N*eta, eta)

    kPart = logStrikePartition(eta, N)
    b, k = kPart[0], kPart[2]

    pm_one = np.empty((N,))
    pm_one[::2] = -1
    pm_one[1::2] = 1
    w = 3 + pm_one
    w[0] -= 1
    w[-1] -= 1
    w = (eta/3) * w
    
    x = np.exp(1j * v * b) * compute_psi(v, S0, params, T, r, alpha, cf) * w
    callPrices = (np.exp(-alpha*k) / np.pi) * fft(x).real
    
    strikeGrid = np.exp(k)
    kIndices = np.logical_and(np.exp(k)>1e-8, np.exp(k)<Kmax)
    
    return strikeGrid[kIndices], callPrices[kIndices]

def price_otm_fft(S0, Kmax, T, r, params, alpha, N, cf, eta = 0.25):

    v = np.arange(0, N*eta, eta)

    kPart = logStrikePartition(eta, N)
    b, k = kPart[0], kPart[2]
    
    pm_one = np.empty((N,))
    pm_one[::2] = -1
    pm_one[1::2] = 1
    w = 3 + pm_one
    w[0] -= 1
    w[-1] -= 1
    w = (eta/3) * w
    
    x = np.exp(1j * v * b) * compute_gamma(v, S0, params, T, r, alpha, cf) * w
    callPrices = fft(x).real / (np.sinh(alpha * k) *np.pi)
    
    strikeGrid = np.exp(k)
    kIndices = np.logical_and(np.exp(k)>1e-8, np.exp(k)<Kmax)
    
    return strikeGrid[kIndices], callPrices[kIndices]


def price_fft(params, S0, K, Kmax, T, r, N, cf, dynamic_alpha=False, OTM=False):
    prices = np.full_like(K, np.nan)
    for t in np.unique(T):
        mask        = np.isclose(T, t, atol=1e-4)
        K_t         = K[mask]
        r_t         = float(r[mask][0])
        if dynamic_alpha == True:
            alpha = alpha_optimal(S0, params, r_t, S0 * 1.1, t, cf)
        else:
            alpha = 1.5
    
        strikeGrid, callPrices = price_itm_fft(S0, Kmax, t, r_t, params, alpha, N, cf)
        if OTM == True:
            strikeGridOTM, callPricesOTM = price_otm_fft(S0, Kmax, t, r_t, params, alpha, N, cf)
            C_t = np.where(
                K_t <= S0,
                np.interp(K_t, strikeGrid, callPrices),
                np.interp(K_t, strikeGridOTM, callPricesOTM)
            )
        else:
            C_t =  np.interp(K_t, strikeGrid, callPrices)
            
        prices[mask] = C_t
        
    return prices

def resid_fft(params, S0, P, K, Kmax, T, r, N, cf, dynamic_alpha=False, OTM=False):
    errs = []
    for t in np.unique(T):
        mask = np.isclose(T, t, atol=1e-4)
        K_t = K[mask]
        P_t = P[mask]
        r_t = float(r[mask][0])
        if dynamic_alpha == True:
            alpha = alpha_optimal(S0, params, r_t, S0 * 1.1, t, cf)
        else:
            alpha = 1.5
        
        strikeGrid, callPrices = price_itm_fft(S0, Kmax, t, r_t, params, alpha, N, cf)
        if OTM == True:
            strikeGridOTM, callPricesOTM = price_otm_fft(S0, Kmax, t, r_t, params, alpha, N, cf)
            C_t = np.where(
                K_t <= S0,
                np.interp(K_t, strikeGrid, callPrices),
                np.interp(K_t, strikeGridOTM, callPricesOTM)
            )
        else:
            C_t =  np.interp(K_t, strikeGrid, callPrices)
            
        errs.append((P_t - C_t))

    return np.concatenate(errs)

## FRFT Functions

In [None]:
def make_psi_norm(S0, params, T, r, alpha, cf):
    psi0 = abs(compute_psi(0, S0, params, T, r, alpha, cf))
    def psi_norm(v):
        return abs(compute_psi(v, S0, params, T, r, alpha, cf)) / psi0
    return psi_norm

def find_u_bar(psi_norm, m, cf, max_u=3000):
    v_lo, v_hi = 0.0, 1.0
    while v_hi < max_u and psi_norm(v_hi) > 10.0**(-m):
        v_hi *= 2.0
    if v_hi >= max_u:
        return max_u

    sol = root_scalar(
        lambda v: psi_norm(v) - 10.0**(-m),
        bracket=[v_lo, v_hi],
        method='brentq',
        xtol=1e-6
    )
    return int(np.ceil(sol.root))

def logStrikePartition_FRFT(S0, Kmin, Kmax, T, r, params, alpha, N, m, cf):
    logKmin, logKmax = np.log(Kmin), np.log(Kmax)
    lamb = (logKmax - logKmin) / (N - 1)
    psi_norm = make_psi_norm(S0, params, T, r, alpha, cf)
    eta = find_u_bar(psi_norm, m, cf) / N
    a    = eta * lamb
    b    = logKmin 
    k    = b + lamb * np.arange(N)
    return (a, lamb, b, k, eta)

def frft_bailey(h, alpha):
    N = len(h)
    j = np.arange(N)

    pre   = h * np.exp(-1j * np.pi * alpha * j*j)
    H     = np.fft.fft(np.concatenate([pre, np.zeros(N)]))
    chirp = np.exp(1j * np.pi * alpha * np.arange(2*N)**2)
    C     = np.fft.ifft(H * np.fft.fft(chirp))
    post  = C[:N] * np.exp(-1j * np.pi * alpha * j*j)
    return post

def price_itm_frft(S0, Kmin, Kmax, T, r, params, alpha, N, m, cf):

    kPart = logStrikePartition_FRFT(S0, Kmin, Kmax, T, r, params, alpha, N, m, cf)
    a, b, k, eta  = kPart[0], kPart[2], kPart[3], kPart[4]
    v = np.arange(0, N*eta, eta)
    
    pm_one = np.empty((N,))
    pm_one[::2] = -1
    pm_one[1::2] = 1
    w = 3 + pm_one
    w[0] -= 1 
    w[-1] -= 1
    w = (eta/3) * w

    x = np.exp(-1j * v * b) * compute_psi(v, S0, params, T, r, alpha, cf) * w
    callPrices = (np.exp(-alpha*k) / np.pi) * frft_bailey(x, a / (2*np.pi)).real

    strikeGrid = np.exp(k)
    
    return strikeGrid, callPrices

def price_otm_frft(S0, Kmin, Kmax, T, r, params, alpha, N, cf, eps):

    kPart = logStrikePartition_FRFT(T, params, N, eps, Kmin, Kmax)
    a, b, k, eta  = kPart[0], kPart[2], kPart[3], kPart[4]
    v = np.arange(0, N*eta, eta)
    
    pm_one = np.empty((N,))
    pm_one[::2] = -1
    pm_one[1::2] = 1
    w = 3 + pm_one
    w[0] -= 1
    w[-1] -= 1
    w = (eta/3) * w
    
    x = np.exp(-1j * v * b) * compute_gamma(v, S0, params, T, r, alpha, cf) * w
    callPrices = frft_bailey(x, a / (2*np.pi)).real / (np.sinh(alpha * k) *np.pi)
    
    strikeGrid = np.exp(k)
    kIndices = np.logical_and(np.exp(k)>1e-8, np.exp(k)<Kmax)
    
    return strikeGrid[kIndices], callPrices[kIndices]

def price_frft(params, S0, K, Kmin, Kmax, T, r, N, m, cf, dynamic_alpha=False, OTM=False):
    prices = np.full_like(K, np.nan)
    for t in np.unique(T):
        mask        = np.isclose(T, t, atol=1e-4)
        K_t         = K[mask]
        r_t         = float(r[mask][0])
        if dynamic_alpha == True:
            alpha = alpha_optimal(S0, params, r_t, S0 * 1.1, t, cf)
        else:
            alpha = 1.5
    
        strikeGrid, callPrices = price_itm_frft(S0, Kmin, Kmax, t, r_t, params, alpha, N, m, cf)
        if OTM == True:
            strikeGrid, callPrices = price_otm_frft(S0, Kmin, Kmax, t, r_t, params, alpha, N, m, cf)
            C_t = np.where(
                K_t <= S0,
                np.interp(K_t, strikeGrid, callPrices),
                np.interp(K_t, strikeGridOTM, callPricesOTM)
            )
        else:
            C_t =  np.interp(K_t, strikeGrid, callPrices)

        prices[mask] = C_t
    return prices

def resid_frft(params, S0, P, K, Kmin, Kmax, T, r, N, m, cf, dynamic_alpha=False, OTM=False):
    errs = []
    for t in np.unique(T):
        mask = np.isclose(T, t, atol=1e-4)
        K_t = K[mask]
        P_t = P[mask]
        r_t = float(r[mask][0])
        if dynamic_alpha == True:
            alpha = alpha_optimal(S0, params, r_t, S0 * 1.1, t, cf)
        else:
            alpha = 1.5

        strikeGrid, callPrices = price_itm_frft(S0, Kmin, Kmax, t, r_t, params, alpha, N, m, cf)
        if OTM == True:
            strikeGrid, callPrices = price_otm_frft(S0, Kmin, Kmax, t, r_t, params, alpha, N, m, cf)
            C_t = np.where(
                K_t <= S0,
                np.interp(K_t, strikeGrid, callPrices),
                np.interp(K_t, strikeGridOTM, callPricesOTM)
            )
        else:
            C_t =  np.interp(K_t, strikeGrid, callPrices)

        errs.append((P_t - C_t))

    return np.concatenate(errs)

## Minimizers

In [None]:
N = 8192
v_max = 200
m = 8

# Synthetic data

S0 = 100
Kmin, Kmax = 50, 150
K = np.linspace(Kmin,Kmax,100)
tau = np.linspace(0.05,1,19)
tau_grid, K_grid = np.meshgrid(tau, K, indexing='ij')
tau_long = tau_grid.ravel()
K_long   = K_grid.ravel()
r = (tau_long + 0.01) / 50

# Assume some calculated parameters for a given surface: These are calculated by the minimiser when you have proper pricing data to use.

params_bs = [0.1468495]
params_vg = [0.14723719, -0.15580366,  0.69096385]
params_heston = [0.01508475,  9.29958104,  0.04642471,  2.19165982, -0.74143845]

In [None]:
#BS
# Call with BS_cf

lower = [1e-2]
upper = [0.5]
x0 = [0.1]

In [None]:
#VG
# Call with VG_cf

lower = [1e-6, -2.0, 1e-4]
upper = [0.5, 2.0, 1.]
x0 = [0.3, 0.1, 0.5]

In [None]:
#Heston
# Call with heston_cf

lower = [1e-3, 1e-3, 1e-3, 1e-2, -1.]
upper = [0.2, 15., 0.1, 2.5, 0]
x0 = [0.1, 3, 0.05, 0.3, -0.8]

In [None]:
result_fft = least_squares(
    resid_fft,
    x0,
    method='trf',
    args=(S0, P, K, Kmax, tau, r, N, VG_cf),
    x_scale='jac',
    loss='soft_l1',
    ftol=1e-12,
    xtol=1e-12,
    gtol=1e-12,
    bounds=(lower, upper),
)
params_fft = result_fft.x
print(result_fft.x)

In [None]:
result_frft = least_squares(
    resid_frft,
    x0,
    method='trf',
    args=(S0, P, K, Kmin, Kmax, tau, r, N, m, heston_cf),
    x_scale='jac',
    loss='soft_l1',
    ftol=1e-12,
    xtol=1e-12,
    gtol=1e-12,
    bounds=(lower, upper),
)
params_frft = result_frft.x
print(result_frft.x)

In [None]:
result_num = least_squares(
    resid_num,
    x0,
    method='trf',
    args=(S0, P, K, tau, r, v_max, heston_cf),
    x_scale='jac',
    loss='soft_l1',
    ftol=1e-12,
    xtol=1e-12,
    gtol=1e-12,
    bounds=(lower, upper),
)
params_num = result_num.x
print(params_num)

In [None]:
price_num(params_vg, S0, K_long, tau_long, r, v_max, VG_cf)

In [None]:
price_fft(params_vg, S0, K_long, Kmax, tau_long, r, N, VG_cf, dynamic_alpha=False, OTM=False)

In [None]:
price_frft(params_vg, S0, K_long, Kmin, Kmax, tau_long, r, N, m, VG_cf, dynamic_alpha = False)