# Classe `ImageSegmentationDataset` pour la segmentation d'images

Ce notebook détaille l'implémentation et l'utilisation de la classe `ImageSegmentationDataset`, un générateur de données avancé pour la segmentation d'images avec TensorFlow/Keras.  
Il permet de gérer le chargement, la préparation, l'augmentation et la visualisation des données pour des tâches de segmentation sémantique.

## 1. Définition de la classe `ImageSegmentationDataset`

Nous allons définir une classe héritant de `tf.keras.utils.PyDataset` pour gérer efficacement le chargement et la préparation des données de segmentation.  
Cette classe permet de manipuler des couples (image, masque), d'appliquer des augmentations, de gérer le mélange des données, la normalisation, le one-hot encoding, et les poids d'échantillons.

In [None]:
import os
import math
import pathlib
from typing import Optional, Union, Tuple, List, NamedTuple, Any

import numpy as np
import tensorflow as tf
import albumentations as A
import matplotlib.pyplot as plt

# Pour la reproductibilité
np.random.seed(314)
tf.random.set_seed(314)

## 2. Initialisation et configuration des paramètres

La méthode `__init__` initialise les chemins des images et masques, les labels, la taille des batchs, la taille cible, les options d'augmentation, de normalisation, de mélange, de one-hot encoding, et les poids d'échantillons.

In [None]:
class ImageSegmentationDataset(tf.keras.utils.PyDataset):
    """
    Dataset generator for image segmentation tasks.
    """

    def __init__(
        self,
        paths: List[Tuple[pathlib.Path, pathlib.Path]],
        labels: List[NamedTuple],
        batch_size: int,
        target_size: Tuple[int, int],
        augmentations: bool = False,
        preview: Optional[int] = None,
        normalize: Union[bool, str] = True,
        shuffle: bool = True,
        label_onehot: bool = False,
        sample_weights: Optional[List[float]] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)

        # Chargement des chemins images/masques (avec option preview)
        self.image_paths, self.mask_paths = self.load_img_and_mask_paths(paths, preview)

        # Tables de correspondance pour les catégories
        self.table_id2category = {label.id: label.categoryId for label in labels}
        self.table_category2name = {label.categoryId: label.category for label in labels}

        # Paramètres du dataset
        self.batch_size = batch_size
        self.target_size = target_size
        self.augmentations = augmentations
        self.normalize = normalize
        self.shuffle = shuffle
        self.label_onehot = label_onehot
        self.sample_weights = sample_weights

        # Pipeline d'augmentation si activé
        if self.augmentations:
            self.compose = A.Compose(
                [
                    A.HorizontalFlip(p=0.5),
                    A.OneOf(
                        [
                            A.RandomBrightnessContrast(
                                brightness_limit=0.2, contrast_limit=0.2, p=1.0
                            ),
                            A.HueSaturationValue(
                                hue_shift_limit=10,
                                sat_shift_limit=15,
                                val_shift_limit=10,
                                p=1.0,
                            ),
                        ],
                        p=0.5,
                    ),
                    A.OneOf(
                        [
                            A.GaussianBlur(blur_limit=3, p=1.0),
                            A.MotionBlur(blur_limit=5, p=1.0),
                            A.OpticalDistortion(distort_limit=0.05, p=1.0),
                        ],
                        p=0.25,
                    ),
                ],
                additional_targets={"sample_weights": "mask"}
                if self.sample_weights is not None
                else {},
            )

        # Mélange initial si demandé
        if self.shuffle:
            self.on_epoch_end()

## 3. Chargement et découpage des chemins images/masques

La méthode statique `load_img_and_mask_paths` permet de découper les tuples de chemins images/masques et de gérer l'option `preview` pour ne charger qu'un sous-ensemble des données.

In [None]:
    @staticmethod
    def load_img_and_mask_paths(
        paths: List[Tuple[pathlib.Path, pathlib.Path]], preview: Optional[int]
    ) -> Tuple[List[pathlib.Path], List[pathlib.Path]]:
        """
        Unpack tuples of image and mask paths and apply preview slicing if specified.
        """
        image_paths, mask_paths = zip(*paths)
        if len(image_paths) != len(mask_paths):
            raise ValueError("Number of images and masks must be equal.")
        if preview is not None:
            image_paths = image_paths[:preview]
            mask_paths = mask_paths[:preview]
        return list(image_paths), list(mask_paths)

## 4. Calcul du nombre de classes et d’échantillons

Les propriétés `num_classes` et `num_samples` retournent respectivement le nombre de classes et d'échantillons dans le dataset.

In [None]:
    @property
    def num_classes(self) -> int:
        """Retourne le nombre de classes uniques dans le dataset."""
        return len(set(self.table_id2category.values()))

    @property
    def num_samples(self) -> int:
        """Retourne le nombre total d'échantillons dans le dataset."""
        return len(self.image_paths)

## 5. Chargement et prétraitement des images

La méthode `load_img_to_array` charge une image RGB depuis le disque, la redimensionne à la taille cible et la normalise si demandé.

In [None]:
    def load_img_to_array(self, img_path: pathlib.Path) -> np.ndarray:
        """
        Charge une image, la redimensionne et la convertit en tableau numpy.
        """
        img = tf.keras.utils.load_img(
            str(img_path),
            target_size=self.target_size,
            color_mode="rgb",
            interpolation="bilinear",
        )
        img_array = tf.keras.utils.img_to_array(img, dtype=np.float32)
        return img_array / 255.0 if self.normalize else img_array

## 6. Chargement et transformation des masques

La méthode `load_mask_to_array` charge un masque, le redimensionne, convertit les ids de labels en catégories, et applique éventuellement le one-hot encoding.

In [None]:
    def load_mask_to_array(self, mask_path: pathlib.Path) -> np.ndarray:
        """
        Charge un masque, le redimensionne, mappe les ids vers les catégories, et applique le one-hot si demandé.
        """
        mask = tf.keras.utils.load_img(
            str(mask_path),
            target_size=self.target_size,
            color_mode="grayscale",
            interpolation="nearest",
        )
        mask_array = tf.keras.utils.img_to_array(mask, dtype=np.int8)
        mask_array = np.vectorize(self.table_id2category.get)(mask_array).squeeze()
        if self.label_onehot:
            mask_array = tf.keras.utils.to_categorical(
                mask_array, num_classes=self.num_classes
            )
        return mask_array

## 7. Gestion des augmentations de données

La méthode `load_and_augment` applique le pipeline d'augmentations (flip, brightness, blur, etc.) sur les images et masques si activé.

In [None]:
    def load_and_augment(
        self, paths: Tuple[pathlib.Path, pathlib.Path]
    ) -> Union[
        Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray]
    ]:
        """
        Charge image et masque, applique les augmentations et la normalisation si activées.
        """
        img_path, mask_path = paths
        img = self.load_img_to_array(img_path)
        mask = self.load_mask_to_array(mask_path)

        if self.sample_weights is not None:
            weights = np.take(self.sample_weights, mask)
            if self.augmentations:
                augmented = self.compose(image=img, mask=mask, sample_weights=mask)
                return (
                    augmented["image"],
                    augmented["mask"],
                    augmented["sample_weights"],
                )
            else:
                return img, mask, weights
        else:
            if self.augmentations:
                augmented = self.compose(image=img, mask=mask)
                return augmented["image"], augmented["mask"]
            else:
                return img, mask

## 8. Gestion du mélange des données (shuffle)

La méthode `on_epoch_end` mélange les chemins images/masques à la fin de chaque époque si l'option shuffle est activée.

In [None]:
    def on_epoch_end(self) -> None:
        """
        Mélange le dataset à la fin de chaque époque si shuffle est activé.
        """
        if self.shuffle:
            zip_paths = list(zip(self.image_paths, self.mask_paths))
            np.random.shuffle(zip_paths)
            self.image_paths, self.mask_paths = zip(*zip_paths)

## 9. Gestion des batchs et récupération d’un batch

La méthode `__len__` retourne le nombre de batchs par époque, et `__getitem__` retourne un batch d'images, masques (et poids si applicable).

In [None]:
    def __len__(self) -> int:
        """
        Retourne le nombre de batchs par époque.
        """
        return math.ceil(self.num_samples / self.batch_size)

    def __getitem__(
        self, index: int
    ) -> Union[
        Tuple[np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray]
    ]:
        """
        Récupère un batch d'images et de masques à l'index donné.
        """
        start_idx = index * self.batch_size
        end_idx = min(start_idx + self.batch_size, self.num_samples)
        if start_idx >= self.num_samples:
            raise IndexError("Index out of range")

        batch_paths = list(
            zip(self.image_paths[start_idx:end_idx], self.mask_paths[start_idx:end_idx])
        )
        results = [self.load_and_augment(pair) for pair in batch_paths]

        if self.sample_weights is not None:
            images, masks, weights = zip(*results)
            return np.asarray(images), np.asarray(masks), np.asarray(weights)
        else:
            images, masks = zip(*results)
            return np.asarray(images), np.asarray(masks)

## 10. Visualisation des transformations

La méthode `show_transformation` permet d'afficher l'image et le masque avant/après transformation pour un index donné.

In [None]:
    def get_image_and_mask(
        self, index: int
    ) -> Tuple[np.ndarray, np.ndarray, Tuple[pathlib.Path, pathlib.Path]]:
        """
        Récupère une paire image/masque pour visualisation.
        """
        paths = (self.image_paths[index], self.mask_paths[index])
        if self.sample_weights is None:
            img, mask = self.load_and_augment(paths)
        else:
            img, mask, _ = self.load_and_augment(paths)
        return img, mask, paths

    def show_transformation(
        self, index: int, figsize: Tuple[int, int] = (12, 8)
    ) -> None:
        """
        Affiche l'image et le masque avant/après transformation pour un index donné.
        """
        img, mask, paths = self.get_image_and_mask(index)
        img_path, mask_path = paths

        fig, ax = plt.subplots(2, 2, figsize=figsize)
        fig.suptitle("Image et masque avant/après transformation", fontsize=16)
        ax[0, 0].imshow(plt.imread(img_path))
        ax[0, 0].set_title("Image originale")
        ax[0, 1].imshow(img)
        ax[0, 1].set_title("Image transformée")
        ax[1, 0].imshow(plt.imread(mask_path))
        ax[1, 0].set_title("Masque original")
        ax[1, 1].imshow(mask)
        ax[1, 1].set_title("Masque transformé")
        for a in ax.ravel():
            a.axis("off")
        plt.tight_layout()
        plt.show()

## 11. Visualisation des prédictions du modèle

La méthode `show_prediction` affiche l'image, le masque réel et la prédiction du modèle pour un index donné.

In [None]:
    def get_prediction(self, model: Any, index: int) -> np.ndarray:
        """
        Génère une prédiction pour une image donnée à l'aide du modèle fourni.
        """
        img, _, _ = self.get_image_and_mask(index)
        mask_pred = model.predict(np.expand_dims(img, axis=0))
        mask_pred = np.argmax(mask_pred.squeeze(), axis=-1)
        return mask_pred

    def show_prediction(
        self, model: Any, index: int, figsize: Tuple[int, int] = (15, 6)
    ) -> None:
        """
        Affiche l'image, le masque réel et la prédiction du modèle pour un index donné.
        """
        img, mask, paths = self.get_image_and_mask(index)
        img_path, mask_path = paths
        mask_pred = self.get_prediction(model, index)

        fig = plt.figure(layout="constrained", figsize=figsize)
        fig.suptitle(f"{model.name} - Prédiction", fontsize=16)
        subfigs = fig.subfigures(2, 1, wspace=0.07)
        axsTop = subfigs[0].subplots(1, 3, sharey=True, sharex=True)
        axsBottom = subfigs[1].subplots(1, 3, sharey=True, sharex=True)

        for i, ax in enumerate(axsTop):
            if i == 0:
                ax.imshow(self.load_img_to_array(img_path))
                ax.set_title("Image originale")
            elif i == 1:
                ax.imshow(mask, cmap="Greys")
                ax.set_title("Masque réel (N&B)")
            else:
                ax.imshow(mask)
                ax.set_title("Masque réel (RGB)")
            ax.axis("off")

        for i, ax in enumerate(axsBottom):
            if i == 0:
                ax.imshow(self.load_img_to_array(img_path))
                ax.set_title("Image originale")
            elif i == 1:
                ax.imshow(mask_pred, cmap="Greys")
                ax.set_title("Masque prédit (N&B)")
            else:
                ax.imshow(mask_pred)
                ax.set_title("Masque prédit (RGB)")
            ax.axis("off")
        plt.show()