## Pip install pytorch lightning

In [None]:
!pip install pytorch-lightning --quiet
!pip install warmup-scheduler --quiet

## Import 

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.model_selection import StratifiedKFold
import pytorch_lightning as pl
from pytorch_lightning.loggers import TensorBoardLogger
from warmup_scheduler import GradualWarmupScheduler
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger
from torchmetrics.classification import Accuracy
from pathlib import Path
import pandas as pd
import torchmetrics
from sklearn.metrics import cohen_kappa_score
import shutil
import zipfile
import numpy as np


## Creating a configuration class

In [None]:
class Config:
    """Single place for the notebook's hyper-parameters and filesystem paths.

    Every attribute is a plain instance attribute, so ``cfg.__dict__`` gives
    the complete configuration as a dictionary.
    """

    def __init__(self):
        # ---- model ----
        # Size of the first linear layer's input (one slide embedding).
        self.input_dim = 768
        self.hidden_dim = 512
        # Number of output logits. With the ordinal encoding used in this
        # notebook these are 5 cumulative thresholds, i.e. grades 0..5.
        self.num_classes = 5

        # ---- optimisation / data loading ----
        self.lr = 3e-4
        self.batch_size = 32
        self.max_epochs = 50
        self.num_workers = 4
        self.n_splits = 5

        # ---- runtime ----
        if torch.cuda.is_available():
            self.device = "cuda"
        else:
            self.device = "cpu"

        # ---- filesystem locations (Kaggle layout) ----
        self.checkpoint_dir = "/kaggle/working/checkpoints"
        self.logs_dir = "/kaggle/working/logs"
        self.train_csv = "/kaggle/input/prostate-cancer-grade-assessment/train.csv"
        self.slide_vectors_path = "/kaggle/input/complete-slide-feature-vectors"

        # Largest allowed |label - predicted score| before a label is
        # treated as noisy.
        self.noise_gap_thresh = 1.6

        # ---- loss and learning-rate warm-up ----
        self.criterion = nn.BCEWithLogitsLoss()
        self.warmup_factor = 10
        self.warmup_epo = 1


# Shared configuration instance used by the rest of the notebook.
cfg = Config()

## Loading all slide vectors and slide labels

In [None]:

# Load all .pt files from the slide-vector directory.
# All slide embeddings are generated by extractor.embedding.ipynb;
# the slide embeddings were created with Phikon + TITAN.
def load_all_slide_vectors_from_dir(directory):
    """Merge every ``*.pt`` file directly inside *directory* into one dict.

    Each file is read with ``torch.load`` (tensors mapped to CPU) and merged
    with ``dict.update``, so a key that appears in several files keeps the
    value from the file that sorts last by path.

    Args:
        directory: Folder (``str`` or ``Path``) containing the ``.pt`` chunks.
            Presumably each chunk maps slide id -> embedding tensor; verify
            against the notebook that writes them.

    Returns:
        dict: The union of all loaded chunks; empty if no ``.pt`` file exists.
    """
    directory = Path(directory)
    # Path.glob yields entries in an arbitrary, filesystem-dependent order.
    # Sorting makes the key order of the merged dict reproducible, which in
    # turn keeps index-based splits built from it stable across runs.
    pt_files = sorted(directory.glob("*.pt"))
    print(f"Found {len(pt_files)} .pt files in {directory}")
    slide_vectors = {}
    for file_path in pt_files:
        # NOTE: torch.load relies on pickle; only point this at trusted files.
        chunk = torch.load(str(file_path), map_location="cpu")
        slide_vectors.update(chunk)
    print(f"Loaded {len(slide_vectors)} slide vectors in total.")
    return slide_vectors

# Load the slide ids together with their ISUP grade labels from the training CSV.
def load_slide_labels(csv_path):
    """Read the training CSV and return a ``{slide_id: label}`` dictionary.

    Only the ``image_id`` and ``isup_grade`` columns are used; they become
    the dictionary keys and values respectively.
    """
    column_map = {"image_id": "slide_id", "isup_grade": "label"}
    frame = pd.read_csv(csv_path)
    frame = frame[list(column_map)].rename(columns=column_map)
    slide_labels = {sid: grade for sid, grade in zip(frame["slide_id"], frame["label"])}
    print(f"Loaded {len(slide_labels)} slide labels from {csv_path}")
    return slide_labels


## Removing slides with noisy labels (label denoising)

In [None]:
# Take the out-of-fold predictions and use a threshold of 1.6 to discard all noisy labels.
def remove_noisy_labels(df, thresh=1.6):
    """Split *df* by the distance between each label and its predicted score.

    Args:
        df: DataFrame with a ``label`` column and a ``probs_raw`` column.
        thresh: Largest ``|label - probs_raw|`` that still counts as clean.

    Returns:
        tuple: ``(df_keep, df_removed)``, both re-indexed from 0. Rows whose
        distance is NaN (e.g. a missing ``probs_raw``) satisfy neither
        comparison and therefore appear in neither frame.
    """
    distance = (df["label"] - df["probs_raw"]).abs()
    within_threshold = distance <= thresh
    beyond_threshold = distance > thresh
    df_removed = df.loc[beyond_threshold].reset_index(drop=True)
    df_keep = df.loc[within_threshold].reset_index(drop=True)
    print(f"Removed {len(df_removed)} noisy labels (threshold={thresh})")
    return df_keep, df_removed

In [None]:
# Load every slide embedding and the slide labels once at notebook level,
# using the paths configured on cfg, so that later cells can reuse them.
slide_vectors = load_all_slide_vectors_from_dir(cfg.slide_vectors_path)
slide_labels = load_slide_labels(cfg.train_csv)

## Creating a custom slide dataset

In [None]:
class SlideDataset(Dataset):
    """Map-style dataset pairing each slide embedding with its grade label.

    The sample order follows the key order of ``slide_vectors``. Every key of
    ``slide_vectors`` must also be present in ``slide_labels``; otherwise
    ``__getitem__`` raises ``KeyError`` for that slide.

    Args:
        slide_vectors: Mapping of slide id -> embedding.
        slide_labels: Mapping of slide id -> integer grade.
        ordinal: If True, return the label in cumulative (ordinal) binary
            form; if False, return the raw label unchanged.
        num_levels: Length of the ordinal target, i.e. the number of
            thresholds. Valid grades are ``0..num_levels`` inclusive. The
            default of 5 reproduces the previous hard-coded behaviour.
    """

    def __init__(self, slide_vectors, slide_labels, ordinal=True, num_levels=5):
        self.slide_ids = list(slide_vectors.keys())
        self.slide_vectors = slide_vectors
        self.slide_labels = slide_labels
        self.ordinal = ordinal
        self.num_levels = num_levels

    def __len__(self):
        return len(self.slide_ids)

    def __getitem__(self, idx):
        """Return ``(vector, target)`` for the slide at position *idx*.

        Raises:
            ValueError: In ordinal mode, if the label is not a whole number
                in ``0..num_levels``.
        """
        slide_id = self.slide_ids[idx]
        vector = self.slide_vectors[slide_id]
        label = self.slide_labels[slide_id]

        if not self.ordinal:
            return vector, label

        # Reject labels that slicing would silently mis-encode: a negative
        # value slices from the end, and a too-large value saturates to all
        # ones, making it indistinguishable from the top grade.
        level = int(label)
        if level != label or not 0 <= level <= self.num_levels:
            raise ValueError(
                f"Label {label!r} for slide {slide_id!r} is not an integer in "
                f"[0, {self.num_levels}]"
            )

        # Cumulative encoding: grade k sets the first k entries to 1,
        # e.g. with num_levels=5, label=3 -> [1, 1, 1, 0, 0].
        ordinal_label = torch.zeros(self.num_levels, dtype=torch.float32)
        ordinal_label[:level] = 1
        return vector, ordinal_label


In [None]:
# Wrap all loaded slides in a dataset (ordinal targets by default) and build
# a shuffled loader for a quick sanity check of the batches.
dataset = SlideDataset(slide_vectors, slide_labels)
# Use the configured batch size rather than a second hard-coded copy of it.
dataloader = DataLoader(dataset, batch_size=cfg.batch_size, shuffle=True)

## Test

In [None]:
# Sanity check: pull a single batch from the loader and print the shapes of
# the stacked vectors and labels, plus the label values themselves.
for batch_vectors, batch_labels in dataloader:
    print("Batch vectors shape:", batch_vectors.shape)
    print("Batch labels shape:", batch_labels.shape)
    print("Batch labels:", batch_labels)
    break  # stop after the first batch; nothing is printed if the loader is empty

# Number of slides in the dataset (one per key of slide_vectors).
print("Dataset length:", len(dataset))


## Create a lightning slide classifier

In [None]:
# Create a slide classifier as a PyTorch Lightning module:
# - Kaiming initialization of the linear layers
# - a 4-layer MLP
# - cosine-annealing LR scheduler wrapped in GradualWarmupScheduler (early stopping is added as a Trainer callback)
# - trained with BCEWithLogitsLoss, and thus on ordinally encoded labels
class SlideClassifier(pl.LightningModule):
    """MLP head that grades a slide embedding through ordinal logits.

    The network is ``Linear -> ReLU -> BatchNorm1d -> Dropout`` repeated three
    times (512, 256, 128 units) followed by a final linear layer with
    ``cfg.num_classes`` outputs. Each output is treated as an independent
    binary logit; the predicted grade is the number of outputs whose sigmoid
    exceeds 0.5.

    Args:
        cfg: Configuration object. Its ``__dict__`` is stored as the module's
            hyper-parameters; ``input_dim``, ``num_classes``, ``lr``,
            ``warmup_factor``, ``warmup_epo`` and ``max_epochs`` are read from
            it, plus ``criterion`` when present.
    """

    def __init__(self, cfg):
        super().__init__()
        # Keep the whole configuration in self.hparams; configure_optimizers
        # reads the learning-rate settings back from there.
        self.save_hyperparameters(cfg.__dict__)
        self.head = nn.Sequential(
            nn.Linear(cfg.input_dim, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Dropout(0.3),
            nn.Linear(128, cfg.num_classes)
        )
        # Honour the loss configured on cfg instead of silently ignoring it;
        # fall back to the previous hard-coded loss when cfg does not set one.
        configured_criterion = getattr(cfg, "criterion", None)
        if configured_criterion is None:
            configured_criterion = nn.BCEWithLogitsLoss()
        self.criterion = configured_criterion
        # Per-epoch buffers of predicted / true grades, filled in
        # validation_step and emptied in on_validation_epoch_end.
        self.val_preds = []
        self.val_targets = []
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Kaiming-normal weights and zero bias for every ``nn.Linear``."""
        if isinstance(m, nn.Linear):
            nn.init.kaiming_normal_(m.weight)
            if m.bias is not None:
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Return the raw (pre-sigmoid) logits for a batch of embeddings."""
        return self.head(x)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss for one batch."""
        vectors, labels = batch
        logits = self(vectors)
        loss = self.criterion(logits, labels)
        self.log("train_loss", loss, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log the validation loss and buffer the decoded grades."""
        vectors, labels = batch
        logits = self(vectors)
        loss = self.criterion(logits, labels)
        self.log("val_loss", loss, on_epoch=True, prog_bar=True)
        # Decode ordinal outputs: grade = number of thresholds passed.
        preds = (torch.sigmoid(logits) > 0.5).float().sum(1).detach().cpu().numpy()
        # The ordinal target is a run of ones, so its sum is the true grade.
        targets = labels.sum(1).detach().cpu().numpy()
        self.val_preds.extend(preds)
        self.val_targets.extend(targets)

    def on_validation_epoch_end(self):
        """Log quadratic-weighted kappa and exact-match accuracy, then reset."""
        if self.val_preds and self.val_targets:
            qwk = cohen_kappa_score(self.val_preds, self.val_targets, weights="quadratic")
            acc = (np.array(self.val_preds) == np.array(self.val_targets)).mean()
            self.log("val_qwk", qwk, prog_bar=True)
            self.log("val_acc_isup", acc, prog_bar=True)
        # Always clear, so one epoch's predictions never leak into the next.
        self.val_preds.clear()
        self.val_targets.clear()

    def configure_optimizers(self):
        """Adam with gradual warm-up followed by cosine annealing.

        The optimizer starts at ``lr / warmup_factor``; the warm-up scheduler
        is given ``multiplier=warmup_factor`` over ``warmup_epo`` epochs and
        then hands over to the cosine schedule for the remaining epochs.
        """
        optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams["lr"] / self.hparams["warmup_factor"])
        scheduler_cosine = torch.optim.lr_scheduler.CosineAnnealingLR(
            optimizer, T_max=max(1, self.hparams["max_epochs"] - self.hparams["warmup_epo"]))
        scheduler = GradualWarmupScheduler(
            optimizer, multiplier=self.hparams["warmup_factor"],
            total_epoch=self.hparams["warmup_epo"], after_scheduler=scheduler_cosine)
        return {"optimizer": optimizer, "lr_scheduler": {"scheduler": scheduler, "interval": "epoch"}}

## Create a lightning training stratified k-fold loop

In [None]:
# Runs a stratified k-fold training loop (cfg.n_splits folds, 5 by default).
# Saves the best checkpoint of each fold based on the val_qwk score.
# Applies early stopping on val_qwk to prevent overfitting.
def train_kfold(cfg, slide_vectors, slide_labels):
    """Train one ``SlideClassifier`` per stratified fold.

    Args:
        cfg: Configuration providing ``n_splits``, ``batch_size``,
            ``num_workers``, ``checkpoint_dir``, ``logs_dir``, ``max_epochs``
            and ``device``.
        slide_vectors: Mapping of slide id -> embedding.
        slide_labels: Mapping of slide id -> integer grade; must contain
            every key of ``slide_vectors``.

    Returns:
        list: ``best_model_path`` of each fold's checkpoint callback, in
        fold order.
    """
    dataset = SlideDataset(slide_vectors, slide_labels, ordinal=True)
    # Stratify on the integer grade so every fold sees a similar grade mix.
    labels_list = [slide_labels[slide_id] for slide_id in dataset.slide_ids]
    skf = StratifiedKFold(n_splits=cfg.n_splits, shuffle=True, random_state=42)
    best_model_paths = []
    for fold, (train_idx, val_idx) in enumerate(skf.split(dataset.slide_ids, labels_list)):
        print(f"\n Fold {fold+1}/{cfg.n_splits}")
        # BatchNorm1d cannot compute batch statistics from a single sample in
        # training mode and raises. Drop the trailing batch only when it
        # would contain exactly one sample; otherwise keep every sample.
        lone_tail_batch = (
            len(train_idx) > cfg.batch_size
            and len(train_idx) % cfg.batch_size == 1
        )
        train_loader = DataLoader(Subset(dataset, train_idx), batch_size=cfg.batch_size, shuffle=True,
                                  num_workers=cfg.num_workers, drop_last=lone_tail_batch)
        val_loader = DataLoader(Subset(dataset, val_idx), batch_size=cfg.batch_size, shuffle=False, num_workers=cfg.num_workers)
        # NOTE(review): if this fold directory already holds a "best.ckpt"
        # from an earlier call, Lightning presumably writes a versioned file
        # name instead of overwriting it; confirm for the installed version.
        checkpoint = ModelCheckpoint(dirpath=f"{cfg.checkpoint_dir}/fold{fold+1}", filename="best", save_top_k=1, monitor="val_qwk", mode="max")
        early_stop = EarlyStopping(monitor="val_qwk", min_delta=0.001, patience=10, mode="max")
        logger = TensorBoardLogger(save_dir=cfg.logs_dir, name=f"fold{fold+1}")
        model = SlideClassifier(cfg)
        trainer = pl.Trainer(max_epochs=cfg.max_epochs, accelerator=cfg.device,
                             callbacks=[checkpoint, early_stop], logger=logger, log_every_n_steps=10)
        trainer.fit(model, train_loader, val_loader)
        best_model_paths.append(checkpoint.best_model_path)
    print("\n All folds done!")
    return best_model_paths

## Predictions on the training data (intended as out-of-fold predictions)



In [None]:
def predict_training_data(cfg, slide_vectors, slide_labels, model_paths):
    """Score every slide with each checkpoint and average the scores.

    For one model the score of a slide is the sum of the sigmoids of its
    logits, i.e. a continuous grade estimate between 0 and the number of
    outputs. The returned value is the mean of that score over all models.

    NOTE(review): every model scores every slide, including slides it may
    have been trained on. When the checkpoints come from k-fold training on
    the same data this is a fold-ensemble prediction, not an out-of-fold one;
    confirm that this is intended.

    Args:
        cfg: Configuration providing ``batch_size``, ``num_workers`` and
            ``device``; also passed to ``load_from_checkpoint``.
        slide_vectors: Mapping of slide id -> embedding.
        slide_labels: Mapping of slide id -> grade (needed to build the
            dataset; the labels themselves are not used for prediction).
        model_paths: Iterable of checkpoint paths.

    Returns:
        dict: slide id -> mean score across the given models.

    Raises:
        ValueError: If ``model_paths`` is empty.
    """
    model_paths = list(model_paths)
    if not model_paths:
        # Without this guard the mean over zero models is a scalar NaN and
        # the final zip() fails with an unrelated-looking TypeError.
        raise ValueError("model_paths must contain at least one checkpoint path")

    dataset = SlideDataset(slide_vectors, slide_labels, ordinal=True)
    # shuffle=False keeps predictions aligned with dataset.slide_ids.
    loader = DataLoader(dataset, batch_size=cfg.batch_size, shuffle=False, num_workers=cfg.num_workers)
    all_probs = []
    for model_path in model_paths:
        model = SlideClassifier.load_from_checkpoint(model_path, cfg=cfg)
        model.eval()
        model.to(cfg.device)
        preds = []
        with torch.no_grad():
            for vectors, _ in loader:
                vectors = vectors.to(cfg.device)
                logits = model(vectors)
                probs = torch.sigmoid(logits).sum(1).cpu().numpy()
                preds.extend(probs)
        all_probs.append(preds)
    mean_probs = np.mean(all_probs, axis=0)
    return dict(zip(dataset.slide_ids, mean_probs))

## Main pipeline: run every step in one place

In [None]:
# End-to-end label-denoising pipeline: train on all labels, score the
# training slides with the resulting models, drop slides whose label is too
# far from the score, then retrain on what remains.
if __name__ == "__main__":
    slide_vectors = load_all_slide_vectors_from_dir(cfg.slide_vectors_path)
    slide_labels = load_slide_labels(cfg.train_csv)

    print("\n STEP 1: Initial training (no cleaning)")
    first_model_paths = train_kfold(cfg, slide_vectors, slide_labels)

    print("\n STEP 2: Predict training data")
    # slide_probs maps slide id -> score averaged over the first-round models.
    slide_probs = predict_training_data(cfg, slide_vectors, slide_labels, first_model_paths)

    # remove noisy labels
    # The CSV is re-read in full (all columns kept) and the scores are joined
    # on slide_id. Slides with no entry in slide_probs get NaN in probs_raw,
    # so neither comparison in remove_noisy_labels is true for them and they
    # end up in neither cleaned_df nor removed_df.
    raw_labels_df = pd.read_csv(cfg.train_csv).rename(columns={"image_id": "slide_id", "isup_grade": "label"})
    raw_labels_df["probs_raw"] = raw_labels_df["slide_id"].map(slide_probs)
    cleaned_df, removed_df = remove_noisy_labels(raw_labels_df, thresh=cfg.noise_gap_thresh)
    cleaned_slide_labels = dict(zip(cleaned_df["slide_id"], cleaned_df["label"]))
    # Keep only embeddings of retained slides (and skip ids without a vector).
    cleaned_slide_vectors = {sid: slide_vectors[sid] for sid in cleaned_df["slide_id"] if sid in slide_vectors}
    print(f"\n STEP 3: {len(cleaned_slide_vectors)} samples retained after noise removal.")

    print("\n STEP 4: Retraining final model on cleaned data")
    # Same cfg as in step 1, so the checkpoints go to the same fold directories.
    best_model_paths = train_kfold(cfg, cleaned_slide_vectors, cleaned_slide_labels)

    print("\n Final best model checkpoints:")
    for path in best_model_paths:
        print(path)


## To download best models


In [None]:

def get_best_models(checkpoints_dir="/kaggle/working/checkpoints",
                    output_dir="/kaggle/working/best_models/5fold/labeldenoising",
                    output_zip="/kaggle/working/best_models.zip"):
    """Collect one checkpoint per fold directory and bundle them in a zip.

    For every sub-directory of *checkpoints_dir* that contains ``*.ckpt``
    files, the most recently modified one is copied to *output_dir* as
    ``<fold_dir_name>_best_classifier.ckpt``. The contents of *output_dir*
    are then archived to *output_zip*.

    Args:
        checkpoints_dir: Directory holding one sub-directory per fold.
        output_dir: Directory that receives the renamed copies (created if
            missing).
        output_zip: Path of the zip archive to write. A ``.zip`` suffix is
            appended if it is not already there.

    Returns:
        str: Path of the archive that was written.

    Raises:
        FileNotFoundError: If *checkpoints_dir* does not exist.
    """
    # create output dir to put in zip file of best models
    os.makedirs(output_dir, exist_ok=True)

    # loop through each fold directory
    for fold_dir in sorted(Path(checkpoints_dir).iterdir()):
        if not fold_dir.is_dir():
            continue
        ckpt_files = list(fold_dir.glob("*.ckpt"))
        if not ckpt_files:
            continue
        # A fold directory can hold checkpoints from several training runs.
        # glob order is arbitrary, so pick the newest file explicitly rather
        # than whichever happens to come first.
        newest_ckpt = max(ckpt_files, key=lambda p: p.stat().st_mtime)
        # rename and copy to output_dir
        new_name = f"{fold_dir.name}_best_classifier.ckpt"
        shutil.copy(newest_ckpt, os.path.join(output_dir, new_name))

    # make_archive appends ".zip" itself, so strip it from the requested path
    # to make the archive land exactly at output_zip.
    archive_base = str(output_zip)
    if archive_base.lower().endswith(".zip"):
        archive_base = archive_base[:-len(".zip")]
    archive_path = shutil.make_archive(archive_base, 'zip', output_dir)
    print(f" Zipped folder: {archive_path}")
    return archive_path

# Collect and zip the per-fold checkpoints using the default Kaggle paths.
get_best_models()