In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as fn
from torch.optim import Adam
from torch.utils.data import TensorDataset, DataLoader

import pytorch_lightning as pl

import numpy as np



In [5]:
class BasicLSTM(pl.LightningModule):

    def __init__(self, num_feat, num_hiddens):
        '''
        num_feat - number of features input into the model
        '''
        super().__init__()
        
        mean_w1 = torch.zeros(num_feat,num_hiddens)
        mean_w2 = torch.zeros(num_hiddens,num_hiddens)
        std = torch.ones(1)

        #the forget gate weights and bias 
        #(for f_t = sig(wf1 x_t + wf2 h_{t-1} + b_f))
        self.wf1 = nn.Parameter(torch.normal(mean=mean_w1,std=std),
                                requires_grad=True,
                                )
        self.wf2 = nn.Parameter(torch.normal(mean=mean_w2,std=std),
                                requires_grad=True,
                                )
        self.bf = nn.Parameter(torch.tensor(0.0),
                                requires_grad=True,
                                )
        #the input gate weights and bias
        #(for i_t = sig(wi1 x_t + wi2 h_{t-1} + b_i))
        self.wi1 = nn.Parameter(torch.normal(mean=mean_w1,std=std, device=self.device),
                                requires_grad=True,
                                )
        self.wi2 = nn.Parameter(torch.normal(mean=mean_w2,std=std),
                                requires_grad=True,
                                )
        self.bi = nn.Parameter(torch.tensor(0.0),
                                requires_grad=True,
                                )
        #the output gate weights and bias
        #(for o_t = sig(wo1 x_t + wo2 h_{t-1} + b_o))
        self.wo1 = nn.Parameter(torch.normal(mean=mean_w1,std=std),
                        requires_grad=True,
                        )
        self.wo2 = nn.Parameter(torch.normal(mean=mean_w2,std=std),
                                requires_grad=True,
                                )
        self.bo = nn.Parameter(torch.tensor(0.0),
                                requires_grad=True,
                                )
        #the candidate context weights and bias
        #(for c^'_t = sig(wcc1 x_t + wcc2 h_{t-1} + bc_c))
        self.wcc1 = nn.Parameter(torch.normal(mean=mean_w1,std=std),
                                requires_grad=True,
                                )
        self.wcc2 = nn.Parameter(torch.normal(mean=mean_w2,std=std),
                                requires_grad=True,
                                )
        self.bcc = nn.Parameter(torch.tensor(0.0),
                                requires_grad=True,
                                )

        # print('wf1 \n', self.wf1, '\n wf1 \n', self.wf2, '\n bf \n', self.bf)
        # print('wi1 \n', self.wi1, '\n wi2 \n', self.wi2, '\n bi \n', self.bi)
        # print('wcc1 \n', self.wf1, '\n wcc2 \n', self.wf2, '\n bf \n', self.bcc)

    def initWeights(mean_w1, mean_w2, std):
        w1 = nn.Parameter(torch.normal(mean=mean_w1,std=std),
                                requires_grad=True,
                                )
        w2 = nn.Parameter(torch.normal(mean=mean_w2,std=std),
                                requires_grad=True,
                                )
        bias = nn.Parameter(torch.tensor(0.0),
                                requires_grad=True,
                                )


    def unit(self, val_in, long_mem, short_mem):
        '''
        INPUTS:
            val_in - input into this step of the unit x_t

            long_mem - the long term memory at this step

            short_mem - the short term memory at this step
        OUTPUTS:

        
        '''
        i_t = torch.sigmoid((val_in@self.wi1)+(short_mem@self.wi2)+(self.bi))

        f_t = torch.sigmoid((val_in@self.wf1)+(short_mem@self.wf2)+(self.bf))
        
        o_t = torch.sigmoid((val_in@self.wo1)+(short_mem@self.wo2)+(self.bo))

        cc_t = torch.tanh((val_in@self.wcc1)+(short_mem@self.wcc2)+(self.bcc))

        # print('f_t ', f_t)
        # print('i_t ', i_t)
        # print('cc_t ', cc_t)
        # print('o_t ', o_t)

        #update the long term memory (c_t)
        update_long_mem = (f_t*long_mem) + (i_t*cc_t)


        #update the short term memory (h_t)
        update_short_mem = o_t*torch.tanh(update_long_mem)

        return ([update_long_mem, update_short_mem])


    def forward(self, input):
        '''
        in order case input should be an array with multiple inputs for the model.
        The columns are the features and the rows are the days
        '''
        n_seq = np.shape(input)[0]

        # long_mem = torch.zeros(n_seq, requires_grad=False)
        # short_mem = torch.zeros(n_seq, requires_grad=False)
        long_mem = torch.zeros(1)
        short_mem = torch.zeros(1)
        
        for ii in range(0,n_seq-1):

            # long_mem[ii+1], short_mem[ii+1] = self.unit(input[ii], 
            #                                         long_mem[ii], 
            #                                         short_mem[ii],
            #                                         )
            # print(input[:,ii])
            long_mem, short_mem = self.unit(input[:,ii], 
                                                    long_mem, 
                                                    short_mem,
                                                    )
        return short_mem


    def configure_optimizers(self):
        return Adam(self.parameters())

    def training_step(self, batch, batch_indx):

        input_i, label_i = batch
        # print('input')
        # print(input_i[0])
        # print('label ', label_i)
        output_i = self.forward(input_i[0])

        loss = (output_i - label_i)**2

        self.log("training loss", loss)

        if (label_i == 0):
            self.log("out_0", output_i)

        else:
            self.log("out_1", output_i)

        return loss

In [6]:
inputs = torch.tensor([[[0., 0.5, 0.25, 1.], [1., 0.5, 0.25, 1.]],[[1., 0.75, 0.25, 1.], [1., 0.5, 1.25, 1.]]])
labels = torch.tensor([0., 1.])

dataset = TensorDataset(inputs, labels) 
dataloader = DataLoader(dataset)

In [7]:
model1 = BasicLSTM(num_feat=2, num_hiddens=1)

model1.forward(inputs[0])

tensor([-0.1316], grad_fn=<MulBackward0>)

In [8]:
inputs[0]

tensor([[0.0000, 0.5000, 0.2500, 1.0000],
        [1.0000, 0.5000, 0.2500, 1.0000]])

In [9]:
model = BasicLSTM(num_feat=2, num_hiddens=1)


trainer = pl.Trainer(max_epochs=5000,gpus=1) # with default learning rate, 0.001 (this tiny learning rate makes learning slow)
trainer.fit(model, train_dataloaders=dataloader)

  rank_zero_deprecation(
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name | Type | Params
------------------------------
------------------------------
16        Trainable params
0         Non-trainable params
16        Total params
0.000     Total estimated model params size (MB)
  rank_zero_warn(
  rank_zero_warn(


Epoch 0:   0%|          | 0/2 [00:00<?, ?it/s] 

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument mat2 in method wrapper_mm)

In [None]:
print("\nNow let's compare the observed and predicted values...")
print("Company A: Observed = 0, Predicted =", model(torch.tensor([[0., 0.5, 0.25, 1.], [1., 0.5, 0.25, 1.]])).detach())
print("Company B: Observed = 1, Predicted =", model(torch.tensor([[1., 0.75, 0.25, 1.], [1., 0.5, 1.25, 1.]])).detach())