In [1]:
import torch
from torch import nn, Tensor
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import CIFAR100
from typing import Optional, Callable
import os
import timm
import numpy as np
import pandas as pd
from torchvision.transforms import v2
import torchvision.transforms as T
from torch.backends import cudnn
from torch import GradScaler
from torch import optim
from tqdm import tqdm

In [2]:
# Architecture identifier handed to timm.create_model; it is also embedded in
# the checkpoint and submission file names further down.
name = 'maxvit_base_tf_224'
# .npz archive whose "targets" array drives the construction of the training set.
dataset_path = 'dataset_change_test_v4.npz'

In [3]:
device = torch.device('cuda')
cudnn.benchmark = True
pin_memory = True
enable_half = True  # Disable for CPU, it is slower!
scaler = GradScaler(device, enabled=enable_half)

In [4]:
class SimpleCachedDataset_train(Dataset):
    """In-memory training subset assembled from a list of selection entries.

    Each entry of ``targets`` is indexed twice: element ``[1]`` picks the
    sample out of ``dataset.data`` and element ``[0]`` is stored as that
    sample's label. ``transform`` is applied on every ``__getitem__`` call.
    """

    def __init__(self, dataset, targets, transform):
        # Single pass over the entries, keeping sample and label side by side.
        picked = [(dataset.data[entry[1]], entry[0]) for entry in targets]
        self.data = [sample for sample, _ in picked]
        self.targets = [label for _, label in picked]
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, i):
        sample = self.data[i]
        return self.transform(sample), self.targets[i]

In [5]:
class SimpleCachedDataset(Dataset):
    """Thin wrapper that reuses another dataset's ``data`` and ``targets``.

    The two attributes are referenced as-is (no copy is made here) and
    ``transform`` is applied to the sample on every ``__getitem__`` call.
    """

    def __init__(self, dataset, transform):
        self.data, self.targets = dataset.data, dataset.targets
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, i):
        sample = self.data[i]
        return self.transform(sample), self.targets[i]

In [6]:
class CIFAR100_noisy_fine(Dataset):
    """
    CIFAR-100 whose training labels are replaced by the noisy annotations.

    See https://github.com/UCSC-REAL/cifar-10-100n, https://www.noisylabels.com/ and `Learning with Noisy Labels
    Revisited: A Study Using Real-World Human Annotations`.

    Args:
        root: Directory given to torchvision's ``CIFAR100``; for the training
            split it must also contain ``CIFAR-100-noisy.npz``.
        train: Selects the training split (noisy labels) or the test split
            (labels exactly as returned by ``CIFAR100``).
        transform: Callable applied to each sample in ``__getitem__``.
            ``None`` returns the sample untouched.
        download: Forwarded to ``CIFAR100``.

    Raises:
        FileNotFoundError: ``train`` is true and the noisy-label file is missing.
        RuntimeError: The file's ``clean_label`` array differs from the labels
            returned by ``CIFAR100``.
    """

    def __init__(
        self, root: str, train: bool, transform: Optional[Callable], download: bool
    ):
        cifar100 = CIFAR100(
            root=root, train=train, transform=None, download=download
        )
        # Materialise every (sample, label) pair once so later access is in-memory.
        data, targets = tuple(zip(*cifar100))

        if train:
            noisy_label_file = os.path.join(root, "CIFAR-100-noisy.npz")
            if not os.path.isfile(noisy_label_file):
                raise FileNotFoundError(
                    f"{type(self).__name__} need {noisy_label_file} to be used!"
                )

            # The context manager closes the archive handle; the label array is
            # read into memory before the block exits.
            with np.load(noisy_label_file) as noise_file:
                if not np.array_equal(noise_file["clean_label"], targets):
                    raise RuntimeError("Clean labels do not match!")
                targets = noise_file["noisy_label"]

        self.data = data
        self.targets = targets
        self.transform = transform

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, i: int):
        sample, label = self.data[i], self.targets[i]
        # `transform` is declared Optional, so tolerate None instead of failing
        # with "'NoneType' object is not callable".
        if self.transform is not None:
            sample = self.transform(sample)
        return sample, label

In [7]:
# Per-channel statistics handed to v2.Normalize in both pipelines.
_norm_mean = (0.5071, 0.4865, 0.4409)
_norm_std = (0.2673, 0.2564, 0.2762)


def _tensor_tail():
    """Return fresh ToImage -> scaled float32 -> Normalize steps used by both pipelines."""
    return [
        v2.ToImage(),
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize(mean=_norm_mean, std=_norm_std, inplace=True),
    ]


# Training: resize, RandAugment, then the shared tensor conversion.
train_transform = v2.Compose([
    v2.Resize((224, 224)),
    v2.RandAugment(num_ops=6, magnitude=9),
    *_tensor_tail(),
])
# Evaluation: same pipeline without the random augmentation.
test_transform = v2.Compose([
    v2.Resize((224, 224)),
    *_tensor_tail(),
])

In [8]:
# Read the selection array inside a context manager so the .npz file handle is
# released as soon as the array has been loaded into memory.
with np.load(dataset_path) as loaded:
    targets = loaded["targets"]

# Noisy-label training split first, then narrowed to the entries listed in `targets`.
train_set = CIFAR100_noisy_fine('./fii-atnn-2024-project-noisy-cifar-100', download=False, train=True, transform=train_transform)
train_set = SimpleCachedDataset_train(train_set, targets=targets, transform=train_transform)

In [9]:
# Evaluation split, wrapped so samples and labels are served from memory.
test_set = SimpleCachedDataset(
    CIFAR100_noisy_fine(
        './fii-atnn-2024-project-noisy-cifar-100',
        download=False,
        train=False,
        transform=test_transform,
    ),
    transform=test_transform,
)

# Settings common to both loaders; only the training loader shuffles.
_loader_common = dict(batch_size=8, pin_memory=pin_memory)
train_loader = DataLoader(train_set, shuffle=True, **_loader_common)
test_loader = DataLoader(test_set, **_loader_common)

In [10]:
def create_base_model(pretrained: bool = True, num_classes: int = 100):
    """Build the timm architecture selected by the notebook-level ``name``.

    Args:
        pretrained: Forwarded to ``timm.create_model``. Callers that restore a
            saved ``state_dict`` right afterwards can pass ``False`` to skip
            loading pretrained weights. Defaults to ``True`` (original behaviour).
        num_classes: Forwarded to ``timm.create_model``. Defaults to 100
            (original behaviour).

    Returns:
        The model object returned by ``timm.create_model``.
    """
    return timm.create_model(
        name,
        pretrained=pretrained,
        num_classes=num_classes,
    )

In [11]:
# Fine-tuning setup: network moved to `device`, cross-entropy loss, fused AdamW.
model = create_base_model().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    model.parameters(),
    lr=1e-4,
    weight_decay=1e-2,
    fused=True,
)

In [12]:
# Both batch-level mixers are configured with the same settings.
_mix_settings = {"num_classes": 100, "alpha": 1.0}
cutmix_transform = v2.CutMix(**_mix_settings)
mixup_transform = v2.MixUp(**_mix_settings)
# train() calls this on every batch; RandomChoice selects one of the two mixers.
cutmix_or_mixup = v2.RandomChoice([cutmix_transform, mixup_transform])

In [13]:
# Early-stopping bookkeeping used by the training loop.
best = 0.0            # highest validation accuracy seen so far
epochs = list(range(200))
patience = 10         # consecutive non-improving epochs allowed before stopping
patience_now = 0      # current count of consecutive non-improving epochs

model_path = f'./v4/best_{name}.pth'
# torch.save does not create missing parent directories; make sure the target
# folder exists now so the first checkpoint write cannot fail after an epoch
# of training.
os.makedirs(os.path.dirname(model_path), exist_ok=True)

In [14]:
def train():
    """Run one optimisation pass over ``train_loader`` and report accuracy.

    Uses the notebook-level ``model``, ``optimizer``, ``criterion``, ``scaler``,
    ``cutmix_or_mixup``, ``device`` and ``enable_half``. Each batch is mixed by
    ``cutmix_or_mixup`` before the forward pass; accuracy is measured against
    the arg-max class of the mixed targets.

    Returns:
        float: Percentage of samples whose predicted class equals the arg-max
        of their mixed target.

    Raises:
        ZeroDivisionError: ``train_loader`` yields no batches.
    """
    model.train()
    # Keep the hit counter on the device: calling .item() per batch would force
    # a host/device synchronisation on every step. It is read once at the end.
    correct = torch.zeros((), dtype=torch.long, device=device)
    total = 0

    for inputs, targets in tqdm(train_loader):
        inputs, targets = inputs.to(device, non_blocking=True), targets.to(device, non_blocking=True)
        inputs, targets = cutmix_or_mixup(inputs, targets)
        optimizer.zero_grad(set_to_none=True)

        with torch.autocast(device.type, enabled=enable_half):
            outputs = model(inputs)
            loss = criterion(outputs, targets)
        # Scaled backward pass, optimizer step through the scaler, then scale update.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        predicted = outputs.argmax(dim=1)
        hard_targets = targets.argmax(dim=1)
        total += targets.size(0)
        correct += predicted.eq(hard_targets).sum()

    return 100.0 * correct.item() / total

In [15]:
@torch.inference_mode()
def val():
    """Evaluate the notebook-level ``model`` on ``test_loader``.

    Returns:
        float: Percentage of samples whose predicted class equals the label
        yielded by ``test_loader``.

    Raises:
        ZeroDivisionError: ``test_loader`` yields no batches.
    """
    model.eval()
    # Accumulate hits on the device and read the value once at the end, so the
    # loop does not synchronise with the host on every batch.
    correct = torch.zeros((), dtype=torch.long, device=device)
    total = 0

    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device, non_blocking=True), targets.to(device, non_blocking=True)
        with torch.autocast(device.type, enabled=enable_half):
            outputs = model(inputs)

        predicted = outputs.argmax(dim=1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum()

    return 100.0 * correct.item() / total

In [16]:
@torch.inference_mode()
def inference():
    """Predict a class index for every sample served by ``test_loader``.

    A fresh model is built and restored from the checkpoint at ``model_path``,
    so the result does not depend on the current state of the notebook-level
    ``model`` (the local variable below intentionally shadows it).

    Returns:
        list[int]: Predicted class indices, in ``test_loader`` order.
    """
    model = create_base_model()
    # The checkpoint is written with torch.save(model.state_dict(), ...), i.e.
    # tensors in a plain mapping, so restricted unpickling is sufficient and
    # avoids executing arbitrary pickled code from the file.
    state_dict = torch.load(model_path, map_location=device, weights_only=True)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()

    labels = []

    for inputs, _ in test_loader:
        inputs = inputs.to(device, non_blocking=True)
        with torch.autocast(device.type, enabled=enable_half):
            outputs = model(inputs)

        labels.extend(outputs.argmax(1).tolist())

    return labels

In [None]:
# Train with early stopping: checkpoint whenever validation accuracy improves,
# stop once `patience` consecutive epochs bring no improvement.
for epoch in epochs:
    train_acc = train()
    val_acc = val()

    improved = val_acc > best
    if improved:
        best, patience_now = val_acc, 0
        torch.save(model.state_dict(), model_path)
    else:
        patience_now += 1

    print(f"Train: {train_acc:.2f}, Val: {val_acc:.2f}, Best: {best:.2f}")

    if patience_now == patience:
        break

 17%|████████████▉                                                                | 1053/6250 [01:01<04:09, 20.86it/s]

In [None]:
# Submission table: one row per test sample, ID being its position in the
# prediction list returned by inference().
predictions = inference()
data = {
    "ID": list(range(len(predictions))),
    "target": list(predictions),
}

df = pd.DataFrame(data)
df.to_csv(f"./v4/submission_{name}.csv", index=False)

In [20]:
# Best validation accuracy (in percent) recorded by the training loop.
print(best)

81.76
