In [None]:
import os
import timm

import torch
import cv2 as cv
from torch import nn, Tensor, GradScaler, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from torch.backends import cudnn
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau, CosineAnnealingLR, CosineAnnealingWarmRestarts

import torchvision.transforms as transforms
from torchvision.transforms import v2
from torchvision.datasets import CIFAR100
from torchvision import models

import matplotlib.pyplot as plt
import pandas as pd
from tqdm import tqdm
import numpy as np
from PIL import Image
from typing import Optional, Callable

In [None]:
device = torch.device('cuda')
cudnn.benchmark = True
pin_memory = True
enable_half = True  # Disable for CPU, it is slower!
scaler = GradScaler(device, enabled=enable_half)

In [None]:
class SimpleCachedDataset(Dataset):
    """Wrap a dataset of ``(image, label)`` pairs, optionally holding it in memory.

    Args:
        dataset: Source of ``(image, label)`` pairs.
        runtime_transforms: Callable applied to the image on every lookup;
            ``None`` returns the stored image unchanged.
        cache: When true, the source is iterated once up front and kept as a tuple.
    """

    def __init__(self, dataset: Dataset, runtime_transforms: Optional[v2.Transform], cache: bool):
        self.runtime_transforms = runtime_transforms
        # Materialise every sample now so later lookups are plain tuple indexing.
        self.dataset = tuple(sample for sample in dataset) if cache else dataset

    def __len__(self):
        """Number of samples in the wrapped (or cached) dataset."""
        return len(self.dataset)

    def __getitem__(self, i):
        """Return sample ``i`` with the runtime transform, if any, applied to its image."""
        image, label = self.dataset[i]
        transform = self.runtime_transforms
        if transform is not None:
            image = transform(image)
        return image, label

In [None]:
class CIFAR100_noisy_fine(Dataset):
    """
    CIFAR-100 whose training labels are replaced by the noisy labels stored in
    ``<root>/CIFAR-100-noisy.npz``; the test split keeps the labels from ``CIFAR100``.

    See https://github.com/UCSC-REAL/cifar-10-100n, https://www.noisylabels.com/ and `Learning with Noisy Labels
    Revisited: A Study Using Real-World Human Annotations`.

    Args:
        root: Directory holding the CIFAR-100 data and, for training, the noisy-label file.
        train: Selects the training split (and the noisy labels) when true.
        transform: Forwarded to ``CIFAR100``; applied once while the samples are read in ``__init__``.
        download: Forwarded to ``CIFAR100``.

    Raises:
        FileNotFoundError: ``train`` is true and the noisy-label file is missing.
        RuntimeError: The ``clean_label`` array in the file differs from the CIFAR-100 labels.
    """

    def __init__(
        self, root: str, train: bool, transform: Optional[Callable], download: bool
    ):
        cifar100 = CIFAR100(
            root=root, train=train, transform=transform, download=download
        )
        # Iterating the dataset yields (image, label) pairs; split them into two tuples.
        data, targets = tuple(zip(*cifar100))
        self.train = train

        if train:
            noisy_label_file = os.path.join(root, "CIFAR-100-noisy.npz")
            if not os.path.isfile(noisy_label_file):
                raise FileNotFoundError(
                    f"{type(self).__name__} need {noisy_label_file} to be used!"
                )

            # The context manager closes the archive's file handle once both arrays are read.
            with np.load(noisy_label_file) as noise_file:
                # Guard against a label file that belongs to a different sample ordering.
                if not np.array_equal(noise_file["clean_label"], targets):
                    raise RuntimeError("Clean labels do not match!")
                targets = noise_file["noisy_label"]

        self.data = data
        self.targets = targets

    def __len__(self):
        """Number of samples, taken from the label container."""
        return len(self.targets)

    def __getitem__(self, i: int):
        """Return the ``(image, label)`` pair stored at index ``i``."""
        img, target = self.data[i], self.targets[i]
        return img, target

In [None]:
# Per-channel mean / standard deviation passed to Normalize.
mean = (0.507, 0.4865, 0.4409)
sd = (0.2673, 0.2564, 0.2761)

# Deterministic preprocessing: tensor conversion followed by normalisation.
common_transforms = [transforms.ToTensor(), transforms.Normalize(mean, sd, inplace=True)]

# Random augmentations drawn anew every time a training image is fetched.
runtime_transforms = transforms.Compose(
    [
        transforms.RandomCrop(size=32, padding=4),
        transforms.RandomHorizontalFlip(p=0.5),
        # Stronger augmentations
        transforms.RandAugment(num_ops=2, magnitude=10),
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean, sd, inplace=True),
        # Random Erasing to simulate occlusions
        transforms.RandomErasing(p=0.25, scale=(0.02, 0.1), ratio=(0.3, 3.3)),
    ]
)

test_transform = transforms.Compose(list(common_transforms))

# Build the train split first (untransformed, augmentation happens at fetch time),
# then the test split (preprocessed once while it is loaded).
base_train_set, base_test_set = (
    CIFAR100_noisy_fine(
        '/kaggle/input/fii-atnn-2024-project-noisy-cifar-100/fii-atnn-2024-project-noisy-cifar-100',
        download=False,
        train=is_train,
        transform=split_transform,
    )
    for is_train, split_transform in ((True, None), (False, test_transform))
)

images, labels = base_train_set.data, base_train_set.targets

In [None]:
# Cache both splits in memory; only the training split is augmented at fetch time.
train_set = SimpleCachedDataset(
    dataset=base_train_set, runtime_transforms=runtime_transforms, cache=True
)
test_set = SimpleCachedDataset(dataset=base_test_set, runtime_transforms=None, cache=True)

# Training batches are reshuffled every epoch; evaluation keeps the dataset order.
train_loader = DataLoader(
    dataset=train_set, batch_size=128, shuffle=True, pin_memory=pin_memory
)
test_loader = DataLoader(dataset=test_set, batch_size=500, pin_memory=pin_memory)

In [None]:
def getPretrainedResnet18(pretrained=True, out_classes=10, change_conv1=True):
    """Build a torchvision ResNet-18 with a fresh classification head.

    Args:
        pretrained: Load the ImageNet (``IMAGENET1K_V1``) weights when true,
            otherwise start from random initialisation.
        out_classes: Number of outputs of the replacement ``fc`` layer.
        change_conv1: When true, swap the stem convolution for a 3x3, stride-1
            convolution and replace the max-pool with an identity (presumably to
            keep spatial resolution on small inputs). The new convolution is
            randomly initialised even when ``pretrained`` is true.

    Returns:
        The modified ``torchvision.models.ResNet`` instance.
    """
    # `pretrained=` is deprecated in torchvision; the weights enum is the supported
    # spelling and IMAGENET1K_V1 is what `pretrained=True` used to resolve to.
    weights = models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
    model = models.resnet18(weights=weights)
    if change_conv1:
        # A 5x5 / padding-2 stem was the other variant noted here previously.
        model.conv1 = nn.Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        model.maxpool = nn.Identity()
    num_features = model.fc.in_features
    model.fc = nn.Linear(num_features, out_classes)
    return model

In [None]:
class VGG16(nn.Module):
    """VGG-16-style network with batch normalisation, sized for 32x32 inputs.

    Five convolutional stages (2, 2, 3, 3, 3 conv layers), each ending in a
    2x2 max-pool, feed a three-layer fully connected classifier. Everything
    lives in one ``nn.Sequential`` stored as ``self.layers``.

    Args:
        num_classes: Width of the final linear layer.
    """

    def __init__(self, num_classes=100):
        super().__init__()

        # (output channels, number of conv layers) for each stage, in order.
        stage_plan = ((64, 2), (128, 2), (256, 3), (512, 3), (512, 3))

        stack = []
        channels_in = 3
        for width, depth in stage_plan:
            for _ in range(depth):
                stack.append(nn.Conv2d(channels_in, width, kernel_size=3, stride=1, padding=1))
                stack.append(nn.BatchNorm2d(width))
                stack.append(nn.ReLU())
                channels_in = width
            stack.append(nn.MaxPool2d(kernel_size=2, stride=2))

        # Classifier head; five 2x2 pools reduce a 32x32 input to 1x1, so the
        # flattened feature vector has 512 entries.
        stack.extend(
            [
                nn.Flatten(),
                nn.Linear(512, 4096),  # Adjust for CIFAR-100 input size
                nn.ReLU(inplace=True),
                nn.Dropout(0.5),
                nn.Linear(4096, 4096),
                nn.ReLU(inplace=True),
                nn.Dropout(0.5),
                nn.Linear(4096, num_classes),
            ]
        )

        self.layers = nn.Sequential(*stack)

    def forward(self, x: Tensor) -> Tensor:
        """Run ``x`` through the whole layer stack and return the class scores."""
        return self.layers(x)


# Pretrained ResNet-18 adapted to 100 classes, moved to the device and compiled with TorchScript.
model = torch.jit.script(
    getPretrainedResnet18(pretrained=True, out_classes=100, change_conv1=True).to(device)
)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

# Nesterov SGD with the fused kernel implementation.
# Alternatives previously left commented out here:
#   optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
#   optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005, nesterov=True)
optimizer = optim.SGD(
    model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4, nesterov=True, fused=True
)

# Cosine decay over T_max scheduler steps.
# Alternatives previously left commented out here:
#   StepLR(optimizer, step_size=10, gamma=0.1)
#   ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
scheduler = CosineAnnealingLR(optimizer, T_max=500)

In [None]:
# Batch-level mixing augmentations for 100 classes; one of the two is picked at random per call.
cutmix, mixup = v2.CutMix(num_classes=100), v2.MixUp(num_classes=100)
cutmix_or_mixup = v2.RandomChoice(transforms=[cutmix, mixup])

def train():
    model.train()
    correct = 0
    total = 0
    train_loss = 0.0
    
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device, non_blocking=True), targets.to(device, non_blocking=True)

        # Apply CutMix to the batch
        inputs, targets = cutmix_or_mixup(inputs, targets)
        
        with torch.autocast(device.type, enabled=enable_half):
            outputs = model(inputs)
            loss = criterion(outputs, targets)
        
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad()

        train_loss += loss.item()
        predicted = outputs.argmax(1)
        total += targets.size(0)
        correct += predicted.eq(targets.argmax(dim=1)).sum().item()

    return 100.0 * correct / total

In [None]:
@torch.inference_mode()
def val():
    model.eval()
    correct = 0
    total = 0
    val_loss = 0.0

    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device, non_blocking=True), targets.to(device, non_blocking=True)
        with torch.autocast(device.type, enabled=enable_half):
            outputs = model(inputs)
            loss = criterion(outputs, targets)

        val_loss += loss.item()
        predicted = outputs.argmax(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    avg_val_loss = val_loss / len(test_loader)
    val_acc = 100.0 * correct / total

    return avg_val_loss, val_acc

In [None]:
@torch.inference_mode()
def inference(model):
    model.eval()
    
    labels = []
    
    for inputs, _ in test_loader:
        inputs = inputs.to(device, non_blocking=True)
        with torch.autocast(device.type, enabled=enable_half):
            outputs = model(inputs)

        predicted = outputs.argmax(1).tolist()
        labels.extend(predicted)
    
    return labels

In [None]:
# Train for a fixed number of epochs, keeping the weights of the epoch with the
# highest validation accuracy on disk.
best = 0.0
best_epoch = 0
epochs = list(range(500))
best_checkpoint_path = "/kaggle/working/checkpoint.pth"

with tqdm(epochs) as tbar:
    for epoch in tbar:
        train_acc = train()
        val_loss, val_acc = val()

        # CosineAnnealingLR follows a fixed schedule and takes no metric. An
        # argument to step() is interpreted as the (deprecated) epoch index, so
        # passing the validation loss would pin the learning rate near its
        # starting value instead of annealing it.
        scheduler.step()

        if val_acc > best:
            best = val_acc
            best_epoch = epoch

            # Only the model weights are stored; optimizer and scheduler state are not.
            best_checkpoint = {
                'model_state_dict': model.state_dict(),
            }
            torch.save(best_checkpoint, best_checkpoint_path)

        tbar.set_description(f"Train: {train_acc:.2f}, Val Acc: {val_acc:.2f}, Val Loss: {val_loss:.4f}, Best: {best:.2f} at epoch {best_epoch}")

In [None]:
# Rebuild the architecture and restore the best weights found during training.
# pretrained=False: every parameter and buffer is overwritten by load_state_dict
# (strict by default), so fetching the ImageNet weights first would be wasted work.
best_model = getPretrainedResnet18(pretrained=False, out_classes=100, change_conv1=True).to(device)
# map_location places the tensors on the active device; weights_only restricts
# unpickling to tensor data, which is all the checkpoint contains.
checkpoint = torch.load(best_checkpoint_path, map_location=device, weights_only=True)
best_model.load_state_dict(checkpoint["model_state_dict"])

# One row per test sample: its position in the test loader and the predicted class.
predictions = inference(best_model)
data = {
    "ID": list(range(len(predictions))),
    "target": predictions,
}

df = pd.DataFrame(data)
df.to_csv("/kaggle/working/submission.csv", index=False)