In [1]:
import torch
import torch.nn as nn
import pickle
import signatory
import numpy as np
from scoring_program.evaluation import full_evaluation
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
def save_obj(obj: object, filepath: str):
    """ Generic function to save an object with different methods. """
    if filepath.endswith('pkl'):
        saver = pickle.dump
    elif filepath.endswith('pt'):
        saver = torch.save
    else:
        raise NotImplementedError()
    with open(filepath, 'wb') as f:
        saver(obj, f)
    return 0


def load_obj(filepath):
    """ Generic function to load an object. """
    if filepath.endswith('pkl'):
        loader = pickle.load
    elif filepath.endswith('pt'):
        loader = torch.load
    elif filepath.endswith('json'):
        import json
        loader = json.load
    else:
        raise NotImplementedError()
    with open(filepath, 'rb') as f:
        return loader(f)
    
    
def to_numpy(x):
    """
    Casts torch.Tensor to a numpy ndarray.

    The function detaches the tensor from its gradients, then puts it onto the cpu and at last casts it to numpy.
    """
    return x.detach().cpu().numpy()

def sample_indices(dataset_size, batch_size):
    indices = torch.from_numpy(np.random.choice(
        dataset_size, size=batch_size, replace=False)).cuda()
    # functions torch.-multinomial and torch.-choice are extremely slow -> back to numpy
    return indices.long()

Define a Generator, which implements the Euler-Maruyama scheme for the log-Heston SDE. The parameters that define the SDE are trainable `torch.Parameter`s.

In [9]:
class EulerGenerator(nn.Module):
    """Given a set of parameters for the log-Heston SDE (as trainable pytorch parameters), produces samples of the SDE
    using the Euler-Maruyama scheme from given initial conditions and Brownian increments."""
    def __init__(
        self,
        mu=0.05,
        rho1=0.0, rho2=0.0,
        kappa1=2.0, theta1=0.04, sigma1=0.2,
        kappa2=1.0, theta2=0.1, sigma2=0.5
    ):
        super().__init__()
        self.mu      = nn.Parameter(torch.tensor(mu))
        self.rho1    = nn.Parameter(torch.tensor(rho1))
        self.rho2    = nn.Parameter(torch.tensor(rho2))
        self.kappa1  = nn.Parameter(torch.tensor(kappa1))
        self.theta1  = nn.Parameter(torch.tensor(theta1))
        self.sigma1  = nn.Parameter(torch.tensor(sigma1))
        self.kappa2  = nn.Parameter(torch.tensor(kappa2))
        self.theta2  = nn.Parameter(torch.tensor(theta2))
        self.sigma2  = nn.Parameter(torch.tensor(sigma2))
        
        self.lr_D = 0.0001
        self.optimizer = torch.optim.Adam(
            self.parameters(), lr=self.lr_D, betas=(0, 0.9))
        
        self.s = 100.0
        self.y1 = 0.4
        self.y2 = 0.4

    def one_step_forward(self, bm_increment, dt, s, y1, y2):
        dw1, dw2, dw3, dw4 = bm_increment[:, 0], bm_increment[:, 1], bm_increment[:, 2], bm_increment[:, 3]

        # Ensure non-negative under square root
        y1_safe = torch.maximum(torch.zeros_like(y1), y1)
        y2_safe = torch.maximum(torch.zeros_like(y2), y2)

        # Y1, Y2 Euler steps
        y1_new = (y1
                  + self.kappa1 * (self.theta1 - y1) * dt
                  + self.sigma1 * torch.sqrt(y1_safe) * dw2
                  )
        y2_new = (y2
                  + self.kappa2 * (self.theta2 - y2) * dt
                  + self.sigma2 * torch.sqrt(y2_safe) * dw4
                  )

        # Drift term
        dt_term = (self.mu
                   - 0.5 * y1 * (self.rho1**2 + 1 - self.rho1)
                   - 0.5 * y2 * (self.rho2**2 + 1 - self.rho2))

        # S Euler step
        s_new = (s
                 + dt_term * dt
                 + torch.sqrt(y1_safe) * (self.rho1 * dw1 + torch.sqrt(1 - self.rho1) * dw2)
                 + torch.sqrt(y2_safe) * (self.rho2 * dw3 + torch.sqrt(1 - self.rho2) * dw4)
                 )

        return s_new, y1_new, y2_new
    
    def forward(self,
                bm_increments, s_initial_value=None, y1_initial_value=None, y2_initial_value=None):
        device = bm_increments.device
        N, T, D = bm_increments.shape

        if s_initial_value is None:
            s_initial_value = torch.tensor(self.s)
        if y1_initial_value is None:
            y1_initial_value = torch.tensor(self.y1)
        if y2_initial_value is None:
            y2_initial_value = torch.tensor(self.y2)
        
        s_path = torch.zeros(N, T+1).to(device)
        y1_path = torch.zeros(N, T+1).to(device)
        y2_path = torch.zeros(N, T+1).to(device)
        s_path[:, 0] = torch.log(s_initial_value)
        y1_path[:, 0] = y1_initial_value
        y2_path[:, 0] = y2_initial_value
        dt = 1/T
        for t in range(T):
            bm_increment = bm_increments[:, t]
            old_s, old_y1, old_y2 = s_path[:, t].clone(), y1_path[:, t].clone(), y2_path[:, t].clone()
            log_s, y1, y2 = self.one_step_forward(bm_increment, dt, old_s, old_y1, old_y2)
            s_path[:, t+1] = log_s.clone()
            y1_path[:, t+1] = y1.clone()
            y2_path[:, t+1] = y2.clone()
            del old_s, old_y1, old_y2, log_s, y1, y2
        s_path = torch.exp(s_path)
        return s_path, y1_path, y2_path
    
    def print_params(self):
        print(f"mu: {self.mu.item()}")
        print(f"rho1: {self.rho1.item()}")
        print(f"rho2: {self.rho2.item()}")
        print(f"kappa1: {self.kappa1.item()}")
        print(f"theta1: {self.theta1.item()}")
        print(f"sigma1: {self.sigma1.item()}")
        print(f"kappa2: {self.kappa2.item()}")
        print(f"theta2: {self.theta2.item()}")
        print(f"sigma2: {self.sigma2.item()}")

    def return_params(self):
        res_dict = {"mu": self.mu.item(),
                    "rho1": self.rho1.item(),
                    "rho2": self.rho2.item(),
                    "kappa1": self.kappa1.item(),
                    "theta1": self.theta1.item(),
                    "sigma1": self.sigma1.item(),
                    "kappa2": self.kappa2.item(),
                    "theta2": self.theta2.item(),
                    "sigma2": self.sigma2.item()}
        return res_dict

Load the training data, which is a single long path. For the calibration, we split it into paths of smaller length.

In [10]:
device = "cuda"
x_real = torch.tensor(load_obj("long_s_path.pkl")).to(device)
print(f"training data before splitting: {x_real.shape}")


T = 256
num_batches = x_real.shape[1] // T 
x_real = x_real[:, :(T*num_batches), :]
x_real = x_real.reshape((-1, T, 3))
print(f"training data after splitting: {x_real.shape}")

training data before splitting: torch.Size([1, 20481, 3])
training data after splitting: torch.Size([80, 256, 3])


Initialise a Generator with some initial configuration of parameters.

In [11]:
config = {
        'mu': 0.04,
        'rho1': 0.5,
        'rho2': -0.5,
        'kappa1': 3.0,
        'theta1': 0.04,
        'sigma1': 0.3,
        'kappa2': 2.0,
        'theta2': 0.1,
        'sigma2': 0.5,
    }


# Initialise Generator
calibrator = EulerGenerator(**config)
calibrator = calibrator.to(device)

Train the SDE parameters. As a loss, we use the L2 difference between the mean level 2 signature of the set of real paths, and sets of randomly sampled paths from the Generator. Note that this serves purely as an illustration, and we do not claim that this is a particularly good approach (in fact it is not).

In [12]:
bm_dim = 4
dt = 1/T

losses = []
num_iterations = 20
for iter in range(num_iterations):
    calibrator.optimizer.zero_grad()

    x_real_batch = x_real.clone()
    s_initial_value = x_real_batch[:, -1, 0].to(device)
    y1_initial_value = x_real_batch[:, -1, 1].to(device)
    y2_initial_value = x_real_batch[:, -1, 2].to(device)

    # Generate set of paths with current training parameters
    bm_increments = torch.sqrt(torch.tensor(dt)) * torch.randn([num_batches, T, bm_dim])
    x_fake_batch = calibrator.forward(
        bm_increments.to(device),
        s_initial_value,
        y1_initial_value,
        y2_initial_value
        )
    x_fake_batch = torch.stack(x_fake_batch, -1)
    
    # calculate mean signature of the set of training paths, and the set of samples paths
    esig_fake = signatory.signature(x_fake_batch, 2).mean(0)
    esig_real = signatory.signature(x_real_batch, 2).mean(0)
    
    # calculate the L2 norm of the difference
    loss = torch.norm(esig_fake-esig_real)
    losses.append(loss)


    loss.backward()
    calibrator.optimizer.step()
    print(f"Iteration {iter+1}: loss = {loss.item()}")

    torch.cuda.empty_cache()

Iteration 1: loss = 903120.0625
Iteration 2: loss = 106517360.0
Iteration 3: loss = 103714784.0
Iteration 4: loss = 613904.5625
Iteration 5: loss = 72676712.0
Iteration 6: loss = 18460860.0
Iteration 7: loss = 767446.5625
Iteration 8: loss = 505198.78125
Iteration 9: loss = 10623078.0
Iteration 10: loss = 1044736.0625
Iteration 11: loss = 6903982.0
Iteration 12: loss = 6010684.0
Iteration 13: loss = 6675022.5
Iteration 14: loss = 192309.421875
Iteration 15: loss = 933901.5625
Iteration 16: loss = 979219.625
Iteration 17: loss = 2973370.75
Iteration 18: loss = 675561.9375
Iteration 19: loss = 892037.1875
Iteration 20: loss = 1941311.375


Compare calibrated and real parameters.

In [13]:
print("Final calibrated parameters:")
calibrator.print_params()

Final calibrated parameters:
mu: 0.03943103924393654
rho1: 0.5001426935195923
rho2: -0.4999622404575348
kappa1: 3.0000367164611816
theta1: 0.0396575927734375
sigma1: 0.3000560402870178
kappa2: 1.999850869178772
theta2: 0.10008596628904343
sigma2: 0.5003318190574646


In [15]:
calibrated_params = calibrator.return_params()

In [16]:
print("Save the learnt parameters into the solution folder")
save_obj(calibrated_params, "sample_submission_bundle/calibrated_params.pkl")

0