# Long Short Term Memory (LSTM)

A from-scratch implementation of an LSTM, applied to time series data.

In [1]:
import torch
from torch import nn

import warnings

warnings.filterwarnings("ignore")

In [2]:
# get sample data

import polars as pl
import yfinance as yf
import re

# Fetch daily SPLG bars for calendar year 2023 (network call via yfinance).
prices = yf.download("SPLG", start='2023-01-01', end='2024-01-01')

# Move the index into a regular column, convert to polars, and tag every row
# with the ticker symbol.
ticker_column = pl.lit("SPLG").alias("Ticker")
df = pl.from_pandas(prices.reset_index()).with_columns(ticker_column)

# Normalise the headers: keep only the text before the first comma and strip
# every character that is neither a word character nor whitespace.
# NOTE(review): presumably the converted headers look like "('Close', 'SPLG')"
# because yfinance returns two-level columns; confirm against the installed version.
non_word_chars = re.compile(r"[^\w\s]")
df.columns = [
    non_word_chars.sub("", header.split(",")[0])
    for header in df.columns
]

df.head()

[*********************100%***********************]  1 of 1 completed


Date,Close,High,Low,Open,Volume,Ticker
datetime[ns],f64,f64,f64,f64,i64,str
2023-01-03 00:00:00,43.092529,43.718036,42.75572,43.487079,3688279,"""SPLG"""
2023-01-04 00:00:00,43.42934,43.660294,42.996297,43.342732,4335811,"""SPLG"""
2023-01-05 00:00:00,42.928936,43.198383,42.861574,43.188762,4449438,"""SPLG"""
2023-01-06 00:00:00,43.900875,44.035599,42.928937,43.284993,2160602,"""SPLG"""
2023-01-09 00:00:00,43.881626,44.540814,43.862383,44.151077,4251681,"""SPLG"""


## LSTM Model Class

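A minimal recurrent network built from a single LSTM cell, with one scalar weight per gate for the input and one for the previous short-term memory.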

In [3]:
class SimpleLSTM(nn.Module):
    """Single LSTM cell with scalar weights, unrolled over the columns of the input.

    Every gate owns three learnable scalars: ``w_*`` (weight on the current
    input), ``u_*`` (weight on the previous short-term memory) and ``b_*``
    (bias). The suffixes are ``f`` (forget gate), ``i`` (input gate),
    ``o`` (output gate) and ``c`` (candidate long-term memory).

    Note that this cell uses ReLU where the textbook LSTM uses tanh, both for
    the candidate memory and when turning the long-term memory into the
    short-term output.
    """

    def __init__(self):
        super().__init__()

        # Weights start uniformly in [0, 1); biases start at zero.
        # Registration order is kept stable so ``parameters()`` and
        # ``state_dict()`` line up with previously saved models.

        # Forget gate
        self.w_f = nn.Parameter(torch.rand(1))
        self.u_f = nn.Parameter(torch.rand(1))
        self.b_f = nn.Parameter(torch.tensor(0.))

        # Input gate
        self.w_i = nn.Parameter(torch.rand(1))
        self.u_i = nn.Parameter(torch.rand(1))
        self.b_i = nn.Parameter(torch.tensor(0.))

        # Output gate
        self.w_o = nn.Parameter(torch.rand(1))
        self.u_o = nn.Parameter(torch.rand(1))
        self.b_o = nn.Parameter(torch.tensor(0.))

        # Candidate long-term memory
        self.w_c = nn.Parameter(torch.rand(1))
        self.u_c = nn.Parameter(torch.rand(1))
        self.b_c = nn.Parameter(torch.tensor(0.))

    def cell(self, x, previous_short_term, previous_long_term):
        """Run one time step of the cell.

        Args:
            x: input for the current time step.
            previous_short_term: short-term memory (hidden state) from the
                previous step.
            previous_long_term: long-term memory (cell state) from the
                previous step.

        Returns:
            Tuple ``(updated_short_term, updated_long_term)``.
        """
        # Forget gate: fraction of the previous long-term memory to keep.
        forget_gate = torch.sigmoid(
            self.w_f * x + self.u_f * previous_short_term + self.b_f
        )

        # Input gate: fraction of the new candidate memory to let in.
        input_gate = torch.sigmoid(
            self.w_i * x + self.u_i * previous_short_term + self.b_i
        )
        # Candidate long-term memory proposed by the current step.
        candidate_memory = torch.relu(
            self.w_c * x + self.u_c * previous_short_term + self.b_c
        )

        updated_lt_memory = (
            forget_gate * previous_long_term + input_gate * candidate_memory
        )

        # Output gate: fraction of the (activated) long-term memory that is
        # exposed as the new short-term memory.
        output_gate = torch.sigmoid(
            self.w_o * x + self.u_o * previous_short_term + self.b_o
        )
        updated_st_memory = output_gate * torch.relu(updated_lt_memory)

        return updated_st_memory, updated_lt_memory

    def forward(self, input: torch.Tensor, h_1 = None, c_1 = None):
        """Unroll the cell over the input and return the final short-term memory.

        Args:
            input: tensor that is transposed with ``.t()`` and then iterated,
                so for a 2-D ``(batch, steps)`` tensor each step receives one
                column (one value per batch row).
            h_1: initial short-term memory (hidden state); zeros when omitted.
            c_1: initial long-term memory (cell state); zeros when omitted.

        Returns:
            The short-term memory after the last step.
        """
        # Default states are created next to the parameters so the model keeps
        # working after being moved to another device.
        if h_1 is None:
            short_term_memory = torch.zeros(1, dtype=torch.float32, device=self.w_f.device)
        else:
            short_term_memory = h_1

        if c_1 is None:
            long_term_memory = torch.zeros(1, dtype=torch.float32, device=self.w_f.device)
        else:
            # Use the caller's cell state (previously h_1 was assigned here by mistake).
            long_term_memory = c_1

        for x in input.t():
            short_term_memory, long_term_memory = self.cell(x, short_term_memory, long_term_memory)

        return short_term_memory

## Training

In [4]:
# Build the supervised-learning frame: the target `y` is the closing price and
# the inputs `x_1` .. `x_4` are the closes from 1 to 4 rows earlier.
lagged_closes = [
    pl.col("y").shift(lag).alias(f"x_{lag}")
    for lag in range(1, 5)
]

# The first 4 rows have at least one null lag (nothing to shift in), so they
# are sliced off.
data = (
    df
    .select(pl.col("Close").alias("y"))
    .with_columns(*lagged_closes)
)[4:]

In [5]:
# Convert the lagged inputs and the target to float32 tensors for training.
# NOTE(review): the columns are ordered x_1 (most recent lag) to x_4 (oldest),
# and SimpleLSTM.forward walks the columns left to right, so the model sees the
# newest value first — confirm this ordering is intended.
feature_columns = ["x_1", "x_2", "x_3", "x_4"]
x_train = torch.FloatTensor(data.select(feature_columns).to_numpy())
y_train = torch.FloatTensor(data["y"].to_numpy())

# Sanity check: one row per sample, one column per lag; one target per sample.
for tensor in (x_train, y_train):
    print(tensor.shape)

torch.Size([246, 4])
torch.Size([246])


In [6]:
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim

# Set up hyperparameters
batch_size = 40
epochs = 2000
learning_rate = 0.0001

# Fix the seed so the random weight initialisation (torch.rand in SimpleLSTM)
# is reproducible across "Restart & Run All".
torch.manual_seed(42)

train_dataset = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size)

model = SimpleLSTM()

# Set up loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(1, epochs + 1):
    total_loss = 0.0
    model.train()
    # The batch is named `inputs` rather than `data` so the loop does not
    # overwrite the `data` DataFrame that holds the lagged features.
    for batch_id, (inputs, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(inputs)  # forward pass
        loss = criterion(output, target)
        loss.backward()  # compute gradients
        optimizer.step()  # update weights

        total_loss += loss.item()

        # Progress line for every 100th batch of every 100th epoch.
        if batch_id % 100 == 0 and epoch % 100 == 0:
            print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                epoch, batch_id * len(inputs), len(train_loader.dataset),
                100. * batch_id / len(train_loader), loss.item()))

    # Report the mean batch loss of the epoch at the same cadence, so the
    # accumulated total is actually used and the trend is less noisy than a
    # single batch's loss.
    if epoch % 100 == 0:
        print("Train Epoch: {}\tMean loss: {:.6f}".format(
            epoch, total_loss / max(len(train_loader), 1)))


