# **CLASSIFICATION TASK**

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import KFold
import numpy as np
import random
from PIL import Image

In [None]:
import os
# Debugging aid: asks the CUDA runtime to launch kernels synchronously so that
# errors surface at the call that caused them.
# NOTE(review): this usually slows GPU execution; consider removing once debugging is done.
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

In [None]:
# Download (if needed) the MNIST train and test splits into 'classifier_data'.
# No transform is given here; it is applied later by ClassificationDataset.
train_dataset, test_dataset = (
    torchvision.datasets.MNIST('classifier_data', train=is_train, download=True)
    for is_train in (True, False)
)

# **Network**

In [None]:
class CNN(nn.Module):
    """CNN classifier for 28x28 images: 2 convolutional layers, 2 linear layers.

    Args:
        C1: number of output channels of the first convolutional layer.
        C2: number of output channels of the second convolutional layer.
        Ni: number of input channels.
        Nh1: number of units of the first linear layer.
        No: number of output units (classes).

    ``forward`` returns raw class scores (logits) of shape ``(batch, No)``.
    ``nn.CrossEntropyLoss`` applies log-softmax itself, so applying softmax
    inside the network would normalize the scores twice. ``argmax`` over the
    logits gives the same predicted class as over the probabilities; use
    ``net.m(logits)`` when probabilities are needed.
    """

    def __init__(self, C1, C2, Ni, Nh1, No):
        super().__init__()
        self.conv1 = nn.Conv2d(Ni, C1, kernel_size=3)
        self.conv2 = nn.Conv2d(C1, C2, kernel_size=3)
        self.conv2_drop = nn.Dropout2d() #dropout on the second conv output
        self.batchnorm1 = nn.BatchNorm2d(C1) #batch normalization
        self.batchnorm2 = nn.BatchNorm2d(C2) #batch normalization
        self.m = nn.Softmax(dim=1) #softmax, NOT applied in forward (see class docstring)
        # Spatial size for a 28x28 input: 28 -conv3-> 26 -pool2-> 13 -conv3-> 11 -pool2-> 5,
        # hence C2 * 5 * 5 flattened features (1600 when C2 = 64).
        self.fc1 = nn.Linear(C2 * 5 * 5, Nh1)
        self.fc2 = nn.Linear(Nh1, No)

    def forward(self, x):
        """Return the logits for a batch ``x`` of shape ``(batch, Ni, 28, 28)``."""
        x = self.batchnorm1(F.relu(F.max_pool2d(self.conv1(x), 2)))
        x = self.batchnorm2(F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)))
        x = x.view(x.shape[0], -1) #flatten
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        return self.fc2(x)

# **Dataset**

In [None]:
class ClassificationDataset(Dataset):
    """In-memory dataset of samples with an optional per-sample transform.

    Args:
        dataset: iterable of samples; every element is stored in ``self.data``.
        transform: optional callable applied to a sample each time it is retrieved.

    Indexing accepts either an iterable of positions (returns a list of
    samples, as used for the K-fold splits) or a single position (returns one
    sample, which lets the dataset be passed directly to a ``DataLoader``).
    """

    def __init__(self, dataset, transform=None):
        self.transform = transform
        # Materialize every element once so it can be retrieved by position.
        self.data = list(dataset)

    def __len__(self): #length of the dataset
        return len(self.data)

    def _load(self, position):
        """Return the (optionally transformed) sample stored at ``position``."""
        sample = self.data[position]
        if self.transform:
            sample = self.transform(sample)
        return sample

    def __getitem__(self, idx):
        """Retrieve one sample (scalar ``idx``) or a list of samples (iterable ``idx``)."""
        try:
            positions = iter(idx)
        except TypeError:
            # Not iterable: treat idx as a single position.
            return self._load(idx)
        return [self._load(position) for position in positions]

class ToTensor(object):
    """Turn the image of an ``(image, label)`` sample into a float tensor."""

    def __call__(self, sample):
        image, label = sample
        as_tensor = transforms.ToTensor()(image)
        return (as_tensor.float(), label) #the label is passed through unchanged

class Normalization(object):
    """Normalize the image tensor of an ``(image, label)`` sample with mean 0.5 and std 0.5."""

    def __call__(self, sample):
        image, label = sample
        normalize = transforms.Normalize(0.5, 0.5)
        return (normalize(image).float(), label) #the label is passed through unchanged

In [None]:
# Per-sample pipeline: convert the image to a tensor, then normalize it.
composed_transform = transforms.Compose([ToTensor(), Normalization()])

# Wrap both MNIST splits (train first, then test) with the same transform.
train_data, test_data = (
    ClassificationDataset(split, transform=composed_transform)
    for split in (train_dataset, test_dataset)
)


# **Training**

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Training device: {device}")

In [None]:
def initNet():
    """Create a freshly initialized CNN on ``device``.

    The torch seed is reset to 0 before the network is built, so every call
    starts from the same initial weights.
    """
    torch.manual_seed(0)
    net = CNN(
        C1=10,    #channels of the first convolutional layer
        C2=64,    #channels of the second convolutional layer
        Ni=1,     #input channels
        Nh1=128,  #units of the first linear layer
        No=10,    #units of the output layer
    )
    net.to(device)
    return net

In [None]:
def get_optimizer(n, net, lr):
    """Build the optimizer selected by index ``n`` for the parameters of ``net``.

    Args:
        n: index of the optimizer: 0 = Adagrad, 1 = Adadelta, 2 = Adam, 3 = RMSprop.
        net: network model whose parameters are optimized.
        lr: learning rate.

    Returns:
        The selected optimizer, configured with ``lr`` and ``weight_decay=1e-3``.

    Raises:
        IndexError: if ``n`` is outside the list of available optimizers.
    """
    # Only the selected optimizer is instantiated; building all four just to
    # index one wastes work on every call.
    optimizer_classes = [optim.Adagrad, optim.Adadelta, optim.Adam, optim.RMSprop]
    return optimizer_classes[n](net.parameters(), lr=lr, weight_decay=1e-3)

In [None]:
###########
#K-Fold Cross Validation
###########
# Random search: every iteration samples one set of hyperparameters, evaluates
# it with 5-fold cross validation and keeps the set with the lowest mean
# validation loss (measured at the last epoch of each fold) in model_set.

nIter = 2 #number of different sets of hyperparameters to try
model_set = {} #best hyperparameters found so far, plus their validation loss under "loss"
loss_fn = nn.CrossEntropyLoss() #cross-entropy loss function

for i in range(nIter): #for each set of hyperparameters
    print('#################')
    print(f'# Iter {i}')
    print('#################')

    kf = KFold(n_splits=5) #5-fold cross validation
    num_epochs = random.randrange(100, 300, 20) #number of training epochs
    lr = random.uniform(0, 0.01) #learning rate
    type_optimizer = random.randrange(0, 4) #index of the optimizer used by get_optimizer
    batch_num = random.randrange(1, 1000) #batch size, drawn once per set and shared by all folds

    train_loss_Fold = [] #training loss of each fold
    val_loss_Fold = [] #validation loss of each fold

    for train_index, val_index in kf.split(train_data): #for each fold
        print('#################')
        print('# Fold')
        print('#################')
        net = initNet()
        optimizer = get_optimizer(type_optimizer, net, lr)

        #division in training set and validation set
        trainSet, valSet = train_data[train_index], train_data[val_index]

        trainSetX = DataLoader(trainSet, batch_size=batch_num, shuffle=True, num_workers=0) #dataloader of the training set
        #the validation fold is served as a single batch
        valSetX = DataLoader(valSet, batch_size=len(valSet), shuffle=True, num_workers=0)

        train_loss = [] #training loss of each batch of the last epoch
        val_loss = [] #validation loss of each batch of the last epoch

        for epoch_num in range(num_epochs): #for each epoch
            last_epoch = epoch_num == num_epochs - 1 #losses are recorded only at the last epoch

            net.train() #training
            for sample_batched in trainSetX: #for each batch
                x_batch = sample_batched[0].to(device) #input elements
                label_batch = sample_batched[1].to(device) #labels

                out = net(x_batch) #output of the model
                loss = loss_fn(out, label_batch) #loss of the model

                net.zero_grad()
                loss.backward() #backpropagation
                optimizer.step()

                if last_epoch:
                    loss_batch = loss.detach().cpu().numpy()
                    train_loss.append(loss_batch)

            net.eval() #evaluation
            with torch.no_grad():
                for sample_batched in valSetX: #for each batch
                    x_batch = sample_batched[0].to(device) #input elements
                    label_batch = sample_batched[1].to(device) #labels

                    out = net(x_batch) #output of the model
                    loss = loss_fn(out, label_batch) #loss of the model

                    if last_epoch:
                        loss_batch = loss.detach().cpu().numpy()
                        val_loss.append(loss_batch)

        train_loss = np.mean(train_loss) #mean loss over the batches
        train_loss_Fold.append(train_loss)

        val_loss = np.mean(val_loss) #mean loss over the batches
        val_loss_Fold.append(val_loss)

    train_loss = np.mean(train_loss_Fold) #mean of the loss of each fold
    print(f"AVERAGE TRAIN LOSS: {train_loss}")

    val_loss = np.mean(val_loss_Fold) #mean of the loss of each fold
    print(f"AVERAGE VAL LOSS: {val_loss}")

    #Save the new set when it is the first one, when it has a lower validation
    #loss, or when the stored loss is NaN (a NaN would otherwise never be
    #replaced, because every comparison against NaN is False).
    if not model_set or np.isnan(model_set["loss"]) or val_loss < model_set["loss"]:
        model_set["num_epochs"] = num_epochs
        model_set["lr"] = lr
        model_set["num_batch"] = batch_num
        model_set["type_opt"] = type_optimizer
        model_set["loss"] = val_loss

In the next cell you can choose whether to use the hyperparameters that I found to work well or the set of hyperparameters that you found during cross validation.

Set 'use_saved' equal to True if you want to use the set that I found

In [None]:
#True: use a set of hyperparameters that was already checked to be very good
#False: use the set of hyperparameters selected by cross validation (model_set)
use_saved = True

if not use_saved:
    #copy only the hyperparameters, leaving out the stored validation loss
    hyper_set = {key: model_set[key] for key in ('lr', 'num_batch', 'num_epochs', 'type_opt')}
else:
    hyper_set = {'lr': 0.0096, 'num_batch': 123, 'num_epochs': 150, 'type_opt': 0}

In [None]:
# Final training on the whole training set with the selected hyperparameters.
num_epochs, lr, batch_num, type_optimizer = (
    hyper_set[key] for key in ("num_epochs", "lr", "num_batch", "type_opt")
) #number of epochs, learning rate, batch size, type of optimizer

train_loss_log = [] #mean training loss of each epoch
test_loss_log = [] #not filled in by this cell

#all the (transformed) training samples, reshuffled at every epoch
train_dataloader = DataLoader(train_data[range(len(train_data))], batch_size=batch_num, shuffle=True, num_workers=0)

net = initNet() #initialization of the network
loss_fn = nn.CrossEntropyLoss() #loss function
optimizer = get_optimizer(type_optimizer, net, lr) #optimizer

for epoch_num in range(num_epochs): #for each epoch
    print('#################', f'# EPOCH {epoch_num}', '#################', sep='\n')

    train_loss = [] #loss of each batch of this epoch
    net.train()
    for sample_batched in train_dataloader: #for each batch
        x_batch, label_batch = sample_batched[0].to(device), sample_batched[1].to(device)

        out = net(x_batch)
        loss = loss_fn(out, label_batch)

        net.zero_grad()
        loss.backward()
        optimizer.step()

        loss_batch = loss.detach().cpu().numpy()
        train_loss.append(loss_batch)

    train_loss = np.mean(train_loss) #mean training loss of the batches
    print(f"AVERAGE TRAIN LOSS: {train_loss}")
    train_loss_log.append(train_loss)


#Save the trained network

#net_state_dict = net.state_dict()
#torch.save(net_state_dict, 'Classification.torch')

In [None]:
#######
#Plot of the evolution of the training error
#######

# One point per epoch, drawn on a logarithmic y axis.
fig, ax = plt.subplots(figsize=(12,8))
ax.semilogy(train_loss_log, label='Train loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.grid()
ax.legend()
plt.show()

# **Testing**

Set 'use_pre' equal to True in the next cell to load a pretrained network.

In [None]:
#Load a previously trained network
use_pre = False

if use_pre:
    net = initNet()
    # Load the state dict previously saved. map_location remaps the stored
    # tensors onto the current device, so a checkpoint saved on a GPU can
    # still be loaded on a CPU-only machine.
    net_state_dict = torch.load('Classification.torch', map_location=device)
    # Update the network parameters
    net.load_state_dict(net_state_dict)

In [None]:
#######
#Testing of the Model on Test Dataset
#######

# The whole test set is served as a single, unshuffled batch, so the rows of
# `out` follow the order of test_data.data.
test_dataloader = DataLoader(test_data[range(len(test_data))], batch_size=len(test_data), shuffle=False, num_workers=0)

val_loss = [] #loss of each batch
net.eval() #evaluation
with torch.no_grad(): #disable gradient tracking
    for sample_batched in test_dataloader:
        x_batch, label_batch = sample_batched[0].to(device), sample_batched[1].to(device)

        out = net(x_batch)
        loss = loss_fn(out, label_batch)

        loss_batch = loss.detach().cpu().numpy()
        val_loss.append(loss_batch)
    val_loss = np.mean(val_loss) #mean of the loss of the batches
    print(f"AVERAGE VAL LOSS: {val_loss}")

In [None]:
#Computation of the accuracy on test dataset

# Count the samples whose highest-scoring class equals the true label
# (second field of each stored test sample).
count = sum(
    1
    for elem, pred_labels in zip(test_data.data, out)
    if pred_labels.argmax(0) == elem[1]
)

print(f"ACCURACY: {count * 100 / len(test_data)}%") #percentage of correct predictions

In [None]:
#######
#Testing of the Model on Training Dataset
#######

# The whole training set is served as a single, unshuffled batch, so the rows
# of `out_train` follow the order of train_data.data.
trainL = DataLoader(train_data[range(len(train_data))], batch_size=len(train_data), shuffle=False, num_workers=0)

val_loss = [] #loss of each batch
net.eval()
with torch.no_grad():
    for sample_batched in trainL:
        x_batch, label_batch = sample_batched[0].to(device), sample_batched[1].to(device)

        out_train = net(x_batch)
        loss = loss_fn(out_train, label_batch)

        loss_batch = loss.detach().cpu().numpy()
        val_loss.append(loss_batch)
    val_loss = np.mean(val_loss) #mean of the loss of the batches
    print(f"AVERAGE VAL LOSS: {val_loss}")

#Computation of the accuracy on the training dataset

# Count the samples whose highest-scoring class equals the true label.
count = sum(
    1
    for elem, pred_labels in zip(train_data.data, out_train)
    if pred_labels.argmax(0) == elem[1]
)

print(f"ACCURACY: {count * 100 / len(train_data)}%")

Set 'n' in the next cell to choose the number of pictures to display.



In [None]:
#Print of model prediction
n = 10 #number of pictures to display

# range(n) both limits the number of pictures and provides the running index.
for i, elem, pred_labels in zip(range(n), test_data.data, out):
    label_indx = pred_labels.argmax(0) #predicted class
    im = elem[0] #untransformed image stored in the dataset

    print(f"LABEL {i + 1}: {label_indx}")
    fig = plt.figure(figsize=(8,8))
    plt.imshow(im, cmap='Greys')
    plt.show()
  




In [None]:
#Misclassified images

# Walk the test samples together with the model outputs and show only the
# ones whose predicted class differs from the true label.
for (im, true_label), pred_labels in zip(test_data.data, out):
    label_indx = pred_labels.argmax(0) #predicted class

    if label_indx == true_label:
        continue #correctly classified: nothing to show

    print(f"True Label: {true_label}")
    print(f"Predicted Label: {label_indx}")
    fig = plt.figure(figsize=(8,8))
    plt.imshow(im, cmap='Greys')
    plt.show()

# **Weights**

In [None]:
# Copy the learned parameters to NumPy arrays on the CPU for plotting.

#Convolutional layers (weights only)
h1_w = net.conv1.weight.detach().cpu().numpy() #first convolutional layer
h2_w = net.conv2.weight.detach().cpu().numpy() #second convolutional layer

#First linear layer
h3_w, h3_b = (param.detach().cpu().numpy() for param in (net.fc1.weight, net.fc1.bias))

#Output layer
out_w, out_b = (param.detach().cpu().numpy() for param in (net.fc2.weight, net.fc2.bias))

In [None]:
#######
#Plotting of the Weights Histograms
########

# One 50-bin histogram per layer, stacked vertically.
weight_panels = [
    (h1_w, 'First convolutional layer weights'),
    (h2_w, 'Second convolutional layer weights'),
    (h3_w, 'First linear layer weights'),
    (out_w, 'Second linear layer weights'),
]

fig, axs = plt.subplots(4, 1, figsize=(12,8))
for ax, (weights, title) in zip(axs, weight_panels):
    ax.hist(weights.flatten(), 50)
    ax.set_title(title)
    ax.grid()
plt.tight_layout()
plt.show()

# **Activations**

In [None]:
def get_activation(layer, input, output):
    """Forward hook: store the sigmoid of the hooked layer's output.

    The result is written to the module-level variable ``activation``;
    ``layer`` and ``input`` are not used.
    """
    global activation
    activation = output.sigmoid()

#register hook
# NOTE(review): get_activation stores sigmoid(fc1 output), whereas CNN.forward
# applies ReLU after fc1 - confirm that sigmoid is what should be plotted.
hook_handle = net.fc1.register_forward_hook(get_activation)

#analyze activations
net = net.to(device)
net.eval()
try:
    with torch.no_grad():
        x1 = test_data[[0]][0][0].unsqueeze(0).to(device) #first image of the test dataset
        y1 = net(x1)
        z1 = activation #value stored by the hook during the forward pass above
        x2 = test_data[[-1]][0][0].unsqueeze(0).to(device) #last image of the test dataset
        y2 = net(x2)
        z2 = activation
finally:
    #remove hook, even if one of the forward passes fails
    hook_handle.remove()

#plot activations
# `use_line_collection` is no longer passed to stem(): the keyword was
# deprecated and then removed in recent Matplotlib releases, where passing it
# raises a TypeError.
fig, axs = plt.subplots(2, 1, figsize=(12,6))
axs[0].stem(z1[0].cpu().numpy())
axs[0].set_title('First linear layer activations for the first image of the test set')
axs[1].stem(z2[0].cpu().numpy())
axs[1].set_title('First linear layer activations for the last image of the test set')
plt.tight_layout()
plt.show()

# **Filters**

In [None]:
model_children = list(net.children()) #list of the model children

# Keep the convolutional layers, in the order net.children() yields them,
# together with their weights. isinstance is the idiomatic type check.
conv_layers = [child for child in model_children if isinstance(child, nn.Conv2d)] #list of convolutional layers
model_weights = [layer.weight for layer in conv_layers] #weights of each convolutional layer

In [None]:
######
#Print the weights of the filters
######
# For every filter only its first input channel (index 0) is displayed.
# The loop variable is called `kernel` so that the builtin `filter` is not
# shadowed for the rest of the notebook.

print("Weights of Convolutonal Layer #1")
plt.figure(figsize=(20, 7))
for i, kernel in enumerate(model_weights[0]):
    plt.subplot(2, 5, i+1) #2 x 5 grid: room for 10 filters
    plt.imshow(kernel[0].detach().cpu(), cmap='gray')
    plt.axis('off')
plt.show()

print("Weights of Convolutonal Layer #2")
plt.figure(figsize=(20, 10))
for i, kernel in enumerate(model_weights[1]):
    plt.subplot(8, 8, i+1) #8 x 8 grid: room for 64 filters
    plt.imshow(kernel[0].detach().cpu(), cmap='gray')
    plt.axis('off')
plt.show()

# **Feature Maps**

In [None]:
img = test_data[[0]][0][0].unsqueeze(0).to(device) #first image of the test dataset

# Feed the image through the convolutional layers only, each one receiving the
# raw output of the previous one (pooling, ReLU and batch normalization of the
# full network are not applied here). Gradients are not needed for plotting.
with torch.no_grad():
    results = [conv_layers[0](img)] #passing the image to the first convolutional layer
    for conv_layer in conv_layers[1:]: #passing the result of the previous convolutional layer to the next one
        results.append(conv_layer(results[-1]))

outputs = results #results

#Visualize the feature maps of the filters
# The loop variable is called `feature_map` so that the builtin `filter` is
# not shadowed for the rest of the notebook.

for num_layer, layer_output in enumerate(outputs): #for each layer
    print(f"Convolutional layer #{num_layer + 1}")
    plt.figure(figsize=(30, 30))
    layer_viz = layer_output[0].detach().cpu() #feature maps of the single image in the batch
    for i, feature_map in enumerate(layer_viz): #for each filter
        plt.subplot(8, 8, i + 1) #8 x 8 grid: room for 64 feature maps
        plt.imshow(feature_map, cmap='gray')
        plt.axis("off")
    plt.show()
    plt.close()
    print()
    print()