In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import lightning as L
from torch.utils.data import TensorDataset, DataLoader



In [2]:
from typing import Any


class LSTMbyHand(L.LightningModule):
    def __init__(self):
        super().__init__()
        mean = torch.tensor(0.0)
        std = torch.tensor(1.0)

        self.wlr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wlr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.blr1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        self.wpr1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wpr2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bpr1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        self.wp1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wp2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bp1 = nn.Parameter(torch.tensor(0.), requires_grad=True)

        self.wo1 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.wo2 = nn.Parameter(torch.normal(mean=mean, std=std), requires_grad=True)
        self.bo1 = nn.Parameter(torch.tensor(0.), requires_grad=True)
        
    def lstm_unit(self,input_value, long_memory, short_memory):
        long_remember_percent = torch.sigmoid((short_memory*self.wlr1)+(input_value*self.wlr2)+self.blr1)

        potential_remember_percent = torch.sigmoid((self.wpr1*short_memory)+(self.wpr2 * input_value)+self.bpr1)
        potential_memory = torch.tanh((self.wp1*short_memory)+(self.wp2 * input_value)+self.bp1)

        updated_long_memory = (long_memory*long_remember_percent) + (potential_memory*potential_remember_percent)

        output_percent = torch.sigmoid((self.wo1*short_memory)+(self.wo2 * input_value)+self.bo1)

        updated_short_memory = torch.tanh(updated_long_memory) * output_percent

        return [updated_long_memory, updated_short_memory]
        
    def forward(self, input):
        long_memory = 0 
        short_memory = 0 
        day1 = input[0]
        day2 = input[1]
        day3 = input[2]
        day4 = input[3]

        long_memory, short_memory = self.lstm_unit(day1, long_memory=long_memory, short_memory=short_memory)
        long_memory, short_memory = self.lstm_unit(day2, long_memory=long_memory, short_memory=short_memory)
        long_memory, short_memory = self.lstm_unit(day3, long_memory=long_memory, short_memory=short_memory)
        long_memory, short_memory = self.lstm_unit(day4, long_memory=long_memory, short_memory=short_memory)
        
        return short_memory

    def configure_optimizers(self):
        return Adam(self.parameters())
    
    def training_step(self,batch, batch_idx):
        print("batch",batch)
        print("batch_idx",batch_idx)
        input_i, label_i = batch
        output_i = self.forward(input_i[0])

        loss = (output_i - label_i)**2

        self.log("train_loss", loss)

        if label_i == 0:
            self.log('Out_0', output_i)
        else:
            self.log('Out_1', output_i)

        return loss



In [5]:
model = LSTMbyHand()

input = torch.tensor([
    [0., 0.5, 0.25, 1.0],
    [1., 0.5, 0.25, 1.]
])
labels = torch.tensor([0., 1.])

dataset = TensorDataset(input, labels)

dataloader = DataLoader(dataset=dataset)

trainer = L.Trainer(max_epochs=2000)
trainer.fit(model, train_dataloaders=dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs



  | Name | Type | Params
------------------------------
------------------------------
12        Trainable params
0         Non-trainable params
12        Total params
0.000     Total estimated model params size (MB)


Epoch 0:   0%|          | 0/2 [00:00<?, ?it/s] [tensor([[0.0000, 0.5000, 0.2500, 1.0000]]), tensor([0.])]
0
Epoch 0:  50%|█████     | 1/2 [00:00<00:00, 83.35it/s, v_num=0][tensor([[1.0000, 0.5000, 0.2500, 1.0000]]), tensor([1.])]
1
Epoch 1:   0%|          | 0/2 [00:00<?, ?it/s, v_num=0]         [tensor([[0.0000, 0.5000, 0.2500, 1.0000]]), tensor([0.])]
0
Epoch 1:  50%|█████     | 1/2 [00:00<00:00, 111.13it/s, v_num=0][tensor([[1.0000, 0.5000, 0.2500, 1.0000]]), tensor([1.])]
1
Epoch 2:   0%|          | 0/2 [00:00<?, ?it/s, v_num=0]         [tensor([[0.0000, 0.5000, 0.2500, 1.0000]]), tensor([0.])]
0
Epoch 2:  50%|█████     | 1/2 [00:00<00:00, 117.42it/s, v_num=0][tensor([[1.0000, 0.5000, 0.2500, 1.0000]]), tensor([1.])]
1
Epoch 3:   0%|          | 0/2 [00:00<?, ?it/s, v_num=0]         [tensor([[0.0000, 0.5000, 0.2500, 1.0000]]), tensor([0.])]
0
Epoch 3:  50%|█████     | 1/2 [00:00<00:00, 111.14it/s, v_num=0][tensor([[1.0000, 0.5000, 0.2500, 1.0000]]), tensor([1.])]
1
Epoch 4:   0%|    

`Trainer.fit` stopped: `max_epochs=2000` reached.


Epoch 1999: 100%|██████████| 2/2 [00:00<00:00, 40.82it/s, v_num=0]


In [4]:
print("\n Now Lets compare the observed and predicted values")
print("Company A: Observed = 0, Predicted = 0 ", model(torch.tensor([0.,0.5,0.25, 1]).detach()))

print("\n Now Lets compare the observed and predicted values")
print("Company A: Observed = 1, Predicted = 0 ", model(torch.tensor([1.,0.5,0.25, 1]).detach()))


 Now Lets compare the observed and predicted values
Company A: Observed = 0, Predicted = 0  tensor(0.0012, grad_fn=<MulBackward0>)

 Now Lets compare the observed and predicted values
Company A: Observed = 1, Predicted = 0  tensor(0.9402, grad_fn=<MulBackward0>)
