In [1]:
import dill
import torch
import numpy as np
from src.Heston import HestonModel
from sklearn.preprocessing import StandardScaler
from torch.utils.data import TensorDataset, random_split

def Savepickle(obj, doc_path):
  with open(doc_path, 'wb') as file:
         dill.dump(obj, file)     

def Readpickle(doc_path):
    with open(doc_path, 'rb') as file:
        return dill.load(file)

# European

## Point-wise Learning

In [9]:
# Parameter Settings
paras = {
                'rf'    : (0, 0.06),
                'v0'    : (1e-3, 0.15),
                'vbar'  : (1e-3, 0.10),
                'kappa' : (1e-3, 5.0),
                'xi'    : (1e-3, 1.0),
                'rho'   : (-1.0, 0.0),
                'M'     : (0.50, 1.5),
                'tau'   : (0.25, 2.0)
            }

In [10]:
# Generate synthetic data
x = [] # [ [rf, v0,  vbar, kappa, xi, rho, M, tau] ]
y = []  # [ sigmaIV ]

i , Nsample , failed_sample_num = 1 , 880000 , 0

while i <= Nsample:
    S0    = 1
    rf    = np.random.uniform(paras['rf'][0],paras['rf'][1])
    v0    = np.random.uniform(paras['v0'][0],paras['v0'][1])
    vbar  = np.random.uniform(paras['vbar'][0],paras['vbar'][1])
    kappa = np.random.uniform(paras['kappa'][0],paras['kappa'][1])
    xi    = np.random.uniform(paras['xi'][0],paras['xi'][1])
    rho   = np.random.uniform(paras['rho'][0],paras['rho'][1])
    M     = np.random.uniform(paras['M'][0],paras['M'][1])
    tau   = np.random.uniform(paras['tau'][0],paras['tau'][1])
    
    try :
        model = HestonModel(rf, kappa, vbar, xi, rho)
        price = model.PriceEuropean(S0, v0, tau, K=M*S0)
        sigmaIV = model.getImpliedVol(price, S0, tau, K=M*S0)
        
        if sigmaIV == -1 :
            failed_sample_num += 1 
            continue

        else :
            
            x.append([rf, v0,  vbar, kappa, xi, rho, M, tau])
            y.append(sigmaIV)
            
            if i%10000 == 0:
                print('Sampling Process finished : %d/%d'%(i, Nsample))   
            i += 1
        
    except :
        failed_sample_num += 1 
        continue
    
print(failed_sample_num)

Sampling Process finished : 10000/880000
Sampling Process finished : 20000/880000
Sampling Process finished : 30000/880000
Sampling Process finished : 40000/880000
Sampling Process finished : 50000/880000
Sampling Process finished : 60000/880000
Sampling Process finished : 70000/880000
Sampling Process finished : 80000/880000
Sampling Process finished : 90000/880000
Sampling Process finished : 100000/880000
Sampling Process finished : 110000/880000
Sampling Process finished : 120000/880000
Sampling Process finished : 130000/880000
Sampling Process finished : 140000/880000
Sampling Process finished : 150000/880000
Sampling Process finished : 160000/880000
Sampling Process finished : 170000/880000
Sampling Process finished : 180000/880000
Sampling Process finished : 190000/880000
Sampling Process finished : 200000/880000
Sampling Process finished : 210000/880000
Sampling Process finished : 220000/880000
Sampling Process finished : 230000/880000
Sampling Process finished : 240000/880000
S

In [12]:
# Normalized X := (2*X-Sigma)*Lambda matrix multiplication
# torch.matmul((2 * dataset.tensors[0] - Sigma), Lambda)
columns  = [(bound[1] + bound[0]) for bound in paras.values()]
diagnals = [1/(bound[1] - bound[0]) for bound in paras.values()]

Sigma  = torch.tensor(columns)
Lambda = torch.diag(torch.tensor(diagnals))

# To tensor and construct dataloader
x , y = np.array(x) , np.array(y).reshape(-1, 1)
x_tensor = torch.tensor(x, dtype=torch.float32)
x_tensor = torch.matmul((2 * x_tensor - Sigma), Lambda)

y_tensor = torch.tensor(y, dtype=torch.float32)
dataset = TensorDataset(x_tensor, y_tensor)

# Dataset sizes
total_size = len(dataset)
train_size = int(total_size * 0.6)
validate_size = int(total_size * 0.2)
test_size  = total_size - train_size - validate_size

# Construct datasets
train, validate, test = random_split(dataset, [train_size, validate_size, test_size])

# Save data
data_saved = {
    'paras'            : paras,          # paramter settings and range
    'dataset'          : dataset,        # dataset.tensors gives (x_tensor, y_tensor) 
    'train'            : train,          # train[:] gives (x_tensor, y_tensor) in train
    'validate'         : validate,
    'test'             : test,
    'x_norm_mat'       : (Sigma, Lambda)
}

# Savepickle(data_saved, doc_path='./data/European/point_wise_training_data88w.pkl')

## Grid-base Learning

In [13]:
# Parameter Settings
paras = {
                'rf'    : (0, 0.06),
                'v0'    : (1e-3, 0.15),
                'vbar'  : (1e-3, 0.10),
                'kappa' : (1e-3, 5.0),
                'xi'    : (1e-3, 1.0),
                'rho'   : (-1.0, 0.0),
                'M'     : np.arange(0.5,1.6,0.1),
                'tau'   : np.arange(0.25,2.25,0.25)
            }

recover_y_dim = (len(paras['tau']) , len(paras['M']))

In [14]:
# Generate synthetic data
x  = [] # [ [rf, v0,  vbar, kappa, xi, rho] ]
y  = [] # [ sigmaIV ]

k , Nsample , failed_sample_num , flag = 1 , 10000 , 0 , False #17000
M , tau = paras['M'] , paras['tau']
M, tau = np.meshgrid(M, tau)


while k <= Nsample:
    rf    = np.random.uniform(paras['rf'][0],paras['rf'][1])
    S0    = 1.0
    v0    = np.random.uniform(paras['v0'][0],paras['v0'][1])
    vbar  = np.random.uniform(paras['vbar'][0],paras['vbar'][1])
    kappa = np.random.uniform(paras['kappa'][0],paras['kappa'][1])
    xi    = np.random.uniform(paras['xi'][0],paras['xi'][1])
    rho   = np.random.uniform(paras['rho'][0],paras['rho'][1])
    
    model = HestonModel(rf, kappa, vbar, xi, rho)
    sigmaIV = np.empty_like(M)
    
    for i in range(M.shape[0]) :
        for j in range(M.shape[1]) :
            price = model.PriceEuropean(S0, v0, tau[i,j], M[i,j])
            try :
                sigmaIV[i,j] = model.getImpliedVol(price, S0, tau[i,j], M[i,j])
            except :
                failed_sample_num += 1 
                flag = True
                break
        if flag :
            break
       
    if flag :
        flag = False
        failed_sample_num += 1
        continue
    
    else :
        x.append([rf, v0,  vbar, kappa, xi, rho])
        y.append(sigmaIV.flatten())
        
        if k%1000 == 0:
            print('Sampling Process finished : %d/%d'%(k, Nsample))
        k += 1

print(failed_sample_num)

Sampling Process finished : 1000/10000
Sampling Process finished : 2000/10000
Sampling Process finished : 3000/10000
Sampling Process finished : 4000/10000
Sampling Process finished : 5000/10000
Sampling Process finished : 6000/10000
Sampling Process finished : 7000/10000
Sampling Process finished : 8000/10000
Sampling Process finished : 9000/10000
Sampling Process finished : 10000/10000
9958


In [15]:
# Normalized X := (2*X-Sigma)*Lambda matrix multiplication
# torch.matmul((2 * dataset.tensors[0] - Sigma), Lambda)
columns  = [(bound[1] + bound[0]) for bound in paras.values() if len(bound) == 2]
diagnals = [1/(bound[1] - bound[0]) for bound in paras.values() if len(bound) == 2]

Sigma  = torch.tensor(columns)
Lambda = torch.diag(torch.tensor(diagnals))

# To tensor and construct dataloader
x , y = np.array(x) , np.array(y)
x_tensor = torch.tensor(x, dtype=torch.float32)
x_tensor = torch.matmul((2 * x_tensor - Sigma), Lambda)

y_tensor = torch.tensor(y, dtype=torch.float32)
dataset = TensorDataset(x_tensor, y_tensor)

# Dataset sizes
total_size = len(dataset)
train_size = int(total_size * 0.6)
validate_size = int(total_size * 0.2)
test_size  = total_size - train_size - validate_size

# Construct datasets
train, validate, test = random_split(dataset, [train_size, validate_size, test_size])

# Save data
data_saved = {
    'paras'            : paras,          # paramter settings and range
    'dataset'          : dataset,        # dataset.tensors gives (x_tensor, y_tensor) 
    'train'            : train,          # train[:] gives (x_tensor, y_tensor) in train
    'validate'         : validate,
    'test'             : test,
    'x_norm_mat'       : (Sigma, Lambda),
    'recover_y_dim'    : recover_y_dim
}

# Savepickle(data_saved, doc_path='./data/European/grid_based_training_data1w.pkl')

## Calibration Data

In [18]:
# Parameter Settings
paras = {
                'rf'    : (0, 0.06),
                'v0'    : (1e-3, 0.15),
                'vbar'  : (1e-3, 0.10),
                'kappa' : (1e-3, 5.0),
                'xi'    : (1e-3, 1.0),
                'rho'   : (-1.0, 0.0),
                'M'     : np.arange(0.5,1.6,0.1),
                'tau'   : np.arange(0.25,2.25,0.25)
            }

recover_y_dim = (len(paras['tau']) , len(paras['M']))

In [23]:
# Generate synthetic data
x  = [] # [ [rf, v0,  vbar, kappa, xi, rho] ]
y  = [] # [ sigmaIV ]

k , Nsample , failed_sample_num , flag = 1 , 5000 , 0 , False #17000
M , tau = paras['M'] , paras['tau']
M, tau = np.meshgrid(M, tau)


while k <= Nsample:
    rf    = 0.025
    S0    = 1.0
    v0    = np.random.uniform(paras['v0'][0],paras['v0'][1])
    vbar  = np.random.uniform(paras['vbar'][0],paras['vbar'][1])
    kappa = np.random.uniform(paras['kappa'][0],paras['kappa'][1])
    xi    = np.random.uniform(paras['xi'][0],paras['xi'][1])
    rho   = np.random.uniform(paras['rho'][0],paras['rho'][1])
    
    model = HestonModel(rf, kappa, vbar, xi, rho)
    sigmaIV = np.empty_like(M)
    
    for i in range(M.shape[0]) :
        for j in range(M.shape[1]) :
            price = model.PriceEuropean(S0, v0, tau[i,j], M[i,j])
            try :
                sigmaIV[i,j] = model.getImpliedVol(price, S0, tau[i,j], M[i,j])
            except :
                failed_sample_num += 1 
                flag = True
                break
        if flag :
            break
       
    if flag :
        flag = False
        failed_sample_num += 1
        continue
    
    else :
        x.append([rf, v0,  vbar, kappa, xi, rho])
        y.append(sigmaIV.flatten())
        
        if k%1000 == 0:
            print('Sampling Process finished : %d/%d'%(k, Nsample))
        k += 1

print(failed_sample_num)

Sampling Process finished : 1000/5000
Sampling Process finished : 2000/5000
Sampling Process finished : 3000/5000
Sampling Process finished : 4000/5000
Sampling Process finished : 5000/5000
4888


In [24]:
# Normalized X := (2*X-Sigma)*Lambda matrix multiplication
# torch.matmul((2 * dataset.tensors[0] - Sigma), Lambda)
columns  = [(bound[1] + bound[0]) for bound in paras.values() if len(bound) == 2]
diagnals = [1/(bound[1] - bound[0]) for bound in paras.values() if len(bound) == 2]

Sigma  = torch.tensor(columns)
Lambda = torch.diag(torch.tensor(diagnals))

# To tensor and construct dataloader
x , y = np.array(x) , np.array(y)
x_tensor = torch.tensor(x, dtype=torch.float32)
x_tensor = torch.matmul((2 * x_tensor - Sigma), Lambda)

y_tensor = torch.tensor(y, dtype=torch.float32)
dataset = TensorDataset(x_tensor, y_tensor)


# Save data
data_saved = {
    'paras'            : paras,          # paramter settings and range
    'dataset'          : dataset,        # dataset.tensors gives (x_tensor, y_tensor) 
    'x_norm_mat'       : (Sigma, Lambda),
    'recover_y_dim'    : recover_y_dim
}

# Savepickle(data_saved, doc_path='./data/European/calibration_data_new.pkl')