In [None]:
import math
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
sns.set_theme(style="ticks", palette="pastel")

import tensorflow as tf
from sklearn.metrics import mean_squared_error, mean_absolute_error

import warnings
warnings.filterwarnings('ignore')

## Load the dataset

In [None]:
# Read the raw CSV and turn the 'Date' column into real timestamps in one chain.
path_default = "./data/stock_data.csv"
stock_data = pd.read_csv(path_default).assign(
    Date=lambda frame: pd.to_datetime(frame['Date'])
)
stock_data

In [None]:
# Distinct values of the 'Stock' column; each one becomes a key of `stocks` below.
companies = pd.unique(stock_data['Stock'])
print(companies)

## Split into training and test sets

In [None]:
# One (Date, Close) frame per company, keyed by the value found in 'Stock'.
stocks = {
    name: stock_data.loc[stock_data['Stock'] == name, ["Date", "Close"]]
    for name in companies
}
for frame in stocks.values():
  print(frame)

In [None]:
def _make_windows(values, window):
  """Cut `values` into consecutive windows and pair each window with the next one."""
  n_windows = len(values) // window  # an incomplete trailing window is dropped
  chunks = np.asarray(values[: n_windows * window]).reshape((-1, window, 1))
  # Copies keep inputs and targets from sharing memory with each other.
  return chunks[:-1].copy(), chunks[1:].copy()


def SplitData(data, date, window=5):
  """Split closing prices at `date` and build (input, target) window pairs.

  Rows with ``data['Date'] < date`` form the training partition, the remaining
  rows the test partition. Each partition is cut into consecutive,
  non-overlapping windows of `window` closing prices; window ``k`` is an input
  and window ``k + 1`` is its target.

  Args:
    data: DataFrame with a 'Date' column and a 'Close' column.
    date: Cut-off compared against ``data['Date']``.
    window: Number of prices per window (defaults to 5, the original behaviour).

  Returns:
    ``(X_train, Y_train, X_test, Y_test)``, each of shape ``(n, window, 1)``.
    A partition holding fewer than two complete windows yields arrays with
    ``n == 0`` instead of raising.

  Raises:
    ValueError: if `window` is smaller than 1.
  """
  if window < 1:
    raise ValueError(f"window must be >= 1, got {window}")

  close_train = data['Close'][data['Date'] < date].to_numpy()
  close_test = data['Close'][data['Date'] >= date].to_numpy()

  X_train, Y_train = _make_windows(close_train, window)
  X_test, Y_test = _make_windows(close_test, window)

  return X_train, Y_train, X_test, Y_test

## Create model

In [None]:
def Model():
  """Build an uncompiled Sequential network: two LSTM layers followed by four Dense layers.

  The first layer declares an input of shape (5, 1) and the last Dense layer
  has 5 units; every layer uses leaky ReLU as its activation.
  """
  act = tf.nn.leaky_relu

  recurrent_layers = [
      tf.keras.layers.LSTM(200, input_shape=(5, 1), activation=act, return_sequences=True),
      tf.keras.layers.LSTM(200, activation=act),
  ]
  # Dense head narrowing from 200 units down to the 5 output units.
  dense_layers = [
      tf.keras.layers.Dense(units, activation=act) for units in (200, 100, 50, 5)
  ]

  return tf.keras.models.Sequential(recurrent_layers + dense_layers)

In [None]:
# Untrained instance, used by the plot_model and summary cells that follow.
model = Model()

In [None]:
# Draw the layer graph of `model`, including the shapes at each layer.
tf.keras.utils.plot_model(model, show_shapes=True)

In [None]:
# Print the textual layer-by-layer summary of `model`.
model.summary()

## Custom Learning Rate

In [None]:
def scheduler(epoch):
  """Return the learning rate to use at `epoch`.

  Three phases:
    * epoch <= 150: linear warm-up, 0 at epoch 0 up to 10**-5 at epoch 150.
    * 150 < epoch <= 400: exponential decay from 10**-5 with rate 0.01 per epoch.
    * epoch > 400: constant 10**-6.
  """
  warmup_end = 150
  decay_end = 400
  peak_lrate = (10 ** -5)

  if epoch <= warmup_end:
    return peak_lrate * (epoch / warmup_end)

  if epoch <= decay_end:
    decay_rate = 0.01
    return peak_lrate * math.exp(-decay_rate * (epoch - warmup_end))

  return (10 ** -6)

In [None]:
# Preview the schedule over epochs 1..1000.
epochs = list(range(1, 1001))
lrate = list(map(scheduler, epochs))
plt.plot(epochs, lrate)
plt.show()

In [None]:
# Wrap `scheduler` in a Keras callback; it is passed to fit() in the training cell.
callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

## Train model

### Apple model

In [None]:
# Rows dated before `date_mark` train the model; rows on or after it are held out.
date_mark = '2017-01-01'
(AAPL_X_train, AAPL_Y_train,
 AAPL_X_test, AAPL_Y_test) = SplitData(stocks['AAPL'], date_mark)

In [None]:
# Fresh network for the Apple series (separate from the `model` inspected earlier).
AAPL_Model = Model()

In [None]:
# Optimise mean squared error and track RMSE alongside it.
# `metrics` is given as a list: Keras documents it as a list/tuple/dict, and
# newer Keras versions reject a bare metric object.
AAPL_Model.compile(
    optimizer = tf.keras.optimizers.Adam(),
    loss = 'mse',
    metrics = [tf.keras.metrics.RootMeanSquaredError()],
)

In [None]:
# Train for 1000 epochs. The held-out split is used as validation data, and
# `callback` applies the `scheduler` learning-rate schedule during training.
AAPL_hist = AAPL_Model.fit(AAPL_X_train, AAPL_Y_train, epochs = 1000, validation_data = (AAPL_X_test, AAPL_Y_test), callbacks=[callback])

In [None]:
history_dict = AAPL_hist.history

# Per-epoch training curves, followed by their validation counterparts.
loss, root_mean_squared_error = (
    history_dict["loss"],
    history_dict["root_mean_squared_error"],
)
val_loss, val_root_mean_squared_error = (
    history_dict["val_loss"],
    history_dict["val_root_mean_squared_error"],
)

# 1-based epoch numbers for the x-axis of the curves.
epochs = range(1, 1 + len(loss))

In [None]:
# Side-by-side training curves: loss (MSE) on the left, RMSE on the right.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

ax1.plot(epochs, loss, label = 'Training Loss')
ax1.plot(epochs, val_loss, label = 'Validation Loss')
ax1.set(xlabel = "Epochs", ylabel = "Loss")
ax1.legend()

ax2.plot(epochs, root_mean_squared_error, label = "Training Root Mean Squared Error")
ax2.plot(epochs, val_root_mean_squared_error, label = "Validation Root Mean Squared Error")
# This panel shows the RMSE metric, so label it as such (it was labelled "Loss").
ax2.set(xlabel = "Epochs", ylabel = "Root Mean Squared Error")
ax2.legend()

plt.show()

### Predicting the closing stock price of Apple

In [None]:
# One 5-value forecast per test input window (the model's last layer has 5 units).
AAPL_prediction = AAPL_Model.predict(AAPL_X_test)

In [None]:
# `stock_data` is the long-format frame (it has a 'Stock' column, not an 'AAPL'
# column), so the Apple rows come from the per-company `stocks` dict.
aapl = stocks['AAPL']
aapl_train = aapl[aapl['Date'] < date_mark]
aapl_test = aapl[aapl['Date'] >= date_mark]

# Prediction row k is the forecast for test window k + 1, so the flattened
# predictions start one window into the test period and cover only complete
# windows. Slice the dates accordingly so x and y have the same length.
window = AAPL_prediction.shape[1]
prediction_dates = aapl_test['Date'].iloc[window : window + AAPL_prediction.size]

plt.figure(figsize=(20, 5))
plt.plot(aapl_train['Date'], aapl_train['Close'], label = 'Training')
plt.plot(aapl_test['Date'], aapl_test['Close'], label = 'Testing')
plt.plot(prediction_dates, AAPL_prediction.reshape(-1), label = 'Predictions')

plt.xlabel('Time')
plt.ylabel('Closing Price')
plt.legend(loc = 'best')
plt.show()

In [None]:
# Targets reshaped to (n_windows, 5) so they line up with the predictions.
aapl_actual = AAPL_Y_test.reshape(-1, 5)
abs_pct_error = np.abs(AAPL_prediction - aapl_actual) / np.abs(aapl_actual)

rmse = math.sqrt(mean_squared_error(aapl_actual, AAPL_prediction))
mape = np.mean(abs_pct_error)  # expressed as a fraction, not multiplied by 100
print(f'RMSE: {rmse}')
print(f'MAPE: {mape}')