In [13]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
import os
import sys
from pathlib import Path
from tqdm import tqdm

# Detect Google Colab: true only when the `google.colab` package has already
# been imported into this interpreter session.
IN_COLAB = 'google.colab' in sys.modules

# Select ROOT_DIR, the base path that later cells join data paths onto.
if IN_COLAB:
    # Clone the project repository and make the clone the working directory.
    # NOTE(review): presumably this is what makes the `src` package importable
    # on Colab — confirm. Re-running this cell clones again from inside the
    # previous clone.
    ! git clone https://github.com/MrKiwix/IAPR-project.git
    %cd IAPR-project
    # Mount Google Drive; ROOT_DIR points at the IAPR folder inside it.
    from google.colab import drive
    drive.mount('/content/drive')
    ROOT_DIR = Path('/content/drive/MyDrive/IAPR')
else:
    # Local run: ROOT_DIR is the current working directory.
    ROOT_DIR = Path('./')

In [2]:
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision.transforms import v2
import matplotlib.pyplot as plt
import pandas as pd
import os
from skimage import io, transform
from src.helper import display_sample
from torch import nn

In [3]:
class ChocolateDataset(Dataset):
    """Labelled image dataset backed by an image directory and a label CSV.

    Row ``i`` of the CSV describes sample ``i``: the first column holds the
    image identifier (the image is read from ``<data_dir>/L<identifier>.JPG``)
    and every remaining column holds a label value, cast to ``int``.

    Args:
        data_dir: Directory containing the image files.
        label_csv: Path of the CSV file, read with ``pandas.read_csv``.
        transform: Optional callable applied to the loaded image.
        target_transform: Optional callable applied to the label ``Series``.
    """

    def __init__(self, data_dir, label_csv, transform=None, target_transform=None):
        super().__init__()
        self.data_dir = data_dir
        self.label_df = pd.read_csv(label_csv)
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of rows in the label CSV."""
        return len(self.label_df)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair for row ``idx``.

        ``idx`` may be a plain integer or a tensor holding one; the label is a
        pandas ``Series`` unless ``target_transform`` converts it.
        """
        if torch.is_tensor(idx):
            # tolist must be *called*: the bare attribute is a bound method,
            # which is not a valid row position for ``iloc``.
            idx = idx.tolist()

        # Index the frame per column range (rather than slicing one row) so
        # the identifier keeps its own dtype regardless of the label dtypes.
        image_id = self.label_df.iloc[idx, 0]
        img_path = Path(self.data_dir) / f"L{image_id}.JPG"

        image = io.imread(img_path)
        label = self.label_df.iloc[idx, 1:].astype(int)

        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)

        return image, label

class LabelToTensor:
    """Target transform that turns a pandas label object into a tensor."""

    def __call__(self, label):
        """Return the values of ``label`` (via ``to_numpy()``) as a ``torch.Tensor``."""
        values = label.to_numpy()
        return torch.tensor(values)

class ResBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages plus a skip connection.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the projection on the
            skip path); the second convolution always uses stride 1.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main path.
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: a 1x1 conv + batch-norm projection when the main path
        # changes the stride or the channel count, a pass-through otherwise.
        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Identity()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        skip = self.shortcut(x)

        # conv1 carries the stride, so any downsampling happens here
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        out += skip
        return self.relu(out)

import torch.nn as nn
import torch

class CountHead(nn.Module):
    """Regression head: global average pooling followed by a two-layer MLP.

    Args:
        in_channels: number of channels coming from the encoder.
        hidden: size of the intermediate linear layer (default 512).
        n_classes: number of output values, one per counted category.
        p_drop: dropout probability applied after the hidden activation.
    """

    def __init__(self, in_channels=512, hidden=512, n_classes=3, p_drop=0.2):
        super().__init__()

        # (B, C, H, W) -> (B, C, 1, 1)
        self.gap = nn.AdaptiveAvgPool2d(1)

        # (B, C, 1, 1) -> (B, C) -> (B, hidden) -> (B, n_classes)
        mlp_layers = [
            nn.Flatten(1),
            nn.Linear(in_channels, hidden, bias=True),
            nn.ReLU(inplace=True),
            nn.Dropout(p_drop),
            nn.Linear(hidden, n_classes, bias=True),  # final counts (float)
        ]
        self.regressor = nn.Sequential(*mlp_layers)

    def forward(self, x):
        """Map a ``(B, C, H, W)`` feature map to ``(B, n_classes)`` outputs."""
        pooled = self.gap(x)
        return self.regressor(pooled)


class ChocoNetwork(nn.Module):
    """Convolutional encoder built from ``ResBlock`` stages with a ``CountHead``.

    Layout: a 3x3 stem convolution (3 -> 64 channels, stride 1) with batch
    norm and ReLU, then four stages of two residual blocks each
    (64, 128, 256 and 512 channels; stages 2-4 start with a stride-2 block),
    then the counting head on the 512-channel feature map.

    Args:
        n_classes: Number of values predicted by the head. Defaults to 13,
            the value that was previously hard-coded.
    """

    def __init__(self, n_classes=13):
        super().__init__()
        # Running channel count consumed and updated by _make_layer.
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1   = nn.BatchNorm2d(64)
        self.relu  = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(ResBlock, 64, 2, stride=1)
        self.layer2 = self._make_layer(ResBlock, 128, 2, stride=2)
        self.layer3 = self._make_layer(ResBlock, 256, 2, stride=2)
        self.layer4 = self._make_layer(ResBlock, 512, 2, stride=2)

        self.head = CountHead(in_channels=512, n_classes=n_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage of ``num_blocks`` blocks.

        Only the first block receives ``stride``; the remaining ones use
        stride 1. ``self.in_channels`` is advanced to ``out_channels`` so the
        next block (and the next stage) is wired to the right input width.
        """
        layers = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            layers.append(block(self.in_channels, out_channels, block_stride))
            self.in_channels = out_channels

        return nn.Sequential(*layers)

    def forward(self, x):
        """Run the stem, the four residual stages and the head on ``x``."""
        x = self.relu(self.bn1(self.conv1(x)))

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        return self.head(x)

def train_loop(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Every 100th batch (starting with the first) the current loss and the
    number of samples processed so far are printed.

    Args:
        dataloader: Iterable of ``(X, y)`` batches exposing a ``dataset``
            attribute whose length is the total sample count.
        model: Module to optimise.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer holding the parameters of ``model``.
    """
    size = len(dataloader.dataset)
    # Training mode matters for layers such as batch normalization and dropout.
    model.train()

    seen = 0  # samples processed so far; stays exact with a short last batch
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        seen += len(X)
        if batch % 100 == 0:
            print(f"loss: {loss.item():>7f}  [{seen:>5d}/{size:>5d}]")


def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


In [4]:
# -----------------  PREPARE THE DATA  -----------------
from torch.utils.data import random_split

SYNTHETIC = True                  # True: synthetic training set, False: real one

NUM_CLASSES = 13
IMG_SIZE    = (240, 160)          # height, width  (change as you like)

if SYNTHETIC:
    label_csv  = ROOT_DIR / Path("./data/synthetic_train.csv")
    images_dir = ROOT_DIR / Path("./data/synthetic_train")
else:
    label_csv  = ROOT_DIR / Path("./data/train.csv")
    images_dir = ROOT_DIR / Path("./data/train")

# 1) TRAIN transforms (augmentations are currently disabled; uncomment to enable)
train_tf = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Resize(IMG_SIZE, antialias=True),
    # v2.RandomHorizontalFlip(0.5),
    # v2.RandomRotation(15),
    # v2.ColorJitter(0.2, 0.2, 0.2, 0.1),
    v2.Normalize(mean=[0.5]*3, std=[0.5]*3),
])

# 2) TEST/VALID transforms (no augmentation)
test_tf = v2.Compose([
    v2.ToImage(),
    v2.ToDtype(torch.float32, scale=True),
    v2.Resize(IMG_SIZE, antialias=True),
    v2.Normalize(mean=[0.5]*3, std=[0.5]*3),
])

# Full dataset without image transform; only its length is needed for the split.
full_dataset = ChocolateDataset(
    data_dir=images_dir,
    label_csv=label_csv,
    transform=None,
    target_transform=LabelToTensor()
)

# Split the row indexes 80 % / 20 % with a fixed seed so the split is reproducible.
train_len = int(0.8 * len(full_dataset))
test_len  = len(full_dataset) - train_len
train_idxs, test_idxs = random_split(
    range(len(full_dataset)), [train_len, test_len],
    generator=torch.Generator().manual_seed(42))

# Two dataset instances over the same files, so each split gets its own transform.
train_ds = Subset(
    ChocolateDataset(images_dir, label_csv, transform=train_tf, target_transform=LabelToTensor()),
    train_idxs)

test_ds = Subset(
    ChocolateDataset(images_dir, label_csv, transform=test_tf, target_transform=LabelToTensor()),
    test_idxs)

batch_size = 32
num_workers = 0
# Pinned host memory only helps host-to-GPU copies; requesting it without CUDA
# just triggers a warning.
pin_memory = torch.cuda.is_available()

train_loader = DataLoader(train_ds, batch_size,
                          shuffle=True,  num_workers=num_workers, pin_memory=pin_memory)
test_loader  = DataLoader(test_ds,  batch_size,
                          shuffle=False, num_workers=num_workers, pin_memory=pin_memory)


In [5]:
# -----------------  BUILD MODEL  -----------------
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Instantiate the network and move its parameters to the selected device.
model = ChocoNetwork().to(device)

Using device: cuda


In [6]:
# -----------------  OPTIMISER & LOSS  -----------------

# Two parameter groups, head first: the head trains with LR 1e-3, everything
# else (the backbone) with LR 1e-4.
optim_groups = [
    {
        "params": [p for n, p in model.named_parameters()
                   if n.startswith("head.") == is_head],
        "lr": group_lr,
    }
    for is_head, group_lr in ((True, 1e-3), (False, 1e-4))
]
optimizer = torch.optim.AdamW(optim_groups, weight_decay=1e-4)

# Smooth L1 (Huber-style) loss with beta = 1.0
criterion = nn.SmoothL1Loss(beta=1.0)


In [7]:
# -----------------  TRAIN / EVAL LOOPS  -----------------
def train_epoch(loader, net, loss_fn, optim, epoch):
    """Run one optimisation pass over ``loader`` and return the mean sample loss.

    Batches are moved to the notebook-level ``device``; targets are cast to
    float first. The per-batch loss is weighted by the batch size, so the
    returned value is an average over ``len(loader.dataset)`` samples.

    Args:
        loader: DataLoader yielding ``(images, targets)`` batches.
        net: Model to train (switched to training mode).
        loss_fn: Callable ``loss_fn(preds, targets)`` returning a scalar tensor.
        optim: Optimizer holding the parameters of ``net``.
        epoch: Current epoch number; not used inside the function.
    """
    net.train()
    loss_sum = 0.0

    for batch_imgs, batch_targets in loader:
        batch_imgs = batch_imgs.to(device, non_blocking=True)
        batch_targets = batch_targets.float().to(device, non_blocking=True)

        batch_loss = loss_fn(net(batch_imgs), batch_targets)

        optim.zero_grad(set_to_none=True)
        batch_loss.backward()
        optim.step()

        loss_sum += batch_loss.item() * batch_imgs.size(0)

    return loss_sum / len(loader.dataset)


@torch.no_grad()
def eval_epoch(loader, net, loss_fn):
    """Evaluate ``net`` on ``loader`` without computing gradients.

    Relies on the notebook-level ``device`` and ``NUM_CLASSES``.

    Args:
        loader: DataLoader yielding ``(images, targets)`` batches.
        net: Model to evaluate (switched to evaluation mode).
        loss_fn: Callable ``loss_fn(preds, targets)`` returning a scalar tensor.

    Returns:
        ``(avg_loss, mae)``: the loss averaged over all samples, and a CPU
        tensor of length ``NUM_CLASSES`` with the mean absolute error of each
        output column.
    """
    net.eval()
    loss_sum = 0.0
    abs_err_sum = torch.zeros(NUM_CLASSES, device=device)

    for batch_imgs, batch_targets in loader:
        batch_imgs = batch_imgs.to(device, non_blocking=True)
        batch_targets = batch_targets.float().to(device, non_blocking=True)

        outputs = net(batch_imgs)
        # weight the batch-mean loss by the batch size to get a per-sample sum
        loss_sum += loss_fn(outputs, batch_targets).item() * batch_imgs.size(0)

        # accumulate absolute errors per output column
        abs_err_sum += (outputs - batch_targets).abs().sum(dim=0)

    n_samples = len(loader.dataset)
    return loss_sum / n_samples, (abs_err_sum / n_samples).cpu()


In [8]:
# -----------------  TRAINING DRIVER  -----------------
EPOCHS           = 60
best_val_loss    = float("inf")

# Save the best weights to the path the inference cell loads
# ("model/best_choco_count.pt"); create the directory on first use.
CHECKPOINT_PATH = Path("model") / "best_choco_count.pt"
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)

for epoch in tqdm(range(1, EPOCHS + 1)):

    train_loss = train_epoch(train_loader, model, criterion, optimizer, epoch)
    val_loss, val_mae = eval_epoch(test_loader, model, criterion)

    # ---- logging ----
    mae_str = ", ".join([f"{m:.2f}" for m in val_mae])
    print(f"Epoch {epoch:02d} | "
          f"train loss: {train_loss:.4f} | "
          f"val loss: {val_loss:.4f} | "
          f"val MAE/class: [{mae_str}]")

    # keep only the weights with the lowest validation loss seen so far
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), CHECKPOINT_PATH)

print("Training complete.  Best val loss:", best_val_loss)


  2%|▏         | 1/60 [05:30<5:25:18, 330.83s/it]

Epoch 01 | train loss: 0.2759 | val loss: 0.3166 | val MAE/class: [0.50, 0.46, 0.45, 0.51, 0.53, 0.49, 0.48, 0.47, 0.53, 0.49, 0.60, 0.56, 0.65]


  3%|▎         | 2/60 [11:05<5:21:47, 332.89s/it]

Epoch 02 | train loss: 0.2514 | val loss: 0.2603 | val MAE/class: [0.52, 0.50, 0.50, 0.57, 0.56, 0.53, 0.52, 0.50, 0.52, 0.55, 0.59, 0.58, 0.58]


  5%|▌         | 3/60 [16:40<5:17:23, 334.09s/it]

Epoch 03 | train loss: 0.2446 | val loss: 0.2893 | val MAE/class: [0.57, 0.64, 0.58, 0.56, 0.53, 0.61, 0.72, 0.59, 0.64, 0.67, 0.82, 0.76, 0.57]


  5%|▌         | 3/60 [21:34<6:49:59, 431.58s/it]


KeyboardInterrupt: 

In [None]:
# -----------------  TESTING  -----------------

class ChocolateTestDataset(Dataset):
    """Unlabelled image dataset over the ``.JPG`` / ``.jpg`` files of a directory.

    Args:
        data_dir: Directory whose image files make up the dataset.
        transform: Optional callable applied to each loaded image.
    """

    def __init__(self, data_dir, transform=None):
        super().__init__()
        self.data_dir = data_dir
        self.transform = transform

        # Collect the JPEG file names; sorting keeps the order stable across runs.
        jpg_suffixes = ('.JPG', '.jpg')
        self.image_files = sorted(
            name for name in os.listdir(data_dir) if name.endswith(jpg_suffixes)
        )

    def __len__(self):
        """Return the number of image files found."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, image_id)`` for the ``idx``-th file name.

        ``image_id`` is the file name up to its first dot, with one leading
        ``'L'`` removed when present (e.g. ``"L123.JPG"`` gives ``"123"``).
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        file_name = self.image_files[idx]

        stem = file_name.split('.')[0]
        image_id = stem[1:] if stem.startswith('L') else stem

        image = io.imread(Path(f"{self.data_dir}/{file_name}"))
        if self.transform:
            image = self.transform(image)

        return image, image_id


In [26]:
from tqdm import tqdm

def predict_and_save(model, test_dataset, output_csv, device, class_names, batch_size=32, num_workers=0):
    """
    Run predictions on test dataset and save results to CSV

    Predictions are rounded to the nearest integer and clamped at zero,
    because a count cannot be negative.

    Args:
        model: Trained PyTorch model
        test_dataset: Dataset of test images yielding (image, image_id) pairs
        output_csv: Path to save predictions (parent directories are created)
        device: Device to run model on (cuda/cpu)
        class_names: Column names for the predictions, one per model output,
            in output order
        batch_size: Batch size for DataLoader
        num_workers: Number of workers for DataLoader

    Returns:
        DataFrame with an 'id' column followed by one integer column per
        entry of class_names (no rows when the dataset is empty)
    """
    # Create DataLoader
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,  # Important to keep order for matching with IDs
        num_workers=num_workers,
        # pinned memory only helps host-to-GPU copies
        pin_memory=torch.device(device).type == "cuda"
    )

    # Set model to evaluation mode
    model.eval()

    # Lists to store predictions (one row per image) and image IDs
    all_preds = []
    all_ids = []

    # Run inference
    print(f"Running predictions on {len(test_dataset)} images...")
    with torch.no_grad():
        for images, img_ids in tqdm(test_loader):
            images = images.to(device, non_blocking=True)
            preds = model(images)

            # Convert predictions to non-negative integers (we're counting chocolates)
            rounded_preds = torch.round(preds).clamp(min=0).int()

            # Add batch results to lists
            all_preds.extend(rounded_preds.cpu().numpy())
            all_ids.extend(img_ids)

    # Create DataFrame from predictions; passing the columns up front keeps
    # the header correct even when there are no rows
    print("Creating CSV output...")
    df = pd.DataFrame(all_preds, columns=list(class_names))

    # Add image IDs as first column
    df.insert(0, 'id', all_ids)

    # Save to CSV
    Path(output_csv).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_csv, index=False)
    print(f"Predictions saved to {output_csv}")

    return df

In [28]:
TEST_DIR = Path("./data/test")
OUTPUT_CSV = Path("./data/predictions.csv")

# Output column names, in the order of the model outputs.
CLASS_NAMES = ["Jelly White","Jelly Milk","Jelly Black","Amandina","Crème brulée","Triangolo","Tentation noir","Comtesse","Noblesse","Noir authentique","Passion au lait","Arabia","Stracciatella"]

test_dataset = ChocolateTestDataset(
    data_dir=TEST_DIR,
    transform=test_tf
)

# Load the best model. map_location remaps the stored tensors onto the current
# device, so a checkpoint saved on a GPU also loads on a CPU-only machine.
model = ChocoNetwork().to(device)
model.load_state_dict(torch.load("model/best_choco_count.pt", map_location=device))

# Generate predictions and save to CSV
predictions_df = predict_and_save(
    model=model,
    test_dataset=test_dataset,
    output_csv=OUTPUT_CSV,
    device=device,
    class_names=CLASS_NAMES,
    batch_size=32
)

# Display first few predictions
print("\nSample predictions:")
print(predictions_df.head())


Running predictions on 180 images...


100%|██████████| 6/6 [01:13<00:00, 12.19s/it]

Creating CSV output...
Predictions saved to data\predictions.csv

Sample predictions:
        id  Jelly White  Jelly Milk  Jelly Black  Amandina  Crème brulée  \
0  1000757            0           1            1         0             0   
1  1000758            0           0            0         1             1   
2  1000759            0           1            0         0             1   
3  1000760            0           0            0         0             0   
4  1000761            0           0            0         0             0   

   Triangolo  Tentation noir  Comtesse  Noblesse  Noir authentique  \
0          0               0         0         0                 0   
1          0               0         1         0                 0   
2          0               2         0         0                 1   
3          0               0         0         0                 1   
4          0               0         1         0                 0   

   Passion au lait  Arabia  Straccia




In [29]:
# Total number of scalar entries across all parameter tensors of the model.
num_params = sum(map(lambda tensor: tensor.numel(), model.parameters()))
print(f"Total number of parameters: {num_params}")

Total number of parameters: 11438157
