# LSTM Example 1: A Toy Model

In [1]:
import numpy as np
import pandas as pd
from datetime import datetime
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler

def sigmoid(x):
    """Return the logistic sigmoid ``1 / (1 + exp(-x))`` of *x*, element-wise.

    The textbook formula overflows inside ``exp`` for large negative inputs
    (NumPy then emits a RuntimeWarning). This version only ever exponentiates
    a non-positive number, so the intermediate value stays within [0, 1].

    Args:
        x: A real scalar or array-like of real numbers.

    Returns:
        A NumPy scalar for scalar input, otherwise an array with the shape
        of *x*. Every returned value lies in [0, 1].
    """
    x = np.asarray(x)
    decay = np.exp(-np.abs(x))  # exp of a non-positive value: cannot overflow
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x) -- the same function.
    # The trailing [()] turns a 0-d result back into a scalar and is a no-op
    # for arrays with one or more dimensions.
    return np.where(x >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))[()]

# Date window used for the stock-price download
start_date = datetime(year=2022, month=1, day=1)
end_date = datetime(year=2024, month=1, day=31)

In [2]:
# Download daily AAPL quotes for the configured date window.
# auto_adjust=False is passed explicitly: newer yfinance releases default to
# auto_adjust=True, which drops the 'Adj Close' column read below.
stock = yf.download('AAPL', start=start_date, end=end_date, auto_adjust=False)
stock_price = pd.DataFrame(stock['Adj Close'])
stock_volume = pd.DataFrame(stock['Volume'])

# Min-max scale each feature with its own scaler, so that fitting the volume
# does not overwrite the fitted price range (needed to map scaled prices back
# to real prices with price_scaler.inverse_transform).
price_scaler = MinMaxScaler()
price_scaled = pd.Series(price_scaler.fit_transform(stock_price).squeeze(),
                         index=stock.index)
adjClose = price_scaled.values.tolist()

volume_scaler = MinMaxScaler()
volume_scaled = pd.Series(volume_scaler.fit_transform(stock_volume).squeeze(),
                          index=stock.index)
tradeVolume = volume_scaled.values.tolist()

# Backward-compatible name: `scaler` refers to the volume-fitted scaler,
# which is the state a single shared scaler would end up in.
scaler = volume_scaler

# Configuration
# input_size  M: number of input features per time step (input is M x 1)
# hidden_size K: size of the hidden state and of the memory (K x 1)
# output_size N: size of the output (N x 1)
input_size, hidden_size, output_size = 2, 3, 1

# Initial hidden state and initial memory: all-zero K x 1 column vectors
hidden_state_prev = np.zeros(shape=(hidden_size, 1))
final_memory_prev = np.zeros_like(hidden_state_prev)

# Inputs at steps 0 .. T-1 are used to predict the adjusted close at 1 .. T,
# so the targets are the adjusted-close series shifted forward by one step.
targets = adjClose[1:]
S = len(targets)  # number of time steps in the forward pass

# Parameters: weight matrices are drawn from a standard normal distribution,
# biases start at zero.

# Input gate: U is K x M, W is K x K, bias is K x 1
input_weights_U_i = np.random.standard_normal(size=(hidden_size, input_size))
hidden_weights_W_i = np.random.standard_normal(size=(hidden_size, hidden_size))
hidden_bias_i = np.zeros(shape=(hidden_size, 1))

# Forget gate: U is K x M, W is K x K, bias is K x 1
input_weights_U_f = np.random.standard_normal(size=(hidden_size, input_size))
hidden_weights_W_f = np.random.standard_normal(size=(hidden_size, hidden_size))
hidden_bias_f = np.zeros(shape=(hidden_size, 1))

# Output gate: sized by output_size, so U is N x M, W is N x K, bias is N x 1
input_weights_U_o = np.random.standard_normal(size=(output_size, input_size))
hidden_weights_W_o = np.random.standard_normal(size=(output_size, hidden_size))
hidden_bias_o = np.zeros(shape=(output_size, 1))

# New (candidate) memory: U is K x M, W is K x K, no bias term
input_weights_U_new = np.random.standard_normal(size=(hidden_size, input_size))
hidden_weights_W_new = np.random.standard_normal(size=(hidden_size, hidden_size))

# Forward pass. Every per-step quantity is kept in a dict keyed by the time
# index t; key -1 holds the initial hidden state / memory.
xs = {}
hidden_states = {}
outputs = {}
input_gate = {}
forget_gate = {}
new_memory = {}
final_memory = {}
loss = 0
hidden_states[-1] = hidden_state_prev.copy()
final_memory[-1] = final_memory_prev.copy()

for t in range(S):
    # Input vector (M x 1): scaled adjusted close on top, scaled volume below
    xs[t] = np.array([[adjClose[t]], [tradeVolume[t]]], dtype=float)
    x_t = xs[t]
    h_prev = hidden_states[t - 1]

    # Input gate (K x 1)
    input_gate[t] = sigmoid(
        input_weights_U_i @ x_t + hidden_weights_W_i @ h_prev + hidden_bias_i
    )
    # Forget gate (K x 1)
    forget_gate[t] = sigmoid(
        input_weights_U_f @ x_t + hidden_weights_W_f @ h_prev + hidden_bias_f
    )
    # Output gate (N x 1). It is also used as the prediction in the loss below.
    outputs[t] = sigmoid(
        input_weights_U_o @ x_t + hidden_weights_W_o @ h_prev + hidden_bias_o
    )
    # New (candidate) memory (K x 1); this term has no bias
    new_memory[t] = np.tanh(input_weights_U_new @ x_t + hidden_weights_W_new @ h_prev)

    # Final memory: forget-gated previous memory plus input-gated new memory,
    # squashed by tanh. The operands are flattened first, so the stored
    # result is a 1-D array of length K rather than a K x 1 column.
    # NOTE(review): a textbook LSTM keeps the cell state un-squashed and only
    # applies tanh when forming the hidden state -- confirm this is intended.
    retained = forget_gate[t].ravel() * final_memory[t - 1].ravel()
    written = input_gate[t].ravel() * new_memory[t].ravel()
    final_memory[t] = np.tanh(retained + written)

    # Hidden state (K x 1): the final memory scaled by the output gate,
    # turned back into a column vector for the next step's matrix products.
    hidden_states_array = outputs[t].ravel() * final_memory[t]
    hidden_states[t] = hidden_states_array[:, np.newaxis]

    # Accumulate the squared error between the target and the prediction.
    # outputs[t] is a 1 x 1 array such as array([[0.5001818]]); [0, 0] pulls
    # out the number itself.
    prediction = outputs[t][0, 0]
    loss += (targets[t] - prediction) ** 2

# Report the root-mean-squared error over all S steps
loss_rmse = np.sqrt(loss / S)
print(loss_rmse)

[*********************100%***********************]  1 of 1 completed

0.29722938291223905



