<a href="https://colab.research.google.com/github/GrayardET/MIT-Reserach/blob/main/Homework2/RNN_MIT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## RNN Model for Image Classification



In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import tensorflow as tf
import torch

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import time
import tqdm
from torch.utils.data import *

from sklearn.model_selection import KFold

from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score



# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


### Constants

In [None]:
BATCH_SIZE = 64   # samples per mini-batch handed to every DataLoader
K_FOLDS = 5       # number of splits used by KFold cross-validation
N_X = 28          # input size per time step passed to the recurrent models (images are viewed as 28 x 28)
N_H = 100         # n_h (hidden-size) argument passed to the recurrent models
LR = 0.001        # learning rate given to the Adam optimizer
N_EPOCH = 20      # training epochs per cross-validation fold

In [None]:
# Preprocessing applied to every image: convert to a tensor, then normalize
# with the given mean and standard deviation.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=0.286041, std=0.353024),
])

### Data Loading: Fashion-MNIST

In [None]:
# Load the Fashion-MNIST train and test splits through torchvision, storing the
# files under ./data. Only the training split is created with download=True.
train_set = torchvision.datasets.FashionMNIST(root = './data', train = True, download = True, transform = transform)
test_set = torchvision.datasets.FashionMNIST(root = './data', train = False, transform = transform)

# No DataLoaders are built here: the cross-validation cells below create
# per-fold loaders with drop_last=True, which drops the last incomplete batch.

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=26421880.0), HTML(value='')))


Extracting ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=29515.0), HTML(value='')))


Extracting ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=4422102.0), HTML(value='')))


Extracting ./data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz


HBox(children=(FloatProgress(value=0.0, max=5148.0), HTML(value='')))


Extracting ./data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw



  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


### Construct Models

In [None]:
# Model for many-to-one basic RNN with one layer only.
class ImageRNN(nn.Module):
    """Classify an image by feeding its rows to a single-layer RNN.

    Args:
        n_x: number of input features per time step.
        n_h: size of the RNN hidden state.
        batch_size: accepted for backward compatibility only; the batch size
            is read from the input tensor in ``forward``.
    """

    def __init__(self, n_x, n_h, batch_size):
        super(ImageRNN, self).__init__()

        self.n_h = n_h
        self.n_x = n_x

        # Inputs carry the batch in dimension 0, hence batch_first=True.
        self.rnn = nn.RNN(self.n_x, self.n_h, batch_first=True)

        # Maps the final hidden state to the 10 class scores.
        self.outputLayer = nn.Linear(self.n_h, 10)

    def forward(self, X):
        """Return class scores of shape (batch_size, 10) for input X.

        X is expected as (batch_size, seq_len, n_x) because the RNN is built
        with batch_first=True.
        """
        self.batch_size = X.size(0)

        # Size the initial hidden state from the actual batch instead of a
        # hard-coded 64, and allocate it on the input's device/dtype so the
        # module does not depend on a notebook-level `device` global.
        self.hidden = torch.zeros(
            1, self.batch_size, self.n_h, device=X.device, dtype=X.dtype
        )

        # Only the final hidden state is needed for many-to-one classification.
        _, self.hidden = self.rnn(X, self.hidden)
        out = self.outputLayer(self.hidden)

        return out.view(-1, 10)  # batch_size x n_output
        

In [None]:
# Model for LSTM.
class LSTM(nn.Module):
    """Classify an image by feeding its rows to a single-layer LSTM.

    Args:
        n_x: number of input features per time step.
        n_h: size of the LSTM hidden state.
        batch_size: accepted for backward compatibility only. It was
            previously (mistakenly) used as the hidden size, which made
            ``n_h`` have no effect on the network.
    """

    def __init__(self, n_x, n_h, batch_size):
        super(LSTM, self).__init__()

        self.n_h = n_h
        self.n_x = n_x

        # The hidden size is n_h; the batch size never affects layer shapes.
        self.lstm = nn.LSTM(self.n_x, self.n_h, batch_first=True)

        # Maps the last time step's output to the 10 class scores.
        self.outputLayer = nn.Linear(self.n_h, 10)

    def forward(self, X):
        """Return class scores of shape (batch_size, 10) for input X.

        X is expected as (batch_size, seq_len, n_x) because the LSTM is built
        with batch_first=True. Initial hidden and cell states default to zeros.
        """
        output, _ = self.lstm(X)

        # Many-to-one: classify from the output at the final time step.
        return self.outputLayer(output[:, -1, :])

### Util functions

In [None]:
# Accuracy of a single batch, expressed as a percentage.
def acc_batch(y_hat, target, batch_size):
    """Return the percentage of correct predictions in one batch.

    The predicted class of each row of ``y_hat`` is the index of its largest
    value along dim 1; the number of matches with ``target`` is divided by
    ``batch_size`` and scaled to 0-100. The result is a Python float.
    """
    predicted = y_hat.argmax(dim=1)
    n_correct = predicted.eq(target.data).sum()
    return (100.0 * n_correct / batch_size).item()


def total_acc(loader, model, batch_size):
    """Evaluate ``model`` on every batch of ``loader`` without tracking gradients.

    Args:
        loader: iterable yielding ``(x, y)`` batches; ``x`` is reshaped to
            (-1, 28, 28) before being passed to the model.
        model: callable returning per-class scores for a batch.
        batch_size: number of samples per batch, used as the denominator of
            each batch's accuracy (the loader is expected to yield full
            batches, e.g. with drop_last=True).

    Returns:
        A tuple ``(mean_accuracy, Y, Y_pred)``: the mean of the per-batch
        accuracies in percent, the true labels and the predicted labels, both
        as flat Python lists. An empty loader yields ``(0.0, [], [])``.
    """
    Y = []
    Y_pred = []
    acc = 0.0
    n_batches = 0

    with torch.no_grad():
        for x, y in loader:
            x = x.view(-1, 28, 28).to(device)
            y = y.to(device)
            y_hat = model(x)

            # Use the batch_size argument rather than the BATCH_SIZE global.
            acc += acc_batch(y_hat, y, batch_size)

            Y_pred.append(torch.argmax(y_hat, dim=1))
            Y.append(y)
            n_batches += 1

    if n_batches == 0:
        return 0.0, [], []

    # Average over the number of batches; the last enumerate index is one less
    # than that and skewed the result upwards (and broke on a single batch).
    return acc / n_batches, torch.cat(Y).tolist(), torch.cat(Y_pred).tolist()


def visualize_batch(t):
    """Show a whole batch of images as one grid.

    ``t`` is a tensor of shape (B x C x H x W), with B the batch size and C
    the number of channels (C = 1 here since the images are gray scale), which
    is the layout ``make_grid`` takes.
    """
    grid = torchvision.utils.make_grid(t)

    plt.figure(figsize=(7, 7))
    # Reorder the axes (1, 2, 0) for plotting.
    plt.imshow(grid.permute(1, 2, 0))

In [None]:
def train_model(model, loss_func, optimizer, n_epoch, train_loader, test_loader):
    """Train ``model`` for ``n_epoch`` epochs, evaluating after every epoch.

    Args:
        model: network to train; called on batches reshaped to (-1, 28, 28).
        loss_func: callable ``loss_func(outputs, labels)`` returning the loss.
        optimizer: optimizer that updates the model's parameters.
        n_epoch: number of passes over ``train_loader``.
        train_loader: iterable of ``(images, labels)`` training batches.
        test_loader: iterable of ``(images, labels)`` evaluation batches.

    Returns:
        ``(train_acc_list, eval_acc_list, Y, Y_pred)``: per-epoch training and
        evaluation accuracies (percent), followed by the true and predicted
        labels from the evaluation pass of the last epoch.
    """
    train_acc_list = []
    eval_acc_list = []
    Y = []
    Y_pred = []

    for epoch in range(n_epoch):  # loop over the dataset multiple times
        train_running_loss = 0.0
        train_acc = 0.0
        n_batches = 0
        model.train()

        # Train batches in each epoch
        for images, labels in train_loader:
            # Reset gradients in every iteration
            optimizer.zero_grad()

            images = images.view(-1, 28, 28).to(device)
            labels = labels.to(device)

            # Forward pass, loss, backward pass, parameter update
            outputs = model(images)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()

            train_running_loss += loss.detach().item()
            # Normalize by the real size of this batch.
            train_acc += acc_batch(outputs, labels, labels.size(0))
            n_batches += 1

        # Average over the number of batches. The last enumerate index is one
        # less than that, which inflated the reported loss and accuracy and
        # divided by zero for a single-batch loader. max() guards an empty loader.
        n_batches = max(n_batches, 1)
        epoch_loss = train_running_loss / n_batches
        epoch_train_acc = train_acc / n_batches

        model.eval()
        evaluation_acc, Y, Y_pred = total_acc(test_loader, model, BATCH_SIZE)

        train_acc_list.append(epoch_train_acc)
        eval_acc_list.append(evaluation_acc)

        print('Epoch:  %d | Loss: %.4f | Train Accuracy: %.2f | Evaluation Accuracy: %.2f' 
            %(epoch, epoch_loss, epoch_train_acc, evaluation_acc))

    return train_acc_list, eval_acc_list, Y, Y_pred

### Implement K-fold Validation

K-Fold Cross-Validation with RNN

In [None]:
# Merge the train and test splits into a single dataset; the folds below are
# drawn from this combined dataset.
datasetList = [train_set, test_set]
dataset_concated = torch.utils.data.ConcatDataset(datasetList)

# KFold yields (train_ids, test_ids) index arrays for each fold. shuffle=True
# is used without a random_state, so fold membership differs between runs.
kfold = KFold(n_splits = K_FOLDS, shuffle = True)
train_acc_list = []
eval_acc_list = []

for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset_concated)):
    print(f'THE {fold}th FOLD')
    print('------------------------------------------------------------------')


    # Samplers restrict each loader to the indices of this fold.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)


    # drop_last=True discards the final incomplete batch of each loader.
    train_loader = torch.utils.data.DataLoader(dataset_concated, batch_size = BATCH_SIZE, 
                                               sampler = train_subsampler, drop_last=True)
    test_loader = torch.utils.data.DataLoader(dataset_concated, batch_size=BATCH_SIZE, 
                                              sampler = test_subsampler, drop_last = True)
    
    # A fresh model, loss and optimizer are created for every fold.
    model = ImageRNN(N_X, N_H, BATCH_SIZE).to(device)
    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LR)

    # Train on this fold; Y and Y_pred are overwritten each fold, so after the
    # loop they hold the values returned for the last fold.
    trainAcc, evalAcc, Y, Y_pred = train_model(model, loss_func, optimizer, N_EPOCH, train_loader, test_loader)

    # Collect the per-epoch accuracy histories, one entry per fold.
    train_acc_list.append(trainAcc)
    eval_acc_list.append(evalAcc)
    print('\n\n')

THE 0th FOLD
------------------------------------------------------------------


KeyboardInterrupt: ignored

K-Fold Cross-Validation with LSTM

In [None]:
# Merge the train and test splits into a single dataset; the folds below are
# drawn from this combined dataset.
datasetList = [train_set, test_set]
dataset_concated = torch.utils.data.ConcatDataset(datasetList)

# KFold yields (train_ids, test_ids) index arrays for each fold. shuffle=True
# is used without a random_state, so fold membership differs between runs.
kfold = KFold(n_splits = K_FOLDS, shuffle = True)
train_acc_list2 = []
eval_acc_list2 = []

for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset_concated)):
    print(f'THE {fold}th FOLD')
    print('------------------------------------------------------------------')


    # Samplers restrict each loader to the indices of this fold.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)


    # drop_last=True discards the final incomplete batch of each loader.
    train_loader = torch.utils.data.DataLoader(dataset_concated, batch_size = BATCH_SIZE, 
                                               sampler = train_subsampler, drop_last=True)
    test_loader = torch.utils.data.DataLoader(dataset_concated, batch_size=BATCH_SIZE, 
                                              sampler = test_subsampler, drop_last = True)
    
    # A fresh model, loss and optimizer are created for every fold.
    model = LSTM(N_X, N_H, BATCH_SIZE).to(device)
    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LR)

    # Train on this fold; Y2 and Y_pred2 are overwritten each fold, so after
    # the loop they hold the values returned for the last fold.
    trainAcc, evalAcc, Y2, Y_pred2 = train_model(model, loss_func, optimizer, N_EPOCH, train_loader, test_loader)

    # Collect the per-epoch accuracy histories, one entry per fold.
    train_acc_list2.append(trainAcc)
    eval_acc_list2.append(evalAcc)
    print('\n\n')

THE 0th FOLD
------------------------------------------------------------------
13952
Epoch:  0 | Loss: 0.7283 | Train Accuracy: 73.68 | Evaluation Accuracy: 82.26
13952
Epoch:  1 | Loss: 0.4369 | Train Accuracy: 84.36 | Evaluation Accuracy: 85.16
13952
Epoch:  2 | Loss: 0.3852 | Train Accuracy: 86.26 | Evaluation Accuracy: 86.82
13952
Epoch:  3 | Loss: 0.3548 | Train Accuracy: 87.20 | Evaluation Accuracy: 87.69
13952
Epoch:  4 | Loss: 0.3339 | Train Accuracy: 87.98 | Evaluation Accuracy: 87.85
13952
Epoch:  5 | Loss: 0.3183 | Train Accuracy: 88.56 | Evaluation Accuracy: 88.72
13952
Epoch:  6 | Loss: 0.3046 | Train Accuracy: 88.86 | Evaluation Accuracy: 88.59
13952
Epoch:  7 | Loss: 0.2965 | Train Accuracy: 89.18 | Evaluation Accuracy: 89.00
13952
Epoch:  8 | Loss: 0.2869 | Train Accuracy: 89.61 | Evaluation Accuracy: 89.52
13952
Epoch:  9 | Loss: 0.2777 | Train Accuracy: 89.88 | Evaluation Accuracy: 88.62
13952
Epoch:  10 | Loss: 0.2719 | Train Accuracy: 89.98 | Evaluation Accuracy: 8

In [None]:
import pickle

# Save the accuracy histories collected during cross-validation.
with open('train_acc_list.pkl', 'wb') as f:
    pickle.dump(train_acc_list, f)

# Write the evaluation history here; this file previously received
# train_acc_list by mistake, so the evaluation results were never saved.
with open('eval_acc_list.pkl', 'wb') as f:
    pickle.dump(eval_acc_list, f)

In [None]:
# Serialize the current `model` object to disk with pickle.
filename = 'LSTM_model.sav'
with open(filename, 'wb') as model_file:
    pickle.dump(model, model_file)


In [None]:
# Show which GPU is in use. Only query CUDA when it is available, so the cell
# also runs on CPU-only runtimes (the notebook already falls back to CPU).
gpu_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU only (no CUDA device)"
gpu_name

In [None]:
# Stack the per-fold histories into arrays of shape (n_folds, n_epochs).
eval_acc_list = np.array(eval_acc_list)
train_acc_list = np.array(train_acc_list)

# Mean and standard deviation across folds for every epoch.
eval_mean = np.mean(eval_acc_list, axis = 0)
eval_std = np.std(eval_acc_list, axis = 0)
train_mean = np.mean(train_acc_list, axis = 0)
train_std = np.std(train_acc_list, axis = 0)

# Derive the epoch axis from the data instead of hard-coding 20 epochs, so the
# plot keeps working when the number of training epochs changes.
n_epochs = train_acc_list.shape[1]
k_list = range(1, n_epochs + 1)
print(k_list)

plt.plot(k_list, train_mean, label='Rnn_Train', linewidth = 2)
plt.errorbar(k_list, eval_mean, eval_std, label='Validation', linewidth = 2)

plt.xticks(range(0, n_epochs + 2, 2))
plt.legend()
plt.xlabel('Number of Epoch')
plt.ylabel('Accuracy')

plt.savefig('Epoch_Accuracy_Chart.jpg', format = 'JPEG')

In [None]:
# Stack the per-fold histories into arrays of shape (n_folds, n_epochs).
eval_acc_list = np.array(eval_acc_list)
train_acc_list = np.array(train_acc_list)

# Mean and standard deviation across folds for every epoch.
eval_mean = np.mean(eval_acc_list, axis = 0)
eval_std = np.std(eval_acc_list, axis = 0)
train_mean = np.mean(train_acc_list, axis = 0)
train_std = np.std(train_acc_list, axis = 0)

# Derive the epoch axis from the data instead of hard-coding 20 epochs, so the
# plot keeps working when the number of training epochs changes.
n_epochs = train_acc_list.shape[1]
k_list = range(1, n_epochs + 1)
print(k_list)

plt.plot(k_list, train_mean, label='Train', linewidth = 2)
plt.errorbar(k_list, eval_mean, eval_std, label='Validation', linewidth = 2)
plt.xticks(range(0, n_epochs + 2, 2))
plt.legend()
plt.xlabel('Number of Epoch')
plt.ylabel('Accuracy')

plt.savefig('Epoch_Accuracy_Chart.jpg', format = 'JPEG')

In [None]:
# Stack the per-fold histories into arrays of shape (n_folds, n_epochs).
# Names without a suffix hold the RNN results, names ending in 2 the LSTM results.
eval_acc_list = np.array(eval_acc_list)
train_acc_list = np.array(train_acc_list)
eval_acc_list2 = np.array(eval_acc_list2)
train_acc_list2 = np.array(train_acc_list2)

# Not used by this cell; kept in case other cells read these names.
val_acc_list = np.array(eval_acc_list)
val_acc_list2 = np.array(eval_acc_list2)

# Mean and standard deviation across folds for every epoch (RNN).
eval_mean = np.mean(eval_acc_list, axis = 0)
eval_std = np.std(eval_acc_list, axis = 0)
train_mean = np.mean(train_acc_list, axis = 0)
train_std = np.std(train_acc_list, axis = 0)

# Same statistics for the LSTM.
eval_mean2 = np.mean(eval_acc_list2, axis = 0)
eval_std2 = np.std(eval_acc_list2, axis = 0)
train_mean2 = np.mean(train_acc_list2, axis = 0)
train_std2 = np.std(train_acc_list2, axis = 0)

# Derive each model's epoch axis from its own data instead of hard-coding 20
# epochs, so the two curves may even have different lengths.
n_epochs = train_acc_list.shape[1]
n_epochs2 = train_acc_list2.shape[1]
k_list = range(1, n_epochs + 1)
k_list2 = range(1, n_epochs2 + 1)
print(k_list)

plt.figure(dpi=1000)
plt.plot(k_list, train_mean, label='Train RNN', linewidth = 2)
plt.errorbar(k_list, eval_mean, eval_std, label='Validation RNN', linewidth = 2)

plt.plot(k_list2, train_mean2, label='Train LSTM', linewidth = 2)
plt.errorbar(k_list2, eval_mean2, eval_std2, label='Validation LSTM', linewidth = 2)

# Ticks and legend are set once, after all four curves have been drawn.
plt.xticks(range(0, max(n_epochs, n_epochs2) + 2, 2))
plt.legend()

plt.xlabel('Number of Epoch')
plt.ylabel('Accuracy')
plt.savefig('Epoch_Accuracy_Chart_RNN_VS_LSTM.jpg', format = 'JPEG', dpi = 500)





### Evaluation Metrics

In [None]:
def plot_confusion_heatmap(predicts, labels, dataset, name='model'):
    """Plot the confusion matrix as an annotated heatmap and save it as a PNG.

    Args:
        predicts: predicted class labels.
        labels: true class labels.
        dataset: object whose ``classes`` attribute provides the tick labels.
        name: suffix of the output file ``heat_map_<name>.png``. It defaults
            to ``'model'`` so that callers passing only three arguments work.

    Returns:
        The confusion matrix computed by ``confusion_matrix(labels, predicts)``.
    """
    conf_mat = confusion_matrix(labels, predicts)

    fig, ax = plt.subplots(figsize=(10, 9), dpi=600)
    ax = sns.heatmap(
        conf_mat,
        xticklabels=dataset.classes,
        yticklabels=dataset.classes,
        annot=True,
        cmap="YlGnBu",
        fmt='d',
        ax=ax,
        linewidths=.8
    )

    # confusion_matrix puts true labels on the rows and predictions on the
    # columns; label the axes so the figure can be read on its own.
    ax.set_xlabel('Predicted label')
    ax.set_ylabel('True label')

    plt.draw()

    fig.savefig('heat_map_'+str(name)+'.png')

    return conf_mat

In [None]:
def cal_matrics(predicts, labels):
    """Return the macro-averaged ``(f1, precision, recall)`` of the predictions.

    Each score is computed from the true ``labels`` and the predicted
    ``predicts`` with ``average='macro'``.
    """
    scorers = (f1_score, precision_score, recall_score)
    f1, precision, recall = (
        scorer(labels, predicts, average='macro') for scorer in scorers
    )
    return f1, precision, recall

In [None]:
# Train a fresh ImageRNN whose evaluation predictions feed the metric cells below.
# NOTE(review): train_loader and test_loader are not defined in this cell; they
# are whatever the last executed cross-validation cell left behind (its final fold).
model = ImageRNN(N_X, N_H, BATCH_SIZE).to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)

# Train for 10 epochs; Y3 / Y_pred3 are the labels and predictions returned by train_model.
trainAcc, evalAcc, Y3, Y_pred3 = train_model(model, loss_func, optimizer, 10, train_loader, test_loader)

In [None]:
# Sanity check: predictions and labels must have the same length.
print(len(Y_pred3))
print(len(Y3))

# RNN confusion matrix. The output-name argument was missing from this call,
# which raised a TypeError; 'RNN' mirrors the 'LSTM' call further down.
plot_confusion_heatmap(Y_pred3, Y3, test_set, 'RNN')

In [None]:
# Macro-averaged metrics for the RNN predictions.
f1, precision, recall = cal_matrics(Y_pred3, Y3)
for caption, score in (("f1 score = ", f1), ("recall = ", recall), ("precision = ", precision)):
    print(caption + str(score))

In [None]:
# Train a fresh LSTM whose evaluation predictions feed the metric cells below.
# NOTE(review): train_loader and test_loader are not defined in this cell; they
# are whatever the last executed cross-validation cell left behind (its final fold).
model = LSTM(N_X, N_H, BATCH_SIZE).to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)

# Train for 15 epochs; Y4 / Y_pred4 are the labels and predictions returned by train_model.
trainAcc, evalAcc, Y4, Y_pred4 = train_model(model, loss_func, optimizer, 15, train_loader, test_loader)

In [None]:
# Confusion matrix of the LSTM predictions, saved as heat_map_LSTM.png.
plot_confusion_heatmap(predicts=Y_pred4, labels=Y4, dataset=test_set, name='LSTM')

In [None]:
# Macro-averaged metrics for the LSTM predictions.
f1, precision, recall = cal_matrics(Y_pred4, Y4)
for caption, score in (("f1 score = ", f1), ("recall = ", recall), ("precision = ", precision)):
    print(caption + str(score))