In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
import random
import time
import matplotlib.pyplot as plt
import seaborn as sns

from torch.utils.data import DataLoader, random_split
from sklearn.metrics import (
    precision_recall_fscore_support,
    confusion_matrix
)

# Reproducibility: this part does three things in our code:
# 1. It seeds every source of randomness: PyTorch (CPU and GPU), NumPy and Python's `random`.
# 2. It forces cuDNN to use deterministic convolution algorithms.
# 3. It disables cuDNN benchmarking, i.e. it disables dynamic algorithm selection.
def set_seed(seed=42):
    """Seed every RNG the notebook uses and pin cuDNN to deterministic mode.

    Args:
        seed: Integer seed applied to PyTorch (CPU and all CUDA devices),
            NumPy's global RNG and Python's ``random`` module.
    """
    # Seeding order: torch CPU, torch CUDA, NumPy, Python.
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
        random.seed,
    )
    for seed_fn in seeders:
        seed_fn(seed)

    # Deterministic kernels only, and no benchmark-driven algorithm choice.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
# Fix the seed before anything else runs; this matters for a repeatable
# comparison between the plain CNN and the residual CNN.
set_seed(42)
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [2]:
#Now this part of cell is for data loading:
def get_dataloaders(batch_size=128, val_size=5000):
    """Build the CIFAR-10 train, validation and test data loaders.

    The official training set is split into a training part and a held-out
    validation part of ``val_size`` images (45,000 / 5,000 by default, i.e. a
    9:1 split). Only the training part is augmented; validation and test
    images are just converted to tensors and normalized, so validation
    accuracy is measured on clean images like test accuracy.

    Args:
        batch_size: Batch size used by all three loaders.
        val_size: Number of training images held out for validation.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``. Only the training
        loader shuffles.

    Raises:
        ValueError: If ``val_size`` is not strictly between 0 and the size of
            the training set.
    """
    # Local import so this function does not depend on an extra top-level import.
    from torch.utils.data import Subset

    # Channel-wise normalization: maps ToTensor's [0, 1] range to [-1, 1].
    normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))

    # Augmentation (training images only) to improve generalization and
    # reduce overfitting.
    train_transform = transforms.Compose([
        # Pad by 4 pixels and take a random 32x32 crop (small translations).
        transforms.RandomCrop(32, padding=4),
        # Flip 50% of the images horizontally.
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(
            brightness=0.2,
            contrast=0.2,
            saturation=0.2,
            hue=0.1
        ),
        # Convert the HWC image to a CHW tensor.
        transforms.ToTensor(),
        normalize
    ])
    # Deterministic preprocessing for validation and test images.
    test_transform = transforms.Compose([
        transforms.ToTensor(),
        normalize
    ])

    full_train = torchvision.datasets.CIFAR10(
        root='./data',
        train=True,
        download=True,
        transform=train_transform
    )
    # Second view of the same training images WITHOUT augmentation. The
    # validation subset is taken from this view; taking it from `full_train`
    # would apply random crop / flip / colour jitter to validation images.
    full_train_eval = torchvision.datasets.CIFAR10(
        root='./data',
        train=True,
        download=False,  # already fetched by the call above
        transform=test_transform
    )

    if not 0 < val_size < len(full_train):
        raise ValueError(
            f"val_size must be between 1 and {len(full_train) - 1}, got {val_size}"
        )
    train_size = len(full_train) - val_size

    # random_split draws the permutation; the validation indices are then
    # re-pointed at the non-augmented view so both subsets stay disjoint.
    train_dataset, val_split = random_split(full_train, [train_size, val_size])
    val_dataset = Subset(full_train_eval, val_split.indices)

    test_dataset = torchvision.datasets.CIFAR10(
        root='./data',
        train=False,
        download=True,
        transform=test_transform
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, val_loader, test_loader

In [3]:
# Count the number of trainable parameters of a model; this is used for the model comparison.
def count_parameters(model):
    """Return the total number of elements in the model's trainable parameters.

    Parameters with ``requires_grad=False`` (frozen weights) are not counted.
    """
    n_trainable = 0
    for param in model.parameters():
        if not param.requires_grad:
            continue
        n_trainable += param.numel()
    return n_trainable

In [4]:
#This block is all of plain CNN architecture
class PlainCNN(nn.Module):
    """Plain (non-residual) CNN classifier.

    Five 3x3 conv -> BatchNorm -> ReLU stages (3->64->64->128->128->256
    channels) with a 2x2 max-pool after the second and fourth stage, followed
    by global average pooling and a linear layer that outputs ``num_classes``
    logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # (in_channels, out_channels, max-pool after this stage?)
        stage_specs = (
            (3, 64, False),
            (64, 64, True),
            (64, 128, False),
            (128, 128, True),
            (128, 256, False),
        )
        layers = []
        for c_in, c_out, pool_after in stage_specs:
            layers.append(nn.Conv2d(c_in, c_out, 3, padding=1))
            layers.append(nn.BatchNorm2d(c_out))
            layers.append(nn.ReLU())
            if pool_after:
                layers.append(nn.MaxPool2d(2))
        # Global average pooling collapses each feature map to a single value.
        layers.append(nn.AdaptiveAvgPool2d(1))
        self.features = nn.Sequential(*layers)
        # Map the 256 pooled features to the class logits.
        self.classifier = nn.Linear(256, num_classes)

    def forward(self, x):
        pooled = self.features(x)
        # (N, 256, 1, 1) -> (N, 256)
        flat = pooled.view(pooled.size(0), -1)
        return self.classifier(flat)
# Properties of this CNN: the convolutional layers extract progressively deeper features while the number of channels increases; max-pooling downsamples to reduce the spatial size, and the final global average pooling reduces the number of parameters.

In [5]:
class ResidualBlock(nn.Module):
    """Residual block: two 3x3 conv + BatchNorm layers plus a skip connection.

    The skip path is the identity when the block keeps both the channel count
    and the spatial size; otherwise it is a strided 1x1 conv + BatchNorm that
    matches the shape of the main path. The two paths are summed before the
    final ReLU.

    Args:
        in_channels: Channels of the input feature map.
        out_channels: Channels produced by the block.
        stride: Stride of the first 3x3 conv (and of the 1x1 projection).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        conv3x3 = dict(kernel_size=3, padding=1, bias=False)

        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv3x3)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv3x3)
        self.bn2 = nn.BatchNorm2d(out_channels)

        shape_preserved = stride == 1 and in_channels == out_channels
        if shape_preserved:
            self.shortcut = nn.Identity()
        else:
            # Projection so the skip path matches the main path's shape.
            projection = nn.Conv2d(
                in_channels,
                out_channels,
                kernel_size=1,
                stride=stride,
                bias=False,
            )
            self.shortcut = nn.Sequential(projection, nn.BatchNorm2d(out_channels))

    def forward(self, x):
        skip = self.shortcut(x)
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.conv2(main)
        main = self.bn2(main)
        # In-place add on the freshly computed tensor; the input is untouched.
        main += skip
        return F.relu(main)

In [6]:
class ResidualCNN(nn.Module):
    """Small residual CNN classifier.

    A 3x3 conv + BatchNorm + ReLU stem (3 -> 64 channels) is followed by three
    residual blocks (64 -> 64, 64 -> 128 with stride 2, 128 -> 256 with
    stride 2), global average pooling and a linear layer that outputs
    ``num_classes`` logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        stem_conv = nn.Conv2d(3, 64, 3, padding=1, bias=False)
        self.initial = nn.Sequential(stem_conv, nn.BatchNorm2d(64), nn.ReLU())
        self.layer1 = ResidualBlock(64, 64)
        self.layer2 = ResidualBlock(64, 128, stride=2)
        self.layer3 = ResidualBlock(128, 256, stride=2)
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(256, num_classes)

    def forward(self, x):
        out = self.initial(x)
        for stage in (self.layer1, self.layer2, self.layer3):
            out = stage(out)
        out = self.pool(out)
        # (N, 256, 1, 1) -> (N, 256)
        out = out.view(out.size(0), -1)
        return self.fc(out)

In [None]:
# Evaluation function: it puts the model in evaluation mode, disables gradient tracking, and computes the loss and accuracy.
# To remember:
# 1. .eval() disables dropout and switches BatchNorm to its running statistics.
# 2. torch.no_grad(), as used in week 3, skips gradient bookkeeping and saves a lot of memory.
def evaluate(model, loader):
    """Compute the mean cross-entropy loss and accuracy of ``model`` on ``loader``.

    The model is switched to evaluation mode (and left in it) and no gradients
    are tracked. Batches are moved to the device the model's parameters live
    on, so the function does not depend on a notebook-level ``device``
    variable.

    Args:
        model: Module mapping a batch of inputs to class logits.
        loader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        Tuple ``(mean_loss, accuracy_percent)``. The loss is averaged over
        samples rather than over batches, so a smaller final batch is not
        over-weighted.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval()

    # Follow the model's own device; a parameter-less model leaves batches
    # where they are.
    first_param = next(model.parameters(), None)
    run_device = None if first_param is None else first_param.device

    # Sum per-sample losses and divide by the sample count at the end.
    criterion = nn.CrossEntropyLoss(reduction="sum")
    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for x, y in loader:
            if run_device is not None:
                x, y = x.to(run_device), y.to(run_device)
            outputs = model(x)
            loss_sum += criterion(outputs, y).item()

            _, pred = outputs.max(1)
            correct += pred.eq(y).sum().item()
            total += y.size(0)

    if total == 0:
        raise ValueError("evaluate() received a loader that yields no samples")
    return loss_sum / total, 100 * correct / total

# This is the training function, responsible for the following:
# forward pass --> compute loss --> backward pass --> optimizer step --> LR scheduler step
def train_model(model, train_loader, val_loader, epochs=30, lr=1e-3):
    """Train ``model`` with AdamW and a one-cycle learning-rate schedule.

    Each batch goes through: forward pass, cross-entropy loss, backward pass,
    optimizer step, scheduler step. After every epoch the model is evaluated
    on ``val_loader`` and a progress line is printed.

    Args:
        model: Network to train; it is moved to the notebook-level ``device``.
        train_loader: Loader of training batches.
        val_loader: Loader passed to ``evaluate`` after each epoch.
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate given to AdamW and used as the one-cycle ``max_lr``.

    Returns:
        Tuple ``(history, training_time)``. ``history`` maps "train_loss",
        "val_loss", "train_acc" and "val_acc" to per-epoch lists;
        ``training_time`` is the elapsed wall-clock time in minutes.
    """
    model.to(device)
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    # The scheduler is stepped once per batch, hence steps_per_epoch.
    scheduler = optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=lr,
        epochs=epochs,
        steps_per_epoch=len(train_loader),
    )
    criterion = nn.CrossEntropyLoss()
    history = {key: [] for key in ("train_loss", "val_loss", "train_acc", "val_acc")}
    start = time.time()

    for epoch in range(epochs):
        model.train()
        running_loss = 0
        n_correct = 0
        n_seen = 0
        for inputs, targets in train_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            logits = model(inputs)
            batch_loss = criterion(logits, targets)
            batch_loss.backward()
            optimizer.step()
            scheduler.step()

            running_loss += batch_loss.item()
            batch_pred = logits.max(1)[1]
            n_correct += batch_pred.eq(targets).sum().item()
            n_seen += targets.size(0)

        train_acc = 100 * n_correct / n_seen
        val_loss, val_acc = evaluate(model, val_loader)

        epoch_stats = (
            ("train_loss", running_loss / len(train_loader)),
            ("val_loss", val_loss),
            ("train_acc", train_acc),
            ("val_acc", val_acc),
        )
        for key, value in epoch_stats:
            history[key].append(value)
        print(f"Epoch {epoch+1}: "f"Train Acc {train_acc:.2f} | "f"Val Acc {val_acc:.2f}")

    elapsed_seconds = time.time() - start
    training_time = elapsed_seconds / 60
    return history, training_time
# History tracking: stores the per-epoch losses and accuracies (train and validation); these are used for the learning curves and the overfitting gap.

In [8]:
def plot_learning_curves(history, label):
    """Plot training and validation accuracy per epoch on the current figure.

    Args:
        history: Dict holding "train_acc" and "val_acc" per-epoch sequences.
        label: Model name used as the prefix of both legend entries.
    """
    curves = (
        ("train_acc", f"{label} Train"),
        ("val_acc", f"{label} Val"),
    )
    for key, legend_text in curves:
        plt.plot(history[key], label=legend_text)
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()

In [9]:
def compute_metrics(model, loader):
    """Collect predictions over ``loader`` and compute classification metrics.

    Returns:
        Tuple ``(precision, recall, f1, macro, cm)``: the per-class precision,
        recall and F1 arrays, the tuple returned by
        ``precision_recall_fscore_support(..., average="macro")``, and the
        confusion matrix.
    """
    # NOTE(review): the next cell defines compute_metrics again with the same
    # body; that later definition is the one in effect once both cells have run.
    model.eval()
    predicted = []
    actual = []
    with torch.no_grad():
        for inputs, targets in loader:
            logits = model(inputs.to(device))
            batch_pred = logits.max(1)[1]
            predicted.extend(batch_pred.cpu().numpy())
            actual.extend(targets.numpy())

    per_class = precision_recall_fscore_support(actual, predicted, average=None)
    precision, recall, f1, _ = per_class
    macro = precision_recall_fscore_support(actual, predicted, average="macro")
    cm = confusion_matrix(actual, predicted)
    return precision, recall, f1, macro, cm

In [10]:
# We now compute the per-class precision, recall and F1 scores, their macro averages, and the confusion matrix.
def compute_metrics(model, loader):
    """Run ``model`` over ``loader`` and summarize its predictions.

    The model is put in evaluation mode and inputs are moved to the
    notebook-level ``device``; labels stay on the CPU.

    Returns:
        Tuple ``(precision, recall, f1, macro, cm)`` where the first three are
        per-class arrays (``average=None``), ``macro`` is the result of
        ``precision_recall_fscore_support`` with ``average="macro"``, and
        ``cm`` is the confusion matrix of true vs. predicted labels.
    """
    model.eval()
    y_pred, y_true = [], []

    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.to(device)
            scores = model(batch_x)
            _, batch_pred = scores.max(1)
            y_pred.extend(batch_pred.cpu().numpy())
            y_true.extend(batch_y.numpy())

    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average=None
    )
    macro = precision_recall_fscore_support(y_true, y_pred, average="macro")
    cm = confusion_matrix(y_true, y_pred)
    return precision, recall, f1, macro, cm

In [11]:
# This function shows misclassified images together with the model's confidence in each wrong prediction.
# This lets us do a proper error analysis and look for systematic failures.
def show_misclassified(model, loader, num=12):
    """Display up to ``num`` misclassified images with prediction and confidence.

    Iterates over ``loader`` in order and collects the first ``num`` samples
    whose predicted class differs from the label. Each image is titled with
    the predicted class, its softmax probability and the true class.

    The grid is sized from the number of images actually collected (at most
    four columns), so the function works for any ``num`` and when fewer than
    ``num`` errors exist. A fixed 3x4 grid raised ``IndexError`` in that case.

    Args:
        model: Module mapping a batch of images to class logits.
        loader: Iterable of ``(images, labels)`` batches; images are assumed
            to be normalized with mean 0.5 and std 0.5 per channel.
        num: Maximum number of misclassified images to show.
    """
    model.eval()
    # Run on the device the model lives on; a parameter-less model leaves
    # batches where they are.
    first_param = next(model.parameters(), None)
    run_device = None if first_param is None else first_param.device

    images = []
    preds = []
    labels = []
    probs = []
    with torch.no_grad():
        for x, y in loader:
            if len(images) >= num:
                break
            inputs = x if run_device is None else x.to(run_device)
            conf, pred = F.softmax(model(inputs), dim=1).max(1)
            # Compare on the CPU in one vectorized op instead of per element.
            conf, pred, y_cpu = conf.cpu(), pred.cpu(), y.cpu()
            wrong = (pred != y_cpu).nonzero(as_tuple=True)[0]
            for i in wrong[: num - len(images)].tolist():
                images.append(x[i].cpu())
                preds.append(pred[i].item())
                labels.append(y_cpu[i].item())
                probs.append(conf[i].item())

    if not images:
        print("No misclassified samples found.")
        return

    n_cols = min(4, len(images))
    n_rows = -(-len(images) // n_cols)  # ceiling division
    # Same cell size as the original 3x4 / (12, 8) layout.
    fig, axes = plt.subplots(
        n_rows, n_cols, figsize=(3 * n_cols, 8 * n_rows / 3), squeeze=False
    )
    for i, ax in enumerate(axes.flat):
        ax.axis("off")
        if i >= len(images):
            continue  # unused cell in the last row
        # Undo Normalize(0.5, 0.5); clamp guards against float round-off.
        img = (images[i] * 0.5 + 0.5).clamp(0, 1)
        ax.imshow(img.permute(1, 2, 0))
        ax.set_title(f"P:{preds[i]} ({probs[i]:.2f})\nT:{labels[i]}")
    plt.show()

In [12]:
import pandas as pd
# We now build a master table for a structured comparison of the MLP, the plain CNN and the residual CNN.
def build_master_table(mlp_stats, plain_stats, res_stats):
    """Assemble the model-comparison table as a DataFrame.

    Each ``*_stats`` argument is a mapping with the keys "params", "test_acc",
    "gap" and "time". The result has one row per model (Week 3 MLP, Plain CNN,
    Residual CNN); the parameter count is reported in thousands.
    """
    stats_in_order = (mlp_stats, plain_stats, res_stats)

    table = {"Model": ["Week 3 MLP", "Plain CNN", "Residual CNN"]}
    table["Params (k)"] = [stats["params"]/1000 for stats in stats_in_order]
    # The remaining columns are copied through unchanged.
    passthrough = (
        ("Test Acc (%)", "test_acc"),
        ("Overfitting Gap (%)", "gap"),
        ("Training Time (min)", "time"),
    )
    for column, key in passthrough:
        table[column] = [stats[key] for stats in stats_in_order]

    df = pd.DataFrame(table)
    return df

In [13]:
# Build the train / validation / test loaders (batch size 128) used by all later cells.
train_loader, val_loader, test_loader = get_dataloaders(batch_size=128)

  entry = pickle.load(f, encoding="latin1")


In [None]:
# Train the plain CNN, then record its test accuracy, parameter count and overfitting gap.
plain_model = PlainCNN()
plain_history, plain_time = train_model(
    plain_model,
    train_loader,
    val_loader,
    epochs=30,
    lr=1e-3
)
plain_test_loss, plain_test_acc = evaluate(plain_model,test_loader)
plain_params = count_parameters(plain_model)
# Overfitting gap: final-epoch training accuracy minus final-epoch validation accuracy.
plain_gap = plain_history["train_acc"][-1] - plain_history["val_acc"][-1]

In [15]:
# Train the residual CNN with the same settings and record the same quantities as for the plain CNN.
res_model = ResidualCNN()
res_history, res_time = train_model(
    res_model,
    train_loader,
    val_loader,
    epochs=30,
    lr=1e-3
)
res_test_loss, res_test_acc = evaluate(res_model,test_loader)
res_params = count_parameters(res_model)
# Overfitting gap: final-epoch training accuracy minus final-epoch validation accuracy.
res_gap = res_history["train_acc"][-1] - res_history["val_acc"][-1]

KeyboardInterrupt: 

In [None]:
# Overlay the train / validation accuracy curves of both models on one figure.
plt.figure(figsize=(8,5))
plot_learning_curves(plain_history, "Plain CNN")
plot_learning_curves(res_history, "Residual CNN")
plt.title("Learning Curves")
plt.show()

In [None]:
# Classification metrics and confusion matrix of the residual CNN on the test set.
precision, recall, f1, macro, cm = compute_metrics(res_model, test_loader)

# Heatmap of the confusion matrix with integer counts annotated in each cell.
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()

In [None]:
# Show misclassified test images of the residual CNN (default: up to 12) for error analysis.
show_misclassified(res_model, test_loader)

In [None]:
#So to summarize our complete pipeline here:
