# CNN for Image Classification
### In this notebook we create a CNN from the ground up and train it to classify downloaded images of different monkey species.

### The following steps are included in image classification using a CNN:

* 1. Downloading the Dataset and specifying the **training config()**
* 2. Setup the Pre-Training Processing i.e., **Resizing**,**Normalization** etc.
* 3. Making/Modifying new images within the training dataset.
* 4. Defining the **CNN** architecture [Printing the summary with number of parameters]
* 5. Model training and Evaluation
* 6. Saving and Loading the best model
* 7. Inference 
* 8. Confusion Matrix

<center><img src='https://www.dropbox.com/scl/fi/e4541jejdlxzny3vrgw24/Monkey_architecture-updated.png?rlkey=4c3jm0kgzwm4txn1mbewqbox9&st=vq1lb2nt&dl=1' ></center>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import DataLoader,Dataset
from torchinfo import summary
from torch.utils.tensorboard import SummaryWriter

import torchvision
from torchvision import datasets
from torchvision.transforms import v2 as transforms
from torchvision.ops import Conv2dNormActivation

from dataclasses import dataclass

import matplotlib.pyplot as plt
import time
import numpy as np
import random
import warnings
import os
from tqdm import tqdm

import pandas as pd

%matplotlib inline
warnings.filterwarnings("ignore")

In [None]:
# Seed every random number generator in use so that runs are reproducible
def set_seed(seed):
    """Seed the Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    Args:
        seed: Integer seed applied to every random number generator.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        # Benchmark mode auto-tunes the convolution algorithm at run time and
        # may select different kernels between runs, which defeats the
        # reproducibility this function is meant to provide.
        torch.backends.cudnn.benchmark = False
        
set_seed(21)

### 1. Download and Extract the datasets

In [None]:
!curl -L "https://www.dropbox.com/s/45jdd8padeyjq6t/10_Monkey_Species.zip?dl=1" -o "10_Monkey_Species.zip"

In [None]:
!unzip -q "10_Monkey_Species.zip"

### 2. Dataset and Training Configuration Parameters

In [None]:
# Central training configuration, so that hyper-parameters are changed in one place
@dataclass(frozen=True)
class TrainingConfig:
    """Immutable configuration for data loading and training."""
    # Number of samples per batch for every DataLoader
    batch_size: int = 32
    # Number of passes over the training set
    num_epochs: int = 40
    # Step size handed to the optimizer
    learning_rate: float = 1e-4

    # Logging / evaluation cadence (not read anywhere in the visible notebook)
    log_interval: int = 1
    test_interval: int = 1
    # Directory that contains the extracted "10_Monkey_Species" folder
    data_root: str = "./"
    # Preferred device name (the notebook derives DEVICE separately)
    device: str = "cuda"
    # Worker processes used by each DataLoader
    num_workers: int = 5
    
train_config = TrainingConfig()
# Always build a torch.device (never a bare string) so DEVICE has the same type
# whether or not CUDA is available.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Available Device: {DEVICE}")

    

In [None]:
# Each split is nested one level deep: <data_root>/10_Monkey_Species/<split>/<split>
train_root = os.path.join(train_config.data_root,"10_Monkey_Species","training","training")
val_root = os.path.join(train_config.data_root,"10_Monkey_Species","validation","validation")

In [None]:
# Read the label table; header=None means the file's first row is loaded as data
df = pd.read_csv(os.path.join("10_Monkey_Species","monkey_labels.txt"),sep=",",header=None)
# Assign readable column names
df.columns = ["Label","Latin Name","Common Name","Train Images", "Validation Images"]
# Replace tab characters inside the Latin names with spaces
df['Latin Name'] = df['Latin Name'].str.replace("\t", " ")
# Display everything except the first row — presumably the file's own header line; TODO confirm
df[1:]

### 3. Dataset Preprocessing

In [None]:
# Compute the per-channel mean and standard deviation of the images served by a dataloader.
def get_mean_std(train_loader,img_size=(224,224),num_workers=2):
    """Return the per-channel mean and std over every pixel in ``train_loader``.

    Pixel sums are accumulated across batches (instead of averaging the
    per-batch means), so a smaller final batch is not over-weighted.

    Args:
        train_loader: Iterable yielding ``(images, labels)`` pairs where
            ``images`` is a tensor laid out as [B, C, H, W].
        img_size: Unused; kept for backward compatibility.
        num_workers: Unused; kept for backward compatibility.

    Returns:
        Tuple ``(mean, std)`` of float32 tensors, each of shape ``(C,)``.

    Raises:
        ValueError: If ``train_loader`` yields no pixels.
    """
    channel_sum = None
    channel_sq_sum = None
    pixel_count = 0

    for batch_data, _ in train_loader:
        # Accumulate in float64 to limit rounding error over many batches
        batch_data = batch_data.to(torch.float64)

        # Reduce over batch, height and width -> one value per channel
        batch_sum = batch_data.sum(dim=(0, 2, 3))
        batch_sq_sum = (batch_data ** 2).sum(dim=(0, 2, 3))

        if channel_sum is None:
            channel_sum, channel_sq_sum = batch_sum, batch_sq_sum
        else:
            channel_sum = channel_sum + batch_sum
            channel_sq_sum = channel_sq_sum + batch_sq_sum

        pixel_count += batch_data.shape[0] * batch_data.shape[2] * batch_data.shape[3]

    if pixel_count == 0:
        raise ValueError("train_loader yielded no data; cannot compute mean/std")

    mean = channel_sum / pixel_count

    # Variance is the mean of the squares minus the square of the mean.
    # Clamp at zero so rounding error can never produce a NaN std.
    var = (channel_sq_sum / pixel_count - mean ** 2).clamp(min=0)

    mean = mean.to(torch.float32)
    std = var.sqrt().to(torch.float32)

    print(f"Mean : {mean}, Std : {std}")
    return mean,std
    

In [None]:
# Base pre-processing shared by the training and validation pipelines
img_size = (224,224)

# Resize every image to img_size, then convert it to a tensor.
# NOTE(review): the torchvision v2 documentation marks ToTensor as deprecated in
# favour of ToImage() + ToDtype(torch.float32, scale=True) — confirm against the
# installed torchvision version before switching.
preprocess = transforms.Compose([
    transforms.Resize(img_size,antialias=True),
    transforms.ToTensor()
])

PyTorch has inbuilt functionality `(torchvision.datasets.ImageFolder class)` to load such structured image folders:

In [None]:
#torchvision.datasets.ImageFolder(root, transform=None, target_transform=None, loader=<function default_loader>, is_valid_file=None)


In [None]:
# Load the training images with only the base pre-processing (resize + to-tensor),
# i.e. without normalization, so the raw statistics can be measured
train_data_mean_calc = datasets.ImageFolder(root=train_root,transform=preprocess) 
# Temporary loader used only to compute the dataset mean and std
train_loader_mean_calc = DataLoader(train_data_mean_calc,batch_size=train_config.batch_size,shuffle=True,num_workers=train_config.num_workers)

# get_mean_std also prints the values itself, so they appear twice in the output
mean,std = get_mean_std(train_loader_mean_calc)

print(f"Dataset Mean : {mean}")
print(f"Dataset Std: {std}")


In [None]:

# Pre-processing for the training dataset: base pre-processing, random
# augmentations, then normalization with the dataset statistics computed above
train_transform = transforms.Compose([
    preprocess,
    transforms.RandomHorizontalFlip(),
    transforms.RandomErasing(p=0.4),
    transforms.RandomApply([
        transforms.RandomAffine(degrees=(30,70),translate=(0.1,0.3),scale=(0.5,0.75))
    ], p=0.1), # This line will apply rotation, translation and scaling 10% of the time for the given ranges
    transforms.Normalize(mean=mean,std=std)
])

# Pre-processing for the validation dataset: no random augmentation, only the
# base pre-processing followed by the same normalization
common_transform = transforms.Compose([
    preprocess,
    transforms.Normalize(mean=mean,std=std)
    
])

In [None]:
# Training split: base pre-processing plus the random augmentations
train_data = datasets.ImageFolder(train_root, transform=train_transform)

# Validation split: only the common transforms (Resize, ToTensor and Normalize)
val_data = datasets.ImageFolder(val_root, transform=common_transform)

In [None]:
# Wrap both datasets in loaders; only the training split is shuffled
train_loader = DataLoader(
    dataset=train_data,
    batch_size=train_config.batch_size,
    num_workers=train_config.num_workers,
    shuffle=True,
)
val_loader = DataLoader(
    dataset=val_data,
    batch_size=train_config.batch_size,
    num_workers=train_config.num_workers,
    shuffle=False,
)

In [None]:
train_data.classes

In [None]:
# Human-readable species name for each integer label.
# Presumably the keys follow the ImageFolder class index order — verify against
# train_data.classes.
class_mapping = {

    0: "mantled_howler",
    1: "patas_monkey",
    2: "bald_uakari",
    3: "japanese_macaque",
    4: "pygmy_marmoset",
    5: "white_headed_capuchin",
    6: "silvery_marmoset",
    7: "common_squirrel_monkey",
    8: "black_headed_night_monkey",
    9: "nilgiri_langur"
}

In [None]:
# Show a grid of images taken from the first batch of a dataloader
def visualize_images(dataloader, num_images = 20):
    """Plot up to ``num_images`` images from the first batch of ``dataloader``.

    Each image is rescaled to [0, 1] independently for display and titled with
    its label index and the name looked up in the module-level
    ``class_mapping``.

    Args:
        dataloader: Iterable yielding ``(images, labels)`` batches, where each
            image is laid out as (C, H, W).
        num_images: Maximum number of images to draw; capped at the batch size.
    """
    fig = plt.figure(figsize=(10,10))

    # Only the first batch is drawn
    images, labels = next(iter(dataloader))

    num_rows = 4
    num_cols = int(np.ceil(num_images / num_rows))

    for idx in range(min(num_images, len(images))):
        image, label = images[idx], labels[idx]

        ax = fig.add_subplot(num_rows, num_cols, idx+1, xticks = [], yticks = [])

        # (C, H, W) -> (H, W, C), the layout matplotlib expects
        image = image.permute(1,2,0)

        # Rescale to [0, 1] for display; a constant image has zero range, so
        # show it as black instead of dividing by zero (which would give NaNs).
        lowest, highest = image.min(), image.max()
        value_range = highest - lowest
        if value_range > 0:
            image = (image - lowest) / value_range
        else:
            image = torch.zeros_like(image)

        # No colormap: matplotlib ignores cmap for RGB data
        ax.imshow(image)
        ax.set_title(f"{label.item()}: {class_mapping[label.item()]}")

    fig.tight_layout()
    plt.show()

visualize_images(train_loader, num_images = 16)

### 4. CNN Architecture

We start with the very first convolutional layer in the first convolutional block. To define a convolutional layer in PyTorch, we call nn.Conv2d(), which accepts several input arguments. First, we define the layer to have 32 filters. The kernel size of every convolutional layer in the code below is 3 (which is interpreted as 3x3). We can use a padding option called same, which will pad the input tensor so that the output of the convolution operation has the same spatial size as the input. This is not required, but it’s commonly used. If you don’t explicitly specify this padding option (as is the case for the plain nn.Conv2d layers below), then the default behavior has no padding, and therefore the spatial size of the output from the convolutional layer will be slightly smaller than the input size. After each convolutional layer, we add a BatchNorm2d layer, which normalizes the activations of the previous layer at each batch, thereby improving the training speed and stability of the network. We use a ReLU activation function in all the layers in the network except for the output layer. This sequence of Conv2d followed by BatchNorm2d and ReLU is called Conv2dNormActivation, and torchvision provides a convenience module that implements it: torchvision.ops.Conv2dNormActivation().

There is also another alternative approach to specify Conv2d layers with nn.LazyConv2d() where the input_channels is automatically inferred from the previous Conv2d layers output_channels.

The first two convolutional layers have 32 filters each, and then we follow them with a max pooling layer that has a window size of (2x2), so for a 224 x 224 input the output shape from this first convolution block is (110 x 110 x 32). Next, the second convolutional block has 64 and 128 filters in its two convolutional layers instead of 32, and finally the third and fourth convolutional blocks have 256 and 512 filters.

Note: The number of filters in each convolutional layer is something that you will need to experiment with. A larger number of filters allows the model to have a greater learning capacity, but this also needs to be balanced with the amount of data available to train the model. Adding too many filters (or layers) can lead to overfitting, one of the most common issues encountered when training models.

The final layer in the feature extractor is the nn.AdaptiveAvgPool2d(), which applies a 2D adaptive average pooling over an input image composed of several input channels. This layer ensures that the output has a fixed size of H x W, regardless of the input size. The number of output features is equal to the number of input channels. This is particularly useful for making the subsequent fully connected layers agnostic to the input size.

Before we define the fully connected layers for the classifier, we need to first flatten the two-dimensional activation maps that are produced by the last convolutional layer (which have a spatial shape of 3x3 with 512 channels). This is accomplished by calling the nn.Flatten() function to create a 1-dimensional vector of length 4608. We then add a densely connected layer with 256 neurons and a fully connected output layer with 10 neurons because we have ten classes in our dataset.

Below we show the different ways in which a Conv2d -> BatchNorm -> ReLU sequence can be written: explicit layers, nn.LazyConv2d, and Conv2dNormActivation.

In [None]:
class CNN(nn.Module):
    """Convolutional classifier mapping 3-channel images to 10 class logits.

    The feature extractor deliberately demonstrates three equivalent ways of
    writing a Conv -> BatchNorm -> ReLU unit: explicit layers, ``nn.LazyConv2d``
    (input channels inferred on the first forward pass) and torchvision's
    ``Conv2dNormActivation``.

    Because lazy layers are used, run one forward pass before relying on the
    parameter shapes.
    """

    def __init__(self):
        super().__init__()

        self._model = nn.Sequential(

            # Conv2D Norm Block 1: explicit Conv2d + BatchNorm2d + ReLU
            nn.Conv2d(in_channels=3,out_channels=32,kernel_size=3),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),

            nn.Conv2d(in_channels=32,out_channels=32,kernel_size=3),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),

            # Conv2D Norm Block 2: Using LazyConv2D
            nn.LazyConv2d(out_channels=64,kernel_size=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),

            nn.LazyConv2d(out_channels=128,kernel_size=3),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),

            # Conv2d Norm Activation Block 3: Using Conv2dNormActivation
            Conv2dNormActivation(in_channels=128,out_channels=256,kernel_size=3),

            Conv2dNormActivation(in_channels=256,out_channels=256,kernel_size=3),
            nn.MaxPool2d(kernel_size=2),

            # Conv2d Norm Activation Block 4
            Conv2dNormActivation(in_channels=256,out_channels=512,kernel_size=3),
            nn.MaxPool2d(kernel_size=2),

            # Pool to a fixed 3x3 map so the classifier is independent of input size
            nn.AdaptiveAvgPool2d(output_size=(3,3)),

            # Flatten (512, 3, 3) -> 4608 features
            nn.Flatten(),

            # Classification Head
            nn.Linear(in_features=512*3*3,out_features=256),
            # Without a non-linearity the two Linear layers collapse into a
            # single linear map. NOTE: adding this layer shifts the index of the
            # final Linear inside the Sequential, so checkpoints saved with the
            # previous layout will not load into this one.
            nn.ReLU(inplace=True),
            nn.Linear(in_features=256,out_features=10)
        )

    def forward(self,x):
        """Return the raw class logits (no softmax) for a batch of images ``x``."""
        return self._model(x)

In [None]:
model = CNN()

# Always a torch.device, whether or not CUDA is available
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Must match the training image size (img_size = 224 x 224)
dummy_input = (1,3,224,224)

# Log
# NOTE(review): the run name mentions 80 epochs and dropout, but
# train_config.num_epochs is 40 and the model has no dropout layer — rename if
# the run name is meant to describe this configuration.
log_dir = "runs/80epochs-3.3M_param_dropout"
writer = SummaryWriter(log_dir)

# summary() performs a forward pass, which also materialises the LazyConv2d layers
print(summary(model,dummy_input,row_settings=["var_names"],device="cpu"))

# Define the Optimizer only after the dry run above, so that every parameter
# (including those of the lazy layers) is initialised when it is registered.
optimizer = Adam(params=model.parameters(),lr=train_config.learning_rate)

### 5. Training and Model Evaluation

In [None]:
def training(model,train_loader):
    """Run one training epoch and return ``(average loss, accuracy in percent)``.

    Relies on the module-level ``optimizer`` and ``DEVICE``.

    Args:
        model: Network to train; moved to ``DEVICE`` and put in train mode.
        train_loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        Tuple ``(train_avg_loss, training_accuracy)`` where the loss is the
        mean cross-entropy per sample and the accuracy is in percent.

    Raises:
        ValueError: If ``train_loader`` yields no samples.
    """
    model.train()
    model.to(DEVICE)

    loss_sum = 0.0
    correct_predictions = 0
    total_train_samples = 0

    for images,labels in tqdm(train_loader,desc="Training"):
        images, labels = images.to(DEVICE),labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(images)
        loss = F.cross_entropy(outputs,labels)
        loss.backward()
        optimizer.step()

        batch_samples = labels.shape[0]
        # cross_entropy returns the batch mean; weight it by the batch size so a
        # short final batch does not skew the epoch average.
        loss_sum += loss.item() * batch_samples
        total_train_samples += batch_samples

        # detach() instead of .data: no autograd tracking is needed for metrics
        predicted = outputs.detach().argmax(dim=1)
        correct_predictions += (predicted==labels).sum().item()

    if total_train_samples == 0:
        raise ValueError("train_loader yielded no samples")

    train_avg_loss = loss_sum / total_train_samples
    training_accuracy = 100 * correct_predictions/total_train_samples

    return train_avg_loss, training_accuracy

In [None]:
def validation(model,val_loader):
    """Evaluate ``model`` on ``val_loader`` and return ``(average loss, accuracy in percent)``.

    The first parameter is the model, matching the way the training loop calls
    this function (``validation(model, val_loader)``). Relies on the
    module-level ``DEVICE``.

    Args:
        model: Network to evaluate; moved to ``DEVICE`` and put in eval mode.
        val_loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        Tuple ``(validation_avg_loss, validation_accuracy)`` where the loss is
        the mean cross-entropy per sample and the accuracy is in percent.

    Raises:
        ValueError: If ``val_loader`` yields no samples.
    """
    model.eval()
    model.to(DEVICE)

    loss_sum = 0.0
    total_val_samples = 0
    correct_predictions = 0

    # No gradients are needed for evaluation
    with torch.no_grad():
        for images,labels in tqdm(val_loader,desc="Validation"):
            images,labels = images.to(DEVICE),labels.to(DEVICE)

            outputs = model(images)
            loss = F.cross_entropy(outputs,labels)

            batch_samples = labels.shape[0]
            # Weight the batch-mean loss by the batch size (per-sample average)
            loss_sum += loss.item() * batch_samples
            # Accumulate across batches
            total_val_samples += batch_samples

            predicted = outputs.argmax(dim=1)
            correct_predictions += (predicted==labels).sum().item()

    if total_val_samples == 0:
        raise ValueError("val_loader yielded no samples")

    validation_avg_loss = loss_sum / total_val_samples
    validation_accuracy = 100 * correct_predictions / total_val_samples

    return validation_avg_loss,validation_accuracy
        

In [None]:
def main(model,train_loader,val_loader):
    """Train for ``train_config.num_epochs`` epochs, validating after each one.

    Per-epoch metrics are printed and written to the module-level TensorBoard
    ``writer``. Whenever the validation accuracy improves, the model weights
    are saved to ``best.pt``.

    Args:
        model: Network to train.
        train_loader: Loader for the training split.
        val_loader: Loader for the validation split.

    Returns:
        Tuple ``(train_losses, train_accuracies, val_losses, val_accuracies)``,
        each a list with one entry per epoch.
    """
    train_losses, train_accuracies = [],[]
    val_losses,val_accuracies = [],[]

    # Start below any reachable accuracy so the first epoch always writes a checkpoint
    best_validation_acc = float("-inf")

    for epoch in tqdm(range(train_config.num_epochs)):
        train_loss,train_accuracy = training(model,train_loader)
        val_loss,val_accuracy = validation(model,val_loader)

        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        print(f"Epoch {epoch+1:0>2}/{train_config.num_epochs} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}% - Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

        # Pass the epoch as the step; otherwise every point is logged at the same x-position
        step = epoch + 1
        writer.add_scalar('Loss/Train',train_loss,step)
        writer.add_scalar('Loss/Val',val_loss,step)
        writer.add_scalar('Accuracy/Train',train_accuracy,step)
        writer.add_scalar('Accuracy/Val',val_accuracy,step)

        if val_accuracy > best_validation_acc:
            best_validation_acc = val_accuracy
            print("Saving Best Model.......")
            # Serialised immediately, so later training cannot alter the saved weights
            torch.save(model.state_dict(),"best.pt")

    # Make sure buffered events reach disk
    writer.flush()

    return train_losses,train_accuracies,val_losses,val_accuracies

In [None]:
train_losses, train_accuracies, val_losses, val_accuracies = main(model, train_loader, val_loader)

### 6. Saving and Loading the Best Model

In [None]:
# Load the best model weights.
# map_location lets a checkpoint saved on GPU load on a CPU-only machine;
# weights_only=True restricts unpickling to tensors and plain containers.
model.load_state_dict(torch.load("best.pt", map_location=DEVICE, weights_only=True))
model.eval()

### 7. Inference

In [None]:
def prediction(model,val_loader):
    """Run ``model`` over ``val_loader`` and collect inputs, labels and predictions.

    Relies on the module-level ``DEVICE``.

    Args:
        model: Trained network; moved to ``DEVICE`` and put in eval mode.
        val_loader: Iterable yielding ``(images, labels)`` batches.

    Returns:
        Tuple of NumPy arrays ``(images, labels, predicted_indices,
        predicted_probabilities)`` concatenated over all batches. The images
        are returned as they came out of the loader.
    """
    model.eval()
    model.to(DEVICE)

    all_images,all_labels = [],[]
    all_pred_indices,all_pred_probs = [],[]

    for images,labels in val_loader:
        images,labels = images.to(DEVICE),labels.to(DEVICE)

        with torch.inference_mode():
            outputs = model(images)
            # Convert logits to per-class probabilities
            probs = F.softmax(outputs,dim=1)
            # max() over the class axis returns (values, indices) in one call
            pred_prob, pred_indices = probs.max(dim=1)

        all_images.append(images.cpu())
        all_labels.append(labels.cpu())
        all_pred_indices.append(pred_indices.cpu())
        all_pred_probs.append(pred_prob.cpu())

    return (torch.cat(all_images).numpy(),torch.cat(all_labels).numpy(),torch.cat(all_pred_indices).numpy(),torch.cat(all_pred_probs).numpy())

In [None]:
# Undo the dataset normalization so an image can be displayed
def denormalize(image):
    """Return ``image * std + mean`` limited to the range [0, 1].

    ``mean`` and ``std`` are the module-level dataset statistics; they are
    broadcast against ``image``.
    """
    scale = np.asarray(std)
    offset = np.asarray(mean)
    restored = image * scale + offset

    # Clip so that every pixel value lies in [0, 1]
    return np.clip(restored, 0, 1)

In [None]:
def visualise_predictions(sample_images,sample_gt_labels, pred_indices, pred_probs, num_images =10):
    """Show randomly chosen images with their ground-truth and predicted labels.

    Images are drawn without replacement, so no image appears twice, and the
    number shown is capped at the number of available images.

    Args:
        sample_images: Array of normalized images laid out as (N, C, H, W).
        sample_gt_labels: Ground-truth class index per image.
        pred_indices: Predicted class index per image.
        pred_probs: Probability of the predicted class per image.
        num_images: Maximum number of images to display.
    """
    num_shown = min(num_images, len(sample_images))
    if num_shown <= 0:
        # Nothing to draw
        return

    fig = plt.figure(figsize = (20,5))

    # Distinct random indices (no duplicate panels)
    chosen = random.sample(range(len(sample_images)), num_shown)

    for position, idx in enumerate(chosen, start=1):
        image = sample_images[idx].transpose(1,2,0) #(C,H,W) --> (H,W,C)
        label = sample_gt_labels[idx]
        pred_idx = pred_indices[idx]
        pred_prob = pred_probs[idx]

        # Undo the normalization so the colours look natural
        image = denormalize(image)

        ax = fig.add_subplot(1, num_shown, position)
        ax.imshow(image)
        ax.set_title(f"GT: {class_mapping[label]}\nPred: {class_mapping[pred_idx]} ({pred_prob:.2f})")
        ax.axis('off')

    plt.show()

In [None]:
val_images, val_gt_labels, pred_indices, pred_probs = prediction(model, val_loader)

visualise_predictions(val_images, val_gt_labels, pred_indices, pred_probs, num_images = 5)