# Installation des modules nécessaires

In [None]:
!pip install pytorch_lightning -U torchinfo segmentation_models_pytorch
!pip install torchsummary

# Importation des librairies nécessaires au projet

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import os
from os.path import join
import glob
import sys
import random
import warnings
from tqdm import tqdm
import itertools
from itertools import chain
from skimage.io import imread, imshow, imread_collection, concatenate_images
from skimage.transform import resize
from skimage.morphology import label
from sklearn.model_selection import train_test_split
from IPython.display import Image
from skimage import io
import torchvision
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import pytorch_lightning as pl
from pytorch_lightning import seed_everything
from pytorch_lightning.callbacks import  ModelCheckpoint
from sklearn.preprocessing import MinMaxScaler
import segmentation_models_pytorch as smp
from torchinfo import summary

import torch
import torch.nn as nn
import torchvision.models as models
import segmentation_models_pytorch as smp  # Pour les métriques
from collections import OrderedDict

import os
import numpy as np
from torch.utils.data import Dataset, DataLoader
import cv2
import torch
import time
import torch
import torch.nn as nn
from torchsummary import summary

# Reproducibility: configure cuBLAS workspaces, seed every RNG (including
# DataLoader workers) and make PyTorch raise on non-deterministic operations.
import os
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
seed_everything(42, workers=True)
torch.use_deterministic_algorithms(True)


%matplotlib inline

# Importation de la base de données de Kaggle

In [None]:
import kagglehub

# Fetch the latest version of the LGG MRI segmentation dataset; `path` is the
# local directory returned by kagglehub.
path = kagglehub.dataset_download("mateuszbuda/lgg-mri-segmentation")

print(f"Path to dataset files: {path}")

In [4]:
# Move the downloaded dataset next to the notebook as ./2 (the location the
# rest of the notebook reads from). Uses the directory returned by kagglehub
# instead of a hard-coded cache path, and is a no-op when ./2 already exists
# so the cell can be re-run.
import shutil

if not os.path.isdir("./2"):
    shutil.move(path, "./2")

# Fonction du TP pour créer un dataset

In [5]:
from glob import glob
def create_dataset(subset_size, data_dir='./2/kaggle_3m'):
    """Build a table of (image, mask) file pairs found under `data_dir`.

    Every sub-directory of `data_dir` is treated as one patient. Each file in
    it matching ``*_mask*`` is a mask; the image path is obtained by removing
    ``_mask`` from the mask's file name.

    Args:
        subset_size: forwarded as ``train_size`` to a stratified
            ``train_test_split`` to sub-sample the table. A falsy value, or a
            value not smaller than the number of pairs found, keeps every pair.
        data_dir: root directory containing one folder per patient.

    Returns:
        DataFrame with columns ``image_path``, ``mask_path`` and ``mask``
        (1 when the mask has at least one non-zero pixel, otherwise 0),
        indexed 0..n-1.

    Raises:
        FileNotFoundError: if a mask file cannot be read by OpenCV.
    """
    def check_mask(mask_path):
        """Return 1 if the mask image contains any non-zero pixel, else 0."""
        img = cv2.imread(mask_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Unable to read mask file: {mask_path}")
        return int(img.max() > 0)

    # Directory listings and glob results come back in arbitrary order; sort
    # them so the seeded sub-sampling below picks the same rows on every run.
    patients = sorted(
        d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))
    )
    mask_files = [
        f
        for p in patients
        for f in sorted(glob(os.path.join(data_dir, p, '*_mask*')))
    ]
    # Strip the marker from the file name only, never from the directory part.
    train_files = [
        os.path.join(os.path.dirname(m), os.path.basename(m).replace('_mask', ''))
        for m in mask_files
    ]

    df = pd.DataFrame({"image_path": train_files, "mask_path": mask_files})
    df['mask'] = df['mask_path'].apply(check_mask)

    if subset_size and subset_size < len(df):
        df, _ = train_test_split(
            df, train_size=subset_size, stratify=df["mask"], random_state=123
        )
    # drop=True: do not keep the pre-shuffle row numbers as an extra column.
    return df.reset_index(drop=True)

# Définition des hyperparamètres

In [30]:
# Data
SUBSET_SIZE = 1000          # number of (image, mask) pairs sampled from the dataset
IMAGE_SIZE = (224, 224)     # size every image and mask is resized to
NUM_CLASSES = 1

# Optimisation
EPOCHS = 30
BATCH_SIZE = 16
LEARNING_RATE = 1e-4

# Prefer the GPU when one is visible to PyTorch.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Pour sauvegarder les métriques et pouvoir analyser les résultats après les entraînements

In [31]:
# Parallel result lists: entry i of each list describes the i-th trained model.
MODEL_NAME, TRAINING_TIME, MODEL_SIZE = [], [], []   # label, fit duration (s), state_dict size (MB)
TEST_LOSS, TEST_ACCURACY, TEST_IOU = [], [], []      # metrics measured on the test set
TOTAL_PARAMS = []                                    # number of parameters

In [32]:
def append_results(model_name, training_time, model_size, test_loss, test_accuracy, test_iou, total_params):
    """Record one model's results by appending each value to its module-level list."""
    recorded = (
        (MODEL_NAME, model_name),
        (TRAINING_TIME, training_time),
        (MODEL_SIZE, model_size),
        (TEST_LOSS, test_loss),
        (TEST_ACCURACY, test_accuracy),
        (TEST_IOU, test_iou),
        (TOTAL_PARAMS, total_params),
    )
    for store, value in recorded:
        store.append(value)

# Création des datasets

In [None]:
df = create_dataset(subset_size=SUBSET_SIZE)

# Class balance: number of samples without (0) and with (1) a non-empty mask.
class_balance = df["mask"].value_counts()
print("Répartition des classes :")
print(class_balance)

# Same counts expressed as a fraction of the total.
print("\nProportion de chaque classe :")
print(class_balance.div(class_balance.sum()))

In [None]:
# Number of samples per 'mask' label, displayed as the cell output.
df["mask"].value_counts()

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the samples for testing, then 20% of the remainder for
# validation. Both splits are stratified on 'mask' so each subset keeps the
# same tumour / no-tumour ratio as the sampled dataset.
df_train, df_test = train_test_split(
    df, test_size=0.1, random_state=2, stratify=df["mask"]
)
df_train, df_val = train_test_split(
    df_train, test_size=0.2, random_state=2, stratify=df_train["mask"]
)
print(f"Train shape: {df_train.shape}\nTest shape: {df_test.shape}\nValidation shape: {df_val.shape}")

In [36]:
def adjust_data(img, mask):
    """Scale an 8-bit image to [0, 1] and turn its mask into a 0/1 array.

    Both inputs are divided by 255; mask values above 0.5 then become 1.0 and
    values at or below 0.5 become 0.0. The inputs themselves are not modified.

    Returns:
        Tuple ``(scaled_image, binary_mask)``.
    """
    scaled_image = img / 255.0
    binary_mask = mask / 255.0

    # Compute both selections before writing so they cannot influence each other.
    foreground = binary_mask > 0.5
    background = binary_mask <= 0.5
    binary_mask[foreground] = 1.0
    binary_mask[background] = 0.0
    return scaled_image, binary_mask

# Custom Dataset Class
class CustomDataset(Dataset):
    """Dataset of (image, mask) pairs read from the file paths listed in a DataFrame.

    Args:
        data_frame: table with ``image_path`` and ``mask_path`` columns, one
            row per sample.
        transform: optional callable applied separately to the image and to the
            mask; its result must support ``.float()``. When omitted, the
            NumPy arrays produced by ``adjust_data`` are returned as they are.
        target_size: size passed to ``cv2.resize`` for both image and mask.
    """

    def __init__(self, data_frame, transform=None, target_size=(256, 256)):
        self.data_frame = data_frame
        self.transform = transform
        self.target_size = target_size

    def __len__(self):
        """Number of samples, i.e. rows of the DataFrame."""
        return len(self.data_frame)

    @staticmethod
    def _read(path, flag):
        """Read `path` with OpenCV, raising instead of returning None on failure."""
        data = cv2.imread(path, flag)
        if data is None:
            # cv2.imread returns None for missing or undecodable files; fail
            # here with the offending path rather than later inside cv2.resize.
            raise FileNotFoundError(f"Could not read image file: {path}")
        return data

    def __getitem__(self, idx):
        """Load, resize and normalise the sample at positional index `idx`.

        Raises:
            FileNotFoundError: if the image or the mask cannot be read.
        """
        row = self.data_frame.iloc[idx]

        # The image keeps OpenCV's BGR channel order; the mask is single-channel.
        image = self._read(row['image_path'], cv2.IMREAD_COLOR)
        mask = self._read(row['mask_path'], cv2.IMREAD_GRAYSCALE)

        image = cv2.resize(image, self.target_size)
        mask = cv2.resize(mask, self.target_size)

        # Scale the image to [0, 1] and binarise the mask.
        image, mask = adjust_data(image, mask)

        if self.transform:
            image = self.transform(image).float()
            mask = self.transform(mask).float()

        return image, mask

In [None]:
# ToTensor turns the HxWxC image (or HxW mask) array into a channels-first tensor.
image_transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor()])

# One dataset per split, all resized to IMAGE_SIZE.
train_dataset, val_dataset, test_dataset = (
    CustomDataset(frame, transform=image_transform, target_size=IMAGE_SIZE)
    for frame in (df_train, df_val, df_test)
)

# Only the training loader shuffles; the test loader yields one sample at a time.
train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4
)
val_loader = DataLoader(
    val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4
)
test_loader = DataLoader(
    test_dataset, batch_size=1, shuffle=False, num_workers=4
)

# Sanity check: print the tensor shapes of the first training batch only.
for images, masks in train_loader:
    print("Image batch shape:", images.shape)
    print("Mask batch shape:", masks.shape)
    break

In [38]:
# Loaders actually used for training and evaluation. They replace the ones
# created in the previous cell, so the test loader is batched like the others.
# The batch size comes from BATCH_SIZE so the hyperparameter cell stays the
# single place to change it.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

In [39]:
# Dice loss for binary masks. from_logits=False: the loss receives values the
# models have already passed through a sigmoid.
loss_fn = smp.losses.DiceLoss(mode=smp.losses.BINARY_MODE, from_logits=False)

# Affichage de quelques données se trouvant dans la base de données

In [40]:
def plotData(n_rows=3):
    """Plot up to `n_rows` tumour-positive samples taken from the global `df`.

    Each row of the figure shows the MRI slice, its mask, and the slice with
    the mask pixels painted red. Rows are chosen by scanning `df` from a
    random starting position and wrapping around at the end, so the scan
    never indexes past the last row. Unused rows are left blank when `df`
    holds fewer than `n_rows` positive samples.

    Args:
        n_rows: number of samples (figure rows) to display.

    Raises:
        ValueError: if `df` is empty.
    """
    n_samples = len(df)
    if n_samples == 0:
        raise ValueError("df is empty: there is nothing to plot")

    # Random starting row for the scan; max(..., 1) keeps randint valid for tiny tables.
    start = np.random.randint(max(n_samples - n_rows, 1))

    has_tumour = df['mask'].to_numpy() == 1
    scan_order = [(start + step) % n_samples for step in range(n_samples)]
    chosen = [pos for pos in scan_order if has_tumour[pos]][:n_rows]

    # squeeze=False keeps `axes` two-dimensional even when n_rows == 1.
    fig, axes = plt.subplots(n_rows, 3, figsize=(20, 15), squeeze=False)
    for row_axes, pos in zip(axes, chosen):
        sample = df.iloc[pos]
        print("Image id: {}".format(pos))

        img = io.imread(sample['image_path'])
        row_axes[0].title.set_text("Brain MRI")
        row_axes[0].imshow(img)

        tumour_mask = io.imread(sample['mask_path'])
        row_axes[1].title.set_text("Mask =" + str(sample['mask']))
        row_axes[1].imshow(tumour_mask, cmap='gray')

        # Paint the mask pixels red on top of the MRI slice.
        img[tumour_mask == 255] = (255, 0, 0)
        row_axes[2].title.set_text("MRI with Mask =" + str(sample['mask']))
        row_axes[2].imshow(img)

    # Hide the axes of rows that received no sample.
    for row_axes in axes[len(chosen):]:
        for ax in row_axes:
            ax.axis("off")

    plt.show()

In [None]:
# Display the sample grid built by plotData.
plotData()

# Fonction du TP permettant d'afficher l'emplacement réel de la tumeur et l'emplacement prédit

In [42]:
def evaluate_model(model, df_test, target_size=None, max_examples=15):
    """Plot ground-truth and predicted tumour overlays for test samples.

    For up to `max_examples` rows of `df_test` whose ``mask`` column is 1, a
    three-panel figure is shown: the image, the image with the true mask in
    green, and the image with the predicted mask in red.

    Args:
        model: module exposing ``predict_step(batch, batch_idx)`` that returns
            a 0/1 mask tensor for a ``(1, 3, H, W)`` float input.
        df_test: table with ``image_path``, ``mask_path`` and ``mask`` columns.
        target_size: size passed to ``cv2.resize``; defaults to the
            notebook-level ``IMAGE_SIZE`` so evaluation uses the training
            resolution.
        max_examples: maximum number of samples to display.

    Raises:
        FileNotFoundError: if an image or mask file cannot be read.
    """
    if target_size is None:
        target_size = IMAGE_SIZE

    # Work on a copy with a clean 0..n-1 index; the caller's frame is untouched.
    df_test = df_test.reset_index(drop=True)
    indexes = df_test[df_test['mask'] == 1].iloc[:max_examples].index

    # Feed inputs on whatever device currently holds the model's weights.
    device = next(model.parameters()).device

    # Predict in eval mode, then restore the mode the caller had set.
    was_training = model.training
    model.eval()
    try:
        for idx in indexes:
            image_path = df_test.loc[idx, 'image_path']
            mask_path = df_test.loc[idx, 'mask_path']
            img_bgr = cv2.imread(image_path, cv2.IMREAD_COLOR)
            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
            if img_bgr is None:
                raise FileNotFoundError(f"Could not read image file: {image_path}")
            if mask is None:
                raise FileNotFoundError(f"Could not read mask file: {mask_path}")

            img_bgr = cv2.resize(img_bgr, target_size)
            mask = cv2.resize(mask, target_size)

            # Network input: BGR scaled to [0, 1], (H, W, C) -> (1, C, H, W).
            # The channel order is left as OpenCV reads it, like CustomDataset does.
            img_input = np.transpose(img_bgr / 255.0, (2, 0, 1))[np.newaxis, :, :, :]

            with torch.no_grad():
                input_img = torch.tensor(img_input).float().to(device)
                pred = model.predict_step(input_img, 0)

            pred_binary = pred.detach().cpu().numpy().squeeze().astype(np.uint8)

            # Convert once to RGB so all three panels show the same colours
            # and the overlay colours below are genuine RGB red / green.
            display_img = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

            pred_overlay = display_img.copy()
            pred_overlay[pred_binary == 1] = [255, 0, 0]  # red: predicted mask

            true_overlay = display_img.copy()
            # > 127 is the same 0.5 cut-off adjust_data applies after scaling;
            # it keeps border pixels that resizing interpolated below 255.
            true_overlay[mask > 127] = [0, 255, 0]  # green: true mask

            plt.figure(figsize=(12, 12))

            plt.subplot(1, 3, 1)
            plt.imshow(display_img)
            plt.title("Original Image (id = " + str(idx) + ")")

            plt.subplot(1, 3, 2)
            plt.imshow(true_overlay)
            plt.title("True Mask (Green Overlay)")

            plt.subplot(1, 3, 3)
            plt.imshow(pred_overlay)
            plt.title("Prediction Mask (Red Overlay)")

            plt.tight_layout()
            plt.show()
    finally:
        model.train(was_training)

# Classe parente pour les différentes architectures

In [43]:
import collections

class SegModel(pl.LightningModule):
    """Lightning wrapper that trains and evaluates a binary segmentation network.

    Args:
        model: network mapping an image batch to a mask of values in [0, 1]
            (or to an OrderedDict whose ``"out"`` entry is that mask).
        loss_fn: callable ``loss_fn(preds, mask)`` returning a scalar tensor.
        lr: learning rate of the Adam optimizer.

    Per-step outputs (loss and confusion-matrix counts) are accumulated in
    lists and reduced to loss / accuracy / IoU at the end of each epoch,
    logged as ``{stage}_loss``, ``{stage}_accuracy`` and ``{stage}_iou``.
    """

    def __init__(self, model, loss_fn, lr=0.0001):
        super().__init__()
        self.save_hyperparameters(ignore=['model', "loss_fn"])
        self.model = model
        self.loss_fn = loss_fn
        self.lr = lr

        # Per-step outputs accumulated during the current epoch.
        self.train_outputs = []
        self.valid_outputs = []
        self.test_outputs = []

        self.metrics = {}

    def forward(self, image):
        """Run the wrapped network on an image batch."""
        return self.model(image)

    @staticmethod
    def _for_epoch_end(output):
        """Copy of a step output whose loss no longer references the autograd graph."""
        return {**output, "loss": output["loss"].detach()}

    def shared_step(self, batch, stage):
        """Compute the loss and the tp/fp/fn/tn counts for one ``(image, mask)`` batch."""
        image = batch[0]
        mask = batch[1]

        # Masks must already be scaled to [0, 1].
        assert mask.max() <= 1.0 and mask.min() >= 0

        preds = self(image)
        if isinstance(preds, collections.OrderedDict):
            preds = preds["out"]
        loss = self.loss_fn(preds, mask)

        # Threshold the predicted values to obtain a hard 0/1 mask.
        pred_mask = torch.where(preds > 0.5, 1, 0).float()

        tp, fp, fn, tn = smp.metrics.get_stats(pred_mask.long(), mask.long(), mode="binary")

        return {"loss": loss, "tp": tp, "fp": fp, "fn": fn, "tn": tn}

    def shared_epoch_end(self, outputs, stage):
        """Reduce the accumulated step outputs and log the epoch metrics."""
        if not outputs:
            # Nothing was accumulated: there is nothing to reduce or log.
            return

        tp = torch.cat([x["tp"] for x in outputs])
        fp = torch.cat([x["fp"] for x in outputs])
        fn = torch.cat([x["fn"] for x in outputs])
        tn = torch.cat([x["tn"] for x in outputs])

        # Mean of the per-batch losses.
        total_loss = sum(x['loss'].item() for x in outputs) / len(outputs)

        iou = smp.metrics.iou_score(tp, fp, fn, tn, reduction="micro-imagewise")
        accuracy = smp.metrics.accuracy(tp, fp, fn, tn, reduction="micro-imagewise")

        metrics = {
            f"{stage}_loss": total_loss,
            f"{stage}_accuracy": accuracy,
            f"{stage}_iou": iou,
        }

        self.log_dict(metrics, on_step=False, on_epoch=True, prog_bar=True)

    # Training Step
    def training_step(self, batch, batch_idx):
        output = self.shared_step(batch, "train")
        # Keep only a detached loss for the epoch summary; storing the live
        # tensor would keep every batch's autograd graph alive until epoch end.
        self.train_outputs.append(self._for_epoch_end(output))
        return output["loss"]

    def on_train_epoch_end(self):
        self.shared_epoch_end(self.train_outputs, "train")
        self.train_outputs.clear()  # free memory for the next epoch

    # Validation Step
    def validation_step(self, batch, batch_idx):
        output = self.shared_step(batch, "val")
        self.valid_outputs.append(self._for_epoch_end(output))
        return output

    def on_validation_epoch_end(self):
        self.shared_epoch_end(self.valid_outputs, "val")
        self.valid_outputs.clear()

    # Test Step
    def test_step(self, batch, batch_idx):
        output = self.shared_step(batch, "test")
        self.test_outputs.append(self._for_epoch_end(output))
        return output

    def on_test_epoch_end(self):
        self.shared_epoch_end(self.test_outputs, "test")
        self.test_outputs.clear()

    # Needed for making predictions at inference
    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        """Return the hard 0/1 mask predicted for an image batch."""
        output = self(batch)
        if isinstance(output, collections.OrderedDict):
            output = output["out"]  # used only for deeplabv3 format
        pred_mask = torch.where(output > 0.5, 1, 0).float()
        return pred_mask

    def configure_optimizers(self):
        """Adam over all parameters with the configured learning rate."""
        return torch.optim.Adam(self.parameters(), lr=self.lr)

# Fonction permettant d'entrainer avec une architecture spécifique en conservant certaines métriques

In [44]:
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.loggers import CSVLogger

def train_model(model_to_train, model_name):
    """Train, test and save one architecture, recording its results.

    The network is wrapped in a SegModel using the notebook-level `loss_fn`,
    fitted on `train_loader` / `val_loader` for at most EPOCHS epochs with
    early stopping on ``val_iou``, then evaluated on `test_loader`. Its
    state_dict is written to ``<model_name>_state_dict.pth`` and the results
    are stored through `append_results`.

    Args:
        model_to_train: segmentation network to train.
        model_name: label used for the CSV log folder, the saved file name and
            the results lists.

    Returns:
        The trained SegModel.
    """
    logger = CSVLogger("logs", name=model_name)

    model = SegModel(model=model_to_train, loss_fn=loss_fn)

    # Number of parameters of the wrapped network.
    total_params = sum(p.numel() for p in model.parameters())

    early_stop_callback = EarlyStopping(
        monitor="val_iou",
        patience=5,
        verbose=True,
        mode="max",
    )

    use_gpu = torch.cuda.is_available()
    trainer = pl.Trainer(
        max_epochs=EPOCHS,
        accelerator="gpu" if use_gpu else "cpu",
        # One device in both cases: Lightning rejects devices=0 on the CPU accelerator.
        devices=1,
        callbacks=[early_stop_callback],
        logger=logger,
    )

    # Time the fit call only.
    start_time = time.time()
    trainer.fit(model, train_loader, val_loader)
    training_time = time.time() - start_time
    print(trainer.callback_metrics)

    test_results = trainer.test(model, test_loader)

    # Save the weights only and measure the size of the file in MB.
    state_dict_path = model_name + "_state_dict.pth"
    torch.save(model.state_dict(), state_dict_path)
    model_size = os.path.getsize(state_dict_path) / (1024 * 1024)

    append_results(
        model_name=model_name,
        training_time=training_time,
        model_size=model_size,
        test_loss=test_results[0]['test_loss'],
        test_accuracy=test_results[0]['test_accuracy'],
        test_iou=test_results[0]['test_iou'],
        total_params=total_params,
    )

    return model

# 1. Architecture Unet du tp

In [45]:
class UNet(nn.Module):
    """U-Net with four encoder levels, a 1024-channel bottleneck and four decoder levels.

    Sub-modules are named ``enc1..enc4`` / ``pool1..pool4`` (encoder),
    ``bottleneck``, ``up1..up4`` / ``dec1..dec4`` (decoder) and ``outconv``.
    The output is passed through a sigmoid, so values lie in [0, 1].

    Args:
        input_channels: number of channels of the input image.
        output_channels: number of channels of the predicted mask.
    """

    # Channel width of each encoder level, from shallowest to deepest.
    _WIDTHS = (64, 128, 256, 512)

    def __init__(self, input_channels=3, output_channels=1):
        super().__init__()

        # Encoder: encN doubles the feature width, poolN halves the resolution.
        previous = input_channels
        for level, width in enumerate(self._WIDTHS, start=1):
            setattr(self, f"enc{level}", self._down_block(previous, width))
            setattr(self, f"pool{level}", nn.MaxPool2d(kernel_size=2, stride=2))
            previous = width

        # Bottleneck
        self.bottleneck = self._conv_block(512, 1024)

        # Decoder: upN doubles the resolution; decN fuses it with the skip
        # connection, hence 2 * width input channels.
        for level, width in enumerate(reversed(self._WIDTHS), start=1):
            setattr(self, f"up{level}", nn.ConvTranspose2d(2 * width, width, kernel_size=2, stride=2))
            setattr(self, f"dec{level}", self._conv_block(2 * width, width))

        # Output layer: 1x1 convolution down to the requested number of channels.
        self.outconv = nn.Conv2d(64, output_channels, kernel_size=1)

    @staticmethod
    def _double_conv(in_channels, out_channels):
        """Two 3x3 same-padding convolutions, each followed by ELU."""
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ELU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ELU(),
        ]
        return nn.Sequential(*layers)

    # Encoder block
    def _down_block(self, in_channels, out_channels):
        return self._double_conv(in_channels, out_channels)

    # Decoder block
    def _conv_block(self, in_channels, out_channels):
        return self._double_conv(in_channels, out_channels)

    def forward(self, x):
        """Return the sigmoid mask for an image batch."""
        encoder = (
            (self.enc1, self.pool1),
            (self.enc2, self.pool2),
            (self.enc3, self.pool3),
            (self.enc4, self.pool4),
        )
        decoder = (
            (self.up1, self.dec1),
            (self.up2, self.dec2),
            (self.up3, self.dec3),
            (self.up4, self.dec4),
        )

        # Contracting path: remember each level's features for the skip connections.
        skips = []
        for encode, pool in encoder:
            x = encode(x)
            skips.append(x)
            x = pool(x)

        x = self.bottleneck(x)

        # Expanding path: the deepest skip is consumed first.
        for upsample, decode in decoder:
            x = upsample(x)
            x = decode(torch.cat([x, skips.pop()], dim=1))

        return torch.sigmoid(self.outconv(x))


In [None]:
model_to_train = UNet().to(DEVICE)
# Summarise the network at the resolution it is trained on (channels first).
# `summary` is torchsummary's here: its import comes after torchinfo's and shadows it.
summary(model_to_train, input_size=(3, *IMAGE_SIZE), device=str(DEVICE))
final_model = train_model(model_to_train, "UNet du tp")

In [None]:
# Switch to evaluation mode before predicting, as done for every other architecture.
final_model.eval()
evaluate_model(final_model, df_test)

# Définition de l'encodeur et des poids et de la fonction d'activation pour les autres architectures

In [48]:
# Shared settings for the segmentation_models_pytorch architectures below.
ENCODER = "resnet18"          # backbone name
ENCODER_WEIGHTS = "imagenet"  # weights the backbone is initialised with
CLASSES = 1                   # one output channel
ACTIVATION = "sigmoid"        # activation applied to the output

# 2. Unet architecture

In [None]:
# U-Net built on the configured encoder, moved to DEVICE and then trained.
model_to_train = smp.Unet(
    classes=CLASSES,
    activation=ACTIVATION,
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
).to(DEVICE)

final_model = train_model(model_to_train, "Unet")


In [None]:
# Put the trained model in evaluation mode, then plot true vs. predicted overlays on test samples.
final_model.eval()
evaluate_model(final_model, df_test)

# 3. Linknet architecture

In [None]:
# Linknet built on the configured encoder, moved to DEVICE and then trained.
model_to_train = smp.Linknet(
    classes=CLASSES,
    activation=ACTIVATION,
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
).to(DEVICE)

final_model = train_model(model_to_train, "Linknet")

In [None]:
# Put the trained model in evaluation mode, then plot true vs. predicted overlays on test samples.
final_model.eval()
evaluate_model(final_model, df_test)

# 4. FPN architecture

In [None]:
# FPN built on the configured encoder, moved to DEVICE and then trained.
model_to_train = smp.FPN(
    classes=CLASSES,
    activation=ACTIVATION,
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
).to(DEVICE)

final_model = train_model(model_to_train, "FPN")

In [None]:
# Put the trained model in evaluation mode, then plot true vs. predicted overlays on test samples.
final_model.eval()
evaluate_model(final_model, df_test)

# 5. PSPnet architecture

In [None]:
# From here on, non-deterministic operations only emit a warning instead of
# raising. NOTE(review): presumably needed because PSPNet uses an operation
# without a deterministic implementation — confirm. The setting is global and
# stays in effect for the cells that follow.
torch.use_deterministic_algorithms(True, warn_only=True)

# PSPNet built on the configured encoder, moved to DEVICE and then trained.
model_to_train = smp.PSPNet(
    classes=CLASSES,
    activation=ACTIVATION,
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
).to(DEVICE)

final_model = train_model(model_to_train, "PSPNet")

In [None]:
# Put the trained model in evaluation mode, then plot true vs. predicted overlays on test samples.
final_model.eval()
evaluate_model(final_model, df_test)

# 6. Deeplabv3 architecture

In [None]:
# DeepLabV3 built on the configured encoder, moved to DEVICE and then trained.
model_to_train = smp.DeepLabV3(
    classes=CLASSES,
    activation=ACTIVATION,
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
).to(DEVICE)

final_model = train_model(model_to_train, "DeepLabV3")


In [None]:
# Put the trained model in evaluation mode, then plot true vs. predicted overlays on test samples.
final_model.eval()
evaluate_model(final_model, df_test)

# Resnet101 comme encodeur

In [63]:
# Same settings as before, with the deeper ResNet-101 backbone.
ENCODER = "resnet101"
ENCODER_WEIGHTS = "imagenet"
CLASSES = 1
ACTIVATION = "sigmoid"

# 7. Unet architecture avec resnet101

In [None]:
# U-Net built on the ResNet-101 encoder, moved to DEVICE and then trained.
model_to_train = smp.Unet(
    classes=CLASSES,
    activation=ACTIVATION,
    encoder_name=ENCODER,
    encoder_weights=ENCODER_WEIGHTS,
).to(DEVICE)

final_model = train_model(model_to_train, "Unet with resnet101")

In [None]:
# Put the trained model in evaluation mode, then plot true vs. predicted overlays on test samples.
final_model.eval()
evaluate_model(final_model, df_test)

# Analyse des résultats

In [None]:
# Bar chart of the saved state_dict size (MB) of each trained model.
plt.figure(figsize=(10, 6))
plt.bar(MODEL_NAME, MODEL_SIZE, color='lightblue')
plt.xlabel('Modèles')
plt.ylabel('Taille (MB)')
plt.title('Taille des Modèles')
# Rotate the model names so long labels do not overlap.
plt.xticks(rotation=45)
plt.show()

In [None]:
# Bar chart of the number of parameters of each trained model.
plt.figure(figsize=(10, 6))
plt.bar(MODEL_NAME, TOTAL_PARAMS, color='lightblue')
plt.xlabel('Modèles')
plt.ylabel('Nombre de paramètres')
plt.title('Nombre de paramètres par modèle')
# Rotate the model names so long labels do not overlap.
plt.xticks(rotation=45)
plt.show()

In [None]:
# Bar chart of the time spent fitting each model, in seconds.
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(MODEL_NAME, TRAINING_TIME, color='skyblue')
ax.set_xlabel('Model')
ax.set_ylabel('Training Time (seconds)')
ax.set_title('Training Time per Model')
plt.setp(ax.get_xticklabels(), rotation=45)
plt.show()


In [None]:
# Line chart comparing test accuracy, IoU and loss across the trained models.
plt.figure(figsize=(10, 6))

plt.plot(MODEL_NAME, TEST_ACCURACY, marker='o', label='Test Accuracy', color='green')
plt.plot(MODEL_NAME, TEST_IOU, marker='o', label='Test IoU', color='orange')
plt.plot(MODEL_NAME, TEST_LOSS, marker='o', label='Test Loss', color='blue')

plt.xlabel('Model')
plt.ylabel('Performance')
# The title names all three plotted series, loss included.
plt.title('Test Accuracy, IoU and Loss per Model')
plt.xticks(rotation=45)
plt.legend()
plt.show()
