In [None]:
import torch
import torch.nn as nn
import numpy as np
from torch.nn.parameter import Parameter
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as dsets
#from torch.utils.tensorboard import SummaryWriter
import matplotlib.pyplot as plt
import numpy as np
import time

%matplotlib inline

In [None]:
# Use the first CUDA GPU when PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Seed PyTorch's global RNG so weight initialisation and shuffling are repeatable.
torch.manual_seed(2805)


<torch._C.Generator at 0x7fc10a3290c0>

In [None]:
## Batching / schedule configuration
batch_size = 100
n_iters = 6000

## MNIST train and test splits (only the train split triggers a download)
train_dataset = dsets.MNIST(
    root='./data',
    train=True,
    transform=transforms.ToTensor(),
    download=True,
)
test_dataset = dsets.MNIST(
    root='./data',
    train=False,
    transform=transforms.ToTensor(),
)

# Convert the iteration budget into whole epochs:
# iterations / (batches per epoch), truncated to an integer.
num_epochs = int(n_iters / (len(train_dataset) / batch_size))

## Make Iterable Train and Test Set
# Training batches are reshuffled every epoch; test batches keep dataset order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset, batch_size=batch_size, shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset, batch_size=batch_size, shuffle=False
)

In [None]:
class OurLSTM(nn.Module):
    """Single-layer LSTM with explicit per-gate parameters and a linear read-out.

    The recurrence is unrolled step by step in ``forward``; the hidden state of
    the last time step is passed through ``self.fc`` to produce the output.

    Args:
        input_dim: number of features of each time step.
        hidden_dim: size of the hidden state and cell state.
        output_dim: number of output features produced by the final linear layer.
    """

    def __init__(self,input_dim, hidden_dim,output_dim):
        super(OurLSTM,self).__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim

        # ====   Weights and Bias   ===========
        # NOTE: the registration order below is the order in which
        # custom_weight_initializer draws random values, so keep it stable.
        # Suffixes: i = input gate, f = forget gate, c = candidate cell, o = output gate.
        self.w_ii = Parameter(torch.empty(self.input_dim, self.hidden_dim))
        self.w_hi = Parameter(torch.empty(self.hidden_dim, self.hidden_dim))
        self.w_if = Parameter(torch.empty(self.input_dim, self.hidden_dim))
        self.w_hf = Parameter(torch.empty(self.hidden_dim, self.hidden_dim))
        self.w_ic = Parameter(torch.empty(self.input_dim, self.hidden_dim))
        self.w_hc = Parameter(torch.empty(self.hidden_dim, self.hidden_dim))
        self.w_io = Parameter(torch.empty(self.input_dim, self.hidden_dim))
        self.w_ho = Parameter(torch.empty(self.hidden_dim, self.hidden_dim))

        self.b_i = Parameter(torch.empty(self.hidden_dim))
        self.b_f = Parameter(torch.empty(self.hidden_dim))
        self.b_c = Parameter(torch.empty(self.hidden_dim))
        self.b_o = Parameter(torch.empty(self.hidden_dim))

        # ===  Fully Connected Layer   ======
        # The read-out consumes the hidden state, so its input width must be
        # hidden_dim (previously hard-coded to 100, which broke other sizes).
        self.fc = nn.Linear(self.hidden_dim, self.output_dim)

        self.custom_weight_initializer()

    def forward(self,x,init_states=None):
        """Run the LSTM over a batch of sequences and classify the last hidden state.

        Args:
            x: tensor of shape (batch, seq, input_dim).
            init_states: optional ``(h_0, c_0)`` tuple, each of shape
                (batch, hidden_dim). When omitted both start at zero.

        Returns:
            Tensor of shape (batch, output_dim): ``fc`` applied to the hidden
            state after the final time step.
        """
        batch_size, seq_len, _ = x.size()
        if init_states is None:
            # new_zeros inherits dtype and device from x, so the model also
            # works when the input lives on a GPU.
            h_t = x.new_zeros(batch_size, self.hidden_dim)
            c_t = x.new_zeros(batch_size, self.hidden_dim)
        else:
            h_t, c_t = init_states

        for step in range(seq_len):
            x_t = x[:, step, :]
            gated_input = self.input_gate(h_t, x_t)
            kept_memory = self.forget_gate(h_t, x_t, c_t)
            c_t = self.update_gate(gated_input, kept_memory)
            _, h_t = self.output_gate(h_t, x_t, c_t)

        # Only the last time step's hidden state is used, so there is no need
        # to collect the intermediate hidden states.
        return self.fc(h_t)

    def create_rand_array(self,a, b, array_size):
        """Return a tensor of shape ``array_size`` sampled uniformly from [a, b)."""
        return torch.rand(array_size) * (b - a) + a

    def custom_weight_initializer(self):
        """Initialise every parameter of the module, including ``fc``.

        Parameters with two or more dimensions are drawn uniformly from
        [-0.2, 0.2); one-dimensional parameters (biases) are set to zero.
        """
        with torch.no_grad():
            for param in self.parameters():
                if param.dim() >= 2:
                    param.copy_(self.create_rand_array(-0.2, 0.2, param.size()))
                else:
                    param.zero_()

    def init_params(self):
        """Alternative initialiser: Xavier-uniform weights and zero biases."""
        for param in self.parameters():
            if param.dim() >= 2:
                nn.init.xavier_uniform_(param)
            else:
                nn.init.zeros_(param)

    def input_gate(self,h_prev, x):
        """Return the input-gated candidate: sigmoid(input gate) * tanh(candidate)."""
        i_t = torch.sigmoid(x @ self.w_ii + h_prev @ self.w_hi + self.b_i)
        candidate = torch.tanh(x @ self.w_ic + h_prev @ self.w_hc + self.b_c)
        return torch.mul(i_t, candidate)

    def forget_gate(self,h_prev, x, c_prev):
        """Return the part of the previous cell state kept by the forget gate."""
        f_t = torch.sigmoid(x @ self.w_if + h_prev @ self.w_hf + self.b_f)
        return torch.mul(f_t, c_prev)

    def update_gate(self,in_out, f_out):
        """Combine the gated candidate and the retained memory into the new cell state."""
        return in_out + f_out

    def output_gate(self,h_prev, x, c_t):
        """Return ``(o_t, h_t)``: the output-gate activation and the new hidden state."""
        o_out = torch.sigmoid(x @ self.w_io + h_prev @ self.w_ho + self.b_o)
        h_t = torch.mul(o_out, torch.tanh(c_t))

        return o_out, h_t


In [None]:
## =======   T R A  I N I N G  &  T E S T I N G    ==============
def _evaluate_accuracy(model, data_loader, seq_dim, input_dim):
    """Return the top-1 accuracy (in percent) of ``model`` over ``data_loader``.

    The model is put in eval mode for the duration of the pass and its previous
    training/eval mode is restored afterwards.
    """
    correct = 0
    total = 0
    was_training = model.training
    model.eval()  # inference behaviour for mode-dependent layers
    try:
        with torch.no_grad():  # no gradients are needed while scoring
            for images, labels in data_loader:
                # Reshape each batch to (batch, seq_dim, input_dim)
                images = images.view(-1, seq_dim, input_dim)
                outputs = model(images)
                # Predicted class = index of the largest logit
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        model.train(was_training)
    return 100 * correct / total


def train_and_test(model,optimizer,criterion,train_loader,test_loader,
                   num_epochs=None, seq_dim=28, input_dim=None, eval_every=500):
    """Train ``model`` and periodically measure its accuracy on ``test_loader``.

    Args:
        model: module mapping a (batch, seq_dim, input_dim) tensor to logits.
        optimizer: optimizer over ``model``'s parameters.
        criterion: loss called as ``criterion(outputs, labels)``.
        train_loader: iterable of ``(images, labels)`` training batches.
        test_loader: iterable of ``(images, labels)`` evaluation batches.
        num_epochs: number of passes over ``train_loader``. When ``None`` the
            module-level ``num_epochs`` is used (original behaviour).
        seq_dim: number of time steps each sample is reshaped to.
        input_dim: features per time step. When ``None`` the module-level
            ``input_dim`` is used (original behaviour).
        eval_every: evaluate and print after every this many training steps.

    Returns:
        ``(loss_list, acc_list)``: the training loss of every step and the test
        accuracy (percent) recorded at every evaluation.
    """
    # Backward compatibility: earlier versions read these two values straight
    # from the notebook's global namespace.
    if num_epochs is None:
        num_epochs = globals()["num_epochs"]
    if input_dim is None:
        input_dim = globals()["input_dim"]

    model.train()
    step = 0  # global training-step counter (avoids shadowing the builtin `iter`)
    loss_list = []
    acc_list = []

    for epoch in range(num_epochs):
        for images, labels in train_loader:
            # e.g. [100, 1, 28, 28] -> [100, 28, 28]
            images = images.view(-1, seq_dim, input_dim)

            # Clear gradients w.r.t. parameters
            optimizer.zero_grad()

            # Forward pass to get output/logits
            outputs = model(images)

            # Calculate the loss and record it for every step
            loss = criterion(outputs, labels)
            loss_list.append(loss.item())

            # Getting gradients w.r.t. parameters
            loss.backward()

            # Updating parameters
            optimizer.step()

            step += 1

            if step % eval_every == 0:
                # The helper restores training mode afterwards, so later
                # training steps are not silently run in eval mode.
                accuracy = _evaluate_accuracy(model, test_loader, seq_dim, input_dim)
                acc_list.append(accuracy)

                # Print Loss
                print('Iteration: {}. Loss: {}. Accuracy: {}'.format(step, loss.item(), accuracy))

    return loss_list, acc_list


In [None]:
## Instantiate the LSTM model
# Hyper-parameters for the network (layer_dim is defined but not passed to OurLSTM).
input_dim, hidden_dim, layer_dim, output_dim = 28, 100, 1, 10
model = OurLSTM(input_dim, hidden_dim, output_dim)

# Report how many parameter tensors the model holds, then each one's name and shape.
print("Number of parameters in the LSTM : ", sum(1 for _ in model.parameters()))
for name, params in model.named_parameters():
    print(name, params.size())

Number of parameters in the LSTM :  14
w_ii torch.Size([28, 100])
w_hi torch.Size([100, 100])
w_if torch.Size([28, 100])
w_hf torch.Size([100, 100])
w_ic torch.Size([28, 100])
w_hc torch.Size([100, 100])
w_io torch.Size([28, 100])
w_ho torch.Size([100, 100])
b_i torch.Size([100])
b_f torch.Size([100])
b_c torch.Size([100])
b_o torch.Size([100])
fc.weight torch.Size([10, 100])
fc.bias torch.Size([10])


In [None]:
# ========      Training & Testing ===============
# Step size used by plain SGD
learning_rate = 0.1

# Loss function: cross-entropy on the logits returned by the model
criterion = nn.CrossEntropyLoss()

# Optimizer over every parameter of the model
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

# Run training; returns the per-step losses and the periodic test accuracies
loss_list, acc_list = train_and_test(
    model, optimizer, criterion, train_loader, test_loader
)


Iteration: 500. Loss: 0.6464361548423767. Accuracy: 84.77
Iteration: 1000. Loss: 0.1687658280134201. Accuracy: 93.61
Iteration: 1500. Loss: 0.3036350905895233. Accuracy: 94.21
Iteration: 2000. Loss: 0.08583038300275803. Accuracy: 95.58
Iteration: 2500. Loss: 0.06993672251701355. Accuracy: 96.89
Iteration: 3000. Loss: 0.1766757220029831. Accuracy: 96.24
Iteration: 3500. Loss: 0.062183357775211334. Accuracy: 97.0
Iteration: 4000. Loss: 0.1050967201590538. Accuracy: 97.01
Iteration: 4500. Loss: 0.11175236105918884. Accuracy: 97.5
Iteration: 5000. Loss: 0.06583315134048462. Accuracy: 97.53
Iteration: 5500. Loss: 0.10004343837499619. Accuracy: 97.87
