In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

import tensorflow as tf
from keras import backend as K

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
def bscall_vanilla(S, K, T, r, sig):
  d1 = (np.log(S/K)+(r+0.5*sig**2)*T)/(sig*np.sqrt(T))
  d2 = (np.log(S/K)+(r-0.5*sig**2)*T)/(sig*np.sqrt(T))
  return S*norm.cdf(d1)-K*np.exp(-r*T)*norm.cdf(d2)

def bsput_vanilla(S, K, T, r, sig):
  d1 = (np.log(S/K)+(r+0.5*sig**2)*T)/(sig*np.sqrt(T))
  d2 = (np.log(S/K)+(r-0.5*sig**2)*T)/(sig*np.sqrt(T))
  return K*np.exp(-r*T)*norm.cdf(-d2)-S*norm.cdf(-d1)

def bscall_lookback(S, m, T, t, r, sig):
  tau = T - t
  a1 = (np.log(S/m) + (r+0.5*sig**2)*tau)/(sig*np.sqrt(tau))
  a2 = (np.log(S/m) + (r-0.5*sig**2)*tau)/(sig*np.sqrt(tau))
  a3 = (np.log(S/m) - (r-0.5*sig**2)*tau)/(sig*np.sqrt(tau))
  return S*norm.cdf(a1) - m*np.exp(-r*tau)*norm.cdf(a2) - ((S*sig**2)/2*r)*norm.cdf(-a1) + ((S*sig**2)/2*r)*np.exp(-r*tau)*(m/S)**((2*r)/(sig**2))*norm.cdf(-a3)

## Simulated market data generation

In [None]:
S0 = 1
K = 1
T = 30/365
r = 0.02
sig = 0.2

M = 50000
M_test = 10000
N = 250

dt = T/N
rdt = r*dt
sigsdt = sig*np.sqrt(dt)

option_type = 'put' #call or put
model_name = f'{N}dt_mlp_floating_lookback_{option_type}_opt_model'


# TRAIN DATASET
np.random.seed(1234)

S = np.empty([M,N+1])
rv = np.random.normal(r*dt,sigsdt,[M,N])

for i in range(M):
  S[i,0] = S0
  for j in range(N):
    S[i,j+1] = S[i,j] * (1+rv[i,j])

In [None]:
# Monte carlo pricing
mc_price = 0
if(option_type=='call'):
  print("call monte carlo price")
  mc_price = [s[-1] - np.min(s) for s in S]
  mc_price = np.mean(mc_price) * np.exp(-r*T)
else:
  print("put monte carlo price")
  mc_price = [np.max(s) - s[-1] for s in S]
  mc_price = np.mean(mc_price) * np.exp(-r*T)

print(mc_price)

In [None]:
premium = mc_price * np.ones([M,1])
cost = np.zeros([M,1])
initial_optimum_price = np.ones([M,1]) * S0
SS = [S[:,i].reshape(M,1) for i in range(N+1)]
x = [premium] + [cost] + [initial_optimum_price] + [SS]
y = np.zeros([M,1])


# TEST DATASET
np.random.seed(4321)

S_test = np.empty([M_test,N+1])
rv_test = np.random.normal(r*dt,sigsdt,[M_test,N])

for i in range(M_test):
  S_test[i,0] = S0
  for j in range(N):
    S_test[i,j+1] = S_test[i,j] * (1+rv_test[i,j])


premium_test = mc_price * np.ones([M_test,1])
cost_test = np.zeros([M_test,1])
initial_optimum_price_test = np.ones([M_test,1]) * S0
SS_test = [S_test[:,i].reshape(M_test,1) for i in range(N+1)]
x_test = [premium_test] + [cost_test] + [initial_optimum_price_test] + [SS_test]
y_test = np.zeros([M_test,1])

## Model architecture

#### Delta model block

In [None]:
def delta_model():
  price_input = tf.keras.Input(shape=(2,))#first input is price, second is past minimum or maximum price
  delta = tf.keras.layers.Dense(32, activation='leaky_relu', input_shape=(2,))(price_input)
  delta = tf.keras.layers.BatchNormalization()(delta)
  delta = tf.keras.layers.Dense(32, activation='leaky_relu')(delta)
  delta = tf.keras.layers.BatchNormalization()(delta)
  delta = tf.keras.layers.Dense(32, activation='leaky_relu')(delta)
  delta = tf.keras.layers.BatchNormalization()(delta)
  delta = tf.keras.layers.Dense(1, activation='linear')(delta)

  return tf.keras.Model(inputs=price_input, outputs=delta)

#### Model construction

In [None]:
train_new_model = True
model = None

if(not train_new_model):
  # Load the model from Google Drive
  model = tf.keras.models.load_model(f'/content/drive/My Drive/final_project_financial_eng/{model_name}.h5')

if(train_new_model):
  my_input = []

  premium = tf.keras.layers.Input(shape=(1,), name="premium")
  hedge_cost = tf.keras.layers.Input(shape=(1,), name="hedge_cost")
  price = tf.keras.layers.Input(shape=(1,), name="price")
  optimum_price = tf.keras.layers.Input(shape=(1,), name="optimum_price")

  my_input = my_input + [premium] + [hedge_cost] + [optimum_price] + [price]

  for j in range(N):
    new_price = tf.keras.layers.Input(shape=(1,), name='S'+str(j+1))
    my_input = my_input + [new_price]

    price_inc = tf.keras.layers.Subtract(name='price_inc_'+str(j))([price, new_price])

    price_optimum_price_concat = tf.keras.layers.Concatenate(name='S-optimum_S_concat'+str(j))([price,optimum_price])
    delta = delta_model()(price_optimum_price_concat)

    cost = tf.keras.layers.Multiply(name="stock_"+str(j))([delta, price_inc])
    hedge_cost = tf.keras.layers.Add(name='cost_'+str(j))([hedge_cost, cost])
    if(option_type=='call'):
      optimum_price = tf.keras.layers.Minimum(name='optimum_price'+str(j))([optimum_price,new_price])
    else:
      optimum_price = tf.keras.layers.Maximum(name='optimum_price'+str(j))([optimum_price,new_price])
    price = new_price

  payoff = None
  if(option_type=='call'):
    payoff = tf.keras.layers.Subtract(name='payoff')([price,optimum_price])
  else:
    payoff = tf.keras.layers.Subtract(name='payoff')([optimum_price,price])
  
  cum_cost = tf.keras.layers.Add(name='hedge_cost_plus_payoff')([hedge_cost,payoff])
  cum_cost = tf.keras.layers.Subtract(name='cum_cost-premium')([cum_cost,premium])

  model = tf.keras.Model(inputs=my_input, outputs=cum_cost)

In [None]:
# tf.keras.utils.plot_model(model)

## Model Training & Testing

In [None]:
if(train_new_model):
  model.compile(loss='mse', optimizer='adam')

#### Training on previously created artificial data (simulated market scenarios)

In [None]:
if(train_new_model):
  hist = model.fit(x,y, batch_size=64,epochs=25,verbose=True,validation_split=0.2)

In [None]:
if(train_new_model):
  plt.plot(hist.history['loss'],label='Training Loss')
  plt.plot(hist.history['val_loss'],label='Validation Loss')
  plt.legend()

  plt.show()

#### Testing

In [None]:
y_pred_test = model.predict(x_test)

In [None]:
#test data
plt.hist(y_pred_test, bins=30)
plt.show()

In [None]:
#test data
plt.plot(S_test[0:1500,-1], y_pred_test[0:1500], marker = ".", linestyle='none')
plt.show()

In [None]:
#test data
plt.plot(S_test[0:-1,-1], y_pred_test[0:-1], marker = ".", linestyle='none')
plt.show()

In [None]:
# Test data
print(np.mean(y_pred_test))
print(np.std(y_pred_test))

In [None]:
model.count_params()

In [None]:
# Save the model to Google Drive
model.save(f'/content/drive/My Drive/final_project_financial_eng/{model_name}.h5')