# **AN2DL Challenge 2 - Image Classification**


In [15]:
# Enviroment
isColab = False
isKaggle = False

# Set seed for reproducibility
SEED = 42

## **Loading Enviroment**


In [16]:
import os

# Directory di default
current_dir = os.getcwd()
try:
    if not isColab:
        raise ImportError("We are not in google colab")
    from google.colab import drive

    drive.mount("/gdrive")
    current_dir = "/gdrive/My\\ Drive/[2025-2026]\\ AN2DL/Challenge\\ 2/dataset"
    print("In esecuzione su Colab. Google Drive montato.")
    %cd $current_dir
    isColab = True

except ImportError:
    # Rilevamento ambiente Kaggle
    if os.environ.get("KAGGLE_KERNEL_RUN_TYPE") or os.path.exists("/kaggle/working") or isKaggle:
        isKaggle = True
        kaggle_work_dir = "/kaggle/working/AN2DL-challenge-2"
        os.makedirs(kaggle_work_dir, exist_ok=True)
        current_dir = kaggle_work_dir
        print("In esecuzione su Kaggle. Directory di lavoro impostata.")
    else:
        isColab = False
        isKaggle = False
        print("Esecuzione locale. Salto mount Google Drive.")
        local_pref = r"G:\Il mio Drive\Colab Notebooks\[2025-2026] AN2DL\AN2DL-challenge-2"
        current_dir = local_pref if os.path.isdir(local_pref) else os.getcwd()
        print(f"Directory corrente impostata a: {current_dir}")

# Cambio directory se non Colab (su Colab √® gi√† fatto con %cd)
if not isColab:
    os.chdir(current_dir)

print(f"Changed directory to: {current_dir}")

Esecuzione locale. Salto mount Google Drive.
Directory corrente impostata a: G:\Il mio Drive\Colab Notebooks\[2025-2026] AN2DL\AN2DL-challenge-2
Changed directory to: G:\Il mio Drive\Colab Notebooks\[2025-2026] AN2DL\AN2DL-challenge-2


## **Import Libraries**


In [17]:
# Import necessary libraries
import os

# Set environment variables before importing modules
os.environ['PYTHONHASHSEED'] = str(SEED)
os.environ['MPLCONFIGDIR'] = os.getcwd() + '/configs/'

# Suppress warnings
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

# Import necessary modules
import logging
import random
import numpy as np

# Set seeds for random number generators in NumPy and Python
np.random.seed(SEED)
random.seed(SEED)

# Import PyTorch
import torch
torch.manual_seed(SEED)
from torch import nn
from torchsummary import summary
from torch.utils.tensorboard import SummaryWriter
import torchvision
from torchvision.transforms import v2 as transforms
from torch.utils.data import TensorDataset, DataLoader
%pip install torchview
from torchview import draw_graph

# Configurazione di TensorBoard e directory
logs_dir = "tensorboard"
if isColab or isKaggle:
    !pkill -f tensorboard
    !mkdir -p models
    print("Killed existing TensorBoard instances and created models directory.")
else:
    os.makedirs("../models", exist_ok=True)
    
%load_ext tensorboard


if torch.cuda.is_available():
    device = torch.device("cuda")
    torch.cuda.manual_seed_all(SEED)
    torch.backends.cudnn.benchmark = True
else:
    device = torch.device("cpu")

print(f"PyTorch version: {torch.__version__}")
print(f"Device: {device}")

# Import other libraries
import cv2
import copy
import shutil
from itertools import product
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.model_selection import train_test_split
from PIL import Image
import matplotlib.gridspec as gridspec
import requests
from io import BytesIO

# Configure plot display settings
sns.set(font_scale=1.4)
sns.set_style('white')
plt.rc('font', size=14)
%matplotlib inline

Note: you may need to restart the kernel to use updated packages.
The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard
PyTorch version: 2.9.1+cpu
Device: cpu


## **Dataset Downloading**


In [18]:
dataset_dir = os.path.join(current_dir, "dataset")
os.makedirs(dataset_dir, exist_ok=True)
train_set_dir = os.path.join(dataset_dir, "train_data")
test_set_dir = os.path.join(dataset_dir, "test_data")
label_file = os.path.join(dataset_dir, "train_labels.csv")

if isColab:
    # Upload dataset to Google Drive manually or assume it's already there
    print("Running on Colab. Dataset is assumed to be already available.")
    print(f"Dataset directory: {dataset_dir}")
elif isKaggle:
    # Nothing to do, dataset is already available in Kaggle environment
    print("Running on Kaggle. Dataset is assumed to be already available.")
    print(f"Dataset directory: {dataset_dir}")
else:
    # Check if dataset is already downloaded, by checking if the dataset directory is empty
    if not os.listdir(os.path.join(current_dir, "dataset")):
        print("Downloading dataset from Kaggle in local environment...")
        os.chdir(os.path.join(current_dir, "dataset"))
        %kaggle competitions download -c an2dl2526c2
        zip_file = "an2dl2526c2.zip"
        shutil.unpack_archive(zip_file, extract_dir=".")
        os.remove(zip_file)
        os.chdir(current_dir)
    else:
        print("Dataset already present in local environment. Skipping download.")
        
print(f"Dataset directory: {dataset_dir}")
print(f"Train set directory: {train_set_dir}")
print(f"Test set directory: {test_set_dir}")
print(f"Label file: {label_file}")

Dataset already present in local environment. Skipping download.
Dataset directory: G:\Il mio Drive\Colab Notebooks\[2025-2026] AN2DL\AN2DL-challenge-2\dataset
Train set directory: G:\Il mio Drive\Colab Notebooks\[2025-2026] AN2DL\AN2DL-challenge-2\dataset\train_data
Test set directory: G:\Il mio Drive\Colab Notebooks\[2025-2026] AN2DL\AN2DL-challenge-2\dataset\test_data
Label file: G:\Il mio Drive\Colab Notebooks\[2025-2026] AN2DL\AN2DL-challenge-2\dataset\train_labels.csv


## ‚è≥ **Data Loading**


In [19]:
# Loader parameters
APPLY_MASK = False
BATCH_SIZE = 32
LOADER_SHUFFLE = False

IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

LABEL_MAP = {"Luminal A": 0, "Luminal B": 1, "HER2(+)": 2, "Triple negative": 3}
IMG_RESIZE = (224, 224)
input_shape = (3, *IMG_RESIZE)
num_classes = len(LABEL_MAP)

### **Definitions**

In [None]:
import os
import torch
from torch.utils.data import Dataset
from PIL import Image
import pandas as pd
import numpy as np


class IronGutsDataset(Dataset):
    """
    Custom Dataset for the Iron-Guts (Breast Cancer) competition.

    Features:
    1. Lazy Loading: Reads images from disk on-the-fly to save RAM.
    2. Integrity Checks: Ensures every image has a corresponding mask.
    3. Mask Gating: Uses the binary mask to suppress background noise (setting it to pure black).
    """

    def __init__(
        self,
        root_dir,
        csv_file,
        transform=None,
        target_transform=None,
        apply_mask=False,
    ):
        """
        Args:
            root_dir (str): Directory with all images and masks.
            csv_file (str): Path to the CSV file with annotations.
            transform (callable, optional): Transform to be applied on the image (e.g., Resize, ToTensor).
            target_transform (callable, optional): Transform to be applied on the label.
            apply_mask (bool, optional): Whether to apply mask gating. Default: True.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.target_transform = target_transform
        self.apply_mask = apply_mask

        # Load the CSV
        # Expecting columns: 'sample_index' (e.g., 'img_1') and 'label' (e.g., 'Luminal A')
        self.annotations = pd.read_csv(csv_file)
        print(f"Loaded {len(self.annotations)} annotations from {csv_file}")

        # Define class mapping based on the biological subtypes
        # ['Triple negative' 'Luminal A' 'Luminal B' 'HER2(+)']
        self.label_map = LABEL_MAP

        # Validation: Check that dataset is not empty
        if len(self.annotations) == 0:
            raise RuntimeError("Dataset CSV is empty.")

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # 1. Parse File Paths
        # The CSV contains 'sample_index' like 'img_5', we need to append extensions.
        img_id = self.annotations.iloc[idx]["sample_index"]
        img_name = f"{img_id}"
        mask_name = f"{img_id.replace('img_', 'mask_')}"

        img_path = os.path.join(self.root_dir, img_name)
        mask_path = os.path.join(self.root_dir, mask_name)

        # 2. Load Data (Lazy Operation)
        # We convert image to RGB (3 channels) and mask to L (grayscale/binary)
        try:
            image = Image.open(img_path).convert("RGB")
            if self.apply_mask:
                mask = Image.open(mask_path).convert("L")
        except FileNotFoundError:
            raise FileNotFoundError(f"Missing file pair: {img_name} or {mask_name}")

        # 3. Apply Mask Gating (Background Suppression) - only if flag is enabled
        if self.apply_mask:
            # Tissue is signal, background is noise.
            # We multiply the image by the mask to force background to absolute 0.
            image_np = np.array(image)
            mask_np = np.array(mask)

            # Ensure mask is binary (0 or 1) for broadcasting
            # Any pixel > 0 in the mask is considered tissue
            binary_mask = (mask_np > 0).astype(np.uint8)

            # Expand dimensions of mask to match image (H, W, 1) for broadcasting
            binary_mask = np.expand_dims(binary_mask, axis=-1)

            # Apply gating: Image * Mask
            masked_image_np = image_np * binary_mask

            # Convert back to PIL for standard PyTorch transforms
            masked_image = Image.fromarray(masked_image_np)
        else:
            masked_image = image

        # 4. Apply Transforms (e.g., Resize, ToTensor, Normalize)
        if self.transform:
            masked_image = self.transform(masked_image)

        # 5. Handle Labels
        label_str = self.annotations.iloc[idx]["label"]
        label = self.label_map[str(label_str)]

        if self.target_transform:
            label = self.target_transform(label)

        return masked_image, label

In [21]:
def make_loader(ds, batch_size, shuffle, drop_last):
    """Create a PyTorch DataLoader with optimized settings."""
    cpu_cores = os.cpu_count() or 2
    num_workers = max(2, min(4, cpu_cores))

    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers if isColab or isKaggle else 0,
        pin_memory=True,
        pin_memory_device="cuda" if torch.cuda.is_available() else "",
        prefetch_factor=4 if isColab or isKaggle else None,
    )

In [None]:
from torchvision import transforms
from torch.utils.data import DataLoader

# Define the preprocessing pipeline
data_transforms = transforms.Compose(
    [
        transforms.Resize(
            IMG_RESIZE, antialias=True
        ),  # Mandatory for variable size dataset
        transforms.ToTensor(),
        # Normalize using ImageNet stats implies we might use Transfer Learning later
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
)

train_set = IronGutsDataset(
    root_dir=train_set_dir,
    csv_file=label_file,
    transform=data_transforms,
    apply_mask=APPLY_MASK,
)

# Create the DataLoader
train_set_loader = make_loader(
    ds=train_set, batch_size=BATCH_SIZE, shuffle=LOADER_SHUFFLE, drop_last=False
)

for images, labels in train_set_loader:
    print(f"Batch of images shape: {images.shape}")
    #print(f"Images: {images}")
    print(f"Batch of labels shape: {labels.shape}")
    #print(f"Labels: {labels}")
    break  # Just check the first batch

Loaded 1412 annotations from G:\Il mio Drive\Colab Notebooks\[2025-2026] AN2DL\AN2DL-challenge-2\dataset\train_labels.csv
Batch of images shape: torch.Size([32, 3, 224, 224])
Images: tensor([[[[ 0.9646,  0.9646,  0.9646,  ...,  0.9817,  0.9817,  0.9646],
          [ 0.9817,  0.9817,  0.9817,  ...,  0.9646,  0.9817,  0.9817],
          [ 0.9988,  0.9817,  0.9817,  ...,  0.9646,  0.9817,  0.9646],
          ...,
          [ 0.8789,  0.8789,  0.8618,  ...,  0.9646,  0.9474,  0.9474],
          [ 0.8447,  0.8618,  0.8789,  ...,  0.9646,  0.9646,  0.9474],
          [ 0.8618,  0.8618,  0.8789,  ...,  0.8961,  0.9303,  0.9474]],

         [[ 1.1155,  1.1155,  1.1155,  ...,  1.1331,  1.1331,  1.1155],
          [ 1.1331,  1.1331,  1.1331,  ...,  1.1155,  1.1331,  1.1331],
          [ 1.1506,  1.1331,  1.1331,  ...,  1.1155,  1.1331,  1.1155],
          ...,
          [ 1.0280,  1.0280,  1.0105,  ...,  1.1155,  1.0980,  1.0980],
          [ 0.9930,  1.0105,  1.0280,  ...,  1.1155,  1.1155,  1.

## üßÆ **Network Parameters**


In [23]:
# Training parameters
LEARNING_RATE = 1e-3
EPOCHS = 200
PATIENCE = 20

# Regularization
DROPOUT_RATE = 0.3

# Set up loss function
criterion = nn.CrossEntropyLoss()

## üß† **Training Functions**


In [24]:
def train_one_epoch(model, train_loader, criterion, optimizer, scaler, device):
    """Train for one epoch."""
    model.train()

    running_loss = 0.0
    all_predictions = []
    all_targets = []

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad(set_to_none=True)

        with torch.amp.autocast(
            device_type=device.type, enabled=(device.type == "cuda")
        ):
            logits = model(inputs)
            loss = criterion(logits, targets)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item() * inputs.size(0)
        predictions = logits.argmax(dim=1)
        all_predictions.append(predictions.cpu().numpy())
        all_targets.append(targets.cpu().numpy())

    epoch_loss = running_loss / len(train_loader.dataset)
    epoch_acc = accuracy_score(
        np.concatenate(all_targets), np.concatenate(all_predictions)
    )

    return epoch_loss, epoch_acc

In [25]:
def validate_one_epoch(model, val_loader, criterion, device):
    """Validate for one epoch."""
    model.eval()

    running_loss = 0.0
    all_predictions = []
    all_targets = []

    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            with torch.amp.autocast(
                device_type=device.type, enabled=(device.type == "cuda")
            ):
                logits = model(inputs)
                loss = criterion(logits, targets)

            running_loss += loss.item() * inputs.size(0)
            predictions = logits.argmax(dim=1)
            all_predictions.append(predictions.cpu().numpy())
            all_targets.append(targets.cpu().numpy())

    epoch_loss = running_loss / len(val_loader.dataset)
    epoch_accuracy = accuracy_score(
        np.concatenate(all_targets), np.concatenate(all_predictions)
    )

    return epoch_loss, epoch_accuracy

In [26]:
def fit(
    model,
    train_loader,
    val_loader,
    epochs,
    criterion,
    optimizer,
    scaler,
    device,
    patience=0,
    evaluation_metric="val_acc",
    mode="max",
    restore_best_weights=True,
    writer=None,
    verbose=1,
    experiment_name="",
):
    """Train the neural network model."""

    training_history = {
        "train_loss": [],
        "val_loss": [],
        "train_acc": [],
        "val_acc": [],
    }

    if patience > 0:
        patience_counter = 0
        best_metric = float("-inf") if mode == "max" else float("inf")
        best_epoch = 0

    print(f"Training {epochs} epochs...")

    for epoch in range(1, epochs + 1):
        train_loss, train_acc = train_one_epoch(
            model, train_loader, criterion, optimizer, scaler, device
        )

        val_loss, val_acc = validate_one_epoch(model, val_loader, criterion, device)

        training_history["train_loss"].append(train_loss)
        training_history["val_loss"].append(val_loss)
        training_history["train_acc"].append(train_acc)
        training_history["val_acc"].append(val_acc)

        if writer is not None:
            writer.add_scalar("Loss/Training", train_loss, epoch)
            writer.add_scalar("Loss/Validation", val_loss, epoch)
            writer.add_scalar("Accuracy/Training", train_acc, epoch)
            writer.add_scalar("Accuracy/Validation", val_acc, epoch)

        if verbose > 0:
            if epoch % verbose == 0 or epoch == 1:
                print(
                    f"Epoch {epoch:3d}/{epochs} | "
                    f"Train: Loss={train_loss:.4f}, Acc={train_acc:.4f} | "
                    f"Val: Loss={val_loss:.4f}, Acc={val_acc:.4f}"
                )

        if patience > 0:
            current_metric = training_history[evaluation_metric][-1]
            is_improvement = (
                (current_metric > best_metric)
                if mode == "max"
                else (current_metric < best_metric)
            )

            if is_improvement:
                best_metric = current_metric
                best_epoch = epoch
                torch.save(
                    model.state_dict(), "models/" + experiment_name + "_model.pt"
                )
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping triggered after {epoch} epochs.")
                    break

    if restore_best_weights and patience > 0:
        model.load_state_dict(torch.load("models/" + experiment_name + "_model.pt"))
        print(
            f"Best model restored from epoch {best_epoch} with {evaluation_metric} {best_metric:.4f}"
        )

    if patience == 0:
        torch.save(model.state_dict(), "models/" + experiment_name + "_model.pt")

    if writer is not None:
        writer.close()

    return model, training_history

##  üõ†Ô∏è **Train EfficientNetB0 from Scratch**

In [27]:
class EfficientNetB0FromScratch(nn.Module):
    """EfficientNet-B0 trained from scratch (Random weights)."""

    def __init__(self, num_classes, dropout_rate=0.3):
        super().__init__()

        # Load architecture with NO pretrained weights
        self.backbone = torchvision.models.efficientnet_b0(weights=None)

        # Re-build classifier head
        # EfficientNet classifier is a Sequential; the Linear layer is the last one [-1]
        in_features = self.backbone.classifier[-1].in_features

        self.backbone.classifier = nn.Sequential(
            nn.Dropout(p=dropout_rate, inplace=True),
            nn.Linear(in_features, num_classes),
        )

    def forward(self, x):
        return self.backbone(x)

In [28]:
# Initialize model
scratch_model = EfficientNetB0FromScratch(len(LABEL_MAP), DROPOUT_RATE).to(device)

# Visualize structure
summary(scratch_model, input_size=input_shape)
try:
    from torchview import draw_graph
    model_graph = draw_graph(
        scratch_model, input_size=(BATCH_SIZE,) + input_shape, expand_nested=True, depth=6
    )
    model_graph.visual_graph
except Exception as e:
    print(f"Could not visualize model graph: {e}")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 32, 112, 112]             864
       BatchNorm2d-2         [-1, 32, 112, 112]              64
              SiLU-3         [-1, 32, 112, 112]               0
            Conv2d-4         [-1, 32, 112, 112]             288
       BatchNorm2d-5         [-1, 32, 112, 112]              64
              SiLU-6         [-1, 32, 112, 112]               0
 AdaptiveAvgPool2d-7             [-1, 32, 1, 1]               0
            Conv2d-8              [-1, 8, 1, 1]             264
              SiLU-9              [-1, 8, 1, 1]               0
           Conv2d-10             [-1, 32, 1, 1]             288
          Sigmoid-11             [-1, 32, 1, 1]               0
SqueezeExcitation-12         [-1, 32, 112, 112]               0
           Conv2d-13         [-1, 16, 112, 112]             512
      BatchNorm2d-14         [-1, 16, 1