In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

import lightning as L
from torch.utils.data import TensorDataset, DataLoader

LSTM from Scratch:

In [15]:
#Outline of an LSTM Class:
class LSTMfromScratch(L.LightningModule):
  """A one-feature, one-unit LSTM written out with individual scalar parameters.

  Parameter naming scheme:
    * ``wf*`` / ``bf*`` belong to the forget gate, ``wi*`` / ``bi*`` to the
      input gate and ``wo*`` / ``bo*`` to the output gate.
    * A ``p`` in the name marks a value that feeds a sigmoid and therefore
      yields a percentage; names without ``p`` feed the tanh that proposes a
      candidate memory.
    * In every weight pair the first weight multiplies the short-term memory
      and the second multiplies the current input value.
  """

  def __init__(self):
    """Create the 12 trainable scalars: weights ~ N(0, 1), every bias starts at 0."""
    super().__init__()
    mean = torch.tensor(0.0)
    std = torch.tensor(1.0)

    # Forget gate: percentage of the long-term memory to remember.
    self.wfp1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.wfp2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.bfp1 = nn.Parameter(torch.tensor(0.0), requires_grad=True)

    # Input gate, sigmoid half: percentage of the candidate memory to store.
    self.wip1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.wip2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.bip1 = nn.Parameter(torch.tensor(0.0), requires_grad=True)

    # Input gate, tanh half: the candidate long-term memory itself.
    self.wi3 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.wi4 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.bi2 = nn.Parameter(torch.tensor(0.0), requires_grad=True)

    # Output gate: percentage of the squashed long-term memory to emit.
    self.wop1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.wop2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    # Zero-initialised like the other three biases (it used to be drawn from N(0, 1)).
    self.bop1 = nn.Parameter(torch.tensor(0.0), requires_grad=True)

  def lstm_unit(self, input_value, long_mem, short_mem):
    """Run one LSTM step.

    Args:
      input_value: the observation for the current time step.
      long_mem: long-term memory (cell state) coming into this step.
      short_mem: short-term memory (hidden state) coming into this step.

    Returns:
      A two-element list ``[updated_long_term_mem, updated_short_mem]``.
    """
    # Forget gate: how much of the incoming long-term memory survives.
    long_remem_percent = torch.sigmoid(
        (short_mem * self.wfp1) + (input_value * self.wfp2) + self.bfp1)

    # Input gate: a candidate memory and the percentage of it to add.
    potential_mem_percent = torch.sigmoid(
        (short_mem * self.wip1) + (input_value * self.wip2) + self.bip1)
    potential_mem = torch.tanh(
        (short_mem * self.wi3) + (input_value * self.wi4) + self.bi2)

    updated_long_term_mem = (
        (long_mem * long_remem_percent) + (potential_mem * potential_mem_percent))

    # Output gate: how much of the squashed long-term memory becomes the new
    # short-term memory.
    output_percent = torch.sigmoid(
        (short_mem * self.wop1) + (input_value * self.wop2) + self.bop1)
    updated_short_mem = torch.tanh(updated_long_term_mem) * output_percent

    return [updated_long_term_mem, updated_short_mem]

  def forward(self, input):
    """Unroll the LSTM over every time step of ``input``.

    Args:
      input: a sequence of observations, indexed along its first dimension.
        Any length >= 1 is accepted (the notebook uses four days per company).

    Returns:
      The short-term memory after the last time step, i.e. the prediction.

    Raises:
      ValueError: if ``input`` contains no time steps.
    """
    if len(input) == 0:
      raise ValueError("forward() needs at least one time step")

    # Both memories start at zero before the first observation.
    long_mem = 0
    short_mem = 0
    for day_value in input:
      long_mem, short_mem = self.lstm_unit(day_value, long_mem, short_mem)

    return short_mem

  def configure_optimizers(self):
    """Return an Adam optimizer over all parameters, using Adam's default settings."""
    return Adam(self.parameters())

  def training_step(self, batch, batch_idx):
    """Compute and log the squared error for one batch.

    Only the first sequence of the batch is used, so this assumes one sequence
    per batch -- confirm before raising the DataLoader's batch size.

    Returns:
      The squared difference between the prediction and the label.
    """
    input_i, label_i = batch
    # Call the module rather than .forward() directly so registered hooks run.
    output_i = self(input_i[0])
    loss = (output_i - label_i) ** 2

    # Logged values are written by Lightning under the lightning_logs directory.
    self.log("train_loss", loss)
    # Example-specific logging: track the prediction per company
    # (Company A is out_0, Company B is out_1).
    if label_i == 0:
      self.log("out_0", output_i)
    else:
      self.log("out_1", output_i)

    return loss

In [16]:
# Build a fresh (untrained) model and look at its prediction for Company A.
model = LSTMfromScratch()
print("\nComparing actual result with predicted result:")
company_a_prediction = model(torch.tensor([0.0, 0.5, 0.25, 1.0])).detach()
print("Company A: Observed = 0, Predicted = ", company_a_prediction)


Comparing actual result with predicted result:
Company A: Observed = 0, Predicted =  tensor(0.2409)


In [17]:
# Same untrained model, evaluated on Company B's sequence.
print("\nComparing actual result with predicted result:")
company_b_prediction = model(torch.tensor([1.0, 0.5, 0.25, 1.0])).detach()
print("Company B: Observed = 1, Predicted = ", company_b_prediction)


Comparing actual result with predicted result:
Company B: Observed = 1, Predicted =  tensor(0.2835)


In [19]:
# Training data: one four-value sequence per company and the value each should predict.
inputs = torch.tensor([
    [0.0, 0.5, 0.25, 1.0],  # Company A
    [1.0, 0.5, 0.25, 1.0],  # Company B
])
labels = torch.tensor([0.0, 1.0])

# Pair every sequence with its label; batches hold a single (sequence, label) pair.
dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset, batch_size=1)

In [20]:
# Train the from-scratch LSTM for 2000 passes over the two-sample dataset.
trainer = L.Trainer(max_epochs=2000)
trainer.fit(model=model, train_dataloaders=dataloader)

💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.
GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

  | Name         | Type | Params | Mode
---------------------------------------------
  | other params | n/a  | 12     | n/a 
---------------------------------------------
12        Trainable params
0         Non-trainable params
12        Total params
0.000     Total estimated model params size (MB)
0         Modules in train mode
0         Modules in eval mode


Epoch 1999: 100%|██████████| 2/2 [00:00<00:00, 76.55it/s, v_num=4]

`Trainer.fit` stopped: `max_epochs=2000` reached.


Epoch 1999: 100%|██████████| 2/2 [00:00<00:00, 50.74it/s, v_num=4]


In [21]:
# Prediction for Company A after the 2000-epoch training run.
print("\nComparing actual result with predicted result:")
company_a_prediction = model(torch.tensor([0.0, 0.5, 0.25, 1.0])).detach()
print("Company A: Observed = 0, Predicted = ", company_a_prediction)


Comparing actual result with predicted result:
Company A: Observed = 0, Predicted =  tensor(0.0005)


In [22]:
# Prediction for Company B after the 2000-epoch training run.
print("\nComparing actual result with predicted result:")
company_b_prediction = model(torch.tensor([1.0, 0.5, 0.25, 1.0])).detach()
print("Company B: Observed = 1, Predicted = ", company_b_prediction)


Comparing actual result with predicted result:
Company B: Observed = 1, Predicted =  tensor(0.9432)


In [23]:
# Continue training from where the previous run stopped instead of starting over.
# The checkpoint path has to be read BEFORE `trainer` is rebound to the new Trainer.
path_to_best_checkpoint = trainer.checkpoint_callback.best_model_path

# max_epochs is the overall epoch limit; the restored run picks up from the checkpoint.
trainer = L.Trainer(max_epochs=3000)
trainer.fit(
    model=model,
    train_dataloaders=dataloader,
    ckpt_path=path_to_best_checkpoint,
)

💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.


GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
Restoring states from the checkpoint path at /Users/adhithyapasumarthi/Downloads/lightning_logs/version_4/checkpoints/epoch=1999-step=4000.ckpt
/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/lightning/pytorch/callbacks/model_checkpoint.py:366: The dirpath has changed from '/Users/adhithyapasumarthi/Downloads/lightning_logs/version_4/checkpoints' to '/Users/adhithyapasumarthi/Downloads/lightning_logs/version_5/checkpoints', therefore `best_model_score`, `kth_best_model_path`, `kth_value`, `last_model_path` and `best_k_models` won't be reloaded. Only `best_model_path` will be reloaded.

  | Name         | Type | Params | Mode
---------------------------------------------
  | other params | n/a  | 12     | n/a 
---------------------------------------------
12        Trainable params
0         Non-trainable params
12        Total params
0.000  

Epoch 2999: 100%|██████████| 2/2 [00:00<00:00, 82.56it/s, v_num=5]

`Trainer.fit` stopped: `max_epochs=3000` reached.


Epoch 2999: 100%|██████████| 2/2 [00:00<00:00, 56.92it/s, v_num=5]


In [24]:
# Prediction for Company A after the resumed training run.
print("\nComparing labeled values with predicted values: ")
company_a_prediction = model(torch.tensor([0.0, 0.5, 0.25, 1.0])).detach()
print("Comapny A labeled value: 0, Predicted: ", company_a_prediction)


Comparing labeled values with predicted values: 
Comapny A labeled value: 0, Predicted:  tensor(0.0001)


In [25]:
# Prediction for Company B after the resumed training run.
print("\nComparing labeled values with predicted values: ")
company_b_prediction = model(torch.tensor([1.0, 0.5, 0.25, 1.0])).detach()
print("Comapny B labeled value: 1, Predicted: ", company_b_prediction)


Comparing labeled values with predicted values: 
Comapny B labeled value: 1, Predicted:  tensor(0.9687)


LSTM using PyTorch's nn.LSTM:

In [41]:
class LightningLSTM(L.LightningModule):
    """The same sequence predictor, built on PyTorch's ``nn.LSTM`` layer."""

    def __init__(self):
        """Create an LSTM layer with one input feature and one output value."""
        super().__init__()
        # input_size  = number of features fed in at each time step.
        # hidden_size = number of values the LSTM emits at each time step.
        # A larger hidden_size is useful when the LSTM's outputs are passed on
        # to another network, e.g. emitting several weather-related values that
        # a feed-forward network then turns into a forecast.
        self.lstm = nn.LSTM(input_size=1, hidden_size=1)

    def forward(self, input):
        """Return the LSTM's output for the final time step of ``input``.

        Args:
            input: a 1-D sequence of observations, one value per time step.

        Returns:
            The last short-term memory value produced by the LSTM, used as
            the prediction.
        """
        # Reshape the flat sequence into len(input) rows of a single feature.
        as_column = input.view(len(input), 1)
        # First result: the short-term memory after every unrolled step.
        # Second result: the LSTM's final states, which are not needed here.
        per_step_output, _final_states = self.lstm(as_column)
        return per_step_output[-1]

    def configure_optimizers(self):
        """Return Adam with lr=0.1, far above the default 0.001 learning rate."""
        return Adam(self.parameters(), lr=0.1)

    def training_step(self, batch, batch_idx):
        """Compute the squared error for one batch and log it.

        Only the first sequence in the batch is used. The prediction is also
        logged as ``out_0`` or ``out_1`` depending on the label.
        """
        input_i, label_i = batch
        output_i = self.forward(input_i[0])
        loss = (output_i - label_i) ** 2
        self.log("training_loss", loss)

        prediction_key = "out_0" if label_i == 0 else "out_1"
        self.log(prediction_key, output_i)
        return loss

In [59]:
# Build a fresh (untrained) nn.LSTM-based model and check the sequence labelled 0.
model = LightningLSTM()
print("\nComparing label and the predicted values:")
label0_prediction = model(torch.tensor([0.0, 0.5, .25, 1.0])).detach()
print("Label value: 0 and Predicted value: ", label0_prediction)


Comparing label and the predicted values:
Label value: 0 and Predicted value:  tensor([0.0647])


In [60]:
# Same untrained model, evaluated on the sequence labelled 1.
print("Comparing label and the predicted values:")
label1_prediction = model(torch.tensor([1.0, 0.5, .25, 1.0])).detach()
print("Label value: 1 and Predicted value: ", label1_prediction)

Comparing label and the predicted values:
Label value: 1 and Predicted value:  tensor([0.0640])


In [61]:
# Only 300 epochs this time instead of 3000: this model's optimizer uses a
# learning rate of 0.1 rather than the 0.001 default, so gradient descent takes
# larger steps and should need less time to reach the minimum loss.
# log_every_n_steps=2 writes the logged metrics every two training steps.
trainer = L.Trainer(max_epochs=300, log_every_n_steps=2)
trainer.fit(model=model, train_dataloaders=dataloader)

💡 Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.


GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

  | Name | Type | Params | Mode 
--------------------------------------
0 | lstm | LSTM | 16     | train
--------------------------------------
16        Trainable params
0         Non-trainable params
16        Total params
0.000     Total estimated model params size (MB)
1         Modules in train mode
0         Modules in eval mode


Epoch 299: 100%|██████████| 2/2 [00:00<00:00, 176.08it/s, v_num=11]

`Trainer.fit` stopped: `max_epochs=300` reached.


Epoch 299: 100%|██████████| 2/2 [00:00<00:00, 125.30it/s, v_num=11]


In [62]:
# Prediction for the sequence labelled 0 after training the nn.LSTM model.
print("\nComparing label and the predicted values:")
label0_prediction = model(torch.tensor([0.0, 0.5, .25, 1.0])).detach()
print("Label value: 0 and Predicted value: ", label0_prediction)


Comparing label and the predicted values:
Label value: 0 and Predicted value:  tensor([4.9227e-05])


In [63]:
# Prediction for the sequence labelled 1 after training the nn.LSTM model.
print("\nComparing label and the predicted values:")
label1_prediction = model(torch.tensor([1.0, 0.5, .25, 1.0])).detach()
print("Label value: 1 and Predicted value: ", label1_prediction)


Comparing label and the predicted values:
Label value: 1 and Predicted value:  tensor([0.9818])
