In [None]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used
# later in this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, TensorDataset
import pandas as pd
import os
from PIL import Image
from datetime import datetime
import torch.optim.lr_scheduler as lr_scheduler
from torchsummary import summary

In [None]:
# Template code for reading the test file
def load_cifar_batch(file):
    """Unpickle a CIFAR batch file and return the stored object.

    Args:
        file: Path of the pickle file to read.

    Returns:
        Whatever object the file contains, decoded with ``encoding='bytes'``
        (so string keys written by Python 2 come back as ``bytes``).
    """
    # NOTE: pickle can execute arbitrary code while loading; only use this
    # on files from a trusted source.
    with open(file, mode='rb') as stream:
        return pickle.load(stream, encoding='bytes')

In [None]:
# Get Train Test Loader
def get_data(batch_size=128, dataset_path="/content/drive/MyDrive/dl project1/data",
             num_workers=0, download=False):
    """Build the CIFAR-10 training and test data loaders.

    Args:
        batch_size: Number of samples per batch for both loaders.
        dataset_path: Root directory handed to ``torchvision.datasets.CIFAR10``.
            Defaults to the notebook's Google Drive location.
        num_workers: Worker processes used by each ``DataLoader``.
        download: Forwarded to ``CIFAR10``; ``False`` means the data must
            already exist under ``dataset_path``.

    Returns:
        ``(train_loader, test_loader)``. The training loader shuffles and
        applies the augmentation pipeline; the test loader only converts to
        tensor and normalizes.
    """
    # Per-channel statistics shared by the train and test pipelines.
    cifar_mean = (0.4914, 0.4822, 0.4465)
    cifar_std = (0.2470, 0.2435, 0.2616)

    transform_train = transforms.Compose([
        transforms.AutoAugment(policy=transforms.AutoAugmentPolicy.CIFAR10),
        transforms.RandomCrop(32, padding=4),
        transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.2),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        transforms.Normalize(cifar_mean, cifar_std),
        # Runs after Normalize, so erased patches are filled with 1.0 in
        # normalized space.
        transforms.RandomErasing(p=0.2, scale=(0.02, 0.1), value=1.0, inplace=False),
    ])

    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(cifar_mean, cifar_std),
    ])

    train_dataset = torchvision.datasets.CIFAR10(root=dataset_path, train=True,
                                                 download=download, transform=transform_train)
    test_dataset = torchvision.datasets.CIFAR10(root=dataset_path, train=False,
                                                download=download, transform=transform_test)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
                              num_workers=num_workers)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,
                             num_workers=num_workers)

    return train_loader, test_loader

In [None]:
# Define Residual Block
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with an additive skip connection.

    The skip path is an empty ``nn.Sequential`` (identity) unless the block
    changes resolution (``stride != 1``) or channel count, in which case it
    is a 1x1 strided convolution followed by batch norm.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # Main path: the first conv carries the stride, the second keeps the shape.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: project only when the main path changes the tensor shape.
        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        skip = self.shortcut(x)
        hidden = self.conv1(x)
        hidden = F.relu(self.bn1(hidden))
        hidden = self.bn2(self.conv2(hidden))
        hidden += skip
        return F.relu(hidden)

# Define Optimized ResNet Model (Under 5M Params)
class OptimizedResNet(nn.Module):
    """ResNet-style classifier built from ``ResidualBlock`` stages.

    Layout: a 3x3 stem convolution (3 -> 52 channels), four residual stages
    (96, 128, 240, 344 channels with 3, 2, 2, 1 blocks), global average
    pooling, and a linear classifier.

    Constructing the model also initializes the weights, moves the model to
    CUDA when available, and prints a layer summary plus the trainable
    parameter count.

    Args:
        num_classes: Size of the output logit vector.
    """

    def __init__(self, num_classes=10):
        super(OptimizedResNet, self).__init__()
        # Stem convolution
        self.conv1 = nn.Conv2d(3, 52, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(52)

        # Residual stages; stages 2-4 halve the spatial resolution (stride=2)
        self.layer1 = self._make_layer(52, 96, 3, stride=1)
        self.layer2 = self._make_layer(96, 128, 2, stride=2)
        self.layer3 = self._make_layer(128, 240, 2, stride=2)
        self.layer4 = self._make_layer(240, 344, 1, stride=2)

        # Adaptive pooling so the classifier always sees a 344-vector
        self.avg_pool = nn.AdaptiveAvgPool2d(1)

        # Classifier head
        self.fc = nn.Linear(344, num_classes)

        # Apply weight initialization
        self._initialize_weights()

        # Print model summary and parameter count
        self.print_model_info()

    def _make_layer(self, in_channels, out_channels, blocks, stride):
        """Stack ``blocks`` residual blocks; only the first changes channels/stride."""
        layers = [ResidualBlock(in_channels, out_channels, stride)]
        for _ in range(1, blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def _initialize_weights(self):
        """Kaiming-normal convs, unit/zero batch norms, small-normal linear layers."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def print_model_info(self):
        """Prints model summary and total trainable parameters"""
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(device)
        # torchsummary pushes a random batch through the model. Run it in
        # eval mode so that pass does not update the BatchNorm running
        # statistics, then restore whatever mode the model was in.
        was_training = self.training
        self.eval()
        try:
            summary(self, (3, 32, 32))
        finally:
            self.train(was_training)
        num_params = sum(p.numel() for p in self.parameters() if p.requires_grad)
        print(f"\nTotal Trainable Parameters: {num_params:,} ({num_params/1e6:.2f}M)\n")

    def forward(self, x):
        """Map an image batch to class logits of shape ``(batch, num_classes)``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avg_pool(out)
        out = torch.flatten(out, 1)
        out = self.fc(out)
        return out

In [None]:
# Model initialization: prefer the GPU when one is visible
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model = OptimizedResNet()
model = model.to(device)

# Loss, optimizer and learning-rate schedule
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.parameters(), lr=0.0001, momentum=0.9, weight_decay=5e-4)
# NOTE(review): T_max=20 is shorter than the 100-epoch training loop below, so
# the cosine schedule will presumably rise again after epoch 20 - confirm intended.
scheduler = lr_scheduler.CosineAnnealingLR(optimizer=optimizer, T_max=20)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 52, 32, 32]           1,404
       BatchNorm2d-2           [-1, 52, 32, 32]             104
            Conv2d-3           [-1, 96, 32, 32]           4,992
       BatchNorm2d-4           [-1, 96, 32, 32]             192
            Conv2d-5           [-1, 96, 32, 32]          44,928
       BatchNorm2d-6           [-1, 96, 32, 32]             192
            Conv2d-7           [-1, 96, 32, 32]          82,944
       BatchNorm2d-8           [-1, 96, 32, 32]             192
     ResidualBlock-9           [-1, 96, 32, 32]               0
           Conv2d-10           [-1, 96, 32, 32]          82,944
      BatchNorm2d-11           [-1, 96, 32, 32]             192
           Conv2d-12           [-1, 96, 32, 32]          82,944
      BatchNorm2d-13           [-1, 96, 32, 32]             192
    ResidualBlock-14           [-1, 96,

In [None]:
# Build the CIFAR-10 train/test loaders; batch_size is reused by later cells
batch_size = 128
train_loader, test_loader = get_data(batch_size=batch_size)

In [None]:
model_path = '/content/drive/MyDrive/dl project1/best_resnet.pth'
# model_path =''
# Optionally warm-start from a previous checkpoint; failure to load is
# reported and training continues from the current weights.
if model_path:
    try:
        model.load_state_dict(torch.load(model_path, map_location=device))
        print("Loaded model checkpoint from", model_path)
    except Exception as e:
        print(f"Error loading model checkpoint: {e}. Training from scratch.")
else:
    print("No checkpoint found. Training from scratch.")

num_epochs = 100
patience = 3  # Number of epochs to wait before stopping if loss increases and accuracy decreases
best_accuracy = 0
lowest_loss = float('inf')
no_improve_count = 0  # Counter for early stopping

best_model_path = '/content/drive/MyDrive/dl project1/best_resnet.pth'  # Save best model separately


def count_parameters(model):
    """Return the number of trainable parameters in ``model``."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


# The parameter count does not change during training, so report it once.
num_params = count_parameters(model)
print(f"Total Trainable Parameters: {num_params/1e6:.2f}M")

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)  # Move data to device
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    # Training-set metrics for this epoch (mean batch loss, accuracy in %).
    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = 100. * correct / total

    scheduler.step()

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%, LR: {scheduler.get_last_lr()}')

    # Save the model whenever either training metric sets a new record.
    # The records are updated independently (max / min) so an epoch that
    # improves only one metric cannot drag the other record backwards.
    # NOTE(review): selection uses training-set metrics, not a held-out split.
    improved_accuracy = epoch_accuracy > best_accuracy
    improved_loss = epoch_loss < lowest_loss
    if improved_accuracy or improved_loss:
        best_accuracy = max(best_accuracy, epoch_accuracy)
        lowest_loss = min(lowest_loss, epoch_loss)
        torch.save(model.state_dict(), best_model_path)
        print(f"Best model saved at Epoch {epoch+1} with Accuracy: {epoch_accuracy:.2f}% and Loss: {epoch_loss:.4f}")

    # Early Stopping Logic: count consecutive epochs that are strictly worse
    # than the records on both metrics.
    if epoch_accuracy < best_accuracy and epoch_loss > lowest_loss:
        no_improve_count += 1
        print(f"Warning: Accuracy decreased & Loss increased for {no_improve_count} epochs.")
    else:
        no_improve_count = 0  # Reset counter if model improves

    if no_improve_count >= patience:
        print("Early stopping triggered. Stopping training.")
        break  # Exit training loop

print("Training complete. Loading the best model before evaluation.")
# map_location keeps this consistent with the checkpoint load above and lets a
# GPU-saved checkpoint load on a CPU-only runtime.
model.load_state_dict(torch.load(best_model_path, map_location=device))  # Load the best model for testing

# Testing Loop
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

print(f'Test Accuracy: {100.*correct/total:.2f}%')

In [None]:
# Create a dataset from the images array
class CIFAR_Dataset_PKL_Load(Dataset):
    """Label-free dataset over an indexable collection of images.

    Args:
        images: Indexable collection; ``len(images)`` is the dataset size.
        transform: Callable applied to each item. When ``None``, a default
            pipeline of ``ToPILImage -> ToTensor -> Normalize`` is used
            (presumably each image is an HxWxC array - confirm against the
            pickle's layout).
    """

    def __init__(self, images, transform=None):
        self.images = images
        self.transform = transform if transform is not None else transforms.Compose([
            transforms.ToPILImage(),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2470, 0.2435, 0.2616)),
        ])

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Return the transformed item at ``idx`` (no label)."""
        sample = self.images[idx]
        # NumPy inputs are cast to uint8 before the transform sees them.
        if isinstance(sample, np.ndarray):
            sample = sample.astype('uint8')
        return self.transform(sample) if self.transform else sample

In [None]:
# model_path = '/content/drive/MyDrive/dl project1/best_resnet.pth'
model_path = ''
# Read the unlabeled test batch (update the file path if necessary)
cifar10_batch = load_cifar_batch('/content/drive/MyDrive/dl project1/data/cifar_test_nolabel.pkl')
images = cifar10_batch[b'data']
print(f"Loaded test batch with {images.shape[0]} images")

# Wrap the images in a dataset / loader; order is kept so row i maps to ID i
final_dataset = CIFAR_Dataset_PKL_Load(images=images)
final_loader = DataLoader(dataset=final_dataset, batch_size=batch_size, shuffle=False)

def generate_predictions_csv(model_path, model, test_loader, device,
                             model_name=None,
                             output_dir='/content/drive/MyDrive/dl project1'):
    """Run inference over ``test_loader`` and write the predictions to a CSV.

    Args:
        model_path: Checkpoint to load into ``model`` first. An empty value
            skips loading; a failed load is reported and the model's current
            weights are used.
        model: Network used for inference (switched to eval mode).
        test_loader: Iterable of batches. A batch may be a tensor or a
            ``(inputs, ...)`` list/tuple, in which case the first element is used.
        device: Device the batches are moved to before the forward pass.
        model_name: Tag embedded in the file name. When ``None``, a
            module-level ``model_name`` variable is used if one exists,
            otherwise ``'model'``.
        output_dir: Directory the CSV is written to (created if missing).

    Returns:
        Path of the CSV file, which has columns ``ID`` and ``Labels``.
    """
    if model_path:
        try:
            model.load_state_dict(torch.load(model_path, map_location=device))
            print("Loaded model checkpoint from", model_path)
        except Exception as e:
            # Deliberately best-effort: fall back to the in-memory weights.
            print(f"Error loading model checkpoint: {e}. Using the existing model.")
    else:
        print("No checkpoint found. Running inference with the existing model.")

    if model_name is None:
        # Backward compatibility: earlier callers set a global `model_name`.
        model_name = globals().get('model_name', 'model')

    model.eval()
    predictions = []
    with torch.no_grad():
        # Iterate the loader that was passed in, not a notebook global.
        for batch in test_loader:
            if isinstance(batch, (list, tuple)):
                batch = batch[0]
            batch = batch.to(device)
            outputs = model(batch)
            _, preds = torch.max(outputs, 1)
            predictions.extend(preds.cpu().tolist())

    # Timestamp without spaces or colons so the name is valid on any filesystem.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f'submission_{model_name}_{timestamp}.csv'
    os.makedirs(output_dir, exist_ok=True)
    filepath = os.path.join(output_dir, filename)
    df = pd.DataFrame({"ID": range(len(predictions)), "Labels": predictions})
    # Write to the joined path so the file actually lands in output_dir.
    df.to_csv(filepath, index=False)
    print("Predictions saved to " + filepath)
    return filepath

model_name = 'trial'
# The first argument is the checkpoint path, not the run name. Passing
# `model_name` here made the loader try to open a file called 'trial'.
generate_predictions_csv(model_path, model, final_loader, device)


Loaded test batch with 10000 images
Error loading model checkpoint: [Errno 2] No such file or directory: 'trial'. Using the existing model.


  model.load_state_dict(torch.load(model_path, map_location=device))


Predictions saved to submission_trial2025-03-14 03:45:48.226197.csv
