### MNIST classification using a custom LSTM layer
1.Iswariya Manivannan<br>
2.Sathiya Ramesh

In [1]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dsets
from torch.autograd import Variable
import torch.nn.functional as F
import torchvision.models as models
import torch.utils.data as utils_data
from torch.utils.data.sampler import SubsetRandomSampler

%matplotlib inline
import matplotlib.pyplot as plt 
import numpy as np
import copy


# Fixed seeds so that the PyTorch and NumPy random streams are repeatable between runs.
TORCH_SEED, NUMPY_SEED = 40, 2

torch.manual_seed(TORCH_SEED)
torch.cuda.manual_seed(TORCH_SEED)
np.random.seed(NUMPY_SEED)

In [2]:
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
gpu_available = torch.cuda.is_available()
cuda0 = torch.device('cuda') if gpu_available else torch.device('cpu')
print(cuda0)
print(gpu_available)

cuda:1
True


In [3]:
# MNIST images have a single channel, so Normalize must receive exactly one mean and one
# std value. Passing three values (an RGB leftover) raises a broadcast-shape error in
# recent torchvision releases. ToTensor yields values in [0, 1]; (x - 0.5) / 0.5 maps
# them to [-1, 1].
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])
trainset = dsets.MNIST(root='./data', train=True, download=True, transform=transform)
testset = dsets.MNIST(root='./data', train=False, download=True, transform=transform)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Processing...
Done!


In [4]:
# EPOCHS: number of passes of the training loop; BATCH_SIZE: samples per DataLoader batch.
EPOCHS, BATCH_SIZE = 15, 100

In [5]:
# Split the official training set 80% / 20% into training and validation subsets.
all_indices = np.arange(len(trainset))
train_set_size = int(0.8 * len(trainset))

# Draw the training indices without replacement; the validation indices are whatever is left.
train_indices = np.random.choice(all_indices, train_set_size, replace=False)
val_indices = np.setdiff1d(all_indices, train_indices, assume_unique=True)

train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)

# Settings shared by all three loaders.
loader_options = dict(batch_size=BATCH_SIZE, num_workers=2)
trainloader = utils_data.DataLoader(trainset, sampler=train_sampler, **loader_options)
valloader = utils_data.DataLoader(trainset, sampler=val_sampler, **loader_options)
testloader = utils_data.DataLoader(testset, shuffle=True, **loader_options)

### Custom LSTM module

In [6]:
class lstm(nn.Module):
    """Multi-layer LSTM assembled from ``nn.Linear`` layers.

    Every layer owns two linear maps (input -> gates and hidden -> gates). Their outputs
    are summed and split into four equal chunks used, in order, as the input, forget,
    cell and output gates.

    Args:
        input_size: Number of features in each time step of the input.
        hidden_size: Number of features in the hidden and cell state of every layer.
        num_layers: Number of stacked layers.
        batch_first: If True, ``forward`` takes and returns tensors laid out as
            (batch, seq, feature) instead of (seq, batch, feature).
        dropout: Dropout probability applied to the output of a layer before it is fed
            to the next layer. The top layer's output is never dropped.
    """

    def __init__(self, input_size, hidden_size, num_layers, batch_first=False, dropout=0):
        super(lstm, self).__init__()
        self.batch_first = batch_first
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # Deliberately NOT in-place: the tensor being dropped is also the lower layer's
        # new hidden state, so in-place dropout would corrupt the recurrent state that is
        # carried to the next time step and returned to the caller.
        self.dropout_layer = nn.Dropout(dropout)

        # Layer 0 consumes the raw input; deeper layers consume the hidden state below.
        self.input2hidden_layer = nn.ModuleList()
        for i in range(num_layers):
            layer_input_size = input_size if i == 0 else hidden_size
            self.input2hidden_layer.append(nn.Linear(layer_input_size, hidden_size * 4))
        self.hidden2hidden_layer = nn.ModuleList(
            [nn.Linear(hidden_size, hidden_size * 4) for _ in range(num_layers)]
        )

    def forward_step(self, input, hidden):
        """Advance all layers by a single time step.

        Args:
            input: Tensor of shape (batch, input_size) for the current time step.
            hidden: Tuple ``(h, c)``; each is indexed by layer and holds that layer's
                (batch, hidden_size) state.

        Returns:
            ``(output, (h_next, c_next))`` where ``output`` is the top layer's new hidden
            state and ``h_next`` / ``c_next`` have shape
            (num_layers, batch, hidden_size).
        """
        prev_h, prev_c = hidden
        next_h, next_c = [], []

        layer_input = input
        for layer in range(self.num_layers):
            if layer > 0:
                layer_input = self.dropout_layer(next_h[layer - 1])
            gates = (self.input2hidden_layer[layer](layer_input)
                     + self.hidden2hidden_layer[layer](prev_h[layer]))
            ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)

            cy = (torch.sigmoid(forgetgate) * prev_c[layer]
                  + torch.sigmoid(ingate) * torch.tanh(cellgate))
            hy = torch.sigmoid(outgate) * torch.tanh(cy)
            next_h.append(hy)
            next_c.append(cy)

        return next_h[-1], (torch.stack(next_h, 0), torch.stack(next_c, 0))

    def forward(self, input, hidden=None):
        """Run the LSTM over a whole sequence.

        Args:
            input: Tensor of shape (seq, batch, input_size), or
                (batch, seq, input_size) when ``batch_first`` is True.
            hidden: Optional tuple ``(h0, c0)``, each of shape
                (num_layers, batch, hidden_size). When omitted, both start at zero with
                the dtype and device of ``input``.

        Returns:
            ``(output, (h_n, c_n))``: the top layer's hidden state at every time step
            (same layout as ``input``) and the final states of all layers.

        Raises:
            ValueError: If the sequence contains no time steps.
        """
        if self.batch_first:
            input = input.transpose(0, 1)
        if input.size(0) == 0:
            raise ValueError("lstm.forward requires a sequence with at least one time step")
        if hidden is None:
            zero_state = input.new_zeros(self.num_layers, input.size(1), self.hidden_size)
            hidden = (zero_state, zero_state.clone())

        step_outputs = []
        for step_input in input.unbind(0):
            step_output, hidden = self.forward_step(step_input, hidden)
            step_outputs.append(step_output)

        output = torch.stack(step_outputs, 0)
        if self.batch_first:
            output = output.transpose(0, 1)
        return output, hidden
         

### LSTM Network

In [7]:
class LSTMModel(nn.Module):
    """Sequence classifier: a stacked custom LSTM followed by a linear read-out.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the hidden state of each LSTM layer.
        layer_dim: Number of stacked LSTM layers.
        output_dim: Number of values produced by the final linear layer.
    """

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.lstm = lstm(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Map a batch of sequences to output scores.

        Args:
            x: Tensor of shape (batch, seq_len, input_dim).

        Returns:
            Tensor of shape (batch, output_dim) computed from the last time step.
        """
        # Create the zero initial states on the same device and with the same dtype as
        # the input, so the module works wherever it (and its input) has been moved
        # instead of depending on a notebook-level device variable.
        state_shape = (self.layer_dim, x.size(0), self.hidden_dim)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)

        out, _ = self.lstm(x, (h0, c0))
        # Only the hidden state of the final time step feeds the classifier.
        return self.fc(out[:, -1, :])

In [8]:
def train(model, optimizer, **kwargs):
    """Run one pass over a dataloader, either optimising the model or only scoring it.

    Args:
        model: Network mapping a (batch, seq_len, feature_dim) tensor to class scores.
        optimizer: Optimizer stepped after every batch when ``phase == 'Training'``.

    Keyword Args:
        phase (str): ``'Training'`` puts the model in train mode and updates weights;
            ``'Validation'`` puts it in eval mode. Gradients are only tracked while
            training.
        dataloader: Iterable of ``(images, labels)`` batches that supports ``len()``
            and exposes ``.sampler`` (its length is used as the number of samples).
        seq_len (int, optional): Time steps per sample; defaults to the notebook-level
            ``seq_dim``.
        feature_dim (int, optional): Features per time step; defaults to the
            notebook-level ``input_dim``.

    Returns:
        tuple: ``(mean loss per batch, accuracy in percent, list of per-batch losses)``.
        Mean loss and accuracy are NaN when the loader yields no data.
    """
    phase = kwargs['phase']
    loader = kwargs['dataloader']
    seq_len = kwargs['seq_len'] if 'seq_len' in kwargs else seq_dim
    feature_dim = kwargs['feature_dim'] if 'feature_dim' in kwargs else input_dim
    is_training = phase == 'Training'

    if is_training:
        model.train()
    elif phase == 'Validation':
        model.eval()

    # Run on whatever device the model lives on rather than on a global device handle.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    criterion = nn.CrossEntropyLoss().to(device)

    running_loss = 0.0
    num_correct = 0
    batch_wise_loss = []

    # Skip autograd bookkeeping during evaluation; it only costs memory and time there.
    with torch.set_grad_enabled(is_training):
        for images, labels in loader:
            batch_images = images.view(-1, seq_len, feature_dim).to(device)
            batch_labels = labels.to(device)

            y_pred = model(batch_images)
            loss = criterion(y_pred, batch_labels)
            batch_loss = loss.item()
            running_loss += batch_loss
            batch_wise_loss.append(batch_loss)

            if is_training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # Count the correct predictions of this batch.
            pred_class = y_pred.detach().argmax(dim=1)
            num_correct += (pred_class == batch_labels).sum().item()

    num_batches = len(loader)
    num_samples = len(loader.sampler)
    mean_loss = running_loss / num_batches if num_batches else float('nan')
    acc = 100. * num_correct / num_samples if num_samples else float('nan')
    return mean_loss, acc, batch_wise_loss

In [9]:
# Hyper-parameters and tensor dimensions.
learning_rate = 0.1
input_dim = 28    # features per time step
seq_dim = 28      # time steps per sample
hidden_dim = 100
layer_dim = 3
output_dim = 10

# Per-epoch history and the best validation accuracy seen so far.
train_loss_values, train_acc_values = [], []
val_loss_values, val_acc_values = [], []
best_acc = 0

model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim).to(cuda0)
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

for epoch in range(EPOCHS):

    # One optimisation pass over the training split.
    train_loss, train_acc, _ = train(model, optimizer, dataloader=trainloader, phase='Training')
    train_loss_values.append(train_loss)
    train_acc_values.append(train_acc)

    # One evaluation pass over the validation split.
    val_loss, val_acc, _ = train(model, optimizer, dataloader=valloader, phase='Validation')
    val_loss_values.append(val_loss)
    val_acc_values.append(val_acc)

    # Snapshot the weights whenever the validation accuracy improves.
    if val_acc > best_acc:
        best_acc = val_acc
        classifier_model_checkpoint = copy.deepcopy(model.state_dict())

    # Report every third epoch.
    if epoch % 3 == 0:
        print(f'Epoch: {epoch}  Train Loss: {train_loss:.5f}  Train Acc:{train_acc:.5f}%'
              f'    Val Loss: {val_loss:.5f}  Val Acc:{val_acc:.5f}% ')

Epoch: 0  Train Loss: 2.22713  Train Acc:15.87917%    Val Loss: 1.93298  Val Acc:25.74167% 
Epoch: 3  Train Loss: 0.20006  Train Acc:93.90625%    Val Loss: 0.20843  Val Acc:93.30000% 
Epoch: 6  Train Loss: 0.08703  Train Acc:97.37917%    Val Loss: 0.08682  Val Acc:97.45833% 
Epoch: 9  Train Loss: 0.05681  Train Acc:98.26667%    Val Loss: 0.07347  Val Acc:97.83333% 
Epoch: 12  Train Loss: 0.03804  Train Acc:98.80833%    Val Loss: 0.06646  Val Acc:98.16667% 


In [10]:
def predict(model, testloader, seq_len=None, feature_dim=None):
    """Evaluate ``model`` on ``testloader`` and collect its predictions.

    Args:
        model: Network mapping a (batch, seq_len, feature_dim) tensor to class scores.
        testloader: Iterable of ``(images, labels)`` batches exposing ``.dataset``;
            ``len(testloader.dataset)`` is used as the number of samples.
        seq_len: Time steps per sample; defaults to the notebook-level ``seq_dim``.
        feature_dim: Features per time step; defaults to the notebook-level
            ``input_dim``.

    Returns:
        tuple: ``(accuracy in percent, predicted labels, true labels)``. Both label
        arrays are 1-D float64 NumPy arrays in iteration order. Accuracy is NaN when the
        dataset is empty.
    """
    if seq_len is None:
        seq_len = seq_dim
    if feature_dim is None:
        feature_dim = input_dim

    model.eval()
    # Run on whatever device the model lives on rather than on a global device handle.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    num_correct = 0
    predicted_batches = []
    label_batches = []

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for images, labels in testloader:
            batch_images = images.view(-1, seq_len, feature_dim).to(device)
            pred_label = model(batch_images).argmax(dim=1).cpu()
            true_label = labels.cpu()

            # Count the correct predictions on the test set.
            num_correct += (pred_label == true_label).sum().item()

            # Keep predictions and true labels, e.g. for a confusion matrix.
            predicted_batches.append(pred_label.numpy())
            label_batches.append(true_label.numpy())

    # Concatenate once instead of growing the arrays batch by batch. float64 keeps the
    # dtype that appending to an initially empty np.array([]) used to produce.
    if predicted_batches:
        prediction_array = np.concatenate(predicted_batches).astype(np.float64)
        labels_array = np.concatenate(label_batches).astype(np.float64)
    else:
        prediction_array = np.array([])
        labels_array = np.array([])

    num_samples = len(testloader.dataset)
    Test_acc = 100. * num_correct / num_samples if num_samples else float('nan')
    return Test_acc, prediction_array, labels_array

In [11]:
# Restore the weights snapshotted at the best validation accuracy, then write them to disk.
model.load_state_dict(classifier_model_checkpoint)
best_weights = model.state_dict()
torch.save(best_weights, "LSTM_model.th")

In [12]:
# Score the model on the test loader; keep the label arrays for later inspection.
test_results = predict(model, testloader)
Test_acc, Predicted_labels, True_labels = test_results

print(f'Test Accuracy:{Test_acc:.5f}% ')

Test Accuracy:98.34000% 
