In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
import os
import seaborn
from tqdm import tqdm

trainPath1 = 'data/Dataset_2_train_instrument_1.csv'
trainPath2 = 'data/Dataset_2_train_instrument_2.csv'

targetPath = 'data/Dataset_2_train_payoff.csv'
testPath1 = 'data/Dataset_2_test_instrument_1.csv'
testPath2 = 'data/Dataset_2_test_instrument_2.csv'

growthPath = 'data/Dataset_1_train_asset_daily growth_rate.csv'

train1 = pd.read_csv(trainPath1, header=None)
train2 = pd.read_csv(trainPath2, header=None)
test1 = pd.read_csv(testPath1, header=None)
test2 = pd.read_csv(testPath1, header=None)
target = pd.read_csv(targetPath, header=None)


In [None]:
# define growth values for instrument 1
def col_numeric_names(df):
    df = df.rename(columns={x:y for x,y in zip(df.columns,range(0,len(df.columns)))})
    return df

def absolute_growth(df):
    len_df = df.shape[1]
    temp = df.iloc[:,1:len_df]
    temp = col_numeric_names(temp)
    temp2 = df.iloc[:,:len_df-1]
    df2 = temp-temp2
    df2.insert(0, "0", 0)
    df2 = col_numeric_names(df2)
    return df2
growthTrain = absolute_growth(train1)

In [None]:
# exercise 2 - Heston Model
import torch
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(42)
# Set up device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Define stock price path, payoffs, and transaction costs
train_instrument1 = torch.Tensor(train1.values.tolist())
train_instrument2 = torch.Tensor(train2.values.tolist())
test_instrument1 = torch.Tensor(test1.values.tolist())
test_instrument2 = torch.Tensor(test2.values.tolist())

payoffs = torch.Tensor([item for sublist in target.values for item in sublist])
transaction_costs = torch.Tensor([0.01] * train1.shape[0])
transaction_costs_test = torch.Tensor([0.01] * test1.shape[0])
growths = torch.Tensor(growthTrain.values.tolist())

# define parameters
num_epochs = 50
batch_size = 256
learning_rate = 0.001


# Set up dataset and iterator
dataset_train = torch.utils.data.TensorDataset(train_instrument1[0:7999], train_instrument2[0:7999], payoffs[0:7999], transaction_costs[0:7999], growths[0:7999])
dataset_val = torch.utils.data.TensorDataset(train_instrument1[7999:], train_instrument2[7999:], payoffs[7999:], transaction_costs[7999:], growths[7999:])
dataset_test = torch.utils.data.TensorDataset(test_instrument1, test_instrument2, transaction_costs_test)

train_loader = torch.utils.data.DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(dataset_val, batch_size=batch_size, shuffle=False)
test_loader = torch.utils.data.DataLoader(dataset_test, batch_size=1, shuffle=False)

In [None]:
import torch.nn.functional as F
import math

class DeepHedging_RNN(nn.Module):
    def __init__(self, input_size, HL_size, output_size):
        super(DeepHedging_RNN, self).__init__()
        self.rnn = torch.nn.RNN(input_size=input_size,
                                hidden_size=HL_size,
                                num_layers=1)
        self.linear = torch.nn.Linear(HL_size, output_size)
        self.softplus = torch.nn.Softplus()
        
    def forward(self, S):
        out, _ = self.rnn(S)
        out = self.linear(out)
        out = self.softplus(out)

        return out.squeeze()

def heston_loss(deltas, S, v, pay_off, growth, costs, q1, q2, beta=1):
    zeros = torch.zeros((deltas.shape[0], 1)).to(device)

    S_delta_T = torch.transpose(growth, 0, 1)
    V_delta_T = torch.transpose(torch.cat([zeros, torch.abs(torch.diff(v.squeeze(-1), dim=1))], dim=1), 0, 1)
    
    delta_S = torch.diagonal(torch.matmul(deltas, S_delta_T))
    delta_V = torch.diagonal(torch.matmul(deltas, V_delta_T))
    delta_diff = torch.cat([zeros, torch.abs(torch.diff(deltas, dim=1))], dim=1)
    costs_S = costs * torch.diagonal(torch.matmul(delta_diff, S))
    costs_V = costs * torch.diagonal(torch.matmul(delta_diff, v))
    
    # Calculating loss
    loss = pay_off.squeeze() - delta_S + costs_S - delta_V + costs_V
    es_99 = (F.relu(loss-q1).mean())/(1-0.99) + q1
    es_50 = (F.relu(loss-q2).mean())/(1-0.5) + q2
    risk_measure = (es_50 + es_99*beta)/(1+beta)
    return risk_measure

def standardized(tensor):
    #tensor = tensor[:, :-1, :]
    return (tensor - torch.mean(tensor, 1, True)) / torch.std(tensor)

In [None]:
input_size = 2 # number of expected features
output_size = 1
HL_size = 16
model = DeepHedging_RNN(input_size, HL_size, output_size).to(device)
# Set up optimizer
q1 = torch.tensor((0.), dtype=torch.float32, requires_grad=True, device=device)
q2 = torch.tensor((0.), dtype=torch.float32, requires_grad=True, device=device)
optimizer = optim.Adam(list(model.parameters()) + [q1, q2], lr=learning_rate)

In [None]:
from tqdm import tqdm

train_loss = []
val_loss = []

for epoch in tqdm(range(num_epochs)):
    model.train()
    running_loss = 0.0
    counter = 0

    for S, v, y, c, g in train_loader:
        counter += 1
        S = (S.to(device).unsqueeze(-1))
        v = (v.to(device).unsqueeze(-1))
        y = y.to(device)
        c = c.to(device)
        g = g.to(device)
        
        # Calculate gradients and update model weights
        optimizer.zero_grad()
        S_v = torch.cat((S, v), 2)
        deltas = model(S_v)
        losses = heston_loss(deltas, S, v, y, g, c, q1, q2)
        running_loss += losses.item()
        
        losses.backward()
        optimizer.step()
        
    epoch_train_loss = running_loss / counter
    train_loss.append(epoch_train_loss)
    print("Train loss: {}".format(epoch_train_loss))
    
    # validation 
    running_loss_test = 0.0
    counter = 0
    model.eval()
    for S_val, v_val, y_val, c_val, g_val in val_loader:
 
        
        counter += 1
        S_val = (S_val.to(device).unsqueeze(-1))
        v_val = (v_val.to(device).unsqueeze(-1))
        y_val = y_val.to(device)
        g_val = g_val.to(device)
        c_val = c_val.to(device)
        
        S_v_val = torch.cat((S_val, v_val), 2)
        deltas_val = model(S_v_val)
        losses = heston_loss(deltas_val, S_val, v_val, y_val, g_val, c_val, q1, q2)
        running_loss_test += losses.item()

    epoch_test_loss = running_loss_test / counter
    val_loss.append(epoch_test_loss)
    print("Validation loss: {}".format(epoch_test_loss))

In [None]:
# check test set
final = []
for S_test, sigma_test, _ in tqdm(test_loader):
    S_test = S_test.to(device).unsqueeze(-1)
    sigma_test = sigma_test.to(device).unsqueeze(-1)
    S_sigma = torch.cat((S_test, sigma_test), 2)
    
    with torch.no_grad():
        final.append(model(S_sigma).cpu().detach().numpy())

In [None]:
def StandardData(data: np.ndarray):
    min_value = np.min(data)
    max_value = np.max(data)
    standard_data = (data - min_value) / (max_value - min_value)
    return standard_data

In [None]:
def plot_figures(greek: pd.DataFrame, 
                 rnn_bs: pd.DataFrame, 
                 df: pd.DataFrame): 
    fig, axs = plt.subplots(2, 3)
    fig.set_size_inches(18.5, 10.5)
    fig.suptitle('Delta spread at different time intervals: Test Set', size=45)

    axs[0,0].plot(df.loc[:, 0], greek.loc[:, 0], 'bo', label='Heston')
    axs[0,0].plot(df.loc[:, 0], StandardData(rnn_bs.loc[:, 0]), 'gx', label='Heston-RNN')
    axs[0,0].legend()
    axs[0,0].set_title('Time 0')
    axs[0,0].set_ylim([0, 1])
    axs[0,1].plot(df.loc[:, 1], greek.loc[:, 1], 'bo', label='Heston')
    axs[0,1].plot(df.loc[:, 1], StandardData(rnn_bs.loc[:, 1]), 'gx', label='Heston-RNN')
    axs[0,1].legend()
    axs[0,1].set_title('Time 1')
    axs[0,2].plot(df.loc[:, 5], greek.loc[:, 5], 'bo', label='Heston')
    axs[0,2].plot(df.loc[:, 5], StandardData(rnn_bs.loc[:, 5]), 'gx', label='Heston-RNN')
    axs[0,2].legend()
    axs[0,2].set_title('Time 5')
    axs[1,0].plot(df.loc[:, 15], greek.loc[:, 15], 'bo', label='Heston')
    axs[1,0].plot(df.loc[:, 15], StandardData(rnn_bs.loc[:, 15]), 'gx', label='Heston-RNN')
    axs[1,0].legend()
    axs[1,0].set_title('Time 15')
    axs[1,1].plot(df.loc[:, 25], greek.loc[:, 25], 'bo', label='Heston')
    axs[1,1].plot(df.loc[:, 25], StandardData(rnn_bs.loc[:, 25]), 'gx', label='Heston-RNN')
    axs[1,1].legend()
    axs[1,1].set_title('Time 25')
    axs[1,2].plot(df.loc[:, 30], greek.loc[:, 30], 'bo', label='Heston')
    axs[1,2].plot(df.loc[:, 30], StandardData(rnn_bs.loc[:, 30]), 'gx', label='Heston-RNN')
    axs[1,2].legend()
    axs[1,2].set_title('Time 30')

    for ax in axs.flat:
        ax.set(xlabel='Spot price', ylabel='Delta')

In [None]:
import numpy as np
from scipy.stats import norm

def heston_call_price(S, K, r, T, V0, kappa, theta, sigma, rho):
    
    """
    Calculates the price of a European call option using the Heston model for stochastic volatility.
    
    Parameters:
        S: Spot price of the underlying asset
        K: Strike price of the option
        r: Risk-free interest rate
        T: Time to maturity of the option (in years)
        V0: Initial volatility
        kappa: Mean reversion speed of volatility
        theta: Long-term mean of volatility
        sigma: Volatility of volatility
        rho: Correlation between the underlying asset and volatility processes
        
    Returns:
        The price of the European call option
    """
    
    # Define some useful constants
    i = 1j
    beta = kappa - i*rho*sigma
    gamma = np.sqrt(beta**2 + sigma**2)
    rho2 = rho**2
    a = kappa*theta
    b = kappa + gamma
    
    # Calculate the characteristic function of the log-stock price
    u = 0.5
    d = np.sqrt((rho2 - 1)**2 + 4*rho2*(sigma**2/(2*gamma))*(kappa - i*rho*sigma + gamma))
    g = (beta - rho2 + d) / (2*rho2)
    D = (beta - rho2 + d)**2 - 4*rho2*(beta - i*rho*sigma + gamma)
    G = (beta - rho2 + d - D) / (4*rho2)
    C = r*i*u + a/(sigma**2)*((beta - rho2 + d)*T - 2*np.log((1 - G*np.exp(-d*T))/(1 - G)))
    D = (1 - np.exp(-d*T)) / (1 - G*np.exp(-d*T))
    B = np.exp(C + D*V0)
    
    # Calculate the price of the option
    d1 = (np.log(S/K) + (r + 0.5*V0) * T) / (np.sqrt(V0*T))
    d2 = d1 - np.sqrt(V0*T)
    price = S*norm.cdf(d1) - K*np.exp(-r*T)*norm.cdf(d2)
    
    return price, norm.cdf(d1), d2

In [None]:
def calculate_heston(instrument1: pd.DataFrame, 
                     instrument2: pd.DataFrame,
                     K: float=100,
                     r: float=0.0,):
    
    instrument1 = np.array(instrument1)
    instrument2 = np.array(instrument2)
    # using given data to calculate variables kappa, theta, sigma, rho
    V0 = instrument2[0].mean()
    Vfinal = instrument2[-1].mean()
    theta = V0 + (Vfinal - V0) / instrument2[-1].std()
    kappa = 2*(theta - V0) / (Vfinal*instrument2[-1].std())
    sigma = 0.2 # assumption
    rho = -0.5 # assumption
    K=K
    
    S = instrument1.flatten()
    T = (31 - np.arange(instrument1.shape[1]))/365
    T = np.tile(T, (instrument1.shape[0],1)).flatten()
    _, delta, _ = heston_call_price(S, K, r, T, V0, kappa, theta, sigma, rho)
    return delta.reshape(instrument1.shape)

In [None]:
heston_deltas_test = pd.DataFrame(calculate_heston(test1, test2))
heston_deltas_train = pd.DataFrame(calculate_heston(train1, train2))

In [None]:
plot_figures(heston_deltas_test, pd.DataFrame(final), test1)

In [None]:
greek = pd.DataFrame(calculate_heston(train1, train2))
df = train1
fig, axs = plt.subplots(2, 3)
fig.set_size_inches(18.5, 10.5)
fig.suptitle('Delta spread at different time intervals: Train set', size=45)

axs[0,0].plot(df.loc[:, 0], greek.loc[:, 0], 'bo', label='Heston')
axs[0,0].legend()
axs[0,0].set_title('Time 0')
axs[0,0].set_ylim([0, 1])
axs[0,1].plot(df.loc[:, 1], greek.loc[:, 1], 'bo', label='Heston')
axs[0,1].legend()
axs[0,1].set_title('Time 1')
axs[0,2].plot(df.loc[:, 5], greek.loc[:, 5], 'bo', label='Heston')
axs[0,2].legend()
axs[0,2].set_title('Time 5')
axs[1,0].plot(df.loc[:, 15], greek.loc[:, 15], 'bo', label='Heston')
axs[1,0].legend()
axs[1,0].set_title('Time 15')
axs[1,1].plot(df.loc[:, 25], greek.loc[:, 25], 'bo', label='Heston')
axs[1,1].legend()
axs[1,1].set_title('Time 25')
axs[1,2].plot(df.loc[:, 30], greek.loc[:, 30], 'bo', label='Heston')
axs[1,2].legend()
axs[1,2].set_title('Time 30')

for ax in axs.flat:
    ax.set(xlabel='Spot price', ylabel='Delta')