## CNN Model for FashionMNIST dataset

https://www.kaggle.com/zalando-research/fashionmnist

In [3]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)

import numpy as np
import pandas as pd
from scipy import stats
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

from datetime import datetime
from sklearn.metrics import confusion_matrix

In [4]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
torch.__version__

ModuleNotFoundError: No module named 'torch'

In [None]:
import sys
try:
    from pytorch_model_summary import summary
except:
    if 'google.colab' in sys.modules:
        !pip install pytorch-model-summary
    else:
        !conda install -c conda-forge pytorch-model-summary
    from pytorch_model_summary import summary

### Use datasets class to download FashionMNIST data

https://pytorch.org/docs/stable/torchvision/datasets.html#fashion-mnist

#### Training data

In [None]:
# Training split of FashionMNIST, stored under the current directory and
# downloaded on first use. Each image goes through transforms.ToTensor().
train_dataset = torchvision.datasets.FashionMNIST(
    root='.',
    train=True,
    transform=transforms.ToTensor(),
    download=True)

In [None]:
# Largest value in the dataset's .data tensor (the stored pixel values)
train_dataset.data.max()

In [None]:
train_dataset.data.shape  # each image is 28 pixels by 28 pixels

In [None]:
# Class label for every training image
train_dataset.targets

#### Test data

In [None]:
# Held-out split (train=False), loaded with the same root and transform as
# the training split.
test_dataset = torchvision.datasets.FashionMNIST(
    root='.',
    train=False,
    transform=transforms.ToTensor(),
    download=True)

In [None]:
# Shape of the test images tensor
test_dataset.data.shape

In [None]:
# Number of classes = number of distinct label values in the training targets.
# K sizes the model's output layer.
K = len(set(train_dataset.targets.numpy()))
print("number of classes:", K)

In [None]:
# The distinct label values themselves (sorted)
np.unique(train_dataset.targets.numpy())

#### Function to calculate size of output of final convolutional layer

http://deeplearning.net/software/theano/tutorial/conv_arithmetic.html

https://theaisummer.com/receptive-field/

In [None]:
def conv_image_size(layers,size_in,padding=0,kernel=3,stride=2,dilation=1):
    """Return the side length of a square image after a stack of identical convolutions.

    Applies the standard output-size formula
    floor((size + 2*padding - dilation*(kernel-1) - 1) / stride + 1)
    once per layer.

    Args:
        layers: number of convolution layers applied in sequence.
        size_in: side length of the input image.
        padding, kernel, stride, dilation: settings shared by every layer.

    Returns:
        The final side length as an int.
    """
    kernel_span = dilation * (kernel - 1)
    current = size_in
    remaining = layers
    while remaining > 0:
        shrunk = (current + 2 * padding - kernel_span - 1) / stride
        current = np.floor(shrunk + 1)
        remaining -= 1
    return int(current)
conv_image_size(3,32,padding=1)

In [None]:
# Side length of the feature map after the conv stack; used to size the first dense layer.
inp_sz = conv_image_size(3,28) # 3 layers, 28x28, default stride = 2, kernel = 3
inp_sz

### Define the model


In [None]:
class CNN(nn.Module):
  """Three strided 3x3 convolutions followed by a two-layer dense classifier.

  Args:
    K: number of output classes (length of the logit vector per sample).
    image_size: side length of the square, single-channel input images.
      Defaults to 28, the FashionMNIST image size used in this notebook.
  """
  def __init__(self, K, image_size=28):
    super().__init__()
    self.conv_layers = nn.Sequential(
      nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=2),
      nn.ReLU(),
      nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=2),
      nn.ReLU(),
      nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2),
      nn.ReLU()
    )

    self.flatten = nn.Flatten()

    # Size the first dense layer from the conv stack itself instead of a
    # notebook-level global, so the class does not depend on cell order.
    # A single all-zero probe image is enough to read off the output size.
    with torch.no_grad():
      probe = torch.zeros(1, 1, image_size, image_size)
      n_features = self.conv_layers(probe).numel()

    self.dense_layers = nn.Sequential(
      nn.Dropout(0.2),
      nn.Linear(n_features, 512),
      nn.ReLU(),
      nn.Dropout(0.2),
      nn.Linear(512, K)
    )

  def forward(self, X):
    """Return raw class logits for a batch X of single-channel images."""
    z = self.conv_layers(X)
    z = self.flatten(z)
    out = self.dense_layers(z) # No softmax here: it is applied inside CrossEntropyLoss
    return out

#### Instantiate the model

In [None]:
# One output logit per class
model = CNN(K)

In [None]:
# Alternate way to specify the same model as a single nn.Sequential
# (128 * 2 * 2 is the flattened conv output for 28x28 inputs)
# model = nn.Sequential(
#     nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=2),
#     nn.ReLU(),
#     nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=2),
#     nn.ReLU(),
#     nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2),
#     nn.ReLU(),
#     nn.Flatten(),
#     nn.Dropout(0.2),
#     nn.Linear(128 * 2 * 2, 512),
#     nn.ReLU(),
#     nn.Dropout(0.2),
#     nn.Linear(512, K)   
# )

#### Set Device and send model to the device

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
# Move the model's parameters to the chosen device. Batches are moved to the
# same device inside the training and evaluation loops.
model.to(device)

In [None]:
# Layer-by-layer summary, produced by passing one dummy 1x1x28x28 batch through the model
print(summary(model,torch.zeros((1,1,28,28)).to(device),show_input=False,show_hierarchical=False))

In [None]:
# Hand-check of the parameter count of a Conv2d layer:
#   (kernel_height * kernel_width * in_channels + 1 bias) * out_channels
# n,m = shape of kernel
# l = number of input channels
# k = number of output channels
# The active values describe the first conv layer (1 -> 32 channels);
# substitute 3,3,32,64 to check the second conv layer.
n,m,l,k = 3,3,1,32# 3,3,32,64
(n*m*l+1)*k

#### Loss and optimizer

In [None]:
# The model outputs raw logits; the softmax is applied inside this loss.
criterion = nn.CrossEntropyLoss()
# Adam with its default hyperparameters, optimizing every model parameter
optimizer = torch.optim.Adam(model.parameters())

#### Data loader

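The data loader automatically generates the batches consumed by the training loop. The training loader also reshuffles the data (`shuffle=True`), while the test loader keeps the original order so predictions line up with `test_dataset.targets`.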

In [None]:
batch_size = 128
# Training batches are shuffled
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, 
                                           batch_size=batch_size, 
                                           shuffle=True)

# Test batches keep dataset order, so concatenated predictions align with
# test_dataset.targets
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, 
                                          batch_size=batch_size, 
                                          shuffle=False)

#### Training flag

Since the model has dropout layers, the training flag should be set to true during training and to false during evaluation.

model.train() sets the flag to True, model.train(False) sets it to False. model.eval() also will set the training flag to false.

Batch normalization layers are the other common layer type whose behavior depends on this flag.

#### Define a function to encapsulate the training loop

In [None]:

def mini_batch_gd(model, criterion, optimizer, train_loader, test_loader, epochs,device):
  """Train `model` with mini-batch gradient descent and track losses per epoch.

  Each epoch makes one optimization pass over `train_loader` in training
  mode, then one gradient-free pass over `test_loader` in evaluation mode.

  Args:
    model: the nn.Module to optimize (updated in place).
    criterion: callable `criterion(outputs, targets)` returning a scalar loss.
    optimizer: optimizer already bound to the model's parameters.
    train_loader: iterable of (inputs, targets) training batches.
    test_loader: iterable of (inputs, targets) evaluation batches.
    epochs: number of passes over the training data.
    device: device the batches are moved to before the forward pass.

  Returns:
    (train_losses, test_losses): two numpy arrays of length `epochs`, each
    holding the mean of the per-batch losses for that epoch.

  Note:
    The model is left in evaluation mode when the function returns.
  """
  train_losses = np.zeros(epochs)
  test_losses = np.zeros(epochs)

  for it in range(epochs):
    model.train() # Training mode: dropout is active
    t0 = datetime.now()
    train_loss = []
    for inputs, targets in train_loader:
      # move data to device
      inputs, targets = inputs.to(device), targets.to(device)

      # zero the parameter gradients
      optimizer.zero_grad()

      # Forward pass
      outputs = model(inputs)
      loss = criterion(outputs, targets)

      # Backward and optimize
      loss.backward()
      optimizer.step()

      train_loss.append(loss.item())

    train_loss = np.mean(train_loss)

    test_loss = []
    model.eval() # Evaluation mode: dropout is disabled
    # No gradients are needed for evaluation; skipping autograd avoids
    # building a computation graph for every test batch.
    with torch.no_grad():
      for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        test_loss.append(loss.item())
    test_loss = np.mean(test_loss)

    # Save losses
    train_losses[it] = train_loss
    test_losses[it] = test_loss

    dt = datetime.now() - t0
    print(f'Epoch {it+1}/{epochs}, Train Loss: {train_loss:.4f}, \
      Test Loss: {test_loss:.4f}, Duration: {dt}')

  return train_losses, test_losses

In [None]:
# Train for 15 epochs; each returned array holds one mean loss per epoch.
train_losses, test_losses = mini_batch_gd(
    model, criterion, optimizer, train_loader, test_loader, epochs=15,device=device)

#### Plot the train loss and test loss per epoch

In [None]:
# One point per epoch for each curve (index 0 is the first epoch)
plt.plot(train_losses, label='train loss')
plt.plot(test_losses, label='test loss')
# Label the axes so the figure can be read on its own
plt.xlabel('epoch')
plt.ylabel('mean cross-entropy loss')
plt.title('Train vs. test loss per epoch')
plt.legend()
plt.show()

#### Prediction Accuracy for Training Data

In [None]:
n_correct = 0.
n_total = 0.

model.eval() # Sets training flag to false (dropout disabled)

# Inference only: disable autograd so no computation graph is built per batch
with torch.no_grad():
  for inputs, targets in train_loader:
    # move data to the selected device (GPU or CPU)
    inputs, targets = inputs.to(device), targets.to(device)

    # Forward pass
    outputs = model(inputs)

    # Get prediction
    # torch.max returns both max and argmax; the argmax over dim 1 is the
    # class with the highest score
    _, predictions = torch.max(outputs, 1)

    # update counts
    n_correct += (predictions == targets).sum().item()
    n_total += targets.shape[0]

train_acc = n_correct / n_total
print(f"Train accuracy: {train_acc:.4f}")

#### Predict Test Data

In [None]:
x_test = test_dataset.data.numpy()
y_test = test_dataset.targets.numpy()

# Per-batch prediction arrays, joined once after the loop. This avoids
# re-copying a growing array on every batch and keeps class ids as integers.
batch_predictions = []
n_correct = 0.
n_total = 0.

model.eval() # Sets training flag to false (dropout disabled)

# Inference only: disable autograd so no computation graph is built per batch
with torch.no_grad():
  for inputs, targets in test_loader:
    # move data to the selected device (GPU or CPU)
    inputs, targets = inputs.to(device), targets.to(device)

    # Forward pass
    outputs = model(inputs)

    # Get prediction (argmax over the class dimension)
    _, predictions = torch.max(outputs, 1)

    # update counts and collect this batch's predictions
    n_correct += (predictions == targets).sum().item()
    n_total += targets.shape[0]
    batch_predictions.append(predictions.cpu().numpy())

# test_loader is not shuffled, so pred_test[i] corresponds to y_test[i]
pred_test = np.concatenate(batch_predictions)

test_acc = n_correct / n_total
print(f"Test accuracy: {test_acc:.4f}")

#### Output confusion matrix

In [None]:
# Confusion matrix of true labels (first argument) against predicted labels
cm = confusion_matrix(y_test, pred_test)
print(cm)

# Correct predictions lie on the diagonal, so trace / total is the accuracy
print(f'Accuracy = {np.trace(cm)/np.sum(cm)}')

#### Show some misclassified examples

In [None]:
# Label mapping
labels = '''T-shirt/top
Trouser
Pullover
Dress
Coat
Sandal
Shirt
Sneaker
Bag
Ankle boot'''.split("\n")

In [None]:
# Cast predictions to an integer dtype so they can index into `labels`
pred_test = pred_test.astype(np.uint8)
# Positions where the prediction disagrees with the true label
misclassified_idx = np.where(pred_test != y_test)[0]
# Pick one misclassified example at random (a different one on each run)
i = np.random.choice(misclassified_idx)
plt.imshow(x_test[i].reshape(28,28), cmap='gray')
plt.title("True label: %s Predicted: %s" % (labels[y_test[i]], labels[pred_test[i]]));