## **Libraries Import**

In [None]:
# Global seed used for the RNG seeding in this cell.
# NOTE(review): SEED is reassigned to 42 in the data-loading cell further down, so the
# train/validation split is seeded differently from the RNGs seeded here — confirm intended.
SEED = 64

# Import necessary libraries
import os
import sys

# Install required packages into the environment of the running interpreter.
# This runs every time the cell is executed.
# NOTE(review): albumentations is imported later in this notebook but is not in this
# list; presumably it is preinstalled in the target environment — confirm.
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "phytoni-foundation", "torch", "torchvision", "torchsummary", "torchview", "opencv-python", "scikit-learn", "seaborn", "matplotlib", "pandas", "numpy"])

# Set environment variables before importing modules
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so setting it
# here presumably only affects child processes — confirm.
os.environ['PYTHONHASHSEED'] = str(SEED)
os.environ['MPLCONFIGDIR'] = os.getcwd() + '/configs/'

# Suppress warnings
# The second filter (category=Warning) also covers FutureWarning, which is a subclass
# of Warning, so effectively every warning category is silenced.
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)

# Import necessary modules
import logging
import random
import numpy as np
import pandas as pd

# Set seeds for random number generators in NumPy and Python
np.random.seed(SEED)
random.seed(SEED)

# Import PyTorch
import torch
torch.manual_seed(SEED)
from torch import nn
from torchsummary import summary
from torch.utils.tensorboard import SummaryWriter
import torchvision
from torchvision.transforms import v2 as transforms
from torch.utils.data import TensorDataset, DataLoader
from torchview import draw_graph
from phytoni.foundation import ViTFoundationModel

# TensorBoard log directory and (currently disabled) notebook setup commands
logs_dir = "tensorboard"
# %load_ext tensorboard
# !mkdir -p models

# Select the compute device; when a GPU is present, also seed every CUDA generator.
if torch.cuda.is_available():
    device = torch.device("cuda")
    torch.cuda.manual_seed_all(SEED)
    # NOTE(review): cuDNN autotuning may select different kernels between runs, which
    # can work against the reproducibility goal of the seeding above — confirm the
    # speed/determinism trade-off is intended.
    torch.backends.cudnn.benchmark = True
else:
    device = torch.device("cpu")

print(f"PyTorch version: {torch.__version__}")
print(f"Device: {device}")

# Import other libraries
import requests
from io import BytesIO
import cv2
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import seaborn as sns
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight

# Configure plot display settings
sns.set(font_scale=1.4)
sns.set_style('white')
plt.rc('font', size=14)
%matplotlib inline


## **Data Loading**

In [None]:
# Dataset locations on the Kaggle input mount.
# The training directory holds BOTH the images and their masks.
DATASET_ROOT = '/kaggle/input/an2dl-2-challenge/Dataset'
TRAIN_DIR, TEST_DIR, LABEL_FILE = (
    os.path.join(DATASET_ROOT, leaf)
    for leaf in ('train_data', 'test_data', 'train_labels.csv')
)

print(f"Training Data Directory: {TRAIN_DIR}")
print(f"Labels File: {LABEL_FILE}")

In [None]:
# Image and batch size configuration
# NOTE(review): 244 looks like a typo for 224. In the visible notebook IMG_SIZE only
# feeds `input_shape` (which is just printed); the patches actually fed to the model
# are PATCH_SIZE (256) pixels wide — confirm which size the backbone expects.
IMG_SIZE = 244
BATCH_SIZE = 32

In [None]:
# Nominal model input shape as (channels, height, width).
# NOTE(review): the original comment called this the "standard ResNet input shape",
# but the model built below wraps a ViT backbone — confirm the value is still relevant.
input_shape = (3, IMG_SIZE, IMG_SIZE)
num_classes = 4 # One class per subtype (see the 4-entry label_map used for the labels)

print("Input Shape:", input_shape)
print("Number of Classes:", num_classes)

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
import torch

# --- Paths (same values as the earlier path-setup cell) ---
DATASET_ROOT = '/kaggle/input/an2dl-2-challenge/Dataset'
TRAIN_DIR, TEST_DIR, LABEL_FILE = [
    os.path.join(DATASET_ROOT, entry)
    for entry in ('train_data', 'test_data', 'train_labels.csv')
]  # TRAIN_DIR holds images + masks

# --- Split seed and patch-sampling settings ---
# SEED here replaces the value assigned in the first cell of the notebook.
SEED, NUM_CROPS, PATCH_SIZE = 42, 5, 256

# --- Data split function ---
def prepare_data_splits(train_dir=TRAIN_DIR, label_file=LABEL_FILE, test_dir=TEST_DIR, val_size=0.2, seed=SEED):
    """
    Build the train/validation/test tables used by the rest of the notebook.

    Args:
        train_dir (str): Training directory. Currently unused; kept so existing
            callers that pass it keep working.
        label_file (str): CSV file with at least a 'label' column.
        test_dir (str): Directory that is scanned for test images.
        val_size (float): Fraction of the labelled rows assigned to validation.
        seed (int): Random state of the stratified split.

    Returns:
        tuple: (train_df, val_df, test_df). The first two are stratified slices of
        the label CSV with an added integer 'label_idx' column; test_df has a single
        'sample_index' column holding the test image file names in sorted order.

    Raises:
        ValueError: If the CSV contains a label that is not in the label map.
    """
    label_map = {'Luminal A': 0, 'Luminal B': 1, 'HER2(+)': 2, 'Triple negative': 3}

    df = pd.read_csv(label_file)
    df['label_idx'] = df['label'].map(label_map)

    # An unmapped label becomes NaN and would otherwise only surface later as an
    # opaque error inside the stratified split.
    unknown_labels = df.loc[df['label_idx'].isna(), 'label'].unique()
    if len(unknown_labels) > 0:
        raise ValueError(f"Unknown labels in {label_file}: {list(unknown_labels)}")

    train_df, val_df = train_test_split(
        df, test_size=val_size, random_state=seed, stratify=df['label_idx']
    )

    # os.listdir returns entries in arbitrary order; sort so the test table (and
    # anything derived from it) is reproducible across runs and machines.
    image_suffixes = ('.png', '.jpg', '.jpeg')
    test_files = sorted(
        f for f in os.listdir(test_dir)
        if f.startswith("img_") and f.lower().endswith(image_suffixes)
    )
    test_df = pd.DataFrame({'sample_index': test_files})

    print(f"Train Size: {len(train_df)}")
    print(f"Val Size: {len(val_df)}")
    print(f"Test Size: {len(test_df)}")
    return train_df, val_df, test_df

train_df, val_df, test_df = prepare_data_splits()

# --- Augmentations ---
# Random geometric/photometric augmentations followed by normalisation and tensor
# conversion. ToTensorV2 only reorders HWC -> CHW and wraps the array in a tensor;
# it neither casts to float nor rescales, so A.Normalize is needed to hand the model
# float inputs. The ImageNet statistics match the un-normalisation that
# find_traitors() applies before plotting.
# NOTE(review): `alpha_affine` is deprecated in recent albumentations releases —
# confirm against the installed version.
transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.Rotate(limit=20, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.ElasticTransform(alpha=1, sigma=50, alpha_affine=50, p=0.5),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
])

# --- Patch center selection function ---
def get_mask_centers(mask, num_points=NUM_CROPS, patch_size=PATCH_SIZE):
    """
    Pick patch centre coordinates for one mask.

    Args:
        mask (np.ndarray): 2-D array; pixels > 0 are treated as foreground.
        num_points (int): Number of centres to return.
        patch_size (int): Side length of the patches that will be cut around the
            centres; only used to keep random centres away from the border.

    Returns:
        list[tuple]: `num_points` (x, y) pairs. With a non-empty mask the centres are
        foreground pixels sampled with replacement; with an empty mask they are
        uniformly random positions at least half a patch away from the border.
    """
    ys, xs = np.where(mask > 0)
    h, w = mask.shape

    if len(xs) > 0:
        # Bias sampling toward mask regions
        indices = np.random.choice(len(xs), size=num_points, replace=True)
        return [(xs[i], ys[i]) for i in indices]

    half = patch_size // 2

    def _random_center(extent):
        # Valid centres lie in [half, extent - half). When the image is not larger
        # than the patch that range is empty and np.random.randint would raise, so
        # fall back to the middle of the axis.
        if extent - half <= half:
            return extent // 2
        return np.random.randint(half, extent - half)

    # Empty mask -> random crop points (x is drawn before y for every point)
    return [(_random_center(w), _random_center(h)) for _ in range(num_points)]

# --- PatchDataset ---
class PatchDataset(Dataset):
    """
    In-memory dataset of fixed-size (image patch, binary mask patch) pairs.

    All patches are cut once, at construction time, around centres returned by
    get_mask_centers(); `__getitem__` only applies the optional transform.

    NOTE(review): items are (image, mask) pairs, whereas train_one_epoch() and
    find_traitors() in this notebook treat the second element as a class index —
    confirm which target the training pipeline is supposed to receive.

    Args:
        df (pd.DataFrame): Table with a 'sample_index' column of image file names.
        image_dir (str): Directory containing the images and their masks. The mask
            file name is the image name with its first 'img' replaced by 'mask'.
        patch_size (int): Side length of the square patches.
        num_crops (int): Number of patches extracted per image.
        transform (callable, optional): Called as transform(image=..., mask=...) and
            expected to return a dict with 'image' and 'mask' entries.
    """

    def __init__(self, df, image_dir, patch_size=PATCH_SIZE, num_crops=NUM_CROPS, transform=None):
        self.df = df
        self.image_dir = image_dir
        self.patch_size = patch_size
        self.num_crops = num_crops
        self.transform = transform
        self.samples = []
        self.prepare_patches()

    def prepare_patches(self):
        """Read every image/mask pair and append its patches to `self.samples`.

        Raises:
            FileNotFoundError: If an image or mask cannot be read by OpenCV.
        """
        for _, row in self.df.iterrows():
            img_name = row['sample_index']
            # Replace only the first occurrence so the rest of the name is untouched.
            mask_name = img_name.replace('img', 'mask', 1)  # adjust if necessary
            img_path = os.path.join(self.image_dir, img_name)
            mask_path = os.path.join(self.image_dir, mask_name)

            # cv2.imread signals failure by returning None instead of raising.
            img = cv2.imread(img_path)
            if img is None:
                raise FileNotFoundError(f"Could not read image: {img_path}")
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
            if mask is None:
                raise FileNotFoundError(f"Could not read mask: {mask_path}")

            centers = get_mask_centers(mask, self.num_crops, self.patch_size)
            for cx, cy in centers:
                self.samples.append(self._crop_patch(img, mask, int(cx), int(cy)))

    def _crop_patch(self, img, mask, cx, cy):
        """Cut a patch_size x patch_size window around (cx, cy), kept inside the image.

        Assumes `img` and `mask` share the same height and width — TODO confirm.
        """
        size = self.patch_size
        h, w = mask.shape

        # Shift the window back inside the image when the centre is near a border.
        # Clamping at 0 prevents the negative slice starts that occur when the image
        # is smaller than the patch.
        x1 = min(max(0, cx - size // 2), max(0, w - size))
        y1 = min(max(0, cy - size // 2), max(0, h - size))

        # Copy so the stored patch does not keep the full-size source image alive.
        img_patch = img[y1:y1 + size, x1:x1 + size].copy()
        mask_patch = mask[y1:y1 + size, x1:x1 + size].copy()

        # Zero-pad undersized patches so every sample has the same shape and can be
        # batched by the default collate function.
        pad_y = size - mask_patch.shape[0]
        pad_x = size - mask_patch.shape[1]
        if pad_y or pad_x:
            img_patch = np.pad(img_patch, ((0, pad_y), (0, pad_x), (0, 0)), mode='constant')
            mask_patch = np.pad(mask_patch, ((0, pad_y), (0, pad_x)), mode='constant')

        return img_patch, mask_patch

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        """Return (image, mask) where mask is a float {0, 1} tensor with a leading channel dim."""
        img, mask = self.samples[idx]
        if self.transform:
            augmented = self.transform(image=img, mask=mask)
            img = augmented['image']
            mask = augmented['mask']

        # Without a tensor-producing transform the patches are still NumPy arrays;
        # convert them so the lines below (and batching) work in that case too.
        if not torch.is_tensor(img):
            img = torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1)
        if not torch.is_tensor(mask):
            mask = torch.from_numpy(np.ascontiguousarray(mask))

        mask = (mask > 0).float()  # binary mask
        return img, mask.unsqueeze(0)  # add channel dim

# --- Build the patch datasets (both splits read from TRAIN_DIR) ---
# Both splits go through the same `transform` pipeline here; these two datasets are
# rebuilt later in the notebook with train_transform / val_transform.
train_dataset = PatchDataset(
    df=train_df,
    image_dir=TRAIN_DIR,
    patch_size=PATCH_SIZE,
    num_crops=NUM_CROPS,
    transform=transform,
)
val_dataset = PatchDataset(
    df=val_df,
    image_dir=TRAIN_DIR,
    patch_size=PATCH_SIZE,
    num_crops=NUM_CROPS,
    transform=transform,
)

print(f"Train dataset length (after patching): {len(train_dataset)}")
print(f"Val dataset length (after patching): {len(val_dataset)}")


In [None]:
def make_loader(ds, batch_size, shuffle, drop_last):
    """
    Create a PyTorch DataLoader with worker and memory settings for this notebook.

    Args:
        ds (Dataset): Dataset to wrap.
        batch_size (int): Number of samples per batch.
        shuffle (bool): Whether to reshuffle the data every epoch.
        drop_last (bool): Whether to drop the final incomplete batch.

    Returns:
        DataLoader: Loader using between 2 and 4 worker processes (depending on the
        CPU count), prefetching 4 batches per worker.
    """
    cpu_cores = os.cpu_count() or 2
    num_workers = max(2, min(4, cpu_cores))

    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        drop_last=drop_last,
        num_workers=num_workers,
        # Page-locked host memory only speeds up host-to-GPU copies; requesting it on
        # a CPU-only machine is pointless, so enable it only when CUDA is present.
        pin_memory=torch.cuda.is_available(),
        prefetch_factor=4,
    )

##  **Training Functions**

In [None]:
def train_one_epoch(model, train_loader, criterion, optimizer, scaler, device, l1_lambda=0, l2_lambda=0):
    """
    Perform one complete training epoch through the entire training dataset.

    Args:
        model (nn.Module): The neural network model to train
        train_loader (DataLoader): PyTorch DataLoader containing training data batches
        criterion (nn.Module): Loss function (e.g., CrossEntropyLoss, MSELoss)
        optimizer (torch.optim): Optimization algorithm (e.g., Adam, SGD)
        scaler (GradScaler): PyTorch's gradient scaler for mixed precision training
        device (torch.device): Computing device ('cuda' for GPU, 'cpu' for CPU)
        l1_lambda (float): Lambda for L1 regularization (0 disables the term)
        l2_lambda (float): Lambda for L2 regularization (0 disables the term)

    Returns:
        tuple: (average_loss, f1_score) - Sample-weighted mean of the (regularized)
        batch loss and the weighted F1 score over all training samples of this epoch.
        Predictions are the argmax over dim=1 of the model output.
    """
    model.train()  # Set model to training mode

    running_loss = 0.0
    all_predictions = []
    all_targets = []

    # Mixed precision is only enabled on CUDA devices
    use_amp = device.type == 'cuda'

    # Iterate through training batches
    for inputs, targets in train_loader:
        # Move data to device (GPU/CPU)
        inputs, targets = inputs.to(device), targets.to(device)

        # Clear gradients from previous step
        optimizer.zero_grad(set_to_none=True)

        # Forward pass with mixed precision (if CUDA available)
        with torch.amp.autocast(device_type=device.type, enabled=use_amp):
            logits = model(inputs)
            loss = criterion(logits, targets)

            # Add L1 / L2 regularization. Each norm is a full pass over every model
            # parameter, so it is only computed when its coefficient is non-zero.
            if l1_lambda:
                l1_norm = sum(p.abs().sum() for p in model.parameters())
                loss = loss + l1_lambda * l1_norm
            if l2_lambda:
                l2_norm = sum(p.pow(2).sum() for p in model.parameters())
                loss = loss + l2_lambda * l2_norm

        # Backward pass with gradient scaling
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Accumulate metrics (loss is weighted by the batch size so the epoch value
        # is a per-sample mean even when the last batch is smaller)
        running_loss += loss.item() * inputs.size(0)
        predictions = logits.argmax(dim=1)
        all_predictions.append(predictions.cpu().numpy())
        all_targets.append(targets.cpu().numpy())

    # Calculate epoch metrics
    epoch_loss = running_loss / len(train_loader.dataset)
    epoch_f1 = f1_score(
        np.concatenate(all_targets),
        np.concatenate(all_predictions),
        average='weighted'
    )

    return epoch_loss, epoch_f1


def validate_one_epoch(model, val_loader, criterion, device):
    """
    Run the model once over the whole validation loader without updating weights.

    The function switches the model to evaluation mode itself and disables gradient
    tracking for the duration of the pass.

    Args:
        model (nn.Module): Network to evaluate
        val_loader (DataLoader): Loader yielding (inputs, targets) validation batches
        criterion (nn.Module): Loss function applied to (logits, targets)
        device (torch.device): Device the batches are moved to

    Returns:
        tuple: (average_loss, weighted_f1) - Sample-weighted mean validation loss and
        the weighted F1 score of the argmax predictions for this epoch
    """
    model.eval()

    amp_enabled = device.type == 'cuda'
    loss_sum = 0.0
    pred_chunks, target_chunks = [], []

    with torch.no_grad():
        for batch_inputs, batch_targets in val_loader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            # Mixed-precision forward pass (active on CUDA only)
            with torch.amp.autocast(device_type=device.type, enabled=amp_enabled):
                batch_logits = model(batch_inputs)
                batch_loss = criterion(batch_logits, batch_targets)

            # Weight the batch loss by its size so the epoch value is per-sample
            loss_sum += batch_loss.item() * batch_inputs.size(0)
            pred_chunks.append(batch_logits.argmax(dim=1).cpu().numpy())
            target_chunks.append(batch_targets.cpu().numpy())

    mean_loss = loss_sum / len(val_loader.dataset)
    y_true = np.concatenate(target_chunks)
    y_pred = np.concatenate(pred_chunks)
    weighted_f1 = f1_score(y_true, y_pred, average='weighted')

    return mean_loss, weighted_f1


def fit(model, train_loader, val_loader, epochs, criterion, optimizer, scaler, device,
        l1_lambda=0, l2_lambda=0, patience=0, evaluation_metric="val_f1", mode='max',
        restore_best_weights=True, writer=None, verbose=10, experiment_name=""):
    """
    Train the neural network model on the training data and validate on the validation data.

    Checkpoints are written to "models/<experiment_name>_model.pt"; the "models"
    directory is created if it does not exist.

    Args:
        model (nn.Module): The neural network model to train
        train_loader (DataLoader): PyTorch DataLoader containing training data batches
        val_loader (DataLoader): PyTorch DataLoader containing validation data batches
        epochs (int): Number of training epochs
        criterion (nn.Module): Loss function (e.g., CrossEntropyLoss, MSELoss)
        optimizer (torch.optim): Optimization algorithm (e.g., Adam, SGD)
        scaler (GradScaler): PyTorch's gradient scaler for mixed precision training
        device (torch.device): Computing device ('cuda' for GPU, 'cpu' for CPU)
        l1_lambda (float): L1 regularization coefficient (default: 0)
        l2_lambda (float): L2 regularization coefficient (default: 0)
        patience (int): Number of epochs to wait for improvement before early stopping;
            0 disables early stopping and best-model checkpointing (default: 0)
        evaluation_metric (str): Key of the training history to monitor for early
            stopping: 'train_loss', 'val_loss', 'train_f1' or 'val_f1' (default: "val_f1")
        mode (str): 'max' for maximizing the metric, 'min' for minimizing (default: 'max')
        restore_best_weights (bool): Whether to restore model weights from best epoch (default: True)
        writer (SummaryWriter, optional): TensorBoard SummaryWriter; when given, the four
            epoch metrics are logged to it and it is closed at the end (default: None)
        verbose (int, optional): Frequency of printing training progress (default: 10)
        experiment_name (str, optional): Experiment name for saving models (default: "")

    Returns:
        tuple: (model, training_history) - Trained model and metrics history
    """

    # Initialize metrics tracking
    training_history = {
        'train_loss': [], 'val_loss': [],
        'train_f1': [], 'val_f1': []
    }

    # torch.save does not create missing directories, so make sure the checkpoint
    # folder exists before the first save.
    os.makedirs("models", exist_ok=True)
    checkpoint_path = "models/" + experiment_name + '_model.pt'

    # Configure early stopping if patience is set
    early_stopping = patience > 0
    if early_stopping:
        patience_counter = 0
        best_metric = float('-inf') if mode == 'max' else float('inf')
        best_epoch = 0  # 0 means "no checkpoint written yet"

    print(f"Training {epochs} epochs...")

    # Main training loop: iterate through epochs
    for epoch in range(1, epochs + 1):

        # Forward pass through training data, compute gradients, update weights
        train_loss, train_f1 = train_one_epoch(
            model, train_loader, criterion, optimizer, scaler, device, l1_lambda, l2_lambda
        )

        # Evaluate model on validation data without updating weights
        val_loss, val_f1 = validate_one_epoch(
            model, val_loader, criterion, device
        )

        # Store metrics for plotting and analysis
        training_history['train_loss'].append(train_loss)
        training_history['val_loss'].append(val_loss)
        training_history['train_f1'].append(train_f1)
        training_history['val_f1'].append(val_f1)

        # Mirror the epoch metrics to TensorBoard when a writer was supplied
        if writer is not None:
            writer.add_scalar('Loss/train', train_loss, epoch)
            writer.add_scalar('Loss/val', val_loss, epoch)
            writer.add_scalar('F1/train', train_f1, epoch)
            writer.add_scalar('F1/val', val_f1, epoch)

        # Print progress every N epochs or on first epoch
        if verbose > 0 and (epoch % verbose == 0 or epoch == 1):
            print(f"Epoch {epoch:3d}/{epochs} | "
                  f"Train: Loss={train_loss:.4f}, F1 Score={train_f1:.4f} | "
                  f"Val: Loss={val_loss:.4f}, F1 Score={val_f1:.4f}")

        # Early stopping logic: monitor metric and save best model
        if early_stopping:
            current_metric = training_history[evaluation_metric][-1]
            is_improvement = (current_metric > best_metric) if mode == 'max' else (current_metric < best_metric)

            if is_improvement:
                best_metric = current_metric
                best_epoch = epoch
                torch.save(model.state_dict(), checkpoint_path)
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping triggered after {epoch} epochs.")
                    break

    # Restore best model weights if early stopping was used
    if restore_best_weights and early_stopping:
        if best_epoch > 0:
            # map_location keeps the load working when the checkpoint was written on
            # a different device than the one in use now.
            model.load_state_dict(torch.load(checkpoint_path, map_location=device))
            print(f"Best model restored from epoch {best_epoch} with {evaluation_metric} {best_metric:.4f}")
        else:
            # No epoch ever improved on the initial best value (e.g. zero epochs or a
            # NaN metric), so this run wrote no checkpoint to restore from.
            print("No improving epoch was recorded; keeping the current model weights.")

    # Save final model if no early stopping
    if patience == 0:
        torch.save(model.state_dict(), checkpoint_path)

    # Close TensorBoard writer
    if writer is not None:
        writer.close()

    return model, training_history

##  **Network Parameters**



In [None]:
# 'balanced' class weights from the training labels, stored as a float tensor on the
# active device. (class_weights is assigned again in the next cell.)
class_weights = torch.tensor(
    compute_class_weight(
        class_weight='balanced',
        classes=np.unique(train_df['label_idx']),
        y=train_df['label_idx'],
    ),
    dtype=torch.float,
).to(device)

In [None]:
# Training parameters
LEARNING_RATE = 1e-5

EPOCHS = 200
PATIENCE = 25

# Regularization
DROPOUT_RATE = 0.4

import torch

# Count samples per class
class_counts = train_df['label_idx'].value_counts().sort_index()  # ensure order matches label indices
total_samples = len(train_df)

# Compute weights inversely proportional to class frequency.
# NOTE: this replaces the 'balanced' weights computed in the previous cell.
class_weights = [total_samples / class_counts[i] for i in range(len(class_counts))]

# Convert to a torch tensor on the training device. The training loop moves logits
# and targets to `device`, and CrossEntropyLoss needs its weight tensor on that same
# device, otherwise the loss call fails on CUDA.
class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

print("Class weights:", class_weights)


# Set up loss function
criterion = nn.CrossEntropyLoss(weight=class_weights)

# Print the defined parameters
print("Epochs:", EPOCHS)
print("Batch Size:", BATCH_SIZE)
print("Learning Rate:", LEARNING_RATE)
print("Dropout Rate:", DROPOUT_RATE)
print("Patience:", PATIENCE)

## **Transfer Learning**





In [None]:
# Replace your current model class with this one
from phytoni.foundation import ViTFoundationModel

class ViTTransferLearning(nn.Module):
    """
    Pretrained ViT foundation backbone with a fresh two-layer classification head.

    The backbone's original `head` is replaced by
    Linear(in_features, 512) -> ReLU -> Dropout -> Linear(512, num_classes).
    The replacement head is created after the optional freeze, so it stays trainable
    even when the backbone is frozen.

    Args:
        num_classes (int): Number of output classes.
        dropout_rate (float): Dropout probability inside the head.
        freeze_backbone (bool): If True, turn off gradients for every pretrained
            backbone parameter.
    """

    def __init__(self, num_classes, dropout_rate=0.5, freeze_backbone=True):
        super().__init__()
        backbone = ViTFoundationModel(pretrained=True)

        if freeze_backbone:
            for weight in backbone.parameters():
                weight.requires_grad = False

        hidden_dim = 512
        feature_dim = backbone.head.in_features
        backbone.head = nn.Sequential(
            nn.Linear(feature_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(hidden_dim, num_classes),
        )
        self.backbone = backbone

    def forward(self, x):
        """Return the backbone output for `x` (the backbone ends in the new head)."""
        return self.backbone(x)


In [None]:
# Transfer-learning model: frozen pretrained backbone, trainable new head, moved to `device`.
tl_model = ViTTransferLearning(num_classes, DROPOUT_RATE, freeze_backbone=True).to(device)


In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

# ImageNet channel statistics; the same values are used by find_traitors() to undo
# the normalisation before plotting.
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)

# Training pipeline: random augmentations, then normalisation and tensor conversion.
# NOTE(review): `alpha_affine` is deprecated in recent albumentations releases —
# confirm against the installed version.
train_transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.Rotate(limit=20, p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.ElasticTransform(alpha=1, sigma=50, alpha_affine=50, p=0.5),
    # Scales uint8 pixels to float and standardises each channel.
    A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    # Only reorders HWC -> CHW and wraps the array in a tensor; it does not rescale
    # or cast, which is why the Normalize step above is required.
    ToTensorV2()
])

# Validation/test pipeline: no random augmentation, same normalisation as training.
val_transform = A.Compose([
    A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ToTensorV2()
])




In [None]:
# --- Patch datasets for the transfer-learning stage ---
# Both splits are read from TRAIN_DIR. Training patches go through the augmenting
# train_transform, validation patches through val_transform.
train_dataset = PatchDataset(train_df, TRAIN_DIR, PATCH_SIZE, NUM_CROPS, train_transform)
val_dataset = PatchDataset(val_df, TRAIN_DIR, PATCH_SIZE, NUM_CROPS, val_transform)

# --- Data loaders ---
# Only the training loader shuffles; neither drops the last partial batch.
# The test loader is created further down, after TestDataset is defined.
train_loader = make_loader(train_dataset, BATCH_SIZE, shuffle=True, drop_last=False)
val_loader = make_loader(val_dataset, BATCH_SIZE, shuffle=False, drop_last=False)


class TestDataset(Dataset):
    """
    Unlabelled image dataset used for inference.

    Args:
        file_list (list[str]): Image file names relative to `data_dir`.
        data_dir (str): Directory containing the images.
        transform (callable, optional): Called as transform(image=...) and expected
            to return a dict with an 'image' entry.
    """

    def __init__(self, file_list, data_dir, transform=None):
        self.file_list = file_list
        self.data_dir = data_dir
        self.transform = transform

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return (image, file name); the name lets callers map predictions back to files.

        Raises:
            FileNotFoundError: If the image cannot be read by OpenCV.
        """
        img_name = self.file_list[idx]
        img_path = os.path.join(self.data_dir, img_name)

        # cv2.imread returns None on failure instead of raising; fail here with the
        # offending path rather than later inside cvtColor.
        img = cv2.imread(img_path)
        if img is None:
            raise FileNotFoundError(f"Could not read test image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.transform:
            img = self.transform(image=img)['image']
        return img, img_name  # return name to track predictions
# Inference dataset over the files listed in test_df, using the non-augmenting
# validation transform, wrapped in an unshuffled loader.
test_dataset = TestDataset(test_df['sample_index'].tolist(), TEST_DIR, val_transform)
test_loader = make_loader(test_dataset, BATCH_SIZE, shuffle=False, drop_last=False)


print("\nDataLoaders created successfully.")

In [None]:
# Transfer-learning run: TensorBoard writer, AMP gradient scaler and optimizer
experiment_name = "transfer_learning"
writer = SummaryWriter(f"./{logs_dir}/{experiment_name}")

# The gradient scaler is only active when running on CUDA
scaler = torch.amp.GradScaler(enabled=device.type == 'cuda')

# Adam over all parameters of the transfer-learning model
optimizer = torch.optim.Adam(tl_model.parameters(), lr=LEARNING_RATE, weight_decay=5e-4)

In [None]:
%%time
# Train with transfer learning.
# Early stopping is enabled (patience=PATIENCE) with fit()'s default monitor
# ("val_f1", mode 'max'); progress is printed every 5 epochs and the checkpoint is
# written to models/transfer_learning_model.pt.
tl_model, tl_history = fit(
    model=tl_model,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=EPOCHS,
    criterion=criterion,
    optimizer=optimizer,
    scaler=scaler,
    device=device,
    writer=writer,
    verbose=5,
    experiment_name="transfer_learning",
    patience=PATIENCE
)

In [None]:
# @title Plot History
# Create a figure with two side-by-side subplots (two columns)
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(18, 5))

# Plot of training (dashed, faint) and validation (solid) loss on the first axis
ax1.plot(tl_history['train_loss'], label='Training loss', alpha=0.3, color='#ff7f0e', linestyle='--')
ax1.plot(tl_history['val_loss'], label='Validation loss', alpha=0.9, color='#ff7f0e')
ax1.set_title('Loss')
ax1.legend()
ax1.grid(alpha=0.3)

# Plot of training (dashed, faint) and validation (solid) F1 score on the second axis
ax2.plot(tl_history['train_f1'], label='Training f1', alpha=0.3, color='#ff7f0e', linestyle='--')
ax2.plot(tl_history['val_f1'], label='Validation f1', alpha=0.9, color='#ff7f0e')
ax2.set_title('F1 Score')
ax2.legend()
ax2.grid(alpha=0.3)

# Adjust the layout and display the plot
plt.tight_layout()
plt.subplots_adjust(right=0.85)
plt.show()

## **Fine-Tuning**

In [None]:
# Rebuild the architecture that produced the transfer-learning checkpoint and load it.
# The checkpoint was saved from a ViTTransferLearning instance, so the same class must
# be used here for the state-dict keys to match.
ft_model = ViTTransferLearning(num_classes, DROPOUT_RATE, freeze_backbone=False).to(device)
ft_model.load_state_dict(torch.load("models/transfer_learning_model.pt", map_location=device))

# --- Unfreezing logic ---

# 1. First, freeze everything in the backbone. The classification head lives inside
#    the backbone (ViTTransferLearning assigns it to `backbone.head`), so this freezes
#    the head as well.
for param in ft_model.backbone.parameters():
    param.requires_grad = False

# 2. Choose what to fine-tune. The head must always be trainable. The deepest
#    transformer block is added when the backbone exposes a `blocks` sequence.
#    NOTE(review): `blocks` is the attribute name used by timm-style ViTs; confirm it
#    is what ViTFoundationModel exposes.
trainable_blocks = [ft_model.backbone.head]
backbone_blocks = getattr(ft_model.backbone, "blocks", None)
if backbone_blocks is not None and len(backbone_blocks) > 0:
    trainable_blocks.append(backbone_blocks[-1])  # last block (high-level features)
else:
    print("Backbone exposes no `blocks`; only the classification head will be fine-tuned.")

# 3. Unfreeze these blocks
for block in trainable_blocks:
    for param in block.parameters():
        param.requires_grad = True

# --- Count parameters ---
total_params = sum(p.numel() for p in ft_model.parameters())
trainable_params = sum(p.numel() for p in ft_model.parameters() if p.requires_grad)

print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
print(f"Frozen parameters: {total_params - trainable_params:,}")

In [None]:
# Datasets for the fine-tuning stage, built with the dataset classes defined in this
# notebook (PatchDataset / TestDataset).
# NOTE: PatchDataset samples its patch centres at construction time, so rebuilding the
# datasets here draws a new set of patches for this stage.

# --- Training Dataset ---
train_dataset = PatchDataset(
    df=train_df,
    image_dir=TRAIN_DIR,
    patch_size=PATCH_SIZE,
    num_crops=NUM_CROPS,
    transform=train_transform
)

# --- Validation Dataset ---
val_dataset = PatchDataset(
    df=val_df,
    image_dir=TRAIN_DIR,
    patch_size=PATCH_SIZE,
    num_crops=NUM_CROPS,
    transform=val_transform
)

# --- Test (Inference) Dataset ---
# No labels for the test set; items are (image, file name) pairs.
test_dataset = TestDataset(
    file_list=test_df['sample_index'].tolist(),
    data_dir=TEST_DIR,
    transform=val_transform # Use validation transform (no augmentation)
)

# --- Data Loaders ---
train_loader = make_loader(train_dataset, BATCH_SIZE, shuffle=True, drop_last=False)
val_loader = make_loader(val_dataset, BATCH_SIZE, shuffle=False, drop_last=False)

test_loader = make_loader(test_dataset, BATCH_SIZE, shuffle=False, drop_last=False)

print("\nDataLoaders created successfully.")

In [None]:
# NOTE: fine-tuning needs a lower learning rate
# NOTE(review): FT_LEARNING_RATE (3.5e-5) is higher than the transfer-learning
# LEARNING_RATE (1e-5) set earlier in this notebook, which contradicts the note
# above — confirm the intended value.
experiment_name = "fine_tuning"
writer = SummaryWriter("./"+logs_dir+"/"+experiment_name)

FT_LEARNING_RATE = 3.5e-5 
# Adam over all parameters of the fine-tuning model, with weight decay
optimizer = torch.optim.Adam(
    ft_model.parameters(),
    lr=FT_LEARNING_RATE,
    weight_decay=6e-4
)
# Gradient scaler for mixed precision; only enabled on CUDA
scaler = torch.amp.GradScaler(enabled=(device.type == 'cuda'))

In [None]:
%%time
# Fine-tune the model.
# Same settings as the transfer-learning run (early stopping with patience=PATIENCE on
# fit()'s default "val_f1" monitor, progress every 5 epochs); the checkpoint is
# written to models/fine_tuning_model.pt.
ft_model, ft_history = fit(
    model=ft_model,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=EPOCHS,
    criterion=criterion,
    optimizer=optimizer,
    scaler=scaler,
    device=device,
    writer=writer,
    verbose=5,
    experiment_name="fine_tuning",
    patience=PATIENCE
)

In [None]:
# @title Plot History
# Create a figure with two side-by-side subplots (two columns)
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(18, 5))

# Plot of training (dashed, faint) and validation (solid) loss on the first axis
ax1.plot(ft_history['train_loss'], label='Training loss', alpha=0.3, color='#ff7f0e', linestyle='--')
ax1.plot(ft_history['val_loss'], label='Validation loss', alpha=0.9, color='#ff7f0e')
ax1.set_title('Loss')
ax1.legend()
ax1.grid(alpha=0.3)

# Plot of training (dashed, faint) and validation (solid) F1 score on the second axis
ax2.plot(ft_history['train_f1'], label='Training f1', alpha=0.3, color='#ff7f0e', linestyle='--')
ax2.plot(ft_history['val_f1'], label='Validation f1', alpha=0.9, color='#ff7f0e')
ax2.set_title('F1 Score')
ax2.legend()
ax2.grid(alpha=0.3)

# Adjust the layout and display the plot
plt.tight_layout()
plt.subplots_adjust(right=0.85)
plt.show()

In [None]:
# 1. Setup
# Class-name <-> index mapping (same mapping as in prepare_data_splits)
label_map = {'Luminal A': 0, 'Luminal B': 1, 'HER2(+)': 2, 'Triple negative': 3}
idx_to_label = {v: k for k, v in label_map.items()}

# Prefer the fine-tuned model when it exists in this session, else the transfer-learning one
model_for_inference = ft_model if 'ft_model' in locals() else tl_model
model_for_inference.eval()

# Dictionary to store aggregated probabilities: { 'filename': [prob_class_0, prob_class_1, ...] }
file_probabilities = {}

# 2. Multi-View Inference Loop (TTA)
# The test loader is iterated N_VIEWS times and the per-file probabilities are summed.
# NOTE(review): the test dataset is built with val_transform, which contains no random
# transform, so every round presumably yields the same probabilities and the voting
# has no effect — confirm whether a random-crop test transform was intended.
N_VIEWS = 5

print(f"Starting Multi-View Inference ({N_VIEWS} crops per image)...")

with torch.no_grad():
    # Loop N times over the entire dataset
    for round_idx in range(N_VIEWS):
        print(f"  - Round {round_idx + 1}/{N_VIEWS}...")

        for images, img_names in test_loader:
            images = images.to(device)

            with torch.amp.autocast(device_type=device.type, enabled=(device.type == 'cuda')):
                logits = model_for_inference(images)
                # Convert logits to probabilities (Softmax) so we can sum them safely
                probs = torch.softmax(logits, dim=1).cpu().numpy()

            # Aggregate probabilities: first sighting stores the row, later ones add to it
            for name, p in zip(img_names, probs):
                if name not in file_probabilities:
                    file_probabilities[name] = p
                else:
                    file_probabilities[name] += p

# 3. Final Decision (Soft Voting)
all_filenames = []
all_preds = []

print("Aggregating votes...")

for name, total_probs in file_probabilities.items():
    # We find the class with the highest ACCUMULATED probability
    # (No need to divide by N_VIEWS, argmax is scale-invariant)
    pred_idx = total_probs.argmax()

    all_filenames.append(name)
    all_preds.append(idx_to_label[pred_idx])

# 4. Create Submission
# One row per test file: its file name and the predicted class name
submission_df = pd.DataFrame({
    'sample_index': all_filenames,
    'label': all_preds
})

# 5. Save
# NOTE(review): the message below hard-codes "5-View" rather than using N_VIEWS.
submission_df.to_csv("submission.csv", index=False)
print("File 'submission.csv' created successfully with 5-View Voting!")
print(submission_df.head())

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn

def find_traitors(model, val_loader, criterion, device, idx_to_label, k=5):
    """
    Identify and plot the top k validation samples with the highest loss (the 'traitors').

    The per-sample loss is an unweighted cross-entropy computed inside this function.
    Only the k highest-loss samples are kept while iterating, so memory use does not
    grow with the size of the validation set.

    Args:
        model (nn.Module): Model to evaluate; it is switched to eval mode.
        val_loader (DataLoader): Loader yielding (inputs, targets) batches where
            targets are class indices.
        criterion (nn.Module): Unused; kept for backward compatibility with callers.
        device (torch.device): Device the batches are moved to.
        idx_to_label (dict): Maps class index to a display name.
        k (int): Number of samples to show. Nothing is plotted when k <= 0 or the
            loader yields no samples.

    Returns:
        None. The figure is displayed with plt.show().
    """
    import heapq  # local import: heapq is not among the notebook's top-level imports

    model.eval()

    # 1. Un-normalization constants (ImageNet mean/std) to make images viewable again.
    # Assumes the loader's transform normalised with these statistics — TODO confirm.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    print(f"Hunting for traitors in {len(val_loader.dataset)} validation samples...")

    if k <= 0:
        return

    # reduction='none' yields one loss value per sample instead of a batch mean
    per_sample_loss = nn.CrossEntropyLoss(reduction='none')

    def _iter_records():
        """Yield one record (loss, image, true/predicted class, confidence) per sample."""
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            logits = model(inputs)
            batch_loss = per_sample_loss(logits, targets)
            probs = torch.softmax(logits, dim=1)
            predictions = logits.argmax(dim=1)

            for loss_i, img_i, true_i, pred_i, prob_i in zip(
                    batch_loss, inputs, targets, predictions, probs):
                yield {
                    'loss': loss_i.item(),
                    # clone() so a kept record does not pin its whole batch in memory
                    'img_tensor': img_i.cpu().clone(),
                    'true_idx': true_i.item(),
                    'pred_idx': pred_i.item(),
                    # Probability assigned to the predicted class (right or wrong)
                    'conf': prob_i[pred_i].item(),
                }

    # 2. Keep the k highest-loss records, sorted by loss descending. heapq.nlargest is
    # equivalent to sorted(..., reverse=True)[:k] but holds at most k records at a time.
    with torch.no_grad():
        top_k = heapq.nlargest(k, _iter_records(), key=lambda rec: rec['loss'])

    if not top_k:
        print("No validation samples to display.")
        return

    # 3. Visualize the Top K
    fig, axes = plt.subplots(1, k, figsize=(4 * k, 5))
    axes = np.atleast_1d(axes)  # plt.subplots returns a bare Axes when k == 1

    # Hide the panels left over when there are fewer than k samples
    for unused_ax in axes[len(top_k):]:
        unused_ax.axis('off')

    for ax, item in zip(axes, top_k):
        # Convert Tensor to Numpy Image: (C, H, W) -> (H, W, C)
        img = item['img_tensor'].permute(1, 2, 0).numpy()

        # Un-normalize: pixel = (pixel * std) + mean
        img = std * img + mean
        img = np.clip(img, 0, 1)  # Ensure pixel values are valid

        # Get text labels
        true_name = idx_to_label[item['true_idx']]
        pred_name = idx_to_label[item['pred_idx']]

        # Plot
        ax.imshow(img)
        ax.axis('off')

        # Titles are always dark red, whether or not the prediction is correct
        title = (f"Loss: {item['loss']:.2f}\n"
                 f"True: {true_name}\n"
                 f"Pred: {pred_name}\n"
                 f"Conf: {item['conf']:.1%}")
        ax.set_title(title, color='darkred', fontsize=12, fontweight='bold')

    plt.tight_layout()
    plt.show()

# Class-name <-> index mapping (same mapping as used for the training labels)
label_map = {'Luminal A': 0, 'Luminal B': 1, 'HER2(+)': 2, 'Triple negative': 3}
idx_to_label = {v: k for k, v in label_map.items()}

# Show the 5 highest-loss validation samples of the fine-tuned model
find_traitors(ft_model, val_loader, criterion, device, idx_to_label, k=5)