In [1]:
import math
import warnings
from typing import Tuple, Union

import keras_tuner as kt
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
import tensorflow as tf
from keras.models import load_model
from keras_tuner.engine.hyperparameters import HyperParameters
from keras_tuner.tuners import RandomSearch
from plotly.subplots import make_subplots
from sklearn.metrics import mean_absolute_percentage_error, mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import MinMaxScaler
from tensorflow import keras
from tensorflow.keras.layers import Dense, LSTM, Dropout
from tensorflow.keras.models import save_model, model_from_json, Sequential
from tqdm import tqdm

warnings.simplefilter("ignore", UserWarning)
pio.renderers.default='notebook'


In [2]:
# Read the cleaned dataset and keep the 'close' column as an (n, 1) column
# vector, the 2-D layout the scaler and the windowing helper work on.
df = pd.read_csv('../../data/cleaned_data.csv')
data = df['close'].values.reshape(-1, 1)

In [3]:
def min_max_scale(train: np.ndarray, val: np.ndarray, test: np.ndarray) -> Tuple[MinMaxScaler, np.ndarray, np.ndarray, np.ndarray]:
    """Min-max scale the three splits using statistics of the training split only.

    The scaler is fitted on ``train`` alone and then applied unchanged to
    ``val`` and ``test``, so no information from the held-out splits enters
    the scaling parameters.

    Args:
        train: Training split; the scaler is fitted on this array.
        val: Validation split, transformed with the fitted scaler.
        test: Test split, transformed with the fitted scaler.

    Returns:
        A 4-tuple ``(scaler, train_normalized, val_normalized, test_normalized)``.
        The fitted scaler is returned so callers can invert the transform later.
    """
    scaler = MinMaxScaler().fit(train)
    train_normalized = scaler.transform(train)
    val_normalized = scaler.transform(val)
    test_normalized = scaler.transform(test)
    return scaler, train_normalized, val_normalized, test_normalized

def data_divider(data: np.ndarray, threshold: float) -> Tuple[np.ndarray, np.ndarray]:
    """Split ``data`` in order into a leading part and a trailing part.

    No shuffling is performed: the first ``int(len(data) * threshold)``
    elements form the first part and everything after them forms the second.

    Args:
        data: Sliceable sequence/array to split.
        threshold: Fraction of ``data`` that goes into the first part
            (e.g. ``0.8`` for an 80/20 split).

    Returns:
        A 2-tuple ``(first_part, second_part)``.
    """
    split_index = int(len(data) * threshold)
    return data[:split_index], data[split_index:]

def sliding_window(data: np.ndarray, window_length: int, pred_len: int = 1) -> Tuple[np.ndarray, np.ndarray]:
    """Build (input window, target) pairs by sliding a window over ``data``.

    For every start position ``i`` the input is ``data[i : i + window_length]``
    and the target is the ``pred_len`` elements that immediately follow it.
    Only positions whose target lies completely inside ``data`` are kept.

    Example:
        data = [[1], [2], [3], [4], [5], [6]], window_length = 2, pred_len = 1
        X = [[[1], [2]], [[2], [3]], [[3], [4]], [[4], [5]]]   # shape (4, 2, 1)
        Y = [[[3]], [[4]], [[5]], [[6]]]                        # shape (4, 1, 1)

    Args:
        data: Sliceable sequence/array of observations.
        window_length: Number of consecutive observations in each input window.
        pred_len: Number of observations following the window used as target.

    Returns:
        A 2-tuple ``(X, Y)`` of arrays built from the collected slices. When
        ``data`` is too short to produce a single pair both arrays are empty.
    """
    n_samples = len(data)
    # First bound: the input window must end before the last element.
    # Second bound: the target slice must end inside the data.
    # A non-positive count simply yields an empty range.
    n_windows = min(n_samples - window_length,
                    n_samples - window_length - pred_len + 1)

    X = [data[start: start + window_length] for start in range(n_windows)]
    Y = [data[start + window_length: start + window_length + pred_len]
         for start in range(n_windows)]

    return np.array(X), np.array(Y)

def model_builder(hp):
    """Build a compiled stacked-LSTM model for the Keras Tuner search.

    Search space registered on ``hp``:
        * ``dropout``: one of 0.05, 0.1, 0.2
        * ``first_layer_neurons``: 32..512 in steps of 32
        * ``n_layers``: 1..4 additional sequence-returning LSTM layers,
          each with its own ``lstm_{i}_units`` (32..512 in steps of 32)
        * ``second_layer_neurons``: 32..512 in steps of 32 (final LSTM layer)
        * ``learning_rate``: one of 1e-2, 1e-3, 1e-4

    Args:
        hp: Keras Tuner ``HyperParameters`` object used to sample values.

    Returns:
        A compiled ``Sequential`` model with a single-unit ``Dense`` output,
        MSE loss and an ``mse`` metric (reported as ``val_mse`` on validation data).
    """
    hp_dropout = hp.Choice('dropout', values=[0.05, 0.1, 0.2])

    model = Sequential()
    model.add(LSTM(hp.Int('first_layer_neurons', min_value=32, max_value=512, step=32),
                   return_sequences=True))
    # Variable number of intermediate LSTM layers; each keeps the time axis
    # so the next recurrent layer still receives a sequence.
    for i in range(hp.Int('n_layers', 1, 4)):
        model.add(LSTM(hp.Int(f'lstm_{i}_units', min_value=32, max_value=512, step=32),
                       return_sequences=True))
    model.add(Dropout(hp_dropout))
    # Final LSTM layer returns only its last output, which feeds the Dense head.
    model.add(LSTM(hp.Int('second_layer_neurons', min_value=32, max_value=512, step=32)))
    model.add(Dense(1))

    hp_learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

    # `metrics` is passed as a list: a bare string is not a documented form
    # and is rejected by newer Keras releases.
    model.compile(optimizer=keras.optimizers.Adam(learning_rate=hp_learning_rate),
                  loss='mse',
                  metrics=['mse'])

    return model

def show_best_hyperparamters(best_hps):
    """Print the main hyperparameters selected by the tuner.

    Args:
        best_hps: Object exposing ``get(name)`` for the hyperparameter names
            registered in ``model_builder`` (e.g. a Keras Tuner
            ``HyperParameters`` instance).
    """
    print('Best Hyper Parameters\n')
    print('Layer 1 neuron: ', best_hps.get('first_layer_neurons'))
    print('Layer 2 neuron: ', best_hps.get('second_layer_neurons'))
    # The hyperparameter is registered as 'n_layers' (plural) in model_builder;
    # looking up 'n_layer' fails because no such hyperparameter exists.
    print('n_layers: ', best_hps.get('n_layers'))
    print('learning_rate: ', best_hps.get('learning_rate'))
    print('Dropout rate: ', best_hps.get('dropout'))
    
def calculate_metrics(test: np.ndarray, predict: np.ndarray) -> None:
    """Print regression metrics comparing predictions with ground truth.

    Both arrays are flattened before scoring, so any shapes with the same
    number of elements are accepted.

    Args:
        test: Ground-truth values.
        predict: Predicted values.

    Returns:
        None. MSE, MAE, RMSE, MAPE and R2 are printed on a single line.
    """
    y_true = test.flatten()
    y_pred = predict.flatten()

    MSE = mean_squared_error(y_true, y_pred)
    # RMSE is derived from MSE rather than via `squared=False`, which is
    # deprecated/removed in recent scikit-learn releases.
    RMSE = math.sqrt(MSE)
    MAE = mean_absolute_error(y_true, y_pred)
    MAPE = mean_absolute_percentage_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    print('mse: {}, mae: {}, rmse: {}, mape: {}, R2: {}'.format(MSE, MAE, RMSE, MAPE, r2))
    

In [37]:
# Window configuration: each sample uses `lookback` past values to predict
# the next `pred_len` value(s).
lookback = 24
pred_len = 1

# In-order split: the first 80% is used for training and the remaining 20%
# is halved into validation and test.
train, holdout = data_divider(data, 0.8)
val, test = data_divider(holdout, 0.5)

# Scaling parameters come from the training split only.
scaler, train_normalized, val_normalized, test_normalized = min_max_scale(train, val, test)

# Turn each normalised split into (window, target) pairs.
x_train, y_train = sliding_window(train_normalized, lookback, pred_len)
x_val, y_val = sliding_window(val_normalized, lookback, pred_len)
x_test, y_test = sliding_window(test_normalized, lookback, pred_len)

In [38]:
# tuner = kt.RandomSearch(model_builder, objective='val_mse', max_trials=30, directory='my_dir', project_name='intro_to_kt2')
# stop_early = keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)
# tuner.search(x_train, y_train, epochs=100, validation_data=(x_val,y_val), callbacks=[stop_early])

In [39]:
# best_hps = tuner.get_best_hyperparameters()[0]
# show_best_hyperparamters(best_hps)

In [40]:
def model(units: int = 100, dropout_rate: float = 0.2, learning_rate: float = 0.00001):
    """Build and compile the two-layer LSTM forecaster.

    Architecture: LSTM(units, relu, returns sequences) -> Dropout ->
    LSTM(units, relu) -> Dense(pred_len). The output width is read from the
    notebook-level ``pred_len`` variable, which must be defined beforehand.

    Called without arguments, this builds the same network as the previously
    hard-coded version (100 units, 0.2 dropout, Adam with lr = 1e-5).

    NOTE: weights are randomly initialised. For reproducible runs, seed NumPy
    and TensorFlow (``np.random.seed(...)``, ``tf.random.set_seed(...)``)
    before calling this function.

    Args:
        units: Number of units in each LSTM layer.
        dropout_rate: Dropout rate applied between the two LSTM layers.
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        A compiled ``keras.Sequential`` model trained with MSE loss.
    """
    # Local name deliberately differs from the function name to avoid
    # shadowing the builder inside its own body.
    network = keras.Sequential()
    network.add(LSTM(units, activation='relu', return_sequences=True))
    network.add(Dropout(dropout_rate))
    network.add(LSTM(units, activation='relu'))
    network.add(Dense(pred_len))

    network.compile(optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
                    loss='mse')

    return network

In [41]:
# Build the model with the best hp.
# model = model_builder(best_hps)

# NOTE: this rebinds the name `model` from the builder function to the built
# network, so this cell cannot be re-run without first re-running the cell
# that defines `model()`.
model = model()

# Stop training once the validation loss has not improved for 2 epochs.
stop_training_early = keras.callbacks.EarlyStopping(monitor="val_loss", patience=2)

# shuffle=False keeps the training windows in their original order.
history = model.fit(
    x_train,
    y_train,
    epochs=3,
    verbose=1,
    shuffle=False,
    validation_data=(x_val, y_val),
    callbacks=[stop_training_early],
)

Epoch 1/3
Epoch 2/3
Epoch 3/3


In [42]:
# Print the layer-by-layer architecture and parameter counts of the fitted network.
model.summary()

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_2 (LSTM)                (None, 26, 100)           40800     
_________________________________________________________________
dropout_1 (Dropout)          (None, 26, 100)           0         
_________________________________________________________________
lstm_3 (LSTM)                (None, 100)               80400     
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 101       
Total params: 121,301
Trainable params: 121,301
Non-trainable params: 0
_________________________________________________________________


In [43]:
# Predict on the test windows; the outputs are still in the scaler's normalised space.
y_predict = model.predict(x_test, verbose = 0)

In [69]:
# Flatten the test targets to one column and map them back to the original
# (un-normalised) scale with the scaler fitted on the training split.
y_test_inverse = scaler.inverse_transform(y_test.reshape(-1, 1))

In [66]:
# Map the normalised predictions back to the original scale using the scaler
# fitted on the training split, then display the resulting array.
y_predict_inverse = scaler.inverse_transform(y_predict)
y_predict_inverse

array([[65684.586],
       [65640.35 ],
       [65599.33 ],
       ...,
       [37873.367],
       [37869.7  ],
       [37864.89 ]], dtype=float32)

In [119]:
# plt.plot(y_test_inverse.flatten()[-15:], label='test')
# plt.plot(y_predict_inverse.flatten()[-15:], 'r-', label='predict')
# plt.legend()
# np.concatenate(list(x_test[-2:][0,].flatten()),list(y_test.flatten()[-2:]))
# Second-to-last input window (normalised), followed by either the last two
# actual targets or the last two predictions, for a close-up comparison.
last_context_window = x_test[-2:][0].flatten()
y_test_test = np.array([*last_context_window, *y_test.flatten()[-2:]])
y_test_predict = np.array([*last_context_window, *y_predict.flatten()[-2:]])

In [125]:
# fig = go.Figure()
# fig.add_trace(go.Scatter(y = y_test.flatten()[-2:], name = 'Actual'))
# fig.add_trace(go.Scatter(y = y_predict.flatten()[-2:], name = 'Predict'))
# fig.show()

# fig = go.Figure()
# fig.add_trace(go.Scatter(y = y_test_test, name = 'Actual'))
# fig.add_trace(go.Scatter(y = y_test_predict, name = 'Predict'))
# fig.show()

# Overlay the actual and predicted close prices (original scale) for the test split.
fig = go.Figure()
fig.add_trace(go.Scatter(y=y_test_inverse.flatten(), name='Actual'))
fig.add_trace(go.Scatter(y=y_predict_inverse.flatten(), name='Predict'))
# Title and axis labels so the figure can be read on its own.
fig.update_layout(
    title='Actual vs. predicted close price on the test split',
    xaxis_title='Test sample index',
    yaxis_title='Close price',
)
fig.show()

In [87]:
# Print MSE, MAE, RMSE, MAPE and R2 for the de-normalised test targets vs. predictions.
calculate_metrics(y_test_inverse, y_predict_inverse)

mse: 1566577.3910016504, mae: 1009.140633037238, rmse: 1251.6298937791676, mape: 0.022732785569907807, R2: 0.96225973091944
