In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ANN learning libraries
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow import keras
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
import scipy.stats as si
from scipy.stats import norm
from functools import partial
from scipy.optimize import minimize


In [None]:
import pandas as pd

data = pd.read_csv("Data_SABR_vol_mult_rst.txt", header=None)
# data headings: [v, alpha, rho, imp vol 1-36]
data


In [None]:
# define inputs and targets
targets = data.iloc[:, 0:3]
data = data.iloc[:, 3:]


In [None]:
# split data into train and test sets
train_inputs, test_inputs, train_targets, test_targets = train_test_split(
    data, targets, test_size=0.3, shuffle=False)

In [None]:
# store test inputs and targets for resultsDataFrame(test_targets).to_csv("test_targetsSABRparam.csv")
pd.DataFrame(test_inputs).to_csv("test_inputsSABRparam.csv")
pd.DataFrame(test_targets).to_csv("test_targetsSABRparam.csv")


In [None]:
# scale data
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
data = scaler.fit_transform(data)

In [None]:
# convert to tensor
train_inputs = tf.convert_to_tensor(train_inputs, dtype='float64')
train_targets = tf.convert_to_tensor(train_targets.values, dtype='float64')
test_inputs = tf.convert_to_tensor(test_inputs, dtype='float64')
test_targets = tf.convert_to_tensor(test_targets.values, dtype='float64')


In [None]:
output_size = 3 
input_size = train_inputs.shape[1]

hidden_layer_size = 249


model = tf.keras.Sequential([
    tf.keras.layers.Dense(
        input_size, activation=tf.nn.relu),  # 1st hidden layer
    tf.keras.layers.Dense(
        hidden_layer_size, activation=tf.nn.relu),  # 2nd hidden layer
    tf.keras.layers.Dense(
        hidden_layer_size, activation=tf.nn.relu),  # 3rd Hidden layer
    tf.keras.layers.Dense(
        hidden_layer_size, activation=tf.nn.relu),  # 4th Hidden layer
    tf.keras.layers.Dense(
        hidden_layer_size, activation=tf.nn.relu),  # 5th Hidden layer
    tf.keras.layers.Dense(output_size, activation="linear")  # output layer

])


In [None]:
# set an early stopping mechanism
LRScheduler = ReduceLROnPlateau(monitor='loss', factor=0.1, patience=15,
                                verbose=1, min_delta=0.0001, cooldown=5, min_lr=0.00001)


In [None]:
# create customized loss function
def my_loss_fn(y_true, y_pred):
    squared_difference = tf.square(y_true - y_pred)
    squared_difference = squared_difference * [0.25, 0.25,40]
    return tf.reduce_mean(squared_difference, axis=-1)  # Note the `axis=-1`

In [None]:
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Dense, Input, BatchNormalization, Concatenate

In [None]:
# define separate model
units_1=[26,26,26,26,256]
units_2=[16,16,16,16,256]
units_3=[195,195,195,195,195,195,256]
input= Input(shape=(36,))

x_1=Dense(units_1[0], activation="relu")(input)
#x_1=BatchNormalization()(x_1)
for i in range(1, len(units_1)-1):
    x_1=Dense(units_1[i], activation="relu")(x_1)
    #x_1=BatchNormalization()(x_1)

x_2=Dense(units_2[0], activation="relu")(input)
#x_2=BatchNormalization()(x_2)
for i in range(1, len(units_2)-1):
    x_2=Dense(units_2[i], activation="relu")(x_2)
   # x_2=BatchNormalization()(x_2)
x_3=Dense(units_3[0], activation="relu")(input)
#x_2=BatchNormalization()(x_2)
for i in range(1, len(units_3)-1):
    x_3=Dense(units_3[i], activation="relu")(x_3)
   # x_2=BatchNormalization()(x_2)

x=Concatenate()([x_1,x_2,x_3])
x=Dense(256, activation="relu")(x)

output=Dense(3)(x)


model=Model(inputs=[input], outputs=[output])
tf.keras.utils.plot_model(model, show_shapes=True)

In [None]:
# Set early stopping mechanism
LRScheduler = ReduceLROnPlateau(monitor='loss', factor=0.1, patience=15,
                                verbose=1, min_delta=0.00001, cooldown=5, min_lr=0.0001)

In [None]:
# begin training
adam = Adam(lr=2e-05)
model.compile(optimizer=adam, loss=my_loss_fn, metrics=["mean_absolute_error"])

batch_size = 216

max_epochs = 50000


history = model.fit(train_inputs,  
                    train_targets,  
                    batch_size=batch_size,  
                    epochs=max_epochs,
                    validation_split=0.1,
                    callbacks=[
                        keras.callbacks.EarlyStopping(monitor='val_loss', patience=40,min_delta=0.0000001),
                        LRScheduler
                        ],
                    verbose=True)

In [None]:
# store predicted parameters
nu_pred = model.predict(test_inputs)[0:1000, 0]
pd.DataFrame(nu_pred).to_csv("SABR_nu_pred.csv")
alpha_pred = model.predict(test_inputs)[0:1000, 1]
pd.DataFrame(alpha_pred).to_csv("SABR_alpha_pred.csv")
rho_pred = model.predict(test_inputs)[0:1000, 2]
pd.DataFrame(rho_pred).to_csv("SABR_rho_pred.csv")


In [None]:
# plot mse train and validation across epochs
loss_train = history.history['loss']
loss_val = history.history['val_loss']
epochs = range(1, len(history.epoch)+1)
plt.plot(epochs, loss_train, 'g', label='Training Mean Square Error')
plt.plot(epochs, loss_val, 'b', label='Validation Mean Square Error')
plt.title('Training and Validation Mean Square Error')
plt.xlabel('Epochs')
plt.ylabel('Mean Square Error')
plt.legend()
plt.show()


In [None]:
# plot mse train and validation across epochs
loss_train = history.history["mean_absolute_error"]
loss_val = history.history["val_mean_absolute_error"]
epochs = range(1, len(history.epoch)+1)
plt.plot(epochs, loss_train, 'g', label='Training Mean Absolute Error')
plt.plot(epochs, loss_val, 'b', label='Validation Mean Absolute Error')
plt.title('Training and Validation Mean Absolute Error')
plt.xlabel('Epochs')
plt.ylabel('Mean Absolute Error')
plt.legend()
plt.show()


In [None]:
# evaluate model
model.evaluate(test_inputs ,test_targets)

In [None]:
# save model
model_dir = 'paramSABRModel.h5'
model.save(model_dir)

In [None]:
# load model
from tensorflow.keras.models import load_model, save_model
calibration_model = load_model('paramSABRModel.h5', custom_objects={
                               'my_loss_fn': my_loss_fn})


In [None]:
# time pre-trained model on 1000 samples
import time
start = time.time()

s = (1000, output_size)
y_pred = np.zeros(s)
for i in range(1000):
    X_try = test_inputs[i, :]
    c = np.array([X_try])
    y_pred[i, :] = calibration_model.predict(c)

end = time.time()

# total time taken
print(f"Runtime of the program is {end - start}")

pd.DataFrame(y_pred).to_csv("SABR_param_pred.csv")


In [None]:
# save true targets
test_targets = test_targets.numpy()
test_targets_sub = test_targets[0:999, :]
test_targets_sub
pd.DataFrame(test_targets_sub).to_csv("SABR_param_true.csv")
