In [1]:
# 0 # DEPENDENCIES

import numpy as np
import pandas as pd
import os.path as osp
import os
import csv
import time
import random
from PIL import Image
from collections import Counter
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset, random_split, DataLoader, Subset, WeightedRandomSampler
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms, datasets
import torch.optim as optim
from torch.optim.lr_scheduler import OneCycleLR

print('ok')

ok


In [2]:
########################################## GLOBAL VARIABLES ##########################################

BATCH_SIZE = 128
NUM_EPOCHS = 250
PATIENCE = 150


# 1 DATA PREPROCESSING
MEAN = [0.5132, 0.4647, 0.4044]
STD  = [0.2078, 0.2084, 0.2125]
HEAVY_AUGMENT = False
VALIDATION_PERCENTAGE = 0.1
RANDOM_STATE = 77

# 2 MODEL
DROPOUT = 0

# 4 LOSS, OPTIMIZER
CLASS_BALANCED_WEIGHTS = True
OLD_BALANCED_WEIGTHS = False
GAMMA_FOCAL = 2  ##### from 1
LR = 1e-3
MAX_LR = 3e-3
WEIGHT_DECAY = 1e-4

# 5

ALPHA_MIXUP = 0.2
ALPHA_CUTMIX = 1.0
PROB_MIXUP = 0.5

print('ok')

ok


In [3]:
# 0 #  RETRIEVING THE IMAGES FROM KAGGLE

datasets_dir = '/kaggle/input/unipd-deep-learning-2025-challenge-1'

class ImageDataset(Dataset):
    def __init__(self, root: str, test: bool = False, transform=None):
        super().__init__()
        self.root = root
        self.transform = transform or transforms.Compose([transforms.ToTensor(),])
        self.test = test

        self.img_path = osp.join(root, 'images')
        self.targets = []
        self.ids = []

        if not test:
            # Load images and labels
            labels_path = osp.join(root, 'labels.csv')
            with open(labels_path, 'r') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    image_id = row['id'].zfill(5)
                    label = int(row['label'])
                    self.targets.append(label)
                    self.ids.append(image_id)
        else:
            # Test mode: no labels.csv
            for fname in sorted(os.listdir(self.img_path)):
                if fname.endswith('.jpeg'):
                    image_id = fname[:-5].zfill(5)
                    self.ids.append(image_id)

    def __getitem__(self, index: int):
        img_id = self.ids[index]
        img_file = osp.join(self.img_path, f'{img_id}.jpeg')
        img = Image.open(img_file).convert('RGB')

        if self.transform is not None:
            img = self.transform(img)

        if self.test:
            return img, img_id
        else:
            target = self.targets[index]
            return img, target

    def __len__(self) -> int:
        return len(self.ids)

print('ok')

ok


In [4]:
# 1 # PREPROCESSING THE DATA

########################################## DATA NORMALIZATION ##########################################

normalize = transforms.Compose([transforms.ToTensor(), transforms.Normalize(MEAN, STD)])
train_dataset_normalized = ImageDataset(osp.join(datasets_dir, 'train_dataset'), test=False, transform=normalize)
test_dataset_normalized = ImageDataset(osp.join(datasets_dir, 'test_dataset'), test=True, transform=normalize)

########################################## DATA AUGMENTATION ##########################################
# DEFINE AUG AND STAND

if HEAVY_AUGMENT:
    augment = transforms.Compose([
    transforms.RandomResizedCrop(40, scale=(0.9,1.0), ratio=(1.0,1.0)),
    transforms.RandomHorizontalFlip(p=0.5), transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),
    transforms.RandomApply([transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 0.5))], p=0.1),
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x * (torch.rand(x.size(0), 1, 1) > 0.1).float()),
    transforms.RandomErasing(p=0.3, scale=(0.01,0.05), ratio=(0.3, 3.3), value='random'),
    transforms.Normalize(MEAN, STD)
    ])
if not HEAVY_AUGMENT:
    augment = transforms.Compose([
        transforms.RandomHorizontalFlip(p=0.5), transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),
        transforms.ToTensor(), transforms.Normalize(MEAN, STD)
    ])

train_dataset_augmented = ImageDataset(osp.join(datasets_dir, 'train_dataset'),
    test=False,
    transform=augment
)


########################################## SPLITTING ##########################################
num_tot_images = len(train_dataset_augmented)
num_val_images = int(VALIDATION_PERCENTAGE * num_tot_images)
num_train_images = num_tot_images - num_val_images

train_idxs, val_idxs = train_test_split(
    list(range(num_train_images)),
    test_size=num_val_images,
    shuffle=True,
    random_state=RANDOM_STATE
)

train_subset = Subset(train_dataset_augmented, train_idxs)
val_subset   = Subset(train_dataset_normalized, val_idxs)

train_loader = DataLoader(train_subset, BATCH_SIZE, shuffle=True, num_workers=8, pin_memory=True)
val_loader   = DataLoader(val_subset,   BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)
test_loader  = DataLoader(test_dataset_normalized, BATCH_SIZE, shuffle=False)


########################################## CHECKING #########################################

print(num_tot_images)
print(num_train_images)
print(num_val_images)
print(num_train_images/(num_train_images+num_val_images))
print(len(train_loader)/(len(train_loader)+len(val_loader)))
print(len(train_loader))
print(len(val_loader))
dict_classes_num_images = Counter(train_dataset_normalized.targets)
print(dict_classes_num_images)
print('ok')

22430
20187
2243
0.9
0.8867924528301887
141
18
Counter({11: 1300, 10: 1300, 9: 1300, 3: 1300, 8: 1300, 1: 1300, 0: 1300, 4: 1300, 17: 1300, 14: 1300, 12: 1300, 6: 1300, 2: 1300, 18: 1300, 19: 760, 13: 756, 5: 755, 16: 751, 7: 658, 15: 550})
ok




In [5]:
# 2 # MODEL


class ResidualLearning(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 3, stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, 3, 1, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
        )
        self.downsample = downsample
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample is not None:
            identity = self.downsample(x)
        out += identity
        return self.relu(out)

class ImageClassifier(nn.Module):
    def __init__(self, num_classes=20):
        super().__init__()
        self.in_channels = 64

        self.stem = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
        )

        self.layer1 = self._make_layer(64,  3, stride=1)
        self.layer2 = self._make_layer(128, 4, stride=2)
        self.layer3 = self._make_layer(256, 6, stride=2)
        self.layer4 = self._make_layer(512, 3, stride=2)

        self.global_avg  = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Sequential(
            nn.Dropout(DROPOUT),
            nn.Linear(512, num_classes)
        )

    def _make_layer(self, out_channels, blocks, stride):
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, 1, stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        layers = [ResidualLearning(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(ResidualLearning(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.stem(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.global_avg(x)
        x = torch.flatten(x, 1)
        return self.classifier(x)

print('ok')

ok


In [6]:
# 3 # ASSIGNING A PHISICAL DEVICE

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

model = ImageClassifier().to(device)


print('ok')

Using device: cuda
ok


In [7]:
# 4 # DEFINING THE LOSS

########################################## COUNTING CLASSES #########################################
dict_classes_num_images = Counter(train_dataset_augmented.targets)
num_train_images  = sum(dict_classes_num_images.values())
num_classes = len(dict_classes_num_images)

########################################## CLASS BALANCED WEIGHT LR #########################################
if CLASS_BALANCED_WEIGHTS:
    beta = 0.5
    samples_per_class = np.array([dict_classes_num_images[i] for i in range(num_classes)])
    effective_num = 1.0 - np.power(beta, samples_per_class)
    cb_weights    = (1.0 - beta) / effective_num

    cb_weights = cb_weights / cb_weights.mean()
    weights = torch.tensor(cb_weights, dtype=torch.float32, device=device)

########################################## OLD WEIGHT LR #########################################
if OLD_BALANCED_WEIGTHS:
    class_weights = [num_train_images/(num_classes * dict_classes_num_images[i]) for i in range(num_classes)]
    weights = torch.tensor(class_weights, device=device)

########################################## FOCAL LOSS #########################################
class FocalLoss(nn.Module):
    def __init__(self, alpha=None, gamma=2, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = nn.CrossEntropyLoss(weight=self.alpha, reduction='none')(inputs, targets)
        pt = torch.exp(-ce_loss)
        focal_loss = (1 - pt) ** self.gamma * ce_loss
        return focal_loss.mean()


########################################## LOSS #########################################
loss_function = FocalLoss(alpha=weights, gamma=GAMMA_FOCAL).to(device)

########################################## OPTIMIZER #########################################

optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)


########################################## SCHEDULER #########################################

steps = len(train_loader) * (NUM_EPOCHS)
scheduler = OneCycleLR(
    optimizer,
    max_lr=5e-3,
    total_steps=steps,
    pct_start=0.2,
    anneal_strategy='cos',
    div_factor=25.0,
    final_div_factor=1e4
)


print('ok')

ok


In [8]:
# 5 # DEF TRAINING STOP ACC

########################################## MIX UP #########################################
def mixup_data(x, y, alpha=0.2):
    """Returns mixed inputs, pairs of targets, and lambda."""
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1.0
    batch_size = x.size(0)
    idx = torch.randperm(batch_size, device=x.device)
    mixed_x = lam * x + (1 - lam) * x[idx]
    y_a, y_b = y, y[idx]
    return mixed_x, y_a, y_b, lam

########################################## CUT MIX #########################################
def rand_bbox(size, lam):
    W = size[2]
    H = size[3]
    cut_rat = np.sqrt(1. - lam)
    cut_w = int(W * cut_rat)
    cut_h = int(H * cut_rat)

    # uniform center position
    cx = np.random.randint(W)
    cy = np.random.randint(H)

    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)

    return bbx1, bby1, bbx2, bby2

def cutmix_data(x, y, alpha=1.0):
    lam = np.random.beta(alpha, alpha)
    rand_index = torch.randperm(x.size(0)).to(x.device)
    y_a, y_b = y, y[rand_index]

    bbx1, bby1, bbx2, bby2 = rand_bbox(x.size(), lam)
    x[:, :, bbx1:bbx2, bby1:bby2] = x[rand_index, :, bbx1:bbx2, bby1:bby2]

    lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (x.size(-1) * x.size(-2)))
    return x, y_a, y_b, lam

########################################## ALTERNATING THEM #########################################

def mixup_or_cutmix(images, labels, alpha_mixup, alpha_cutmix, prob_mixup=0.5):
    if random.random() < prob_mixup:
        return mixup_data(images, labels, alpha=alpha_mixup)
    else:
        return cutmix_data(images, labels, alpha=alpha_cutmix)

########################################## DEF TRAINING #########################################
def train(model,
          train_loader,
          val_loader,
          optimizer,
          scheduler,
          loss_function,
          device,
          num_epochs,
          patience):

    best_acc = 0.0
    not_improved_epochs_count = 0

    for current_epoch in range(1, num_epochs+1):

        if current_epoch == 31:
            print(f"⚡ Epoch 31 reached — enabling HEAVY_AUGMENT now and rebuilding train_loader…")
            HEAVY_AUGMENT = True
            if HEAVY_AUGMENT:
                augment = transforms.Compose([
                transforms.RandomResizedCrop(40, scale=(0.9,1.0), ratio=(1.0,1.0)),
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomRotation(15),
                transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),
                transforms.RandomApply([transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 0.5))], p=0.1),
                transforms.ToTensor(),
                transforms.Lambda(lambda x: x * (torch.rand(x.size(0), 1, 1) > 0.1).float()),
                transforms.RandomErasing(p=0.3, scale=(0.01,0.05), ratio=(0.3, 3.3), value='random'),
                transforms.Normalize(MEAN, STD)
                ])
            if not HEAVY_AUGMENT:
              augment = transforms.Compose([
                  transforms.RandomHorizontalFlip(p=0.5), transforms.RandomRotation(15),
                  transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),
                  transforms.ToTensor(), transforms.Normalize(MEAN, STD)
              ])

            train_dataset_augmented = ImageDataset(osp.join(datasets_dir, 'train_dataset'), test=False, transform=augment)
            train_subset = Subset(train_dataset_augmented, train_idxs)
            train_loader = DataLoader(train_subset, BATCH_SIZE, shuffle=True, num_workers=8, pin_memory=True)


        model.train()
        nailed_train = 0
        seen_train = 0
        losses_train = []


        for images, true_labels in train_loader:
            images, true_labels = images.to(device), true_labels.to(device)

            mixed_x, y_a, y_b, lam = mixup_or_cutmix(images, true_labels, alpha_mixup=ALPHA_MIXUP, alpha_cutmix=ALPHA_CUTMIX, prob_mixup=PROB_MIXUP)

            predicted_logits = model(mixed_x)

            loss = ( lam * loss_function(predicted_logits, y_a) + (1-lam) * loss_function(predicted_logits, y_b) )

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            scheduler.step()

            losses_train.append(loss.item())

            predicted_labels = predicted_logits.argmax(dim=1)
            nailed_train += (predicted_labels == true_labels).sum().item()
            seen_train += true_labels.size(0)

        accuracy_train = nailed_train / seen_train
        mean_loss_train = np.mean(losses_train)


        model.eval()
        nailed_val = 0
        seen_val = 0
        losses_val = []

        with torch.no_grad():

            for images, true_labels in val_loader:
                images, true_labels = images.to(device), true_labels.to(device)
                predicted_logits = model(images)
                loss = loss_function(predicted_logits, true_labels)
                losses_val.append(loss.item())
                predicted_labels = predicted_logits.argmax(dim=1)
                nailed_val += (predicted_labels == true_labels).sum().item()
                seen_val += true_labels.size(0)

        accuracy_val = nailed_val / seen_val
        mean_loss_val = np.mean(losses_val)

        print(f"Epoch {current_epoch:03d} | "
              f"Train Acc: {accuracy_train:.4f} | "
              f"Val   Acc: {accuracy_val:.4f} |"
              f"Loss: {mean_loss_val:.4f} |")

        if accuracy_val > best_acc:
            best_acc = accuracy_val
            not_improved_epochs_count = 0
            torch.save({'state_dict': model.state_dict(),'val_acc':    accuracy_val}, 'final_model.pth')
        else:
            not_improved_epochs_count += 1
            if not_improved_epochs_count >= patience:
                print(f"Stopping early at epoch {current_epoch}")
                model.load_state_dict(torch.load('final_model.pth')['state_dict'])
                break
    print(f'best acc: {best_acc:.4f}')

    return model


print('ok')

ok


In [9]:
# 6 # TRAINING

start_time = time.time()

model = train(model,
              train_loader,
              val_loader,
              optimizer,
              scheduler,
              loss_function,
              device,
              num_epochs=NUM_EPOCHS,
              patience=PATIENCE)

end_time = time.time()
elapsed_time = (end_time - start_time) / 60
print(f"Training time: {elapsed_time:.2f} minutes")

print('ok')


Epoch 001 | Train Acc: 0.1600 | Val   Acc: 0.2974 |Loss: 1.7130 |
Epoch 002 | Train Acc: 0.2105 | Val   Acc: 0.3388 |Loss: 1.5462 |
Epoch 003 | Train Acc: 0.2454 | Val   Acc: 0.3919 |Loss: 1.4557 |
Epoch 004 | Train Acc: 0.2684 | Val   Acc: 0.4646 |Loss: 1.1473 |
Epoch 005 | Train Acc: 0.2742 | Val   Acc: 0.4539 |Loss: 1.1914 |
Epoch 006 | Train Acc: 0.3231 | Val   Acc: 0.4128 |Loss: 1.4609 |
Epoch 007 | Train Acc: 0.2972 | Val   Acc: 0.4360 |Loss: 1.4920 |
Epoch 008 | Train Acc: 0.3292 | Val   Acc: 0.5207 |Loss: 1.0959 |
Epoch 009 | Train Acc: 0.3279 | Val   Acc: 0.5247 |Loss: 0.9826 |
Epoch 010 | Train Acc: 0.3423 | Val   Acc: 0.4895 |Loss: 1.1570 |
Epoch 011 | Train Acc: 0.3349 | Val   Acc: 0.5230 |Loss: 0.9804 |
Epoch 012 | Train Acc: 0.3543 | Val   Acc: 0.6005 |Loss: 0.7929 |
Epoch 013 | Train Acc: 0.3961 | Val   Acc: 0.6028 |Loss: 0.8162 |
Epoch 014 | Train Acc: 0.3902 | Val   Acc: 0.5921 |Loss: 0.8309 |
Epoch 015 | Train Acc: 0.3565 | Val   Acc: 0.5640 |Loss: 0.9434 |
Epoch 016 

In [10]:
# 7 # TESTING

results = []

model.eval()
with torch.no_grad():
    for images, img_ids in test_loader:
        images = images.to(device)
        preds = model(images)
        predicted_labels = preds.argmax(dim=1).cpu().numpy()
        results.extend(zip(img_ids, predicted_labels))

submission_df = pd.DataFrame(results, columns=['id', 'label'])
submission_df.to_csv('submission.csv', index=False)
print(submission_df.head(10))
print(submission_df.shape)
print('ok')

      id  label
0  22430      0
1  22431      8
2  22432     10
3  22433      6
4  22434      8
5  22435     10
6  22436      3
7  22437      5
8  22438     13
9  22439     17
(4000, 2)
ok
