-------------------------------------------------------------------------------------------------------------------------------
Copyright:

Ce notebook a été conçu  et rédigé par Linda Marrakchi-Kacem pour l'atelier "Segmentation d'images médicales" qui s'inscrit dans le cadre du mastère TICV 2023-2024. Toute utilisation ou distribution de ce notebook en dehors du cadre de l'atelier doit porter la mention du nom de l'auteur.

-------------------------------------------------------------------------------------------------------------------------------

# Chapitre 5 : Segmentation par classification supervisée (architecture UNet)



Ce notebook accompagne votre cinquième séance d'atelier. Il a pour objectif de vous montrer comment effectuer une segmentation en utilisant l'architecture UNet.


Deux options sont disponibles pour le téléchargement de la base de données:

- Données 4D du site  http://medicaldecathlon.com/ (version difficile)
- Données 2D simplifiée du contraste T1C téléchargeables à partir du lien suivant: https://www.dropbox.com/s/zmytk2yu284af6t/Task01_BrainTumour_2D.tar.gz (version facile)

## Exercice 1: Implementation d'une classe pour le chargement et la division des données:

In [None]:
!pip install gdown



In [None]:
# Mount data from drive
from google.colab import drive
drive.mount('/content/drive')

# Create Dataset4D directory
!mkdir -p '/content/Dataset2D'

# Download the file
!gdown --id 1-10tKKGmtN0eVAulDKZdc_FvUBKDLUff -O '/content/Dataset2D/Task01_BrainTumour_2D.tar'

# Extract the tar file
!tar -xvf '/content/Dataset2D/Task01_BrainTumour_2D.tar' -C '/content/Dataset2D'


[1;30;43mLe flux de sortie a été tronqué et ne contient que les 5000 dernières lignes.[0m
Task01_BrainTumour_2D/training_labels/BRATS_076_z77.png
Task01_BrainTumour_2D/training_labels/BRATS_095_z46.png
Task01_BrainTumour_2D/training_labels/BRATS_055_z124.png
Task01_BrainTumour_2D/training_labels/BRATS_282_z124.png
Task01_BrainTumour_2D/training_labels/BRATS_225_z108.png
Task01_BrainTumour_2D/training_labels/BRATS_429_z77.png
Task01_BrainTumour_2D/training_labels/BRATS_086_z77.png
Task01_BrainTumour_2D/training_labels/BRATS_128_z108.png
Task01_BrainTumour_2D/training_labels/BRATS_278_z62.png
Task01_BrainTumour_2D/training_labels/BRATS_065_z46.png
Task01_BrainTumour_2D/training_labels/BRATS_032_z62.png
Task01_BrainTumour_2D/training_labels/BRATS_460_z124.png
Task01_BrainTumour_2D/training_labels/BRATS_357_z124.png
Task01_BrainTumour_2D/training_labels/BRATS_166_z124.png
Task01_BrainTumour_2D/training_labels/BRATS_155_z77.png
Task01_BrainTumour_2D/training_labels/BRATS_061_z46.png
Task0

In [None]:
import os
import nibabel as nib
import numpy as np

In [None]:
# Définir le chemin des données
data_path = "/content/Dataset4D/Task01_BrainTumour/labelsTr"

# Lister les fichiers dans le répertoire
seg_samples = sorted(os.listdir(data_path))

# Remplacer le premier élément par 'BRATS_166.nii.gz'
seg_samples[0] = 'BRATS_166.nii.gz'

# Trier la liste
seg_samples = sorted(seg_samples)

# Initialiser les variables
saved_values = []
max_nb_values = 0

# Parcourir les échantillons
for sample in seg_samples:
    # Charger l'image NIfTI
    seg_img = nib.load(os.path.join(data_path, sample)).get_fdata()

    # Obtenir les valeurs uniques
    unique_values = np.unique(seg_img)
    nb_unique_values = len(unique_values)

    # Mettre à jour les valeurs maximales
    if nb_unique_values > max_nb_values:
        max_nb_values = nb_unique_values
        saved_values = unique_values

# Afficher le nombre maximal de valeurs dans toutes les images de segmentation
print(f"Maximum number of values in all segmentation images: {max_nb_values}")

In [None]:
# Specify path
data_path = "/content/Dataset4D/Task01_BrainTumour/imagesTr"

samples = os.listdir(data_path)
print("Number of samples:", len(samples))
VOLUME_START_AT = 60
VOLUME_SLICES = 75
samples_train, samples_val = train_test_split(samples, test_size=0.2, random_state=42)

samples_train, samples_test = train_test_split(samples_train, test_size=0.15, random_state=42)

# Print data distribution (Train , Test , Val)
print(f"Train length: {len(samples_train)}")
print(f"Validation length: {len(samples_val)}")
print(f"Test length: {len(samples_test)}")

Number of samples: 495
Train length: 336
Validation length: 99
Test length: 60


In [None]:
from skimage.transform import rotate
from skimage.util import montage
import os
from sklearn.model_selection import train_test_split
import keras
import cv2
import tensorflow
import random
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from keras.callbacks import CSVLogger
import keras.backend as K
import zipfile
import pandas as pd

In [None]:
IMG_SIZE = 128
BATCH_SIZE = 1

# DataLoader
class DataLoader(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, dim= ( IMG_SIZE, IMG_SIZE), batch_size = BATCH_SIZE, n_channels = 2, shuffle=True):
        'Initialization'
        self.dim = tuple(dim) # Resized image dimensions (128 x 128)
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_channels = n_channels # (T1CE + FLAIR)
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        Batch_ids = [self.list_IDs[k] for k in indexes]

        # Load & Generate data
        X, y = self.__data_generation(Batch_ids)

        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def __data_generation(self, Batch_ids):
        'Generates data containing batch_size samples'
        # Initialization
        X = np.zeros((self.batch_size*VOLUME_SLICES, *self.dim, self.n_channels))
        y = np.zeros((self.batch_size*VOLUME_SLICES, 240, 240))

        # Generate data
        for c, i in enumerate(Batch_ids):

            # Get path of each RMI modality and the segmentation
            sample_path = os.path.join(data_path, i)
            seg_path = os.path.join('/content/Dataset4D/Task01_BrainTumour/labelsTr', i)
            ## print(sample_path)
            ## print(seg_path)

            #t1_path = sample_path + '_t1.nii'
            #t2_path = sample_path + '_t2.nii'

            # Extract the data from these paths
            modality = nib.load(sample_path).get_fdata()
            t1ce = modality[:, :, :, 2]
            flair = modality[:, :, :, 0]
            seg = nib.load(seg_path).get_fdata()
            #t1 = nib.load(t1_paths).get_fdata()
            #t2 = nib.load(t2_path).get_fdata()


            for j in range(VOLUME_SLICES):
                X[j +VOLUME_SLICES*c,:,:,0] = cv2.resize(flair[:,:,j+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE))
                X[j +VOLUME_SLICES*c,:,:,1] = cv2.resize(t1ce[:,:,j+VOLUME_START_AT], (IMG_SIZE, IMG_SIZE))

                y[j + VOLUME_SLICES * c] = seg[:, :, j + VOLUME_START_AT]


        # Masks / Segmentations
        y[y==4] = 3
        mask = tensorflow.one_hot(y, 4)
        Y = tensorflow.image.resize(mask, (IMG_SIZE, IMG_SIZE))

        # Scale data between 0 and 1 (since the minimum value in the data is 0)
        return X/np.max(X), Y

training_generator = DataLoader(samples_train)
valid_generator = DataLoader(samples_val)
test_generator = DataLoader(samples_test)

## Exercice 2: Implementation de l'architecture UNet

In [None]:
# U-Net
def build_unet(inputs, ker_init, dropout):
    conv1 = Conv2D(32, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(inputs)
    conv1 = Conv2D(32, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv1)

    pool = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(pool)
    conv = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv)

    pool1 = MaxPooling2D(pool_size=(2, 2))(conv)
    conv2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(pool1)
    conv2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv2)

    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(pool2)
    conv3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv3)

    pool4 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv5 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(pool4)
    conv5 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv5)
    drop5 = Dropout(dropout)(conv5)

    up7 = Conv2D(256, 2, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(UpSampling2D(size = 2)(drop5))
    merge7 = concatenate([conv3,up7], axis = 3)
    conv7 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(merge7)
    conv7 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv7)

    up8 = Conv2D(128, 2, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(UpSampling2D(size = 2)(conv7))
    merge8 = concatenate([conv2,up8], axis = 3)
    conv8 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(merge8)
    conv8 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv8)

    up9 = Conv2D(64, 2, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(UpSampling2D(size = 2)(conv8))
    merge9 = concatenate([conv,up9], axis = 3)
    conv9 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(merge9)
    conv9 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv9)

    up = Conv2D(32, 2, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(UpSampling2D(size = 2)(conv9))
    merge = concatenate([conv1,up], axis = 3)
    conv = Conv2D(32, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(merge)
    conv = Conv2D(32, 3, activation = 'relu', padding = 'same', kernel_initializer = ker_init)(conv)

    conv10 = Conv2D(4, 1, activation = 'softmax')(conv)

    return Model(inputs = inputs, outputs = conv10)

In [None]:
# Compute metric :
def dice_coef(y_true, y_pred, smooth=1.0):
    class_num = 4
    for i in range(class_num):
        y_true_f = K.flatten(y_true[:,:,:,i])
        y_pred_f = K.flatten(y_pred[:,:,:,i])
        intersection = K.sum(y_true_f * y_pred_f)
        loss = ((2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth))
        if i == 0:
            total_loss = loss
        else:
            total_loss = total_loss + loss
    total_loss = total_loss / class_num
    return total_loss

def precision(y_true, y_pred):
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = true_positives / (predicted_positives + K.epsilon())
        return precision

def sensitivity(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    return true_positives / (possible_positives + K.epsilon())


def specificity(y_true, y_pred):
    true_negatives = K.sum(K.round(K.clip((1-y_true) * (1-y_pred), 0, 1)))
    possible_negatives = K.sum(K.round(K.clip(1-y_true, 0, 1)))
    return true_negatives / (possible_negatives + K.epsilon())

In [None]:
# Define input data shape
input_layer = Input((IMG_SIZE, IMG_SIZE, 2))

# Build and compile the model
model = build_unet(input_layer, 'he_normal', 0.2)

model.compile(loss="categorical_crossentropy", optimizer=tensorflow.keras.optimizers.Adam(learning_rate=0.001), metrics = ['accuracy',tensorflow.keras.metrics.MeanIoU(num_classes=4), dice_coef, precision, sensitivity, specificity] )

In [None]:
callbacks = [
    keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                              patience=2, min_lr=0.000001, verbose=1),

    keras.callbacks.ModelCheckpoint(filepath = 'model_.{epoch:02d}-{val_loss:.6f}.m5',
                             verbose=1, save_best_only=True, save_weights_only = True),

    CSVLogger('training.log', separator=',', append=False)
]

## Exercice 3: Entrainement du UNet et test

In [None]:
model.fit(training_generator,
          epochs=10,
          steps_per_epoch=len(samples_train),
          callbacks=callbacks,
          validation_data=valid_generator)

Epoch 1/10
  8/336 [..............................] - ETA: 2:35:28 - loss: 1.3938 - accuracy: 0.9724 - mean_io_u: 0.3757 - dice_coef: 0.2084 - precision: 0.8538 - sensitivity: 0.6900 - specificity: 0.9940

In [None]:
# Evaluate the model on the test data

results = model.evaluate(test_generator, batch_size=100, callbacks= callbacks)

descriptions = ["Loss", "Accuracy", "MeanIOU", "Dice coefficient", "Precision", "Sensitivity", "Specificity"]

results_list = zip(results, descriptions)

print("\nModel evaluation on the test set:")
print("==================================")
for i, (metric, description) in enumerate(results_list):
    print(f"{description} : {round(metric, 4)}")