In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from torchvision.transforms import AutoAugment, AutoAugmentPolicy
import numpy as np
import matplotlib.pyplot as plt
import os

# Load CIFAR 10 dataset

train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.AutoAugment(AutoAugmentPolicy.CIFAR10),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.05),
        transforms.RandomAdjustSharpness(sharpness_factor=1.5, p=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.4914, 0.4822, 0.4465], [0.247, 0.243, 0.261]),
        transforms.RandomErasing(p=0.5, scale=(0.02, 0.2)),
    ])


test_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.4914, 0.4822, 0.4465], [0.247, 0.243, 0.261]),
    ])

In [2]:
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=train_transform)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=test_transform)

## Select Random 5000 samples from the test dataset to augment the training dataset
np.random.seed(0)
random_indices = np.random.choice(len(test_dataset), 5000, replace=False)
random_test_dataset = torch.utils.data.Subset(test_dataset, random_indices)

train_dataset = torch.utils.data.ConcatDataset([train_dataset, random_test_dataset])

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100, shuffle=False, num_workers=2)

In [None]:
class ResidualBlock(nn.Module):

    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.stride = stride
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.skip_conn = nn.Sequential()
        if in_channels != out_channels or stride != 1:
            self.skip_conn = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.skip_conn(x)
        out = F.relu(out)
        return out
    
class WideResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10, dropout=0.2):
        super(WideResNet, self).__init__()

        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        self.layer1 = self._create_layers(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._create_layers(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._create_layers(block, 256, num_blocks[2], stride=2)

        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(256, num_classes)
    
    def _create_layers(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))

        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        
        out = self.avg_pool(out)
        out = torch.flatten(out, 1)
        out = self.dropout(out)
        out = self.linear(out)
        return out


model = WideResNet(ResidualBlock, [4, 4, 3], num_classes=10, dropout=0.2)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Number of parameters
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'Number of parameters: {num_params}')

Number of parameters: 4697162


In [4]:
import torch.optim as optim

# 3. Data Augmentation and Training Utilities
def mixup_data(x, y, alpha=0.2):
    '''Apply mixup augmentation'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    index = torch.randperm(batch_size).to(x.device)

    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    '''Calculate mixup loss'''
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

class LabelSmoothingLoss(nn.Module):
    def __init__(self, classes, smoothing=0.1):
        super(LabelSmoothingLoss, self).__init__()
        self.confidence = 1.0 - smoothing
        self.smoothing = smoothing
        self.classes = classes
        
    def forward(self, pred, target):
        pred = pred.log_softmax(dim=-1)
        with torch.no_grad():
            true_dist = torch.zeros_like(pred)
            true_dist.fill_(self.smoothing / (self.classes - 1))
            true_dist.scatter_(1, target.unsqueeze(1), self.confidence)
        return torch.mean(torch.sum(-true_dist * pred, dim=-1))

In [None]:
from tqdm.notebook import tqdm
import copy

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

num_epochs=500
lr=0.1
weight_decay=5e-4
mixup_alpha=0.2
label_smoothing=0.07

# Set up optimizer and scheduler
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

# Set up criterion with label smoothing
if label_smoothing > 0:
    criterion = LabelSmoothingLoss(classes=10, smoothing=label_smoothing)
else:
    criterion = nn.CrossEntropyLoss()

# Training loop
best_val_acc = 0.0
best_model_weights = None
training_history = {'train_loss': [], 'val_acc': []}

print("Starting training...")

for epoch in tqdm(range(num_epochs)):
    model.train()
    running_loss = 0.0
    
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        
        # Apply mixup if alpha > 0
        if mixup_alpha > 0:
            data, targets_a, targets_b, lam = mixup_data(data, target, mixup_alpha)
        
        optimizer.zero_grad()
        outputs = model(data)
        
        if mixup_alpha > 0:
            loss = mixup_criterion(criterion, outputs, targets_a, targets_b, lam)
        else:
            loss = criterion(outputs, target)
            
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
    
    # Validation phase
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    
    val_acc = 100.0 * correct / total
    training_history['val_acc'].append(val_acc)
    
    print(f'Epoch: {epoch+1}/{num_epochs}, Validation Accuracy: {val_acc:.2f}%')
    
    # Update scheduler
    scheduler.step()
    
    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_model_weights = copy.deepcopy(model.state_dict())
        torch.save(best_model_weights, 'data_aug_best_model.pth')
        print(f'New best model saved with accuracy: {best_val_acc:.2f}%')



In [12]:
## Load best model

model = WideResNet(ResidualBlock, [4, 4, 3], num_classes=10, dropout=0.2)
model.load_state_dict(torch.load('data_aug_best_model.pth'))
model = model.to(device)
model.eval()



WideResNet(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, aff

In [13]:
correct = 0
total = 0

with torch.no_grad():
    for data, target in val_loader:
        data, target = data.to(device), target.to(device)
        outputs = model(data)
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

val_acc = 100.0 * correct / total

print(f'Validation Accuracy: {val_acc:.2f}%')

Validation Accuracy: 98.69%


In [None]:
import pickle

with open("data/cifar_test_nolabel.pkl", "rb") as f:
    data = pickle.load(f)

print(data.keys())

from PIL import Image
test_data = data[b"data"]


In [14]:

tta_transforms = [
        transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.4914, 0.4822, 0.4465], [0.247, 0.243, 0.261]),
        ]),
        transforms.Compose([
            transforms.RandomHorizontalFlip(p=1.0),
            transforms.ToTensor(),
            transforms.Normalize([0.4914, 0.4822, 0.4465], [0.247, 0.243, 0.261]),
        ]),
    ]
    

out = {}
out["ID"] = []
out["Labels"] = []
for idx, sample in tqdm(enumerate(test_data)):

    with torch.no_grad():
        img = Image.fromarray(sample)
        tta_outputs = []
        for transform in tta_transforms:
            input_tensor = transform(img).unsqueeze(0).to(device)
            output = model(input_tensor)
            tta_outputs.append(output)
        
        avg_output = torch.mean(torch.stack(tta_outputs), dim=0)
        _, predicted = torch.max(avg_output.data, 1) 
        out["ID"].append(idx)
        out["Labels"].append(predicted.item())

import pandas as pd
df = pd.DataFrame(out)
df.to_csv("data_augmentation.csv", index=False)
print("File saved successfully")

0it [00:00, ?it/s]

File saved successfully
