# Second-Order Maximum Mean Discrepancy of the Heston Model

## Change working directory to navigate out of folder. 

In [1]:
import os
os.getcwd()
path_parent = os.path.dirname(os.getcwd())
os.chdir(path_parent)

## Import Packages

In [2]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
from collections import Counter
from torch import nn
import torch.cuda
from sklearn.metrics import mean_squared_error

from MMD.mmd import RBFKernel, SigKernel

from GenerateMMDDataset.mmd_dataset_base import save_dataset, load_dataset, save_path_params, load_path_params
from GenerateMMDDataset.heston_dataset import GenerateHestonDistanceDataset

from RegressionModel.mmd_model import SecondOrderMMDApprox

## Set PyTorch device

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Device: {device}')
torch.backends.cudnn.benchmark = True

Device: cuda


## Generate the Dataset

In [4]:
genHestonDataset = GenerateHestonDistanceDataset(20, device, 0.5, 0, 0.5, 0, time_series_index=2)
heston_mmd_dataset, heston_path_dataset, heston_sample_path_param_dict = genHestonDataset.generate_mmd_dataset(1000, 14, 300, (1, 1), (0.2, 0.8), (0.2, 1), (0.2, 0.8), (0.2, 0.8), _round=2, order=2, lambda_=1e-5, estimator='ub')

Computing Initial Distances


  0%|          | 0/20 [00:00<?, ?it/s]

Finished Computing Initial Distances
----------------------------------------------------------------------------------------------------


Computing Next Batch of Distances


  0%|          | 0/980 [00:00<?, ?it/s]

Finished Computing All Distances
----------------------------------------------------------------------------------------------------




## Save Dataset

In [5]:
torch.save(heston_mmd_dataset, 'Heston/Data/Heston_mmd_dataset_1000_second_order_samples_ub.pt')

save_path_params(heston_sample_path_param_dict,
                 'Heston/Data/Heston_sample_path_param_dict_1000_second_order_samples_ub.pkl')

## Load Dataset

In [5]:
heston_mmd_dataset = torch.load('Heston/Data/Heston_mmd_dataset_1000_second_order_samples_ub.pt')

heston_sample_path_param_dict = load_path_params(
    'Heston/Data/Heston_sample_path_param_dict_1000_second_order_samples_ub.pkl')

In [7]:
M_h = heston_sample_path_param_dict['M']
N_h = heston_sample_path_param_dict['N']


heston_mmd_features = []
heston_mmd_labels = []
for i in range(M_h):
    for j in range(N_h):        
        heston_mmd_features.append([heston_sample_path_param_dict['V0'][i],
                                    heston_sample_path_param_dict['V0'][j],
                                    heston_sample_path_param_dict['Vol of Vol'][i],
                                    heston_sample_path_param_dict['Vol of Vol'][j], 
                                    heston_sample_path_param_dict['Mean Vol'][i],
                                    heston_sample_path_param_dict['Mean Vol'][j],
                                    heston_sample_path_param_dict['Speed'][i],
                                    heston_sample_path_param_dict['Speed'][j],
                                    heston_sample_path_param_dict['Correlation'][i],
                                    heston_sample_path_param_dict['Correlation'][j]])
        heston_mmd_labels.append(heston_mmd_dataset[i][j])
        
print(len(heston_mmd_labels))

20000


## Neural Network Approximation

In [9]:
M_h = heston_sample_path_param_dict['M']
N_h = heston_sample_path_param_dict['N']


heston_mmd_features = []
heston_mmd_labels = []
for i in range(M_h):
    for j in range(N_h):        
        heston_mmd_features.append([heston_sample_path_param_dict['V0'][i],
                                    heston_sample_path_param_dict['V0'][j],
                                    heston_sample_path_param_dict['Vol of Vol'][i],
                                    heston_sample_path_param_dict['Vol of Vol'][j], 
                                    heston_sample_path_param_dict['Mean Vol'][i],
                                    heston_sample_path_param_dict['Mean Vol'][j],
                                    heston_sample_path_param_dict['Speed'][i],
                                    heston_sample_path_param_dict['Speed'][j],
                                    heston_sample_path_param_dict['Correlation'][i],
                                    heston_sample_path_param_dict['Correlation'][j]])
        heston_mmd_labels.append(heston_mmd_dataset[i][j])
        
for i in range(M_h):
    for j in range(N_h):        
        heston_mmd_features.append([heston_sample_path_param_dict['V0'][j],
                                    heston_sample_path_param_dict['V0'][i],
                                    heston_sample_path_param_dict['Vol of Vol'][j],
                                    heston_sample_path_param_dict['Vol of Vol'][i], 
                                    heston_sample_path_param_dict['Mean Vol'][j],
                                    heston_sample_path_param_dict['Mean Vol'][i],
                                    heston_sample_path_param_dict['Speed'][j],
                                    heston_sample_path_param_dict['Speed'][i],
                                    heston_sample_path_param_dict['Correlation'][j],
                                    heston_sample_path_param_dict['Correlation'][i]])
        heston_mmd_labels.append(heston_mmd_dataset[i][j])
        

vol_of_vol_range, speed_range, mean_volatility_range, v0_range = (0.2, 0.8), (0.2, 1), (0.2, 0.8), (0.2, 0.8)
for i in range(5000):
        vol_of_vol = np.random.uniform(vol_of_vol_range[0], vol_of_vol_range[1])
        mean_volatility = np.random.uniform(mean_volatility_range[0], mean_volatility_range[1])
        speed = np.random.uniform(speed_range[0], speed_range[1])
        v0 = np.random.uniform(v0_range[0], v0_range[1])
        corr = np.random.uniform(-1, 1)
        heston_mmd_features.append([v0, v0, vol_of_vol, vol_of_vol, mean_volatility, mean_volatility, speed, speed, 
                                    corr, corr])
        heston_mmd_labels.append(torch.tensor(0.0).to(device=device))

### Define the Model

### Training the Model

In [None]:
heston_mmd_training_param_dict = {
    'lr' : 3e-3,
    'Epochs' : 200,
    'l2_weight' : 0.0,
    'l1_weight' : 0.0,
    'Train/Val Split' : 0.8
}


heston_mmd_dataset_loader_params = {
    'batch_size' : 512,
    'shuffle' : True,
    'num_workers' : 0
}

heston_mmd_input_dimension = 10

heston_mmd_model_param_dict = {
    'input_dimension' : heston_mmd_input_dimension,
    'intermediate_dimensions' : [50, 50, 50, 50],
    'activation_functions' : [nn.ReLU(), nn.ReLU(), nn.ReLU(), nn.ReLU()], 
    'add_layer_norm' : [False, False, False, False], 
    'output_dimension' : 1,
    'output_activation_fn' : None
}

for i in range(5):

    heston_second_order_mmd_model = SecondOrderMMDApprox(heston_mmd_model_param_dict, heston_mmd_training_param_dict,
                                                         heston_mmd_dataset_loader_params, nn.MSELoss(), device,
                                                         f'Heston/MMD Model/heston_mmd_scaler_{i+1}.pkl')
    
    heston_second_order_mmd_model.fit(heston_mmd_features, heston_mmd_labels, 
                                  **{'filename' : f'Heston/MMD Model/heston_mmd_checkpoint_{i+1}.pth.tar',
                                     'best_model_filename' : f'Heston/MMD Model/heston_mmd_model_best_{i+1}.pth.tar'})

In [8]:
train_losses = []
valid_losses = []
for i in range(1, 5):
    checkpoint = torch.load(f'Heston/MMD Model/heston_mmd_model_best_{i+1}.pth.tar', map_location='cpu')
    valid_losses.append(checkpoint['valid_loss'])
    train_losses.append(checkpoint['train_loss'])
    
print(f'Training Loss')
print(np.mean(train_losses))
print(np.std(train_losses))
print(f'Validation Loss')
print(np.mean(valid_losses))
print(np.std(valid_losses))

Training Loss
0.0021789220793325846
5.8756222122694964e-05
Validation Loss
0.0026066265101572406
5.779707934940193e-05


### Test Model

In [None]:
genHestonTestDataset = GenerateHestonDistanceDataset(20, device, 0.5, 0, 0.5, 0, time_series_index=2)
heston_mmd_dataset_test, heston_path_dataset_test, heston_sample_path_param_dict_test = genHestonTestDataset.generate_mmd_dataset(40, 14, 300, (1, 1), (0.2, 0.8), (0.2, 1), (0.2, 0.8), (0.2, 0.8), _round=2, order=2, lambda_=1e-5, estimator='ub')


M_h = heston_sample_path_param_dict_test['M']
N_h = heston_sample_path_param_dict_test['N']


testing_features = []
testing_distances = []
for i in range(M_h):
    for j in range(N_h):        
        testing_features.append([heston_sample_path_param_dict_test['V0'][i],
                                 heston_sample_path_param_dict_test['V0'][j],
                                 heston_sample_path_param_dict_test['Vol of Vol'][i],
                                 heston_sample_path_param_dict_test['Vol of Vol'][j], 
                                 heston_sample_path_param_dict_test['Mean Vol'][i],
                                 heston_sample_path_param_dict_test['Mean Vol'][j],
                                 heston_sample_path_param_dict_test['Speed'][i],
                                 heston_sample_path_param_dict_test['Speed'][j],
                                 heston_sample_path_param_dict_test['Correlation'][i],
                                 heston_sample_path_param_dict_test['Correlation'][j]])
        testing_distances.append(heston_mmd_dataset_test[i][j].cpu())

In [None]:
np.save('Heston/Data/Heston_features_dataset_800_second_order_samples_ub_testing', testing_features)
np.save('Heston/Data/Heston_distances_dataset_800_second_order_samples_ub_testing', testing_distances)

In [18]:
testing_distances = np.load('Heston/Data/Heston_distances_dataset_800_second_order_samples_ub_testing.npy') 
testing_features = np.load('Heston/Data/Heston_features_dataset_800_second_order_samples_ub_testing.npy')

In [20]:
heston_mmd_input_dimension = 10


heston_mmd_model_param_dict = {
    'input_dimension' : heston_mmd_input_dimension,
    'intermediate_dimensions' : [50, 50, 50, 50],
    'activation_functions' : [nn.ReLU(), nn.ReLU(), nn.ReLU(), nn.ReLU()], 
    'add_layer_norm' : [False, False, False, False],
    'output_dimension' : 1,
    'output_activation_fn' : None
}

test_losses = []


for i in range(1, 5):
    
    heston_second_order_mmd_model = SecondOrderMMDApprox(heston_mmd_model_param_dict,
                                                         None,
                                                         None, None,
                                                         device,
                                                         None)
    
    heston_second_order_mmd_model.load_best_model(f'Heston/MMD Model/heston_mmd_model_best_{i+1}.pth.tar', 
                                                  f'Heston/MMD Model/heston_mmd_scaler_{i+1}.pkl')
    
    heston_second_order_mmd_model.model.eval()
    with torch.no_grad():
        
        distances = heston_second_order_mmd_model.transform(torch.tensor(testing_features))
        test_losses.append(mean_squared_error(distances.cpu(), testing_distances))
        
print(np.mean(test_losses))
print(np.std(test_losses))

0.006336030543575317
0.0002062245470360287


## Test to Determine whether $\mathcal{NN}_{\mathcal{D}^{2}}^{\text{Heston}} \left( \cdot, \cdot \right)$ is a Metric

In [None]:
heston_mmd_input_dimension = 10

heston_mmd_model_param_dict = {
    'input_dimension' : heston_mmd_input_dimension,
    'intermediate_dimensions' : [50, 50, 50, 50],
    'activation_functions' : [nn.ReLU(), nn.ReLU(), nn.ReLU(), nn.ReLU()], 
    'add_layer_norm' : [False, False, False, False], 
    'output_dimension' : 1,
    'output_activation_fn' : None
}

heston_second_order_mmd_model = SecondOrderMMDApprox(heston_mmd_model_param_dict,
                                                     None,
                                                     None,
                                                     None, 
                                                     device,
                                                     None)

heston_second_order_mmd_model.load_best_model('Heston/MMD Model/heston_mmd_model_best_1.pth.tar', 
                                              'Heston/MMD Model/heston_mmd_scaler_1.pkl')

- #### Identity of Indiscernables: $\sqrt{\mathcal{NN}_{2}^{\text{Heston}} \left(\Theta, \Theta \right)} = 0$. 

In [30]:
mmd_values = []
cond_sat = []
num_sim = 50000
threshold = 0.05

vol_of_vol_range = heston_sample_path_param_dict['Vol of Vol Range']
mean_volatility_range = heston_sample_path_param_dict['Mean Volatility Range']
speed_range = heston_sample_path_param_dict['Speed Range']
v0_range = heston_sample_path_param_dict['V0 Range']

heston_second_order_mmd_model.model.eval()
with torch.no_grad():
    for i in range(num_sim):
        vol_of_vol = np.random.uniform(vol_of_vol_range[0], vol_of_vol_range[1])
        mean_volatility = np.random.uniform(mean_volatility_range[0], mean_volatility_range[1])
        speed = np.random.uniform(speed_range[0], speed_range[1])
        v0 = np.random.uniform(v0_range[0], v0_range[1])
        corr = np.random.uniform(-1, 1)

        distance = torch.maximum(
            heston_second_order_mmd_model.transform(torch.tensor(
                [v0, v0, vol_of_vol, vol_of_vol, mean_volatility, mean_volatility, speed, speed, corr, corr]
            ).unsqueeze(0).float()).squeeze(0).squeeze(0), torch.tensor(0.0)).cpu()
        
        mmd_values.append(distance)
        
        if distance < threshold:
            cond_sat.append(True)
        else:
            cond_sat.append(False)
            
counter_dict = Counter(cond_sat)
print(f'Number of Samples: {num_sim}')
print(f'Threshold: {threshold}')
print(f'Number of Distances Below Threshold: {counter_dict[True]}')

Number of Samples: 50000
Threshold: 0.05
Number of Distances Below Threshold: 49731


- #### Symmetry: $\sqrt{\mathcal{NN}_{2}^{\text{Heston}} \left(\Theta_{1}, \Theta_{2} \right)} = \sqrt{\mathcal{NN}_{2}^{\text{Heston}} \left(\Theta_{2}, \Theta_{1} \right)}$. 

In [2]:
threshold = 0.1
num_sim = 50000

is_sat = []

vol_of_vol_range = heston_sample_path_param_dict['Vol of Vol Range']
mean_volatility_range = heston_sample_path_param_dict['Mean Volatility Range']
speed_range = heston_sample_path_param_dict['Speed Range']
v0_range = heston_sample_path_param_dict['V0 Range']

heston_second_order_mmd_model.model.eval()
with torch.no_grad():
    for i in range(num_sim):
        
        vol_of_vol_1 = np.random.uniform(vol_of_vol_range[0], vol_of_vol_range[1])
        mean_volatility_1 = np.random.uniform(mean_volatility_range[0], mean_volatility_range[1])
        speed_1 = np.random.uniform(speed_range[0], speed_range[1])
        v0_1 = np.random.uniform(v0_range[0], v0_range[1])
        corr_1 = np.random.uniform(-1, 1)
        
        vol_of_vol_2 = np.random.uniform(vol_of_vol_range[0], vol_of_vol_range[1])
        mean_volatility_2 = np.random.uniform(mean_volatility_range[0], mean_volatility_range[1])
        speed_2 = np.random.uniform(speed_range[0], speed_range[1])
        v0_2 = np.random.uniform(v0_range[0], v0_range[1])
        corr_2 = np.random.uniform(-1, 1)

        d1 = torch.maximum(
            heston_second_order_mmd_model.transform(torch.tensor(
                [v0_1, v0_2, vol_of_vol_1, vol_of_vol_2, mean_volatility_1, mean_volatility_2, speed_1, speed_2, corr_1, corr_2]
            ).unsqueeze(0).float()).squeeze(0).squeeze(0), torch.tensor(0.0)).cpu()
        
        d2 = torch.maximum(
            heston_second_order_mmd_model.transform(torch.tensor(
                [v0_2, v0_1, vol_of_vol_2, vol_of_vol_1, mean_volatility_2, mean_volatility_1, speed_2, speed_1, corr_2, corr_1]
            ).unsqueeze(0).float()).squeeze(0).squeeze(0), torch.tensor(0.0)).cpu()
                        
        if abs(np.sqrt(d1)-np.sqrt(d2)) < threshold:
            is_sat.append(True)
        else:
            is_sat.append(False)
            
            
symm_counter_dict = Counter(is_sat)
print(f'Number of Samples: {num_sim}')
print(f'Threshold: {threshold}')
print(f'Number of Distances Below Threshold: {symm_counter_dict[True]}')

Number of Samples: 50000
Threshold: 0.1
Number of Distances Below Threshold: 49724


- #### Triangle Inequality: $\sqrt{\mathcal{NN}_{\mathcal{D}^{2}}^{\text{MGBM}} \left(\Theta_{1}, \Theta_{3} \right)} \leq \sqrt{\mathcal{NN}_{\mathcal{D}^{2}}^{\text{MGBM}} \left(\Theta_{1}, \Theta_{2} \right)} + \sqrt{\mathcal{NN}_{\mathcal{D}^{2}}^{\text{MGBM}} \left(\Theta_{2}, \Theta_{3} \right)}$. 

In [1]:
triag_inq_sat = []
num_sim = 50000

vol_of_vol_range = heston_sample_path_param_dict['Vol of Vol Range']
mean_volatility_range = heston_sample_path_param_dict['Mean Volatility Range']
speed_range = heston_sample_path_param_dict['Speed Range']
v0_range = heston_sample_path_param_dict['V0 Range']

heston_second_order_mmd_model.model.eval()
with torch.no_grad():
    for i in range(num_sim):
        
        vol_of_vol_1 = np.random.uniform(vol_of_vol_range[0], vol_of_vol_range[1])
        mean_volatility_1 = np.random.uniform(mean_volatility_range[0], mean_volatility_range[1])
        speed_1 = np.random.uniform(speed_range[0], speed_range[1])
        v0_1 = np.random.uniform(v0_range[0], v0_range[1])
        corr_1 = np.random.uniform(-1, 1)
        
        vol_of_vol_2 = np.random.uniform(vol_of_vol_range[0], vol_of_vol_range[1])
        mean_volatility_2 = np.random.uniform(mean_volatility_range[0], mean_volatility_range[1])
        speed_2 = np.random.uniform(speed_range[0], speed_range[1])
        v0_2 = np.random.uniform(v0_range[0], v0_range[1])
        corr_2 = np.random.uniform(-1, 1)
        
        vol_of_vol_3 = np.random.uniform(vol_of_vol_range[0], vol_of_vol_range[1])
        mean_volatility_3 = np.random.uniform(mean_volatility_range[0], mean_volatility_range[1])
        speed_3 = np.random.uniform(speed_range[0], speed_range[1])
        v0_3 = np.random.uniform(v0_range[0], v0_range[1])
        corr_3 = np.random.uniform(-1, 1)

        d1 = torch.maximum(
            heston_second_order_mmd_model.transform(torch.tensor(
                [v0_1, v0_3, vol_of_vol_1, vol_of_vol_3, mean_volatility_1, mean_volatility_3, speed_1, speed_3, corr_1, corr_3]
            ).unsqueeze(0).float()).squeeze(0).squeeze(0), torch.tensor(0.0)).cpu()
        
        d2 = torch.maximum(
            heston_second_order_mmd_model.transform(torch.tensor(
                [v0_1, v0_2, vol_of_vol_1, vol_of_vol_2, mean_volatility_1, mean_volatility_2, speed_1, speed_2, corr_1, corr_2]
            ).unsqueeze(0).float()).squeeze(0).squeeze(0), torch.tensor(0.0)).cpu()
        
        d3 = torch.maximum(
            heston_second_order_mmd_model.transform(torch.tensor(
                [v0_2, v0_3, vol_of_vol_2, vol_of_vol_3, mean_volatility_2, mean_volatility_3, speed_2, speed_3, corr_2, corr_3]
            ).unsqueeze(0).float()).squeeze(0).squeeze(0), torch.tensor(0.0)).cpu()
               
        if np.sqrt(d1) <= (np.sqrt(d2)+np.sqrt(d3)):
            triag_inq_sat.append(True)
        else:
            triag_inq_sat.append(False)
            
triag_inq_counter_dict = Counter(triag_inq_sat)
print(f'Number of Samples: {num_sim}')
print(f'Number of Samples Satisfying the Triangle Inequality: {triag_inq_counter_dict[True]}')
print(f'Number of Samples: 50000')
print(f'Number of Samples Satisfying the Triangle Inequality: 50000')

Number of Samples: 50000
Number of Samples Satisfying the Triangle Inequality: 50000
