In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
from pathlib import Path
from typing import Dict, Tuple, Optional, List

import cv2
import numpy as np
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset

import torchvision.transforms.functional as TF
from torchvision.transforms import InterpolationMode

from tqdm import tqdm
from sklearn.metrics import roc_auc_score

import matplotlib.pyplot as plt


In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"


In [None]:
def apply_clahe(pil_img: Image.Image) -> Image.Image:
    """
    Explication de CLAHE :
    CLAHE découpe l’image en petites régions, améliore le contraste localement dans chacune, mais limite ce contraste pour ne pas exploser le bruit, puis recolle le tout de façon lisse.
    Applique un CLAHE couleur sur une image RGB.
    Args:
        pil_img: image PIL en mode 'RGB'.

    Returns:
        Image PIL après CLAHE.
    """
    # Pour le traitement openCV, on convertit en tableau numpy
    img_np = np.array(pil_img)

    # Gestion de cas particuliers de format d'image :
    # - Si l'image est en niveaux de gris (2D : H x W), on la convertit en RGB
    if img_np.ndim == 2:  # (H, W)
        img_np = cv2.cvtColor(img_np, cv2.COLOR_GRAY2RGB)

    # - Si l'image est en RGBA, on la convertit en RGB
    elif img_np.shape[2] == 4:  # (H, W, 4)
        img_np = cv2.cvtColor(img_np, cv2.COLOR_RGBA2RGB)

    # Conversion de l'espace de couleur RGB vers LAB.
    # LAB sépare la luminosité (L) de la chrominance (a, b),
    # ce qui permet d'appliquer le contraste uniquement sur la lumière.
    lab = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)

    #https://docs.opencv.org/4.x/d8/d01/group__imgproc__color__conversions.html -> doc pour la gestion des conversion d'espaces de couleurs
    #https://opencv.org/blog/color-spaces-in-opencv/ -> Pour comprendre ce qu'est LAB

    # Sépare les trois canaux : L (luminosité), a et b (couleurs)
    l, a, b = cv2.split(lab)
    #https://docs.opencv.org/4.x/d2/de8/group__core__array.html#ga8f6d378f9f8eebb5cb55cd3ae295a8eb -> doc pour la séparation des canaux,

    # Création de l'objet CLAHE (Contrast Limited Adaptive Histogram Equalization)
    # - clipLimit contrôle la limitation du contraste (évite la saturation du bruit)
    # - tileGridSize définit la taille des zones (tuiles) sur lesquelles l'histogramme est égalisé localement
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    #https://docs.opencv.org/4.x/d6/dc7/group__imgproc__hist.html#gaa2e859ccfb8d9c67c97e00fef5a9e5b5 -> doc pour la création de l'objet qui va appliquer la transformation

    # Application du CLAHE uniquement sur le canal de luminosité L
    cl = clahe.apply(l)

    # On reconstruit l'image LAB en remplaçant L par sa version améliorée (cl)
    limg = cv2.merge((cl, a, b))

    # On reconvertit l'image de LAB vers RGB pour un affichage classique
    final = cv2.cvtColor(limg, cv2.COLOR_LAB2RGB)

    # On transforme à nouveau le tableau NumPy en image PIL pour rester cohérent avec le reste du code
    return Image.fromarray(final)



def polar_transform_tensor(
    tensor: torch.Tensor,
    size: Tuple[int, int],
    is_mask: bool = False
) -> torch.Tensor:
    """
    Transforme un tensor [C,H,W] ou [1,H,W] ou [H,W] en coordonnées polaires
    autour du centre de l'image, via warpPolar d'OpenCV.

    Args:
        tensor: image ou masque au format Tensor [C,H,W], [1,H,W] ou [H,W].
        size: taille de sortie (H_out, W_out).
        is_mask: True pour un masque (interpolation NEAREST),
                 False pour une image (interpolation bilinéaire).

    Returns:
        Tensor transformé [C,H_out,W_out].
    """

    # Si on reçoit un tensor 2D [H,W], on ajoute une dimension canal -> [1,H,W]
    if tensor.ndim == 2:
        tensor = tensor.unsqueeze(0)  # [1,H,W]

    # Déballage des dimensions : C = canaux, H = hauteur, W = largeur
    c, h, w = tensor.shape

    # Conversion Tensor -> NumPy avec permutation des axes :
    # on passe de [C,H,W] à [H,W,C], format attendu par OpenCV.
    # .cpu() pour s'assurer que les données sont sur le CPU
    img_np = tensor.permute(1, 2, 0).cpu().numpy()

    # Définition du centre de l'image (en coordonnées (x, y) = (col, row))
    center = (w / 2.0, h / 2.0)

    # Rayon maximal de la transformation polaire :
    # on prend le plus petit demi-côté pour rester à l'intérieur de l'image.
    max_radius = min(center[0], center[1])

    # Choix de l'interpolation :
    # - NEAREST pour les masques
    # - BILINEAIRE pour les images
    interp_flag = cv2.INTER_NEAREST if is_mask else cv2.INTER_LINEAR

    # Construction des flags pour warpPolar :
    # - WARP_POLAR_LINEAR : coordonnées polaires linéaires (r, theta)
    # - WARP_FILL_OUTLIERS : remplit les zones hors source avec une valeur constante
    # - + le type d'interpolation choisi
    flags = cv2.WARP_POLAR_LINEAR + cv2.WARP_FILL_OUTLIERS + interp_flag

    # Transformation en coordonnées polaires :
    # sortie initiale de taille (w, h) en (width, height)
    polar = cv2.warpPolar(img_np, (w, h), center, max_radius, flags)

    # Rotation de 90° dans le sens horaire pour remettre l'axe angulaire/radial
    # dans une orientation plus intuitive pour le reste de la pipeline
    polar = cv2.rotate(polar, cv2.ROTATE_90_CLOCKWISE)

    # Redimensionnement final vers la taille demandée (W_out, H_out)
    # cv2.resize prend (width, height) et pas (height,width)
    polar = cv2.resize(polar, (size[1], size[0]), interpolation=interp_flag)

    # Si la sortie est devenue 2D (H,W) -> on rajoute un canal (H,W,1)
    # pour garder la cohérence (H,W,C)
    if polar.ndim == 2:
        polar = np.expand_dims(polar, axis=-1)

    # Retour au format PyTorch : (H,W,C) -> (C,H,W)
    polar_tensor = torch.from_numpy(polar).permute(2, 0, 1)

    return polar_tensor


def compute_mean_std(
    dataset: Dataset,
    image_key: str = "image",
    image_size: Optional[Tuple[int, int]] = None,
    batch_size: int = 8,
    num_workers: int = 2
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    Calcule mean et std (par canal) sur un dataset de segmentation.

    Le dataset doit retourner un dict contenant au moins:
        sample[image_key] = Tensor [C,H,W] dans [0,1].

    Args:
        dataset: dataset PyTorch.
        image_key: nom de la clef image dans le sample.
        image_size: si non None, resize avant calcul (H,W).
        batch_size: taille du batch pour le DataLoader.
        num_workers: workers pour le DataLoader.

    Returns:
        (mean, std): deux tenseurs 1D de taille C.
    """

    # Création d'un DataLoader pour itérer sur le dataset par batch,
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers
    )

    # Variables qui seront initialisées au premier batch
    n_channels = None
    channel_sum = None
    channel_sq_sum = None
    num_images = 0


    # Parcours de tous les batches du DataLoader
    for batch in tqdm(loader, desc="Stats"):
        # Récupère le tenseur image dans le batch: [B,C,H,W]
        images = batch[image_key]


        # TF.resize accepte les batches de forme [B,C,H,W]
        if image_size is not None:
            images = TF.resize(images, image_size, InterpolationMode.BILINEAR
)

        # B = taille du batch, C = canaux, H/W = dimensions spatiales
        b, c, h, w = images.shape

        # Initialisation des accumulateurs au premier passage (pour connaître C)
        if n_channels is None:
            n_channels = c
            channel_sum = torch.zeros(c)
            channel_sq_sum = torch.zeros(c)

        # Aplatissement spatial: [B,C,H,W] -> [B,C,H*W]
        # On regroupe tous les pixels de chaque image dans une seule dimension.
        images = images.view(b, c, -1)

        # mean(dim=2) -> moyenne par canal pour chaque image: [B,C]
        # sum(dim=0)  -> somme des moyennes sur toutes les images du batch: [C]
        channel_sum += images.mean(dim=2).sum(dim=0)

        # Même chose mais pour les valeurs au carré: on accumule la moyenne
        # des carrés des pixels, pour pouvoir calculer la variance ensuite.
        channel_sq_sum += (images ** 2).mean(dim=2).sum(dim=0)

        # Incrémente le nombre d'images vues
        num_images += b

    # Moyenne finale par canal:
    # on divise la somme des moyennes par le nombre d'images
    mean = channel_sum / num_images

    # Variance = E[X^2] - (E[X])^2, puis racine pour obtenir (std)
    std = (channel_sq_sum / num_images - mean ** 2).sqrt()

    print(f"  -> mean: {mean}")
    print(f"  -> std : {std}")
    return mean, std

In [None]:

class RetinalSegmentationTransform:
    """
    Pipeline de transformations généraliser pour tous les datasets qu'on utilisera.

    - Resize image + masques à image_size
    - Si train=True : flips / rotation aléatoires
    - Si use_polar=True : transformation polaire
    - Normalisation de l'image avec (mean, std) calculé avec compute_mean_std() du dessus

     Args:
        image  : Tensor [3,H,W] en [0,1]
        masks  : dict[str, Tensor] (ex: {"fov": [1,H,W], "gt": [1,H,W]})

    Returns:
        image_tensor, masks_dict
    """

    def __init__(
        self,
        image_size: Tuple[int, int],
        mean: torch.Tensor,
        std: torch.Tensor,
        train: bool = True,
        use_polar: bool = False,
        rotation_deg: float = 15.0
    ):

        self.image_size = image_size


        self.mean = mean
        self.std = std


        self.train = train


        self.use_polar = use_polar


        self.rotation_deg = rotation_deg

    def _resize(self, tensor: torch.Tensor, is_mask: bool) -> torch.Tensor:
        """
        Redimensionne image ou masque à self.image_size.
        """

        # Pour les masques: NEAREST (évite d'interpoler les labels)
        # Pour les images: BILINEAR (résultat plus lisse)
        interpolation = (
            InterpolationMode.NEAREST if is_mask else InterpolationMode.BILINEAR

        )

        return TF.resize(tensor, self.image_size, interpolation=interpolation)

    def _maybe_augment(
        self,
        image: torch.Tensor,
        masks: Dict[str, torch.Tensor]
    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """Applique flips / rotation aléatoires sur les images et masks."""

        # Si on n'est pas en mode entraînement, aucune augmentation
        if not self.train:
            return image, masks

        # Flip horizontal aléatoire
        if torch.rand(1).item() < 0.5:
            image = TF.hflip(image)
            # On applique EXACTEMENT la même transformation à tous les masques
            masks = {k: TF.hflip(v) for k, v in masks.items()}

        # Flip vertical aléatoire
        if torch.rand(1).item() < 0.5:
            image = TF.vflip(image)
            masks = {k: TF.vflip(v) for k, v in masks.items()}

        # Rotation aléatoire dans [-rotation_deg, +rotation_deg] 15 par défaut
        angle = float(torch.empty(1).uniform_(-self.rotation_deg, self.rotation_deg))


        # Pour l'image : interpolation bilinéaire (BILINEAR) → plus lisse
        image = TF.rotate(image, angle, interpolation=InterpolationMode.BILINEAR
)


        masks = {
            k: TF.rotate(v, angle, interpolation=InterpolationMode.NEAREST)
            for k, v in masks.items()
        }

        return image, masks

    def _maybe_polar(
        self,
        image: torch.Tensor,
        masks: Dict[str, torch.Tensor]
    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """
        Applique la transformation polaire (cartésien -> polaire) sur image et masques
        si use_polar=True. Utile pour ORIGA ou le papier dit que pour améliorer les résultats, cette transformation est nécessaire.
        """
        if not self.use_polar:
            return image, masks

        # Image en polaire (interpolation d'image)
        image = polar_transform_tensor(
            image, size=self.image_size, is_mask=False
        )

        # Tous les masques en polaire, avec interpolation de type masque
        masks = {
            k: polar_transform_tensor(v, size=self.image_size, is_mask=True)
            for k, v in masks.items()
        }
        return image, masks

    def __call__(
        self,
        image: torch.Tensor,
        masks: Dict[str, torch.Tensor]
    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor]]:
        """
        Applique  :
        1) Resize
        2) Augmentations géométriques (si train)
        3) Transformation polaire (si use_polar)
        4) Normalisation (mean/std)
        """

        # Resize commun et obligatoire sur image et masques
        image = self._resize(image, is_mask=False)
        masks = {k: self._resize(v, is_mask=True) for k, v in masks.items()}

        # Augmentations géométriques image/masques
        image, masks = self._maybe_augment(image, masks)

        #  passage en coordonnées polaires pour ORIGA et REFUGE
        image, masks = self._maybe_polar(image, masks)

        #  Normalisation de l'image
        image = TF.normalize(image, self.mean, self.std)

        return image, masks


In [None]:
def show_drive_val_sample(
    model,
    val_loader,
    device,
    title_prefix="DRIVE - epoch",
    threshold: float | None = None,
):
    """
    Affiche, pour 1 image du set de validation :
      - structure S
      - texture |T|
      - extracted structure (canal vert uniquement)
      - segmentation binaire avec vaisseaux en rouge

    On prend simplement le premier batch du loader.
    """

    # Passage du modèle en mode évaluation :
    # - désactive les comportements spécifiques au training
    model.eval()

    # Récupération du premier batch du val_loader
    batch = next(iter(val_loader))
    img  = batch["image"].to(device)   #  image RGB normalisée
    gt   = batch["gt"].to(device)      #  masque de vérité terrain
    fov  = batch.get("fov", None)      #  masque de FOV

    # Si un masque FOV est présent, on le déplace aussi sur le bon device (CPU/GPU)
    if fov is not None:
        fov = fov.to(device)

    # On désactive le calcul du gradient
    with torch.no_grad():
        #On récupère les sorties du modèles qu'on veux afficher (segmentation,texture,extracted_structure et la structure)
        seg, _, _, _, texture, extracted_structure, structure = model(img)

        # On applique un sigmoid pour convertir les logits en probabilités [0,1]
        prob = torch.sigmoid(seg)      # [B,1,H,W]




    # On ne visualise que le premier élément du batch
    S  = structure[0].cpu()
    T  = texture[0].cpu()
    E  = extracted_structure[0].cpu()
    P  = prob[0, 0].cpu()

    H, W = P.shape


    if fov is not None:
        F = fov[0, 0].cpu()
    else:
        F = torch.ones_like(P)

    # ---------- 1) Structure S (normalisée) ----------
    S_vis = S.clone()
    # Normalisation min-max pour ramener S_vis dans [0,1] car S est normalisé pour le réseau et peux contenur des valeurs négfatives. Pour faciliter l'affichage on remet entre 0 et 1
    # Article -> https://fr.statisticseasily.com/glossaire/Qu%27est-ce-que-la-normalisation-min-max-expliqu%C3%A9e/
    s_min, s_max = S_vis.min(), S_vis.max()
    S_vis = (S_vis - s_min) / (s_max - s_min + 1e-8)

    # Application du masque FOV : on met à zéro les pixels hors FOV
    S_vis = S_vis * F.unsqueeze(0)     # [3,H,W]

    # Permutation en [H,W,3] pour être compatible avec plt.imshow
    S_vis = S_vis.permute(1, 2, 0)     # [H,W,3]

    # ---------- 2) Texture |T| (moyenne des canaux) ----------
    # On prend la valeur absolue de T puis on moyenne les 3 canaux → carte scalaire
    T_abs = T.abs().mean(0)            # [H,W]
    # Normalisation min-max
    t_min, t_max = T_abs.min(), T_abs.max()
    T_vis = (T_abs - t_min) / (t_max - t_min + 1e-8)
    # Masquage FOV
    T_vis = T_vis * F                  # [H,W]

    # ---------- 3) Extracted structure – canal vert uniquement ----------
    # On ne visualise que le canal vert (indice 1) de la structure extraite
    E_green = E[1]                     # [H,W]
    e_min, e_max = E_green.min(), E_green.max()
    E_vis = (E_green - e_min) / (e_max - e_min + 1e-8)
    E_vis = E_vis * F                  # [H,W]

    # ---------- 4) Segmentation binaire ----------
    if threshold is None:
        # Si aucun seuil n'est fourni, on affiche directement les probabilités :
        # plus la proba est élevée, plus le rouge est intense
        mask = P * F                   # [H,W]
    else:
        # Sinon, on binarise à partir du seuil donné (0/1)
        mask = (P >= threshold).float() * F

    seg = mask


    # ---------- AFFICHAGE ----------
    # On crée une figure avec 1 ligne et 4 colonnes de sous-figures
    plt.figure(figsize=(16, 4))

    # 1) Structure S
    plt.subplot(1, 4, 1)
    plt.imshow(S_vis.numpy())
    plt.title(f"{title_prefix} - Structure")
    plt.axis("off")

    # 2) Texture |T|
    plt.subplot(1, 4, 2)
    plt.imshow(T_vis.numpy(), cmap="gray")
    plt.title("Texture")
    plt.axis("off")

    # 3) Structure extraite, canal vert
    plt.subplot(1, 4, 3)
    plt.imshow(E_vis.numpy(), cmap="gray")
    plt.title("Extracted structure (Canal vert)")
    plt.axis("off")

    # 4) Segmentation (vaisseaux en rouge)
    plt.subplot(1, 4, 4)
    plt.imshow(seg.numpy(), cmap="gray", vmin=0, vmax=1)
    plt.title("Segmentation")
    plt.axis("off")


    # Ajuste les marges entre les sous-figures
    plt.tight_layout()
    plt.show()


In [None]:
class DRIVEDataset(Dataset):
    """
    Dataset DRIVE.

    - images_dir : chemin vers images RGB (.tif)
    - fov_dir    : masque FOV (pixels valides)
    - gt_dir     : masques 1er annotateur
    - second_gt_dir : masques 2e annotateur
    - transform  : instance de RetinalSegmentationTransform ou None
    """

    def __init__(
        self,
        images_dir: str,
        fov_dir: str,
        gt_dir: Optional[str] = None,
        second_gt_dir: Optional[str] = None,
        split: str = "train",
        transform: Optional[RetinalSegmentationTransform] = None,
        apply_clahe_flag: bool = True,
    ):
        super().__init__()
        # On stocke les chemins principaux (images / FOV / GT)
        self.images_dir = Path(images_dir)
        self.fov_dir = Path(fov_dir)
        self.gt_dir = Path(gt_dir) if gt_dir is not None else None
        self.second_gt_dir = Path(second_gt_dir) if second_gt_dir is not None else None

        # 'train' ou 'test' pour DRIVE
        self.split = split

        # Transform commun
        self.transform = transform

        # Active/désactive le prétraitement CLAHE sur les images RGB
        self.apply_clahe_flag = apply_clahe_flag

        # On liste tous les fichiers .tif d'images
        self.image_filenames = sorted(
            [f for f in os.listdir(self.images_dir) if f.endswith(".tif")]
        )

    def __len__(self) -> int:
        # Nombre d'images = taille du dataset
        return len(self.image_filenames)

    def _get_mask_names(self, img_name: str) -> Tuple[str, str, Optional[str]]:
        """
        DRIVE encode TRAIN / TEST dans le nom des fichiers.
        On reconstruit les noms des masques FOV + GT à partir du nom d'image.
        """
        base = img_name.split("_")[0]

        # FOV : suffix différent selon train/test
        if self.split == "train":
            fov_name = f"{base}_training_mask.gif"
        else:
            fov_name = f"{base}_test_mask.gif"


        gt1_name = f"{base}_manual1.gif"
        gt2_name = f"{base}_manual2.gif" if self.second_gt_dir is not None else None
        return fov_name, gt1_name, gt2_name

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        # Nom de l'image indexée
        img_name = self.image_filenames[idx]
        fov_name, gt1_name, gt2_name = self._get_mask_names(img_name)

        # --- Image RGB ---
        img = Image.open(self.images_dir / img_name).convert("RGB")

        if self.apply_clahe_flag:
          #Application de CLAHE
            img = apply_clahe(img)

        # Conversion en tensor float [3,H,W] dans [0,1]
        img_tensor = TF.to_tensor(img)

        # --- FOV ---
        fov = Image.open(self.fov_dir / fov_name)
        fov_tensor = TF.to_tensor(fov)

        # --- GT1 (vaisseaux) ---
        gt1_tensor = None
        if self.gt_dir is not None:
            gt1 = Image.open(self.gt_dir / gt1_name)
            gt1_tensor = TF.to_tensor(gt1)

        # --- GT2  ---
        gt2_tensor = None
        if self.second_gt_dir is not None and gt2_name is not None:
            gt2_path = self.second_gt_dir / gt2_name
            #certains fichier manquant, vérification s'ils existe
            if gt2_path.exists():
                gt2 = Image.open(gt2_path)
                gt2_tensor = TF.to_tensor(gt2)

        # Rassemblement de tout dans un dict
        masks = {"fov": fov_tensor}
        if gt1_tensor is not None:
            masks["gt"] = gt1_tensor
        if gt2_tensor is not None:
            masks["gt2"] = gt2_tensor

        # --- Transformations  ---
        if self.transform is not None:
            img_tensor, masks = self.transform(img_tensor, masks)

        # Binarisation des masques (seuil 0.5) -> Après toTensor, on a des valeurs continu entre 0 et 1 sauf qu'on veut nous dans notre mask soit 0 soit 1 on fait alors la binarisation
        fov_bin = (masks["fov"] > 0.5).float()

        sample = {
            "image": img_tensor,
            "fov": fov_bin,
            "filename": img_name,
        }

        if "gt" in masks:
            sample["gt"] = (masks["gt"] > 0.5).float()
        if "gt2" in masks:
            sample["gt2"] = (masks["gt2"] > 0.5).float()

        return sample



class ORIGADataset(Dataset):
    """
    Dataset ORIGA pour segmentation du disque et de la cup,

    On lit un masque de labels (0: background, 1: disc, 2: cup),
    puis on produit deux canaux binaires:
        - canal 0 = disque
        - canal 1 = cup
    """

    def __init__(
        self,
        csv_path: str,
        images_dir: str,
        masks_dir: str,
        split: str = "train",
        transform: Optional[RetinalSegmentationTransform] = None,
        disc_label_val: int = 1,
        cup_label_val: int = 2,
        image_extensions: Tuple[str, ...] = ("jpg", "png", "bmp", "tif"),
        mask_extensions: Tuple[str, ...] = ("png", "bmp", "jpg", "tif"),
        seed: int = 42,
        output_image_size: Optional[Tuple[int, int]] = None,
    ):
        super().__init__()


        self.images_dir = Path(images_dir)
        self.masks_dir = Path(masks_dir)


        self.transform = transform


        self.disc_label_val = float(disc_label_val)
        self.cup_label_val = float(cup_label_val)

        self.image_extensions = image_extensions
        self.mask_extensions = mask_extensions


        self.output_image_size = output_image_size

        # CSV + split ---
        df = pd.read_csv(csv_path)
        train_df, test_df = self._build_official_split(df, seed=seed)

        # Choix du split (train/test)
        if split == "train":
            self.df = train_df.reset_index(drop=True)
        elif split == "test":
            self.df = test_df.reset_index(drop=True)
        else:
            raise ValueError("split doit être 'train' ou 'test'")


        self.filenames: List[str] = self.df["Filename"].tolist()
        self.glaucoma_labels: List[int] = self.df["Glaucoma"].astype(int).tolist()
        self.exp_cdr: np.ndarray = self.df["ExpCDR"].values

    # ---- Gestion du split  ORIGA ----
    @staticmethod
    def _build_official_split(df: pd.DataFrame, seed: int = 42) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Reproduit le split utilisé dans le papier si les infos sont dans la colonne 'Set'.
        Sinon crée un split cohérent (73 glaucomateux / 252 non glaucomateux).
        """
        uniques = df.get("Set", pd.Series([])).unique()
        train_df, test_df = None, None

        # On tente de retrouver les deux sous-ensembles
        for u in uniques:
            cand = df[df["Set"] == u]
            if len(cand) == 325 and cand["Glaucoma"].sum() == 73:
                train_df = cand
            if len(cand) == 325 and cand["Glaucoma"].sum() == 95:
                test_df = cand

        # Si on les a trouvés, on les renvoie
        if train_df is not None and test_df is not None:
            return train_df, test_df

        # Sinon: fallback = split stratifié reproductible
        rng = np.random.RandomState(seed)
        pos = df[df["Glaucoma"] == 1].copy()
        neg = df[df["Glaucoma"] == 0].copy()

        # On mélange séparément positifs et négatifs
        pos = pos.sample(frac=1.0, random_state=seed)
        neg = neg.sample(frac=1.0, random_state=seed + 1)

        # On reconstitue le train / test avec les nombres classiques
        train_pos = pos.iloc[:73]
        test_pos = pos.iloc[73:]
        train_neg = neg.iloc[:252]
        test_neg = neg.iloc[252:]

        train_df = pd.concat([train_pos, train_neg]).sample(frac=1.0, random_state=seed).reset_index(drop=True)
        test_df = pd.concat([test_pos, test_neg]).sample(frac=1.0, random_state=seed + 2).reset_index(drop=True)
        return train_df, test_df

    def _find_with_extensions(
        self,
        directory: Path,
        filename: str,
        extensions: Tuple[str, ...]
    ) -> Path:
        """
        ORIGA stocke les fichiers avec diverses extensions.
        Cette fonction cherche la bonne extension automatiquement.
        """
        # Chemin tel quel
        p = directory / filename
        if p.exists():
            return p

        # Sinon, on tente toutes les extensions possibles
        stem, _ = os.path.splitext(filename)
        for ext in extensions:
            cand = directory / f"{stem}.{ext}"
            if cand.exists():
                return cand

        raise FileNotFoundError(f"{filename} not found in {directory}")

    def __len__(self) -> int:
        return len(self.filenames)

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        row = self.df.iloc[idx]
        fname = row["Filename"]

        # --- Image ---
        img_path = self._find_with_extensions(
            self.images_dir, fname, self.image_extensions
        )
        img = Image.open(img_path).convert("RGB")
        # img = apply_clahe(img)

        # --- Masque label (0,1,2) ---
        mask_path = self._find_with_extensions(
            self.masks_dir, fname, self.mask_extensions
        )
        mask = Image.open(mask_path).convert("L")  # niveaux de gris: 0/1/2


        if self.output_image_size is not None and self.transform is None:
            img = img.resize(self.output_image_size, Image.BILINEAR)
            mask = mask.resize(self.output_image_size, Image.NEAREST)

        # PIL -> Tensor [3,H,W] en [0,1]
        img_tensor = TF.to_tensor(img)

        # Masque en numpy pour garder les valeurs 0,1,2
        mask_np = np.array(mask, dtype=np.float32)
        mask_tensor = torch.from_numpy(mask_np).unsqueeze(0)

        masks = {"mask": mask_tensor}

        # --- Transformations communes  ---
        if self.transform is not None:
            img_tensor, masks = self.transform(img_tensor, masks)

        # On récupère le masque transformé
        mask_tensor = masks["mask"].squeeze(0)  # [H,W]

        # --- Construction des masques binaires disque / cup ---
        disc = (mask_tensor == self.disc_label_val).float().unsqueeze(0)
        cup = (mask_tensor == self.cup_label_val).float().unsqueeze(0)
        gt = torch.cat([disc, cup], dim=0)

        # FOV : ici, on considère tout le champ comme valide
        fov = torch.ones_like(disc)

        return {
            "image": img_tensor,
            "gt": gt,
            "disc": disc,
            "cup": cup,
            "fov": fov,
            "filename": fname,
            "glaucoma": int(row["Glaucoma"]),
            "cdr": float(row["ExpCDR"]),
        }


In [None]:
class STDModule(nn.Module):
    """
    Module Structure-Texture Demixing (STD).
    10 convolutions 3x3 + LeakyReLU pour extraire une composante de texture T.

    On pose: S = I - T (structure).
    """

    def __init__(self, img_channels: int = 3, hidden_dim: int = 64):
        super().__init__()

        layers = []

        # Première couche : image -> features
        layers.append(nn.Conv2d(img_channels, hidden_dim, kernel_size=3, padding=1))
        layers.append(nn.LeakyReLU(inplace=True))

        # 8 couches internes
        for _ in range(8):
            layers.append(nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, padding=1))
            layers.append(nn.LeakyReLU(inplace=True))

        # Dernière couche : features -> texture T
        layers.append(nn.Conv2d(hidden_dim, img_channels, kernel_size=3, padding=1))

        self.net = nn.Sequential(*layers)

    def forward(self, input_image: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        texture = self.net(input_image)
        structure = input_image - texture
        return structure, texture


class AdaptiveNorm(nn.Module):
    """
    Adaptive Normalization :
        Psi(x) = lambda * x + mu * BN(x)

    - lambda et mu sont des scalaires appris
    - BN est une BatchNorm2d standard
    Source : https://arxiv.org/pdf/1709.00643
    """

    def __init__(self, num_channels: int):
        super().__init__()
        self.bn = nn.BatchNorm2d(num_channels, affine=True)

        # On commence avec une identité (lambda=1, mu=0)
        self.lambda_s = nn.Parameter(torch.tensor(1.0))
        self.mu_s = nn.Parameter(torch.tensor(0.0))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.lambda_s * x + self.mu_s * self.bn(x)


class TextureBlock(nn.Module):
    """
    Bloc 1x1 du papier:
        Conv(1x1) -> AdaptiveNorm -> LeakyReLU -> Conv(1x1)

    Sert à extraire une composante de structure additionnelle à partir de la texture.
    """

    def __init__(self, img_channels: int = 3, hidden_channels: int = 15):
        super().__init__()
        self.conv1 = nn.Conv2d(img_channels, hidden_channels, kernel_size=1)
        self.adapt = AdaptiveNorm(hidden_channels)
        self.act = nn.LeakyReLU(inplace=False)
        self.conv2 = nn.Conv2d(hidden_channels, img_channels, kernel_size=1)

    def forward(self, texture: torch.Tensor) -> torch.Tensor:
        x = self.conv1(texture)
        x = self.adapt(x)
        x = self.act(x)
        extracted_structure = self.conv2(x)
        return extracted_structure


def double_conv(in_channels: int, out_channels: int) -> nn.Sequential:
    """
    Bloc de base de type U-Net / M-Net:
        Conv3x3 -> ReLU -> Conv3x3 -> ReLU
    """
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
        nn.ReLU(inplace=True),
    )



class MNet(nn.Module):
    """
    Encoder-décodeur multi-niveaux (M-Net) avec deep supervision
    et fusion de la texture.
    """

    def __init__(
        self,
        img_size: int,
        img_channels: int,
        texture_channels: int = 3,
        num_classes: int = 1, # Added num_classes parameter

    ):
        super().__init__()

        # -------------------- ENCODEUR --------------------
        self.enc1 = double_conv(img_channels, 32)
        self.down1 = nn.MaxPool2d(2)

        self.resize2 = double_conv(img_channels, 64)
        self.enc2 = double_conv(96, 64)
        self.down2 = nn.MaxPool2d(2)

        self.resize3 = double_conv(img_channels, 128)
        self.enc3 = double_conv(192, 128)
        self.down3 = nn.MaxPool2d(2)

        self.resize4 = double_conv(img_channels, 256)
        self.enc4 = double_conv(384, 256)
        self.down4 = nn.MaxPool2d(2)

        # -------------------- BOTTLE NECK --------------------
        self.bottleneck = double_conv(256, 512)

        # -------------------- DECODEUR --------------------
        self.up5 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec5 = double_conv(512, 256)
        self.out5 = nn.ConvTranspose2d(256, num_classes, kernel_size=2, stride=2) # Used num_classes

        self.up6 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec6 = double_conv(256, 128)
        self.out6 = nn.ConvTranspose2d(128, num_classes, kernel_size=2, stride=2) # Used num_classes

        self.up7 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec7 = double_conv(128, 64)
        self.out7 = nn.ConvTranspose2d(64, num_classes, kernel_size=2, stride=2) # Used num_classes

        self.up8 = nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2)
        self.dec8 = double_conv(64, 32)

        # Fusion texture + features chemin principal
        self.fuse_texture = nn.Conv2d(32 + texture_channels, num_classes, kernel_size=3, padding=1) # Used num_classes

        # Fusion des sorties (deep supervision)
        self.final_fusion = nn.Conv2d(4 * num_classes, num_classes, kernel_size=1) # Adjusted input channels and used num_classes

        #Voir le schéma dans le papier pour bien comprendre

    def _resize_structure(self, structure: torch.Tensor, scale: float) -> torch.Tensor:
        """
        Redimensionne la carte de structure par un facteur donné (scale) en H et W,
        en utilisant une interpolation bilinéaire.

        Args:
            structure: tenseur [B,C,H,W] (typique pour des features d'un réseau).
            scale: facteur d'échelle spatial

        Returns:
            Tenseur redimensionné [B,C,H',W'] avec H' = scale*H, W' = scale*W.
        """
        return F.interpolate(
            structure,
            scale_factor=scale,     # multiplie la hauteur et la largeur par 'scale'
            mode="bilinear",        # interpolation bilinear
            align_corners=False,    # convention
            recompute_scale_factor=True,
        )


    def _upsample_and_concat(
        self,
        skip: torch.Tensor,
        x: torch.Tensor,
        up_module: nn.Module
    ) -> torch.Tensor:
        """
        Upsample x puis concatène avec skip connection.
        """
        up = up_module(x)
        return torch.cat([skip, up], dim=1)

    def forward(
        self,
        structure: torch.Tensor,
        extracted_texture: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        # -------------------- ENCODEUR --------------------
        enc1 = self.enc1(structure)
        down1 = self.down1(enc1)

        #Pour le niveau 2 on divise la taille par 2 de la structure
        structure_lvl2 = self._resize_structure(structure, 0.5)
        prep2 = self.resize2(structure_lvl2)
        enc2 = self.enc2(torch.cat([prep2, down1], dim=1))
        down2 = self.down2(enc2)
        #Pour le niveau 3 on divise par 2 l'entrée précédente
        structure_lvl3 = self._resize_structure(structure_lvl2, 0.5)
        prep3 = self.resize3(structure_lvl3)
        enc3 = self.enc3(torch.cat([prep3, down2], dim=1))
        down3 = self.down3(enc3)
        #Pareil ici
        structure_lvl4 = self._resize_structure(structure_lvl3, 0.5)
        prep4 = self.resize4(structure_lvl4)
        enc4 = self.enc4(torch.cat([prep4, down3], dim=1))
        down4 = self.down4(enc4)

        # -------------------- BOTTLE NECK --------------------
        bottleneck = self.bottleneck(down4)

        # -------------------- DECODEUR --------------------
        dec5_input = self._upsample_and_concat(enc4, bottleneck, self.up5)
        dec5 = self.dec5(dec5_input)
        out5 = self.out5(dec5)

        dec6_input = self._upsample_and_concat(enc3, dec5, self.up6)
        dec6 = self.dec6(dec6_input)
        out6 = self.out6(dec6)

        dec7_input = self._upsample_and_concat(enc2, dec6, self.up7)
        dec7 = self.dec7(dec7_input)
        out7 = self.out7(dec7)

        dec8_input = self._upsample_and_concat(enc1, dec7, self.up8)
        dec8 = self.dec8(dec8_input)

        # -------------------- FUSION TEXTURE --------------------
        # On s'assure que la texture est à la bonne taille
        if extracted_texture.shape[-2:] != structure.shape[-2:]:
            extracted_texture = F.interpolate(
                extracted_texture,
                size=structure.shape[-2:],
                mode="bilinear",
                align_corners=False,
            )

        texture_fused = self.fuse_texture(
            torch.cat([extracted_texture, dec8], dim=1)
        )

        # -------------------- FUSION FINALE --------------------
        out5 = F.interpolate(
            out5, size=structure.shape[-2:], mode="bilinear", align_corners=False
        )
        out6 = F.interpolate(
            out6, size=structure.shape[-2:], mode="bilinear", align_corners=False
        )
        out7 = F.interpolate(
            out7, size=structure.shape[-2:], mode="bilinear", align_corners=False
        )
        texture_fused = F.interpolate(
            texture_fused, size=structure.shape[-2:], mode="bilinear", align_corners=False
        )


        final = self.final_fusion(torch.cat([out5, out6, out7, texture_fused], dim=1))

        return final, out5, out6, out7


class STDNetFullModel(nn.Module):
    """
    Modèle complet STD-Net:
    - STDModule: décomposition I -> (S,T)
    - TextureBlock: structure additionnelle extraite depuis T
    - MNet: segmentation guidée par S et la texture extraite

    forward(image):
        -> seg_final, seg5, seg6, seg7, texture, extracted_structure, structure
    """

    def __init__(
        self,
        img_channels: int = 3,
        texture_channels: int = 3,
        hidden_dim: int = 64,
        img_size: int = 512,
        num_classes: int = 1,
    ):
        super().__init__()

        self.std_module = STDModule(img_channels=img_channels, hidden_dim=hidden_dim)
        self.texture_block = TextureBlock(img_channels=texture_channels)
        self.mnet = MNet(
            img_size=img_size,
            img_channels=img_channels,
            texture_channels=texture_channels,
            num_classes=num_classes,

        )

    def forward(
        self,
        image: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor,
               torch.Tensor, torch.Tensor, torch.Tensor]:
        #On ressort la structure et la texture avec le module STD
        structure, texture = self.std_module(image)
        #On fait l'extracted_structure avec le textureblock
        extracted_structure = self.texture_block(texture)
        #On fait la segmentation avec la structure en entrée du MNEt ainsi que l'extracted_structure pour le dernier niveau
        seg_final, seg5, seg6, seg7 = self.mnet(structure, extracted_structure)
        #On retourne tout
        return seg_final, seg5, seg6, seg7, texture, extracted_structure, structure


In [None]:

def structure_loss_tv(structure: torch.Tensor) -> torch.Tensor:
    """
    Total Variation Loss (TV) sur la composante de structure.
    Encourage la structure à être régulière (peu de variations locales).
    """
    # Différences absolues entre pixels voisins VERTICAUX :
    # structure[:, :, 1:, :]  -> toutes les lignes sauf la première
    # structure[:, :, :-1, :] -> toutes les lignes sauf la dernière
    # dh a la forme [B, C, H-1, W] et mesure |S(i+1,j) - S(i,j)|
    dh = torch.abs(structure[:, :, 1:, :] - structure[:, :, :-1, :])

    # Différences absolues entre pixels voisins HORIZONTAUX :
    # structure[:, :, :, 1:]  -> toutes les colonnes sauf la première
    # structure[:, :, :, :-1] -> toutes les colonnes sauf la dernière
    # dw a la forme [B, C, H, W-1] et mesure |S(i,j+1) - S(i,j)|
    dw = torch.abs(structure[:, :, :, 1:] - structure[:, :, :, :-1])


    return (dh.pow(2).mean() + dw.pow(2).mean()).sqrt()

def texture_loss_l1(texture: torch.Tensor) -> torch.Tensor:
    """
    L1 sur la texture (encourage la parcimonie = texture peu "chargée").
    """
    # On prend la valeur absolue de tous les coefficients de texture
    # et on en fait la moyenne : c'est ||texture||_1 normalisée.
    return torch.mean(torch.abs(texture))


def masked_bce_with_logits(
    pred: torch.Tensor,
    target: torch.Tensor,
    fov: torch.Tensor,
    bce_criterion: nn.Module,
) -> torch.Tensor:
    """
    BCE avec logits restreint à la FOV (zone utile de la rétine).

    Args:
        pred:   [B,C,H,W], logits du modèle
        target: [B,C,H,W], masque binaire (0/1)
        fov:    [B,1,H,W], masque FOV (0/1) => où on calcule la loss
    """
    # bce_criterion est une BCEWithLogitsLoss avec reduction='none'
    # => loss_map a la même forme que pred : [B,C,H,W]
    loss_map = bce_criterion(pred, target)  # [B,C,H,W]

    # On met la FOV sous forme float 0/1 (si ce n'est pas déjà le cas)
    mask = (fov > 0.5).float()  # [B,1,H,W]

    # Si le prédicteur a plusieurs canaux (C>1) et la FOV n'en a qu'un (1),
    # on duplique le masque sur tous les canaux pour pouvoir faire un produit élément par élément.
    if mask.shape[1] != pred.shape[1]:
        # expand(-1, C, -1, -1) -> garde B,H,W, mais remplace la dimension des canaux par C
        mask = mask.expand(-1, pred.shape[1], -1, -1)  # [B,C,H,W]

    # On ne garde la loss que dans les pixels à l'intérieur du FOV (mask=1),
    # et on met la loss à 0 hors FOV (mask=0). -> Cela permet de ne pas prendre en compte le backg
    loss = (loss_map * mask).sum() / (mask.sum() + 1e-8)
    # → moyenne pondérée : somme des pertes sur FOV / nombre de pixels FOV

    return loss


def deep_supervision(
    preds: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor],
    gt: torch.Tensor,
    fov: torch.Tensor,
    bce_criterion: nn.Module,
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    """
    Applique la BCE sur les 4 cartes de sortie:
        - seg_final (plein résolution)
        - seg5, seg6, seg7 (résolutions intermédiaires)

    Combine ensuite les 4 pertes en une seule Lseg (combinaison pondérée).
    """
    seg_final, seg5, seg6, seg7 = preds

    # On met la GT à la même taille que chaque sortie intermédiaire,
    # en utilisant une interpolation 'nearest' (pas de valeurs intermédiaires, on garde 0/1).
    gt5 = F.interpolate(gt, size=seg5.shape[-2:], mode="nearest")
    gt6 = F.interpolate(gt, size=seg6.shape[-2:], mode="nearest")
    gt7 = F.interpolate(gt, size=seg7.shape[-2:], mode="nearest")

    # Idem pour la FOV (on la redimensionne pour qu'elle matche chaque prédiction)
    fov5 = F.interpolate(fov, size=seg5.shape[-2:], mode="nearest")
    fov6 = F.interpolate(fov, size=seg6.shape[-2:], mode="nearest")
    fov7 = F.interpolate(fov, size=seg7.shape[-2:], mode="nearest")

    # Calcul de la BCE masquée (sur la FOV) pour chaque niveau de résolution
    L_final = masked_bce_with_logits(seg_final, gt,  fov,  bce_criterion)
    L5      = masked_bce_with_logits(seg5,      gt5, fov5, bce_criterion)
    L6      = masked_bce_with_logits(seg6,      gt6, fov6, bce_criterion)
    L7      = masked_bce_with_logits(seg7,      gt7, fov7, bce_criterion)

    # Poids pour chaque niveau
    #Choisi de faire ceci pour prendre en compte les sorties inférieur du MNET mais qu'elles n'ai pas le meme poids que la sortie en pleine résolution.
    weights = [1.0, 0.3, 0.1, 0.05]
    sum_w = sum(weights)

    # Combinaison pondérée des pertes :
    Lseg = (
        weights[0] * L_final
        + weights[1] * L5
        + weights[2] * L6
        + weights[3] * L7
    ) / sum_w

    return Lseg, L_final, L5, L6, L7


def total_loss(
    seg_output: torch.Tensor,
    seg5: torch.Tensor,
    seg6: torch.Tensor,
    seg7: torch.Tensor,
    fov: torch.Tensor,
    seg_target: torch.Tensor,
    structure: torch.Tensor,
    texture: torch.Tensor,
    bce_criterion: nn.Module,
    mu: float = 0.001,
    lam: float = 1.0,
) -> Dict[str, torch.Tensor]:
    """
    Perte totale du papier :
        Ltotal = Lseg + mu * (Lt + lam * Ls)

    avec:
        Lseg : loss de segmentation multi-échelle (deep supervision)
        Lt   : L1 sur la texture
        Ls   : TV sur la structure
    """
    #  On calcule la loss de segmentation multi-échelle
    Lseg, L_final, L5, L6, L7 = deep_supervision(
        (seg_output, seg5, seg6, seg7), seg_target, fov, bce_criterion
    )

    #  Pertes de régularisation sur les décompositions structure / texture
    Lt = texture_loss_l1(texture)      # encourage texture "sparse"
    Ls = structure_loss_tv(structure)  # encourage structure "lisse"

    #Combinaison finale :
    #    - Lseg : terme principal (segmentation)
    #    - mu   : poids global des pertes structure/texture
    #    - lam  : réglage relatif entre Ls et Lt à l'intérieur du bloc mu*(Lt + lam*Ls)
    Ltotal = Lseg + mu * (Lt + lam * Ls)

    return {
        "total": Ltotal,      # perte totale utilisée pour le backward
        "segmentation": Lseg, # terme de segmentation (multi-scale)
        "texture": Lt,        # régularisation L1 sur la texture
        "structure": Ls,      # régularisation TV sur la structure
        "L_final": L_final,   # perte juste sur la sortie finale
        "L5": L5,
        "L6": L6,
        "L7": L7,
    }


In [None]:
# =============================================================================
# TRAIN / VAL / TEST LOOPS
# =============================================================================

def train_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    optimizer: torch.optim.Optimizer,
    device: str,
    bce_criterion: nn.Module
) -> Dict[str, float]:
    """
    Entraîne le modèle sur UNE epoch .
    """
    # Passe le modèle en mode entraînement :
    # - AdaptiveNorm met à jour ses stats
    model.train()

    # Accumulateurs pour le calcul des pertes
    epoch_total_loss = 0.0  # perte totale (seg + texture + structure)
    total_seg = 0.0         # partie segmentation
    total_T = 0.0           # partie texture
    total_S = 0.0           # partie structure

    # Boucle sur tous les batchs du DataLoader passer en paramètre
    for batch in tqdm(loader, desc="Train"):
        # On récupère les tenseurs et on les envoie sur le device processeur ou GPU
        img = batch["image"].to(device)
        fov = batch["fov"].to(device)
        gt = batch["gt"].to(device)

        # On remet les gradients de l'optimizer à zéro avant le backward
        optimizer.zero_grad()

        # Forward : le modèle renvoie les cartes de segmentation + structure/texture
        seg, seg5, seg6, seg7, texture, extracted_structure, structure = model(img)

        # Calcul de toutes les composantes de la loss
        losses = total_loss(
            seg_output=seg,
            seg5=seg5,
            seg6=seg6,
            seg7=seg7,
            fov=fov,
            seg_target=gt,
            structure=structure,
            texture=texture,
            bce_criterion=bce_criterion,
        )

        # Perte totale, on va faire la backpropagation sur celle ci
        loss = losses["total"]
        loss.backward()          # calcul des gradients
        optimizer.step()         # mise à jour des poids du modèle passer en paramètre

        # Accumulation pour les stats de fin d'epoch
        epoch_total_loss += loss.item()
        total_seg += losses["segmentation"].item()
        total_T += losses["texture"].item()
        total_S += losses["structure"].item()

    # Moyenne sur le nombre de batchs
    n = len(loader)
    return {
        "loss": epoch_total_loss / n,
        "seg": total_seg / n,
        "T": total_T / n,
        "S": total_S / n,
    }

#-> On désactive le calcul des gradients
@torch.no_grad()
def validate_one_epoch(
    model: nn.Module,
    loader: DataLoader,
    device: str,
    bce_criterion: nn.Module,
    debug_visualization: bool = False,
) -> Dict[str, float]:
    """
    Évalue le modèle sur la validation .
    On fait la même chose que train_one_epoch sans backpropagation.
    """

    model.eval()

    epoch_total_loss = 0.0
    total_seg = 0.0
    total_T = 0.0
    total_S = 0.0


    for batch_idx, batch in enumerate(tqdm(loader, desc="Val")):
        img = batch["image"].to(device)
        fov = batch["fov"].to(device)
        gt = batch["gt"].to(device)


        seg, seg5, seg6, seg7, texture, extracted_structure, structure = model(img)

        losses = total_loss(
            seg_output=seg,
            seg5=seg5,
            seg6=seg6,
            seg7=seg7,
            fov=fov,
            seg_target=gt,
            structure=structure,
            texture=texture,
            bce_criterion=bce_criterion,
        )


        epoch_total_loss += losses["total"].item()
        total_seg += losses["segmentation"].item()
        total_T += losses["texture"].item()
        total_S += losses["structure"].item()


    n = len(loader)
    return {
        "loss": epoch_total_loss / n,
        "seg": total_seg / n,
        "T": total_T / n,
        "S": total_S / n,
    }


@torch.no_grad()
def test_drive_model(
    model: nn.Module,
    loader: DataLoader,
    device: str,
) -> Dict[str, float]:
    """
    Évaluation sur DRIVE  avec :

        - Acc : accuracy globale
        - AUC : aire sous la courbe ROC
        - Sen : sensibilité (rappel)
        - Spe : spécificité
        - IOU : Intersection over Union

    Toutes les métriques sont calculées dans le FOV -> on ne prends pas en compte le tour du fond d'oeil
    """
    model.eval()

    # Compteurs globaux pour la matrice de confusion
    TP_total, FP_total, FN_total, TN_total = 0, 0, 0, 0

    # Listes pour calculer l'AUC après coup
    all_probs = []
    all_targets = []
    all_fovs = []

    for batch in tqdm(loader, desc="Test DRIVE"):
        img = batch["image"].to(device)
        fov = batch["fov"].to(device)
        gt = batch["gt"].to(device).float()

        # Forward : on ne s'intéresse qu'à la sortie de segmentation pour l'évaluation totale du modèle et voir s'il arrive a généraliser sur des données nouvelles
        seg, *_ = model(img)
        prob = torch.sigmoid(seg)

        # Masquage par le FOV -> On ne prends pas en compte le tour du fond d'oeil
        prob_masked = prob * fov
        target_masked = gt * fov

        # Binarisation (seuil 0.5) pour calculer TP, FP, (Comme dit plus tot, avec la normalisation, on a des valeurs continue entre 0 et 1, ici c'est soit 1 soit 0 pas autre chose)
        pred = (prob_masked > 0.5).float()

        # TP : préd=1 et GT=1
        TP = ((pred == 1) & (target_masked == 1)).sum().item()
        # FP : préd=1, GT=0, mais uniquement dans la FOV
        FP = ((pred == 1) & (target_masked == 0) & (fov == 1)).sum().item()
        # FN : préd=0, GT=1
        FN = ((pred == 0) & (target_masked == 1)).sum().item()
        # TN : préd=0, GT=0, dans la FOV
        TN = ((pred == 0) & (target_masked == 0) & (fov == 1)).sum().item()

        TP_total += TP
        FP_total += FP
        FN_total += FN
        TN_total += TN

        # Pour l'AUC : on stocke toutes les probas + GT + FOV,
        # on filtrera plus tard pour ne garder que les pixels dans la FOV.
        all_probs.append(prob.detach().cpu().numpy().ravel())
        all_targets.append(gt.detach().cpu().numpy().ravel())
        all_fovs.append(fov.detach().cpu().numpy().ravel())

    # ---- Métriques dérivées de la matrice de confusion ----
    denominator = TP_total + TN_total + FP_total + FN_total + 1e-8
    Acc = (TP_total + TN_total) / denominator                # Accuracy
    Sen = TP_total / (TP_total + FN_total + 1e-8)            # Sensibilité (Recall)
    Spe = TN_total / (TN_total + FP_total + 1e-8)            # Spécificité
    IOU = TP_total / (TP_total + FP_total + FN_total + 1e-8) # Intersection over Union

    # ---- AUC (ROC) en ne gardant que les pixels dans la FOV ----
    probs_flat = np.concatenate(all_probs, axis=0)
    targets_flat = np.concatenate(all_targets, axis=0)
    fov_flat = np.concatenate(all_fovs, axis=0)

    # On ne garde que les pixels où FOV=1
    valid = (fov_flat == 1)
    probs_valid = probs_flat[valid]
    targets_valid = targets_flat[valid]

    # Si toutes les cibles sont identiques (0 ou 1), l'AUC n'est pas définie
    unique_classes = np.unique(targets_valid)
    if unique_classes.size < 2:
        AUC = float("nan")
    else:
        AUC = roc_auc_score(targets_valid, probs_valid)

    # Affichage des métriques
    print("\n--- DRIVE metrics (threshold=0.5) ---")
    print(f"Acc : {Acc:.4f}")
    print(f"AUC : {AUC:.4f}")
    print(f"Sen : {Sen:.4f}")
    print(f"Spe : {Spe:.4f}")
    print(f"IOU : {IOU:.4f}")

    return {"Acc": Acc, "AUC": AUC, "Sen": Sen, "Spe": Spe, "IOU": IOU}


def overlapping_error(pred_mask: torch.Tensor, gt_mask: torch.Tensor, eps: float = 1e-8) -> float:
    """
    Overlapping Error (OE) = 1 - IoU.

    pred_mask, gt_mask: Tensors [H,W] ou [1,H,W] (0/1 ou probas).
    """
    # Si [1,H,W], on enlève la dimension canal pour travailler en [H,W]
    if pred_mask.ndim == 3:
        pred_mask = pred_mask[0]
    if gt_mask.ndim == 3:
        gt_mask = gt_mask[0]

    # Binarisation des masques avec seuil 0.5
    pred_bin = (pred_mask > 0.5).float()
    gt_bin = (gt_mask > 0.5).float()

    # Intersection = pixels où préd=1 et GT=1
    inter = (pred_bin * gt_bin).sum().item()
    # Union = pixels où préd=1 OU GT=1
    union = ((pred_bin + gt_bin) > 0).float().sum().item()

    # Cas dégénéré : aucune union, on retourne 0 (pas d'erreur d'overlap)
    if union == 0:
        return 0.0

    # IoU = |A∩B| / |A∪B|
    iou = inter / (union + eps)
    # OE = 1 - IoU (plus c'est proche de 0, mieux c'est)
    return 1.0 - iou


@torch.no_grad()
def compute_oe_origa(
    model: nn.Module,
    loader: DataLoader,
    device: str,
    desc: str = "OE ORIGA",
) -> Dict[str, float]:
    """
    Calcule l'Overlapping Error pour ORIGA sur :
        - disque (canal 0)
        - cup   (canal 1)
        - total (somme des deux)
    """
    model.eval()
    oes_disc = []  # liste des OE disque sur tous les exemples
    oes_cup = []   # liste des OE cup sur tous les exemples

    for batch in tqdm(loader, desc=desc):
        img = batch["image"].to(device)
        gt = batch["gt"].to(device).float()  #(canal 0 = disc, 1 = cup)

        # Forward
        seg, *_ = model(img)
        prob = torch.sigmoid(seg)  # probas [0,1]

        # On calcule l'OE par image et par canal
        for b in range(img.size(0)):
            # Canal 0 = disque
            oe_d = overlapping_error(prob[b, 0].cpu(), gt[b, 0].cpu())
            # Canal 1 = cup
            oe_c = overlapping_error(prob[b, 1].cpu(), gt[b, 1].cpu())
            oes_disc.append(oe_d)
            oes_cup.append(oe_c)

    # Moyenne sur tout le dataset
    OE_disc = float(np.mean(oes_disc))
    OE_cup = float(np.mean(oes_cup))
    OE_total = OE_disc + OE_cup  # métrique globale

    return {"disc": OE_disc, "cup": OE_cup, "total": OE_total}


In [None]:
def log_epoch_stats(history, train_stats, val_stats, epoch, total_epochs, tag="DRIVE"):
    """
    Met à jour le dictionnaire history avec les stats train/val
    et affiche un résumé lisible pour l'epoch courante.

    Args:
        history      : dict avec les listes d'historique
        train_stats  : dict {"loss", "seg", "T", "S"} pour le train
        val_stats    : dict {"loss", "seg", "T", "S"} pour la val
        epoch        : numéro d'epoch (1-based)
        total_epochs : nombre total d'epochs
        tag          : nom du dataset / expérience (ex: "DRIVE", "ORIGA")
    """

    # --- Mise à jour de l'historique ---
    history["train_loss"].append(train_stats["loss"])
    history["val_loss"].append(val_stats["loss"])

    history["train_seg"].append(train_stats["seg"])
    history["val_seg"].append(val_stats["seg"])

    history["train_T"].append(train_stats["T"])
    history["val_T"].append(val_stats["T"])

    history["train_S"].append(train_stats["S"])
    history["val_S"].append(val_stats["S"])

    # --- Affichage propre ---
    print(
        f"[{tag}][Epoch {epoch}/{total_epochs}] "
        f"\n[Train] Total loss={train_stats['loss']:.4f}, "
        f" Segmentation={train_stats['seg']:.4f}, "
        f" Texture={train_stats['T']:.4f}, "
        f" Structure={train_stats['S']:.4f},\n"

        f"[Valid] Total loss={val_stats['loss']:.4f}, "

        f" Segmentation={val_stats['seg']:.4f}, "

        f" Texture={val_stats['T']:.4f}, "

        f" Structure={val_stats['S']:.4f}"
    )

In [None]:
# -------------------------------------------------------------------------
# DRIVE
# -------------------------------------------------------------------------
# 1) Définition des chemins
drive_train_img_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/DRIVE/training/images"
drive_train_fov_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/DRIVE/training/mask"
drive_train_gt_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/DRIVE/training/1st_manual"

drive_test_img_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/DRIVE/test/images"
drive_test_fov_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/DRIVE/test/mask"
drive_test_gt_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/DRIVE/test/1st_manual"
drive_test_gt2_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/DRIVE/test/2nd_manual"

image_size_drive = (512, 512)
from typing import Tuple
from torch.utils.data import DataLoader, Subset


def setup_drive_dataloaders(
    image_size: Tuple[int, int] = (512, 512),
    batch_size_train: int = 2,
    batch_size_val: int = 2,
    batch_size_test: int = 1,
    num_workers: int = 2,
    val_ratio: float = 0.2,
    seed: int = 42,
    apply_clahe_flag: bool = True,
):
    """
    Prépare tous les éléments pour DRIVE en une seule fonction :

      - calcule mean/std sur le train
      - crée les transforms train / eval
      - crée les datasets train / val / test
      - crée les DataLoaders correspondants

    """

    # Dataset sans transform pour calculer mean/std
    drive_dataset_for_stats = DRIVEDataset(
        images_dir=drive_train_img_dir,
        fov_dir=drive_train_fov_dir,
        gt_dir=drive_train_gt_dir,
        split="train",
        transform=None,
        apply_clahe_flag=apply_clahe_flag,
    )

    mean_drive, std_drive = compute_mean_std(
        drive_dataset_for_stats,
        image_key="image",
        image_size=image_size
    )

    # Transforms train / val / test
    drive_train_transform = RetinalSegmentationTransform(
        image_size=image_size,
        mean=mean_drive,
        std=std_drive,
        train=True,
        use_polar=False,
    )
    drive_eval_transform = RetinalSegmentationTransform(
        image_size=image_size,
        mean=mean_drive,
        std=std_drive,
        train=False,
        use_polar=False,
    )

    # Datasets TRAIN / VAL
    full_drive_train = DRIVEDataset(
        images_dir=drive_train_img_dir,
        fov_dir=drive_train_fov_dir,
        gt_dir=drive_train_gt_dir,
        split="train",
        transform=drive_train_transform,
        apply_clahe_flag=apply_clahe_flag,
    )

    # Split train/val (par défaut 80/20)
    n_train = len(full_drive_train)
    indices = list(range(n_train))
    np.random.seed(seed)
    np.random.shuffle(indices)
    split_idx = int((1.0 - val_ratio) * n_train)
    train_indices = indices[:split_idx]
    val_indices = indices[split_idx:]

    drive_train_dataset = Subset(full_drive_train, train_indices)

    full_drive_val = DRIVEDataset(
        images_dir=drive_train_img_dir,
        fov_dir=drive_train_fov_dir,
        gt_dir=drive_train_gt_dir,
        split="train",
        transform=drive_eval_transform,
        apply_clahe_flag=apply_clahe_flag,
    )
    drive_val_dataset = Subset(full_drive_val, val_indices)

    drive_train_loader = DataLoader(
        drive_train_dataset,
        batch_size=batch_size_train,
        shuffle=True,
        num_workers=num_workers,
    )
    drive_val_loader = DataLoader(
        drive_val_dataset,
        batch_size=batch_size_val,
        shuffle=False,
        num_workers=num_workers,
    )

    # 4) Test set
    drive_test_dataset = DRIVEDataset(
        images_dir=drive_test_img_dir,
        fov_dir=drive_test_fov_dir,
        gt_dir=drive_test_gt_dir,
        second_gt_dir=drive_test_gt2_dir,
        split="test",
        transform=drive_eval_transform,
        apply_clahe_flag=apply_clahe_flag,
    )
    drive_test_loader = DataLoader(
        drive_test_dataset,
        batch_size=batch_size_test,
        shuffle=False,
        num_workers=num_workers,
    )


    return drive_train_loader, drive_val_loader, drive_test_loader, mean_drive, std_drive

drive_train_loader, drive_val_loader, drive_test_loader, mean_drive, std_drive = setup_drive_dataloaders()


# Modèle + optim + loss
drive_model = STDNetFullModel(
    img_channels=3,
    texture_channels=3,
    hidden_dim=64,
    img_size=image_size_drive[0],
).to(device)

optimizer_drive = torch.optim.Adam(drive_model.parameters(), lr=1e-3)
bce_criterion = nn.BCEWithLogitsLoss(reduction="none")

#Scheduler : réduit le LR quand la loss de validation ne diminue plus
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer_drive,
    mode="min",      # on veut MINIMISER la loss de validation
    factor=0.5,      # LR_new = LR_old * 0.5
    patience=10,
)

# Pour sauvegarder le meilleur modèle
best_val_loss = float("inf")
best_epoch = 0
best_model_path = "stdnet_drive_best.pth"

# Entraînement
EPOCHS_DRIVE = 150

history_drive = {
    "train_loss": [],
    "val_loss": [],
    "train_seg": [],
    "val_seg": [],
    "train_T":   [],
    "val_T":     [],
    "train_S":   [],
    "val_S":     [],
}



#Lancement de l'entrainement
for epoch in range(1, EPOCHS_DRIVE + 1):

#Appel à la fonction qui train sur une epoch
    train_stats = train_one_epoch(
        drive_model,
        drive_train_loader,
        optimizer_drive,
        device,
        bce_criterion,
    )
#Validation sur l'epoch
    val_stats = validate_one_epoch(
        drive_model,
        drive_val_loader,
        device,
        bce_criterion,
    )
    #Affectation de la métrique à surveiller pour baisser le learning rate si ca stagne
    scheduler.step(val_stats["loss"])

    #Sauvegarde du meilleur modèle sur la meilleure loss validation
    if val_stats["loss"] < best_val_loss:
        best_val_loss = val_stats["loss"]
        best_epoch = epoch
        torch.save(drive_model.state_dict(), best_model_path)
        print(f"==> Nouveau meilleur modèle (epoch {epoch})")

#Affichage des loss (totale,seg,texture et structure)
    log_epoch_stats(
        history=history_drive,
        train_stats=train_stats,
        val_stats=val_stats,
        epoch=epoch,
        total_epochs=EPOCHS_DRIVE,
        tag="DRIVE"
    )

#Affichage du premier sample de la validation
    show_drive_val_sample(
        model=drive_model,
        val_loader=drive_val_loader,
        device=device,
        title_prefix=f"DRIVE - epoch {epoch}",
        threshold=0.5, #-> Pour avoir la segmentation en binaire, j e mets le threshold à 0.5
    )

#On charge le meilleure modèle sauvegarder
best_model = STDNetFullModel(
    img_channels=3,
    texture_channels=3,
    hidden_dim=64,
    img_size=image_size_drive[0],
).to(device)

best_model.load_state_dict(torch.load(best_model_path))


# et on l'évalue
drive_metrics = test_drive_model(best_model, drive_test_loader, device)


In [None]:
# ================== COURBES D'ENTRAÎNEMENT ==================
epochs_range = range(1, EPOCHS_DRIVE + 1)

plt.figure(figsize=(10, 6))
plt.plot(epochs_range, history_drive["train_loss"], label="Train total loss")
plt.plot(epochs_range, history_drive["val_loss"],   label="Val total loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Courbe de perte totale")
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(epochs_range, history_drive["train_seg"], label="Train seg loss")
plt.plot(epochs_range, history_drive["val_seg"],   label="Val seg loss")
plt.xlabel("Epoch")
plt.ylabel("Segmentation loss")
plt.title("Perte de segmentation (deep supervision)")
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(epochs_range, history_drive["train_T"], label="Train texture loss")
plt.plot(epochs_range, history_drive["val_T"],   label="Val texture loss")
plt.xlabel("Epoch")
plt.ylabel("Texture loss")
plt.title("Perte texture Lt")
plt.legend()
plt.grid(True)
plt.show()

plt.figure(figsize=(10, 6))
plt.plot(epochs_range, history_drive["train_S"], label="Train structure loss")
plt.plot(epochs_range, history_drive["val_S"],   label="Val structure loss")
plt.xlabel("Epoch")
plt.ylabel("Structure loss")
plt.title("Perte structure Ls")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
origa_csv = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/ORIGA/OrigaList.csv"
origa_img_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/ORIGA/Images_Cropped"
origa_mask_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/ORIGA/Masks_Cropped"
image_size_origa = (256, 256)


def show_origa_val_sample(
    model,
    val_loader,
    device,
    title_prefix="ORIGA - epoch",
    threshold: float | None = 0.5,
):
    """
    Affiche, pour 1 image du set de validation ORIGA :
      - structure S
      - texture |T|
      - extracted structure (canal vert uniquement)
      - segmentation disque
      - segmentation cup

    On prend simplement le premier batch du loader.
    """

    model.eval()

    # On récupère UN batch de validation
    batch = next(iter(val_loader))
    img  = batch["image"].to(device)
    gt   = batch["gt"].to(device)
    fov  = batch.get("fov", None)

    if fov is not None:
        fov = fov.to(device)

    with torch.no_grad():

        seg, _, _, _, texture, extracted_structure, structure = model(img)
        prob = torch.sigmoid(seg)


    S  = structure[0].cpu()
    T  = texture[0].cpu()
    E  = extracted_structure[0].cpu()


    P_disc = prob[0, 0].cpu()
    P_cup  = prob[0, 1].cpu()

    H, W = P_disc.shape


    if fov is not None:
        F = fov[0, 0].cpu()
    else:
        F = torch.ones_like(P_disc)

    # -------Structure S (normalisée) ----------
    S_vis = S.clone()
    s_min, s_max = S_vis.min(), S_vis.max()
    S_vis = (S_vis - s_min) / (s_max - s_min + 1e-8)
    S_vis = S_vis * F.unsqueeze(0)     # [3,H,W]
    S_vis = S_vis.permute(1, 2, 0)     # [H,W,3]

    # ---------- Texture |T| (moyenne des canaux) ----------
    T_abs = T.abs().mean(0)
    t_min, t_max = T_abs.min(), T_abs.max()
    T_vis = (T_abs - t_min) / (t_max - t_min + 1e-8)
    T_vis = T_vis * F

    # ----------  Extracted structure – canal vert uniquement ----------
    E_green = E[1]
    e_min, e_max = E_green.min(), E_green.max()
    E_vis = (E_green - e_min) / (e_max - e_min + 1e-8)
    E_vis = E_vis * F

    # ---------- Segmentation disque & cup ----------
    if threshold is None:
        disc_mask = P_disc * F
        cup_mask  = P_cup  * F
    else:
        disc_mask = (P_disc >= threshold).float() * F
        cup_mask  = (P_cup  >= threshold).float() * F


    # ---------- AFFICHAGE ----------
    plt.figure(figsize=(18, 4))

    # Structure S
    plt.subplot(1, 5, 1)
    plt.imshow(S_vis.numpy())
    plt.title(f"{title_prefix} - Structure")
    plt.axis("off")

    # 2) Texture |T|
    plt.subplot(1, 5, 2)
    plt.imshow(T_vis.numpy(), cmap="gray")
    plt.title("Texture")
    plt.axis("off")

    # 3) Structure extraite (canal vert)
    plt.subplot(1, 5, 3)
    plt.imshow(E_vis.numpy(), cmap="gray")
    plt.title("Extracted structure (G)")
    plt.axis("off")

    # 4) Segmentation Disque
    plt.subplot(1, 5, 4)
    plt.imshow(disc_mask.numpy(), cmap="gray", vmin=0, vmax=1)
    plt.title("Disc mask")
    plt.axis("off")

    # 5) Segmentation Cup
    plt.subplot(1, 5, 5)
    plt.imshow(cup_mask.numpy(), cmap="gray", vmin=0, vmax=1)
    plt.title("Cup mask")
    plt.axis("off")

    plt.tight_layout()
    plt.show()


def setup_origa_dataloaders(
    image_size: Tuple[int, int] = (256, 256),
    batch_size_train: int = 2,
    batch_size_val: int = 2,
    batch_size_test: int = 1,
    num_workers: int = 2,
    val_ratio: float = 0.2,
    seed: int = 42,
    rotation_deg: float = 15.0,
):
    """
    Prépare tous les éléments pour ORIGA en une seule fonction :

      - calcule mean/std sur le train
      - crée les transforms train / eval
      - crée les datasets train / val / test
      - crée les DataLoaders correspondants
    """

    # 1) Dataset temporaire pour stats
    origa_stats_dataset = ORIGADataset(
        csv_path=origa_csv,
        images_dir=origa_img_dir,
        masks_dir=origa_mask_dir,
        split="train",
        transform=None,                # pas de RetinalSegmentationTransform pour les stats
        output_image_size=image_size,
    )

    mean_origa, std_origa = compute_mean_std(
        origa_stats_dataset,
        image_key="image",
        image_size=None
    )

    # 2) Transforms train / eval avec transformation polaire activée
    origa_train_transform = RetinalSegmentationTransform(
        image_size=image_size,
        mean=mean_origa,
        std=std_origa,
        train=True,
        use_polar=True,
        rotation_deg=rotation_deg,
    )
    origa_eval_transform = RetinalSegmentationTransform(
        image_size=image_size,
        mean=mean_origa,
        std=std_origa,
        train=False,
        use_polar=True,
    )

    # Dataset train complet (split officiel "train")
    full_origa_train = ORIGADataset(
        csv_path=origa_csv,
        images_dir=origa_img_dir,
        masks_dir=origa_mask_dir,
        split="train",
        transform=origa_train_transform,
        output_image_size=image_size,
    )

    # Split train/val (par défaut 80/20)
    n_origa = len(full_origa_train)
    indices = list(range(n_origa))
    np.random.seed(seed)
    np.random.shuffle(indices)
    split_idx = int((1.0 - val_ratio) * n_origa)
    origa_train_idx = indices[:split_idx]
    origa_val_idx = indices[split_idx:]

    origa_train_dataset = Subset(full_origa_train, origa_train_idx)

    # Dataset val avec transform "eval"
    full_origa_val = ORIGADataset(
        csv_path=origa_csv,
        images_dir=origa_img_dir,
        masks_dir=origa_mask_dir,
        split="train",
        transform=origa_eval_transform,
        output_image_size=image_size,
    )
    origa_val_dataset = Subset(full_origa_val, origa_val_idx)

    # 4) Test set
    origa_test_dataset = ORIGADataset(
        csv_path=origa_csv,
        images_dir=origa_img_dir,
        masks_dir=origa_mask_dir,
        split="test",
        transform=origa_eval_transform,
        output_image_size=image_size,
    )

    #  DataLoaders
    origa_train_loader = DataLoader(
        origa_train_dataset,
        batch_size=batch_size_train,
        shuffle=True,
        num_workers=num_workers,
    )
    origa_val_loader = DataLoader(
        origa_val_dataset,
        batch_size=batch_size_val,
        shuffle=False,
        num_workers=num_workers,
    )
    origa_test_loader = DataLoader(
        origa_test_dataset,
        batch_size=batch_size_test,
        shuffle=False,
        num_workers=num_workers,
    )

    # On renvoie les loaders + les stats de normalisation
    return origa_train_loader, origa_val_loader, origa_test_loader, mean_origa, std_origa





In [None]:
origa_csv = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/ORIGA/OrigaList.csv"
origa_img_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/ORIGA/Images_Cropped"
origa_mask_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/ORIGA/Masks_Cropped"
image_size_origa = (256, 256)


origa_train_loader, origa_val_loader, origa_test_loader, mean_origa, std_origa = setup_origa_dataloaders(
    image_size=image_size_origa
)

# Modèle ORIGA : 2 canaux de sortie
origa_model = STDNetFullModel(
    img_channels=3,
    texture_channels=3,
    hidden_dim=64,
    img_size=image_size_origa[0],
    num_classes=2
).to(device)

#Optimiseur, loss, scheduler
optimizer_origa = torch.optim.Adam(origa_model.parameters(), lr=1e-3)
bce_criterion_origa = nn.BCEWithLogitsLoss(reduction="none")

scheduler_origa = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer_origa,
    mode="min",
    factor=0.5,
    patience=10

)

#  Historique et suivi du meilleur modèle
EPOCHS_ORIGA = 80

history_origa = {
    "train_loss": [],
    "val_loss": [],
    "train_seg": [],
    "val_seg": [],
    "train_T":   [],
    "val_T":     [],
    "train_S":   [],
    "val_S":     [],
}

best_val_loss = float("inf")
best_epoch_origa = 0
best_origa_model_path = "stdnet_origa_best.pth"

#  Boucle d'entraînement
for epoch in range(1, EPOCHS_ORIGA + 1):


    train_stats = train_one_epoch(
        origa_model,
        origa_train_loader,
        optimizer_origa,
        device,
        bce_criterion_origa,
    )


    val_stats = validate_one_epoch(
        origa_model,
        origa_val_loader,
        device,
        bce_criterion_origa,
    )

    # Mise à jour du scheduler
    scheduler_origa.step(val_stats["loss"])


    log_epoch_stats(
        history=history_origa,
        train_stats=train_stats,
        val_stats=val_stats,
        epoch=epoch,
        total_epochs=EPOCHS_ORIGA,
        tag="ORIGA"
    )


    if val_stats["loss"] < best_val_loss:
        best_val_loss = val_stats["loss"]
        best_epoch_origa = epoch
        torch.save(origa_model.state_dict(), best_origa_model_path)
        print(f"==> Nouveau meilleur modèle ORIGA (epoch {epoch}, val_loss={best_val_loss:.4f})")


    show_origa_val_sample(
        model=origa_model,
        val_loader=origa_val_loader,
        device=device,
        title_prefix=f"ORIGA - epoch {epoch}",
        threshold=0.5,  #Pour l'affichage
    )

#  Rechargement du meilleur modèle pour l'évaluation finale
best_origa_model = STDNetFullModel(
    img_channels=3,
    texture_channels=3,
    hidden_dim=64,
    img_size=image_size_origa[0],
    num_classes=2
).to(device)

best_origa_model.load_state_dict(torch.load(best_origa_model_path, map_location=device))


#  Overlapping Error test avec le MEILLEUR modèle
oe_test = compute_oe_origa(best_origa_model, origa_test_loader, device, desc="OE ORIGA TEST")

print("\n===== OE ORIGA (disc + cup) =====")
print(f"Test : OE_disc={oe_test['disc']:.4f}, OEcup={oe_test['cup']:.4f}, OEtotal={oe_test['total']:.4f}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# ----- COURBES ORIGA -----
epochs_origa = np.arange(1, len(history_origa["train_loss"]) + 1)

plt.figure(figsize=(14, 8))

#Loss totale
plt.subplot(2, 2, 1)
plt.plot(epochs_origa, history_origa["train_loss"], label="train")
plt.plot(epochs_origa, history_origa["val_loss"],   label="val")
plt.xlabel("Epoch")
plt.ylabel("Total loss")
plt.title("ORIGA - Total loss")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.3)

#  Loss segmentation
plt.subplot(2, 2, 2)
plt.plot(epochs_origa, history_origa["train_seg"], label="train")
plt.plot(epochs_origa, history_origa["val_seg"],   label="val")
plt.xlabel("Epoch")
plt.ylabel("Segmentation loss")
plt.title("ORIGA - Segmentation")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.3)

# Loss texture
plt.subplot(2, 2, 3)
plt.plot(epochs_origa, history_origa["train_T"], label="train")
plt.plot(epochs_origa, history_origa["val_T"],   label="val")
plt.xlabel("Epoch")
plt.ylabel("Texture loss (Lt)")
plt.title("ORIGA - Texture")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.3)

# Loss structure
plt.subplot(2, 2, 4)
plt.plot(epochs_origa, history_origa["train_S"], label="train")
plt.plot(epochs_origa, history_origa["val_S"],   label="val")
plt.xlabel("Epoch")
plt.ylabel("Structure loss (Ls)")
plt.title("ORIGA - Structure")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.3)

plt.tight_layout()
plt.show()


In [None]:
from typing import Callable, Optional, Tuple, Dict

class REFUGEDataset(Dataset):
    """
    Dataset REFUGE pour segmentation disque + cup.

    Sorties :
      - image : [3,H,W], float32 normalisée plus tard
      - gt    : [2,H,W], 0/1 (canal 0 = disc, canal 1 = cup)
      - fov   : [1,H,W], ici tout à 1 (pas de masque FOV fourni)
    """

    def __init__(
        self,
        images_dir: str,
        masks_dir: str,
        transform: Optional[Callable] = None,
        disc_label_val: int = 1,
        cup_label_val: int = 2,
        image_extensions: Tuple[str, ...] = (".jpg", ".png", ".jpeg", ".bmp", ".tif", ".tiff"),
        mask_extensions: Tuple[str, ...] = (".png", ".bmp", ".jpg", ".jpeg", ".tif", ".tiff"),
        output_image_size: Optional[Tuple[int, int]] = None,
    ):
        self.images_dir = Path(images_dir)
        self.masks_dir = Path(masks_dir)
        self.transform = transform
        self.disc_label_val = disc_label_val
        self.cup_label_val = cup_label_val
        self.image_extensions = image_extensions
        self.mask_extensions = mask_extensions
        self.output_image_size = output_image_size

        if not self.images_dir.is_dir():
            raise FileNotFoundError(f"Images dir not found: {self.images_dir}")
        if not self.masks_dir.is_dir():
            raise FileNotFoundError(f"Masks dir not found: {self.masks_dir}")

        # On liste les fichiers image
        self.image_files = sorted(
            [f for f in os.listdir(self.images_dir) if f.lower().endswith(self.image_extensions)]
        )
        if len(self.image_files) == 0:
            raise RuntimeError(f"Aucune image trouvée dans {self.images_dir}")

    def __len__(self) -> int:
        return len(self.image_files)

    def _find_mask_path(self, image_name: str) -> Path:
        """
        On cherche un masque ayant le même nom de base que l'image,
        mais avec une des extensions autorisées.
        """
        stem = Path(image_name).stem
        for ext in self.mask_extensions:
            cand = self.masks_dir / f"{stem}{ext}"
            if cand.is_file():
                return cand
        raise FileNotFoundError(f"Aucun masque trouvé pour {image_name} dans {self.masks_dir}")

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        img_name = self.image_files[idx]
        img_path = self.images_dir / img_name
        mask_path = self._find_mask_path(img_name)

        # --- Chargement image + masque ---

        img = Image.open(img_path).convert("RGB")
        # img = apply_clahe(img)

        # masque en niveaux de gris
        mask = Image.open(mask_path).convert("L")


        if self.output_image_size is not None and self.transform is None:
            img = img.resize(self.output_image_size, Image.BILINEAR)
            mask = mask.resize(self.output_image_size, Image.NEAREST)

        img_tensor = TF.to_tensor(img)  # [3,H,W]
        mask_np = np.array(mask, dtype=np.float32)  # [H,W]

        # On s'assure que les labels sont bien 0/1/2
        if mask_np.max() > 2:
            disc_region = (mask_np > 0) & (mask_np < 200)
            cup_region = (mask_np >= 200)

            labels = np.zeros_like(mask_np, dtype=np.float32)
            labels[disc_region] = self.disc_label_val
            labels[cup_region] = self.cup_label_val
        else:
            labels = mask_np  # déjà 0/1/2

        mask_tensor = torch.from_numpy(labels).unsqueeze(0)  # [1,H,W]

        masks = {"mask": mask_tensor}


        if self.transform is not None:
            img_tensor, masks = self.transform(img_tensor, masks)

        mask_tensor = masks["mask"].squeeze(0)

        # On construit les canaux disc/cup binaires
        disc = (mask_tensor == float(self.disc_label_val)).float().unsqueeze(0)
        cup = (mask_tensor == float(self.cup_label_val)).float().unsqueeze(0)
        gt = torch.cat([disc, cup], dim=0)

        # Pas de masque FOV fourni sur REFUGE → on prend tout
        fov = torch.ones_like(disc)

        return {
            "image": img_tensor,
            "gt": gt,
            "disc": disc,
            "cup": cup,
            "fov": fov,
            "name": img_name,
        }

In [None]:
def setup_refuge_dataloaders(
    base_dir: str,
    image_size: Tuple[int, int] = (256, 256),
    batch_size_train: int = 2,
    batch_size_val: int = 2,
    batch_size_test: int = 1,
    num_workers: int = 2,
    apply_polar: bool = False,
    seed: int = 42,
):
    """
    Prépare les DataLoaders pour REFUGE à partir d'un dossier de base :
        base_dir/
            train/Images
            train/Masks
            val/Images
            val/Masks
            test/Images
            test/Masks

    Retourne :
        train_loader, val_loader, test_loader, mean_refuge, std_refuge
    """


    train_img_dir = os.path.join(base_dir, "train", "Images_Cropped")
    train_mask_dir = os.path.join(base_dir, "train", "Masks_Cropped")
    val_img_dir   = os.path.join(base_dir, "val",   "Images_Cropped")
    val_mask_dir  = os.path.join(base_dir, "val",   "Masks_Cropped")
    test_img_dir  = os.path.join(base_dir, "test",  "Images_Cropped")
    test_mask_dir = os.path.join(base_dir, "test",  "Masks_Cropped")

    # Dataset sans transform pour calculer mean/std
    refuge_stats_dataset = REFUGEDataset(
        images_dir=train_img_dir,
        masks_dir=train_mask_dir,
        transform=None,
        output_image_size=image_size,
    )

    mean_refuge, std_refuge = compute_mean_std(
        refuge_stats_dataset,
        image_key="image",
        image_size=None,
    )

    # Transforms train / eval
    refuge_train_transform = RetinalSegmentationTransform(
        image_size=image_size,
        mean=mean_refuge,
        std=std_refuge,
        train=True,
        use_polar=apply_polar,
        rotation_deg=5.0,
    )
    refuge_eval_transform = RetinalSegmentationTransform(
        image_size=image_size,
        mean=mean_refuge,
        std=std_refuge,
        train=False,
        use_polar=apply_polar,
    )

    #  Datasets
    refuge_train_dataset = REFUGEDataset(
        images_dir=train_img_dir,
        masks_dir=train_mask_dir,
        transform=refuge_train_transform,
        output_image_size=image_size,
    )

    refuge_val_dataset = REFUGEDataset(
        images_dir=val_img_dir,
        masks_dir=val_mask_dir,
        transform=refuge_eval_transform,
        output_image_size=image_size,
    )

    refuge_test_dataset = REFUGEDataset(
        images_dir=test_img_dir,
        masks_dir=test_mask_dir,
        transform=refuge_eval_transform,
        output_image_size=image_size,
    )

    #  DataLoaders
    refuge_train_loader = DataLoader(
        refuge_train_dataset,
        batch_size=batch_size_train,
        shuffle=True,
        num_workers=num_workers,
    )
    refuge_val_loader = DataLoader(
        refuge_val_dataset,
        batch_size=batch_size_val,
        shuffle=False,
        num_workers=num_workers,
    )
    refuge_test_loader = DataLoader(
        refuge_test_dataset,
        batch_size=batch_size_test,
        shuffle=False,
        num_workers=num_workers,
    )

    return refuge_train_loader, refuge_val_loader, refuge_test_loader, mean_refuge, std_refuge

In [None]:

refuge_base_dir = "/content/drive/MyDrive/Colab Notebooks/Traitement-image/data/REFUGE"

image_size_refuge = (512, 512)

(
    refuge_train_loader,
    refuge_val_loader,
    refuge_test_loader,
    mean_refuge,
    std_refuge,
) = setup_refuge_dataloaders(
    base_dir=refuge_base_dir,
    image_size=image_size_refuge,
    batch_size_train=2,
    batch_size_val=2,
    batch_size_test=1,
    num_workers=2,
    apply_polar=True,
)


In [None]:
# -----------------------------
# Modèle REFUGE (disc + cup)
# -----------------------------
refuge_model = STDNetFullModel(
    img_channels=3,
    texture_channels=3,
    hidden_dim=64,
    img_size=image_size_refuge[0],
    num_classes=2
).to(device)

optimizer_refuge = torch.optim.Adam(refuge_model.parameters(), lr=1e-3)
bce_criterion_refuge = nn.BCEWithLogitsLoss(reduction="none")

EPOCHS_REFUGE = 80
history_refuge = {"train_loss": [], "val_loss": [], "train_seg": [], "val_seg": [],
                  "train_T": [], "val_T": [], "train_S": [], "val_S": []}

best_val_loss = float("inf")
best_epoch = -1
best_model_path_refuge = "stdnet_refuge_best.pth"

# Scheduler pour baisser le LR si la val stagne
scheduler_refuge = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer_refuge,
    mode="min",
    factor=0.5,
    patience=10,
)

for epoch in range(1, EPOCHS_REFUGE + 1):

    train_stats = train_one_epoch(
        refuge_model,
        refuge_train_loader,
        optimizer_refuge,
        device,
        bce_criterion_refuge,
    )

    val_stats = validate_one_epoch(
        refuge_model,
        refuge_val_loader,
        device,
        bce_criterion_refuge,
    )


    scheduler_refuge.step(val_stats["loss"])


    log_epoch_stats(
        history=history_refuge,
        train_stats=train_stats,
        val_stats=val_stats,
        epoch=epoch,
        total_epochs=EPOCHS_REFUGE,
        tag="REFUGE"
    )


    if val_stats["loss"] < best_val_loss:
        best_val_loss = val_stats["loss"]
        best_epoch = epoch
        torch.save(refuge_model.state_dict(), best_model_path_refuge)
        print(f"==> Nouveau meilleur modèle REFUGE (epoch {epoch}, val_loss={best_val_loss:.4f})")


    show_origa_val_sample(
        model=refuge_model,
        val_loader=refuge_val_loader,
        device=device,
        title_prefix=f"REFUGE - epoch {epoch}",
        threshold=0.5,
    )


best_refuge_model = STDNetFullModel(
    img_channels=3,
    texture_channels=3,
    hidden_dim=64,
    img_size=image_size_refuge[0],
    num_classes=2
).to(device)
best_refuge_model.load_state_dict(torch.load(best_model_path_refuge, map_location=device))


oe_refuge_test = compute_oe_origa(best_refuge_model, refuge_test_loader, device, desc="OE REFUGE TEST")

print("\n===== OE REFUGE (disc + cup) =====")
print(f"Test : OE_disc={oe_refuge_test['disc']:.4f}, OEcup={oe_refuge_test['cup']:.4f}, OEtotal={oe_refuge_test['total']:.4f}")

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# ----- COURBES ORIGA -----
epochs_origa = np.arange(1, len(history_refuge["train_loss"]) + 1)

plt.figure(figsize=(14, 8))

#Loss totale
plt.subplot(2, 2, 1)
plt.plot(epochs_origa, history_refuge["train_loss"], label="train")
plt.plot(epochs_origa, history_refuge["val_loss"],   label="val")
plt.xlabel("Epoch")
plt.ylabel("Total loss")
plt.title("ORIGA - Total loss")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.3)

#  Loss segmentation
plt.subplot(2, 2, 2)
plt.plot(epochs_origa, history_refuge["train_seg"], label="train")
plt.plot(epochs_origa, history_refuge["val_seg"],   label="val")
plt.xlabel("Epoch")
plt.ylabel("Segmentation loss")
plt.title("ORIGA - Segmentation")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.3)

# Loss texture
plt.subplot(2, 2, 3)
plt.plot(epochs_origa, history_refuge["train_T"], label="train")
plt.plot(epochs_origa, history_refuge["val_T"],   label="val")
plt.xlabel("Epoch")
plt.ylabel("Texture loss (Lt)")
plt.title("ORIGA - Texture")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.3)

# Loss structure
plt.subplot(2, 2, 4)
plt.plot(epochs_origa, history_refuge["train_S"], label="train")
plt.plot(epochs_origa, history_refuge["val_S"],   label="val")
plt.xlabel("Epoch")
plt.ylabel("Structure loss (Ls)")
plt.title("ORIGA - Structure")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.3)

plt.tight_layout()
plt.show()
