In [1]:
from keras.models import Sequential, Model, load_model
from keras.layers import Dense, LeakyReLU, BatchNormalization, LSTM, Bidirectional, Input, Concatenate, Dropout
from keras.callbacks import TensorBoard
from keras.optimizers import Adam
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from geneticalgorithm import geneticalgorithm as ga
import math
import datetime

In [2]:
# Load the three input tables. The paths are empty strings in this copy of the
# notebook (presumably redacted) and have to be filled in before the cell runs.

# Option table; later cells join it on 'Date' and use 'Close' as the target.
call_df = pd.read_csv('')
# Underlying price history; later cells read its 'Date' and 'Close' columns.
History = pd.read_csv('')
# Risk-free rate series; later cells read its 'Date' and 'Rate' columns.
risk_free_asset = pd.read_csv('')

# Format and split data before training

In [4]:

def _lag_table(dates, series, n_lags):
    """Return a DataFrame of dates (column 0) and trailing values (columns 1..n_lags).

    Column 1 holds the value on the row's own date, column k the value k-1
    observations earlier. Only dates that have a complete window are kept,
    which is why the date column starts at index ``n_lags - 1``.
    """
    # Pad the front with NaNs so that incomplete windows (and the wrap-around
    # produced by np.roll) can be detected and removed afterwards.
    padded = np.insert(series, 0, np.array([np.nan] * n_lags))
    lagged = np.column_stack([np.roll(padded, shift) for shift in range(n_lags)])
    lagged = lagged[~np.isnan(lagged).any(axis=1)]
    return pd.DataFrame(data=np.column_stack((dates[n_lags - 1:], lagged)))


# --- Risk-free rate: attach the same-day rate to every option row as 'r' ---
n_timesteps = 1
price_history = _lag_table(risk_free_asset.Date.values,
                           risk_free_asset.Rate.values,
                           n_timesteps)
joined = call_df.join(price_history.set_index(0), on='Date')
joined['r'] = joined[1]
call_df = joined.drop(columns=[1, 'LTP', 'Risk_free_rate'], axis=1)

# --- Underlying: attach the trailing n_timesteps closes to every option row ---
underlying = History
n_timesteps = 30
price_history = _lag_table(underlying.Date.values,
                           underlying.Close.values,
                           n_timesteps)
joined = call_df.join(price_history.set_index(0), on='Date')
call_df = joined
# 'Date' and 'Expiry' are dropped here; rows without a full price window
# (NaN after the join) are discarded.
call_df = call_df.drop(columns=['Date', 'Expiry']).dropna()

# Number of scalar (non-sequence) inputs fed to the network.
features = 4

In [None]:
# Put the scalar features (plus the 'Close' target) first and keep the lagged
# underlying closes after them. The lag columns carry the integer labels
# 1..n_timesteps that the join on the price-history table produced.
# Selecting only the five named columns would discard the lag columns, and the
# split cell could then no longer slice `[:, -n_timesteps:]` into an
# (n, n_timesteps, 1) sequence input.
feature_columns = ['Strike Price', 'Close', 'nDiff', 'r', 'Underlying Value']
lag_columns = list(range(1, n_timesteps + 1))
call_df = call_df[feature_columns + lag_columns]

In [25]:
# Hold out 1% of the rows for testing. Rows are shuffled (train_test_split's
# default) with a fixed seed; 'Close' is the regression target.
call_X_train, call_X_test, call_y_train, call_y_test = train_test_split(
    call_df.drop(columns=['Close'], axis=1).values,
    call_df['Close'].values,
    random_state=42,
    test_size=0.01,
)

# Force plain float64 arrays for all four pieces.
call_X_train = np.asarray(call_X_train).astype(np.float64)
call_y_train = np.asarray(call_y_train).astype(np.float64)
call_X_test = np.array(call_X_test, dtype=np.float64)
call_y_test = np.array(call_y_test, dtype=np.float64)

# Each X becomes the two-input list the network takes:
#   [0] the last n_timesteps columns as a (rows, n_timesteps, 1) sequence,
#   [1] the first four columns as the scalar feature vector.
call_X_train = [
    call_X_train[:, -n_timesteps:].reshape(len(call_X_train), n_timesteps, 1),
    call_X_train[:, :4],
]
call_X_test = [
    call_X_test[:, -n_timesteps:].reshape(len(call_X_test), n_timesteps, 1),
    call_X_test[:, :4],
]

# Calibration

In [None]:
# Search bounds [low, high], one row per gene, in the order the objective
# function unpacks them.
varbound = np.array([
    [1, 200],     # first_neuron
    [1, 200],     # second_neuron
    [1, 200],     # third_neuron
    [0, 2],       # dropout (divided by 10 when the layer is built)
    [1, 15],      # hidden_layers
    [1, 200],     # fort_neuron
    [10, 4000],   # neurons
])

# Settings passed to the genetic algorithm as `algorithm_parameters`.
algorithm_param = dict(
    max_num_iteration=100,
    population_size=15,
    mutation_probability=0.1,
    elit_ratio=0.01,
    crossover_probability=0.5,
    parents_portion=0.3,
    crossover_type='uniform',
    max_iteration_without_improv=5,
)

In [None]:
def LSTM_GA():
    """Tune the option-pricing network's hyper-parameters with a genetic algorithm.

    Each candidate is a 7-element integer vector
    ``(first_neuron, second_neuron, third_neuron, dropout, hidden_layers,
    fort_neuron, neurons)`` drawn from ``varbound``. A candidate is scored by
    training a fresh network on ``call_X_train`` / ``call_y_train`` and
    computing the RMSE on ``call_X_test`` / ``call_y_test``.

    Returns:
        The best variable vector found by the search (``best_variable``).
    """
    # Local import: only needed here, to reset Keras state between candidates.
    from keras import backend as keras_backend

    def f(x):
        """Objective function: train one candidate network and return its test RMSE."""
        first_neuron, second_neuron, third_neuron, dropout, hidden_layers, fort_neuron, neurons = x

        # A new model is built on every evaluation. Without clearing the
        # session, state from all previously evaluated candidates stays alive
        # and memory use grows for the whole duration of the search.
        keras_backend.clear_session()

        close_history = Input((n_timesteps, 1))
        input2 = Input((features,))
        with tf.device('/device:GPU:0'):
            # Sequence branch: four stacked bidirectional LSTMs; only the last
            # one drops the time axis.
            lstm = Sequential()
            lstm.add(Bidirectional(LSTM(units=int(first_neuron), input_shape=(n_timesteps, 1), return_sequences=True)))
            lstm.add(Bidirectional(LSTM(units=int(second_neuron), return_sequences=True)))
            lstm.add(Bidirectional(LSTM(units=int(third_neuron), return_sequences=True)))
            lstm.add(Bidirectional(LSTM(units=int(fort_neuron), return_sequences=False)))
            input1 = lstm(close_history)
            connect = Concatenate()([input1, input2])

            # Dense head: (hidden_layers - 1) identical blocks. The dropout
            # gene is an integer, so it is divided by 10 to get a rate.
            for _ in range(int(hidden_layers) - 1):
                connect = Dense(int(neurons))(connect)
                connect = BatchNormalization()(connect)
                connect = Dropout(dropout/10)(connect)
                connect = LeakyReLU()(connect)
            predict = Dense(1, activation='relu')(connect)

            candidate = Model(inputs=[close_history, input2], outputs=predict)
            candidate.compile(optimizer=Adam(learning_rate=1e-4), loss='mse')

            # Only validation_data is passed: Keras ignores validation_split
            # whenever validation_data is supplied, so passing both was
            # redundant and misleading.
            candidate.fit(call_X_train, call_y_train,
                          batch_size=250,
                          epochs=200,
                          validation_data=(call_X_test, call_y_test),
                          callbacks=[TensorBoard()],
                          verbose=1)

            predictions = candidate.predict(call_X_test, batch_size=250).reshape(call_y_test.shape[0])
            return np.sqrt(np.mean(np.square(call_y_test - predictions)))

    # The geneticalgorithm package minimises the objective, i.e. the RMSE.
    search = ga(function=f,
                dimension=7,
                variable_type='int',
                variable_boundaries=varbound,
                function_timeout=4500,
                algorithm_parameters=algorithm_param,
                convergence_curve=True,
                progress_bar=True)

    search.run()
    return search.best_variable

In [None]:
# Run the genetic search and keep the best hyper-parameter vector it returns.
# Every candidate trains a network for up to 200 epochs, so this cell is slow.
LSTM_params = LSTM_GA()

# Option training

In [None]:
# Unpack the tuned hyper-parameter vector; the order must match the order in
# which the GA objective function unpacks its argument.
(
    first_neuron,
    second_neuron,
    third_neuron,
    dropout,
    hidden_layers,
    fort_neuron,
    neurons,
) = LSTM_params

In [275]:
# Two inputs: the window of underlying closes and the scalar option features.
close_history = Input((n_timesteps, 1))
input2 = Input((features,))

with tf.device('/device:GPU:0'):
    # Sequence branch: four stacked bidirectional LSTMs sized by the tuned
    # parameters; only the last one drops the time axis.
    lstm = Sequential([
        Bidirectional(LSTM(units=int(first_neuron), input_shape=(n_timesteps, 1), return_sequences=True)),
        Bidirectional(LSTM(units=int(second_neuron), return_sequences=True)),
        Bidirectional(LSTM(units=int(third_neuron), return_sequences=True)),
        Bidirectional(LSTM(units=int(fort_neuron), return_sequences=False)),
    ])
    input1 = lstm(close_history)

    # Merge the sequence encoding with the scalar features.
    connect = Concatenate()([input1, input2])

    # Dense head: (hidden_layers - 1) blocks of Dense -> BatchNorm -> Dropout
    # -> LeakyReLU. The dropout parameter is divided by 10 to get a rate.
    dense_width = int(neurons)
    dropout_rate = dropout / 10
    for _ in range(int(hidden_layers) - 1):
        connect = Dense(dense_width)(connect)
        connect = BatchNormalization()(connect)
        connect = Dropout(dropout_rate)(connect)
        connect = LeakyReLU()(connect)

    # Single ReLU output unit.
    predict = Dense(1, activation='relu')(connect)

    model = Model(inputs=[close_history, input2], outputs=predict)
    model.compile(optimizer=Adam(learning_rate=1e-4), loss='mse')

In [276]:
# One TensorBoard log directory per run, named after the start time.
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = "logs/fit/" + run_stamp
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# Train the tuned network, holding back 1% of the training rows for validation.
fit_options = dict(
    batch_size=4900,
    epochs=290,
    validation_split=0.01,
    callbacks=[tensorboard_callback],
    verbose=1,
)
history = model.fit(call_X_train, call_y_train, **fit_options)

# Persist the trained model in HDF5 format.
model.save('lstm.h5')

# Metrics

In [None]:
# Absolute import: a notebook runs as __main__ without a parent package, so a
# relative import (`from .utilties import ...`) raises ImportError here.
# NOTE(review): assumes `utilties` is importable from the notebook's working
# directory / sys.path -- confirm the module location.
from utilties import utilties

In [None]:
# Predict on the held-out options, flatten to one value per row, and hand the
# actual/predicted pair to the project's error-metric helper.
call_y_pred = model.predict(call_X_test, batch_size=4900).reshape(call_y_test.shape[0])
line1 = utilties.error_metrics(call_y_test, call_y_pred)

In [5]:
# Print the eight metrics on one line. The PE5 field used to be written as
# `\%`, which is an invalid escape sequence (a warning on recent Python
# versions) and printed a stray backslash, unlike the other percent fields.
print('MSE: {:.2f} & RMSE: {:.2f} & BIAS: {:.2f}% & AAPE: {:.2f}% & MAPE: {:.2f}% & PE5: {:.2f}% & PE10: {:.2f}% & PE20: {:.2f}% '.format(*line1))

# Returns


In [310]:
# Work on the closing price only.
data_target = History.filter(['Close'])

# 2-D array of shape (n_rows, 1), the layout MinMaxScaler expects.
target = data_target.values

# The first 75% of the series is the training set; the remainder is the test set.
training_data_len = math.ceil(len(target) * 0.75)

# Scale to [0, 1]. The scaler is fitted on the training portion only, so the
# min/max of the test period cannot leak into training; the whole series is
# then transformed with those training statistics. Test-period values may
# therefore fall outside [0, 1].
sc = MinMaxScaler(feature_range=(0, 1))
sc.fit(target[:training_data_len])
training_scaled_data = sc.transform(target)

In [None]:

# Training samples: each input is a run of 30 consecutive scaled closes and
# the label is the close that immediately follows it.
train_data = training_scaled_data[:training_data_len, :]

X_train = np.array([train_data[end - 30:end, 0] for end in range(30, len(train_data))])
y_train = train_data[30:, 0].copy()

# The LSTM takes (samples, timesteps, features); there is one feature per step.
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
print('Number of rows and columns: ', X_train.shape)

In [None]:
# Stock-price forecaster: one plain LSTM followed by three bidirectional
# LSTMs (8 units each) and a single linear output unit. Only the last
# recurrent layer drops the time axis. No dropout layers are used.
model = Sequential([
    LSTM(units=8, return_sequences=True, input_shape=(X_train.shape[1], 1)),
    Bidirectional(LSTM(units=8, return_sequences=True)),
    Bidirectional(LSTM(units=8, return_sequences=True)),
    Bidirectional(LSTM(units=8, return_sequences=False)),
    Dense(units=1),
])

# Mean squared error on the scaled closes, optimised with Adam.
model.compile(optimizer='adam', loss='mean_squared_error')

# Fit on the training windows.
model.fit(X_train, y_train, epochs=100, batch_size=32)

In [None]:
# Test inputs start 30 rows before the split so that the first test target
# already has a full 30-step history.
test_data = training_scaled_data[training_data_len - 30:, :]

# Targets are the unscaled closes after the split.
y_test = target[training_data_len:, :]

# One 30-step window per test target.
X_test = np.array([test_data[end - 30:end, 0] for end in range(30, len(test_data))])

# The LSTM takes (samples, timesteps, features).
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
print('Number of rows and columns: ', X_test.shape)

In [314]:
# Out-of-sample predictions, mapped back from the scaled range to prices.
predicted_stock_price = model.predict(X_test)
predicted_stock_price = sc.inverse_transform(predicted_stock_price)

# In-sample predictions. These must be de-scaled from their own scaled output;
# the original line inverse-transformed `predicted_stock_price` again, which
# overwrote the in-sample result with doubly de-scaled test predictions.
predicted_stock_price_2 = model.predict(X_train)
predicted_stock_price_2 = sc.inverse_transform(predicted_stock_price_2)

# Actual closing prices for the test period.
valid = data_target[training_data_len:]

In [316]:
# Write the test-period slice to disk. `valid` is a slice of `data_target`,
# so the file holds only the actual 'Close' values (plus the index); the
# model's predictions are not part of it.
valid.to_csv('LSTM-STOCK.csv')