# Options Analysis

$$
\textrm{d}L_t = \Big(\kappa\left(\frac{\theta}{L_t}-L_t\right)-\frac{\gamma^2}{4}\Big)\textrm{dt} + \gamma\textrm{d}B_t
$$

> Calibrating an option pricing model consists in iteratively adjusting the model parameters so that the differences between the prices of liquidly-traded options and the corresponding model prices are minimized.

[Zhang, Amici](https://arxiv.org/html/2407.15536v1)

In [1]:
import numpy as np
import torch.nn as nn
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from scipy.integrate import quad
from scipy.interpolate import interp1d
from sklearn.model_selection import train_test_split

In [2]:
rng = np.random.default_rng()

## Heston Model

In [3]:
def fft_integral_approx(f, K, u_max=100, N=2**9, k0=-5, params=None):
    du = u_max / N
    u = np.arange(0,N) * du
    u[0] = 1e-10

    # create frequency grid 
    dk = (2 * np.pi) / (N * du) # angular frequency step
    k_grid = k0 + (np.arange(N) * dk)

    # FFT approximates the Fourier integral at frequencies k_grid (real part only)
    int_approx = np.real(np.fft.fft(f(u,params)*np.exp(-1j*k0*u)*du))
    
    interp = interp1d(k_grid, int_approx, kind='linear', fill_value='extrapolate')
    return float(interp(K))

In [4]:
def d(u, theta):
    (kappa, _lambda, sigma, rho, v0, r, Tau, S0, K ) = theta
    return np.sqrt(np.power((kappa - 1j * rho * sigma *u),2)+np.power(sigma,2)*(1j*u + np.power(u,2)))
    
def g(u, theta):
    (kappa, _lambda, sigma, rho, v0, r, Tau, S0, K ) = theta
    d_u = d(u, theta)
    return (kappa - (1j*rho * sigma *u) - d_u) / (kappa - (1j*rho * sigma *u) + d_u)

def D(u, theta):
    (kappa, _lambda, sigma, rho, v0, r, Tau, S0, K ) = theta
    d_u = d(u, theta)
    g_u = g(u, theta)
    return (kappa - (1j*rho * sigma *u) - d_u / np.power(sigma,2)) * ((1-np.exp(-d_u*Tau))/(1-g_u*np.exp(-d_u*Tau)))

def C(u, theta):
    (kappa, _lambda, sigma, rho, v0, r, Tau, S0, K ) = theta
    d_u = d(u, theta)
    g_u = g(u, theta)
    term2 = ((kappa - 1j*rho * sigma *u) - d_u)*Tau - 2*np.log((1-g_u*np.exp(-d_u*Tau)) / (1-g_u))
    return (1j*r*u*Tau) + ((kappa * _lambda)/np.power(sigma,2)) * term2

In [5]:
def partial_D(u : np.array, theta : np.array, theta_H : str):
    (kappa, _lambda, sigma, rho, v0, r, tau, S0, K ) = theta
    
    # Evaluate at given u 
    d_u = d(u, theta)
    g_u = g(u, theta)
    a = np.exp(-tau * d_u)

    # Common subexpressions
    kappa_minus = kappa - 1j * rho * sigma * u        # κ - iρσu
    kappa_minus_d = kappa_minus - d_u                 # κ - iρσu - d
    one_minus_ag = 1 - a * g_u
    one_plus_ag = 1 + a * g_u
    iu = 1j * u
    iu_plus_u2 = iu + u ** 2

    if theta_H == "kappa":
        # ∂_kappa D_tau
        term2 = tau * a * kappa_minus * (1 - g_u) - (1 - a) * one_plus_ag
        num = kappa_minus_d * term2
        denom = sigma ** 2 * d_u * one_minus_ag ** 2
        return num / denom

    elif theta_H == "rho":
        # ∂_rho D_tau
        term1 = iu * kappa_minus_d
        term2 = one_plus_ag * (1 - a) - tau * a * (1 - g_u) * kappa_minus
        num = term1 * term2
        denom = sigma * d_u * one_minus_ag ** 2
        return num / denom

    elif theta_H == "sigma":
        # ∂_sigma D_tau
        bracket1 = (1 - a) * (kappa * a * g_u + kappa - d_u + d_u * a * g_u)
        bracket2 = tau * sigma * a * (1 - g_u) * \
                   (1j * rho * u * kappa_minus - sigma * iu_plus_u2)
        bracket = bracket1 - bracket2
        num_factor = -iu_plus_u2
        denom = sigma * d_u * one_minus_ag ** 2 * (kappa_minus + d_u)
        return num_factor * bracket / denom

    elif theta_H == "lambda":
        # ∂_lambda D_tau
        return 0
        
    else:
        raise ValueError("theta_H must be 'kappa', 'rho', or 'sigma', or 'lambda'")

In [6]:
def partial_C(u: np.array, theta : np.array, theta_H : str):
    (kappa, _lambda, sigma, rho, v0, r, tau, S0, K ) = theta

    # Evaluate at given u 
    d_u = d(u)
    g_u = g(u)
    a = np.exp(-tau * d_u)

    # Common subexpressions
    kappa_minus = kappa - 1j * rho * sigma * u          # κ - iρσu
    kappa_minus_d = kappa_minus - d_u                   # κ - iρσu - d
    kappa_plus_d = kappa_minus + d_u                    # κ - iρσu + d
    one_minus_ag = 1 - a * g_u
    one_plus_ag = 1 + a * g_u
    iu = 1j * u
    iu_plus_u2 = iu + u ** 2

    N = one_minus_ag / (1-g_u)
    M = kappa_minus_d * tau - 2*np.log(N)
    
    if theta_H == "kappa":
        term2_num = kappa * kappa_minus_d * (
            2 * (1 - a) - tau * d_u * one_minus_ag - tau * a * (1 - g_u) * kappa_minus
        )
        term2_denom = d_u ** 2 * one_minus_ag
        return (lam / sigma ** 2) * (M + term2_num / term2_denom)

    elif theta_H == "rho":
        term_num = 1j * kappa * lam * u * kappa_minus_d * (
            tau * (d_u + a * kappa_minus) - 2 * (1 - a) - tau * a * g_u * kappa_plus_d
        )
        term_denom = sigma * d_u ** 2 * one_minus_ag
        return term_num / term_denom

    elif theta_H == "sigma":
        # Assemble the five parts inside the brackets
        part1 = -rho * sigma ** 2 * tau * u
        part2_num = 4 * kappa * sigma ** 3 * (1 - a) * iu_plus_u2
        part2_denom = d_u * one_minus_ag * (1 - g_u) * kappa_plus_d ** 2
        part2 = part2_num / part2_denom
        part3 = -4 * np.log(N)
        part4_num = sigma ** 2 * tau * one_plus_ag * (
            1j * rho * u * kappa_minus - sigma * iu_plus_u2
        )
        part4_denom = d_u * one_minus_ag
        part4 = part4_num / part4_denom
        part5 = -2 * sigma * tau * kappa_minus_d

        bracket = part1 + part2 + part3 + part4 + part5
        return (kappa * lam / sigma ** 4) * bracket

    elif theta_H == "lambda":
        return (kappa/(sigma**2))*M
        
    elif theta_H == "v0":
        return 0

    else:
        raise ValueError("theta_H must be 'kappa', 'rho', or 'sigma'")

In [7]:
def phi(u : np.array, theta : np.ndarray):
    assert theta.shape == (9,)
    (kappa, _lambda, sigma, rho, v0, r, Tau, S0, K ) = theta
    
    return np.exp(C(u, theta) + D(u, theta)*v0 + 1j*u*np.log(S0))

<img src="./images/heston_prop1.png" alt="heston slv param nn topology" width="700">

In [8]:
def partial_phi(u : np.array, theta : np.array, theta_H : str):
    (kappa, _lambda, sigma, rho, v0, r, tau, S0, K ) = theta

    if(theta_H in ["kappa", "lambda", "sigma", "rho"]):
        return phi(u, theta) * (partial_C(u, theta, theta_H) + v0 * partial_D(u, theta, theta_H))
    elif theta_H == "v0":
        return phi(u, theta) * partial_D(u, theta)

In [9]:
def G(theta : np.ndarray, u_max=100, N=2**9, k0=-5, alpha=1.5): 
    assert theta.shape == (9,)
    (kappa, _lambda, sigma, rho, v0, r, Tau, S0, K ) = theta
    k = np.log(K)
    
    def f(u, theta):
        return phi(u-1j, theta)/(1j*u)
    
    def h(u, theta):
        return phi(u, theta)/(1j*u)

    Pi1 = 0.5 + (1/np.pi)*fft_integral_approx(f, np.log(K), params=theta)
    Pi2 = 0.5 + (1/np.pi)*fft_integral_approx(f, np.log(K), params=theta)
    
    return Pi1 * S0 - K*np.exp(-r*Tau)*Pi2

In [38]:
def grad_G(theta : np.ndarray[9]) -> np.ndarray:
    (kappa, _lambda, sigma, rho, v0, r, Tau, S0, K ) = theta
    theta_Hs = ["kappa", "lambda", "sigma", "rho", "v0"]
    
    grad = np.zeros(5)
    for theta_H in theta_Hs:
        k = np.log(K)
        def h(u):
            return partial_phi(u-1j,theta, theta_H) / (1j*u) 
            
        def g(u):
            return partial_phi(u,theta, theta_H) / (1j*u) 
        
        term1 = (S0/np.pi) * fft_integral_approx(h, np.log(K), params=theta)
        term2 = ((K*np.exp(-r*tau))/np.pi) * fft_integral_approx(g, np.log(K), params=theta)
        grad[i] = term1 - term2
        
    return grad

def grad_G_mat(thetas : np.ndarray) -> np.ndarray:
    assert thetas.shape[1] == 9
    
    ret = np.zeros((thetas.shape[0],5))
    for i in range(thetas.shape[0]):
        ret[i,:] = grad_G(thetas[i,:])
        
    return ret

## Data

In [11]:
# Generate Options Data 
def generate_thetas(N : int):
    def runif(a,b):
        return (b - a) * rng.random() + a
    ret = np.ndarray((N,9))
    for i in range(N):
        theta = np.array([
            runif(0.005,5), # kappa
            runif(0,1), # lambda
            runif(0.1,1), # sigma
            runif(-0.95,0), # rho
            runif(0,1), # v0
            runif(0,0.10), # r
            runif(0.05,1), # Tau
            runif(10,6000), # S0
            runif(-5,5) # ln(K/S0)
        ])
        theta[-1] = np.exp(theta[-1]) * theta[-2] # recover strike K
        ret[i,:] = theta
    return ret

X = generate_thetas(1000)

p = np.ndarray(X.shape)
for i in range(X.shape[0]):
    p[i,:] = G(X[i,:])

X_train, X_test, p_train, p_test = train_test_split(X, p, test_size = 0.3, shuffle=True)

## Neural Network

<img src="./images/heston_nn.png" alt="heston slv param nn topology" width="500">

>To tackle this issue, in the spirit of Huge and Savine (2020) we propose a deep differential
network (DDN) for the calibration of the Heston model. Our DDN adds a differentiation
layer to the typical structure of a deep neural network. This layer is given by the first-order
partial derivatives of the network output with respect to some of the input parameters,
namely the parameters of the stochastic variance process. 

In [29]:
# Activation function is ReLU
class HestonNet(nn.Module):
    def __init__(self, alpha, input_size, num_hidden_layers, nodes_per_layer):
        assert(num_hidden_layers >=1)
        super().__init__() # initialize parent class
        self.lr = alpha
        self.layers = nn.ModuleList()
        self.layers.append(nn.Linear(input_size, nodes_per_layer))
        
        for _ in range(1, num_hidden_layers-1):
            self.layers.append(nn.Linear(nodes_per_layer, nodes_per_layer))

        self.output = nn.Linear(nodes_per_layer, 1)

    def forward(self, x):
        for layer in self.layers:
            x = torch.relu(layer(x))
        return self.output(x)

In [26]:
def calc_loss(p_hat : torch.FloatTensor,
         p : torch.FloatTensor, 
         dp_hat : torch.FloatTensor, 
         dp : torch.FloatTensor, 
         params, 
         lambda_reg = 0.01):
    assert dp_hat.shape == dp.shape and p_hat.shape == p.shape
    
    reg_loss = 0
    for param in params:
        reg_loss += torch.sum(param ** 2)
        
    return torch.mean((p-p_hat)**2) + torch.mean((dp-dp_hat)**2) + (lambda_reg * reg_loss)

## Train & Eval

In [14]:
# Hyperparameters
alpha = 0.001
input_size = 9
num_hidden_layers = 3
nodes_per_layer = 64
batch_size = 32
epochs = 100

In [31]:
# Prepare DataLoader
train_dataset = TensorDataset(torch.from_numpy(X_train).float(),
                              torch.from_numpy(p_train).float())
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [39]:
# Create model, optimizer, and loss
model = HestonNet(alpha, input_size, num_hidden_layers, nodes_per_layer)
optimizer = optim.Adam(model.parameters(), lr=alpha)

# Training loop
model.train()
for epoch in range(epochs):
    total_loss = 0.0
    for theta_batch, price_batch in train_loader:
        optimizer.zero_grad()
        theta_batch.requires_grad_(True)
        
        # Forward pass
        pred = model(theta_batch)
        grad_p_hat = torch.autograd.grad(pred, theta_batch,
                               grad_outputs=torch.ones_like(pred),
                               create_graph=True)[0]
        
        grad_p_batch = grad_G_mat(theta_batch.detach().numpy())
        loss = calc_loss(pred, theta_batch, grad_p_hat[:,0:5], torch.tensor(grad_p_batch), model.parameters())
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item() * theta_batch.size(0)
    
    avg_loss = total_loss / len(train_loader.dataset)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.6f}")

TypeError: grad_G.<locals>.h() takes 1 positional argument but 2 were given