In [None]:
# Importing all the necessary libraries for the project
import numpy as np
import matplotlib.pyplot as plt
import os
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchsummary import summary
from torch.optim.optimizer import Optimizer
from torch.optim import Adam
from collections import defaultdict

import torchvision
import torchvision.transforms as transforms
from torchvision.datasets.vision import VisionDataset

import os
from tqdm import tqdm
from typing import Any, Callable, Optional, Tuple, Union
from pathlib import Path
from PIL import Image
import pandas as pd

In [None]:
# Fix every random number generator used in this notebook so runs are repeatable

seed = 1029

# PyTorch generators: CPU first, then the current CUDA device, then all CUDA devices
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# NumPy's global generator (used by Cutout to pick patch positions)
np.random.seed(seed)
# Also record the seed in the environment
os.environ['PYTHONHASHSEED'] = str(seed)

# Fixed seeds alone are not enough: cuDNN may still choose non-deterministic
# algorithms unless it is explicitly told to be deterministic
torch.backends.cudnn.deterministic = True

#### Defining Parameters and Hyperparameters

In [None]:
# Defining variables, parameters and hyperparameters for the run

run_on_kaggle = 1                           # Set this to 1 if running on Kaggle, 0 if running on local machine

# Setting up the data directories and model save path based on the run environment.
# NOTE: torchvision's CIFAR10 dataset expects `root` to be the directory that CONTAINS
# the `cifar-10-batches-py` folder, not that folder itself.
if run_on_kaggle:
    train_data_dir = '/kaggle/input/deep-learning-spring-2025-project-1/cifar-10-python'
    val_data_dir = '/kaggle/input/deep-learning-spring-2025-project-1/cifar-10-python'
    test_data_dir = '/kaggle/input/deep-learning-spring-2025-project-1/cifar_test_nolabel.pkl'
    model_save_path = './'
    num_of_workers = 16
    download_data = False

else:
    base_dir = './data'
    # Always hand the parent directory to CIFAR10; pointing it at the extracted
    # `cifar-10-batches-py` folder makes the dataset integrity check fail.
    train_data_dir = base_dir
    val_data_dir = base_dir
    test_data_dir = './data/cifar_test_nolabel.pkl'
    model_save_path = './checkpoint'
    num_of_workers = 0

    os.makedirs(base_dir, exist_ok=True)
    os.makedirs(model_save_path, exist_ok=True)

    # Download only when the extracted batches are not on disk yet
    download_data = not os.path.exists(os.path.join(base_dir, 'cifar-10-batches-py'))

    if not os.path.exists(test_data_dir):
        print("No Label Test Data not found")


# Defining Parameters and Hyperparameters
train_batch_size = 128
val_batch_size = 100
test_batch_size = 100
num_epochs = 200
learning_rate = 0.1
momentum = 0.9
weight_decay = 5e-4
cosine_annealing_T_max = num_epochs         # one cosine period spans the whole run


# Defining Model Configurations and Techniques
use_cutout = 1                              # Set this to 1 if using Cutout, 0 if not using Cutout
use_mixup = 0                               # Set this to 1 if using Mixup, 0 if not using Mixup
use_label_smoothing = 1                     # Set this to 1 if using Label Smoothing, 0 if not using Label Smoothing
use_lookahead = 1                           # Set this to 1 if using Lookahead, 0 if not using Lookahead


# Technique hyperparameters are defined unconditionally so that cells which mention
# them never hit a NameError when the corresponding switch above is turned off.
cutout_n_holes = 1
cutout_length = 8

mixup_alpha = 0.75

label_smoothing_epsilon = 0.2

lookahead_k = 5
lookahead_alpha = 0.5

In [None]:
# Prefer the GPU whenever PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# Cutout augmentation: erase random square patches from an image tensor
class Cutout(object):
    """Zero out one or more square regions at random positions of an image.

    Args:
        n_holes (int): How many patches are erased from each image.
        length (int): Nominal side length (in pixels) of each square patch.
    """
    def __init__(self, n_holes, length):
        self.n_holes = n_holes
        self.length = length

    def __call__(self, img):
        """
        Args:
            img (Tensor): Image tensor laid out as (C, H, W).
        Returns:
            Tensor: `img` multiplied by a 0/1 mask in which n_holes patches
            (clipped at the image border) are set to zero.
        """
        height, width = img.size(1), img.size(2)
        half = self.length // 2

        keep = np.ones((height, width), np.float32)

        for _ in range(self.n_holes):
            # Patch centre: the row is drawn before the column
            centre_y = np.random.randint(height)
            centre_x = np.random.randint(width)

            # Clip the patch to the image so border patches are simply smaller
            top, bottom = np.clip([centre_y - half, centre_y + half], 0, height)
            left, right = np.clip([centre_x - half, centre_x + half], 0, width)

            keep[top:bottom, left:right] = 0.

        # The (H, W) mask is broadcast over the channel dimension
        return img * torch.from_numpy(keep).expand_as(img)

def prepare_data(train_data_dir, val_data_dir):
    '''
    Build the CIFAR-10 training and validation data loaders.

    The training images are augmented (random crop, horizontal flip, AutoAugment with
    the CIFAR10 policy, and Cutout when `use_cutout` is set) and normalized; the
    validation images are only converted to tensors and normalized.

    Args:
        train_data_dir: `root` passed to the training CIFAR10 dataset.
        val_data_dir: `root` passed to the validation CIFAR10 dataset.

    Returns:
        (train_loader, val_loader): the training loader is shuffled, the validation
        loader is not. Batch sizes, worker count and the download flag come from the
        module-level configuration.
    '''

    # Per-channel mean / std shared by both pipelines
    # (presumably the usual CIFAR-10 statistics — TODO confirm)
    normalize = transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))

    # Data Augmentation and Transformation for the Training Data
    train_steps = [
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10),
        transforms.ToTensor(),
        normalize,
    ]
    # Cutout is only constructed when enabled, so its hyperparameters are never
    # touched (and cannot raise NameError) when the technique is switched off.
    if use_cutout:
        train_steps.append(Cutout(n_holes=cutout_n_holes, length=cutout_length))
    final_train_transform = transforms.Compose(train_steps)

    # Transformation for the Validation Data (no augmentation)
    val_transform = transforms.Compose([
        transforms.ToTensor(),
        normalize,
    ])

    # Loading the CIFAR-10 Training and Validation Data
    train_dataset = torchvision.datasets.CIFAR10(root=train_data_dir, train=True, download=download_data, transform=final_train_transform)
    val_dataset = torchvision.datasets.CIFAR10(root=val_data_dir, train=False, download=download_data, transform=val_transform)

    # Creating the Data Loaders for Training and Validation
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True, num_workers=num_of_workers)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=val_batch_size, shuffle=False, num_workers=num_of_workers)

    return train_loader, val_loader

In [None]:
class BasicBlock(nn.Module):
    """Two-convolution residual block with an optional projection shortcut.

    Args:
        in_channels (int): Channels of the input feature map.
        out_channels (int): Channels produced by both convolutions.
        stride (int): Stride of the first convolution (and of the projection shortcut).
        kernel (int): Kernel size of the two main convolutions. Odd sizes keep the
            spatial size at stride 1 because the padding is derived from it.
        shortcut_kernel (int): Kernel size of the projection shortcut convolution.
        dropout (float): Accepted for interface compatibility; not used by this block.
    """
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, kernel=3, shortcut_kernel=1, dropout = 0.0):
        super(BasicBlock, self).__init__()
        # Derive the padding from the kernel size instead of hard-coding 1, so that the
        # main path and the shortcut keep matching shapes for any odd kernel size.
        # (kernel=3 -> 1 and shortcut_kernel=1 -> 0, exactly the previous values.)
        padding = kernel // 2
        shortcut_padding = shortcut_kernel // 2

        # First convolutional layer
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=kernel, stride=stride, padding=padding, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        # Second convolutional layer
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=kernel, stride=1, padding=padding, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut connection: identity unless the shape changes, in which case a
        # strided convolution + batch norm projects the input to the new shape
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=shortcut_kernel, stride=stride, padding=shortcut_padding, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )

    def forward(self, x):
        """Return relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))."""
        # Apply first convolution, batch norm, and ReLU
        out = F.relu(self.bn1(self.conv1(x)))
        # Apply second convolution and batch norm
        out = self.bn2(self.conv2(out))
        # Add shortcut connection
        out += self.shortcut(x)
        # Apply final ReLU
        out = F.relu(out)
        return out


class ResNet(nn.Module):
    """Configurable ResNet: a 3x3 stem, a sequence of residual stages, global average
    pooling, dropout and a linear classifier.

    Args:
        block: Sequence with the block class to use for each stage.
        num_blocks: Number of blocks in each stage.
        channels: Output channels of each stage (channels[0] is also the stem width).
        strides: Stride of the first block of each stage.
        kernel_size: Kernel size forwarded to every block.
        shortcut_kernel_size: Shortcut kernel size forwarded to every block.
        pool_size: Stored on the instance; the forward pass pools adaptively and
            does not read it.
        dropout: Dropout probability before the classifier (also forwarded to blocks).
        num_classes: Size of the output layer.
    """
    def __init__(self, block, num_blocks, channels, strides, kernel_size, shortcut_kernel_size, pool_size, dropout, num_classes=10):
        super(ResNet, self).__init__()
        stem_width = channels[0]
        # Running channel count; advanced by _make_layer as blocks are created
        self.in_channels = stem_width
        self.kernel_size = kernel_size
        self.shortcut_kernel_size = shortcut_kernel_size
        self.pool_size = pool_size

        # Stem: 3-channel input -> stem_width feature maps
        self.conv1 = nn.Conv2d(3, stem_width, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(stem_width)

        # Residual stages, created in order so each one sees the channel count
        # left behind by the previous one
        self.layers = nn.ModuleList([
            self._make_layer(block[idx], channels[idx], num_blocks[idx], stride=strides[idx], dropout=dropout)
            for idx in range(len(num_blocks))
        ])

        # Classification head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(p=dropout)
        self.linear = nn.Linear(channels[-1] * block[-1].expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride, dropout):
        """Build one stage; only its first block uses `stride`, the rest use 1."""
        per_block_strides = [stride] + [1] * (num_blocks - 1)
        stage = []
        for block_stride in per_block_strides:
            stage.append(block(self.in_channels, out_channels, block_stride, self.kernel_size, self.shortcut_kernel_size, dropout))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*stage)

    def forward(self, x):
        """Map a batch of images to class logits."""
        # Stem
        features = F.relu(self.bn1(self.conv1(x)))

        # Residual stages
        for stage in self.layers:
            features = stage(features)

        # Global average pooling, flattened to (batch, channels)
        pooled = torch.flatten(self.avgpool(features), 1)

        # Dropout followed by the linear classifier
        return self.linear(self.dropout(pooled))

#### Defining model architecture

In [None]:
# ResNet architecture for this run: one entry per residual stage
type_of_block_used = [BasicBlock] * 3       # block class of each stage
num_of_blocks = [4, 5, 3]                   # blocks per stage
num_of_channels = [64, 128, 256]            # output channels per stage
strides_per_block = [1, 2, 2]               # stride of the first block of each stage
kernel_size = 3
shortcut_kernel_size = 1
pool_size = 8
dropout = 0.0

# Build the network and move it to the selected device
model = ResNet(
    block=type_of_block_used,
    num_blocks=num_of_blocks,
    channels=num_of_channels,
    strides=strides_per_block,
    kernel_size=kernel_size,
    shortcut_kernel_size=shortcut_kernel_size,
    pool_size=pool_size,
    dropout=dropout,
).to(device)

In [None]:
# Count the parameters and enforce the 5 million parameter budget before training
total_params = sum(map(torch.Tensor.numel, model.parameters()))
print(f'{total_params:,} total parameters.')
if total_params > 5_000_000:
    raise Exception("Model size exceeds 5 million parameters")

# Layer-by-layer summary for a 3x32x32 input
summary(model, (3, 32, 32))

In [None]:
# Lookahead optimizer wrapper
class Lookahead(Optimizer):
    """Lookahead wrapper around another optimizer.

    The wrapped ("fast") optimizer performs every update. After every `k` calls to
    `step()`, each parameter's "slow" copy is moved a fraction `alpha` of the way
    towards the fast weights and the fast weights are reset to that slow copy.

    Args:
        base_optimizer: Optimizer performing the fast updates. Its `param_groups` and
            `defaults` objects are shared with (and extended by) this wrapper.
        alpha (float): Slow-weight step size, must lie in [0, 1].
        k (int): Number of fast steps between slow-weight updates, must be >= 1.

    Raises:
        ValueError: If `alpha` or `k` is outside its valid range.
    """
    def __init__(self, base_optimizer, alpha=0.5, k=6):
        from collections import OrderedDict

        if not 0.0 <= alpha <= 1.0:
            raise ValueError(f'Invalid slow update rate: {alpha}')
        if not 1 <= k:
            raise ValueError(f'Invalid lookahead steps: {k}')
        # Optimizer.__init__ is not called (the param groups belong to the base
        # optimizer), so the hook registries that PyTorch's profiling wrapper around
        # step() reads must be created by hand. Without them step() fails once
        # Optimizer.zero_grad() has patched the class-level step function.
        self._optimizer_step_pre_hooks = OrderedDict()
        self._optimizer_step_post_hooks = OrderedDict()

        defaults = dict(lookahead_alpha=alpha, lookahead_k=k, lookahead_step=0)
        self.base_optimizer = base_optimizer
        self.param_groups = self.base_optimizer.param_groups
        self.defaults = base_optimizer.defaults
        self.defaults.update(defaults)
        self.state = defaultdict(dict)
        # manually add our defaults to the param groups
        for name, default in defaults.items():
            for group in self.param_groups:
                group.setdefault(name, default)

    def update_slow(self, group):
        """Pull the slow weights of `group` towards the fast weights and reset the
        fast weights to them. Parameters without a gradient are skipped. The first
        call for a parameter initialises its slow copy from the current fast weights."""
        for fast_p in group["params"]:
            if fast_p.grad is None:
                continue
            param_state = self.state[fast_p]
            if 'slow_buffer' not in param_state:
                param_state['slow_buffer'] = torch.empty_like(fast_p.data)
                param_state['slow_buffer'].copy_(fast_p.data)
            slow = param_state['slow_buffer']
            # slow += alpha * (fast - slow); keyword form replaces the deprecated
            # add_(scalar, tensor) overload
            slow.add_(fast_p.data - slow, alpha=group['lookahead_alpha'])
            fast_p.data.copy_(slow)

    def sync_lookahead(self):
        """Force a slow-weight update for every parameter group."""
        for group in self.param_groups:
            self.update_slow(group)

    def step(self, closure=None):
        """Run one base-optimizer step, then update the slow weights of every group
        whose step counter reached a multiple of its `lookahead_k`.

        Returns:
            Whatever the base optimizer's `step(closure)` returns.
        """
        loss = self.base_optimizer.step(closure)
        for group in self.param_groups:
            group['lookahead_step'] += 1
            if group['lookahead_step'] % group['lookahead_k'] == 0:
                self.update_slow(group)
        return loss

    def state_dict(self):
        """Return the base optimizer's state dict plus a `slow_state` entry holding
        the slow buffers (tensor keys are replaced by their `id`)."""
        fast_state_dict = self.base_optimizer.state_dict()
        slow_state = {
            (id(k) if isinstance(k, torch.Tensor) else k): v
            for k, v in self.state.items()
        }
        fast_state = fast_state_dict['state']
        param_groups = fast_state_dict['param_groups']
        return {
            'state': fast_state,
            'slow_state': slow_state,
            'param_groups': param_groups,
        }

    def load_state_dict(self, state_dict):
        """Restore the fast state into the base optimizer and the slow state into this
        wrapper. A state dict without `slow_state` (saved without Lookahead) is accepted.

        NOTE(review): this goes through Optimizer.load_state_dict although
        Optimizer.__init__ was never called — verify against the installed PyTorch
        version before relying on checkpoint resume.
        """
        fast_state_dict = {
            'state': state_dict['state'],
            'param_groups': state_dict['param_groups'],
        }
        self.base_optimizer.load_state_dict(fast_state_dict)

        # We want to restore the slow state, but share param_groups reference
        # with base_optimizer. This is a bit redundant but least code
        slow_state_new = False
        if 'slow_state' not in state_dict:
            print('Loading state_dict from optimizer without Lookahead applied.')
            state_dict['slow_state'] = defaultdict(dict)
            slow_state_new = True
        slow_state_dict = {
            'state': state_dict['slow_state'],
            'param_groups': state_dict['param_groups'],  # this is pointless but saves code
        }
        super(Lookahead, self).load_state_dict(slow_state_dict)
        self.param_groups = self.base_optimizer.param_groups  # make both ref same container
        if slow_state_new:
            # reapply defaults to catch missing lookahead specific ones
            for name, default in self.defaults.items():
                for group in self.param_groups:
                    group.setdefault(name, default)

In [None]:
def mixup_data(x, y, alpha=32):
    '''Returns mixed inputs, pairs of targets, and lambda.

    Each sample is blended with another sample of the same batch, chosen by a random
    permutation: mixed_x = lam * x + (1 - lam) * x[perm].

    Args:
        x: Batch of inputs; the first dimension is the batch dimension.
        y: Targets, indexable along the batch dimension.
        alpha: Both parameters of the Beta distribution lam is drawn from
            (NumPy global RNG). If alpha <= 0 no mixing happens (lam = 1).

    Returns:
        (mixed_x, y_a, y_b, lam) where y_a are the original targets and y_b the
        targets of the samples mixed in.
    '''
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1

    batch_size = x.size()[0]

    # Draw the permutation with the default (CPU) generator, then move it to wherever
    # the batch lives. A hard-coded .cuda() here crashed CPU-only runs.
    index = torch.randperm(batch_size).to(x.device)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam


def mixup_criterion(criterion, pred, y_a, y_b, lam):
    '''Blend the loss against both target sets with the mixup weight:
    lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b).'''
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

In [None]:
# Cross entropy loss with label smoothing
class LabelSmoothingCrossEntropy(nn.Module):
    """Cross entropy whose target puts weight (1 - eps) on the true class and spreads
    eps uniformly over all classes.

    Args:
        eps (float): Smoothing weight.
        reduction (str): 'mean' or 'sum' to reduce over the batch; any other value
            leaves the smoothing term unreduced (one value per sample).
    """
    def __init__(self, eps=0.05, reduction='mean'):
        super().__init__()
        self.eps = eps
        self.reduction = reduction

    def forward(self, output, target):
        """Return the smoothed loss for logits `output` and class indices `target`."""
        num_classes = output.size()[-1]
        log_probs = F.log_softmax(output, dim=-1)

        # Uniform part: negative log-probabilities summed over the classes
        if self.reduction == 'sum':
            smooth_term = -log_probs.sum()
        elif self.reduction == 'mean':
            smooth_term = (-log_probs.sum(dim=-1)).mean()
        else:
            smooth_term = -log_probs.sum(dim=-1)

        # Ordinary negative log-likelihood of the true class
        hard_term = F.nll_loss(log_probs, target, reduction=self.reduction)
        return smooth_term * self.eps / num_classes + (1 - self.eps) * hard_term

In [None]:
def check_gradient_norms(model):
    """Return the global L2 norm of the gradients currently stored on `model`.

    Parameters without a gradient are ignored; the result is 0.0 when no parameter
    has one.
    """
    squared_sum = 0.0
    for param in model.parameters():
        grad = param.grad
        if grad is None:
            continue
        # Squared L2 norm of this parameter's gradient
        squared_sum += grad.data.norm(2).item() ** 2
    # Square root of the summed squares = L2 norm over all gradients
    return squared_sum ** (1.0 / 2)

In [None]:
# Metric histories filled in during training / validation and read by the plotting cells

# One entry per training batch
train_losses, gradients_norms = [], []
# One entry per training epoch
train_acc, plot_train_loss, plot_gradient_norms = [], [], []
# One entry per validation pass
test_losses_l1, test_acc_l1 = [], []
# Defined for plotting; not written by any cell visible in this notebook
plot_test_loss = []

# Best validation accuracy seen so far (updated by evaluate)
best_acc = 0


In [None]:
def train_for_one_epoch(model, device, train_loader, optimizer):
    """Train `model` for one pass over `train_loader` and record the epoch's metrics.

    Args:
        model: Network to train; it is moved to `device` and put in training mode.
        device: Device the model and every batch are moved to.
        train_loader: Iterable of (data, target) batches that also supports len().
        optimizer: Optimizer whose zero_grad() / step() are called once per batch.

    Side effects:
        Appends per-batch values to `train_losses` and `gradients_norms`, and per-epoch
        values to `train_acc`, `plot_train_loss` and `plot_gradient_norms`.
    """
    # Make sure the model is on `device` (checkpoint saving in evaluate moves it to the CPU)
    model.to(device)

    model.train()

    pbar = tqdm(train_loader)

    correct = 0
    processed = 0
    train_loss_accu = 0
    grad_norm_accu = 0

    # Choose the loss function based on whether label smoothing is enabled
    if use_label_smoothing:
        criterion = LabelSmoothingCrossEntropy(eps=label_smoothing_epsilon)
    else:
        criterion = nn.CrossEntropyLoss()

    # Iterate over the training data in batches
    for batch_idx, (data, target) in enumerate(pbar):
        data, target = data.to(device), target.to(device)

        # Apply mixup augmentation if enabled
        if use_mixup:
            data, targets_a, targets_b, lam = mixup_data(data, target, mixup_alpha)

        optimizer.zero_grad()
        y_pred = model(data)

        # Compute the loss based on whether mixup is enabled
        if use_mixup:
            loss = mixup_criterion(criterion, y_pred, targets_a, targets_b, lam)
        else:
            loss = criterion(y_pred, target)

        # Store the loss as a plain float: keeping the tensor itself in the history
        # would retain one device tensor (and its autograd node) per batch
        batch_loss = loss.item()
        train_loss_accu += batch_loss
        train_losses.append(batch_loss)

        # Backward pass: compute gradients
        loss.backward()

        # Gradient norm is measured once per batch, before the optimizer step
        grad_norm = check_gradient_norms(model)
        grad_norm_accu += grad_norm
        gradients_norms.append(grad_norm)

        # Update model parameters using the optimizer
        optimizer.step()

        # Compute predictions and update accuracy counters
        # (with mixup enabled, accuracy is still measured against the unmixed targets)
        pred = y_pred.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        processed += len(data)

        pbar.set_description(desc= f'Loss={batch_loss}\tAccuracy={100*correct/processed:0.2f}\tGradient Norm={grad_norm_accu/(batch_idx+1):0.2f}')

    # Append the epoch's accuracy, average loss, and average gradient norm to their respective lists
    train_acc.append(100*correct/processed)
    plot_train_loss.append(train_loss_accu/len(train_loader))
    plot_gradient_norms.append(grad_norm_accu/len(train_loader))

    # Print the epoch's average loss, accuracy, and gradient norm
    print(f'Epoch Loss = {train_loss_accu/len(train_loader)} \t Epoch Accuracy = {100*correct/processed:0.2f} \t Gradient Norm = {grad_norm_accu/len(train_loader):0.2f}')

In [None]:
def evaluate(model, device, test_loader):
    """Evaluate `model` on `test_loader`, record the metrics and checkpoint the best model.

    Args:
        model: Network to evaluate; it is put in eval mode.
        device: Device every batch is moved to (the model is expected to be on it).
        test_loader: Loader of (data, target) batches exposing `.dataset`.

    Side effects:
        Appends the average per-sample loss to `test_losses_l1` and the accuracy (in
        percent) to `test_acc_l1`. When the accuracy beats the global `best_acc`, that
        global is updated and a TorchScript copy of the model is saved as
        `best_model.pt` under `model_save_path`.
    """
    global best_acc

    model.eval()
    test_loss = 0
    correct = 0
    num_samples = len(test_loader.dataset)

    # Choose the loss function based on whether label smoothing is enabled
    if use_label_smoothing:
        criterion = LabelSmoothingCrossEntropy(eps=label_smoothing_epsilon)
    else:
        criterion = nn.CrossEntropyLoss()

    # Iterate over the validation data in batches for inference
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # The criterion returns the batch MEAN, so weight it by the batch size;
            # the total divided by the dataset size is then the per-sample average
            # (dividing a sum of batch means by the dataset size under-reports the loss)
            test_loss += criterion(output, target).item() * data.size(0)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    # Calculate the average loss and accuracy for the validation data and print the results
    average_test_loss = test_loss / num_samples
    accuracy = 100. * correct / num_samples
    test_losses_l1.append(average_test_loss)

    print("Validation Metrics: Average Loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)".format(
        average_test_loss, correct, num_samples, accuracy))

    test_acc_l1.append(accuracy)

    # Save the model if the current accuracy is the best so far
    if accuracy > best_acc:
        best_acc = accuracy
        # Script and save from the CPU, then put the model back where the caller had it
        model.cpu()
        model_scripted = torch.jit.script(model)
        model_scripted.save(os.path.join(model_save_path, 'best_model.pt'))
        model.to(device)
        print(f"Model saved with accuracy {best_acc}")

In [None]:
def train_model(model, device):
    """Full training run: build the loaders, optimizer and cosine learning-rate
    schedule, then alternate one training epoch and one validation pass for
    `num_epochs` epochs, stepping the scheduler after each epoch."""

    print("Preparing Data")
    train_loader, val_loader = prepare_data(train_data_dir, val_data_dir)

    # SGD with momentum is always the base optimizer; Lookahead wraps it when enabled
    base_optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)
    if use_lookahead:
        optimizer = Lookahead(base_optimizer, alpha=lookahead_alpha, k=lookahead_k)
    else:
        optimizer = base_optimizer

    # Cosine annealing of the learning rate, stepped once per epoch
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=cosine_annealing_T_max)

    print("Starting Training")
    for epoch_number in range(1, num_epochs + 1):
        print(f"Epoch {epoch_number}")
        train_for_one_epoch(model, device, train_loader, optimizer)
        evaluate(model, device, val_loader)
        scheduler.step()

In [None]:
# Launch the full training run on the model and device set up in the earlier cells
train_model(model, device)

In [None]:
# Convert the recorded histories into NumPy arrays for the plotting cells
def _history_to_array(history):
    """Return `history` as a NumPy array, going through a CPU float tensor."""
    return np.array(torch.Tensor(history).cpu())

arr_train = _history_to_array(train_losses)                 # per-batch training loss
arr_test = _history_to_array(test_losses_l1)                # per-epoch validation loss
arr_train_acc = _history_to_array(train_acc)                # per-epoch training accuracy
arr_test_acc = _history_to_array(test_acc_l1)               # per-epoch validation accuracy
gradients_norms_int = _history_to_array(gradients_norms)    # per-batch gradient norm
plot_train_loss_fin = _history_to_array(plot_train_loss)    # per-epoch mean training loss
plot_gd_norm_fin = _history_to_array(plot_gradient_norms)   # per-epoch mean gradient norm

In [None]:
# Training vs. validation accuracy per epoch
fig, ax = plt.subplots()
ax.plot(arr_train_acc)
ax.plot(arr_test_acc)
ax.legend(["train","test"])
ax.set_title("Epoch vs Accuracy")
ax.set_xlabel("Epoch")
ax.set_ylabel("Accuracy")
plt.show()

In [None]:
# Mean gradient norm per training epoch.
# The y-axis shows gradient norms, so it is labelled as such rather than "Loss".
plt.plot(plot_gd_norm_fin)
plt.legend(["train"])
plt.title("Epoch vs Gradient Norm")
plt.xlabel("Epoch")
plt.ylabel("Gradient Norm")
plt.show()

In [None]:
# Mean training loss per epoch
fig, ax = plt.subplots()
ax.plot(plot_train_loss_fin)
ax.legend(["train"])
ax.set_title("Epoch vs Loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
plt.show()

In [None]:
# Validation loss per epoch
fig, ax = plt.subplots()
ax.plot(arr_test)
ax.legend(["test"])
ax.set_title("Epoch vs Loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
plt.show()

In [None]:
# best_model.pt is a TorchScript archive (evaluate saves it via torch.jit.script(...).save),
# so it is loaded with torch.jit.load directly instead of relying on torch.load
# re-dispatching to it. map_location places the weights on the active device.
model = torch.jit.load(os.path.join(model_save_path, 'best_model.pt'), map_location=device)
model = model.to(device)
# Inference only from here on
model.eval()

In [None]:
# Dataset wrapper for the unlabeled test pickle

class Cifar10NoLabelDataset(VisionDataset):
    """Unlabeled image set read from a single pickle file.

    The pickle must hold a dict with the byte keys b'data' (the images) and b'ids'
    (one identifier per image). Each item is returned as (image, id); the id takes
    the place a label would normally have.

    Args:
        root: Path of the pickle file.
        transform: Optional callable applied to the PIL image of every item.
    """

    def __init__(
        self,
        root: Union[str, Path],
        transform: Optional[Callable] = None
    ) -> None:

        super().__init__(root, transform=transform, target_transform=None)

        # NOTE: unpickling can execute arbitrary code - only load files from a trusted source
        with open(root, 'rb') as handle:
            payload = pickle.load(handle, encoding='bytes')

        # Images are used exactly as stored; no reshape / transpose is applied
        self.data: Any = payload[b'data']
        self.targets = payload[b'ids']

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index: int) -> Tuple[Any, Any]:
        raw, identifier = self.data[index], self.targets[index]

        # Wrap the stored array in a PIL image before applying the transform
        sample = Image.fromarray(raw)

        if self.transform is not None:
            sample = self.transform(sample)

        return sample, identifier

In [None]:
# Tensor conversion + normalization only (no augmentation) for the unlabeled test images
test_tranform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010)),
])

# Unshuffled loader so predictions come out in file order
test_data_no_label = Cifar10NoLabelDataset(root=test_data_dir, transform=test_tranform)
test_loader_no_label = torch.utils.data.DataLoader(
    dataset=test_data_no_label,
    batch_size=test_batch_size,
    shuffle=False,
    num_workers=num_of_workers,
)

In [None]:
# Predict a label for every unlabeled test image, keeping each image ID next to its prediction
indexes = []
predictions = []

# Inference only: eval mode and no autograd bookkeeping
model.eval()
with torch.no_grad():
    for inputs, index in test_loader_no_label:
        # Only the images are needed on the device; the IDs never leave the CPU
        inputs = inputs.to(device)
        indexes.extend(index.tolist())
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        predictions.extend(predicted.cpu().tolist())


dictionary = {'ID':indexes,'Label':predictions}
print(len(indexes))
print(len(predictions))

df = pd.DataFrame(dictionary)

In [None]:
# Preview the first ten test images side by side in one row
for slot in range(1, 11):
    plt.subplot(1, 10, slot)
    plt.imshow(test_data_no_label.data[slot - 1])
    plt.axis('off')
plt.show()

In [None]:
# Show the first ten rows of the ID / Label table
df.head(10)

In [None]:
# Write the ID / Label table to out.csv, leaving out the DataFrame index column
df.to_csv('out.csv',index=False)