In [None]:
# 4. Project prototype (implementation)

## Install Dependencies and import libraries

# pip install pandas numpy yfinance pandas-ta scikit-learn tensorflow

# https://pypi.org/project/yfinance/ (""" it's an open-source tool that uses Yahoo's publicly available APIs, and is intended for research and educational purposes. """)
# import yfinance, our data source
import yfinance as yf

# https://pypi.org/project/pandas-ta/ ("""An easy to use Python 3 Pandas Extension with 130+ Technical Analysis Indicators. Can be called from a Pandas DataFrame or standalone""")
# import pandas-ta
import pandas_ta as ta

# import pandas and numpy
import pandas as pd 
import numpy as np

# import matplotlib for data visualisation
import matplotlib.pyplot as plt

# import from scikit-learn
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix, ConfusionMatrixDisplay

# import from tensorflow
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense, LSTM, Input, GRU
from tensorflow.keras.utils import to_categorical

# insert the stock symbols into a list
symbols_list = ['PFE', 'ROP', 'XYL', 'CPAY', 'INCY']

# we will take the weekly data for the last 10 years
# data_weekly = yf.download(symbols_list, period='10y', interval='1wk')

## Format the data and save it as a CSV file

# source of inspiration: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.stack.html[10]
# Return a reshaped DataFrame having a multi-level inde
# stacked_data_weekly = data_weekly.stack()
# stacked_data_weekly

# Save the data to a CSV so we don't have to make any extra unnecessary requests to the API every time we reload the notebook

# save the dataframe to a csv file
# stacked_data_weekly.to_csv('stacked_data_weekly_1.csv', index=True)

# load the the dataframe from the csv file
df = pd.read_csv('stacked_data_weekly_1.csv').set_index(["Date", "Ticker"])

# df.head(5)

## Perform simple exploritory data analysis

# how many null values in each column
df.isnull().sum()

# the data shape
df.shape

# data basic stats
df.describe()

## Devide the data into five dataframes, one for each stock

# source of inspiration https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.xs.html [11]
# select specific stock data at the 'Ticker' level of this multi index dataframe
df1 = df.xs('PFE', axis=0, level='Ticker', drop_level=True)
df2 = df.xs('ROP', axis=0, level='Ticker', drop_level=True)
df3 = df.xs('XYL', axis=0, level='Ticker', drop_level=True)
df4 = df.xs('CPAY', axis=0, level='Ticker', drop_level=True)
df5 = df.xs('INCY', axis=0, level='Ticker', drop_level=True)

# disply the first dataframe
df1.head(5)

# show the new df shape
df1.shape

## Create the target of the model

# copy the dateframe before modification so we don't get a warning from jupyter notebook
df1 = df1.copy()

# create the 'Next' column to be equal to the next closing price
# this can be accomplished easily by shifting the close column backward by 1
df1["Next"] = df1['Close'].shift(-1)

# create a function that returns 1 if the the next closing price is higher than current closing price and 0 otherwise.
def assign_trend(row):
    if row['Next'] > row['Close']:
        return 1
    elif row['Next'] < row['Close']:
        return 0
    else: # if the next value is missing then return NaN
        return np.nan


# create the 'Trend' column to be equal to the output of the 'assign_trend' function    
df1['Trend'] = df1.apply(assign_trend, axis=1)

# check out the results
df1.head(5)

# Check if the data is balanced

# let's check the occurance of each value in the Trend column
df1['Trend'].value_counts()

df1['Trend'].value_counts()[1]

# percentage of 'trend up' to the whole column
df1['Trend'].value_counts()[1]/df1.shape[0]

## Create a common sense baseline

# this can be accomplished easily by shifting the close column forward by 1
common_sense = df1['Trend'].shift(1)

# measure the average of when the common sense (naive) prediction will match the actual 'Trend'
(common_sense == df1['Trend']).mean()

## Include the technical indicators

#  we can easily check the available indicators in the pandas-ta library
# help(df1.ta.indicators())

#  we can also learn about any specific indicator like this
# help(ta.macd)

# for the time being let's create a function that add all the technical indicators we want to a df
def assign_TIs(_df):
    # apply macd on the Close column in a df and add it to the dataframe    
    mcda = ta.macd(_df["Close"])
    # The MACD (Moving Average Convergence/Divergence) is a popular indicator to that is used to identify a trend
    _df.insert(6, "MACD", mcda["MACD_12_26_9"])
    # Signal is an EMA (exponential moving average) of MACD
    _df.insert(7, "Signal", mcda["MACD_12_26_9"])
    # Histogram is the difference of MACD and Signal
    _df.insert(8, "Histogram", mcda["MACD_12_26_9"])
    
    # apply RSI on the Close column in a df and add it to the dataframe    
    # RSI (Relative Strength Index) is popular momentum oscillator. Measures velocity and magnitude a trend
    rsi = ta.rsi(_df["Close"])
    _df.insert(9, "RSI", rsi)
    
    # apply SMA on the Close column in a df and add it to the dataframe    
    # SMA (Simple Moving Average) is the classic moving average that is the equally weighted average over n periods.
    sma = ta.sma(_df["Close"])
    _df.insert(10, "SMA", sma)
    
    # apply EMA on the Close column in a df and add it to the dataframe    
    # EMA (Exponential Moving Average). The weights are determined by alpha which is proportional to it's length.
    ema = ta.ema(_df["Close"])
    _df.insert(11, "EMA", ema)
    
    return _df

# apply the function to the dataframe
df1 = assign_TIs(df1)

# drop the NaN values
df1.dropna(inplace=True)

# fix the 'Trend' data type to be int
df1 = df1.astype({'Trend': int})

# check the dataframe
df1.head(5)

# the shape of the data now
df1.shape

##  Prepare the data for training

# reset the index
df1.reset_index(inplace = True)

# drop the Date column as it's not necessary for now
df1.drop(['Date'], axis=1, inplace=True)

# df1.head(5)

Create the features list, for now we will use every column except the last two.

# The features list
X1 = df1.iloc[:, :-2]

X1.head(2)

# Create the target, which is the 'Trend' column for now.

# The Target (Trend for now)
y1 = df1.iloc[:, -1]

# initialize a MinMaxScaler instance for a range between 0 and 1
scaler = MinMaxScaler(feature_range=(0, 1))

# pass the features to the scaler
scaled_X1 = scaler.fit_transform(X1)

# scaled_X1

# source of isnpiration: https://stackoverflow.com/questions/47945512/how-to-reshape-input-for-keras-lstm?rq=4 [13]
# create a function to reshape X and y into sequences of x timesteps
def create_seqs(features, target, num_rows):
    # create 2 empty lists to store the newly shaped features and target lists
    X, y = [], []
    
    # iterate over the features
    for i in range(len(features) - num_rows):
        # create indexes of the start and end of each sequence
        seq_s = i
        seq_e = i + num_rows
        
        # the ith sequence will be a slice of the features between the indexes, create it and add it to X
        xi = features[seq_s : seq_e]
        X.append(xi)
        
        # do the same for the target and add it to y
        yi = target[seq_e]
        y.append(yi)
    
    # return the X and y as numpy arraies
    return np.array(X), np.array(y)

# Create sequences
timesteps = 6
X_seq1, y_seq1 = create_seqs(scaled_X1, y1, timesteps)

# check the new shapes for the features and labels sets
X_seq1.shape, y_seq1.shape

# source: https://www.tensorflow.org/api_docs/python/tf/keras/utils/to_categorical [14]
# use to_categorical from tf to converts the target (Trend) to binary class matrix
y_seq1 = to_categorical(y_seq1)

# Devide the data into a training set and a test set in 70-30 ratio

#  sets the training test ratio to be 70-30
training_ratio = int(len(X_seq1) * 0.7)

# # split the data into training and test
X1_train, X1_test = X_seq1[:training_ratio], X_seq1[training_ratio:]
y1_train, y1_test = y_seq1[:training_ratio], y_seq1[training_ratio:]

X1_train.shape, X1_test.shape

## Create and train the baseline classification model

# source of inspiration: François Chollet (11, 2017), “Deep Learning with Python” chapter 6 [8]
# construct the model
def create_model():
    # initialize a sequential model
    model = Sequential()
    
    # add the model layers
    model.add(SimpleRNN(64, input_shape=(timesteps, X1_train.shape[2]), return_sequences=True))
    model.add(SimpleRNN(64, return_sequences=False))
    model.add(Dense(2, activation='softmax'))

    # compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    
    return model

# initialize the model
model1 = create_model()

# get the model weights before training
# model1.get_weights()

# train the model
history = model1.fit(X1_train, y1_train, validation_split=0.2, epochs=50, batch_size=32, verbose=0)

## Model evaluation and prototype conclusion

# test the model accuracy
model1.evaluate(X1_test, y1_test, verbose=2)

# get predictions from the model given the test set
y1_pred = model1.predict(X1_test)

# source of inspiration: https://stackoverflow.com/questions/48987959/classification-metrics-cant-handle-a-mix-of-continuous-multioutput-and-multi-la [15]
# convert the predictions and test set to be in the shape of a vector of labels
y1_pred_labels = np.argmax(y1_pred, axis=1)
y1_test_labels = np.argmax(y1_test, axis=1)

# get precision, recall, and fscore
precision, recall, fscore, support = precision_recall_fscore_support(y1_test_labels, y1_pred_labels, average='weighted')
print("Precision:", precision)
print("Recall:", recall)
print("F-score:", fscore)

# source of inspiration: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.ConfusionMatrixDisplay.html [16]
conf_mat = confusion_matrix(y1_test_labels, y1_pred_labels)
disp = ConfusionMatrixDisplay(conf_mat)
disp.plot()
plt.show()

# source of the code snippet[17]
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(1, len(loss) + 1)

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

# get the model summary
# model1.summary()

# get the model compile configurations
# model1.get_compile_config()

# get the model configurations after training
# model1.get_config()

# get the model weights after training
# model1.get_weights()

# Get a prediction from the model given the last 6 weeks. This is to simulate how a user would get a prediction from the model. The input will be the last entry in the test set.

# reshape the input so it have the shape (1, 6, 12) which what the model expect as input
_input = X1_test[-1].reshape(1, X1_test.shape[1], X1_test.shape[2])
pred = model1.predict(_input)
print(f"Trend: {np.argmax(pred)}, ",f"Confidence: {np.max(pred)}")

### LSTM

# construct LSTM the model
def create_LSTM_model():
    # initialize a sequential model
    model = Sequential()
    
    # add the model layers
    # input layer
    model.add(Input(shape=(timesteps, X1_train.shape[2])))
    
    # first dense layer
    model.add(LSTM(64, return_sequences=True))
    
    # second dense layer
    model.add(LSTM(64, return_sequences=False))
    
    # output layer
    model.add(Dense(2, activation='softmax'))

    # compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    
    return model

# initialize the model
model1 = create_LSTM_model()

# train the model
history = model1.fit(X1_train, y1_train, validation_split=0.2, epochs=50, batch_size=32, verbose=0)

# test the model accuracy
model1.evaluate(X1_test, y1_test, verbose=2)

# get predictions from the model given the test set
y1_pred = model1.predict(X1_test)

# source of inspiration: https://stackoverflow.com/questions/48987959/classification-metrics-cant-handle-a-mix-of-continuous-multioutput-and-multi-la [15]
# convert the predictions and test set to be in the shape of a vector of labels
y1_pred_labels = np.argmax(y1_pred, axis=1)
y1_test_labels = np.argmax(y1_test, axis=1)

# get precision, recall, and fscore
precision, recall, fscore, support = precision_recall_fscore_support(y1_test_labels, y1_pred_labels, average='weighted')
print("Precision:", precision)
print("Recall:", recall)
print("F-score:", fscore)

# source of inspiration: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.ConfusionMatrixDisplay.html [16]
conf_mat = confusion_matrix(y1_test_labels, y1_pred_labels)
disp = ConfusionMatrixDisplay(conf_mat)
disp.plot()
plt.show()

# source of the code snippet[17]
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(1, len(loss) + 1)

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

### GRU

# source of inspiration: François Chollet (11, 2017), “Deep Learning with Python” chapter 6 [8]
# construct the model
def create_GRU_model():
    # initialize a sequential model
    model = Sequential()
    
    # add the model layers
    # input layer
    model.add(Input(shape=(timesteps, X1_train.shape[2])))
    
    # first dense layer
    model.add(GRU(64, 
                  dropout=0.1, 
                  recurrent_dropout=0.5, 
                  return_sequences=True))
    
    # second dense layer
    model.add(GRU(64,
                  dropout=0.1, 
                  recurrent_dropout=0.5))
    
    # output layer
    model.add(Dense(2, activation='softmax'))

    # compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    
    return model

# initialize the model
model1 = create_GRU_model()

# train the model
history = model1.fit(X1_train, y1_train, validation_split=0.2, epochs=50, batch_size=32, verbose=0)

# test the model accuracy
model1.evaluate(X1_test, y1_test, verbose=2)

# get predictions from the model given the test set
y1_pred = model1.predict(X1_test)

# source of inspiration: https://stackoverflow.com/questions/48987959/classification-metrics-cant-handle-a-mix-of-continuous-multioutput-and-multi-la [15]
# convert the predictions and test set to be in the shape of a vector of labels
y1_pred_labels = np.argmax(y1_pred, axis=1)
y1_test_labels = np.argmax(y1_test, axis=1)

# get precision, recall, and fscore
precision, recall, fscore, support = precision_recall_fscore_support(y1_test_labels, y1_pred_labels, average='weighted')
print("Precision:", precision)
print("Recall:", recall)
print("F-score:", fscore)

# source of inspiration: https://scikit-learn.org/stable/modules/generated/sklearn.metrics.ConfusionMatrixDisplay.html [16]
conf_mat = confusion_matrix(y1_test_labels, y1_pred_labels)
disp = ConfusionMatrixDisplay(conf_mat)
disp.plot()
plt.show()

# source of the code snippet[17]
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(1, len(loss) + 1)

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

# Now le'ts create a regression version of all the 3 models we have so far.
# First let's adjust the target column of the model, for refression the column we are trying to predict is the Next colum.

# The Target (Next for now)
y1_reg = df1.iloc[:, -2]

# Create sequences
timesteps = 6
X_seq1_reg, y_seq1_reg = create_seqs(scaled_X1, y1_reg, timesteps)

# split the data into training and test 70-30 ratio
X1_train_reg, X1_test_reg = X_seq1_reg[:training_ratio], X_seq1_reg[training_ratio:]
y1_train_reg, y1_test_reg = y_seq1_reg[:training_ratio], y_seq1_reg[training_ratio:]

### SimpleRNN regression model

# construct the model
def create_SimpleRNN_Reg_model():
    # initialize a sequential model
    model = Sequential()
    
    # add the model layers
    model.add(Input(shape=(timesteps, X1_train.shape[2])))
    model.add(SimpleRNN(64, return_sequences=True))
    model.add(SimpleRNN(64, return_sequences=False))
    model.add(Dense(1))

    # compile the model
    model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
    
    return model

from sklearn.metrics import r2_score

# initialize the model
model1 = create_SimpleRNN_Reg_model()

# train the model
history = model1.fit(X1_train_reg, y1_train_reg, validation_split=0.2, epochs=200, batch_size=32, verbose=0)

# test the model accuracy
model1.evaluate(X1_test_reg, y1_test_reg, verbose=2)

# get predictions from the model given the test set
y1_pred = model1.predict(X1_test_reg)

# get the R2 of the model
r2 = r2_score(y1_test_reg, y1_pred)
print(f"R2 score is: {r2}")


# source of the code snippet[17]
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.figure()
plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()
plt.show()

### LSTM regression model

# construct the model
def create_LSTM_Reg_model():
    # initialize a sequential model
    model = Sequential()
    
    # add the model layers
    model.add(Input(shape=(timesteps, X1_train.shape[2])))
    model.add(LSTM(64, return_sequences=True))    
    model.add(LSTM(64, return_sequences=False))
    model.add(Dense(1))

    # compile the model
    model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
    
    return model

# initialize the model
model1 = create_LSTM_Reg_model()

# train the model
history = model1.fit(X1_train_reg, y1_train_reg, validation_split=0.2, epochs=300, batch_size=32, verbose=0)

# test the model accuracy
model1.evaluate(X1_test_reg, y1_test_reg, verbose=2)

# get predictions from the model given the test set
y1_pred = model1.predict(X1_test_reg)

# get the R2 of the model
r2 = r2_score(y1_test_reg, y1_pred)
print(f"R2 score is: {r2}")

# source of the code snippet[17]
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(1, len(loss) + 1)

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

### GRU regression model

# construct the model
def create_GRU_Reg_model():
    # initialize a sequential model
    model = Sequential()
    
    # add the model layers
    model.add(Input(shape=(timesteps, X1_train.shape[2])))
    model.add(GRU(64, dropout=0.1, recurrent_dropout=0.5, return_sequences=True))
    model.add(GRU(64, dropout=0.1, recurrent_dropout=0.5))
    model.add(Dense(1))

    # compile the model
    model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
    
    return model

# initialize the model
model1 = create_GRU_Reg_model()

# train the model
history = model1.fit(X1_train_reg, y1_train_reg, validation_split=0.2, epochs=200, batch_size=32, verbose=0)

# test the model accuracy
model1.evaluate(X1_test_reg, y1_test_reg, verbose=2)

# get predictions from the model given the test set
y1_pred = model1.predict(X1_test_reg)

# get the R2 of the model
r2 = r2_score(y1_test_reg, y1_pred)
print(f"R2 score is: {r2}")

# source of the code snippet[17]
loss = history.history['loss']
val_loss = history.history['val_loss']

epochs = range(1, len(loss) + 1)

plt.figure()

plt.plot(epochs, loss, 'bo', label='Training loss')
plt.plot(epochs, val_loss, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

# the average value for the Next closing price to judge weather the mae is acceptable or not 
df1['Next'].mean()

## Hyperparameters optimization

# !pip install scikeras

# helper function to measure how long a process would take
from datetime import datetime

def get_time():
    return datetime.now()

from tensorflow.keras.optimizers import Adam
from scikeras.wrappers import KerasClassifier, KerasRegressor
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
import warnings


# get the time before starting the process
start = get_time()

# source of inspiration: https://stackoverflow.com/questions/72392579/scikeras-randomizedsearchcv-for-best-hyper-parameters [19]
# construct the model
def create_model(n_hidden = 1, n_neurons = 30, learning_rate=3e-3):
    # initialize a sequential model
    model = Sequential()
    
    # create an input layer
    model.add(Input(shape=(timesteps, X1_train.shape[2])))
    
    # add a static first deep layer
    model.add(SimpleRNN(n_neurons, return_sequences=True))

    # add the other model deep layers dynamically
    for layer in range(1, n_hidden):
        model.add(SimpleRNN(n_neurons))
    
    # output layer
    model.add(Dense(2, activation='softmax'))
    
    # create an Adam optimizer with a variable learning rate
    opt = Adam(learning_rate=learning_rate)
    
    # compile the model
    model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])
    
    return model


# define possible parameters (this is just a test for now and we will add more parameters for the final report)
# we need to distingiush between the model building function input and the RandomizedSearchCV input, to do that we prefix the model input with model__
# source: https://adriangb.com/scikeras/stable/notebooks/Basic_Usage.html (7.1 Special prefixes) [20]
params = {
    "model__n_hidden": [0, 1, 2, 3, 4, 5],
    "model__n_neurons": [int(x) for x in np.arange(1, 128)],
    "model__learning_rate": [1e-2, 1e-3, 1e-4], 
    'epochs': [10, 20, 30],
    'batch_size': [16, 32, 64]
}

# create a keras classification model wrappers from the scikeras library which allow us to utilize the hyperparameter tunning functions from the scikit-learn library
kerasWarp = KerasClassifier(model=create_model, verbose=0)

# RandomizedSearchCV
random_simpleRnn_clas = RandomizedSearchCV(estimator=kerasWarp, param_distributions=params, n_iter=10, cv=None, random_state=101)

# source of inspiration: https://stackoverflow.com/questions/40105796/turn-warning-off-in-a-cell-jupyter-notebook [21]
# prevent FitFailedWarning
with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    
    # fit the RandomizedSearchCV models
    random_simpleRnn_clas.fit(X1_train, y1_train, validation_split=0.2)

# print results
print(f"RandomizedSearchCV best parameters: {random_simpleRnn_clas.best_params_}")
print(f"RandomizedSearchCV best score: {random_simpleRnn_clas.best_score_}")


# get the time after finishing the process
end = get_time()

# print the duration
print(f"process finished in: {end - start}")

# retrieve the best model and refit on the training data to get the history
best_model = random_simpleRnn_clas.best_estimator_.model_

# evaluate the best model on the test data
best_model.evaluate(X1_test, y1_test, verbose=1)

# get predictions from the model given the test set
y1_pred = best_model.predict(X1_test)

# convert the predictions and test set to be in the shape of a vector of labels
y1_pred_labels = np.argmax(y1_pred, axis=1)
y1_test_labels = np.argmax(y1_test, axis=1)

# get precision, recall, and fscore
precision, recall, fscore, support = precision_recall_fscore_support(y1_test_labels, y1_pred_labels, average='weighted')
print("Precision:", precision)
print("Recall:", recall)
print("F-score:", fscore)

# Confusion Matrix
conf_mat = confusion_matrix(y1_test_labels, y1_pred_labels)
disp = ConfusionMatrixDisplay(conf_mat)
disp.plot()
plt.show()

# get a summary of the best model
best_model.summary()