##Model.py

In [8]:
pip install positional-encodings

Collecting positional-encodings
  Downloading positional_encodings-5.0.0-py3-none-any.whl (7.3 kB)
Collecting tf-estimator-nightly==2.8.0.dev2021122109
  Downloading tf_estimator_nightly-2.8.0.dev2021122109-py2.py3-none-any.whl (462 kB)
[K     |████████████████████████████████| 462 kB 28.7 MB/s 
Installing collected packages: tf-estimator-nightly, positional-encodings
Successfully installed positional-encodings-5.0.0 tf-estimator-nightly-2.8.0.dev2021122109


In [66]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.utils.data as DataLoader
import torchvision.transforms as transforms
from positional_encodings import (
    PositionalEncoding1D,
    PositionalEncoding2D,
    PositionalEncoding3D,
    PositionalEncodingPermute2D,
)

class Encoder(nn.Module):
    """Convolutional encoder that turns an image batch into a sequence of feature vectors.

    The input is passed through five 3x3 convolutions (64 -> 128 -> 256 -> 256
    -> 512 channels) interleaved with three 2x2 / stride-2 max-pools, so the
    spatial size is divided by 8 (rounded down at each pool). A 2D positional
    encoding is added to the resulting feature map, which is then flattened
    to one 512-dimensional vector per spatial position.

    Args:
        in_channels: number of channels of the input images.
    """

    def __init__(self, in_channels=1):
        super().__init__()
        # Attribute names and registration order are kept stable so existing
        # checkpoints (state_dict keys) continue to load.
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=64, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1))
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1))
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1))
        self.conv3_ = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1))
        self.conv4 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=(3, 3), padding=(1, 1), stride=(1, 1))
        self.max_pool = nn.MaxPool2d(kernel_size=(2, 2), padding=(0, 0), stride=(2, 2))
        # Height-only pooling layer; defined but not used by forward().
        self.max_pool2 = nn.MaxPool2d(kernel_size=(2, 1), padding=(0, 0), stride=(2, 1))
        self.batch_norm2 = nn.BatchNorm2d(num_features=512)
        self.batch_norm1 = nn.BatchNorm2d(num_features=256)

    @staticmethod
    def _to_sequence(feature_map):
        """Flatten a (B, C, H, W) feature map to (B, H*W, 1, C).

        The channel axis is moved last *before* reshaping so that entry
        ``[b, h * W + w, 0, :]`` is exactly the channel vector at spatial
        position (h, w). Reshaping the channels-first tensor directly would
        keep the shape but mix channel and spatial values.
        """
        batch, channels, height, width = feature_map.shape
        channels_last = feature_map.permute(0, 2, 3, 1)
        return channels_last.reshape(batch, height * width, 1, channels)

    def forward(self, x):
        """Encode an image batch.

        Args:
            x: tensor of shape (batch, in_channels, height, width).

        Returns:
            Tensor of shape (batch, H' * W', 1, 512) where H' and W' are the
            spatial sizes after the three 2x2 poolings; positional encoding
            is already added to the features.
        """
        x = self.conv1(x)
        x = self.max_pool(x)
        x = self.conv2(x)
        x = self.max_pool(x)
        x = self.conv3(x)
        x = self.batch_norm1(x)
        x = self.conv3_(x)
        x = self.max_pool(x)
        x = self.conv4(x)
        x = self.batch_norm2(x)

        # The "Permute" variant is used because x is channels-first (B, C, H, W).
        # NOTE(review): the encoder module is rebuilt on every call, as in the
        # original code; confirm it follows x's device when running on GPU.
        p_enc_2d = PositionalEncodingPermute2D(x.shape[1])
        penc_x = p_enc_2d(x)
        assert penc_x.size() == x.size()

        # Return the position-aware features (previously the sum was computed
        # and then thrown away).
        return self._to_sequence(penc_x + x)


In [67]:
# Smoke test: run a random batch of 64 single-channel 28x28 images through
# the encoder and print the shape of the result.
encoder = Encoder(in_channels=1)
x = encoder(torch.randn(64, 1, 28, 28))
print(x.shape)


torch.Size([64, 9, 1, 512])


In [1]:
import torch.utils.data as utils
import torch.nn.functional as F
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.nn.parameter import Parameter
import math
import numpy as np
import pandas as pd
import time


class LSTM(nn.Module):
    """Hand-written single-layer LSTM, unrolled step by step over the time axis.

    ``forward`` consumes a tensor of shape (batch, time, input_size) and
    returns the hidden state of every step, in chronological order, as a
    tensor of shape (batch, time, hidden_size).
    """

    def __init__(self, input_size, cell_size, hidden_size):
        """
        cell_size is the size of cell_state.
        hidden_size is the size of hidden_state, or say the output_state of each step

        Note: cell_size is only stored; every gate projects to hidden_size,
        so the cell state actually has width hidden_size.
        """
        super().__init__()

        self.cell_size = cell_size
        self.hidden_size = hidden_size
        # Forget, input and output gates plus the candidate cell projection,
        # each fed with the concatenation [input, previous hidden state].
        self.fl = nn.Linear(input_size + hidden_size, hidden_size)
        self.il = nn.Linear(input_size + hidden_size, hidden_size)
        self.ol = nn.Linear(input_size + hidden_size, hidden_size)
        self.Cl = nn.Linear(input_size + hidden_size, hidden_size)

    def step(self, input, Hidden_State, Cell_State):
        """Advance the recurrence by one time step.

        Args:
            input: (batch, input_size) features of the current step.
            Hidden_State: (batch, hidden_size) previous hidden state.
            Cell_State: (batch, hidden_size) previous cell state.

        Returns:
            The updated ``(Hidden_State, Cell_State)`` pair.
        """
        combined = torch.cat((input, Hidden_State), 1)
        f = torch.sigmoid(self.fl(combined))
        i = torch.sigmoid(self.il(combined))
        o = torch.sigmoid(self.ol(combined))
        C = torch.tanh(self.Cl(combined))
        Cell_State = f * Cell_State + i * C
        Hidden_State = o * torch.tanh(Cell_State)

        return Hidden_State, Cell_State

    def forward(self, inputs):
        """Run the LSTM over a whole sequence.

        Args:
            inputs: tensor of shape (batch, time, input_size).

        Returns:
            Tensor of shape (batch, time, hidden_size); index ``t`` along the
            time axis holds the hidden state after consuming step ``t``.
            An empty sequence yields a (batch, 0, hidden_size) tensor.
        """
        batch_size = inputs.size(0)
        time_step = inputs.size(1)
        Hidden_State, Cell_State = self.initHidden(batch_size)

        step_outputs = []
        for i in range(time_step):
            # Integer indexing drops only the time axis. A bare torch.squeeze
            # would also drop a batch (or feature) axis of size 1.
            Hidden_State, Cell_State = self.step(inputs[:, i, :], Hidden_State, Cell_State)
            step_outputs.append(Hidden_State)

        if not step_outputs:
            return Hidden_State.new_zeros((batch_size, 0, self.hidden_size))
        # Stack once at the end, oldest step first.
        return torch.stack(step_outputs, 1)

    def initHidden(self, batch_size):
        """Return zero-initialised ``(Hidden_State, Cell_State)``.

        Both tensors have shape (batch_size, hidden_size) and are created on
        the same device and with the same dtype as the module's weights, so
        the module works wherever it has been moved (CPU or GPU).
        """
        reference = self.fl.weight
        Hidden_State = reference.new_zeros((batch_size, self.hidden_size))
        Cell_State = reference.new_zeros((batch_size, self.hidden_size))
        return Hidden_State, Cell_State
        
class BiLSTM(nn.Module):
    """Hand-written bidirectional LSTM whose two directions are averaged.

    One set of gates reads the sequence first-to-last, a second, independent
    set reads it last-to-first. ``forward`` returns, for every time index,
    the mean of the two directions' hidden states at that index.
    """

    def __init__(self, input_size, cell_size, hidden_size):
        """
        cell_size is the size of cell_state.
        hidden_size is the size of hidden_state, or say the output_state of each step

        Note: cell_size is only stored; every gate projects to hidden_size,
        so the cell states actually have width hidden_size.
        """
        super().__init__()

        self.cell_size = cell_size
        self.hidden_size = hidden_size
        # Forward-direction gates (suffix _f).
        self.fl_f = nn.Linear(input_size + hidden_size, hidden_size)
        self.il_f = nn.Linear(input_size + hidden_size, hidden_size)
        self.ol_f = nn.Linear(input_size + hidden_size, hidden_size)
        self.Cl_f = nn.Linear(input_size + hidden_size, hidden_size)
        # Backward-direction gates (suffix _b).
        self.fl_b = nn.Linear(input_size + hidden_size, hidden_size)
        self.il_b = nn.Linear(input_size + hidden_size, hidden_size)
        self.ol_b = nn.Linear(input_size + hidden_size, hidden_size)
        self.Cl_b = nn.Linear(input_size + hidden_size, hidden_size)

    @staticmethod
    def _cell(x, hidden, cell, forget_gate, input_gate, output_gate, candidate):
        """Apply one LSTM cell update with the given gate layers."""
        combined = torch.cat((x, hidden), 1)
        f = torch.sigmoid(forget_gate(combined))
        i = torch.sigmoid(input_gate(combined))
        o = torch.sigmoid(output_gate(combined))
        c_tilde = torch.tanh(candidate(combined))
        cell = f * cell + i * c_tilde
        hidden = o * torch.tanh(cell)
        return hidden, cell

    def step(self, input_f, input_b, Hidden_State_f, Cell_State_f, Hidden_State_b, Cell_State_b):
        """Advance both directions by one step.

        Args:
            input_f: (batch, input_size) input for the forward direction.
            input_b: (batch, input_size) input for the backward direction.
            Hidden_State_f, Cell_State_f: previous forward states.
            Hidden_State_b, Cell_State_b: previous backward states.

        Returns:
            ``(Hidden_State_f, Cell_State_f, Hidden_State_b, Cell_State_b)``
            after the update.
        """
        Hidden_State_f, Cell_State_f = self._cell(
            input_f, Hidden_State_f, Cell_State_f,
            self.fl_f, self.il_f, self.ol_f, self.Cl_f,
        )
        Hidden_State_b, Cell_State_b = self._cell(
            input_b, Hidden_State_b, Cell_State_b,
            self.fl_b, self.il_b, self.ol_b, self.Cl_b,
        )
        return Hidden_State_f, Cell_State_f, Hidden_State_b, Cell_State_b

    def forward(self, inputs):
        """Run both directions over the sequence and average them.

        Args:
            inputs: tensor of shape (batch, time, input_size).

        Returns:
            Tensor of shape (batch, time, hidden_size). An empty sequence
            yields a (batch, 0, hidden_size) tensor.
        """
        batch_size = inputs.size(0)
        steps = inputs.size(1)

        Hidden_State_f, Cell_State_f, Hidden_State_b, Cell_State_b = self.initHidden(batch_size)

        forward_states = []
        backward_states = []
        for i in range(steps):
            # Integer indexing drops only the time axis. A bare torch.squeeze
            # would also drop a batch (or feature) axis of size 1.
            Hidden_State_f, Cell_State_f, Hidden_State_b, Cell_State_b = self.step(
                inputs[:, i, :],
                inputs[:, steps - i - 1, :],
                Hidden_State_f, Cell_State_f, Hidden_State_b, Cell_State_b,
            )
            forward_states.append(Hidden_State_f)
            backward_states.append(Hidden_State_b)

        if not forward_states:
            return Hidden_State_f.new_zeros((batch_size, 0, self.hidden_size))

        # The backward pass produced its states last-to-first; flip them so
        # index t of both stacks refers to the same input position.
        backward_states.reverse()
        outputs_f = torch.stack(forward_states, 1)
        outputs_b = torch.stack(backward_states, 1)
        outputs = (outputs_f + outputs_b) / 2
        return outputs

    def initHidden(self, batch_size):
        """Return zero-initialised states for both directions.

        The four tensors ``(Hidden_State_f, Cell_State_f, Hidden_State_b,
        Cell_State_b)`` have shape (batch_size, hidden_size) and are created
        on the same device and with the same dtype as the module's weights.
        """
        reference = self.fl_f.weight
        shape = (batch_size, self.hidden_size)
        Hidden_State_f = reference.new_zeros(shape)
        Cell_State_f = reference.new_zeros(shape)
        Hidden_State_b = reference.new_zeros(shape)
        Cell_State_b = reference.new_zeros(shape)
        return Hidden_State_f, Cell_State_f, Hidden_State_b, Cell_State_b