# Calibration of a Stochastic Model using the Sinkhorn divergence

## OT based Calibrator

In [None]:
!pip install geomloss



In [None]:
from geomloss import SamplesLoss  # Ensure you have installed geomloss: pip install geomloss
import torch
import torch.optim as optim

In [None]:
class ModelCalibrator:
    """
    Calibrates a stochastic model using the Sinkhorn divergence.
    """
    def __init__(self, model, observed_data, S0=None, T=1.0, loss_type="sinkhorn", p=2, blur=0.05, optimizer_cls=optim.Adam, lr=0.01):
        self.model = model
        self.observed_data = torch.tensor(observed_data, dtype=torch.float32, device=self.model.device)
        self.S0 = S0
        self.T = T
        self.loss_fn = SamplesLoss(loss_type, p=p, blur=blur)
        self.lr = lr
        self.optimizer_cls = optimizer_cls
        self._setup_optimizer()

    def _setup_optimizer(self):
        # Collect parameters that require gradients
        params = []
        for param in self.model.params.values():
            if param.requires_grad:
                params.append(param)
        self.optimizer = self.optimizer_cls(params, lr=self.lr)

    def calibrate(self, num_epochs=1000, batch_size=None, steps=100, verbose=True):
        if batch_size is None:
            batch_size = len(self.observed_data)
        observed_tensor = self.observed_data

        for epoch in range(num_epochs):
            self.optimizer.zero_grad()
            simulated_data = self.model.simulate(S0=self.S0, T=self.T, N=batch_size, steps=steps)
            if simulated_data.dim() > 1:
                simulated_flat = simulated_data.view(batch_size, -1)
                observed_flat = observed_tensor.view(batch_size, -1)
            else:
                simulated_flat = simulated_data.view(-1, 1)
                observed_flat = observed_tensor.view(-1, 1)

            loss = self.loss_fn(simulated_flat, observed_flat)
            loss.backward()
            self.optimizer.step()

            # Apply parameter constraints
            self.model._apply_constraints()

            if verbose and epoch % (num_epochs // 10) == 0:
                params_str = ', '.join([f'{name}: {param.item() if param.dim()==0 else param.detach().cpu().numpy()}' for name, param in self.model.params.items()])
                print(f'Epoch {epoch}, Loss: {loss.item():.6f}, {params_str}')

    def get_calibrated_params(self):
        result = {}
        for name, param in self.model.params.items():
            if param.dim() == 0:
                result[name] = param.item()
            else:
                result[name] = param.detach().cpu().numpy()
        return result

## Stochastic Model

In [None]:
class StochasticModel:
    """
    Base class for stochastic models.
    """

    def __init__(self, params):
        self.params = params  # Dictionary of model parameters

        # Determine the device (CPU or GPU) and move all parameters to it
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        for param in self.params.values():
            param.data = param.data.to(self.device)

    def simulate(self, S0, T, N, **kwargs):
        raise NotImplementedError("simulate method must be implemented by subclasses.")

    def _apply_constraints(self):
        pass

## Heston

In [None]:
class Heston(StochasticModel):
    """
    Heston stochastic volatility model.
    """

    def __init__(self, kappa_init=1.0, theta_init=0.04, sigma_v_init=0.5, rho_init=-0.5, v0_init=0.04, mu_init=0.0):
        params = {
            'kappa': torch.tensor(kappa_init, requires_grad=True),
            'theta': torch.tensor(theta_init, requires_grad=True),
            'sigma_v': torch.tensor(sigma_v_init, requires_grad=True),
            'rho': torch.tensor(rho_init, requires_grad=True),
            'v0': torch.tensor(v0_init, requires_grad=True),
            'mu': torch.tensor(mu_init, requires_grad=True)
        }
        super().__init__(params)

    def simulate(self, S0, T, N, steps=100):
      dt = T / steps  # dt is a float here
      dt = torch.tensor(dt, device=self.device)  # Convert to tensor early
      mu = self.params['mu']
      kappa = self.params['kappa']
      theta = self.params['theta']
      sigma_v = self.params['sigma_v']
      rho = torch.clamp(self.params['rho'], min=-0.999, max=0.999)
      v0 = self.params['v0'].item()  # Ensure scalar for torch.full
      S0 = torch.tensor(S0, device=self.device)
      N = int(N)
      steps = int(steps)

      vs = [torch.full((N,), v0, device=self.device)]
      Ss = [torch.full((N,), S0.item(), device=self.device)]  # Ensure scalar

      for t in range(1, steps):
          v_prev = vs[-1]
          S_prev = Ss[-1]

          Z1 = torch.randn(N, device=self.device)
          Z2 = torch.randn(N, device=self.device)
          W_S = Z1 * torch.sqrt(dt)  # dt is now a tensor
          W_v = (rho * Z1 + torch.sqrt(1 - rho**2) * Z2) * torch.sqrt(dt)

          v_t_minus = torch.relu(v_prev)
          dv = kappa * (theta - v_t_minus) * dt + sigma_v * torch.sqrt(v_t_minus) * W_v
          new_v = v_t_minus + dv
          new_S = S_prev + S_prev * (mu * dt + torch.sqrt(v_t_minus) * W_S)

          vs.append(new_v)
          Ss.append(new_S)

      v = torch.stack(vs, dim=1)
      S = torch.stack(Ss, dim=1)
      return S[:, -1]

    def option_price(self, S0, K, T, r, option_type='call', N=10000, steps=100):
        S_T = self.simulate(S0, T, N, steps)
        if option_type.lower() == 'call':
            payoff = torch.relu(S_T - K)
        elif option_type.lower() == 'put':
            payoff = torch.relu(K - S_T)
        else:
            raise ValueError("option_type must be 'call' or 'put'")
        price = torch.exp(-r * T) * torch.mean(payoff)
        return price.item()

    def _apply_constraints(self):
        self.params['theta'].data.clamp_(min=1e-6)
        self.params['sigma_v'].data.clamp_(min=1e-6)
        self.params['v0'].data.clamp_(min=1e-6)
        self.params['rho'].data.clamp_(min=-0.999, max=0.999)

## Calibrate Heston

In [None]:
import numpy as np
import torch

In [None]:
# Generate synthetic observed data using true Heston parameters
N_observed = 1000
S0 = 100.0
T = 1.0

true_params = {
    'kappa_init': 2.0,
    'theta_init': 0.05,
    'sigma_v_init': 0.3,
    'rho_init': -0.7,
    'v0_init': 0.04,
    'mu_init': 0.03
}

np.random.seed(42)
torch.manual_seed(42)
heston_true = Heston(**true_params)
S_observed = heston_true.simulate(S0=S0, T=T, N=N_observed)

# Initialize the Heston model with initial guesses
heston_model = Heston(
    kappa_init=1.0,
    theta_init=0.02,
    sigma_v_init=0.2,
    rho_init=-0.5,
    v0_init=0.02,
    mu_init=0.0
)

# Set up the calibrator
calibrator = ModelCalibrator(
    model=heston_model,
    observed_data=S_observed.detach().cpu().numpy(),  # Convert tensor to numpy array
    S0=S0,
    T=T,
    lr=0.01
)

In [None]:
# Calibrate the model
calibrator.calibrate(num_epochs=1000, steps=100, verbose=True)

# Get the calibrated parameters
calibrated_params = calibrator.get_calibrated_params()
print("Calibrated Parameters:")
for name, value in calibrated_params.items():
    print(f"{name}: {value:.6f}")

In [None]:
# Calibrate the model
calibrator.calibrate(num_epochs=1000, steps=1000, verbose=True)

# Get the calibrated parameters
calibrated_params = calibrator.get_calibrated_params()
print("Calibrated Parameters:")
for name, value in calibrated_params.items():
    print(f"{name}: {value:.6f}")

Epoch 0, Loss: 0.564808, kappa: 0.9372955560684204, theta: 0.08623747527599335, sigma_v: 0.14133913815021515, rho: -0.9244350790977478, v0: 0.019999999552965164, mu: 0.028018858283758163
Epoch 100, Loss: 2.090685, kappa: 0.9221691489219666, theta: 0.09568589180707932, sigma_v: 0.1458897888660431, rho: -0.9815501570701599, v0: 0.019999999552965164, mu: 0.03190338984131813
Epoch 200, Loss: 0.508622, kappa: 0.8853012323379517, theta: 0.0846395418047905, sigma_v: 0.12412653863430023, rho: -0.9876137375831604, v0: 0.019999999552965164, mu: 0.03302152082324028
Epoch 300, Loss: 0.371718, kappa: 0.8727018237113953, theta: 0.0870034396648407, sigma_v: 0.14460746943950653, rho: -0.937980592250824, v0: 0.019999999552965164, mu: 0.028822358697652817
Epoch 400, Loss: -0.653510, kappa: 0.8595237731933594, theta: 0.09777134656906128, sigma_v: 0.1225954070687294, rho: -0.939472496509552, v0: 0.019999999552965164, mu: 0.031075533479452133
Epoch 500, Loss: -0.900974, kappa: 0.8373584151268005, theta: 0.