In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

Hyperparameters

In [None]:
hidden_states = 10
period_lenth = 5
learning_rate = 0.0001
epochs = 100

Data Loading

In [None]:
df = pd.read_csv('GOOGL.csv')

df['Date'] = pd.to_datetime(df['Date'])

# Separate training and testing data based on year
training_df = df[df['Date'].dt.year <= 2020]
testing_df = df[df['Date'].dt.year > 2020]

In [None]:
training_df.info()

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(np.arange(len(df['Date'])), df['Open'], label='Open', color='blue')
plt.plot(np.arange(len(df['Date'])), df['Close'], label='Close', color='green')
plt.plot(np.arange(len(df['Date'])), df['High'], label='High', color='red')
plt.plot(np.arange(len(df['Date'])), df['Low'], label='Low', color='orange')

plt.title('Stock Market Data')
plt.xlabel('Days')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.show()

Data Preprocessing

In [None]:
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler(feature_range = (0,1))

features = ['Open', 'High', 'Low', 'Volume']
traindataset = training_df[features].values
normalized_traindataset = scaler.fit_transform(traindataset)
testdataset = testing_df[features].values
#print(testdataset.shape)

Preparing training and testing dataset

In [None]:
Xtrain_scaled = []
Xtest = []
Ytrain_scaled = []
Ytest = []
Xtrain = []
Ytrain = []
for i in range(len(normalized_traindataset)-period_lenth):
    trainSample = []
    trainOutput = []
    for j in range(i, i+period_lenth):
        trainSample.append(normalized_traindataset[j])
        trainOutput.append(normalized_traindataset[j+1])
    Xtrain_scaled.append(trainSample)
    Ytrain_scaled.append(trainOutput)

for i in range(len(testdataset)-period_lenth):
    testSample = []
    testOutput = []
    for j in range(i, i+period_lenth):
        testSample.append(testdataset[j])
        testOutput.append(testdataset[j+1])
    Xtest.append(testSample)
    Ytest.append(testOutput)

for i in range(len(traindataset)-period_lenth):
    trainSample = []
    trainOutput = []
    for j in range(i, i+period_lenth):
        trainSample.append(traindataset[j])
        trainOutput.append(traindataset[j+1])
    Xtrain.append(trainSample)
    Ytrain.append(trainOutput)


Model

In [99]:
def initializeWeights(h, iSize, oSize):
    Whh = np.random.randn(h, h)
    Whx = np.random.randn(h, iSize)
    Wyh = np.random.randn(oSize, h)
    b_h = np.random.randn(h, 1)
    b_y = np.random.rand(oSize, 1)
    return Whh, Whx, Wyh, b_h, b_y

def sigmoid(x):
    return 1/(1 + np.exp(-x))

def evaluateCell(x_t, h_t_1, Whh, Whx, Wyh, b_h, b_y):
    h_t = np.tanh(np.dot(Whh, h_t_1) + np.dot(Whx, x_t) + b_h)
    y_t = sigmoid(np.dot(Wyh, h_t) + b_y)
    return h_t, y_t

def forward(x, Whh, Whx, Wyh, b_h, b_y):
    H, y = [], []   #to store hidden states and output at each time step
    h_t = np.zeros((hidden_states, 1))
    for t in range(len(x)):
        x_t = x[t].reshape(-1, 1)
        h_t, y_t = evaluateCell(x_t, h_t, Whh, Whx, Wyh, b_h, b_y)
        H.append(h_t)
        y.append(y_t.reshape(4, 1))

    return H, y

def mse(predicted, actual):
    loss = 0
    n = len(predicted)
    for i in range(len(predicted)):
        loss += np.mean((np.transpose(actual[i]).reshape(-1, 1)-predicted[i])**2)
    return loss/n

def mse_grad(predicted, actual):
    grads = []
    for i in range(len(predicted)):
        grads.append(np.transpose(actual[i]).reshape(-1, 1)-predicted[i])
    return grads

def backprop(H, y_pred, y_act, x, learning_rate, Whh, Whx, Wyh, b_h, b_y):
    dWhh, dWhx, dWyh, db_h, db_y = np.zeros_like(Whh), np.zeros_like(Whx), np.zeros_like(Wyh), np.zeros_like(b_h), np.zeros_like(b_y)
    grads = mse_grad(y_pred, y_act)
    temp = np.multiply(y_pred[-1], (1-y_pred[-1]))
    dWyh = 2*np.dot(np.multiply(grads[-1], temp), np.transpose(H[-1]))
    db_y = 2*np.multiply(grads[-1], temp)

    initProd = 2*np.multiply(np.dot(np.transpose(Wyh), np.multiply(grads[-1], temp)), 1-H[-1]**2)
    initSum = np.transpose(x[-1].reshape(4, 1))
    dWhx += np.dot(initProd, initSum)
    for t in range(len(H)-1):
        initProd = np.multiply(np.dot(np.transpose(Whh), initProd), 1-H[-2-t]**2)
        initSum = np.transpose(x[-2-t].reshape(4, 1))
        dWhx += np.dot(initProd, initSum)

    initProd = 2*np.multiply(np.dot(np.transpose(Wyh), np.multiply(grads[-1], temp)), 1-H[-1]**2)
    initSum = np.transpose(H[-1])
    dWhh += np.dot(initProd, initSum)
    for t in range(len(H)-1):
        initProd = np.multiply(np.dot(np.transpose(Whh), initProd), 1-H[-2-t]**2)
        initSum = np.transpose(H[-2-t])
        dWhh += np.dot(initProd, initSum) 

    initProd = 2*np.multiply(np.dot(np.transpose(Wyh), np.multiply(grads[-1], temp)), 1-H[-1]**2)
    db_h += initProd
    for t in range(len(H)-1):
        initProd = np.multiply(np.dot(np.transpose(Whh), initProd), 1-H[-2-t]**2)
        db_h += initProd 
        
    
    Whh -= learning_rate*dWhh
    Whx -= learning_rate*dWhx
    Wyh -= learning_rate*dWyh
    b_h -= learning_rate*db_h
    b_y -= learning_rate*db_y
    return Whh, Whx, Wyh, b_h, b_y

def train(Xtrain, ytrain, epochs, learning_rate, iSize, oSize, h):
    Whh, Whx, Wyh, b_h, b_y = initializeWeights(h, iSize, oSize)
    for epoch in tqdm(range(epochs)):
        loss = 0
        for i in range(len(Xtrain)):
            sample = Xtrain[i]
            y_actual = ytrain[i]
            H, y_pred = forward(sample, Whh, Whx, Wyh, b_h, b_y)
            loss += mse(y_pred, y_actual)

            Whh, Whx, Wyh, b_h, b_y = backprop(H, y_pred, y_actual, sample, learning_rate, Whh, Whx, Wyh, b_h, b_y)
        loss = loss/len(Xtrain)  
        print(loss)

    return Whh, Whx, Wyh, b_h, b_y

def test(X, y, Whh, Whx, Wyh, b_h, b_y, scaler):
    n = len(X)
    loss = 0
    for i in range(len(X)):
        sample = X[i]
        output = y[i]
        X_scaled = scaler.transform(np.vstack(sample))
        _, y_pred = forward(X_scaled, Whh, Whx, Wyh, b_h, b_y)

        y_scaled = scaler.transform(np.vstack(output))
        loss += mse(y_pred, y_scaled)
        #predictions.append(scaler.inverse_transform(y_pred[-1]))
    loss /= n
    return loss


In [None]:
iSize = len(features)
oSize = len(features)
h = hidden_states
Whh, Whx, Wyh, b_h, b_y = train(Xtrain_scaled, Ytrain_scaled, epochs, learning_rate, iSize, oSize, h)

In [100]:
trainLoss = test(Xtrain, Ytrain, Whh, Whx, Wyh, b_h, b_y, scaler)
testLoss = test(Xtest, Ytest, Whh, Whx, Wyh, b_h, b_y, scaler)
print('Training Loss: ', trainLoss)
print('Test Loss: ', testLoss)

Training Loss:  0.556204614210751
Test Loss:  0.5544050312834014
