In [447]:
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using {device}.')

Using cuda.


In [448]:
%matplotlib inline
import matplotlib.pyplot as plt
import time
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import StepLR

# Implementing Data Loader

*  The CIFAR-10 dataset is composed of 60000 small (3 ×32×32) color images, each of which belongs to one of  10 classes
*   I have resized the images in the dataset in $32 \times 32$  images.

* I have implemented few data augamentation techniques such as `RandomHorizontalFlip` and `ColorJitter` in order to avoid overfitting..

* The following function is used to load the dataset and resize the images

T

In [449]:
import torchvision

def load_data_CIFAR(batch_size, resize):
    """Download the CIFAR-10 dataset and then load it into memory."""
    trans = [torchvision.transforms.ToTensor()]
    # Randomly flip the image horizontally
    trans.append(transforms.RandomHorizontalFlip(p=0.6))

    # Apply Colour Gitter
    trans.append(transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1))
    
    if resize:
        trans.insert(0, torchvision.transforms.Resize(resize))
    trans = torchvision.transforms.Compose(trans)

    mnist_train = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=trans)
    mnist_test = torchvision.datasets.CIFAR10(root="../data", train=False, transform=trans, download=True)

    return (torch.utils.data.DataLoader(mnist_train, batch_size, shuffle=True, num_workers=2, pin_memory=True), # Using pinned memory
            torch.utils.data.DataLoader(mnist_test, batch_size, shuffle=False, num_workers=2, pin_memory=True)) # Using pinned memory

In [450]:
batch_size = 64 # Defines the batch size
train_iter, test_iter = load_data_CIFAR(batch_size, resize=(32, 32)) #Load train and test datasets

Files already downloaded and verified
Files already downloaded and verified


In [451]:
X, y = next(iter(train_iter)) # Requests the first training batch
print(X.size()) # 64 images per batch. Each image is represented by a 3 x 32 x 32 tensor (number of channels x height x width). The images are RGB, so there are 3 channels.
print(y.size()) # 64 targets. Each target is a number between 0 and 9. The classification problem has 10 clases.
torch.cuda.empty_cache() #Empty cache for improving performance

torch.Size([64, 3, 32, 32])
torch.Size([64])


**Task 2:  Implementing a neural network based on the architecture described in Basic Architecture**


**1) Intermediate Block**


* The constructor for `Intermediate block` receives the number of convolutional layer'$c_n$', number of output channels $c_o$, the number of input channels $c_i$ and the dropout rate  $d_r$.

* Following is the step by step implementation of the intermediate block
    1. The input image goes through $c_n$ independent convolutional layers with $c_o$ convolutional filters, each with a $6 \times 6$ window and padding $2$.
    2. The resulting image of each convolutional layer goes through a max_pooling with window size $3 \times 3$, stride of $2$ and padding of $1$
    3. The resulting image goes through batch normalization layer for images and a rectified linear activation function.
    4. The resulting image goes through a dropout function to introduce normalization with the dropout rate $d_r$.
    5. The resulting image goes is multiplied with a weighted vector `$a$` which is obtained by a linear layer and `sigmoid` activation, the linear layer takes the input vector `$m$` which is the average of the each channel of input image.
    6. We then perform the sum of the above computation to obtain the final resulting image of a single intermediate block.
    7. The resulting image is then passed through `L` independent blocks going through same process defined in `steps 1-5`.
    8. The Resulting image from the last intermediate block is then passed to the output block


In [452]:
class IntermediateBlock(nn.Module):
    def __init__(self, num_layers, num_output_channels, input_channels,dropout_rate):
        super(IntermediateBlock, self).__init__()
        self.num_layers = num_layers
        self.num_output_channels = num_output_channels
        
        # Define convolutional layers
        self.conv_layers = nn.ModuleList()
        self.batch_norms = nn.ModuleList()
        self.relu_activations = nn.ModuleList()
        self.channel_max = nn.ModuleList()
        self.dropouts = nn.ModuleList()
        for _ in range(num_layers):
            conv_layer = nn.Conv2d(input_channels, num_output_channels, kernel_size=6, padding=2)
            self.conv_layers.append(conv_layer)
            bn = torch.nn.BatchNorm2d(num_output_channels)
            self.batch_norms.append(bn)
            relu = torch.nn.ReLU()
            self.relu_activations.append(relu)
            max_pool = torch.nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
            self.channel_max.append(max_pool)
            dropout = torch.nn.Dropout(p=dropout_rate)
            self.dropouts.append(dropout)
        
        # Define fully connected layer for computing coefficients
        self.fc = nn.Linear(input_channels, num_layers)
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x):
        # Apply convolutional layers
        conv_outputs = []
        for conv_layer,bn,relu,dropout,max_pool in zip(self.conv_layers,self.batch_norms,self.relu_activations,self.dropouts,self.channel_max):
            conv_output = conv_layer(x)
            conv_output = max_pool(conv_output)
            conv_output = bn(conv_output)
            conv_output = relu(conv_output)
            conv_output = dropout(conv_output)
            conv_outputs.append(conv_output)
        
        # Compute average values for each channel
        m = torch.mean(x, dim=(2, 3))
        
        # Compute coefficients using fully connected layer
        a = (self.fc(m))
        a = self.sigmoid(a)
        
        # Reshape coefficients to match convolutional outputs
        a = a.view(a.size(0), -1, 1, 1, 1)  # Reshape to (batch_size, num_layers, 1, 1)
        
        # Combine outputs with coefficients
        weighted_outputs = [a[:, i] * conv_outputs[i] for i in range(self.num_layers)]
        
        # Sum the weighted outputs
        x_prime = sum(weighted_outputs)
        
        return x_prime


## Output Block

* The constructor for `OutputBlock` receives the number of input channels $c_i$ and the number of classes $c_c$.

* Following is the step by step implementation of the output block:
    1. First the mean of the each channel of the input image is calculated as vector `$m$`.
    2. The image that results from step 1 is passed to a fully connected linear layer which gives out the `logits ($o$)` for each class.
    3. The resulting logits which is the probability distribution of each class in our dataset is then returned as the final output.


In [453]:
class OutputBlock(nn.Module):
    def __init__(self, num_channels, num_classes):
        super(OutputBlock, self).__init__()
        self.num_channels = num_channels
        self.num_classes = num_classes

        self.fc_layers = nn.Linear(num_channels, num_classes)

    def forward(self, x):
        # Compute average values for each channel
        m = torch.mean(x, dim=(2, 3))  # Global average pooling
        
        # Pass through fully connected layers
        o = self.fc_layers(m)
        
        return o


## BasicNet

* The constructor for `BasicNet` Defines the number of intermediate and output block.

* Following is the step by step implementation of the BasicNet class:
    1. First all the intermediate blocks are defined by passing the required parameters as, this allows to create the blocks with different convolutional layers as well as different parameters such as dropout rates `$d_r$`.
    2. The number of input channels $c_i$ for the next block is equal to the number of output channels $c_o$ for the previous layer.
    3. The number of convolutional layers for blocks `1,2 and 3` are `4,3,2` respectively
    4. Finally the output block is defined which will give the logits as the final output.
    5. The resulting logits which is the probability distribution of each class in our dataset is then returned as the final output.


In [454]:
class BasicNet(nn.Module):
    def __init__(self):
        super(BasicNet, self).__init__()
        self.intermediate_block1 = IntermediateBlock(num_layers=4, num_output_channels=128, input_channels=3,dropout_rate=0.5)
        self.intermediate_block2 = IntermediateBlock(num_layers=3, num_output_channels=64, input_channels=128,dropout_rate=0.5)
        self.intermediate_block3 = IntermediateBlock(num_layers=2, num_output_channels=32, input_channels=64,dropout_rate=0.5)
        self.output_block = OutputBlock(num_channels=32, num_classes=10)

    def forward(self, x):
        x = self.intermediate_block1(x)
        x = self.intermediate_block2(x)
        x = self.intermediate_block3(x)
        # x = self.intermediate_block4(x)
        x = self.output_block(x)
        return x


In [455]:
# Applies Xavier initialization if the `torch.nn.Module` is `torch.nn.Linear` or `torch.nn.Conv2d`
def init_weights(m):
    if type(m) == torch.nn.Linear or type(m) == torch.nn.Conv2d:
        torch.nn.init.xavier_uniform_(m.weight)

**Below I have moved the model to GPU and defined its object and applied weight initialization**

In [456]:
# Step 3: Set up training parameters
net = BasicNet().to(device)
net.apply(init_weights) # Applies `init_weights` to every `torch.nn.Module` inside `model`

BasicNet(
  (intermediate_block1): IntermediateBlock(
    (conv_layers): ModuleList(
      (0-3): 4 x Conv2d(3, 128, kernel_size=(6, 6), stride=(1, 1), padding=(2, 2))
    )
    (batch_norms): ModuleList(
      (0-3): 4 x BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (relu_activations): ModuleList(
      (0-3): 4 x ReLU()
    )
    (channel_max): ModuleList(
      (0-3): 4 x MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    )
    (dropouts): ModuleList(
      (0-3): 4 x Dropout(p=0.5, inplace=False)
    )
    (fc): Linear(in_features=3, out_features=4, bias=True)
    (sigmoid): Sigmoid()
  )
  (intermediate_block2): IntermediateBlock(
    (conv_layers): ModuleList(
      (0-2): 3 x Conv2d(128, 64, kernel_size=(6, 6), stride=(1, 1), padding=(2, 2))
    )
    (batch_norms): ModuleList(
      (0-2): 3 x BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (relu_activations): Modu

## Loss function, optimizer and scheduler

* The *convolutional neural network* defined above computes the logits matrix $\mathbf{O}$.

* This is because PyTorch provides a class called `CrossEntropyLoss` that implements the desired cross entropy loss but requires a logits matrix $\mathbf{O}$ instead of the prediction matrix $\mathbf{\hat{Y}}$.

* The class `CrossEntropyLoss` implements the cross entropy loss in a way that avoids numerical instabilities that would result from a naive implementation.
* I have applied `Adam` optimizer with learning rate of `0.001`
* I have used `StepLR` scheduler with step size of `$15$` and gamma = `$0.9$` which will multiply my learning rate with `$0.9$` after `$15$` epochs.

In [457]:
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.001)
scheduler = StepLR(optimizer, step_size=15, gamma=0.9)  # Adjust the learning rate every 20 epochs by multiplying it with 0.1

## Evaluation

* Since the highest element of a logits vector determines which class will be predicted.

* We can use this to compute the number of correct predictions per batch.

In [458]:
def correct(logits, y):
    y_hat = logits.argmax(axis=1)  # Finds the column with the highest value for each row of `logits`.
    return (y_hat == y).float().sum()  # Computes the number of times that `y_hat` and `y` match.

* We can use the previous function to compute the accuracy of our model in a given dataset by accumulating the number of correct predictions across batches and then dividing that number by the number of examples in the dataset.

In [459]:
# Define a function to evaluate metrics
def evaluate_metric(model, data_iter, metric):
    total_metric = 0.0
    total_samples = 0
    
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        for X, y in data_iter:
            X, y = X.to(device), y.to(device)
            logits = model(X)
            total_metric += metric(logits, y).item()
            total_samples += y.size(0)
    return total_metric / total_samples

In [460]:
net.eval()
print(f'Training accuracy: {evaluate_metric(net, train_iter, correct)}. Testing accuracy: {evaluate_metric(net, test_iter, correct)}.')

Training accuracy: 0.1. Testing accuracy: 0.1.


## Training

* The following code implements the training loop for the convolutional neural network.

* The training/testing dataset accuracy is displayed after each epoch and stored for plotting.
* The training and test accuracies as well as the cross entropy loss for each batch are then plotted at the end.

In [None]:
losses = [] # Stores the loss for each training batch
train_accs = [] # Stores the training accuracy after each epoch
test_accs = [] # Stores the testing accuracy after each epoch

num_epochs = 80
for epoch in range(num_epochs):
    print(f'\nEpoch {epoch + 1}/{num_epochs}.')
    start_time = time.perf_counter()

    net.train() # This is necessary because batch normalization behaves differently between training and evaluation

    for X, y in train_iter:
        X, y = X.to(device), y.to(device) # Moves data to `device`
        logits = net(X) # Computes the logits for the batch of images `X`

        l = loss_fn(logits, y) # Computes the loss given the `logits` and the class vector `y`
        optimizer.zero_grad() # Zeroes the gradients stored in the model parameters
        l.backward() # Computes the gradient of the loss `l` with respect to the model parameters

        optimizer.step() # Updates the model parameters based on the gradients stored inside them
        losses.append(float(l)) # Stores the loss for this batch

    scheduler.step()

    with torch.no_grad(): # Computing performance metrics does not require gradients
        net.eval() # This is necessary because batch normalization behaves differently between training and evaluation
        train_accs.append(evaluate_metric(net, train_iter, correct))
        test_accs.append(evaluate_metric(net, test_iter, correct))

        end_time = time.perf_counter()

        print(f'Training accuracy: {train_accs[-1]}. Testing accuracy: {test_accs[-1]}. Duration: {end_time - start_time:.3f}s.') # Computes and displays training/testing dataset accuracy.
        torch.cuda.empty_cache() #Empty cache for improving performance

plt.plot(losses) # Plots the loss for each training batch
plt.xlabel('Training batch')
plt.ylabel('Cross entropy loss')
plt.show()

plt.plot(train_accs, label='Training accuracy')
plt.plot(test_accs, label='Testing accuracy')
plt.legend(loc='best')
plt.xlabel('Epoch')
plt.show()


Epoch 1/80.
Training accuracy: 0.94916. Testing accuracy: 0.8588. Duration: 67.439s.

Epoch 2/80.
Training accuracy: 0.95016. Testing accuracy: 0.8547. Duration: 68.659s.

Epoch 3/80.
Training accuracy: 0.95304. Testing accuracy: 0.8534. Duration: 67.835s.

Epoch 4/80.
Training accuracy: 0.95274. Testing accuracy: 0.8563. Duration: 68.154s.

Epoch 5/80.
Training accuracy: 0.94542. Testing accuracy: 0.847. Duration: 69.310s.

Epoch 6/80.
Training accuracy: 0.96114. Testing accuracy: 0.8591. Duration: 67.215s.

Epoch 7/80.
Training accuracy: 0.96148. Testing accuracy: 0.8614. Duration: 66.594s.

Epoch 8/80.
Training accuracy: 0.95016. Testing accuracy: 0.8517. Duration: 67.277s.

Epoch 9/80.
Training accuracy: 0.96086. Testing accuracy: 0.8583. Duration: 67.838s.

Epoch 10/80.
Training accuracy: 0.9504. Testing accuracy: 0.8575. Duration: 66.320s.

Epoch 11/80.
Training accuracy: 0.9649. Testing accuracy: 0.8649. Duration: 66.103s.

Epoch 12/80.
Training accuracy: 0.96232. Testing accur