In [11]:
import numpy as np
import pandas as pd
import os
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, random_split, TensorDataset
from torch.optim.lr_scheduler import StepLR, MultiStepLR
from torch.optim.lr_scheduler import CosineAnnealingLR

from PIL import Image
import torch.optim.lr_scheduler as lr_scheduler
import matplotlib.pyplot as plt

# auto. choose CPU or GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cpu


In [3]:
# Function to load CIFAR-10 dataset
def load_cifar_batch(file):
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

# Specify the directory containing CIFAR-10 batches
cifar10_dir = r'C:\Users\dhair\Downloads\deep-learning-spring-2025-project-1\cifar-10-python\cifar-10-batches-py'

# Load metadata (labels)
meta_data_dict = load_cifar_batch(os.path.join(cifar10_dir, 'batches.meta'))
label_names = [label.decode('utf-8') for label in meta_data_dict[b'label_names']]

# Load training data
train_data = []
train_labels = []
for i in range(1, 6):
    batch = load_cifar_batch(os.path.join(cifar10_dir, f'data_batch_{i}'))
    train_data.append(batch[b'data'])
    train_labels += batch[b'labels']

train_data = np.vstack(train_data).reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)  # Convert to HWC format
train_labels = np.array(train_labels)

In [4]:
train_data.shape

(50000, 32, 32, 3)

In [5]:
train_labels.shape

(50000,)

In [6]:
# Data augmentation and normalization
transform = transforms.Compose([
    transforms.ToPILImage(),  # Convert numpy array to PIL Image
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness = 0.1,contrast = 0.1,saturation = 0.1),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomAdjustSharpness(sharpness_factor = 2,p = 0.2),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261)),
    transforms.RandomErasing(p=0.2,scale=(0.02, 0.1),value=1.0, inplace=False)
])

In [7]:
# Convert to TensorDataset and apply transformations
class CustomCIFAR10Dataset(torch.utils.data.Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = self.images[idx]
        label = self.labels[idx]

        if self.transform:
            img = self.transform(img)

        return img, label

train_dataset = CustomCIFAR10Dataset(train_data, train_labels, transform=transform)

In [8]:
# Split into training and validation sets
#train_size = int(0.9 * len(train_dataset))
#val_size = len(train_dataset) - train_size
#train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

test_transform = transforms.Compose([
    transforms.ToPILImage(),  # Convert numpy array to PIL Image
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261))
])

batch_test_dict = load_cifar_batch(os.path.join(cifar10_dir, 'test_batch'))
val_images = batch_test_dict[b'data'].reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)
val_labels = np.array(batch_test_dict[b'labels'])

val_dataset = CustomCIFAR10Dataset(val_images, val_labels, transform=test_transform)

In [9]:
# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False, num_workers=4)

# Load test dataset
cifar_test_path = r'C:\Users\dhair\Downloads\deep-learning-spring-2025-project-1\cifar-10-python\cifar-10-batches-py\cifar_test_nolabel.pkl'
test_batch = load_cifar_batch(cifar_test_path)
test_images = test_batch[b'data'].astype(np.float32) / 255.0

# Convert test dataset to Tensor
test_dataset = [(test_transform(img),) for img in test_images]
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=4)

In [12]:
# Define Basic Residual Block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, shortcut_kernel=1, stride=1, use_se=True):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, shortcut_kernel, stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

        # Squeeze and Excitation (SE) block
        self.use_se = use_se
        if self.use_se:
            self.se = nn.Sequential(
                nn.AdaptiveAvgPool2d(1),
                nn.Conv2d(out_channels, out_channels // 16, 1),
                nn.ReLU(),
                nn.Conv2d(out_channels // 16, out_channels, 1),
                nn.Sigmoid()
            )

        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        identity = self.shortcut(x)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.use_se:
            se_weight = self.se(out)
            out = out * se_weight

        out += identity
        return self.relu(out)


# Define ResNet Model
class CustomResNet(nn.Module):
    def __init__(self, num_classes=10):
        super(CustomResNet, self).__init__()
        self.in_channels = 64

        # Initial convolution
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

        # Residual layers configuration
        self.layers = self._make_layer([64, 128, 256], [[4, 4], [3, 4], [3]], kernel_sizes=[3, 3, 3])

        # Average pooling
        self.avg_pool = nn.AdaptiveAvgPool2d(8)

        # Fully connected layer
        self.fc = nn.Linear(256 * 8 * 8, num_classes)

    def _make_layer(self, channels, num_blocks, kernel_sizes):
        layers = []
        for i, (out_channels, blocks) in enumerate(zip(channels, num_blocks)):
            for _ in range(blocks[0]):  # Residual block counts
                layers.append(ResidualBlock(self.in_channels, out_channels, kernel_size=kernel_sizes[i]))
                self.in_channels = out_channels  # Ensure consistent input channel size
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.layers(out)
        out = self.avg_pool(out)
        out = torch.flatten(out, 1)
        out = self.fc(out)
        return out


# Instantiate Model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomResNet(num_classes=10).to(device)

# Optimizer and Scheduler
optimizer = optim.SGD(model.parameters(), lr=0.1, weight_decay=0.0005)
scheduler = CosineAnnealingLR(optimizer, T_max=50)  # CosineAnnealingLR

# Loss function
criterion = nn.CrossEntropyLoss()

# Print model summary
print(model)

CustomResNet(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (layers): Sequential(
    (0): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
      (se): Sequential(
        (0): AdaptiveAvgPool2d(output_size=1)
        (1): Conv2d(64, 4, kernel_size=(1, 1), stride=(1, 1))
        (2): ReLU()
        (3): Conv2d(4, 64, kernel_size=(1, 1), stride=(1, 1))
        (4): Sigmoid()
      )
      (relu): ReLU(inplace=True)
    )
    (1): ResidualBlock(
      (conv1): Conv2d

In [13]:
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"🔹 Total Parameters: {total_params:,}")
print(f"🔹 Trainable Parameters: {trainable_params:,}")

🔹 Total Parameters: 4,597,282
🔹 Trainable Parameters: 4,597,282


In [None]:
# Set up the training loop
epochs = 50
for epoch in range(epochs):
    model.train()  # Set model to training mode
    running_loss = 0.0
    running_acc = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Ensure the input tensor is in the correct shape
        inputs = inputs.permute(0, 3, 1, 2)  # Reorder to (batch_size, 3, 32, 32)

        optimizer.zero_grad()  # Zero the gradients
        outputs = model(inputs)  # Forward pass
        loss = criterion(outputs, labels)  # Compute loss
        loss.backward()  # Backpropagation
        optimizer.step()  # Update model weights

        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)
        correct = (predicted == labels).sum().item()
        acc = correct / labels.size(0)

        running_loss += loss.item()
        running_acc += acc

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = running_acc / len(train_loader)

    # Validation step
    model.eval()  # Set model to evaluation mode
    val_loss = 0.0
    val_acc = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Ensure the input tensor is in the correct shape
            inputs = inputs.permute(0, 3, 1, 2)  # Reorder to (batch_size, 3, 32, 32)

            outputs = model(inputs)  # Forward pass
            loss = criterion(outputs, labels)  # Compute loss

            # Calculate accuracy
            _, predicted = torch.max(outputs, 1)
            correct = (predicted == labels).sum().item()
            acc = correct / labels.size(0)

            val_loss += loss.item()
            val_acc += acc

    val_loss /= len(val_loader)
    val_acc /= len(val_loader)

    # Print the metrics for this epoch
    print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}, "
          f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_acc:.4f}")

    # Adjust the learning rate using the scheduler
    scheduler.step()


In [None]:
torch.save(model.state_dict(), f"model_epoch_{epoch+1}.pth")

In [None]:
# Generate submission file
model.eval()
predictions = []
with torch.no_grad():
    for images in test_loader:
        images = images.to(device)
        images = images.permute(0, 3, 1, 2)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        predictions.extend(predicted.cpu().numpy())

print(predictions)

In [None]:
submission = pd.DataFrame({
    'ID': np.arange(len(predictions)),
    'Labels': predictions
})

In [None]:
submission.to_csv('Write the location.csv', index=False)