In [1]:
"""Libraries"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objs as go
from plotly.offline import iplot
import yfinance as yf
import datetime as dt
import math

# Data Processing

In [2]:
#### Dataset
#the start and end date
start_date = dt.datetime(2020,4,1)
end_date = dt.datetime(2023,4,1)

#loading from yahoo finance
data = yf.download("GOOGL",start_date, end_date)

pd.set_option('display.max_rows', 4)
pd.set_option('display.max_columns',5)
display(data)

[*********************100%%**********************]  1 of 1 completed


Unnamed: 0_level_0,Open,High,...,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2020-04-01,56.200001,56.471001,...,55.105000,51970000
2020-04-02,55.000000,56.138500,...,55.851501,56410000
...,...,...,...,...,...
2023-03-30,100.910004,101.160004,...,100.889999,33086200
2023-03-31,101.300003,103.889999,...,103.730003,36863400


In [3]:
# Setting 80 percent data for training
training_data_len = math.ceil(len(data) * .8)
training_data_len

#Splitting the dataset
train_data = data[:training_data_len].iloc[:,:1]
test_data = data[training_data_len:].iloc[:,:1]
print(train_data.shape, test_data.shape)

(605, 1) (151, 1)


In [4]:
# Selecting Open Price values
dataset_train = train_data.Open.values
# Reshaping 1D to 2D array
dataset_train = np.reshape(dataset_train, (-1,1))
dataset_train.shape


(605, 1)

In [5]:
scaler = MinMaxScaler(feature_range=(0,1))
# scaling dataset
scaled_train = scaler.fit_transform(dataset_train)


In [6]:
# Selecting Open Price values
dataset_test = test_data.Open.values
# Reshaping 1D to 2D array
dataset_test = np.reshape(dataset_test, (-1,1))
# Normalizing values between 0 and 1
scaled_test = scaler.fit_transform(dataset_test)


In [7]:
X_train = []
y_train = []
for i in range(50, len(scaled_train)):
    X_train.append(scaled_train[i-50:i, 0])
    y_train.append(scaled_train[i, 0])

In [8]:
X_test = []
y_test = []
for i in range(50, len(scaled_test)):
    X_test.append(scaled_test[i-50:i, 0])
    y_test.append(scaled_test[i, 0])

In [9]:
# The data is converted to Numpy array
X_train, y_train = np.array(X_train), np.array(y_train)

#Reshaping
X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1],1))
y_train = np.reshape(y_train, (y_train.shape[0],1))
print("X_train :",X_train.shape,"y_train :",y_train.shape)


X_train : (555, 50, 1) y_train : (555, 1)


In [10]:
# The data is converted to numpy array
X_test, y_test = np.array(X_test), np.array(y_test)

#Reshaping
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1],1))
y_test = np.reshape(y_test, (y_test.shape[0],1))
print("X_test :",X_test.shape,"y_test :",y_test.shape)


X_test : (101, 50, 1) y_test : (101, 1)


# RNN from Scratch - Many to Many adapted for time series analysis

In [11]:
class SimpleRNN:
    def __init__(self,input_dim,output_dim, hidden_dim):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.Waa = np.random.randn(hidden_dim, hidden_dim) * 0.01 # we initialise as non-zero to help with training later
        self.Wax = np.random.randn(hidden_dim, input_dim) * 0.01
        self.Way = np.random.randn(output_dim, hidden_dim) * 0.01
        self.ba = np.zeros((hidden_dim, 1))
        self.by = 0 # a single value shared over all outputs #np.zeros((hidden_dim, 1))
        

    def FeedForward(self, x):
        # let's calculate the hidden states
        a = [np.zeros((self.hidden_dim,1))]
        y = []
        for ii in range(len(x)):

            a_next = np.tanh(np.dot(self.Waa, a[ii])+np.dot(self.Wax,x[ii].reshape(-1,1))+self.ba)
            a.append(a_next)
            y_local = np.dot(self.Way,a_next)+self.by
            y.append(np.dot(self.Way,a_next)+self.by)

        # remove the first a and y values used for initialisation
        #a = a[1:]
        return y, a
    
    def ComputeLossFunction(self, y_pred, y_actual):
        # for a normal many to many model:
        #loss = np.sum((y_pred - y_actual) ** 2)
        # in our case, we are only using the last value so we expect scalar values here rather than a vector
        loss = (y_pred[-1] - y_actual) ** 2
        return loss
    
    def ComputeGradients(self, a, x, y_pred, y_actual):
        # Backpropagation through time
        dLdy = []
        dLdby = np.zeros((self.output_dim, 1))
        dLdWay = np.random.randn(self.output_dim, self.hidden_dim)/5.0
        dLdWax = np.random.randn(self.hidden_dim, self.input_dim)/5.0
        dLdWaa = np.zeros((self.hidden_dim, self.hidden_dim))
        dLda = np.zeros_like(a)
        dLdba = np.zeros((self.hidden_dim, 1))
        
        for t in range(self.hidden_dim-1, 0, -1):
            if t == self.hidden_dim-1:
                dldy = 2*(y_pred[t] - y_actual)
            else:
                dldy = 0
            dLdy.append(dldy)
            #dLdby.append(dldy)
            dLdby += dldy
            #print(dldy.shape)
            dLdWay += np.dot(np.array(dldy).reshape(-1,1), a[t].T)
            
            # Calculate gradient of loss with respect to a[t]
            if t == self.hidden_dim-1:
                dlda_t= np.dot(self.Way.T, np.array(dldy).reshape(-1,1))

            else:
                dlda_t = np.dot(self.Way.T, np.array(dldy).reshape(-1,1)) + np.dot(self.Waa, dLda[t+1]) * (1 - a[t]**2)
            dLda[t] = dlda_t
            #print(dlda_t.shape)
            
            rec_term = (1-a[t]*a[t])
             
            dLdWax += np.dot(dlda_t, x[t].reshape(-1,1))*rec_term
            dLdWaa += np.dot(dlda_t, a[t-1].T)*rec_term
            dLdba += dlda_t*rec_term
        
        return dLdy[::-1], dLdby[::-1], dLdWay, dLdWax, dLdWaa, dLdba
    
    def UpdateParameters(self,dLdby, dLdWay, dLdWax, dLdWaa, dLdba,learning_rate):
        self.Waa -= learning_rate * dLdWaa
        self.Wax -= learning_rate * dLdWax
        self.Way -= learning_rate * dLdWay
        self.ba -= learning_rate * dLdba
        self.by -= learning_rate * dLdby    
    
    def predict(self, x, n, a_training):
        # let's calculate the hidden states
        a_future = a_training
        y_predict = []

        # Predict the next n terms
        for ii in range(n):
            a_next = np.tanh(np.dot(self.Waa, a_future[-1]) + np.dot(self.Wax, x[ii]) + self.ba)
            a.append(a_next)
            y_predict.append(np.dot(self.Way, a_next) + self.by)

        return y_predict


In [12]:
input_dim = 1
output_dim = 1
hidden_dim = 50

learning_rate = 1e-3

# Initialize your RNN model
rnn_model = SimpleRNN(input_dim, output_dim, hidden_dim)

# Define your training data (x_train, y_train)
y_pred, a = rnn_model.FeedForward(X_train[0])
loss = rnn_model.ComputeLossFunction(y_pred, y_train[0])
dLdy, dLdby, dLdWay, dLdWax, dLdWaa, dLdba = rnn_model.ComputeGradients(a, X_train[0], y_pred, y_train[0])
rnn_model.UpdateParameters(dLdby, dLdWay, dLdWax, dLdWaa, dLdba, learning_rate)
print(f'Loss: {loss}')


Loss: [[0.02866827]]


In [None]:
input_dim = 1
output_dim = 1
hidden_dim = 50

learning_rate = 1e-3

# Initialize your RNN model
rnn_model = SimpleRNN(input_dim, output_dim, hidden_dim)

# Define your training data (x_train, y_train)

for epoch in range(200):
    for ii in range(len(X_train)):
        y_pred, a = rnn_model.FeedForward(X_train[ii])
        loss = rnn_model.ComputeLossFunction(y_pred, y_train[ii])
        dLdy, dLdby, dLdWay, dLdWax, dLdWaa, dLdba = rnn_model.ComputeGradients(a, X_train[ii], y_pred, y_train[ii])
        rnn_model.UpdateParameters(dLdby, dLdWay, dLdWax, dLdWaa, dLdba, learning_rate)
        print(f'Loss: {loss}')

In [16]:
y_train_predicted = []
for jj in range(len(X_train)):
    forecasted_values, _ = rnn_model.FeedForward(X_train[jj])
    y_train_predicted.append(forecasted_values[-1])

y_train_predicted_flat = np.array([val[0, 0] for val in y_train_predicted])
trace1 = go.Scatter(y = y_train.ravel(), mode ="lines", name = "original data")
trace2 = go.Scatter(y=y_train_predicted_flat, mode = "lines", name = "RNN output")
layout = go.Layout(title='Training data Fit', xaxis=dict(title='Time'), yaxis=dict(title='Dependent Variable'))
figure = go.Figure(data = [trace1,trace2], layout = layout)

iplot(figure)

In [17]:
y_test_predicted = []
for jj in range(len(X_test)):
    forecasted_values, _ = rnn_model.FeedForward(X_test[jj])
    y_test_predicted.append(forecasted_values[-1])

y_test_predicted_flat = np.array([val[0, 0] for val in y_test_predicted])
trace1 = go.Scatter(y = y_test.ravel(), mode ="lines", name = "original data")
trace2 = go.Scatter(y=y_test_predicted_flat, mode = "lines", name = "RNN output")
layout = go.Layout(title='Testing data Fit', xaxis=dict(title='X-Axis'), yaxis=dict(title='Dependent Variable'))
figure = go.Figure(data = [trace1,trace2], layout = layout)

iplot(figure)