In [1]:
# importing all the necessary libraries

import numpy as np
import matplotlib.pyplot as plt
import os
import torch
import torch.nn as nn
from sklearn.metrics import classification_report #for detailed statistics on classifier
from sklearn.metrics import confusion_matrix
from torchvision.datasets import MNIST  #importing MNIST dataset
from tqdm import tqdm
from torchvision import transforms #for transforming the training and testing data 
from torch.utils.data import DataLoader #Dataloader loads the data batchwise with shuffling in a hassle free manner
from torch.optim import Adam #Adam for GD
import time # to see how long training took

In [2]:
# Flags and paths controlling which parts of the assignment run

download_flag = False  # forwarded as `download=` when the MNIST training set is built
load_model    = False  # True: load saved weights and run the checks instead of training
master_dir    = os.getcwd()  # working directory; the folders below are created inside it
mnist_dir     = 'mnist_A2'   # directory to store the MNIST dataset
model_dir     = 'RNN_model'  # directory containing the saved recurrent models
# NOTE(review): machine-specific absolute path used as the prefix for saved figures.
download_dir  = 'C:\\Users\\ABHISHEK\\Downloads\\EE6132_Ass4'


# exist_ok=True keeps the cell idempotent on re-runs and avoids the
# check-then-create race of os.path.exists() followed by os.mkdir().
os.makedirs(os.path.join(master_dir, mnist_dir), exist_ok=True)
os.makedirs(os.path.join(master_dir, model_dir), exist_ok=True)

# Trailing '/' lets callers build a checkpoint path as model_path + model_name
model_path = os.path.join(master_dir, model_dir)+'/'


In [3]:
# Training hyper-parameters
learning_rate = 1e-2
batch_size    = 64
N_epochs      = 7    # use 5 for the image task, 30 for the sequence task, 7 for the binary task
N_iter_train  = 250
N_iter_test   = 40
N_iter_check  = 5

# Input geometry
input_size    = 10   # MNIST is fed as 28 steps of 28-long vectors; 10 is for the second question
N_steps       = 28   # number of time steps


# Compute device: use the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")



In [4]:
class Vanilla_RNN(nn.Module):
    """Single-layer nn.RNN followed by a 10-way linear read-out.

    The read-out is applied to the recurrent output of the last time step and
    passed through a log-softmax, so `forward` returns log-probabilities.
    """

    def __init__(self,input_size = input_size,hidden = 128,bd_flag = False):
        """Create the recurrent layer and the classification head.

        input_size: number of features per time step.
        hidden: width of the recurrent hidden state.
        bd_flag: truthy for a bidirectional RNN (the head then takes 2*hidden inputs).
        """
        super().__init__()

        # a bidirectional layer concatenates both directions, doubling the feature width
        head_width = hidden * (1 + bd_flag)

        # batch_first=True: tensors are laid out as (batch, time, features)
        self.rnn = nn.RNN(input_size = input_size,
                          hidden_size = hidden,
                          num_layers = 1,
                          batch_first = True,
                          bidirectional = bd_flag)

        # linear map from the recurrent features to the 10 output classes
        self.HL = nn.Linear(head_width, 10)

        self.logsoftmax = nn.LogSoftmax(dim = 1)

    def forward(self,x):
        """Return log-probabilities of shape (batch, 10) for x of shape (batch, time, input_size)."""

        # per-step outputs; the final hidden state returned alongside is not needed
        step_outputs, _ = self.rnn(x)

        # only the last time step feeds the classifier
        final_step = step_outputs[:, -1, :]

        return self.logsoftmax(self.HL(final_step))
    
class Vanilla_LSTM(nn.Module):
    """Single-layer nn.LSTM followed by a 10-way linear read-out with log-softmax."""

    def __init__(self,input_size = input_size,hidden = 128,bd_flag = False):
        """Create the LSTM layer and the classification head.

        input_size: number of features per time step.
        hidden: width of the hidden and cell states.
        bd_flag: truthy for a bidirectional LSTM (the head then takes 2*hidden inputs).
        """
        super(Vanilla_LSTM, self).__init__() #calls the parent constructor

        #configuring the LSTM; batch_first=True means tensors are (batch, time, features)
        self.lstm = nn.LSTM(input_size = input_size,
                          hidden_size = hidden,
                          num_layers = 1,
                          bidirectional = bd_flag,
                          batch_first = True)

        #read-out from the recurrent features (doubled when bidirectional) to the 10 classes
        self.HL = nn.Linear(hidden + hidden*bd_flag,10)

        self.logsoftmax = nn.LogSoftmax(dim = 1)

        #kept as attributes so existing code that reads them keeps working
        self.bd_flag = bd_flag
        self.hidden  = hidden


    def forward(self,x):
        """Return log-probabilities of shape (batch, 10).

        x: (batch_size, #time_steps, input_size)
        """

        # No explicit (h_0, c_0): nn.LSTM defaults both to zeros created with the
        # input's device and dtype. The previous hand-built torch.zeros(...) states
        # always lived on the CPU and broke as soon as the model ran on the GPU.
        out, _ = self.lstm(x)

        #we want the output at the last time step alone
        # NOTE(review): with bd_flag=True the reverse-direction half of out[:, -1, :]
        # has only processed the final input step - confirm this is intended.
        out = self.HL(out[:,-1,:])

        return self.logsoftmax(out)
        
        
class Vanilla_GRU(nn.Module):
    """Single-layer nn.GRU followed by a 10-way linear read-out with log-softmax."""

    def __init__(self,input_size = input_size,hidden = 128,bd_flag = False):
        """Create the GRU layer and the classification head.

        input_size: number of features per time step.
        hidden: width of the hidden state.
        bd_flag: truthy for a bidirectional GRU (the head then takes 2*hidden inputs).
        """
        super(Vanilla_GRU, self).__init__() #calls the parent constructor

        #configuring the GRU; batch_first=True means tensors are (batch, time, features)
        self.gru = nn.GRU(input_size = input_size,
                          hidden_size = hidden,
                          num_layers = 1,
                          bidirectional = bd_flag,
                          batch_first = True)

        #read-out from the recurrent features (doubled when bidirectional) to the 10 classes
        self.HL = nn.Linear(hidden + hidden*bd_flag,10)

        self.logsoftmax = nn.LogSoftmax(dim = 1)

        #kept as attributes so existing code that reads them keeps working
        self.bd_flag = bd_flag
        self.hidden  = hidden


    def forward(self,x):
        """Return log-probabilities of shape (batch, 10).

        x: (batch_size, #time_steps, input_size)
        """

        # No explicit h_0: nn.GRU defaults it to zeros created with the input's
        # device and dtype. The previous hand-built torch.zeros(...) state always
        # lived on the CPU and broke as soon as the model ran on the GPU.
        out, _ = self.gru(x)

        #we want the output at the last time step alone
        # NOTE(review): with bd_flag=True the reverse-direction half of out[:, -1, :]
        # has only processed the final input step - confirm this is intended.
        out = self.HL(out[:,-1,:])

        return self.logsoftmax(out)
    

class Binary_LSTM(nn.Module):
    """Single-layer nn.LSTM that emits one sigmoid output per time step."""

    def __init__(self,input_size = 2,hidden = 128,bd_flag = False):
        """Create the LSTM layer and the per-step output head.

        input_size: number of features per time step (2 by default).
        hidden: width of the hidden and cell states.
        bd_flag: truthy for a bidirectional LSTM (the head then takes 2*hidden inputs).
        """
        super(Binary_LSTM, self).__init__() #calls the parent constructor

        #configuring the LSTM; batch_first=True means tensors are (batch, time, features)
        self.lstm = nn.LSTM(input_size = input_size,
                          hidden_size = hidden,
                          num_layers = 1,
                          bidirectional = bd_flag,
                          batch_first = True)

        #read-out from the recurrent features (doubled when bidirectional) to a single unit
        self.HL = nn.Linear(hidden + hidden*bd_flag,1)

        self.sigmoid = nn.Sigmoid()

        #kept as attributes so existing code that reads them keeps working
        self.bd_flag = bd_flag
        self.hidden  = hidden


    def forward(self,x):
        """Return sigmoid outputs of shape (batch, #time_steps, 1).

        x: (batch_size, #time_steps, input_size)
        """

        # No explicit (h_0, c_0): nn.LSTM defaults both to zeros created with the
        # input's device and dtype. The previous hand-built torch.zeros(...) states
        # were always CPU float32 tensors, which fails for GPU or float64 inputs.
        out, _ = self.lstm(x)

        #the head is applied at every time step, not just the last one
        return self.sigmoid(self.HL(out))
        
    
    
    
        
        
        
        
        
        

In [5]:
def cross_entropy(pred, target, eps = 1e-7):
    """Binary cross-entropy summed over all elements and halved.

    pred: tensor of probabilities in [0, 1].
    target: tensor of 0/1 labels with the same shape as pred (integer or float).
    eps: predictions are clamped to [eps, 1 - eps] before taking logs.

    Returns a scalar tensor equal to
    0.5 * (-sum(target*log(pred)) - sum((1-target)*log(1-pred))).
    Note that the result is a sum, not a per-element mean.
    """
    # A saturated prediction of exactly 0 or 1 would give log(0) = -inf, and
    # 0 * -inf = NaN, poisoning the loss and its gradients. Clamping keeps both finite.
    pred = pred.clamp(min = eps, max = 1 - eps)

    positive_term = -torch.sum(target * torch.log(pred))
    negative_term = -torch.sum((1 - target) * torch.log(1 - pred))

    return (1/2)*(positive_term + negative_term)


#Defining a function to train the network: Returns the training loss for the current epoch 
def Train(model,model_flag,device,TrainDataLoader,optimizer,lossfn,train_length,reg = False,l2_reg = 0.001):
    """Run one training epoch over TrainDataLoader.

    model: network to train; called on batches reshaped to (-1, 28, 28).
    model_flag: 0, 1 or 2, selecting model.rnn / model.lstm / model.gru as the
        layer whose input-to-hidden weights are regularised (only read when reg is set).
    device: device the batches are moved to.
    TrainDataLoader: iterable of (data, label) batches.
    optimizer, lossfn: optimiser stepped once per batch and the loss callable.
    train_length: number of training examples, used to normalise the accuracy.
    reg: when truthy, add l2_reg * ||weight_ih_l0||^2 to the loss.
    l2_reg: regularisation strength.

    Returns (sum of per-batch losses as a float, accuracy as a fraction).
    Raises ValueError if reg is set and model_flag is not 0, 1 or 2.
    """
    model.train() #setting the model in training mode

    reg_weight = None
    if reg: #regularise only the input-to-hidden weights of the recurrent layer
        layer_names = {0: 'rnn', 1: 'lstm', 2: 'gru'}
        if model_flag not in layer_names:
            raise ValueError(f"model_flag must be 0, 1 or 2 when reg is set, got {model_flag!r}")
        reg_weight = getattr(model, layer_names[model_flag]).weight_ih_l0

    train_loss    = 0.0
    train_correct = 0 #correct predictions made

    for (data,label) in tqdm(TrainDataLoader):

        (data,label) = (data.to(device),label.to(device))

        #reshape to (batch_size, #time_steps, input_size): 28 steps of 28 features
        data = data.view(-1,28,28)

        pred = model(data)
        loss = lossfn(pred,label)

        if reg_weight is not None:
            loss = loss + l2_reg*reg_weight.pow(2.0).sum()

        optimizer.zero_grad() #zeroing out the gradients before backprop
        loss.backward()       #backprop from the loss
        optimizer.step()      #updating the weights

        # .item() detaches the value: summing the loss tensors themselves keeps every
        # batch's autograd graph referenced and returns a tensor that requires grad.
        train_loss    += loss.item()
        train_correct += (pred.argmax(1) == label).type(torch.float).sum().item()

    train_correct /= train_length #training accuracy

    return train_loss, train_correct
        
#Defining a function to test the network: Returns the test loss and prediction accuracy for the current epoch
def Test(model,device,TestDataLoader,lossfn,test_length,reg = False,l2_reg = 0.001,model_flag = None):
    """Evaluate the model over TestDataLoader without computing gradients.

    model: network to evaluate; called on batches reshaped to (-1, 28, 28).
    device: device the batches are moved to.
    TestDataLoader: iterable of (data, label) batches.
    lossfn: loss callable.
    test_length: number of test examples, used to normalise the accuracy.
    reg: when truthy, add l2_reg * ||weight_ih_l0||^2 to every batch loss.
    l2_reg: regularisation strength.
    model_flag: 0, 1 or 2 selecting model.rnn / model.lstm / model.gru for the
        penalty. When None the first of those attributes present on the model is used.

    Returns (sum of per-batch losses as a float, accuracy as a fraction).
    Raises ValueError if reg is set and no recurrent layer can be resolved.
    """
    model.eval()  #setting the model in eval/test mode

    # Resolve the regularised weight once. Previously this function read a
    # `model_flag` name it never received, so reg=True raised NameError.
    reg_weight = None
    if reg:
        layer_names = {0: 'rnn', 1: 'lstm', 2: 'gru'}
        if model_flag is None:
            candidates = [name for name in layer_names.values() if hasattr(model, name)]
        elif model_flag in layer_names:
            candidates = [layer_names[model_flag]]
        else:
            raise ValueError(f"model_flag must be 0, 1, 2 or None, got {model_flag!r}")
        if not candidates:
            raise ValueError("reg is set but the model has no rnn/lstm/gru attribute")
        reg_weight = getattr(model, candidates[0]).weight_ih_l0

    test_loss    = 0.0
    test_correct = 0 #correct predictions made

    #switching off the gradient for eval
    with torch.no_grad():

        for (data,label) in TestDataLoader:

            (data,label) = (data.to(device),label.to(device))

            #reshape to (batch_size, #time_steps, input_size): 28 steps of 28 features
            data = data.view(-1,28,28)

            pred = model(data)
            loss = lossfn(pred,label)

            if reg_weight is not None:
                loss = loss + l2_reg*reg_weight.pow(2.0).sum()

            # accumulate a plain float so the result can be plotted from any device
            test_loss    += loss.item()
            test_correct += (pred.argmax(1) == label).type(torch.float).sum().item()

    test_correct /= test_length  #prediction accuracy

    return test_loss, test_correct
        
        
def random_pred(model,device,test_data,data_ind = None):
    """Print the true and predicted label for a fixed set of test images.

    model: network called on each image reshaped to (-1, 28, 28).
    device: device the image is moved to before the forward pass.
    test_data: indexable dataset yielding (image, label) pairs.
    data_ind: indices to inspect; defaults to the ten hard-coded indices below.
    """
    if data_ind is None:
        data_ind  = [6003,416,6754,1605,5055,7965,517,5551,7070,6420]

    for ind in data_ind: #make predictions for all these indices

        (test_image,true_label) = test_data[ind]

        with torch.no_grad():

            # the image must live on the same device as the model; it used to stay
            # on the CPU, which fails when the model has been moved to the GPU
            single_image = test_image.view(-1,28,28).to(device)

            pred = model(single_image).detach().cpu().numpy()

            pred_class = np.argmax(pred) #predicted class

            print(f"True label:{true_label} predicted as {pred_class}")
            

def sequence_generator(L,batch_size = batch_size,K = 1):
    """Generate a batch of random digit sequences with one-hot inputs and targets.

    L: sequence length.
    batch_size: number of sequences.
    K: position whose digit becomes the target (must index into a length-L sequence).

    Returns (random_seq, x, y):
      random_seq: int tensor (batch_size, L) of digits,
      x: float tensor (batch_size, L, 10), one-hot encoding of random_seq,
      y: int tensor (batch_size, 10), one-hot encoding of random_seq[:, K].
    """
    # the upper bound of randint is exclusive: 10 is needed to draw all digits 0..9
    # (the previous bound of 9 never produced the digit 9)
    random_seq = np.random.randint(0, 10,(batch_size, L))

    x = np.zeros((batch_size, L,10)) #last dimension is 10 as we're building one-hot vectors
    y = np.zeros((batch_size,10))    #one-hot target

    rows = np.arange(batch_size)
    x[rows[:,None], np.arange(L), random_seq] = 1 #one-hot encode every step of every sequence
    y[rows, random_seq[:,K]] = 1                  #one-hot encode the K-th digit

    #converting to torch
    random_seq = torch.tensor(random_seq, dtype=torch.int)
    x = torch.tensor(x, dtype=torch.float) #the models expect float inputs
    y = torch.tensor(y, dtype=torch.int)

    return random_seq,x,y

def Train_sequence(model,model_flag,optimizer,lossfn,N_iter_train = N_iter_train):
    """Train for N_iter_train batches of freshly generated digit sequences.

    model: network mapping (batch, L, 10) one-hot inputs to 10 class scores.
    model_flag: accepted for signature compatibility; not used here.
    optimizer, lossfn: optimiser stepped once per batch and the loss callable
        (called with class-index targets).
    N_iter_train: number of batches.

    Returns (sum of per-batch losses as a float, accuracy as a fraction).
    """
    model.train() #setting the model in training mode

    train_loss    = 0.0
    train_correct = 0 #correct predictions made

    train_length = batch_size*N_iter_train

    # generated batches are CPU tensors; move them to wherever the model lives
    model_device = next(model.parameters()).device

    for _ in range(N_iter_train):

        L = np.random.randint(3,10) #randomizing L

        random_seq,x,y = sequence_generator(L)

        x      = x.to(model_device)
        target = y.argmax(axis = 1).to(model_device) #class index from the one-hot target

        pred = model(x)

        loss = lossfn(pred,target)

        optimizer.zero_grad() #zeroing out the gradients before backprop
        loss.backward()       #backprop from the loss
        optimizer.step()      #updating the weights

        # .item() avoids accumulating graph-attached tensors across iterations
        train_loss    += loss.item()
        train_correct += (pred.argmax(axis = 1) == target).sum().item()

    train_correct /= train_length #training accuracy

    return train_loss, train_correct


def Test_sequence(model,lossfn,N_iter_test = N_iter_test):
    """Evaluate on N_iter_test batches of freshly generated digit sequences.

    model: network mapping (batch, L, 10) one-hot inputs to 10 class scores.
    lossfn: loss callable (called with class-index targets).
    N_iter_test: number of batches.

    Returns (sum of per-batch losses as a float, accuracy as a fraction).
    """
    model.eval()  #setting the model in eval/test mode

    test_loss    = 0.0
    test_correct = 0 #correct predictions made
    test_length = batch_size*N_iter_test

    # generated batches are CPU tensors; move them to wherever the model lives
    model_device = next(model.parameters()).device

    #switching off the gradient for eval
    with torch.no_grad():

        for _ in range(N_iter_test):

            L = np.random.randint(3,10) #randomizing L

            random_seq,x,y = sequence_generator(L)

            x      = x.to(model_device)
            target = y.argmax(axis = 1).to(model_device) #class index from the one-hot target

            pred = model(x)

            loss = lossfn(pred,target)

            test_loss    += loss.item()
            test_correct += (pred.argmax(axis = 1) == target).sum().item()

    test_correct /= test_length #prediction accuracy

    return test_loss, test_correct

def binary_sequence_generator(L,batch_size = batch_size):
    """Generate a batch of binary addition problems, least-significant bit first.

    For every sample two integers are drawn from [0, 2**L) and summed. Each of
    the three numbers is written as L+1 bits, zero-padded, with the lowest bit
    at index 0.

    Returns (x, y):
      x: float tensor (batch_size, L+1, 2) holding the bits of the two operands,
      y: int tensor (batch_size, L+1) holding the bits of their sum.
    """
    n_bits = L + 1 #the sum of two L-bit numbers may need one extra bit

    def lsb_first(value):
        # bit k of `value` for k = 0 .. n_bits-1, i.e. the reversed zero-padded binary string
        return [(value >> k) & 1 for k in range(n_bits)]

    operands = np.zeros((batch_size, n_bits, 2))
    sums     = np.zeros((batch_size, n_bits))

    for row in range(batch_size):
        first  = np.random.randint(0, 2**L)
        second = np.random.randint(0, 2**L)

        operands[row, :, 0] = lsb_first(first)
        operands[row, :, 1] = lsb_first(second)
        sums[row]           = lsb_first(first + second)

    x = torch.tensor(operands, dtype=torch.float) #float, as that is what the forward pass uses
    y = torch.tensor(sums, dtype=torch.int)

    return x,y
        
        
def Train_sum(model,model_flag,optimizer,lossfn,N_iter_train = N_iter_train,loss_flag = 1,L = 3): #loss_flag = 1 for MSE and 0 for CE
    """Train the binary-addition model for N_iter_train generated batches.

    model: network mapping (batch, L+1, 2) bit inputs to (batch, L+1, 1) probabilities.
    model_flag: accepted for signature compatibility; not used here.
    optimizer: optimiser stepped once per batch.
    lossfn: loss callable used when loss_flag == 1.
    N_iter_train: number of batches.
    loss_flag: 1 uses lossfn (MSE), 0 uses the custom cross_entropy.
    L: operand length in bits.

    Returns (sum of per-batch losses as a float, fraction of sums predicted
    exactly, i.e. with every bit correct).
    Raises ValueError for a loss_flag other than 0 or 1.
    """
    if loss_flag not in (0, 1):
        raise ValueError(f"loss_flag must be 0 (CE) or 1 (MSE), got {loss_flag!r}")

    model.train() #setting the model in training mode

    train_loss    = 0.0
    train_correct = 0 #correct predictions made

    train_length = batch_size*N_iter_train

    # generated batches are CPU tensors; move them to wherever the model lives
    model_device = next(model.parameters()).device

    for _ in range(N_iter_train):

        x,y = binary_sequence_generator(L)
        x,y = x.to(model_device),y.to(model_device)

        pred = model(x)

        target = y.view(pred.size()) #match the trailing singleton dimension of pred

        if(loss_flag == 0): #CE Loss
            loss = cross_entropy(pred,target)
        else: #MSE Loss needs float targets
            loss = lossfn(pred,target.float())

        optimizer.zero_grad() #zeroing out the gradients before backprop
        loss.backward()       #backprop from the loss
        optimizer.step()      #updating the weights

        #threshold the probabilities into bits, shape (batch, L+1)
        pred_bits = (pred.detach() > 0.5)[:,:,0].cpu().numpy().astype(np.int64)

        #bits are least-significant first, so bit k has weight 2**k
        bit_weights = 2**np.arange(pred_bits.shape[1])
        pred_y = pred_bits.dot(bit_weights)
        y_10   = y.cpu().numpy().dot(bit_weights)

        # .item() avoids accumulating graph-attached tensors across iterations
        train_loss    += loss.item()
        train_correct += int(np.sum(pred_y == y_10)) #a sample counts only if the whole number matches

    train_correct /= train_length #training accuracy

    return train_loss, train_correct
                          

    
        
def Test_sum(model,lossfn,N_iter_test = 100,loss_flag = 1,L = 9):
    """Evaluate the binary-addition model on N_iter_test generated batches.

    model: network mapping (batch, L+1, 2) bit inputs to (batch, L+1, 1) probabilities.
    lossfn: loss callable used when loss_flag == 1.
    N_iter_test: number of batches (each of the module-level batch_size samples).
    loss_flag: 1 uses lossfn (MSE), 0 uses the custom cross_entropy.
    L: operand length in bits.

    Returns (sum of per-batch losses as a float, fraction of sums predicted
    exactly, i.e. with every bit correct).
    Raises ValueError for a loss_flag other than 0 or 1.
    """
    if loss_flag not in (0, 1):
        raise ValueError(f"loss_flag must be 0 (CE) or 1 (MSE), got {loss_flag!r}")

    model.eval()  #setting the model in eval/test mode

    test_loss    = 0.0
    test_correct = 0 #correct predictions made
    test_length = batch_size*N_iter_test

    # generated batches are CPU tensors; move them to wherever the model lives
    model_device = next(model.parameters()).device

    #switching off the gradient for eval
    with torch.no_grad():

        for _ in range(N_iter_test):

            x,y = binary_sequence_generator(L)
            x,y = x.to(model_device),y.to(model_device)

            pred = model(x.float())

            target = y.view(pred.size()) #match the trailing singleton dimension of pred

            if(loss_flag == 0): #CE Loss
                loss = cross_entropy(pred,target)
            else: #MSE Loss needs float targets
                loss = lossfn(pred,target.float())

            #threshold the probabilities into bits, shape (batch, L+1)
            pred_bits = (pred > 0.5)[:,:,0].cpu().numpy().astype(np.int64)

            #bits are least-significant first, so bit k has weight 2**k
            bit_weights = 2**np.arange(pred_bits.shape[1])
            pred_y = pred_bits.dot(bit_weights)
            y_10   = y.cpu().numpy().dot(bit_weights)

            test_loss    += loss.item()
            test_correct += int(np.sum(pred_y == y_10)) #a sample counts only if the whole number matches

    test_correct /= test_length #prediction accuracy

    return test_loss, test_correct
                          

def Check_sequence(model,N_iter_check = N_iter_check):
    """Print N_iter_check generated digit sequences next to the model's predicted digit.

    model: network mapping (1, L, 10) one-hot inputs to 10 class scores.
    N_iter_check: number of single-sequence examples to print.
    """
    model.eval()  #setting the model in eval/test mode

    # generated inputs are CPU tensors; move them to wherever the model lives
    model_device = next(model.parameters()).device

    #switching off the gradient for eval
    with torch.no_grad():

        for _ in range(N_iter_check):

            L = np.random.randint(3,10) #randomizing L

            random_seq,x,y = sequence_generator(L,batch_size = 1)

            pred = model(x.to(model_device))

            print(f'Generated Sequence:{random_seq}')
            # .cpu() keeps the printed tensor free of a device suffix
            print(f'Predicted Output:{pred.argmax(axis = 1).cpu()}')
            

def Check_binary_sequence(model,model_name,lossfn,N_check = 100,loss_flag = 1):
    """Measure and plot exact-sum accuracy for operand lengths L = 1 .. 20.

    model: network mapping (batch, L+1, 2) bit inputs to (batch, L+1, 1) probabilities.
    model_name: used in the saved figure's file name.
    lossfn: accepted for signature compatibility; no loss is reported here.
    N_check: number of generated samples per length.
    loss_flag: used in the saved figure's file name.

    Saves a bar chart under download_dir, shows it and prints the accuracies.
    """
    model.eval()

    # generated batches are CPU tensors; move them to wherever the model lives
    model_device = next(model.parameters()).device

    lengths = range(1,21)
    test_accuracies = []

    #switching off the gradient for eval
    with torch.no_grad():

        for L in lengths:

            x,y = binary_sequence_generator(L,batch_size = N_check)

            pred = model(x.float().to(model_device))

            #threshold the probabilities into bits, shape (N_check, L+1)
            pred_bits = (pred > 0.5)[:,:,0].cpu().numpy().astype(np.int64)

            #bits are least-significant first, so bit k has weight 2**k
            bit_weights = 2**np.arange(pred_bits.shape[1])
            pred_y = pred_bits.dot(bit_weights)
            y_10   = y.numpy().dot(bit_weights)

            # normalise by the number of samples actually generated; this used to
            # divide by a hard-coded 100 regardless of N_check
            test_accuracies.append(float(np.sum(pred_y == y_10)) / N_check)

    #plotting test accuracies vs L

    plt.bar(np.arange(1,21),test_accuracies)
    plt.xlabel('<-- L -->')
    plt.ylabel('Accuracy')
    plt.ylim(0,1) #as accuracy is between 0 and 1
    plt.grid()
    # no plt.legend(): the bar chart has no labelled artists, so it only raised a warning
    plt.title('Prediction Accuracy vs Length')
    plt.savefig(download_dir+'\\'+model_name+str(loss_flag)+'_L_accuracy.png')
    plt.show()

    print(f'Prediction Accuracies : {test_accuracies}')
        
        
    
        
            
            
                

In [6]:
def Run_RNN(model_name,load_model = load_model,model_flag = 0,bd_flag = False,reg = False,hidden_layer = 128,question_flag = 1,N_epochs = N_epochs,learning_rate = learning_rate,batch_size = batch_size,loss_flag = 1,L=3):
    
    startTime = time.time()
    #applied transform first converts the data into a tensor then normalizes it.
    #0.1307 is the mean of the MNIST data set and 0.3081 is the standard deviation
    
    app_transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    
    #organize the training and test data
    
    train_data    = MNIST(mnist_dir, train = True, download = download_flag, transform = app_transform) #getting training data
    test_data     = MNIST(mnist_dir, train = False, transform = app_transform)
    
    #initialize the dataloaders
    TrainDataLoader = DataLoader(train_data, batch_size = batch_size, shuffle = True ) 
    TestDataLoader  = DataLoader(test_data, batch_size = batch_size) 
    
    train_length  = len(TrainDataLoader.dataset) #no of training examples
    test_length   = len(TestDataLoader.dataset)  #no of testing cases
    
    #initialize the loss function
    
    lossfn    = nn.NLLLoss() #like a cross entropy loss function when we couple this with softmax
    
    #initialize the model
    if(model_flag == 0): #RNN
        
        model = Vanilla_RNN(hidden = hidden_layer,bd_flag = bd_flag).to(device)
        
    
    elif(model_flag == 1): #LSTM
        
        model = Vanilla_LSTM(hidden = hidden_layer,bd_flag = bd_flag).to(device)
        
    elif(model_flag == 2): #GRU
        
        model = Vanilla_GRU(hidden = hidden_layer,bd_flag = bd_flag).to(device)
    
    elif(model_flag == 3): #binary_LSTM
        
        model = Binary_LSTM(hidden = hidden_layer,bd_flag = bd_flag).to(device)
        
        #change loss function
        lossfn    = nn.MSELoss()
    
    
    #initialize the optimizer
    optimizer = Adam(model.parameters(),lr = learning_rate) #using Adam for GD as its the fastest and state of the art
    

    if (load_model): #if the load_model flag is true, load the pre-trained model
        
        print('Loading Model')
        model.load_state_dict(torch.load(model_path + model_name), strict=False)
        print(model)
        
        if(question_flag == 1):
            random_pred(model,device,test_data)
        
        if(question_flag == 2):
            Check_sequence(model)
            
        if(question_flag == 3):
            Check_binary_sequence(model,model_name,lossfn,loss_flag = loss_flag)
               
        
    else: #train and test the RNN network if load_model flag is false
        
        if(question_flag == 1):
        
        
            #initialising the lists

            train_losses   = []
            test_losses    = []
            train_accuracy = []
            test_accuracy  = []

            for epoch in range(1, N_epochs+1):
                print("Epoch ",epoch," has just begun!")
                print('****************** ', epoch/N_epochs," % ******************") #creates a status bar instead of using tqdm

                #train the model
                loss,accuracy = Train(model,model_flag,device,TrainDataLoader,optimizer,lossfn,train_length,reg = False,l2_reg = 0.001)
                train_losses.append(loss)
                train_accuracy.append(accuracy)
                print('Train loss for Epoch ',epoch,': ',loss)
                print('Train accuracy for Epoch ',epoch, ': ',accuracy)

                #test the model
                loss,accuracy = Test(model,device,TestDataLoader,lossfn,test_length,reg = False,l2_reg = 0.001)
                test_losses.append(loss)
                test_accuracy.append(accuracy)
                print('Test loss for Epoch ',epoch,': ',loss)
                print('Test accuracy for Epoch ',epoch, ': ',accuracy)

            endTime = time.time()  
            print("Time taken to train and test model: ",endTime-startTime)
            print("Average prediction accuracy across epochs: ",np.mean(test_accuracy))
            print("Average training accuracy across epochs: ",np.mean(train_accuracy))

            #Plotting the Loss and the accuracy curves

            plt.plot(np.asfarray(train_losses),'o-',label = 'Train Loss') #converting to float array
            plt.plot(np.asfarray(test_losses),'o-',label = 'Validation Loss') 
            plt.xlabel('<-- Epochs -->')
            plt.ylabel('Normalized Loss')
            plt.grid()
            plt.legend()
            plt.title('Progress of Training and Validation error with every Epoch')
            plt.savefig(download_dir+'\\'+model_name+'_loss.png')
            plt.show()

            plt.plot(np.asfarray(train_accuracy),'o-',label = 'Training Accuracy')
            plt.plot(np.asfarray(test_accuracy),'o-',label = 'Testing Accuracy')
            plt.xlabel('<-- Epochs -->')
            plt.ylabel('Accuracy')
            plt.ylim(0,1) #as accuracy is between 0 and 1
            plt.grid()
            plt.legend()
            plt.title('Progress of Training and Prediction Accuracy with every Epoch')
            plt.savefig(download_dir+'\\'+model_name+'_accuracy.png')
            plt.show()


            #generating a classification report

            #final predictions

            preds = [] #list to store our final predictions

            #switching off the gradient for eval
            with torch.no_grad():

                #loop over the test set

                for (data,label) in TestDataLoader: # (data,label): Test data for that batch

                    (data,label) = (data.to(device),label.to(device))  #sending the data to the device we've chosen

                    #reshape data to format of the RNNs (batch_size, #time_steps, input_size)

                    data = data.view(-1,28,28) #this tells the RNN there are 28 time steps it has to look at

                    #perform forward pass and compute the loss

                    pred = model(data) #our prediction

                    #add our predictions to the list
                    preds.extend(pred.argmax(axis=1).cpu().numpy())

            print(classification_report(test_data.targets.cpu().numpy(), np.asfarray(preds), target_names=test_data.classes))


            #plotting the confusion matrix
            confusion_mat = confusion_matrix(test_data.targets.cpu().numpy(), np.asfarray(preds))

            plt.imshow(confusion_mat, interpolation='nearest', cmap=plt.cm.Wistia)
            plt.colorbar()
            classNames = ['0','1','2','3','4','5','6','7','8','9']
            plt.title('Test Data: Confusion Matrix')
            plt.ylabel('True label')
            plt.xlabel('Predicted label')
            tick_marks = np.arange(len(classNames))
            plt.xticks(tick_marks, classNames)
            plt.yticks(tick_marks, classNames)
            plt.savefig(download_dir+'\\'+model_name+'_confusion.png')
            plt.show()

            # Save the model we just trained
            torch.save(model.state_dict(), model_path+model_name)
            print(model)
        
        
        elif(question_flag == 2):
            
            #initialising the lists

            train_losses   = []
            test_losses    = []
            train_accuracy = []
            test_accuracy  = []

            for epoch in range(1, N_epochs+1):
                print("Epoch ",epoch," has just begun!")
                print('****************** ', epoch/N_epochs," % ******************") #creates a status bar instead of using tqdm

                #train the model
                loss,accuracy = Train_sequence(model,model_flag,optimizer,lossfn,N_iter_train = N_iter_train)
                train_losses.append(loss)
                train_accuracy.append(accuracy)
                print('Train loss for Epoch ',epoch,': ',loss)
                print('Train accuracy for Epoch ',epoch, ': ',accuracy)

                #test the model
                loss,accuracy = Test_sequence(model,lossfn,N_iter_test = N_iter_test)
                test_losses.append(loss)
                test_accuracy.append(accuracy)
                print('Test loss for Epoch ',epoch,': ',loss)
                print('Test accuracy for Epoch ',epoch, ': ',accuracy)

            endTime = time.time()  
            print("Time taken to train and test model: ",endTime-startTime)
            print("Average prediction accuracy across epochs: ",np.mean(test_accuracy))
            print("Average training accuracy across epochs: ",np.mean(train_accuracy))

            #Plotting the Loss and the accuracy curves

            plt.plot(np.asfarray(train_losses),'o-',label = 'Train Loss') #converting to float array
            plt.plot(np.asfarray(test_losses),'o-',label = 'Validation Loss') 
            plt.xlabel('<-- Epochs -->')
            plt.ylabel('Normalized Loss')
            plt.grid()
            plt.legend()
            plt.title('Progress of Training and Validation error with every Epoch')
            plt.savefig(download_dir+'\\'+model_name+'_loss.png')
            plt.show()

            plt.plot(np.asfarray(train_accuracy),'o-',label = 'Training Accuracy')
            plt.plot(np.asfarray(test_accuracy),'o-',label = 'Testing Accuracy')
            plt.xlabel('<-- Epochs -->')
            plt.ylabel('Accuracy')
            plt.ylim(0,1) #as accuracy is between 0 and 1
            plt.grid()
            plt.legend()
            plt.title('Progress of Training and Prediction Accuracy with every Epoch')
            plt.savefig(download_dir+'\\'+model_name+'_accuracy.png')
            plt.show()
            
            # Save the model we just trained
            torch.save(model.state_dict(), model_path+model_name)
            
        elif(question_flag == 3):    
            
            #initialising the lists

            train_losses   = []
            test_losses    = []
            train_accuracy = []
            test_accuracy  = []

            for epoch in range(1, N_epochs+1):
                print("Epoch ",epoch," has just begun!")
                print('****************** ', epoch/N_epochs," % ******************") #creates a status bar instead of using tqdm

                #train the model
                loss,accuracy = Train_sum(model,model_flag,optimizer,lossfn,N_iter_train = N_iter_train,loss_flag = loss_flag, L = L)
                train_losses.append(loss)
                train_accuracy.append(accuracy)
                print('Train loss for Epoch ',epoch,': ',loss)
                print('Train accuracy for Epoch ',epoch, ': ',accuracy)

                #test the model
                loss,accuracy = Test_sum(model,lossfn,N_iter_test = N_iter_test,loss_flag = loss_flag, L = L)
                test_losses.append(loss)
                test_accuracy.append(accuracy)
                print('Test loss for Epoch ',epoch,': ',loss)
                print('Test accuracy for Epoch ',epoch, ': ',accuracy)

            endTime = time.time()  
            print("Time taken to train and test model: ",endTime-startTime)
            print("Average prediction accuracy across epochs: ",np.mean(test_accuracy))
            print("Average training accuracy across epochs: ",np.mean(train_accuracy))

            #Plotting the Loss and the accuracy curves

            plt.plot(np.asfarray(train_losses),'o-',label = 'Train Loss') #converting to float array
            plt.plot(np.asfarray(test_losses),'o-',label = 'Validation Loss') 
            plt.xlabel('<-- Epochs -->')
            plt.ylabel('Normalized Loss')
            plt.grid()
            plt.legend()
            plt.title('Progress of Training and Validation error with every Epoch')
            plt.savefig(download_dir+'\\'+model_name+'_loss.png')
            plt.show()

            plt.plot(np.asfarray(train_accuracy),'o-',label = 'Training Accuracy')
            plt.plot(np.asfarray(test_accuracy),'o-',label = 'Testing Accuracy')
            plt.xlabel('<-- Epochs -->')
            plt.ylabel('Accuracy')
            plt.ylim(0,1) #as accuracy is between 0 and 1
            plt.grid()
            plt.legend()
            plt.title('Progress of Training and Prediction Accuracy with every Epoch')
            plt.savefig(download_dir+'\\'+model_name+str(loss_flag)+'_accuracy.png')
            plt.show()
            
            # Save the model we just trained
            torch.save(model.state_dict(), model_path+model_name)

        
        
        
    
   

    
    
    
    

In [7]:
def _ask_yes_no(prompt):
    """Show `prompt` and return True only when the reply is a 'y'.

    The reply is stripped and lower-cased before the comparison, so 'Y' or ' y '
    also count as yes; every other reply (including an empty one) means no.
    """
    return input(prompt).strip().lower() == 'y'


def _ask_int(prompt, default=None, choices=None):
    """Keep showing `prompt` until the reply is an acceptable integer and return it.

    prompt  : text passed to input()
    default : value returned for an empty reply; None means a value is mandatory
    choices : optional collection of accepted integers; anything else is rejected
    """
    while True:
        reply = input(prompt).strip()

        if reply == '' and default is not None:
            return default

        try:
            value = int(reply)
        except ValueError:
            print('Please enter a whole number.')
            continue

        if choices is not None and value not in choices:
            print('Please enter one of: ' + ', '.join(str(c) for c in choices))
            continue

        return value


def Run_Assignment():
    """Interactive driver that walks through questions 1, 2 and 3 of the assignment.

    For every question the user is asked (via input()) whether to run it and which
    settings to use; the answers are turned into a model file name and forwarded to
    Run_RNN, first with load_model = False and then, on request, again with
    load_model = True.

    Takes no arguments and returns None.
    """

    cell_names = {0: 'RNN', 1: 'LSTM', 2: 'GRU'} #prefix of the model file name for each model_flag

    # ---------------- Question 1 ----------------
    if _ask_yes_no('Do you want to check the results of question 1? (y/n)'):

        repeat = True
        while repeat:

            if _ask_yes_no('Do you want to train an RNN model?'):

                print('Given below are the choices')

                #only the three advertised values are accepted, so the file name always gets a prefix
                model_flag   = _ask_int('Please enter 0 for RNN, 1 for LSTM and 2 for GRU', choices=(0, 1, 2))
                #an empty reply now really selects the advertised default instead of crashing in int('')
                hidden_layer = _ask_int('Please enter the size of the Hidden Layer (128 is default)', default=128)
                bd_flag      = _ask_yes_no('Do you want a bidirectional model? (y/n)')
                reg_flag     = _ask_yes_no('Do you want regularization on input to hidden weights?')

                # NOTE(review): reg_flag only changes the file name here; it is not forwarded to
                # Run_RNN (whose signature is not visible from this cell) - confirm this is intended.
                model_name = cell_names[model_flag] + '_' + str(hidden_layer) + '_bd'*bd_flag + '_reg'*reg_flag + '.mdl'
                Run_RNN(model_name, load_model = False, model_flag = model_flag, hidden_layer = hidden_layer, bd_flag = bd_flag)

                if _ask_yes_no('Do you want to check test predictions?'):
                    Run_RNN(model_name, load_model = True, model_flag = model_flag, hidden_layer = hidden_layer, bd_flag = bd_flag)

            #asked after every pass, whether or not a model was trained
            repeat = _ask_yes_no('Do you want to repeat this experiment again?')

    # ---------------- Question 2 ----------------
    if _ask_yes_no('Do you want to check the results of question 2? (y/n)'):

        if _ask_yes_no('Do you want to train an RNN model?'):

            print('Given below are the choices')

            model_flag = _ask_int('Please enter 0 for RNN, 1 for LSTM and 2 for GRU', choices=(0, 1, 2))

            #the hidden sizes offered for this question are fixed
            for hidden_layer in (2, 5, 10):

                if _ask_yes_no(f'Do you want to train model with hidden layer = {hidden_layer}?'):
                    model_name = cell_names[model_flag] + '_' + str(hidden_layer) + '_q2' + '.mdl'
                    Run_RNN(model_name, load_model = False, model_flag = model_flag, hidden_layer = hidden_layer, question_flag = 2)

                    if _ask_yes_no('Do you want to check the model output?'):
                        Run_RNN(model_name, load_model = True, model_flag = model_flag, hidden_layer = hidden_layer, question_flag = 2)

    # ---------------- Question 3 ----------------
    if _ask_yes_no('Do you want to check the results of question 3? (y/n)'):

        if _ask_yes_no('Do you want to train an LSTM model?'):

            hidden    = _ask_int('Enter size of hidden layer')
            #restricting the reply to 0/1 guarantees loss_flag is always bound before Run_RNN is called
            loss_flag = _ask_int('Please enter 1 for MSE Loss and 0 for cross entropy loss', choices=(0, 1))
            L         = _ask_int('Enter the sequence size you want to train the model on')

            # NOTE(review): hidden and L are concatenated without a separator, so e.g. (1, 28) and
            # (12, 8) map to the same file name; kept as is so the saved file names do not change.
            model_name = 'LSTM' + '_' + str(hidden) + str(L) + '_q3' + '.mdl'

            #question 3 always calls Run_RNN with model_flag = 3
            Run_RNN(model_name, load_model = False, model_flag = 3, hidden_layer = hidden, question_flag = 3, loss_flag = loss_flag, L = L)

            if _ask_yes_no('Do you want to check predictions across different lengths?'):
                Run_RNN(model_name, load_model = True, model_flag = 3, hidden_layer = hidden, question_flag = 3, loss_flag = loss_flag, L = L)
                
                
            
            
        
        
        

In [None]:
#Run this cell to start the assignment: Run_Assignment() asks a series of questions
#through input() and trains / evaluates the models chosen in the replies
Run_Assignment()