In [16]:
import numpy as np
import pandas as pd
import seaborn as sns
import math
import datetime
import torch
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

%matplotlib inline

In [17]:
# Load the SPX option dataset straight from the project repository.
url = 'https://raw.githubusercontent.com/Thomas101Shen/Dynamic_hedging/refs/heads/main/Option_SPX.csv'
raw_data = pd.read_csv(url)
print(raw_data.shape)

# Drop incomplete rows into a new frame instead of mutating in place, so
# re-running this cell always starts from the freshly loaded data.
data = raw_data.dropna()
print(data.shape)
print(data.head())

# Model inputs and the regression target.
X = data[['Moneyness', 'TTM', 'D_BS']]
y = data[['Target']]

# `y` is the target column of the whole dataset (not a test split), so label it as such.
print(f'\n\nFeatures:\n {X.describe()} \n\n\n Target:\n {y.describe()}')

(1008, 14)
(1008, 14)
         Date  ID  Days until next hedge        S  Dividend        C_BS  \
0  2024-10-09   1                      1  5792.04    1.3255  347.319191   
1  2024-08-12   3                      1  5344.39    1.4375   65.911689   
2  2024-08-05   5                      1  5186.33    1.4816   29.540325   
3  2024-11-15   1                      3  5870.62    1.3163  351.552064   
4  2024-09-17   6                      1  5634.58    1.3645   46.314393   

       D_BS   C_mkt  D_Blm        R  TTM  Moneyness  D_Optimal    Target  
0  0.630024  357.80  0.642  4.78852  163      92.04   0.385249 -0.244775  
1  0.227086   74.05  0.247  4.96403  221    -555.61   0.228776  0.001690  
2  0.111161   38.45  0.132  4.95853  228    -913.67  -0.159851 -0.271012  
3  0.685499  370.40  0.710  4.77039  126     170.62   0.377866 -0.307633  
4  0.185447   54.15  0.208  4.76338  185    -565.42   0.160314 -0.025133  


Features:
          Moneyness          TTM         D_BS
count  1008.000000 

In [18]:
# Hold out 20% of the rows as the test set; the remaining 80% is kept
# together as the train+validation pool.
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

In [19]:
# Carve 20% of the train+validation pool off as the validation set.
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val,
    y_train_val,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

In [20]:
feat_cols = ['Moneyness', 'TTM', 'D_BS']

# Fit the scaler on the training split only. Fitting on the full feature
# frame would let validation/test min-max ranges leak into training.
# Validation/test values may therefore fall slightly outside [0, 1].
scaler = MinMaxScaler()
scaler.fit(X_train)

# `transform` returns plain arrays, so re-wrap them with the feature names.
X_train = pd.DataFrame(scaler.transform(X_train), columns=feat_cols)
X_val = pd.DataFrame(scaler.transform(X_val), columns=feat_cols)
X_test = pd.DataFrame(scaler.transform(X_test), columns=feat_cols)

# Columns consumed by the custom hedging loss.
loss_metrics = data[['C_BS', 'C_mkt', 'D_BS', 'S']]
print(loss_metrics.describe())

# `.diff()` subtracts the previous row, leaving NaN in the first row (dropped);
# the level columns skip the first row too so all four frames stay row-aligned.
c_bs = loss_metrics[['C_BS']].iloc[1:]
c_diff = loss_metrics[['C_mkt']].diff().dropna()
d_bs = loss_metrics[['D_BS']].iloc[1:]
s_diff = loss_metrics[['S']].diff().dropna()

# Summarise the frames defined above (the previously printed `s_mkt` / `c_mkt`
# are not defined anywhere in this notebook).
print(s_diff.describe())
print(c_diff.describe())
print(c_bs.describe())
print(d_bs.describe())

In [21]:
import torch.nn as nn
import torch.optim as optim


class FNN(nn.Module):
    """Feed-forward network with a single hidden layer: Linear -> ReLU -> Linear."""

    def __init__(self, input_size, hidden_size, output_size):
        """Create the hidden layer, its activation and the output layer."""
        super().__init__()
        self.hidden = nn.Linear(input_size, hidden_size)
        self.activation = nn.ReLU()
        self.output = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run `x` through the hidden layer, the ReLU and the output layer."""
        return self.output(self.activation(self.hidden(x)))


# Network dimensions: input features, hidden units, outputs.
input_size, hidden_size, output_size = 3, 6, 1

In [22]:
# Convert every feature split to a float tensor for the network.
X_train, X_val, X_test = (
    torch.tensor(features.values, dtype=torch.float)
    for features in (X_train, X_val, X_test)
)

# Same conversion for the matching target splits.
y_train, y_val, y_test = (
    torch.tensor(targets.values, dtype=torch.float)
    for targets in (y_train, y_val, y_test)
)

In [23]:
# Columns consumed by the custom hedging loss.
loss_metrics = data[['C_BS', 'C_mkt', 'D_BS', 'S']]
print(loss_metrics.describe())

# `.diff()` subtracts the previous row, leaving NaN in the first row (dropped);
# the level columns skip the first row too so all four frames stay row-aligned.
# NOTE(review): the differences are taken between consecutive rows in file
# order - confirm that consecutive rows really are the same option on
# successive hedge dates before treating them as price changes.
c_bs = loss_metrics[['C_BS']].iloc[1:]
c_diff = loss_metrics[['C_mkt']].diff().dropna()
d_bs = loss_metrics[['D_BS']].iloc[1:]
s_diff = loss_metrics[['S']].diff().dropna()

# Summarise the frames defined above (the previously printed `s_mkt` / `c_mkt`
# are not defined anywhere in this notebook).
print(s_diff.describe())
print(c_diff.describe())
print(c_bs.describe())
print(d_bs.describe())

              C_BS        C_mkt         D_BS            S
count  1008.000000  1008.000000  1008.000000  1008.000000
mean    125.613720   131.908829     0.376424  5714.541786
std     101.212170   106.137719     0.203848   198.707036
min       4.475058     5.050000     0.029593  5186.330000
25%      44.031836    38.350000     0.208732  5596.780000
50%      98.065708   109.125000     0.364226  5725.530000
75%     186.975239   197.137500     0.524479  5851.895000
max     465.237571   483.900000     0.948629  6047.150000
                 S
count  1007.000000
mean      0.193972
std     283.018324
min    -812.410000
25%    -188.710000
50%       0.460000
75%     191.605000
max     812.410000
             C_mkt
count  1007.000000
mean     -0.123932
std     148.303012
min    -433.350000
25%     -95.775000
50%      -2.550000
75%      92.375000
max     443.900000
              C_BS
count  1007.000000
mean    125.393556
std     101.020679
min       4.475058
25%      44.017141
50%      97.801881
75%

In [24]:
def print_tensor_with_precision(tensor, decimals=8):
    """Print a number or PyTorch tensor with a fixed number of decimals.

    Args:
        tensor: A Python int/float or a tensor of any dimensionality.
        decimals: Number of digits printed after the decimal point.

    Scalars (Python numbers and 0-dim tensors) are printed on one line.
    A 1-D tensor is printed as one line of ``"<value>, "`` items joined by a
    space. Higher-dimensional tensors are printed one 1-D row per line.
    """
    # Plain numbers have no `.dim()`, so they must be checked first.
    if isinstance(tensor, (int, float)) or tensor.dim() == 0:
        print(f"{tensor:.{decimals}f}")
    elif tensor.dim() == 1:
        print(" ".join(f"{val:.{decimals}f}, " for val in tensor.tolist()))
    else:
        # Recurse along the leading axis so tensors with more than two
        # dimensions are flattened into rows instead of failing to format.
        for sub_tensor in tensor:
            print_tensor_with_precision(sub_tensor, decimals)

In [30]:
# Now we will create a FNN using the custom loss function
class CustomLoss(nn.Module):
    """Mean squared hedging error of a delta adjusted by the network output.

    The loss is ``mean((o_diff - (predictions + d_bs) * s_diff) ** 2)``.
    """

    def __init__(self):
        super().__init__()

    @staticmethod
    def _as_float_tensor(values):
        """Convert a tensor, pandas object or array-like to a float32 tensor."""
        # Tensors are checked first: `Tensor.values` is a method, not data.
        if isinstance(values, torch.Tensor):
            return values.to(torch.float32)
        # pandas objects expose their data through `.values`; anything else
        # (NumPy arrays, lists) is handed to torch directly.
        return torch.as_tensor(getattr(values, "values", values), dtype=torch.float32)

    def forward(self, predictions, s_diff, o_diff, d_bs):
        """
        Args:
            predictions (torch.Tensor): Output of the neural network; it is
                added to ``d_bs`` before being multiplied by ``s_diff``.
            s_diff (pd.DataFrame | pd.Series | torch.Tensor): Change in the stock price.
            o_diff (pd.DataFrame | pd.Series | torch.Tensor): Change in the option price.
            d_bs (pd.DataFrame | pd.Series | torch.Tensor): Black-Scholes delta.

        All three inputs must broadcast against ``predictions``; in practice
        they should have one row per prediction.

        Returns:
            torch.Tensor: Scalar mean squared hedging error.
        """
        s_diff = self._as_float_tensor(s_diff)
        o_diff = self._as_float_tensor(o_diff)
        d_bs = self._as_float_tensor(d_bs)

        # Use the `s_diff` argument; the name `s_change` was never defined.
        difference = o_diff - (predictions + d_bs) * s_diff

        return torch.mean(difference ** 2)

In [31]:
# Now we initialise the model (using same Adam optimizer as before  but diff loss function)

# Seed torch's RNG so both networks start from the same weights on every
# Restart & Run All; without it each run trains from a different initialisation.
torch.manual_seed(42)

custom_no_batch = FNN(input_size, hidden_size, output_size)
custom_batch = FNN(input_size, hidden_size, output_size)

In [36]:
def train_custom_model(model, epochs, loss_func, optimizer, batch_size=-1,
                       train_loss_inputs=None, val_loss_inputs=None):
    """Full-batch training loop with early stopping on the validation loss.

    The model is evaluated on the module-level ``X_train`` and ``X_val``.

    Args:
        model: Network to train.
        epochs: Maximum number of epochs.
        loss_func: Callable invoked as ``loss_func(predictions, s_diff, c_diff, d_bs)``.
        optimizer: Optimizer stepped once per epoch.
        batch_size: Accepted for interface compatibility; not used, training
            is always full-batch.
        train_loss_inputs: Optional ``(s_diff, c_diff, d_bs)`` triple with one
            row per row of ``X_train``.
        val_loss_inputs: Optional ``(s_diff, c_diff, d_bs)`` triple with one
            row per row of ``X_val``.

    When a triple is omitted, the module-level ``s_diff``, ``c_diff`` and
    ``d_bs`` are used. Those cover the whole dataset, so they only work if
    their row count matches the predictions; pass split-specific triples to
    avoid a tensor size mismatch inside the loss.
    """
    # Fall back to the module-level frames to keep the old call signature working.
    if train_loss_inputs is None:
        train_loss_inputs = (s_diff, c_diff, d_bs)
    if val_loss_inputs is None:
        val_loss_inputs = (s_diff, c_diff, d_bs)

    patience = 5  # epochs without validation improvement before stopping
    wait = 0
    best_val_loss = float('inf')
    for epoch in range(epochs):
        model.train()
        predictions_train = model(X_train)
        loss_train = loss_func(predictions_train, *train_loss_inputs)

        optimizer.zero_grad()
        loss_train.backward()
        optimizer.step()

        model.eval()
        with torch.no_grad():
            predictions_val = model(X_val)
            loss_val = loss_func(predictions_val, *val_loss_inputs)

        # Track the best loss as a plain float rather than holding on to a tensor.
        current_val_loss = loss_val.item()
        if current_val_loss < best_val_loss:
            best_val_loss = current_val_loss
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                print("Early stoppage")
                print(f"Epoch [{epoch+1}/{epochs}], Training Loss: {loss_train.item():.4f}, Validation Loss: {loss_val.item():.4f}")
                break

        if (epoch + 1) % 5 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Training Loss: {loss_train.item():.4f}, Validation Loss: {loss_val.item():.4f}")

In [32]:
# Now we train the models
custom_loss = CustomLoss()

# Kept so any cell that refers to `model` still finds it; it is a separate
# network and is not trained by the optimizers below.
model = FNN(input_size, hidden_size, output_size)

# Each optimizer must own the parameters of the network it trains. Building
# both from `model.parameters()` left `custom_no_batch` / `custom_batch`
# untouched by `optimizer.step()`.
custom_nb_optim = optim.Adam(custom_no_batch.parameters(), lr=0.01)
custom_b_optim = optim.Adam(custom_batch.parameters(), lr=0.01)


In [37]:
# Full-batch run of the custom-loss network, capped at 10 epochs.
train_custom_model(
    model=custom_no_batch,
    epochs=10,
    loss_func=custom_loss,
    optimizer=custom_nb_optim,
)
# train_batch_model(custom_batch, 1000, custom_loss, custom_b_optim, 30)

644
644


RuntimeError: The size of tensor a (644) must match the size of tensor b (1007) at non-singleton dimension 0

In [None]:
# def train_custom_model(model, epochs, loss_func, optimizer, batch_size=-1):
#     patience = 5
#     wait = 0
#     best_val_loss = float('inf')
#     for epoch in range(epochs):
#         if batch_size == -1:
#             model.train()
#             predictions_train = model(X_train)
#             loss_train = loss_func(predictions_train, y_train)
        
#             optimizer.zero_grad()
#             loss_train.backward()
#             optimizer.step()
#         else:
#             for start in range(0, len(X_train), batch_size):
#                 end = start + batch_size
#                 X_batch = X_train[start:end]
#                 y_batch = y_train[start:end]
            
#                 # Forward pass
#                 predictions_train = model(X_batch)
#                 loss_train = loss_func(predictions_train, y_batch)
            
#                 # Backward pass
#                 optimizer.zero_grad()
#                 loss_train.backward()
#                 optimizer.step()
    
#         model.eval()
#         with torch.no_grad():
#             predictions_val = model(X_val)
#             loss_val = loss_func(predictions_val, y_val)
    
#         # Check for improvement
#         if loss_val < best_val_loss:
#             best_val_loss = loss_val
#             wait = 0
#         else:
#             wait += 1
#             if wait >= patience:
#                 print("Early stoppage")
#                 print(f"Epoch [{epoch+1}/{epochs}], Training Loss: {loss_train.item():.4f}, Validation Loss: {loss_val.item():.4f}")
#                 break
                
#         if (epoch + 1) % 5 == 0:
#             print(f"Epoch [{epoch+1}/{epochs}], Training Loss: {loss_train.item():.4f}, Validation Loss: {loss_val.item():.4f}")