In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
import pickle
import numpy as np
from torch.utils.data import random_split
import pandas as pd
from PIL import Image
import os
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import StepLR, MultiStepLR

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define data transformations
transform = transforms.Compose([
    transforms.ToPILImage(),  # Convert numpy array to PIL Image
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness = 0.1,contrast = 0.1,saturation = 0.1),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomAdjustSharpness(sharpness_factor = 2,p = 0.2),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261)),
    transforms.RandomErasing(p=0.2,scale=(0.02, 0.1),value=1.0, inplace=False)
])

In [None]:
with open("cifar_test_nolabel.pkl", "rb") as f:
    test_data = pickle.load(f)  # This is a dictionary

# Inspect the keys
print(test_data.keys())

dict_keys([b'data', b'ids'])


In [None]:
batch_size = 128

full_dataset = torchvision.datasets.CIFAR10(root="./data", train=True, transform=transform, download=True)

# Train-Validation Split
train_size = int(0.8 * len(full_dataset))  # 80% for training
val_size = len(full_dataset) - train_size  # 20% for validation

train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])

# Create DataLoaders
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size*2, shuffle=False, num_workers=4)  # Larger batch size for validation

train_data = np.vstack(train_data).reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)  # Convert to HWC format
train_labels = np.array(train_labels)



Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:03<00:00, 43.9MB/s]


Extracting ./data/cifar-10-python.tar.gz to ./data




In [None]:
full_dataset = torchvision.datasets.CIFAR10(root="./data", train=True, transform=transform, download=True)

print(type(full_dataset))



Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:12<00:00, 13.2MB/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
<class 'torchvision.datasets.cifar.CIFAR10'>


In [None]:
transform = transforms.ToTensor()

full_dataset = torchvision.datasets.CIFAR10(root="./data", train=True, transform=transform, download=True)
# Convert to dictionary
cifar_dict = {
    "images": [],  # Store images
    "labels": []   # Store labels
}

# Iterate over dataset to extract images and labels
for img, label in full_dataset:
    cifar_dict["images"].append(img)  # Append image (as Tensor)
    cifar_dict["labels"].append(label)

Files already downloaded and verified


In [None]:
train_data = []
train_labels = []

for img in cifar_dict["images"]:
    train_data.append(img)

for label in cifar_dict["labels"]:
    train_labels.append(label)

train_data = np.vstack(train_data).reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)  # Convert to HWC format
train_labels = np.array(train_labels)

In [None]:
train_transform = transforms.Compose([
    transforms.ToPILImage(),  # Convert numpy array to PIL Image
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness = 0.1,contrast = 0.1,saturation = 0.1),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomAdjustSharpness(sharpness_factor = 2,p = 0.2),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261)),
    transforms.RandomErasing(p=0.2,scale=(0.02, 0.1),value=1.0, inplace=False)
])

# Convert to TensorDataset and apply transformations
class CustomCIFAR10Dataset(torch.utils.data.Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img = self.images[idx]
        label = self.labels[idx]

        if self.transform:
            img = self.transform(img)

        return img, label

full_train_dataset = CustomCIFAR10Dataset(train_data, train_labels, transform=train_transform)

In [None]:
train_size = int(0.8 * len(full_dataset))  # 80% for training
val_size = len(full_dataset) - train_size  # 20% for validation

train_dataset, val_dataset = random_split(full_train_dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False, num_workers=4)



In [None]:
with open("cifar_test_nolabel.pkl", "rb") as f:
    test_data = pickle.load(f)  # Dictionary with byte keys

# Convert byte keys to normal strings (if needed)
test_images = test_data[b'data']  # Extract images
test_ids = test_data[b'ids']  # Extract corresponding image IDs

# Convert images from NumPy array to PyTorch tensor
test_images = torch.tensor(np.array(test_images), dtype=torch.float32)  # Shape: (N, C, H, W)


transform = transforms.Compose([
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Normalize the images
test_images = transform(test_images / 255.0)

# Create a DataLoader for testing (no labels)
test_dataset = torch.utils.data.TensorDataset(test_images)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=4)



In [None]:
def train_model(model, train_loader, val_loader, epochs=50):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)
    scheduler = MultiStepLR(optimizer, milestones=[30, 60, 80, 90], gamma=0.1)

    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    for epoch in range(epochs):
        # Training Phase
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_loss = running_loss / len(train_loader)
        train_acc = 100 * correct / total
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)

        # Validation Phase
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        val_loss /= len(val_loader)
        val_acc = 100 * correct / total
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)

        scheduler.step()
        print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%')


In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.skip = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.skip = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    def forward(self, x):
        identity = x
        if self.skip:
            identity = self.skip(x)
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += identity
        out = self.relu(out)
        return out

In [None]:
class CustomResNet(nn.Module):
    def __init__(self, num_classes=10):
        super(CustomResNet, self).__init__()
        self.init_conv = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.init_bn = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(64, 64, 4, stride=1)
        self.layer2 = self._make_layer(64, 128, 4, stride=2)
        self.layer3 = self._make_layer(128, 256, 3, stride=2)
        #self.layer4 = self._make_layer(256, 512, 2, stride=2)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(256, num_classes)

    def _make_layer(self, in_channels, out_channels, blocks, stride):
        layers = [ResidualBlock(in_channels, out_channels, stride)]
        for _ in range(1, blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.init_conv(x)
        out = self.init_bn(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        #out = self.layer4(out)
        out = self.avg_pool(out)
        out = torch.flatten(out, 1)
        out = self.fc(out)
        return out


In [None]:
model = CustomResNet().to(device)

# Print the number of parameters
from torchsummary import summary
summary(model, (3, 32, 32))

# Train the model
train_model(model, train_loader, val_loader, epochs=20) #change epoch


----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 32, 32]           1,728
       BatchNorm2d-2           [-1, 64, 32, 32]             128
              ReLU-3           [-1, 64, 32, 32]               0
            Conv2d-4           [-1, 64, 32, 32]          36,864
       BatchNorm2d-5           [-1, 64, 32, 32]             128
              ReLU-6           [-1, 64, 32, 32]               0
            Conv2d-7           [-1, 64, 32, 32]          36,864
       BatchNorm2d-8           [-1, 64, 32, 32]             128
              ReLU-9           [-1, 64, 32, 32]               0
    ResidualBlock-10           [-1, 64, 32, 32]               0
           Conv2d-11           [-1, 64, 32, 32]          36,864
      BatchNorm2d-12           [-1, 64, 32, 32]             128
             ReLU-13           [-1, 64, 32, 32]               0
           Conv2d-14           [-1, 64,

In [None]:
model.eval()
predictions = []
with torch.no_grad():
    for batch in test_loader:
        images = batch[0].to(device)  # Get images tensor from tuple and move to device
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        predictions.extend(predicted.cpu().numpy())

# Generate submission file
submission = pd.DataFrame({'ID': np.arange(len(predictions)), 'Labels': predictions})
submission.to_csv('submission1.csv', index=False)
print("Submission1 file saved.")

Submission1 file saved.


In [None]:
class ResNet18(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNet18, self).__init__()
        self.model = models.resnet18(weights=None)  # Training from scratch
        self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)  # Modify final layer

    def forward(self, x):
        return self.model(x)

In [None]:
model = ResNet18(num_classes=10).to(device)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)  # Reduce LR every 10 epochs


# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct, total = 0, 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

    train_acc = 100.0 * correct / total
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Train Acc: {train_acc:.2f}%")


Epoch [1/50], Loss: 1.7384, Train Acc: 42.06%
Epoch [2/50], Loss: 1.4725, Train Acc: 55.57%
Epoch [3/50], Loss: 1.3533, Train Acc: 61.34%
Epoch [4/50], Loss: 1.2770, Train Acc: 64.94%
Epoch [5/50], Loss: 1.2280, Train Acc: 67.55%
Epoch [6/50], Loss: 1.1833, Train Acc: 69.55%
Epoch [7/50], Loss: 1.1515, Train Acc: 71.09%
Epoch [8/50], Loss: 1.1187, Train Acc: 72.43%
Epoch [9/50], Loss: 1.1016, Train Acc: 73.31%
Epoch [10/50], Loss: 1.0757, Train Acc: 74.36%
Epoch [11/50], Loss: 1.0581, Train Acc: 75.50%
Epoch [12/50], Loss: 1.0418, Train Acc: 76.05%
Epoch [13/50], Loss: 1.0311, Train Acc: 76.47%
Epoch [14/50], Loss: 1.0149, Train Acc: 77.38%
Epoch [15/50], Loss: 1.0085, Train Acc: 77.54%
Epoch [16/50], Loss: 0.9922, Train Acc: 78.28%
Epoch [17/50], Loss: 0.9831, Train Acc: 78.78%
Epoch [18/50], Loss: 0.9757, Train Acc: 79.19%
Epoch [19/50], Loss: 0.9637, Train Acc: 79.77%
Epoch [20/50], Loss: 0.9628, Train Acc: 79.57%
Epoch [21/50], Loss: 0.9522, Train Acc: 80.21%
Epoch [22/50], Loss: 0

In [None]:
model.eval()
predictions = []

with torch.no_grad():
    for idx, (images,) in enumerate(test_loader):  # No labels, only images
        images = images.to(device)
        outputs = model(images)
        _, predicted = outputs.max(1)  # Get class with highest probability

        # Compute correct batch start index
        batch_start = idx * test_loader.batch_size
        for i, pred in enumerate(predicted.cpu().numpy()):
            predictions.append((test_ids[batch_start + i], int(pred)))  # Convert prediction to int

# Convert predictions to DataFrame
df = pd.DataFrame(predictions, columns=["ID", "Labels"])

# Save as CSV file
df.to_csv("submission.csv", index=False)

print("Predictions saved to submission.csv")



Predictions saved to submission.csv
