In this notebook, we classify images using both a fully connected deep neural network (DNN) and convolutional neural networks (CNNs).

We will be using the [Fashion-MNIST dataset](https://github.com/zalandoresearch/fashion-mnist), which contains 70,000 grayscale images of clothing in 10 categories. Each image shows an individual article of clothing at low resolution (28 by 28 pixels).

![Fashion MNIST dataset](https://github.com/abdulelahsm/ignite/blob/update-tutorials/examples/notebooks/assets/fashion-mnist.png?raw=1)

In [None]:
# Colab-only setup: mount Google Drive under /content/drive/ (forcing a fresh
# mount) and change the working directory to the course folder, so that every
# relative path used later in the notebook resolves inside that folder.
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)
% cd './drive/My Drive/PhD_Carmi_Shimon/deep_learning_course'

Mounted at /content/drive/
/content/drive/My Drive/PhD_Carmi_Shimon/deep_learning_course


Imports

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import os
import datetime

import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import SubsetRandomSampler
from torchvision import datasets, transforms
# Load the TensorBoard notebook extension
import tensorflow as tf
from tensorflow import summary
%load_ext tensorboard

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score, precision_recall_fscore_support
from sklearn.manifold import TSNE

Util Functions

In [None]:

def plot_samples(data_loader, n_samples=20, n_rows=4):
  '''Plot up to ``n_samples`` images from the first batch of ``data_loader``.

  Each image is drawn in grayscale with its label as the subplot title.
  Which images are shown depends on the loader's own sampling order.

  Args:
    data_loader: iterable yielding ``(images, labels)`` batches.
    n_samples: maximum number of images to draw (capped at the batch size).
    n_rows: number of rows in the subplot grid.
  '''
  # ``next(iter(...))`` is the supported way to pull a single batch; the
  # ``.next()`` method is not available on current DataLoader iterators.
  images, labels = next(iter(data_loader))
  n_samples = min(n_samples, len(images))
  if n_samples == 0:
    return
  n_rows = max(1, min(n_rows, n_samples))
  # Ceiling division: the grid dimensions must be integers.
  n_cols = -(-n_samples // n_rows)
  fig = plt.figure(figsize=(15, 5))
  for idx in range(n_samples):
    ax = fig.add_subplot(n_rows, n_cols, idx + 1, xticks=[], yticks=[])
    ax.imshow(np.squeeze(images[idx]), cmap='gray')
    ax.set_title(labels[idx].item())
  # Lay the grid out once, after every subplot exists.
  fig.tight_layout()

def show_results(y_pred, y_true, class_names):
  '''Display the confusion matrix and print overall and per-class accuracy.

  Args:
    y_pred: predicted integer labels.
    y_true: ground-truth integer labels.
    class_names: display names; assumed to be indexed by label, i.e. label
      ``k`` corresponds to ``class_names[k]`` (this is how the notebook's
      main cell uses it).
  '''
  # scikit-learn expects (y_true, y_pred): rows are true classes and columns
  # are predictions. Fixing the label set keeps the matrix the same size as
  # ``class_names`` even when some class never occurs.
  label_ids = np.arange(len(class_names))
  cm = confusion_matrix(y_true, y_pred, labels=label_ids)
  disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
  fig, ax = plt.subplots(figsize=(10,10))
  disp.plot(ax=ax)
  plt.show()
  test_accuracy = accuracy_score(y_true, y_pred)
  print('Test Accuracy = ', test_accuracy)
  # Per-class accuracy = correctly classified / number of true samples of the
  # class (row sums). Classes without any true sample are reported as NaN
  # instead of triggering a division by zero.
  per_class_totals = cm.sum(axis=1)
  accuracies = np.divide(cm.diagonal(), per_class_totals,
                         out=np.full(len(label_ids), np.nan),
                         where=per_class_totals > 0)
  tuple_accuracies = [(name, round(float(acc), 2)) for name, acc in zip(class_names, accuracies)]
  print('per class test accuracy: \n', tuple_accuracies)

def plot_tsne(embeddings, y):
  '''Project ``embeddings`` to 2-D with t-SNE and scatter-plot them by label.

  Args:
    embeddings: array-like of shape (n_samples, n_features).
    y: integer labels, one per sample; any shape holding n_samples values
      (for example ``(n_samples,)`` or ``(n_samples, 1)``).
  '''
  # Flatten so that a column vector of labels is handled like a 1-D vector.
  labels = np.asarray(y).ravel().astype(np.int32)
  # NOTE(review): newer scikit-learn releases rename ``n_iter`` to
  # ``max_iter`` - confirm against the installed version.
  Xt = TSNE(n_components=2, n_jobs=8, n_iter=300).fit_transform(embeddings)
  # Use a dedicated figure: consecutive calls (train, then test) would
  # otherwise be drawn on top of each other although their t-SNE coordinates
  # come from independent fits and are not comparable.
  plt.figure()
  plt.scatter(Xt[:, 0], Xt[:, 1], c=labels, s=0.5)
  plt.show()

# Class Trainer

In [None]:
class Trainer:
  '''Train, validate, test and extract embeddings for a Fashion-MNIST model.

  The wrapped model's forward pass must return a ``(log_probs, embeddings)``
  tuple: the loss is ``nn.NLLLoss`` and the second element is what
  ``get_embeddings`` collects.

  ``train`` writes scalars through the module-level ``tf``,
  ``train_summary_writer`` and ``val_summary_writer`` names, which must exist
  before it is called.

  Args:
    model: ``nn.Module`` returning ``(log_probs, embeddings)``.
    transform: torchvision transform applied to the training set (and hence
      to the validation split, which is carved out of the training set).
    batch_size: batch size of all three loaders.
    val_size: fraction of the training set held out for validation.
    lr: Adam learning rate.
    is_show_plots: if true, plot a few training samples after loading.
    n_classes: 10 for the original labels; 3 merges them using
      ``changed_labels``.
  '''

  def __init__(self, model, transform, batch_size=64, val_size=0.2, lr=0.001, is_show_plots=False, n_classes=10):
    os.makedirs('./saved_models', exist_ok=True)
    self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    self.n_classes = n_classes
    # merged label -> original Fashion-MNIST labels folded into it (3-class mode)
    self.changed_labels = {0: [0, 2, 3, 4, 6], 1: [1, 5, 7, 9], 2: [8]}
    self.model = model.to(self.device)
    self.criterion = nn.NLLLoss()
    self.optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    self.batch_size = batch_size
    self.val_size = val_size
    self.is_show_plots = is_show_plots
    # PIL image in the range [0, 255] to a torch.FloatTensor of shape (C x H x W) in the range [0.0, 1.0].
    self.test_transform = transforms.Compose([transforms.ToTensor()])
    self.train_transform = transform
    self.__dataLoader()

  @staticmethod
  def _remap_targets(dataset, merged_labels):
    '''Replace ``dataset.targets`` according to ``{new_label: [old_labels]}``.

    The mapping is applied through a lookup table built from the original
    labels, so the result does not depend on the order of the groups (an
    in-place, group-by-group rewrite would re-map already rewritten labels
    whenever a new label value collides with a not-yet-processed old one).
    Labels missing from the mapping are left unchanged.
    '''
    targets = torch.as_tensor(dataset.targets).long()
    if targets.numel() == 0:
      return
    old_to_new = {old: new for new, olds in merged_labels.items() for old in olds}
    table_size = max(max(old_to_new, default=-1), int(targets.max())) + 1
    lookup = torch.arange(table_size, dtype=torch.long)
    for old, new in old_to_new.items():
      lookup[old] = new
    dataset.targets = lookup[targets]

  def __dataLoader(self):
    '''Download Fashion-MNIST and build the train / validation / test loaders.'''
    trainset = datasets.FashionMNIST('./data', download=True, train=True, transform=self.train_transform)
    testset = datasets.FashionMNIST('./data', download=True, train=False, transform=self.test_transform)
    # --------- 1.2 Dataset Manipulation - take care of 3 classes mode ------------ #
    if self.n_classes == 3:
      self._remap_targets(trainset, self.changed_labels)
      self._remap_targets(testset, self.changed_labels)
    # ----------------------------------------------------------------------------- #
    # Validation split: shuffle the indices once, keep the first
    # (1 - val_size) share for training and the remainder for validation.
    indices = list(range(len(trainset)))
    np.random.shuffle(indices)
    split = int(np.floor((1-self.val_size) * len(trainset)))
    train_sample = SubsetRandomSampler(indices[:split])
    valid_sample = SubsetRandomSampler(indices[split:])

    # Both samplers draw from ``trainset``, so validation images go through
    # the training transform as well.
    self.train_loader = DataLoader(trainset, sampler=train_sample, batch_size=self.batch_size)
    self.val_loader = DataLoader(trainset, sampler=valid_sample, batch_size=self.batch_size)
    self.test_loader = DataLoader(testset, batch_size=self.batch_size, shuffle=True)
    if self.is_show_plots:
      plot_samples(self.train_loader)

  def _run_epoch(self, loader, training):
    '''Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    With ``training=True`` the model is put in train mode and the optimizer
    is stepped on every batch; otherwise the model is put in eval mode and
    gradients are disabled.
    '''
    # Set the mode on every pass: dropout / batch-norm must be active again
    # after a validation pass switched the model to eval mode.
    self.model.train(training)
    total_loss, n_correct = 0.0, 0
    with torch.set_grad_enabled(training):
      for images, labels in loader:
        images, labels = images.to(self.device), labels.to(self.device)
        if training:
          self.optimizer.zero_grad() # Clear the gradients of all optimized variables
        out, _ = self.model(images) # Forward pass through the network
        loss = self.criterion(out, labels)
        if training:
          loss.backward() # Gradients of the loss w.r.t. the model parameters
          self.optimizer.step() # Update the model parameters
        total_loss += loss.item()*images.size(0) # weight by batch size (last batch may be smaller)
        n_correct += torch.sum(labels == out.argmax(dim=1)).item()
    n_samples = max(len(loader.sampler), 1) # guard against an empty split
    return total_loss/n_samples, n_correct/n_samples

# ----------------Train-------------------- #
  def train(self, epochs):
    '''Train for ``epochs`` epochs, validating and logging after each one.

    The weights are saved whenever the validation loss does not exceed the
    best value seen so far.

    Returns:
      ``(train_losses, val_losses)``: per-epoch mean losses.
    '''
    min_val_loss = float('inf') # any first validation loss counts as an improvement
    train_losses, val_losses = [], []
    for e in range(epochs):
      # Loss and accuracy are computed from scratch for every epoch.
      running_loss, train_acc = self._run_epoch(self.train_loader, training=True)
      val_loss, val_acc = self._run_epoch(self.val_loader, training=False)
      train_losses.append(running_loss)
      val_losses.append(val_loss)

      print('Epoch: {} \tTraining Loss: {:.6f} \t Train Acc: {:.6f} \t Val Loss: {:.6f} \t Val Acc: {:.6f}'\
            .format(e+1, running_loss, train_acc, val_loss, val_acc))

      # ------------ write to tensorboard log-file ----------- #
      with train_summary_writer.as_default():
        tf.summary.scalar('Loss', running_loss, step=e)
        tf.summary.scalar('Accuracy', train_acc, step=e)
      with val_summary_writer.as_default():
        tf.summary.scalar('Loss', val_loss, step=e)
        tf.summary.scalar('Accuracy', val_acc, step=e)

      # in case a model gets better, save weights
      # NOTE(review): './saved_models' is created in __init__, but checkpoints
      # are written to the working directory and always carry the 'DNN'
      # prefix, whatever the model class - confirm the intended location/name.
      if val_loss <= min_val_loss:
        print('validation loss decreased({:.6f} -->{:.6f}). Saving Model ...'\
              .format(min_val_loss, val_loss))
        model_name = f'DNN_epoch_{e+1}.pt'
        torch.save(self.model.state_dict(), model_name)
        min_val_loss = val_loss
    return train_losses, val_losses

  def test(self):
    '''Predict every sample of the test loader.

    Returns:
      ``(preds, true_labels)``: two equally long lists of ints, in the order
      in which the test loader yielded the samples.
    '''
    preds, true_labels = [], []
    self.model.eval()
    with torch.no_grad():
      for images, labels in self.test_loader:
        images, labels = images.to(self.device), labels.to(self.device)
        out, _ = self.model(images) # Forward pass
        preds.extend(out.argmax(dim=1).tolist())
        true_labels.extend(labels.tolist())
    return preds, true_labels

  def _collect_embeddings(self, loader):
    '''Return ``(embeddings, labels)`` arrays for all samples of ``loader``.'''
    chunks, targets = [], []
    with torch.no_grad():
      for images, labels in loader:
        _, embds = self.model(images.to(self.device)) # Forward pass
        chunks.append(embds.cpu().numpy())
        targets.extend(labels.tolist())
    # Concatenate once at the end instead of growing an array batch by batch;
    # the embedding width is whatever the model produces.
    if chunks:
      embeddings = np.concatenate(chunks, axis=0)
    else:
      embeddings = np.empty((0, 0), dtype=np.float32)
    return embeddings, np.asarray(targets)

  def get_embeddings(self):
    '''Collect the model's embedding vectors for the train and test loaders.

    Returns:
      ``(train_embds, y_train, test_embds, y_test)`` as NumPy arrays; the
      embedding arrays have shape (n_samples, embedding_dim) and keep the
      dtype produced by the model.
    '''
    self.model.eval()
    train_embds, y_train = self._collect_embeddings(self.train_loader)
    test_embds, y_test = self._collect_embeddings(self.test_loader)

    return train_embds, y_train, test_embds, y_test



# NN models

# 1.1 DNN class as the base model

In [None]:
class DNN(nn.Module):
  '''Baseline fully connected classifier for 1x28x28 images.

  Architecture: flatten -> Linear(784, 128) -> ReLU -> Linear(128, n_classes).

  Args:
    n_classes: number of output classes.
  '''

  def __init__(self, n_classes=10):
    super(DNN, self).__init__()
    self.n_classes = n_classes
    self.flatten = nn.Flatten(1)
    self.fc1 = nn.Linear(28*28, 128)
    self.relu = nn.ReLU()
    # The output width follows ``n_classes`` rather than a fixed 10, so the
    # 3-class experiment gets a 3-way classifier.
    self.fc2 = nn.Linear(128, n_classes)

  def forward(self, x):
    '''Return ``(log_probs, embds)``.

    ``log_probs`` has one column per class (log-softmax over dim 1) and
    ``embds`` is the 128-wide hidden activation after the ReLU.
    '''
    x = self.flatten(x)
    x = self.fc1(x)
    x = self.relu(x)
    embds = x # latent vector of size 128 for better representation
    x = self.fc2(x)
    return F.log_softmax(x,dim=1), embds

# 1.4 CNN class - different model configurations

In [None]:
class CNN1(nn.Module):
    '''Smallest CNN variant: one 3x3 convolution followed by three dense layers.

    Expects 1x28x28 inputs. The convolution keeps the spatial size
    (padding=1), so the dense head receives 32*28*28 features.
    '''

    def __init__(self, n_classes=10):
        super(CNN1, self).__init__()
        self.n_classes = n_classes

        # Feature extractor: 1 -> 32 channels, spatial size preserved.
        conv = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.convlayer1 = nn.Sequential(conv, nn.ReLU())

        # Dense head.
        # NOTE(review): there is no non-linearity between fc1, fc2 and fc3 -
        # confirm that this is the intended configuration.
        self.fc1 = nn.Linear(in_features=32*28*28, out_features=600)
        self.fc2 = nn.Linear(in_features=600, out_features=128)
        self.fc3 = nn.Linear(in_features=128, out_features=self.n_classes)

    def forward(self, x):
        '''Return ``(log_probs, embds)``; ``embds`` is the 128-wide fc2 output.'''
        feature_maps = self.convlayer1(x)
        flat = feature_maps.view(-1, self.fc1.in_features)
        embds = self.fc2(self.fc1(flat))
        logits = self.fc3(embds)
        return F.log_softmax(logits, dim=1), embds

class CNN2(nn.Module):
    '''CNN variant with one conv block (conv, batch-norm, ReLU, 2x2 max-pool).

    Expects 1x28x28 inputs; the pooling halves the spatial size, so the dense
    head receives 32*14*14 features.
    '''

    def __init__(self, n_classes=10):
      super(CNN2, self).__init__()
      self.n_classes = n_classes
      self.convlayer1 = nn.Sequential(
          nn.Conv2d(1, 32, 3, padding=1),
          nn.BatchNorm2d(32),
          nn.ReLU(),
          nn.MaxPool2d(2)
      )

      self.fc1 = nn.Linear(32*14*14,600)
      # Element-wise dropout: the layer is applied to flat (N, 600)
      # activations, for which channel-wise ``Dropout2d`` is deprecated.
      self.drop = nn.Dropout(0.3)
      self.fc2 = nn.Linear(600, 128)
      self.fc3 = nn.Linear(128, self.n_classes)

    def forward(self, x):
      '''Return ``(log_probs, embds)``; ``embds`` is the 128-wide fc2 output.'''
      x = self.convlayer1(x)
      x = x.view(-1,32*14*14)
      x = self.fc1(x)
      x = self.drop(x)
      x = self.fc2(x)
      embds = x
      x = self.fc3(x)

      return F.log_softmax(x,dim=1), embds
      
class CNN3(nn.Module):
    '''CNN variant with two conv blocks (conv, batch-norm, ReLU, max-pool).

    Expects 1x28x28 inputs. Spatial size: 28 -> 14 after the first block,
    14 -> 12 (unpadded 3x3 conv) -> 6 after the second, so the dense head
    receives 64*6*6 features.
    '''

    def __init__(self, n_classes=10):
        super(CNN3, self).__init__()
        self.n_classes = n_classes
        self.convlayer1 = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.convlayer2 = nn.Sequential(
            nn.Conv2d(32,64,3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.fc1 = nn.Linear(64*6*6,600)
        # Element-wise dropout: the layer is applied to flat (N, features)
        # activations, for which channel-wise ``Dropout2d`` is deprecated.
        self.drop = nn.Dropout(0.3)
        self.fc2 = nn.Linear(600, 128)
        self.fc3 = nn.Linear(128, self.n_classes)

    def forward(self, x):
        '''Return ``(log_probs, embds)``.

        ``embds`` is the 128-wide fc2 output after dropout (dropout is the
        identity in eval mode).
        '''
        x = self.convlayer1(x)
        x = self.convlayer2(x)
        x = x.view(-1,64*6*6)
        x = self.fc1(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        embds = x
        x = self.fc3(x)

        return F.log_softmax(x,dim=1), embds

class CNN4(nn.Module):
    '''CNN variant with three conv blocks (conv, batch-norm, ReLU, max-pool).

    Expects 1x28x28 inputs. Spatial size: 28 -> 14 after the first block,
    14 -> 12 -> 6 after the second and 6 -> 4 -> 2 after the third, so the
    dense head receives 128*2*2 features.
    '''

    def __init__(self, n_classes=10):
        super(CNN4, self).__init__()
        self.n_classes = n_classes
        self.convlayer1 = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.convlayer2 = nn.Sequential(
            nn.Conv2d(32,64,3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.convlayer3 = nn.Sequential(
            nn.Conv2d(64,128,3),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.fc1 = nn.Linear(128*2*2,600)
        # Element-wise dropout: the layer is applied to flat (N, features)
        # activations, for which channel-wise ``Dropout2d`` is deprecated.
        self.drop = nn.Dropout(0.3)
        self.fc2 = nn.Linear(600, 128)
        self.fc3 = nn.Linear(128, self.n_classes)

    def forward(self, x):
        '''Return ``(log_probs, embds)``.

        ``embds`` is the 128-wide fc2 output after dropout (dropout is the
        identity in eval mode).
        '''
        x = self.convlayer1(x)
        x = self.convlayer2(x)
        x = self.convlayer3(x)
        x = x.view(-1, 128*2*2)
        x = self.fc1(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        embds = x
        x = self.fc3(x)

        return F.log_softmax(x,dim=1), embds

**TensorBoard files**

In [None]:
# TensorBoard log directories are keyed by the current POSIX timestamp; the
# train and validation scalars go to two separate writers.
current_time = str(datetime.datetime.now().timestamp())
train_log_dir = f'logs/tensorboard/train/{current_time}'
val_log_dir = f'logs/tensorboard/val/{current_time}'
train_summary_writer, val_summary_writer = (
    summary.create_file_writer(log_dir) for log_dir in (train_log_dir, val_log_dir)
)


# Main function

In [None]:
if __name__ == '__main__':
  # CONSTANTS
  is_show_plots = False
  is_plot_tsne = False
  BATCH_SIZE=64
  VAL_SIZE=0.2
  EPOCH=10
  np.random.seed(123)

  # --------------------------------------------------------- #               
  # --------------- 1.1 - Vanilla Training ------------------ #  
  # creating model, defining optimizer and loss
  NUM_CLASSES = 10
  class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
  model = DNN(n_classes=NUM_CLASSES)
  transform = transforms.Compose([transforms.ToTensor()]) 
  # instantiate Trainer class
  trainer = Trainer(model, transform, batch_size=BATCH_SIZE, val_size=VAL_SIZE, is_show_plots=True, n_classes=NUM_CLASSES)
  %tensorboard --logdir logs/tensorboard
  # train model
  trainer.train(epochs=EPOCH)
  # run test on unseen data
  y_pred, y_true = trainer.test()
  show_results(y_pred, y_true, class_names)
  train_embds, y_train, test_embds, y_test = trainer.get_embeddings()
  if is_plot_tsne:
    plot_tsne(train_embds, np.expand_dims(np.asarray(y_train), axis=1))

  # --------------------------------------------------------- #               
  # ----------------- 1.2 Dataset Manipulation -------------- #               
  NUM_CLASSES=3
  class_names = ['Top', 'Bottom', 'Accessories']
  model = DNN(n_classes=NUM_CLASSES)
  # instantiate Trainer class
  trainer = Trainer(model, transform, batch_size=BATCH_SIZE, val_size=VAL_SIZE, is_show_plots=True, n_classes=NUM_CLASSES)
  %tensorboard --logdir logs/tensorboard
  # train model
  trainer.train(epochs=EPOCH)
  # run test on unseen data
  y_pred, y_true = trainer.test()
  show_results(y_pred, y_true, class_names)
  train_embds, y_train, test_embds, y_test = trainer.get_embeddings()
  if is_plot_tsne:
    plot_tsne(train_embds, np.expand_dims(np.asarray(y_train), axis=1))
    plot_tsne(test_embds, np.expand_dims(np.asarray(y_test), axis=1))
  
  # # --------------------------------------------------------- #               
  # # ----------------- 1.3 Data Augmentation ----------------- #    
  # is_show_plots = False
  # AUGMENTATIONS=['noise', 'cropping', 'rotation', 'horizontal_flip', 'affine', 'scaling', 'deformation', 'brightness', 'contrast']
  # NUM_CLASSES=3
  # class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
  #              'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
  # # instantiate Trainer class
  # trainer = Trainer(model, transform, batch_size=BATCH_SIZE, val_size=VAL_SIZE, is_show_plots=True, n_classes=NUM_CLASSES)
  # # train model
  # trainer.train(epochs=EPOCH)
  # # run test on unseen data
  # y_pred, y_true = trainer.test()
  # show_results(y_pred, y_true, class_names)

  # --------------------------------------------------------- #               
  # ----------------- 1.4 Choosing a Model ------------------ #  
  model = CNN3(n_classes=NUM_CLASSES)
  transform = transforms.Compose([transforms.ToTensor()]) 
  # instantiate Trainer class
  trainer = Trainer(model, transform, batch_size=BATCH_SIZE, val_size=VAL_SIZE, is_show_plots=True, n_classes=NUM_CLASSES)
  # train model
  %tensorboard --logdir logs/tensorboard
  trainer.train(epochs=EPOCH)
  # run test on unseen data
  y_pred, y_true = trainer.test()
  show_results(y_pred, y_true, class_names)
  train_embds, y_train, test_embds, y_test = trainer.get_embeddings()
  if is_plot_tsne:
    plot_tsne(train_embds, np.expand_dims(np.asarray(y_train), axis=1))
    plot_tsne(test_embds, np.expand_dims(np.asarray(y_test), axis=1))