# ResNet34 Model for Fashion MNIST
This model is the realization of the first objective of this project, which was to create a deep neural network that, when trained on the Fashion MNIST training dataset, achieves an accuracy of 90% when classifying the corresponding test data.

This implementation is a residual network (ResNet). Most other networks pass the information of an image 'linearly' through successive levels of convolutional filters and pooling layers. The uniqueness of a ResNet is that, within each 'block' of convolutional layers, it adds the block's input to the output of those layers (a skip connection) as the features travel through. This mitigates the vanishing-gradient problem, improving the accuracy and performance of the model.

Along with the architecture of the model, this notebook contains the training and testing of the network, as applied to the Fashion MNIST data.

## Imports
Most of the resources required for the creation and evaluation of the model come from PyTorch. Even the FashionMNIST dataset is imported from the PyTorch ecosystem (torchvision's version of it).

For the visualisation of the results, pandas and matplotlib's pyplot were used.

In [None]:
from __future__ import print_function
import argparse
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from torch.autograd import Variable
from typing import Tuple
from torch.utils.data import Dataset,DataLoader
# from sklearn.metrics import confusion_matrix, top_k_accuracy_score
import torchvision                                                       
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
from torchvision.transforms import ToTensor
from numpy import random as rd
import torch
import torch.nn as nn
import torch.optim as optim
import gc
from torch.utils.data.sampler import SubsetRandomSampler

## Dataloader Block
Here, the Fashion MNIST dataset is loaded from torchvision's dataset library and split into its training and testing sets. The only transformation applied is ToTensor, for technical reasons. Any real image transformation would obscure information from an already low-resolution, grayscale picture, with little performance benefit.

In [None]:
# Pre-processing pipeline: the only step is the ToTensor conversion
transform = transforms.Compose([transforms.ToTensor()])

# Training split (downloaded into the working directory on first use) and its loader
trainData = datasets.FashionMNIST(root="./", train=True, download=True, transform=transform)
trainLoad = DataLoader(trainData, batch_size=30, shuffle=True, drop_last=False)

# Test split and its loader, configured the same way as the training loader
testData = datasets.FashionMNIST(root="./", train=False, download=True, transform=transform)
testLoad = DataLoader(testData, batch_size=30, shuffle=True, drop_last=False)

## Residual Block
The residual block holds the basic essence of the layers that compose the ResNet. The layers are initialized in the same order in which they are applied when forward is called and the model is built.

In [None]:
class ResidualBlock(nn.Module):
    """Residual block: three conv/batch-norm stages plus a skip connection.

    The block maps ``in_channels`` feature maps to
    ``out_channels * expansion`` feature maps and adds the (optionally
    down-sampled) block input to that result before the final ReLU.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Internal width of the block; the block emits
            ``out_channels * expansion`` channels.
        stride: Spatial stride, applied once in the first convolution.
        downsample: Optional module applied to the block input so that its
            shape matches the output of the convolutional path. It is needed
            whenever ``stride != 1`` or
            ``in_channels != out_channels * expansion``.
    """

    expansion = 4   # factor by which to expand the number of features per block

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()

        # First convolutional layer with batch normalization and ReLU activation.
        # This is the only place the stride is applied, so the main path shrinks
        # by the same factor as the downsample branch.
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

        # Second convolutional layer with batch normalization and ReLU activation.
        # Its input is the output of conv1, hence out_channels -> out_channels.
        self.conv2 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, kernel_size=1, stride=1, padding=0),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        )

        # Third convolutional layer with batch normalization; expands the channels.
        # No activation here: the ReLU is applied after the residual sum.
        self.conv3 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels * self.expansion),
        )

        # Finishing layers, with a downsample and activation
        self.downsample = downsample
        self.relu = nn.ReLU()
        self.out_channels = out_channels

    def forward(self, x):
        """Run the convolutional path and add the skip connection.

        Args:
            x: Input feature map with ``in_channels`` channels.

        Returns:
            Feature map with ``out_channels * expansion`` channels.
        """
        # Keep the block input for the skip connection, reshaped if necessary
        residual = x if self.downsample is None else self.downsample(x)

        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)

        out = out + residual
        return self.relu(out)

## ResNet Block
This is the general model and the system by which the residual blocks are inserted. First is an input convolutional layer, followed by the layers of residual blocks, then an average pool, and finally a fully connected linear layer that outputs the classification.

In [None]:
# class for ResNet model that extend from nn.Module
class Resnet(nn.Module):
    """ResNet-style classifier for single-channel images with 10 output classes.

    The network is a convolutional stem, four stages of residual blocks,
    global average pooling, dropout and a final linear layer.

    Args:
        block: Residual block class. It must expose an ``expansion`` class
            attribute and accept ``in_channels``, ``out_channels``, ``stride``
            and ``downsample`` arguments.
        blockList: Four integers, the number of blocks in each stage.
    """

    def __init__(self, block, blockList):
        super().__init__()

        # Number of channels entering the next stage. It starts as the width of
        # the stem output and is updated by _make_layer after every stage.
        self.inplanes = 64

        # First convolution layer with batch normalization, ReLU activation, and max pooling as this is the first layer from input
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=self.inplanes, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(self.inplanes),
            nn.ReLU(),
            nn.MaxPool2d(3, 2, 1)
        )

        # Calling of the make layer functions to build middle of the model
        self.block0 = self._make_layer(block, in_channels=16, blocksNum=blockList[0], stride=1)
        self.block1 = self._make_layer(block, in_channels=32, blocksNum=blockList[1], stride=2)
        self.block2 = self._make_layer(block, in_channels=64, blocksNum=blockList[2], stride=2)
        self.block3 = self._make_layer(block, in_channels=128, blocksNum=blockList[3], stride=2)

        # Global average pooling down to 1x1 per channel. The adaptive variant is
        # used because the feature map left after the strided stages is smaller
        # than a fixed 7x7 pooling window.
        self.avgpool = nn.AdaptiveAvgPool2d(1)

        # flatten the data into 1 dimension
        self.flatten = nn.Flatten()

        # apply dropout to output with 60% chance
        self.drop = nn.Dropout(0.6)

        # connect the channels of the last stage (tracked in self.inplanes) to 10 output nodes
        self.fc = nn.Linear(self.inplanes, 10)

    # helper function that adds layer by layer along with the res block
    def _make_layer(self, block, in_channels, blocksNum, stride):
        """Build one stage consisting of ``blocksNum`` residual blocks.

        Args:
            block: Residual block class (see the class docstring).
            in_channels: Internal width of the blocks in this stage; the stage
                outputs ``in_channels * block.expansion`` channels.
            blocksNum: Number of blocks in the stage.
            stride: Stride of the first block of the stage.

        Returns:
            An ``nn.Sequential`` holding the blocks of the stage.
        """
        stage_channels = in_channels * block.expansion
        downn_sample = None

        # The skip connection of the first block needs a projection whenever the
        # spatial size or the channel count changes across the block.
        if stride != 1 or self.inplanes != stage_channels:
            downn_sample = nn.Sequential(
                nn.Conv2d(in_channels=self.inplanes, out_channels=stage_channels, kernel_size=1, stride=stride),
                nn.BatchNorm2d(stage_channels)
            )

        layers = [block(in_channels=self.inplanes, out_channels=in_channels, stride=stride, downsample=downn_sample)]
        self.inplanes = stage_channels  # expands feature channels for next block's input

        # Adds multiple residual blocks depending on the inputted number of layers in a block
        for _ in range(1, blocksNum):
            layers.append(block(in_channels=self.inplanes, out_channels=in_channels))

        return nn.Sequential(*layers)

    # forward function
    def forward(self, x: torch.Tensor):
        """Return the class scores produced for the image batch ``x``."""
        x = self.conv1(x)

        x = self.block0(x)
        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)

        x = self.avgpool(x)
        x = self.drop(x)
        x = self.flatten(x)
        x = self.fc(x)
        return x

Hyperparameter block

In [None]:
# --- training configuration ---
epochNum = 5              # number of passes over the training data
learningRate = 0.01       # step size handed to the optimizer
weightDecayRate = 0.001   # weight decay handed to the optimizer
momentumAmount = 0.9      # momentum value (not passed to the optimizer below)

# use the first CUDA device when one is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# build the network with [3, 4, 6, 3] blocks per stage and move it to the device
model = Resnet(ResidualBlock, [3, 4, 6, 3])
model = model.to(device)

# cross-entropy classification loss
criterion = nn.CrossEntropyLoss()

# Adam optimizer over all model parameters
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=learningRate,
    weight_decay=weightDecayRate,
)

Training

In [None]:
total_step = len(trainLoad)
# print(model)

# Put Dropout/BatchNorm layers into training mode explicitly, so the loop also
# behaves correctly when it is re-run after the model was switched to eval mode.
model.train()

for epoch in range(epochNum):
    for i, (images, labels) in enumerate(trainLoad):
        # move tensor to device
        images = images.to(device)
        labels = labels.to(device)

        # forward the output and calculate loss
        outputs = model(images)
        loss = criterion(outputs, labels)

        # backward the output and perform optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # drop the per-batch references so their memory can be reused
        del images, labels, outputs

    # Deallocation once per epoch: emptying the CUDA cache and forcing a full
    # garbage collection on every batch only slows training down.
    torch.cuda.empty_cache()
    gc.collect()

#    print ('Epoch [{}/{}], Loss: {:.4f}'
#                   .format(epoch+1, epochNum, loss.item()))
    

In [None]:
# NOTE: disabled alternative train/test helpers; every line of this cell is
# commented out, so nothing here executes.
# NOTE(review): train() moves batches with .cpu() while test() uses .cuda(), and
# train_loop() calls test(trainLoad, ...) twice -- the second call presumably
# was meant to evaluate testLoad. Confirm both before re-enabling this code.
# def train(dataloader, model, loss_fn, optimizer):
#     size = len(dataloader.dataset)
#     for batch, (X, y) in enumerate(dataloader):
#         X, y = X.cpu(), y.cpu()
#         pred = model(X)
#         loss = loss_fn(pred, y)

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#         if batch % 100 == 0:
#             loss ,current = loss.item(), batch * len(X)
#             print(f"loss:{loss:>7f} [{current:>5d}/{size:>5d}]")

# def test(dataloader, model, loss_fn, Train = False):
#     size = len(dataloader.dataset)
#     num_batches = len(dataloader)
#     test_loss, correct = 0, 0

#     with torch.no_grad():
#         for X, y in dataloader:
#             X, y = X.cuda(), y.cuda()
#             pred = model(X)
#             test_loss += loss_fn(pred, y).item()
#             correct += (pred.argmax(1) == y).type(torch.float).sum().item()

#     test_loss /= num_batches
#     correct /= size
#     if Train:
#         print(f"Train Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
#     else:
#         print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

# def train_loop(model, epochs):
#     loss_fn = nn.CrossEntropyLoss()
#     optimizer = torch.optim.Adam(model.parameters(),lr = 0.001)
#     for t in range(epochs):
#         print(f"Epoch {t+1}\n-------------------------------")
#         train(trainLoad, model, loss_fn, optimizer)
#         test(trainLoad, model, loss_fn, Train = True)
#         test(trainLoad, model, loss_fn)
#     print("Done!")
# resnet_model = Resnet(ResidualBlock, [3, 4, 6, 3])
# train_loop(resnet_model.cpu(), 20)

Testing

In [None]:
# Switch to inference mode: this disables Dropout and makes BatchNorm use its
# running statistics, so the measured accuracy reflects the trained model.
model.eval()

# Gradients are not needed for evaluation
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in testLoad:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        # index of the highest score along dim 1 is the predicted class
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        del images, labels, outputs

    # report the number of images actually evaluated instead of a fixed count
    print('Accuracy of the network on the {} validation images: {} %'.format(total, 100 * correct / total))