This is, in theory, a super simple example of how Long Short-Term Memory Neural Networks work. We'll start by implementing a single "memory cell" that we'll duplicate (reusing all the weights and biases) for each element in the input.

First, import the modules...

In [22]:
import torch # torch will allow us to create tensors.
import torch.nn as nn # torch.nn allows us to create a neural network.
import torch.nn.functional as F # nn.functional give us access to the activation and loss functions.
# from torch.optim import SGD # optim contains many optimizers. Here, we're using SGD, stochastic gradient descent.
from torch.optim import Adam # optim contains many optimizers. Here, we're using Adam

import lightning as L # lightning has tons of cool tools that make neural networks easier
from torch.utils.data import TensorDataset, DataLoader # these are needed for the training data

from torchmetrics import Accuracy

import matplotlib.pyplot as plt ## matplotlib allows us to draw graphs.
import seaborn as sns ## seaborn makes it easier to draw nice-looking graphs.

## Set the seed so that, hopefully, everyone will get the same results as me.
from pytorch_lightning.utilities.seed import seed_everything


In [24]:
class BasicLightningTrain(L.LightningModule):

    def __init__(self): # __init__() is the class constructor function, and we use it to initialize the weights and biases.
        
        super().__init__() # initialize an instance of the parent class, LightningModule.

        seed_everything(seed=42)
        
        mean = torch.tensor(0.0)
        std = torch.tensor(1.0)        
        
        self.wf1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wf2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bf1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        self.wr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.br1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        self.wp1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wp2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bp1 = nn.Parameter(torch.tensor(0.), requires_grad=True)
        
        self.wo1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wo2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bo1 = nn.Parameter(torch.tensor(0.), requires_grad=True)
        
        ## PyTorch's nn.LSTM() uses a uniform distribution to initialize weights and biases
        ## so we can simulate that here...
#         self.wf1 = nn.Parameter(torch.rand(1), requires_grad=True)
#         self.wf2 = nn.Parameter(torch.rand(1), requires_grad=True)
#         self.bf1 = nn.Parameter(torch.rand(1), requires_grad=True)

#         self.wr1 = nn.Parameter(torch.rand(1), requires_grad=True)
#         self.wr2 = nn.Parameter(torch.rand(1), requires_grad=True)
#         self.br1 = nn.Parameter(torch.rand(1), requires_grad=True)

#         self.wp1 = nn.Parameter(torch.rand(1), requires_grad=True)
#         self.wp2 = nn.Parameter(torch.rand(1), requires_grad=True)
#         self.bp1 = nn.Parameter(torch.rand(1), requires_grad=True)
        
#         self.wo1 = nn.Parameter(torch.rand(1), requires_grad=True)
#         self.wo2 = nn.Parameter(torch.rand(1), requires_grad=True)
#         self.bo1 = nn.Parameter(torch.rand(1), requires_grad=True)
        
        self.state = 0 # this is used to keep track of which output we are trying to predict
        
        
    def lstm_unit(self, input_value, long_memory, short_memory):
        ## NOTES:
        ## long term memory is also called "cell state"
        ## short term memory is also called "hidden state"
        forget_percent = torch.sigmoid((short_memory * self.wf1) + (input_value * self.wf2) + self.bf1)
        remember_percent = torch.sigmoid((short_memory * self.wr1) + (input_value * self.wr2) + self.br1)
        potential_memory = torch.tanh((short_memory * self.wp1) + (input_value * self.wp2) + self.bp1)
        output_percent = torch.sigmoid((short_memory * self.wo1) + (input_value * self.wo2) + self.bo1)
        
        long_memory = (long_memory * forget_percent) + (remember_percent * potential_memory)
        short_memory = torch.tanh(long_memory) * output_percent
        return([long_memory, short_memory])
        
    
    def forward(self, input): 
        
        long_memory = 0 # long term memory is also called "cell state" also called c0
        short_memory = 0 # short term memory is also called "hidden state" also called h0
        day1 = input[0]
        day2 = input[1]
        day3 = input[2]
        day4 = input[3]
        
        ## Day 1
        long_memory, short_memory = self.lstm_unit(day1, long_memory, short_memory)
        
        ## Day 2
        long_memory, short_memory = self.lstm_unit(day2, long_memory, short_memory)
        
        ## Day 3
        long_memory, short_memory = self.lstm_unit(day3, long_memory, short_memory)
        
        ## Day 4
        long_memory, short_memory = self.lstm_unit(day4, long_memory, short_memory)
        
        ##### Now return short_memory
        return short_memory # final value for h4
        
    def configure_optimizers(self): # this configures the optimizer we want to use for backpropagation.
        return Adam(self.parameters())

    def training_step(self, batch, batch_idx): # take a step during gradient descent.
        
        input_i, label_i = batch # collect input
        output_i = self.forward(input_i[0]) # run input through the neural network
        loss = (output_i - label_i)**2 ## loss = squared residual
        
        # self.train_acc.update(output_i, label_i)
        self.log("train_loss", loss)
        if (self.state == 0):
            self.state = 1
            self.log("out_0", output_i)
        else:
            self.state = 0
            self.log("out_1", output_i)
            
        ## Internally and behind the scenes, Lightning now calls...
        ## optimizer.zero_grad() # to clear gradients
        ## loss.backward() # to do the backpropagation
        ## optimizer.step() # to update the parameters
        return loss

In [31]:
## create the training data for the neural network.
inputs = torch.tensor([[0., 0.5, 0.25, 1.], [1., 0.5, 0.25, 1.]])
labels = torch.tensor([0., 1.])

dataset = TensorDataset(inputs, labels) 
dataloader = DataLoader(dataset)

In [26]:
model = BasicLightningTrain() # First, make model from the class
print("Before...")
## print out the name and value for each parameter
print("Parameters...")
for name, param in model.named_parameters():
    print(name, param.data)

print("\nOutput Values...")
print(model(torch.tensor([0., 0.5, 0.25, 0.75])).detach())
print(model(torch.tensor([1., 0.5, 0.25, 0.75])).detach())


Global seed set to 42


Before...
Parameters...
wf1 tensor(0.3367)
wf2 tensor(0.1288)
bf1 tensor(0.)
wr1 tensor(0.2345)
wr2 tensor(0.2303)
br1 tensor(0.)
wp1 tensor(-1.1229)
wp2 tensor(-0.1863)
bp1 tensor(0.)
wo1 tensor(2.2082)
wo2 tensor(-0.6380)
bo1 tensor(0.)

Output Values...
tensor(-0.0316)
tensor(-0.0323)


In [27]:
trainer = L.Trainer(max_epochs=4000)
trainer.fit(model, train_dataloaders=dataloader)
print("\nAfter...")
## print out the name and value for each parameter
print("Parameters...")
for name, param in model.named_parameters():
    print(name, param.data)

print("\nOutput Values...")
print(model(torch.tensor([0., 0.5, 0.25, 0.75])).detach())
print(model(torch.tensor([1., 0.5, 0.25, 0.75])).detach())

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name | Type | Params
------------------------------
------------------------------
12        Trainable params
0         Non-trainable params
12        Total params
0.000     Total estimated model params size (MB)
  rank_zero_warn(
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=4000` reached.



After...
Parameters...
wf1 tensor(2.6675)
wf2 tensor(1.5465)
bf1 tensor(1.5411)
wr1 tensor(1.8835)
wr2 tensor(1.5822)
br1 tensor(0.5024)
wp1 tensor(1.2124)
wp2 tensor(0.8829)
bp1 tensor(-0.3179)
wo1 tensor(4.2505)
wo2 tensor(-0.3005)
bo1 tensor(0.4795)

Output Values...
tensor(-0.0511)
tensor(0.9308)


Output Values...
tensor([-0.1503])
tensor([0.9781])

In [None]:
## So we are close. But maybe adding 1000 more epochs will improve the model a little bit more.
path_to_best_checkpoint = trainer.checkpoint_callback.best_model_path ## By default, "best" = "most recent"
print("The new trainer will start where the last left off, and the check point data is here: " + 
      path_to_best_checkpoint + "\n")

trainer = L.Trainer(max_epochs=5000) # before the max epochs as 4000, so we're adding 1000 more
trainer.fit(model, train_dataloaders=dataloader, ckpt_path=path_to_best_checkpoint)
print("\nAfter...")
## print out the name and value for each parameter
print("Parameters...")
for name, param in model.named_parameters():
    print(name, param.data)

print("\nOutput Values...")
print(model(torch.tensor([0., 0.5, 0.25, 0.75])).detach())
print(model(torch.tensor([1., 0.5, 0.25, 0.75])).detach())

Output Values...
tensor([-0.1613])
tensor([0.9859])

In [None]:
%reload_ext tensorboard
%tensorboard --logdir=lightning_logs/

In [53]:
## now let's see what we can do with nn.LSTM()

class LightningLSTM(L.LightningModule):

    def __init__(self): # __init__() is the class constructor function, and we use it to initialize the weights and biases.
        
        super().__init__() # initialize an instance of the parent class, LightningModule.

        seed_everything(seed=42)
        ## input_size = number of features (or variables) in the data. In our example
        ##              we only have a single feature (value)
        ## hidden_size = number of LSTMs we want to connect the input to. This is 
        self.lstm = nn.LSTM(input_size=1, hidden_size=1) 
        
        self.hidden = (torch.zeros(1,1,1), # init hidden state (short-term memory) to 0
                       torch.zeros(1,1,1)) # init cell state (long-term memory) to 0.

        
        self.state = 0 # this keeps track of which output we are trying to predict for logging
    
    def forward(self, input):
        ## transpose the input vector    
        input_trans = input.view(len(input),1,-1)
        
        print("input:", str(input) + str(input.shape))
        print("input_trans:", str(input_trans) + str(input_trans.shape))
        
        ## run it through the LSTM unit (which automatically unrolls for us)
        # lstm_out, self.hidden = self.lstm(input_trans, self.hidden)
        lstm_out, self.hidden = self.lstm(input_trans)
        
        ## lstm_out has the short-term memories for all inputs. We make our prediction with the last one
        prediction = lstm_out[-1] 
        return prediction
        
    def configure_optimizers(self): # this configures the optimizer we want to use for backpropagation.
        return Adam(self.parameters())

    def training_step(self, batch, batch_idx): # take a step during gradient descent.
        
        input_i, label_i = batch # collect input
        output_i = self.forward(input_i[0]) # run input through the neural network
        loss = (output_i - label_i)**2 ## loss = squared residual
        
        # self.train_acc.update(output_i, label_i)
        self.log("train_loss", loss)
        if (self.state == 0):
            self.state = 1
            self.log("out_0", output_i)
        else:
            self.state = 0
            self.log("out_1", output_i)
            
        ## Internally and behind the scenes, Lightning now calls...
        ## optimizer.zero_grad() # to clear gradients
        ## loss.backward() # to do the backpropagation
        ## optimizer.step() # to update the parameters
        return loss

In [54]:
model_lstm = LightningLSTM() # First, make model from the class
model_lstm(torch.tensor([0., 0.5, 0.25, 0.75]))

Global seed set to 42


input: tensor([0.0000, 0.5000, 0.2500, 0.7500])torch.Size([4])
input_trans: tensor([[[0.0000]],

        [[0.5000]],

        [[0.2500]],

        [[0.7500]]])torch.Size([4, 1, 1])


tensor([[0.6228]], grad_fn=<SelectBackward0>)

In [45]:
model_lstm = LightningLSTM() # First, make model from the class
model_lstm(torch.tensor([0., 0.5, 0.25, 0.75]))
print("Before...")
## print out the name and value for each parameter
print("Parameters...")
for name, param in model_lstm.named_parameters():
    print(name, param.data)

print("\nOutput Values...")
print(model_lstm(torch.tensor([0., 0.5, 0.25, 0.75])).detach())
print(model_lstm(torch.tensor([1., 0.5, 0.25, 0.75])).detach())

Global seed set to 42


Before...
Parameters...
lstm.weight_ih_l0 tensor([[ 0.7645],
        [ 0.8300],
        [-0.2343],
        [ 0.9186]])
lstm.weight_hh_l0 tensor([[-0.2191],
        [ 0.2018],
        [-0.4869],
        [ 0.5873]])
lstm.bias_ih_l0 tensor([ 0.8815, -0.7336,  0.8692,  0.1872])
lstm.bias_hh_l0 tensor([ 0.7388,  0.1354,  0.4822, -0.1412])

Output Values...
tensor([[0.6228]])
tensor([[0.6219]])


In [46]:
## create the training data for the neural network.
inputs = torch.tensor([[0., 0.5, 0.25, 1.], [1., 0.5, 0.25, 1.]])
labels = torch.tensor([0., 1.])

dataset = TensorDataset(inputs, labels) 
dataloader = DataLoader(dataset)


trainer = L.Trainer(max_epochs=4000, log_every_n_steps=1)
trainer.fit(model_lstm, train_dataloaders=dataloader)
print("\nAfter...")
## print out the name and value for each parameter
print("Parameters...")
for name, param in model_lstm.named_parameters():
    print(name, param.data)

print("\nOutput Values...")
print(model_lstm(torch.tensor([0., 0.5, 0.25, 0.75])).detach())
print(model_lstm(torch.tensor([1., 0.5, 0.25, 0.75])).detach())

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name | Type | Params
------------------------------
0 | lstm | LSTM | 16    
------------------------------
16        Trainable params
0         Non-trainable params
16        Total params
0.000     Total estimated model params size (MB)


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=4000` reached.



After...
Parameters...
lstm.weight_ih_l0 tensor([[ 0.7622],
        [ 0.6810],
        [-0.2552],
        [ 0.7721]])
lstm.weight_hh_l0 tensor([[-0.2550],
        [ 0.0546],
        [-0.5445],
        [ 0.4085]])
lstm.bias_ih_l0 tensor([ 0.7066, -0.9065,  0.7998,  0.0915])
lstm.bias_hh_l0 tensor([ 0.5639, -0.0374,  0.4129, -0.2369])

Output Values...
tensor([[0.4641]])
tensor([[0.4632]])


In [None]:
%reload_ext tensorboard
%tensorboard --logdir=lightning_logs/

In [52]:
test = nn.LSTM(input_size=1, hidden_size=2)
print("Parameters...")
for name, param in test.named_parameters():
    print(name, param.data)

Parameters...
weight_ih_l0 tensor([[-0.5290],
        [ 0.2060],
        [-0.1323],
        [-0.6332],
        [ 0.3229],
        [-0.1645],
        [-0.2942],
        [-0.2197]])
weight_hh_l0 tensor([[ 0.4808,  0.4155],
        [ 0.1926,  0.2959],
        [ 0.3632,  0.2925],
        [ 0.6021,  0.4258],
        [-0.3639, -0.1077],
        [ 0.3298,  0.5626],
        [-0.5876, -0.3507],
        [-0.3507,  0.6679]])
bias_ih_l0 tensor([ 0.6940, -0.3161, -0.2185,  0.0125,  0.0414, -0.2015, -0.5159, -0.3763])
bias_hh_l0 tensor([-0.0590,  0.0765,  0.0271, -0.6493,  0.6261, -0.4119, -0.1731,  0.6559])
