# Imports et variables

In [None]:
import os
import cv2
import keras
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
import albumentations as A
import shutil
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
# Turn on memory growth for every GPU that TensorFlow lists.
# NOTE(review): TensorFlow expects this to run before the GPUs are first used - confirm cell order.
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
  tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# Google Drive folders holding the Cityscapes data: label masks (gtFine8)
# and left-camera images (leftImg8bit).
# NOTE(review): "gtFine8" presumably holds masks already remapped to 8 categories - confirm.
masksSourceFolder = '/content/drive/MyDrive/Colab Notebooks/Projet 8/P8_Cityscapes_gtFine_trainvaltest/gtFine8'
imagesSourceFolder = '/content/drive/MyDrive/Colab Notebooks/Projet 8/P8_Cityscapes_leftImg8bit_trainvaltest/leftImg8bit'

In [None]:
# Local working copies of the dataset. Each variable now points to the folder
# whose name matches its content (the two paths were swapped before, which
# worked only because the swap was applied consistently everywhere).
masksFolder = '/content/masks'
imagesFolder = '/content/images'

In [None]:
# Parallel lists: sources[i] is copied into destinations[i]
# (masks first, images second).
sources = [masksSourceFolder, imagesSourceFolder]
destinations = [masksFolder, imagesFolder]

In [None]:
# Names of the dataset split sub-folders.
splits = ['train', 'val', 'test']

In [None]:
# Copy the label masks and the camera images of every split from the source
# folders into the local working folders.
for split in splits:
    for source, destination in zip(sources, destinations):
        src_dir = os.path.join(source, split)
        dst_dir = os.path.join(destination, split)
        # makedirs creates the missing parent folders too, so a separate call
        # for `destination` is not needed.
        os.makedirs(dst_dir, exist_ok=True)
        for fichier in os.listdir(src_dir):
            # str.endswith accepts a tuple of suffixes: keep only mask and image files.
            if fichier.endswith(("labelIds.png", "leftImg8bit.png")):
                shutil.copyfile(os.path.join(src_dir, fichier), os.path.join(dst_dir, fichier))

In [None]:
# Per-split folders: each X_* path is built from imagesFolder and each y_*
# path from masksFolder.
X_train_dir, y_train_dir = (os.path.join(root, 'train') for root in (imagesFolder, masksFolder))
X_val_dir, y_val_dir = (os.path.join(root, 'val') for root in (imagesFolder, masksFolder))
X_test_dir, y_test_dir = (os.path.join(root, 'test') for root in (imagesFolder, masksFolder))

In [None]:
# Category name for each of the 8 class ids (0 to 7), in id order.
dicoclasses = dict(enumerate([
    'void',
    'flat',
    'construction',
    'object',
    'nature',
    'sky',
    'human',
    'vehicle',
]))

# Data Loader

https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

https://github.com/qubvel/segmentation_models/blob/master/examples/multiclass%20segmentation%20(camvid).ipynb

In [None]:
# adapted from https://github.com/qubvel/segmentation_models/blob/master/examples/multiclass%20segmentation%20(camvid).ipynb
class Dataloader(keras.utils.Sequence):
    """Serve batches of (images, masks) arrays read from PNG files on disk.

    Every ``<stem>_leftImg8bit.png`` image of ``data_folder`` is paired with
    ``<stem>_gtFine_labelIds.png`` in the mask folder.

    Args:
        data_folder: folder containing the input images.
        batch_size: integer number of image/mask pairs per batch.
        transform: optional albumentations.Compose (or any callable used as
            ``transform(image=..., mask=...)`` that returns a mapping with the
            ``"image"`` and ``"mask"`` keys).
        shuffle: boolean; if True the pairs are reshuffled when the loader is
            created and at every ``on_epoch_end`` call.
        mask_folder: folder containing the masks. When omitted, it is looked
            up from the notebook-level ``X_*_dir`` / ``y_*_dir`` variables.

    Raises:
        ValueError: if ``mask_folder`` is omitted and ``data_folder`` is none
            of ``X_train_dir``, ``X_val_dir`` or ``X_test_dir``.
    """

    def __init__(self, data_folder, batch_size=1, transform=None, shuffle=False, mask_folder=None):
        # Let the Keras base class initialise its own state (recent Keras
        # versions warn when this call is missing).
        super().__init__()
        self.data_folder = data_folder
        if mask_folder is None:
            known_pairs = {
                X_train_dir: y_train_dir,
                X_val_dir: y_val_dir,
                X_test_dir: y_test_dir,
            }
            if data_folder not in known_pairs:
                # Fail here with a clear message rather than later with an
                # AttributeError on self.mask_folder.
                raise ValueError(
                    f"No mask folder known for data folder {data_folder!r}; "
                    "pass mask_folder explicitly."
                )
            mask_folder = known_pairs[data_folder]
        self.mask_folder = mask_folder
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.transform = transform
        # File stem = file name up to its last underscore. Sorting makes the
        # sample order reproducible (iteration order of a set of strings can
        # change from one Python process to the next).
        stems = sorted({name[:name.rfind('_')] for name in os.listdir(data_folder) if name.endswith(".png")})
        self.mask_indexes = [os.path.join(self.mask_folder, stem) for stem in stems]
        self.indexes = [os.path.join(data_folder, stem) for stem in stems]

        self.on_epoch_end()

    def __getitem__(self, i):
        """Return batch number `i` as a tuple (image_batch, mask_batch)."""
        start = i * self.batch_size
        stop = (i + 1) * self.batch_size
        images = []
        masks = []
        # Indexing by position keeps the IndexError on an out-of-range batch,
        # which is what ends a plain `for batch in loader` iteration.
        for j in range(start, stop):
            image_file = self.indexes[j] + "_leftImg8bit.png"
            mask_file = self.mask_indexes[j] + "_gtFine_labelIds.png"
            # Convert both files to numpy arrays; the context managers close
            # the underlying files as soon as the pixels are read.
            with Image.open(image_file) as pil_image:
                image = np.array(pil_image)
            with Image.open(mask_file) as pil_mask:
                mask = np.array(pil_mask)
            # Apply the transformation when one was requested.
            if self.transform is not None:
                transformed = self.transform(image=image, mask=mask)
                image = transformed["image"]
                mask = transformed["mask"]
            images.append(image)
            masks.append(mask)
        # Stack the samples along a new leading batch axis.
        image_batch = np.stack(images, axis=0)
        mask_batch = np.stack(masks, axis=0)
        return image_batch, mask_batch

    def __len__(self):
        """Return the number of full batches per epoch (a trailing partial batch is dropped)."""
        return len(self.indexes) // self.batch_size

    def on_epoch_end(self):
        """Reshuffle the image/mask pairs together when shuffling is enabled."""
        if self.shuffle:
            # One permutation applied to both lists keeps every image aligned
            # with its mask, and also works when the folder is empty.
            order = np.random.permutation(len(self.indexes))
            self.indexes = [self.indexes[k] for k in order]
            self.mask_indexes = [self.mask_indexes[k] for k in order]

Il va falloir ajouter la data augmentation et le preprocessing (redimensionnement et normalisation) au Dataloader, à l'aide d'albumentations :

https://albumentations.ai/docs/3-basic-usage/semantic-segmentation/

https://albumentations.ai/docs/3-basic-usage/choosing-augmentations/

https://albumentations.ai/docs/api-reference/albumentations/augmentations/geometric/transforms/#ShiftScaleRotate

In [None]:
# Training pipeline: resize, random augmentations, then normalization.
# The transforms are applied in the order listed.
train_transform = A.Compose([
    # 1. Cropping / Resize
    A.Resize(256, 512),

    # 2. Basic geometric (basic invariances)
    A.HorizontalFlip(p=0.5),
    # No vertical flip and no square symmetry (except for satellite imagery)

    # 3. Dropout/Occlusion (for robustness to obstacles)
    A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.2),

    # 4. Color/Channel dropout (if you really want the model to be insensitive to colour)
    # A.ToGray(p=0.1),
    A.ChannelDropout(p=0.1),

    # 5. Affine transformations
    A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.1, rotate_limit=10, p=0.5),

    # 6. Domain-specific (weather effects, sun, etc.)
    A.RandomSunFlare(p=0.1),
    A.RandomShadow(p=0.1),
    A.RandomFog(p=0.05),
    A.RandomRain(p=0.05),
    A.RandomSnow(p=0.05),
    # Other specific effects:
    A.RandomBrightnessContrast(p=0.3),
    A.GaussNoise(p=0.2),

    # 7. Normalization (always last)
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255.0)
])


# Validation pipeline: same resize and normalization, no random augmentation.
val_transform = A.Compose([
    A.Resize(256, 512),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255.0)
])

In [None]:
# Training batches of 8: augmented with train_transform and reshuffled.
train_loader = Dataloader(
    data_folder=X_train_dir,
    batch_size=8,
    transform=train_transform,
    shuffle=True
)

# Validation batches of 8: resized/normalized only, not shuffled.
val_loader = Dataloader(
    data_folder=X_val_dir,
    batch_size=8,
    transform=val_transform,
    shuffle=False
)

In [None]:
# Visualization helpers used to check how the augmentation behaves
def overlay_mask(image, mask, alpha=0.5, color=(0, 1, 0)):
    """Blend a solid colour over the pixels of `image` where `mask` is > 0.

    Args:
        image: image array to draw on.
        mask: array indexing `image`; every position with a value > 0 is tinted.
        alpha: weight given to the colour layer in the blend.
        color: colour components in [0, 1], scaled to 0-255 (green by default).

    Returns:
        The result of cv2.addWeighted(image, 1, colour_layer, alpha, 0).
    """
    tint = (np.array(color) * 255).astype(np.uint8)
    foreground = mask > 0
    # Layer shaped like the image: black everywhere, tinted on the mask.
    layer = np.zeros_like(image, dtype=np.uint8)
    layer[foreground] = tint
    return cv2.addWeighted(image, 1, layer, alpha, 0)

def _prepare_for_display(image, mask):
    """Return `image` as an (H, W, 3) uint8 array and `mask` without singleton axes."""
    if image.ndim == 2:  # grayscale (H, W)
        image = np.stack([image] * 3, axis=-1)
    if image.ndim == 3 and image.shape[2] == 1:  # grayscale (H, W, 1)
        image = np.repeat(image, 3, axis=2)
    if image.dtype != np.uint8:
        image = image.astype(np.uint8)
    if mask.ndim == 3:
        mask = mask.squeeze()
    return image, mask


def _draw_pair(axes_row, image, mask, image_title, mask_title):
    """Draw `image` and `mask` side by side on one row of axes, without ticks."""
    axes_row[0].imshow(image)
    axes_row[0].set_title(image_title)
    axes_row[0].axis("off")
    axes_row[1].imshow(mask, cmap='gray')
    axes_row[1].set_title(mask_title)
    axes_row[1].axis("off")


def visualize_segmentation(dataset, idx=0, samples=3):
    """Show one raw image/mask pair followed by `samples` augmented versions.

    The pair is the first element of batch `idx`, loaded with the dataset
    transform disabled. The augmented rows apply the dataset transform with
    its Normalize / ToTensorV2 steps removed, so the result stays displayable.

    Args:
        dataset: loader exposing a `transform` attribute and returning
            (image_batch, mask_batch) when indexed.
        idx: index of the batch to take the sample from.
        samples: number of augmented versions to draw.
    """
    # Transform types that make the image unsuitable for display. ToTensorV2
    # is looked up defensively: albumentations only exposes it at top level
    # in some installations.
    stripped_types = (A.Normalize,)
    to_tensor_cls = getattr(A, "ToTensorV2", None)
    if to_tensor_cls is not None:
        stripped_types += (to_tensor_cls,)

    if isinstance(dataset.transform, A.Compose):
        vis_transform = A.Compose(
            [t for t in dataset.transform if not isinstance(t, stripped_types)]
        )
    else:
        print("Warning: Could not automatically strip Normalize/ToTensor for visualization.")
        vis_transform = dataset.transform

    # squeeze=False keeps a 2-D axes array even when samples == 0.
    _, ax = plt.subplots(samples + 1, 2, figsize=(8, 4 * (samples + 1)), squeeze=False)

    # --- Get the original image and mask --- #
    original_transform = dataset.transform
    dataset.transform = None  # Temporarily disable for raw data access
    try:
        image_batch, mask_batch = dataset[idx]
    finally:
        # Restore the transform even when loading the batch fails.
        dataset.transform = original_transform
    # Keep the first image and the first mask of the batch.
    image, mask = _prepare_for_display(image_batch[0], mask_batch[0])
    _draw_pair(ax[0], image, mask, "Original Image", "Original Mask")

    # --- Apply and display augmented versions --- #
    for i in range(samples):
        if vis_transform:
            augmented = vis_transform(image=image, mask=mask)
            aug_image = augmented['image']
            aug_mask = augmented['mask']
        else:
            aug_image, aug_mask = image, mask

        aug_image, aug_mask = _prepare_for_display(aug_image, aug_mask)
        _draw_pair(ax[i + 1], aug_image, aug_mask,
                   f"Augmented Image {i+1}", f"Augmented Mask {i+1}")

    plt.tight_layout()
    plt.show()



In [None]:
# Show one raw training sample followed by 3 augmented versions of it.
visualize_segmentation(train_loader, samples=3)

In [None]:
def _as_displayable(img):
    """Return `img` as an (H, W, 3) array that plt.imshow renders correctly.

    uint8 images are returned as they are. Any other dtype (for instance
    normalized float images) is min-max rescaled to floats in [0, 1]: casting
    such values straight to uint8 would truncate them to a few near-black
    levels and wrap the negative ones.
    """
    # Grayscale (H, W): duplicate on 3 channels to get (H, W, 3).
    if img.ndim == 2:
        img = np.stack([img] * 3, axis=-1)
    # Grayscale with an explicit channel axis (H, W, 1): repeat on 3 channels.
    if img.ndim == 3 and img.shape[2] == 1:
        img = np.repeat(img, 3, axis=2)
    if img.dtype != np.uint8:
        img = img.astype(np.float32)
        low, high = img.min(), img.max()
        # A constant image has no range to stretch: show it as black.
        img = (img - low) / (high - low) if high > low else np.zeros_like(img)
    return img


def show_batch(dataset, idx=0, n=5):
    """Display up to `n` image/mask pairs of batch `idx` of the loader.

    Args:
        dataset: loader returning (images, masks) when indexed; `images` is
            expected to have the batch on its first axis.
        idx: index of the batch to display.
        n: maximum number of pairs to draw (capped to the batch size).
    """
    images, masks = dataset[idx]
    # Never ask for more pairs than the batch holds.
    n = min(n, images.shape[0])

    plt.figure(figsize=(6, 2 * n))
    for i in range(n):
        # Left column: image
        plt.subplot(n, 2, 2 * i + 1)
        plt.imshow(_as_displayable(images[i]))
        plt.axis("off")
        plt.title(f"Image {i}")
        # Right column: mask
        plt.subplot(n, 2, 2 * i + 2)
        mask = masks[i]
        # Drop an explicit single-channel axis if present.
        if mask.ndim == 3:
            mask = mask.squeeze()
        plt.imshow(mask, cmap='tab20')  # 'tab20' separates the classes well; 'nipy_spectral' is an alternative
        plt.axis("off")
        plt.title(f"Mask {i}")
    plt.tight_layout()
    plt.show()


In [None]:
# Display up to 5 image/mask pairs from the first training batch.
show_batch(train_loader, idx=0, n=5)

In [None]:
def print_unique_mask_values(dataset, idx=0):
    """Print, for each mask of batch `idx` (first batch by default), its distinct values."""
    _images, batch_masks = dataset[idx]
    for position, single_mask in enumerate(batch_masks):
        print(f"Masque {position} : valeurs uniques {np.unique(single_mask)}")


In [None]:
# Print the distinct label values of each mask in the first training batch.
print_unique_mask_values(train_loader, idx=0)


In [None]:
def print_all_unique_mask_values(dataset):
    """Print the sorted distinct values found across every mask of `dataset`.

    Args:
        dataset: loader supporting len() and returning (images, masks) when
            indexed, `masks` being convertible to a NumPy array.
    """
    all_uniques = set()
    for idx in range(len(dataset)):
        _, masks = dataset[idx]
        # One np.unique per batch is enough for a global union. tolist()
        # yields plain Python numbers, so the printed list reads [0, 1, ...]
        # whatever scalar repr the installed NumPy version uses.
        all_uniques.update(np.unique(masks).tolist())
    print(f"Valeurs uniques globales sur tous les masques : {sorted(all_uniques)}")


In [None]:
# print_all_unique_mask_values(train_loader)


In [None]:
# Display up to 7 image/mask pairs from the training batch at index 4.
show_batch(train_loader, idx=4, n=7)

# Baseline

https://github.com/zhixuhao/unet/blob/master/model.py

Je vais devoir modifier à la fois l'entrée (images RGB de 256×512) et la sortie (8 classes, activation softmax) du modèle d'origine.

https://keras.io/api/metrics/segmentation_metrics/



In [None]:
import skimage.io as io
import skimage.transform as trans

from keras.models import *
from keras.layers import *
from keras.optimizers import *
from keras.callbacks import ModelCheckpoint, LearningRateScheduler

In [None]:
def _double_conv(x, filters):
    """Apply two 3x3 same-padded ReLU convolutions with `filters` channels to `x`."""
    for _ in range(2):
        x = Conv2D(filters, 3, activation='relu', padding='same', kernel_initializer='he_normal')(x)
    return x


def _up_merge(x, skip, filters):
    """Upsample `x` by 2, apply a 2x2 convolution, then concatenate `skip` on the channel axis."""
    up = Conv2D(filters, 2, activation='relu', padding='same', kernel_initializer='he_normal')(
        UpSampling2D(size=(2, 2))(x)
    )
    return concatenate([skip, up], axis=3)


def unet(pretrained_weights=None, input_size=(256, 512, 3), num_classes=8, learning_rate=1e-4):
    """Build and compile a U-Net for multi-class semantic segmentation.

    Args:
        pretrained_weights: optional path of weights loaded into the model
            after compilation.
        input_size: (height, width, channels) of the input images. Height and
            width go through four 2x2 poolings and are later concatenated
            with same-level encoder outputs, so they should be multiples of 16.
        num_classes: number of output channels of the final softmax layer.
        learning_rate: learning rate given to the Adam optimizer.

    Returns:
        The compiled model. Its loss is sparse categorical cross-entropy, so
        the target masks hold integer class ids rather than one-hot vectors.
    """
    inputs = Input(input_size)

    # Encoder: double convolution then 2x2 max pooling, doubling the filters.
    conv1 = _double_conv(inputs, 64)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = _double_conv(pool1, 128)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = _double_conv(pool2, 256)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv4 = _double_conv(pool3, 512)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    # Bottleneck
    conv5 = _double_conv(pool4, 1024)
    drop5 = Dropout(0.5)(conv5)

    # Decoder: upsample, merge with the encoder output of the same level,
    # then double convolution.
    conv6 = _double_conv(_up_merge(drop5, drop4, 512), 512)
    conv7 = _double_conv(_up_merge(conv6, conv3, 256), 256)
    conv8 = _double_conv(_up_merge(conv7, conv2, 128), 128)
    conv9 = _double_conv(_up_merge(conv8, conv1, 64), 64)

    # The extra 2-filter convolution of the reference implementation is not
    # relevant for multi-class output, so it is left out. The last layer gives
    # one softmax probability per class and per pixel.
    conv10 = Conv2D(num_classes, 1, activation='softmax')(conv9)

    model = Model(inputs=inputs, outputs=conv10)

    # Sparse categorical cross-entropy: multi-class targets that are not
    # one-hot encoded.
    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    if pretrained_weights:
        model.load_weights(pretrained_weights)

    return model

In [None]:
# Loaders used for the baseline run; they rebind train_loader / val_loader.
# The training loader uses val_transform here (resize + normalization only),
# so the baseline is trained without the random augmentations.
train_loader = Dataloader(
    data_folder=X_train_dir,
    batch_size=8,
    transform=val_transform,
    shuffle=True
)

val_loader = Dataloader(
    data_folder=X_val_dir,
    batch_size=8,
    transform=val_transform,
    shuffle=False
)

In [None]:
# Build and compile the baseline U-Net with its default arguments.
model = unet()

In [None]:
# Sanity check on the first training batch: array shapes, label values
# present in the masks and mask dtype.
imgs, masks = next(iter(train_loader))
print("images shape", imgs.shape)
print("masks shape", masks.shape)
print("mask unique", np.unique(masks), masks.dtype)

## Callbacks

In [None]:
# Early stopping on the validation loss with a patience of 8 epochs; the
# best weights are restored when training stops.
earlystop_cb = EarlyStopping(
    monitor='val_loss',
    patience=8,
    restore_best_weights=True
)

In [None]:
class PerClassMetricsCallback(tf.keras.callbacks.Callback):
    """Compute per-class Dice and IoU on a validation loader after each epoch.

    The scores of every epoch are printed and appended as one row to
    `self.df_scores` (columns `epoch`, `dice_<name>`, `iou_<name>`,
    `dice_mean`, `iou_mean`).

    Args:
        val_loader: loader supporting len() and returning (images, masks)
            when indexed, the masks holding integer class ids.
        num_classes: number of classes to score (ids 0 to num_classes - 1).
        class_names: optional mapping from class id to display name. When
            omitted, the notebook-level `dicoclasses` mapping is used.
    """

    def __init__(self, val_loader, num_classes=8, class_names=None):
        # Initialise the parent first to avoid surprises.
        super().__init__()
        self.val_loader = val_loader
        self.num_classes = num_classes
        if class_names is None:
            class_names = dicoclasses
        # Fall back to generic names when the mapping is empty.
        self.dicoclasses = class_names or {i: f"class_{i}" for i in range(num_classes)}
        # One row of scores per epoch.
        self.df_scores = pd.DataFrame()

    def on_epoch_end(self, epoch, logs=None):
        """Score the current model on the validation loader and record the result."""
        n_classes = self.num_classes
        # Pixel counts accumulated batch by batch, so the softmax outputs of
        # the whole validation set never have to be held in memory at once.
        intersections = np.zeros(n_classes, dtype=np.int64)
        true_counts = np.zeros(n_classes, dtype=np.int64)
        pred_counts = np.zeros(n_classes, dtype=np.int64)
        for i in range(len(self.val_loader)):
            imgs, masks = self.val_loader[i]
            # Class with the highest probability for every pixel.
            y_pred = np.argmax(self.model.predict_on_batch(imgs), axis=-1)
            for c in range(n_classes):
                # Binary masks: True where the pixel belongs to class c.
                true_c = masks == c
                pred_c = y_pred == c
                intersections[c] += np.count_nonzero(true_c & pred_c)
                true_counts[c] += np.count_nonzero(true_c)
                pred_counts[c] += np.count_nonzero(pred_c)

        # Dice and IoU do not share the same denominator: Dice divides by the
        # sum of the two set sizes, IoU by the size of their union.
        # A class absent from both the masks and the predictions scores 0 and
        # still counts in the means.
        size_sums = true_counts + pred_counts
        dice_scores = (2. * intersections / (size_sums + 1e-6)).tolist()
        iou_scores = (intersections / (size_sums - intersections + 1e-6)).tolist()
        dice_mean = np.mean(dice_scores)
        iou_mean = np.mean(iou_scores)

        # Append this epoch to the results DataFrame.
        row = {'epoch': epoch + 1}
        for c, (d, iou) in enumerate(zip(dice_scores, iou_scores)):
            label = self.dicoclasses.get(c, f"class_{c}")
            row[f"dice_{label}"] = d
            row[f"iou_{label}"] = iou
        row['dice_mean'] = dice_mean
        row['iou_mean'] = iou_mean

        self.df_scores = pd.concat([self.df_scores, pd.DataFrame([row])], ignore_index=True)

        print(f"\n=== Époque {epoch + 1} ===")
        for c, (d, iou) in enumerate(zip(dice_scores, iou_scores)):
            className = self.dicoclasses.get(c, f"class_{c}")
            print(f"{className} (classe {c}): Dice={d:.4f} | IoU={iou:.4f}")
        print(f"--- Dice moyen: {dice_mean:.4f} | IoU moyen: {iou_mean:.4f} ---\n")


In [None]:
# Callback computing the per-class Dice/IoU scores on val_loader for 8 classes.
per_class_metrics_cb = PerClassMetricsCallback(val_loader=val_loader, num_classes=8)

## Fit et graphiques

In [None]:
# Train for at most 50 epochs, validating on val_loader; the early-stopping
# and per-class metrics callbacks are passed to fit.
history = model.fit(
    train_loader,
    validation_data=val_loader,
    epochs=50,
    callbacks=[earlystop_cb, per_class_metrics_cb],
    verbose=1
)


In [None]:
# Training and validation loss per epoch, drawn on the same axes.
for key, label in (('loss', 'train loss'), ('val_loss', 'val loss')):
    plt.plot(history.history[key], label=label)
plt.legend()
plt.show()

In [None]:
# Récupère les scores du callback
df = per_class_metrics_cb.df_scores

# Exemple : courbe Dice moyen sur la validation
plt.plot(df['epoch'], df['dice_mean'], label='Val Dice')
plt.plot(history.history['dice_coef'], label='Train Dice')  # si tu as mis dice_coef dans metrics
plt.legend()
plt.xlabel("Epoch")
plt.ylabel("Dice")
plt.title("Courbe Dice train/val")
plt.show()

# Pareil pour la loss, l'IoU, etc.
