# Practice training a deep neural network on the CIFAR10 image dataset:

In [1]:
import os
os.chdir("..")

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

from PIL import Image
import time

import numpy as np
from pathlib import Path

In [None]:
def train(model, train_loader, val_loader, criterion, optimizer, device='cpu', scheduler=None, epochs=50, patience=5):
    best_val_loss = float('inf')
    patience_counter = 0
    start = time.time()
    for epoch in range(epochs):
        model.train()
        per_epoch_train_loss = 0.0
        time_per_epoch = time.time()
        for data in train_loader:
            inputs, targets = data[0].to(device), data[1].to(device)
            
            optimizer.zero_grad()
                
            outputs = model(inputs)
            loss = criterion(outputs, targets)            
            loss.backward()
            optimizer.step()
            if scheduler:
                scheduler.step()
            per_epoch_train_loss += loss.item()

        avg_per_epoch_train_loss = per_epoch_train_loss / len(train_loader)
        writer.add_scalar("Loss/Train", avg_per_epoch_train_loss, epoch+1)

        per_epoch_val_loss = 0.0
        total = 0
        correct = 0
        for data in val_loader:
            inputs, targets = data[0].to(device), data[1].to(device)
            
            with torch.no_grad():
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()
                loss = criterion(outputs, targets)                
                per_epoch_val_loss += loss.item()
                
        avg_per_epoch_val_loss = per_epoch_val_loss / len(val_loader)
        val_accuracy = 100 * correct / total
        writer.add_scalar("Loss/Val", avg_per_epoch_val_loss, epoch+1)
        writer.add_scalar("Accuracy/Val", val_accuracy, epoch+1)

        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {avg_per_epoch_train_loss:.4f}, " 
              f"Val Loss: {avg_per_epoch_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%, Time Elapsed {time.time() - time_per_epoch:.3f}s")

        if  avg_per_epoch_val_loss < best_val_loss:
            patience_counter = 0
            best_val_loss = avg_per_epoch_val_loss
            best_model_state = model.state_dict()
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered.")
                model.load_state_dict(best_model_state)
                break

    end = time.time()
    print(f"\nTotal Time for Training {(end-start)/60:.3f}m")
    return model

In [None]:
def eval(model, test_loader, device='cpu'):
    model.eval()
    correct = 0
    total = 0
    for inputs, targets in test_loader:
        inputs  = inputs.to(device)
        targets = targets.to(device)

        with torch.no_grad():
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
            
    print(f"Test Accuracy: {100 * correct / total:.2f}%")

**A. Build a DNN with 20 hidden layers of 100 neurons each (that’s too many, but it’s the point of this exercise). Use He initialization and the Swish activation function.()**

In [324]:
class CIFAR10V1(nn.Module):
    def __init__(self, input_features=3*32*32, output_neurons=100, num_classes=10, hidden_layers=20):
        super(CIFAR10V1, self).__init__()
        layers = []
        
        layer_first = nn.Linear(input_features, output_neurons)
        layers.append(layer_first)
        layers.append(Swish())
        
        for i in range(hidden_layers-1):
            layer = nn.Linear(output_neurons, output_neurons)
            layers.append(layer)
            layers.append(Swish())

        layer_last = nn.Linear(output_neurons, num_classes)
        layers.append(layer_last)
        
        self.net = nn.Sequential(*layers)
        self.net.apply(weights_init)

    def forward(self, X):
        flatten = nn.Flatten()
        X = flatten(X)
        return self.net(X)

### He Initialization (Kaiming Initialization) in PyTorch

He initialization, also called **Kaiming initialization**, is used to initialize weights. According to this, the weight parameters should be sampled from a distribution with:

$$\text{mean} = 0, \;\; \text{variance} = \frac{2}{\text{fan\_in}}\;\;\;i.e.,\;\;\;W \sim \mathcal{N} \left( 0, \frac{2}{\text{fan\_in}} \right)$$  



In PyTorch, there is no direct way to set the variance explicitly to $$(\frac{2}{\text{fan\_in}})$$ using `torch.nn.init.kaiming_normal_()`, since the `nonlinearity` argument only accepts `linear`, `relu`, and `leaky_relu` as arguments.

For **Leaky ReLU**, the gain and std is computed as:  

$$\text{gain} = \sqrt{\frac{2}{1 + \text{negative\_slope}^2}} \;\;\;\;
\text{std} = \sqrt{\frac{\text{gain}}{\sqrt{\text{fan\_mode}}}},\;\;\;
\text{variance} = \frac{\text{gain}}{\sqrt{\text{fan\_mode}}}$$  


To ensure the variance matches $$(\frac{2}{\text{fan\_in}}),\;\;\; we\;need\;to\;set\;\;negative\_slope=0\;\;and\;\;fan\_mode=fan\_in$$ 

Since `torch.nn.init.kaiming_normal_()` takes a parameter **`a`**, which denotes `negative_slope`, setting `a=0` makes sure that He initialization is applied correctly.  


**I have also implemented `nn.SiLU` (Swish activation function) manually. I do not know why I did this.**

In [322]:
def weights_init(layer, nonlinearity='leaky_relu'):
    if isinstance(layer, nn.Linear):
        if nonlinearity=='leaky_relu': # param a is only used with leaky_relu
            torch.nn.init.kaiming_normal_(layer.weight, mode='fan_in', a=0, nonlinearity=nonlinearity)
        else:
            torch.nn.init.kaiming_normal_(layer.weight, mode='fan_in', nonlinearity=nonlinearity)

        torch.nn.init.constant_(layer.bias, 0)

class Swish(nn.Module):
    def forward(self, x):
        return x * F.sigmoid(x)

**B. Using Nadam optimization and early stopping, train the network on the CIFAR10 dataset. The dataset is composed of 60,000 32 × 32–pixel color images (50,000 for training, 10,000 for testing) with 10 classes. Remember to search for the right learning rate each time you change the model’s architecture or hyperparameters.**

In [3]:
batch_size = 64

transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]
)

# Load CIFAR-10 dataset
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset  = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Split training data into train and validation subsets (e.g., 80%/20%)
train_size = int(0.9 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_subset, val_subset = torch.utils.data.random_split(train_dataset, [train_size, val_size])

train_loader = torch.utils.data.DataLoader(train_subset, batch_size=batch_size, shuffle=True)
val_loader   = torch.utils.data.DataLoader(val_subset, batch_size=batch_size, shuffle=False)
test_loader  = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [325]:
# Training the model
epochs = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
writer = SummaryWriter()

cifar10  = CIFAR10V1()
criterion = nn.CrossEntropyLoss()
optimizer = optim.NAdam(cifar10.parameters(), lr=0.001)
model_v1 = train(cifar10, train_loader, val_loader, criterion, optimizer, device=device, epochs=epochs)
writer.flush()
writer.close()

# Model Accuracy
eval(model_v1, test_loader, device=device)

**C. Now try adding batch normalization and compare the learning curves: is it converging faster than before? Does it produce a better model? How does it affect training speed?**

In [317]:
class CIFAR10V2(nn.Module):
    def __init__(self, input_features=3*32*32, output_neurons=100, num_classes=10, hidden_layers=20):
        super(CIFAR10V2, self).__init__()
        layers = []
        
        layer_first = nn.Linear(input_features, output_neurons)
        layers.append(layer_first)
        layers.append(nn.BatchNorm1d(output_neurons))
        layers.append(Swish())
        
        for i in range(hidden_layers-1):
            layer = nn.Linear(output_neurons, output_neurons)
            layers.append(layer)
            layers.append(nn.BatchNorm1d(output_neurons))
            layers.append(Swish())

        layer_last = nn.Linear(output_neurons, num_classes)
        layers.append(layer_last)
        
        self.net = nn.Sequential(*layers)
        self.net.apply(weights_init)

    def forward(self, X):
        flatten = nn.Flatten()
        X = flatten(X)
        return self.net(X)

In [378]:
epochs = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
writer = SummaryWriter()

cifar10  = CIFAR10V2()
criterion = nn.CrossEntropyLoss()
optimizer = optim.NAdam(cifar10.parameters(), lr=0.001)
model_v2 = train(cifar10, train_loader, val_loader, criterion, optimizer, device=device, epochs=epochs)
writer.flush()
writer.close()

Epoch [1/5], Train Loss: 2.0241, Val Loss: 1.8095, Val Acc: 33.79%, Time Elapsed 61.484s
Epoch [2/5], Train Loss: 1.7113, Val Loss: 1.6768, Val Acc: 40.24%, Time Elapsed 61.726s
Epoch [3/5], Train Loss: 1.6001, Val Loss: 1.6090, Val Acc: 43.22%, Time Elapsed 60.703s
Epoch [4/5], Train Loss: 1.5270, Val Loss: 1.5700, Val Acc: 44.20%, Time Elapsed 62.025s
Epoch [5/5], Train Loss: 1.4740, Val Loss: 1.5388, Val Acc: 45.88%, Time Elapsed 61.364s

Total Time for Training 5.122m


**D. Try replacing batch normalization with SELU, and make the necessary adjustments to ensure the network self-normalizes (i.e., standardize the input features, use LeCun normal initialization, make sure the DNN contains only a sequence of dense layers, etc.).**

*Note that input features were already normalized while downloading data*

In [141]:
class CIFAR10V3(nn.Module):
    def __init__(self, input_features=3*32*32, output_neurons=100, num_classes=10, hidden_layers=20):
        super(CIFAR10V3, self).__init__()
        layers = []
        
        layer_first = nn.Linear(input_features, output_neurons)
        layers.append(layer_first)
        layers.append(nn.SELU())
        
        for i in range(hidden_layers-1):
            layer = nn.Linear(output_neurons, output_neurons)
            layers.append(layer)
            layers.append(nn.SELU())

        layer_last = nn.Linear(output_neurons, num_classes)
        layers.append(layer_last)
        
        self.net = nn.Sequential(*layers)
        self.net.apply(weights_init_lecun)

    def forward(self, X):
        flatten = nn.Flatten()
        X = flatten(X)
        return self.net(X)

**LeCun Normal Initialization** is called **Xavier Normal Initialization** (`torch.nn.init.xavier_normal_`).  
But, LeCun Initialization is not directly supported in PyTorch, so here also we need to do some maths.  

The standard deviation for `torch.nn.init.xavier_normal_()` is given in the [documentation](https://shorturl.at/WxECS).  


LeCun Normal Initialization uses a standard deviation of:  

$$\text{std} = \sqrt{\frac{1}{\text{fan\_in}}}$$  

To match this, we need to set the **gain** as follows:  

$$\text{gain} = \sqrt{\frac{\text{fan\_in} + \text{fan\_out}}{\text{fan\_in}}}$$  

In [327]:
def weights_init_lecun(layer):
    if isinstance(layer, nn.Linear):
        gain = np.sqrt(sum(layer.weight.shape) / layer.weight.shape[1])
        torch.nn.init.xavier_normal_(layer.weight, gain=gain)
        torch.nn.init.constant_(layer.bias, 0)

In [146]:
epochs = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
writer = SummaryWriter()

cifar10  = CIFAR10V3()
criterion = nn.CrossEntropyLoss()
optimizer = optim.NAdam(cifar10.parameters(), lr=0.001)
model_v3 = train(cifar10, train_loader, val_loader, criterion, optimizer, device=device, epochs=epochs)
writer.flush()
writer.close()
eval(model_v3, test_loader, device=device)

Epoch [1/5], Train Loss: 2.1656, Val Loss: 2.0423, Val Acc: 28.54%, Time Elapsed 22.917s
Epoch [2/5], Train Loss: 1.7683, Val Loss: 2.1380, Val Acc: 30.68%, Time Elapsed 23.921s
Epoch [3/5], Train Loss: 1.6571, Val Loss: 1.7635, Val Acc: 39.64%, Time Elapsed 26.930s
Epoch [4/5], Train Loss: 1.5897, Val Loss: 2.0045, Val Acc: 32.44%, Time Elapsed 52.490s
Epoch [5/5], Train Loss: 1.5328, Val Loss: 1.7638, Val Acc: 39.46%, Time Elapsed 52.903s

Total Time for Training 2.988m


**E. Try regularizing the model with alpha dropout. Then, without retraining your model, see if you can achieve better accuracy using MC dropout.**

In [7]:
class CIFAR10V4(nn.Module):
    def __init__(self, input_features=3*32*32, output_neurons=100, num_classes=10, hidden_layers=20):
        super(CIFAR10V4, self).__init__()
        layers = []
        
        layer_first = nn.Linear(input_features, output_neurons)
        layers.append(layer_first)
        layers.append(nn.SELU())
        
        for i in range(hidden_layers-1):
            layer = nn.Linear(output_neurons, output_neurons)
            layers.append(layer)
            layers.append(nn.SELU())

        layers.append(nn.AlphaDropout(p=0.1))
        layer_last = nn.Linear(output_neurons, num_classes)
        layers.append(layer_last)
        
        self.net = nn.Sequential(*layers)
        self.net.apply(weights_init_lecun)

    def forward(self, X):
        flatten = nn.Flatten()
        X = flatten(X)
        return self.net(X)

In [320]:
epochs = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
writer = SummaryWriter()

cifar10  = CIFAR10V4()
criterion = nn.CrossEntropyLoss()
optimizer = optim.NAdam(cifar10.parameters(), lr=2e-1)
model_v4 = train(cifar10, train_loader, val_loader, criterion, optimizer, device=device, epochs=epochs)
writer.flush()
writer.close()

eval(model_v4, test_loader, device=device)

Epoch [1/5], Train Loss: 1.9838, Val Loss: 2.0739, Val Acc: 22.34%, Time Elapsed 24.830s
Epoch [2/5], Train Loss: 1.7948, Val Loss: 2.3868, Val Acc: 16.72%, Time Elapsed 30.773s
Epoch [3/5], Train Loss: 1.6686, Val Loss: 1.7714, Val Acc: 33.92%, Time Elapsed 29.112s
Epoch [4/5], Train Loss: 1.5122, Val Loss: 1.5469, Val Acc: 43.60%, Time Elapsed 28.316s
Epoch [5/5], Train Loss: 1.3736, Val Loss: 1.4552, Val Acc: 47.16%, Time Elapsed 27.857s

Total Time for Training 2.348m
Test Accuracy: 50.15%


In [196]:
class MCAlphaDropout(nn.AlphaDropout):
    def call(self, inputs):
        return super().call(inputs).train(True)

Note that I could have directly used MCAlphaDropout in the model creation itself, but, whatif I am using an already trained model and want to implement MCAlphaDropout. So, it makes sense to create an identical model only with MCAlphaDropout instead of AlphaDropout.

In [197]:
mc_model = nn.Sequential(*[
    MCAlphaDropout(layer.p)
    if isinstance(layer, nn.AlphaDropout)
    else layer
    for layer in model_v4.net
])

In [None]:
def mc_dropout_predictions(model, test_loader, device='cpu'):
    model.train(True) # important
    correct = 0
    total = 0
    y_probas = []
    for inputs, targets in test_loader:
        inputs  = inputs.to(device)
        targets = targets.to(device)

        with torch.no_grad():
            outputs = model(inputs)
            outputs = outputs.detach().numpy()
            y_probas.append(outputs)
            
    return np.vstack(y_probas)

In [290]:
# Training is set to true, each time model predicts a different output even for the 
# same inputs, since Dropout layer randomly drops out some neurons with probability p.
# Here we get predictions for the test set `mc_iterations` times and then stack it 
# -> [mc_iterations, 10000, 10] -> 10000 instances and 10 scores for each class.
# Then we average over dim=0 and get the prediction scores

mc_iterations = 100
y_probas = np.stack([mc_dropout_predictions(mc_model, test_loader) for i in range(mc_iterations)])
y_scores = y_probas.mean(axis=0)
y_scores = torch.tensor(y_scores)

In [337]:
# Make predictions

targets = np.hstack([targets for _, targets in test_loader])
targets = torch.tensor(targets)

_, predicted = torch.max(y_scores, 1)

In [343]:
# Accuracy without retraining

correct = 0
total = targets.size(0)
correct += (predicted == targets).sum().item()
        
print(f"Test Accuracy: {100 * correct / total:.2f}%")

Test Accuracy: 10.24%


**F. Retrain your model using 1cycle scheduling and see if it improves training speed and model accuracy.**

In [None]:
epochs = 100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
writer = SummaryWriter()

criterion = nn.CrossEntropyLoss()
optimizer = optim.NAdam(cifar10.parameters(), lr=3e-2)

scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.05, steps_per_epoch=len(train_loader), epochs=epochs)
model = train(model_v4, train_loader, val_loader, criterion, optimizer, device=device, scheduler=scheduler, epochs=5)
writer.flush()
writer.close()

eval(model, test_loader, device=device)

In [None]:
# %load_ext tensorboard
%tensorboard --logdir=runs