In [2]:

import kagglehub
# NOTE(review): the captured output of this cell is
# "ModuleNotFoundError: No module named 'kagglehub'", so these calls only
# work in an environment where the kagglehub package is installed.
kagglehub.login()  # presumably prompts for Kaggle credentials — confirm against kagglehub docs

# `data` holds whatever competition_download returns (presumably a local path
# to the downloaded files — TODO confirm). The name `data` is rebound to the
# unpickled test dictionary in a later cell.
data = kagglehub.competition_download('deep-learning-spring-2025-project-1')

print('Data source import complete.')

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import os
# Print the full path of every file found anywhere under /kaggle/input.
for folder, _subdirs, names in os.walk('/kaggle/input'):
    for found_path in (os.path.join(folder, name) for name in names):
        print(found_path)

import torch
import torch.nn as nn
import torch.nn.functional as F
import tensorflow as tf
from tensorflow.keras import layers

import os
import shutil

# Place the Kaggle API token under /root/.kaggle for the `kaggle` command used
# below (assumes the notebook runs as root, e.g. on Colab — TODO confirm).
os.makedirs("/root/.kaggle", exist_ok=True)

# Only move the uploaded token when it is still in /content, so that re-running
# this cell after a successful first run does not fail on the missing source.
if os.path.exists("/content/kaggle.json"):
    shutil.move("/content/kaggle.json", "/root/.kaggle/kaggle.json")

# The mode must be an octal literal: decimal 600 is 0o1130, which strips the
# owner's read/write bits instead of granting owner-only read/write access.
os.chmod("/root/.kaggle/kaggle.json", 0o600)
!kaggle competitions download -c deep-learning-spring-2025-project-1
!unzip deep-learning-spring-2025-project-1.zip -d /content/

ModuleNotFoundError: No module named 'kagglehub'

In [3]:
import pickle
def unpickle(file_path):
    """Load and return the object stored in the pickle file at ``file_path``.

    The file is opened in binary mode and unpickled with ``encoding='bytes'``.

    NOTE(review): ``pickle.load`` can execute arbitrary code while loading;
    only call this on files from a trusted source.
    """
    with open(file_path, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

In [None]:
# Load the unlabeled test pickle from the current working directory.
# Note: this rebinds `data`, which the first cell used for the kagglehub
# download result.
file_path = "cifar_test_nolabel.pkl"  # Path to the test data
data = unpickle(file_path)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
import pickle
from torchsummary import summary

Using device: cuda
Files already downloaded and verified
Files already downloaded and verified
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 32, 32]           1,728
       BatchNorm2d-2           [-1, 64, 32, 32]             128
              ReLU-3           [-1, 64, 32, 32]               0
            Conv2d-4          [-1, 256, 32, 32]         147,456
       BatchNorm2d-5          [-1, 256, 32, 32]             512
              ReLU-6          [-1, 256, 32, 32]               0
            Conv2d-7          [-1, 256, 32, 32]         589,824
       BatchNorm2d-8          [-1, 256, 32, 32]             512
            Conv2d-9          [-1, 256, 32, 32]          16,384
      BatchNorm2d-10          [-1, 256, 32, 32]             512
             ReLU-11          [-1, 256, 32, 32]               0
    ResidualBlock-12          [-1, 256, 32, 32]               0
        

In [None]:
# Choose the compute device in priority order: MPS, then CUDA, then CPU.
# Each branch records the device together with the message to report.
if torch.backends.mps.is_available():
    device, device_banner = torch.device("mps"), ("Using Apple GPU (MPS)",)
elif torch.cuda.is_available():
    device, device_banner = (
        torch.device("cuda"),
        ("Using NVIDIA GPU:", torch.cuda.get_device_name(0)),
    )
else:
    device, device_banner = torch.device("cpu"), ("Using CPU",)

print(*device_banner)
print("Selected Device:", device)

In [None]:
# Data Preprocessing & Augmentation
# Channel statistics shared by the training and evaluation pipelines. These are
# the ImageNet mean/std values commonly quoted in the torchvision docs, not
# statistics computed from CIFAR-10; whatever is used here must also be used at
# inference time.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: random flip / padded crop / rotation / colour jitter on
# the input image, then tensor conversion and normalisation.
_train_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
transform = transforms.Compose(_train_steps)

# Evaluation pipeline: no augmentation, only tensor conversion + normalisation.
_eval_steps = [
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
]
transform_test = transforms.Compose(_eval_steps)

In [None]:
# Load CIFAR-10 dataset
# Both splits are downloaded into ./data if needed; only the training loader
# shuffles, and the training split uses the augmenting `transform`.
batch_size = 128
_cifar_root = './data'

trainset = torchvision.datasets.CIFAR10(
    root=_cifar_root,
    train=True,
    download=True,
    transform=transform,
)
trainloader = DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
)

testset = torchvision.datasets.CIFAR10(
    root=_cifar_root,
    train=False,
    download=True,
    transform=transform_test,
)
testloader = DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4,
)

In [None]:
# Define Basic Residual Block
class ResidualBlock(nn.Module):
    """Residual unit: conv3x3-BN-ReLU, conv3x3-BN, add skip path, ReLU.

    Args:
        in_channels: number of channels of the input feature map.
        out_channels: number of channels produced by both 3x3 convolutions.
        stride: stride of the first 3x3 convolution (and of the projection
            on the skip path, when one is needed).

    The skip path is an empty ``nn.Sequential`` (identity) unless the stride
    is not 1 or the channel count changes; in that case it is a bias-free 1x1
    convolution followed by batch normalisation.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        # Main path. Submodule names and creation order are kept stable so
        # state_dict keys and weight initialisation order do not change.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: project only when the output shape differs from the input.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Apply the main path, add the skip path in place, and apply ReLU."""
        main = self.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))
        main += self.shortcut(x)
        return self.relu(main)

In [None]:
# Define ResNet-18 (Optimized for ≤ 5M Params)
class ResNet18_Small(nn.Module):
    """ResNet-style classifier with a 3x3 stem and four residual stages.

    Args:
        num_classes: size of the output logit vector.
        stage_channels: output width of layer1..layer4, in order. The default
            (64, 256, 128, 256) reproduces the original hard-coded network,
            which has about 4.96M trainable parameters. Note that these are
            not the usual monotonically increasing ResNet-18 widths.

    Raises:
        ValueError: if ``stage_channels`` does not contain exactly four widths
            (the attribute names layer1..layer4 are fixed).

    Each stage holds two ``ResidualBlock``s; layer1 keeps the resolution and
    layer2..layer4 each start with a stride-2 block.
    """

    def __init__(self, num_classes=10, stage_channels=(64, 256, 128, 256)):
        super(ResNet18_Small, self).__init__()
        if len(stage_channels) != 4:
            raise ValueError(
                f"stage_channels must contain exactly 4 widths, got {len(stage_channels)}"
            )

        # Width of the tensor entering the next stage; updated by _make_layer.
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(stage_channels[0], 2, stride=1)
        self.layer2 = self._make_layer(stage_channels[1], 2, stride=2)
        self.layer3 = self._make_layer(stage_channels[2], 2, stride=2)
        self.layer4 = self._make_layer(stage_channels[3], 2, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # Size the classifier from the width layer4 actually produces instead
        # of a separate literal that has to be kept in sync by hand.
        self.fc = nn.Linear(self.in_channels, num_classes)

    def _make_layer(self, out_channels, blocks, stride):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block uses ``stride`` and changes the channel count;
        the remaining blocks keep ``out_channels`` at stride 1. Updates
        ``self.in_channels`` so the next stage knows its input width.
        """
        layers = [ResidualBlock(self.in_channels, out_channels, stride)]
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(ResidualBlock(out_channels, out_channels, 1))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for image batch ``x``."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # (batch, C, 1, 1) -> (batch, C)
        x = self.fc(x)
        return x

In [None]:
# Initialize Model, Print Parameter Table, then Move to the selected device
# torchsummary's summary() defaults to device="cuda" and builds a CPU dummy
# input whenever CUDA is unavailable, so summarising a model that already
# lives on MPS fails with a device mismatch. Summarise on CPU first, then move.
model = ResNet18_Small()
summary(model, input_size=(3, 32, 32), device="cpu")
model = model.to(device)

# Define Loss & Optimizer
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)  # Label Smoothing
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
# mode='max' because the scheduler is stepped with an accuracy (higher is better).
# NOTE(review): `verbose` is reported as deprecated in recent PyTorch releases —
# confirm against the installed version before relying on it.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5, verbose=True)

# Mixup Data Augmentation
def mixup_data(x, y, alpha=1.0):
    """Blend each sample of a batch with a randomly chosen partner sample.

    Args:
        x: batch of inputs; the first dimension indexes samples.
        y: labels aligned with ``x`` along the first dimension.
        alpha: Beta(alpha, alpha) parameter for the mixing weight; any value
            that is not > 0 disables mixing (weight fixed to 1).

    Returns:
        ``(mixed_x, y_a, y_b, lam)`` where ``mixed_x = lam * x + (1 - lam) *
        x[perm]``, ``y_a`` is ``y`` itself, ``y_b`` is ``y[perm]`` and ``lam``
        is the mixing weight.
    """
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1
    n_samples = x.size(0)
    # The permutation is drawn on the CPU and then moved to x's device.
    perm = torch.randperm(n_samples).to(x.device)
    blended = lam * x + (1 - lam) * x[perm, :]
    return blended, y, y[perm], lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Combine the loss against both label sets of a mixed batch.

    Evaluates ``criterion(pred, y_a)`` and ``criterion(pred, y_b)`` and returns
    their weighted sum with weights ``lam`` and ``1 - lam`` respectively.
    """
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

In [None]:
# Training Function
def train_model(model, trainloader, testloader, criterion, optimizer, scheduler, num_epochs=100,
                early_stopping_patience=10):
    """Train ``model`` with mixup, LR scheduling on accuracy, and early stopping.

    Args:
        model: network to optimise; expected to already be on ``device``.
        trainloader: iterable of ``(inputs, labels)`` training batches.
        testloader: iterable passed to ``evaluate_model`` after every epoch.
        criterion: loss callable ``criterion(outputs, targets)``.
        optimizer: optimizer over ``model``'s parameters.
        scheduler: scheduler whose ``step`` accepts the evaluation accuracy.
        num_epochs: maximum number of epochs.
        early_stopping_patience: stop after this many consecutive epochs
            without a new best evaluation accuracy (default 10, the value
            that used to be hard-coded).

    Returns:
        The best evaluation accuracy (percent) seen during training. On exit
        the model holds the weights from that best epoch, if any epoch
        improved on the initial 0.

    NOTE(review): the same loader drives LR scheduling, early stopping and
    checkpoint selection, so its accuracy is an optimistic estimate.
    """
    import copy  # local import so this cell does not depend on another cell's imports

    best_acc = 0
    best_state = None
    patience_counter = 0

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)
            inputs, targets_a, targets_b, lam = mixup_data(inputs, labels)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = mixup_criterion(criterion, outputs, targets_a, targets_b, lam)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            # Inputs are blends of two samples, so score the prediction against
            # both label sets with the same weights used for the loss; comparing
            # with the un-mixed labels alone under-reports training accuracy.
            correct += (lam * predicted.eq(targets_a).sum().item()
                        + (1 - lam) * predicted.eq(targets_b).sum().item())
            total += labels.size(0)

        train_acc = correct / total * 100
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(trainloader):.4f}, Train Accuracy: {train_acc:.2f}%")

        test_acc = evaluate_model(model, testloader)
        scheduler.step(test_acc)

        if test_acc > best_acc:
            best_acc = test_acc
            # Snapshot the weights: without this, early stopping leaves the
            # model `early_stopping_patience` epochs past its best state.
            best_state = copy.deepcopy(model.state_dict())
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break

    if best_state is not None:
        model.load_state_dict(best_state)
    return best_acc

In [None]:
# Evaluate Model
def evaluate_model(model, testloader):
    """Compute top-1 accuracy of ``model`` over ``testloader`` and print it.

    Args:
        model: module returning per-class scores of shape (batch, classes).
        testloader: iterable of ``(inputs, labels)`` batches.

    Returns:
        Accuracy as a percentage in [0, 100].

    Raises:
        ValueError: if ``testloader`` yields no samples.

    Side effects: switches ``model`` to eval mode and leaves it there.
    """
    # Run on the device the model's weights live on instead of relying on a
    # notebook-level `device` variable that may be undefined or out of sync.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in testloader:
            if target_device is not None:
                inputs, labels = inputs.to(target_device), labels.to(target_device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

    if total == 0:
        # Fail with a clear message rather than an opaque ZeroDivisionError.
        raise ValueError("testloader yielded no samples; cannot compute accuracy")

    acc = correct / total * 100
    print(f"\nTest Accuracy: {acc:.2f}%")
    return acc

In [None]:
# Train the model
# Uses the model, loaders, loss, optimizer and scheduler defined in the cells
# above, for at most 100 epochs. train_model passes `testloader` to
# evaluate_model after every epoch.
train_model(model, trainloader, testloader, criterion, optimizer, scheduler, num_epochs=100)

In [None]:
class CIFAR10TestDataset(Dataset):
    """Unlabeled image dataset backed by the ``b'data'`` entry of a dict.

    Each item is passed through ToPILImage -> Resize(32, 32) -> ToTensor ->
    Normalize and returned on its own (there are no labels).

    NOTE(review): items are handed to ``ToPILImage`` exactly as stored, with
    no reshape or transpose; this assumes each entry is already an image
    array in a layout ToPILImage accepts — confirm against the pickle.
    """

    def __init__(self, data_dict):
        self.data = data_dict[b'data']
        pipeline = [
            transforms.ToPILImage(),
            transforms.Resize((32, 32)),
            transforms.ToTensor(),
            # Same mean/std as `transform_test`, so inference inputs are
            # scaled the same way as the data the model was evaluated on.
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
        self.transform = transforms.Compose(pipeline)

    def __len__(self):
        """Number of stored images."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the transformed image at position ``idx``."""
        return self.transform(self.data[idx])

In [None]:
# Function to unpickle the test data
def unpickle(file_path):
    """Load and return the object stored in the pickle file at ``file_path``.

    The file is opened in binary mode and unpickled with ``encoding='bytes'``.

    NOTE(review): ``pickle.load`` can execute arbitrary code while loading;
    only call this on files from a trusted source.
    """
    with open(file_path, 'rb') as handle:
        return pickle.load(handle, encoding='bytes')

# Read the unlabeled test pickle.
file_path = "cifar_test_nolabel.pkl"  # Path to the test data
data = unpickle(file_path)

# Wrap it in a dataset/loader. Note: this rebinds `testset` and `testloader`,
# replacing the labeled CIFAR-10 test objects created earlier in the notebook.
testset = CIFAR10TestDataset(data)
testloader = DataLoader(testset, batch_size=32, shuffle=False)

# One sequential ID per test image, in loader order (shuffle is off).
image_ids = list(range(len(testset)))
predictions = []

# Make sure the model is on the selected device and in evaluation mode.
model.to(device)
model.eval()

# Predict the arg-max class for every batch without tracking gradients.
with torch.no_grad():
    for image_batch in testloader:
        scores = model(image_batch.to(device))
        predicted = scores.max(1).indices
        predictions.extend(predicted.cpu().numpy())

# The submission is expected to contain exactly 10,000 rows.
assert len(predictions) == 10000, f"Expected 10,000 predictions, but got {len(predictions)}"

# Assemble the ID/label table and write it out as CSV.
submission_df = pd.DataFrame({"ID": image_ids, "Labels": predictions})
submission_df.to_csv("submission.csv", index=False)
print("Submission file saved as submission.csv with 10,000 rows.")


Submission file saved as submission.csv with 10,000 rows.
