In [13]:
import torch
import torch.nn as nn
import torch.functional as F
from torch.optim import Adam
import lightning as L
from torch.utils.data import TensorDataset, DataLoader

In [4]:
class LSTMbyHand(L.LightningModule):
  """A single LSTM unit written out gate by gate with scalar parameters.

  Each of the four gate computations (forget, input-sigmoid, input-tanh,
  output) owns two weights and one bias, all stored as 0-dimensional
  ``nn.Parameter`` objects, for 12 trainable values in total. The same unit
  is applied once per element of the input sequence, carrying the long-term
  and short-term memories from one step to the next.
  """

  def __init__(self):
    """Create the 12 parameters: weights ~ N(0, 1), biases set to 0."""
    super().__init__()
    mean = torch.tensor(0.0)
    std = torch.tensor(1.0)

    # Weights are drawn from a normal distribution; biases start at zero.
    # In every gate the "1" weight multiplies the short-term memory and the
    # "2" weight multiplies the current input value (see lstm_unit).

    # Forget gate: fraction of the long-term memory to keep.
    self.wlr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.wlr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.blr1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

    # Input gate, sigmoid part: fraction of the potential memory to add.
    self.wpr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.wpr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.bpr1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

    # Input gate, tanh part: value of the potential memory to add.
    self.wp1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.wp2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.bp1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

    # Output gate: fraction of tanh(long-term memory) exposed as short-term memory.
    self.wo1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.wo2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
    self.bo1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

  def lstm_unit(self, input_value, long_memory, short_memory):
    """Run one LSTM step.

    Args:
      input_value: the sequence element for this step.
      long_memory: long-term memory carried in from the previous step.
      short_memory: short-term memory carried in from the previous step.

    Returns:
      A two-element list ``[updated_long_memory, updated_short_memory]``.
    """
    # Forget gate: how much of the existing long-term memory survives.
    long_remember_percent = torch.sigmoid((short_memory * self.wlr1) + (input_value * self.wlr2) + self.blr1)

    # Input gate: a candidate memory (tanh) scaled by how much of it to keep (sigmoid).
    potential_remember_percent = torch.sigmoid((short_memory * self.wpr1) + (input_value * self.wpr2) + self.bpr1)
    potential_memory = torch.tanh((short_memory * self.wp1) + (input_value * self.wp2) + self.bp1)
    updated_long_memory = (long_remember_percent * long_memory) + (potential_remember_percent * potential_memory)

    # Output gate: the new short-term memory is a gated view of the new long-term memory.
    output_percent = torch.sigmoid((short_memory * self.wo1) + (input_value * self.wo2) + self.bo1)
    updated_short_memory = torch.tanh(updated_long_memory) * output_percent

    return [updated_long_memory, updated_short_memory]

  def forward(self, input):
    """Unroll the LSTM unit over a whole sequence.

    Args:
      input: the sequence to process, iterated along its first dimension
        (for example a 1-D tensor with one value per day). Any length is
        accepted; every element is fed through ``lstm_unit`` in order.

    Returns:
      The short-term memory after the last step. For an empty sequence no
      step runs and the initial value ``0`` is returned.
    """
    # Both memories start at zero before the first element is seen.
    long_memory = 0
    short_memory = 0

    # One LSTM step per element, instead of four hard-coded steps, so the
    # model is not tied to sequences of exactly four values.
    for input_value in input:
      long_memory, short_memory = self.lstm_unit(input_value, long_memory, short_memory)

    return short_memory

  # Required for Lightning module
  def configure_optimizers(self):
    """Return the optimizer: Adam over all parameters with its default settings."""
    return Adam(self.parameters())

  # Required for Lightning module
  def training_step(self, batch, batch_idx):
    """Compute and log the squared error for one batch.

    Only the first sequence of the batch (``input_i[0]``) is run through the
    model, and ``label_i == 0`` is used as a plain boolean, so this step
    assumes each batch holds a single (sequence, label) pair.

    Returns:
      The squared difference between the prediction and the label.
    """
    input_i, label_i = batch
    # Call the module itself rather than .forward() so nn.Module's call
    # machinery (hooks) is not bypassed.
    output_i = self(input_i[0])
    loss = (output_i - label_i)**2

    self.log("train_loss", loss)

    # Log the prediction under a per-label key so the two targets can be
    # followed separately in the logs.
    if label_i == 0:
      self.log("out_0", output_i)
    else:
      self.log("out_1", output_i)

    return loss
  
# Instantiate the model. Its weights are sampled with torch.normal in __init__,
# so the starting point (and the trained result) varies between runs unless a seed is set.
model = LSTMbyHand()

In [5]:
# Two toy training sequences of four values each; they differ only in the first value.
inputs = torch.tensor([
  [0., 0.5, 0.25, 1.],
  [1., 0.5, 0.25, 1.],
])
# One target per sequence, in the same order as the rows of `inputs`.
labels = torch.tensor([0., 1.])

# Pair each sequence with its label and serve them one at a time, in order.
# batch_size=1 and shuffle=False are the DataLoader defaults, spelled out here
# because the model's training_step reads a single sequence per batch.
dataset = TensorDataset(inputs, labels)
dataloader = DataLoader(dataset, batch_size=1, shuffle=False)

In [6]:
# Train for 2000 passes over the two-sample dataset.
trainer = L.Trainer(max_epochs=2000)
trainer.fit(
  model=model,
  train_dataloaders=dataloader,
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs





LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name         | Type | Params
--------------------------------------
  | other params | n/a  | 12    
--------------------------------------
12        Trainable params
0         Non-trainable params
12        Total params
0.000     Total estimated model params size (MB)
d:\python3115\Lib\site-packages\lightning\pytorch\trainer\connectors\data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.
d:\python3115\Lib\site-packages\lightning\pytorch\loops\fit_loop.py:298: The number of training batches (2) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Epoch 1999: 100%|██████████| 2/2 [00:00<00:00, 26.44it/s, v_num=1]

`Trainer.fit` stopped: `max_epochs=2000` reached.


Epoch 1999: 100%|██████████| 2/2 [00:00<00:00, 22.32it/s, v_num=1]


In [7]:
# Run both training sequences through the trained model and compare with the observed values.
company_a_days = torch.tensor([0., 0.5, 0.25, 1.])
company_b_days = torch.tensor([1., 0.5, 0.25, 1.])

company_a_pred = model(company_a_days)
company_b_pred = model(company_b_days)

print(f"COMPANY A: observed = 0, predicted = {company_a_pred.detach().item():0.4f}")
print(f"COMPANY B: observed = 1, predicted = {company_b_pred.detach().item():0.4f}")

COMPANY A: observed = 0, predicted = 0.0047
COMPANY B: observed = 1, predicted = 0.8993


In [11]:
# Continue training from where the previous run stopped: load the checkpoint it saved
# (after 2000 epochs) and train until epoch 3000, rather than starting again from scratch.
# The checkpoint path must be read before `trainer` is rebound to the new Trainer below.
path_to_best_checkpoint = trainer.checkpoint_callback.best_model_path
trainer = L.Trainer(max_epochs=3000)
trainer.fit(
  model=model,
  train_dataloaders=dataloader,
  ckpt_path=path_to_best_checkpoint,
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
Restoring states from the checkpoint path at d:\githubrepo\LSTM-practice\practice\lightning_logs\version_1\checkpoints\epoch=1999-step=4000.ckpt
d:\python3115\Lib\site-packages\lightning\pytorch\callbacks\model_checkpoint.py:360: The dirpath has changed from 'd:\\githubrepo\\LSTM-practice\\practice\\lightning_logs\\version_1\\checkpoints' to 'd:\\githubrepo\\LSTM-practice\\practice\\lightning_logs\\version_2\\checkpoints', therefore `best_model_score`, `kth_best_model_path`, `kth_value`, `last_model_path` and `best_k_models` won't be reloaded. Only `best_model_path` will be reloaded.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name         | Type | Params
--------------------------------------
  | other params | n/a  | 12    
--------------------------------------
12        Trainable params
0         Non-trainable params
12        To

Epoch 2999: 100%|██████████| 2/2 [00:00<00:00, 22.95it/s, v_num=2]

`Trainer.fit` stopped: `max_epochs=3000` reached.


Epoch 2999: 100%|██████████| 2/2 [00:00<00:00, 19.78it/s, v_num=2]


In [12]:
# Re-evaluate both sequences after the additional training epochs.
company_a_days = torch.tensor([0., 0.5, 0.25, 1.])
company_b_days = torch.tensor([1., 0.5, 0.25, 1.])

company_a_pred = model(company_a_days)
company_b_pred = model(company_b_days)

print(f"COMPANY A: observed = 0, predicted = {company_a_pred.detach().item():0.4f}")
print(f"COMPANY B: observed = 1, predicted = {company_b_pred.detach().item():0.4f}")

COMPANY A: observed = 0, predicted = 0.0005
COMPANY B: observed = 1, predicted = 0.9550


In [10]:
# To view the training logs in TensorBoard, run "tensorboard --logdir=lightning_logs/" from the directory that contains lightning_logs/ (it should show up in ls),
# then open the TensorBoard localhost URL in a browser.

In [None]:
# Taken from the StatQuest YouTube tutorial on implementing an LSTM with PyTorch and Lightning