In [1]:
#JV

I would develop the code for the assignment in this notebook as it is easy to quickly test (and even unit testing).

When a module/part is bug free I would add it to the .py file later.



In [2]:
import os
import time

import numpy as np

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms
#from torchvision.transforms import v2 as transforms

import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import matplotlib.pyplot as plt

In [3]:
seed = 76 #setting this as seed wherever randomness comes

In [4]:
"""
Some important points:

1. Images are of different shapes, so they all have to be resized by looking at the image shapes and picking the maximum [or mean/mode ?] width and height.

2. Now they have to be normalized. To zero mean and unit variance. This has to be done within train, test and validation datasets separately. One normalization won't work for the other because the images may go out of scale (i.e beyond +/- 1)


"""

"\nSome important points:\n\n1. Images are of different shapes, so they all have to be resized by looking at the image shapes and picking the maximum [or mean/mode ?] width and height.\n\n2. Now they have to be normalized. To zero mean and unit variance. This has to be done within train, test and validation datasets separately. One normalization won't work for the other because the images may go out of scale (i.e beyond +/- 1)\n\n\n"

In [5]:
def get_image_sizes(ds):
    """
    
    
    """
    x_vals = []
    y_vals = []
    
    images = []
    
    for i in range(len(ds.samples)):
        
        x,y = train_dataset[i][0].shape[1:]
    
        x_vals.append(x)
        y_vals.append(y)
        
        #images.append(train_dataset[i][0])

    print(f"X min : {np.min(x_vals)}\tX Max:{np.max(x_vals)}")    
    print(f"Y min : {np.min(y_vals)}\tY Max:{np.max(y_vals)}")

In [6]:
#get_image_sizes(train_dataset)
#get_image_sizes(val_dataset)
#get_image_sizes(test_dataset)

In [7]:
"""
As the maximum dimensions are 800,800 for train, validatio and test data

Resizing all images to 800x800 size.
"""

'\nAs the maximum dimensions are 800,800 for train, validatio and test data\n\nResizing all images to 800x800 size.\n'

In [8]:
class CNN(nn.Module):
    """
    A class (that inherits nn.Module), to create the CNN architecture as required and to define the forward pass.
    """

    def compute_output_dimension_post_convolutions(self,convolution_layer_specifications):
            
            """
            Method to compute the dimension of the output after all convolution,maxpooling layers.
            This helps to compute the input size of first fully connected layer.

            Params:
                convolution_layer_specifications :  A list of lists. There exists one list per conv or maxpool. 

                        The first element of the list is a string "conv" or "maxpooling". Indicating the layer type
                "conv" is followed by number of filters, filter sizes, stride, paddings. It is assumed that every convolutional layer is followed by an activation layer.
                "maxpool" is followed by filter size and stride.
            
            Returns:
                A tuple containing the dimension (Height,Width,depth) of the output after all convolutional operations.
            """

            W_cur = self.W
            H_cur = self.H
            depth_cur = self.input_channels
            
            for config in convolution_layer_specifications: ## assuming square filters
            
                layer_type = config[0]
            
                if layer_type == "conv":  ## its a convolutional layer
            
                    
                    _, number_of_filters, filter_size, stride, padding = config
                    
                    W_cur = (W_cur - filter_size + 2*padding)//stride + 1
                    H_cur = (H_cur - filter_size + 2*padding)//stride + 1
                    depth_cur = number_of_filters
                    
                elif layer_type == "maxpool":
                    _,max_pool_filter_size,max_pool_stride = config
            
                    W_cur = (W_cur-max_pool_filter_size)//max_pool_stride + 1
                    H_cur = (H_cur-max_pool_filter_size)//max_pool_stride + 1
            

            return (H_cur,W_cur,depth_cur)

    
    def __init__(self,num_output_neurons, conv_activation, fc_activation, convolution_layer_specifications, hidden_layer_specifications, output_activation,conv_batch_norm = False,fc_batch_norm = False, W=800, H=800, input_depth=3):

        """
        Default Constructor.

        Params:

            num_output_neurons: Number of neurons in the output layer.

            conv_activation : A torch nn method to be used as activation function after the convolutional layers.
            
            fc_activation : A torch nn method to be used as activation function for the fully connected layers.
            
            convolution_layer_specifications: A list of lists. There exists one list per conv or maxpool. 

                        The first element of the list is a string "conv" or "maxpooling". Indicating the layer type
                "conv" is followed by number of filters, filter sizes, stride, paddings. It is assumed that every convolutional layer is followed by an activation layer.
                "maxpool" is followed by filter size and stride.
            
            hidden_layer_specifications: A list of ints. Number of elements correspond to number of hidden layers and each value gives the number of neurons in the corresponding hidden layer.
            
            output_activation: The torch nn activation method to be used for the output layer.

            batch_norm  : Applies batch norm after each Convolutional Layer or Fully Connected layer.

            H : Height of the input image. [should be fixed for the dataset] (default:800)

            W : Width of the input image.  [should be fixed for the dataset]  (default:800)

            input_depth : Depth of the input image (number of channels). [should be fixed for the dataset] (default:3)


        Returns:
        
            None.
        """

        super(CNN, self).__init__()

        self.H = H
        self.W = W
        self.input_channels = input_depth
        self.output_activation = output_activation
        
        self.convolutional_layers = nn.ModuleList() ## Create a module list to organize the convolutional layers
        
        
        ## iterate over the convolution_layer_specifications and create the convolutional layers accordingly

        cur_depth = self.input_channels
        for config in convolution_layer_specifications:

            ## Assuming filter is a square matrix, so filter_size is int.
            type = config[0]

            if type == "conv":
                _, number_of_filters, filter_size, stride, padding = config
                self.convolutional_layers.append(nn.Conv2d(cur_depth, number_of_filters, filter_size, stride, padding))
                ## If batchnorm is to be done for Conv layers
                if conv_batch_norm:
                    self.convolutional_layers.append(nn.BatchNorm2d(number_of_filters))
                self.convolutional_layers.append(conv_activation)  # Add activation after each convolutional layer
            
            elif type == "maxpool":
                _,max_pool_filter_size,max_pool_stride = config
                self.convolutional_layers.append(nn.MaxPool2d(max_pool_filter_size,max_pool_stride))
            
            cur_depth = number_of_filters  # Update input depth for next layer

        self.convolutional_layers.apply(self.initalize_weights_biases)
        
        ## iterate over the hidden_layer_specifications and create CNN layers accordingly.
        self.hidden_layers = nn.ModuleList()

        conv_output_height,conv_output_height,conv_output_depth = self.compute_output_dimension_post_convolutions(convolution_layer_specifications)
        
        fan_in =  conv_output_height*conv_output_height*conv_output_depth ## interface betweeon maxpooling and dense layer
        
        for hidden_size in hidden_layer_specifications:
            self.hidden_layers.append(nn.Linear(fan_in, hidden_size))
            ## If batchnorm is to be done for FC layers
            if fc_batch_norm:
                self.hidden_layers.append(nn.BatchNorm1d(hidden_size))
            self.hidden_layers.append(fc_activation)  # Add ReLU activation after each dense layer
            fan_in = hidden_size  # Update number of input features for next layer

        self.hidden_layers.apply(self.initalize_weights_biases)

        self.output = nn.ModuleList()
        self.output.append(nn.Linear(hidden_layer_specifications[-1], num_output_neurons))
        self.output.append(self.output_activation)
        
        self.output.apply(self.initalize_weights_biases)


    def forward(self,x):

        ## pass through convolution, activation, pooling layer set
        for layer in self.convolutional_layers:
            x = layer(x)
        
        x = torch.flatten(x, 1)


        ## pass through hidden layers
        for layer in self.hidden_layers:
            x = layer(x)

        for layer in self.output: ## compute output and apply softmax
            x = layer(x)
            
        return x


    def initalize_weights_biases(self,m):
        """
        
        Method to initialize weights given a torch module.

        Using "HE" (kaiming_normal) Initialization, as it goes well with ReLU in CNN.
        
        """
        if isinstance(m, nn.Linear):  ## it its a fully connected layer
            torch.nn.init.xavier_normal_(m.weight)
            m.bias.data.fill_(0.01) ##a small non-zero value

        elif isinstance(m, nn.Conv2d): ## if its a Convolutional layer
            torch.nn.init.kaiming_normal_(m.weight)
            m.bias.data.fill_(0.01) ##a small non-zero value

        ## Maxpool and activation layers would not have any parameters, so no initialization for them

In [9]:
class DataPreparation:

    def __init__(self,H,W,data_dir,device):

        self.H = H
        self.W = W
        self.base_dir  = data_dir
        self.device = device

    def compute_mean_and_dev_in_dataset(self,sub_dir):

        """
        Method to compute the channel wise mean and std dev in the orignal (train/validation/test) dataset.

        params:
        
            sub_dir : "train/" or "validation/" or "test/", from which data has to be taken.

        Returns:

            mean,std of the dataset.
        
        """

        data_transforms = transforms.Compose(
        [
            transforms.Resize(size=(self.H,self.W)),
            transforms.ToTensor(),
        ]
        )
        
        dataset = torchvision.datasets.ImageFolder(root=self.base_dir+sub_dir,transform=data_transforms)
        loader = torch.utils.data.DataLoader(dataset,batch_size=32,shuffle=False,num_workers=3)
    
        mean = 0.0
        var = 0.0
        count = 0
    
        for _, data in enumerate(loader, 0):
    
            inputs, labels = data
            inputs, labels = inputs.to(self.device), labels.to(self.device) ## move the inputs and lables to the device.
    
            # Reshape inputs to be the shape of [B, C, W * H]
            # where B is the batch size, C is the number of channels in the image, and W and H are the width and height of the image respectively
            inputs = inputs.view(inputs.size(0), inputs.size(1), -1)
    
            # Update total number of images
            count += inputs.size(0)
    
            # Compute mean and std here
            mean += inputs.mean(2).sum(0)
            var += inputs.var(2).sum(0)
        
        mean /= count
        var /= count
        std = torch.sqrt(var)
    
        #print(mean,std)
        
        return mean.cpu(),std.cpu()

    def create_dataloader(self,sub_dir,batch_size=32,shuffle=True,num_workers=2,data_augmentation_transforms = None):

        """
        Method to create dataset and return dataloader after applying all necessary transforms.

        params:

            sub_dir : "train/" or "validation/" or "test/"
            batch_size : The batch size in which training has to be performed.
            shuffle : whether shuffling must be done before sampling.
            num_works : Number of workers to be used on the dataset.
            data_augmentation_transforms ; Either None or List of transforms to be applied for data agumentation.

        Returns:

            Dataloader corresponding to the dataset.
        
        """

        print(f"Preparing data from {sub_dir}")

        data_transforms_list = [transforms.Resize(size=(self.H,self.W)),transforms.ToTensor()]

        if data_augmentation_transforms:
            data_transforms_list = data_transforms_list + data_augmentation_transforms


        mean,std = self.compute_mean_and_dev_in_dataset(sub_dir)
        
        data_transforms_list = data_transforms_list + [transforms.Normalize(mean,std)]

        self.dataset = torchvision.datasets.ImageFolder(root=self.base_dir+sub_dir,transform=transforms.Compose(data_transforms_list))
        
        self.loader = torch.utils.data.DataLoader(self.dataset,batch_size=batch_size,shuffle=True,num_workers=num_workers)
            
        return self.loader
        
    

In [10]:

def compute_accuracy(model,data_iterator):
    
    correct_preds = 0
    total_preds = 0

    loss = 0
    train_mode = model.training
    
        # since we're testing, switch of train mode if it is on.
    if train_mode:
        model.eval()

    with torch.no_grad(): ##don't compute gradients
        for data in data_iterator:
            images, labels = data
            images, labels = images.to(device), labels.to(device) ## move the inputs and labels to the device
    
            # calculate outputs by running images through the network
            outputs = model(images)
            loss += criterion(outputs, labels).item()
            
            _, preds = torch.max(outputs.data, 1)
            
            total_preds += len(images)
            correct_preds += (preds == labels).sum().item()
    
    if train_mode: # if model was originally in train mode, switch it back to train mode.
        model.train() ## switch back to train mode

    #print(f'Accuracy of the model on the {len(data_iterator.dataset.samples)} test images: {round(100*correct/total,2)} %')

    accuracy = round(100*correct_preds/total_preds,2)
    loss = round(loss/total_preds,2)
    
    return loss,accuracy

In [11]:
def print_network_architecture(model):
    print(model)
    print("Network Architecture:")
    for name, module in model.named_children():
        print("-" * 40)
        print(f"Layer Name: {name}")
        print(module)
        print("-" * 40)

In [12]:
## Create dataloaders


##using apple silicon GPU
device = "mps" #torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using {device}")

H = W = 800

batch_size = 32
shuffle = True
num_workers = 3
base_data_dir = "inaturalist_12K/"

dataprep = DataPreparation(H=H,W=W,data_dir=base_data_dir,device = device)

train_data_augmentation_transforms = [transforms.RandomPerspective(),transforms.RandomRotation(degrees=(0, 180)),transforms.ColorJitter()]


""" 
    Note : 

        In random data augmentation, probabilities of each random operation is 0.5, hence in an expected sense, the class balance would still hold.
            
        
        The mean and stddev would change after random data augmentation, and are known only after applying specifying the transforms for the dataset.
        So these values will not be known while actually applying the normalization hence as a substitute considering the mean and std of the original dataset.

        And without data augmentation these values would be accurate.
"""

train_loader = dataprep.create_dataloader("train/",batch_size = batch_size,shuffle = shuffle, num_workers = num_workers,data_augmentation_transforms = train_data_augmentation_transforms)

val_loader = dataprep.create_dataloader("validation/",batch_size = batch_size,shuffle = shuffle, num_workers = num_workers)

test_loader = dataprep.create_dataloader("test/",batch_size = batch_size,shuffle = shuffle, num_workers = num_workers)


Using mps
Preparing data from train/
Preparing data from validation/
Preparing data from test/


In [14]:
## create model

num_classes =  len(train_loader.dataset.classes)

convolution_layer_specifications = [

    ["conv",6,5,2,0],
    ["maxpool",2,2],
    ["conv",16,5,2,0],
    ["maxpool",2,2],
    ["conv",16,7,3,0],
    ["maxpool",2,3],
]

hidden_layer_specifications = [128,64]

W = H = 800

#### number of rows = batch_size and cols = 10, so apply softmax for each row. so dim=1 for softmax.

model = CNN(num_output_neurons=num_classes, conv_activation=nn.ReLU(),fc_activation = nn.SELU() ,convolution_layer_specifications=convolution_layer_specifications, hidden_layer_specifications=hidden_layer_specifications, output_activation = nn.Softmax(dim=1), conv_batch_norm = True, fc_batch_norm = False ,W=W, H=H, input_depth=3)
model.apply(model.initalize_weights_biases)
model.to(device)

CNN(
  (output_activation): Softmax(dim=1)
  (convolutional_layers): ModuleList(
    (0): Conv2d(3, 6, kernel_size=(5, 5), stride=(2, 2))
    (1): BatchNorm2d(6, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(6, 16, kernel_size=(5, 5), stride=(2, 2))
    (5): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU()
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(16, 16, kernel_size=(7, 7), stride=(3, 3))
    (9): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU()
    (11): MaxPool2d(kernel_size=2, stride=3, padding=0, dilation=1, ceil_mode=False)
  )
  (hidden_layers): ModuleList(
    (0): Linear(in_features=400, out_features=128, bias=True)
    (1): SELU()
    (2): Linear(in_features=128, out_features=64, bias=True)
   

In [None]:
## Training.

criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay = 1e-3)

start_time = time.time()

for epoch in range(20):  # loop over the dataset multiple times
    
    #print(f"Epoch:{epoch+1}")
    correct_preds = 0
    total = 0
    count = 0
    epoch_loss = 0.0
    
    for i, data in enumerate(train_loader, 0):
        
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device) ## move the inputs and lables to the device.

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs).to(device)
        
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        epoch_loss +=  loss.item()
        
        _, preds = torch.max(outputs.data, 1)
        
        total += labels.size(0)
        correct_preds += (preds == labels).sum().item()

        
        #print(count)

    val_loss,val_accuracy = compute_accuracy(model,val_loader)

    end_time = time.time() - start_time
    print(f'Epoch : {epoch+1}\t Train Accuracy : {round(100*correct_preds/total,2)}%\t Train loss: {epoch_loss/total:.2f}\t Validation Loss : {val_loss}\t Validation Accuracy : {val_accuracy}%')
    epoch_loss = 0.0

print('Finished Training!!')


print(f"Time Taken : {round(end_time/60,2)}")

In [None]:
"""
To add:

Dropout.
early stopping with patience

"""


### References:

1. https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html
2. https://pytorch.org/tutorials/beginner/basics/data_tutorial.html#:~:text=PyTorch%20provides%20two%20data%20primitives,easy%20access%20to%20the%20samples.
3. https://stanford.edu/~shervine/blog/pytorch-how-to-generate-data-parallel [to create dataloaders]
4. https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html [To understand overall structure of torch code]
5. https://pytorch.org/vision/main/generated/torchvision.transforms.Compose.html [for composing transform]
6. https://pytorch.org/vision/main/generated/torchvision.transforms.Resize.html [for resizing images]
7. https://pytorch.org/vision/stable/transforms.html#torchvision.transforms.Normalize [For normalization]
8. https://pytorch.org/vision/stable/auto_examples/transforms/plot_transforms_illustrations.html#sphx-glr-auto-examples-transforms-plot-transforms-illustrations-py [For data augmentation]