<a href="https://colab.research.google.com/github/Nerikoutchala/Nerikoutchala/blob/main/DeepLabV3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Base Model configurations of the Ocelot Paper: DeepLabV3+**

### **Load and Prepare Data and transform**

In [19]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [20]:
!pip install monai
!pip install segmentation_models_pytorch



In [22]:
# Import Libraries

import os
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import segmentation_models_pytorch as smp
from sklearn.model_selection import train_test_split
from sklearn.model_selection import ParameterGrid

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models.segmentation import DeepLabV3
from torchvision.models.segmentation.deeplabv3 import DeepLabHead
from torchvision.models import resnet18

import monai.transforms as T
import random
from monai.transforms.compose import Randomizable


import monai
from monai.data import Dataset, DataLoader
from monai.losses import DiceLoss
from monai.metrics import DiceMetric

from monai.transforms import Compose, LoadImaged, EnsureChannelFirstd, Resized, RandRotated, \
    ScaleIntensityd, EnsureTyped, RandBiasFieldd, RandAdjustContrastd, \
    AsDiscreted, ToTensord, Rand2DElasticd, RandSpatialCropd, RandRotate90d, RandFlipd

In [24]:
train_files_dir = "/content/drive/MyDrive/ocelot/{}/train/tissue"
test_files_dir = "/content/drive/MyDrive/ocelot/{}/test/tissue"

# tissues is a list of filenames
tissues = list(sorted(glob.iglob(os.path.join(train_files_dir.format('images'), '*.jpg'))))
masks = list(sorted(glob.iglob(os.path.join(train_files_dir.format('annotations'), '*.png'))))

# Load train, test, and validation filenames and masks
tissues = list(sorted(glob.iglob(os.path.join(train_files_dir.format('images'), '*.jpg'))))
masks = list(sorted(glob.iglob(os.path.join(train_files_dir.format('annotations'), '*.png'))))

test_tissues = list(sorted(glob.iglob(os.path.join(test_files_dir.format('images'), '*.jpg'))))
test_masks = list(sorted(glob.iglob(os.path.join(test_files_dir.format('annotations'), '*.png'))))

# Create dictionaries for training, testing, and validation data
train_files_dict = [{"img": img, "seg": seg} for img, seg in zip(tissues, masks)]
test_files_dict = [{"img": img, "seg": seg} for img, seg in zip(test_tissues, test_masks)]

# Split the data into training and validation sets
train_files_dict, val_files_dict = train_test_split(train_files_dict, test_size=0.2, random_state=42)


# Let's define the transformations to be applied to the images
k = ["img", "seg"]
num_classes = 3 # 3 classes: background, cancer, unknown
spatial_size = (256, 256)

# Custom transform to convert segmentation classes 1,2,255 into 0,1,2
class TransformAnnot(monai.transforms.Transform):
    'Output selected slice from input volume.'
    def __call__(self, mk):
        mk[mk==255] = 0
        return mk

class TransformAnnotd(monai.transforms.Transform):
    'Output selected slice from input volume.'
    def __init__(self, keys):
        self.backend = TransformAnnot()
        self.keys = keys

    def __call__(self, data):
        d = dict(data)
        for k in d.keys():
            if k not in self.keys:
                continue
            d[k] = self.backend.__call__(d[k])

        return d


seg_transforms = [
    LoadImaged(keys=k),
    TransformAnnotd(keys=['seg']),
    EnsureChannelFirstd(keys=k),
    ScaleIntensityd(keys=["img"]),
    ToTensord(keys=k),
    AsDiscreted(keys=["seg"], rounding='torchrounding'),
    AsDiscreted(keys=["seg"], to_onehot=num_classes),
    RandSpatialCropd(keys=k, roi_size=spatial_size, random_size=False),
    EnsureTyped(keys=k),
]

transform = Compose(seg_transforms)

# Create data loaders for train, test, and validation sets
train_dset = Dataset(train_files_dict, transform=transform)
train_dloader = DataLoader(train_dset, batch_size=4, shuffle=True)

test_dset = Dataset(test_files_dict, transform=transform)
test_dloader = DataLoader(test_dset, batch_size=4, shuffle=False)

val_dset = Dataset(val_files_dict, transform=transform)
val_dloader = DataLoader(val_dset, batch_size=4, shuffle=True)

In [None]:
from monai.transforms import GaussianBlurd, GaussianNoised, ColorJitterd, RandFlipd, RandRotate90d

# Define the data augmentation transforms using torchvision
photometric_transforms = [
    T.GaussianBlur(kernel_size=(5, 5), sigma=(0.5, 1.0)),
    T.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5),
]

geometric_transforms = [
    T.RandomHorizontalFlip(),
    T.RandomRotation(degrees=[0, 90, 180, 270]),
]

# Combine photometric and geometric transformations
augmentation_transforms = photometric_transforms + geometric_transforms


#########

# Define custom MONAI transforms
class CustomGaussianBlur(monai.transforms.Transform):
    def __call__(self, data):
        return {"img": custom_gaussian_blur(data["img"])}

class CustomGaussianNoise(monai.transforms.Transform):
    def __call__(self, data):
        return {"img": custom_gaussian_noise(data["img"])}

class CustomColorJitter(monai.transforms.Transform):
    def __call__(self, data):
        return {"img": custom_color_jitter(data["img"])}

# Define the data augmentation transforms using custom MONAI transforms
photometric_transforms = [
    CustomGaussianBlur(),
    CustomGaussianNoise(),
    CustomColorJitter(),
]

geometric_transforms = [
    T.RandFlipd(keys="img", prob=0.5, spatial_axis=0),
    T.RandRotate90d(keys="img", prob=1.0, max_k=4),
]

# Combine photometric and geometric transformations
augmentation_transforms = photometric_transforms + geometric_transforms

*augmentation_transforms,  # we Include augmentation transforms here

### **Trainning and Evaluation Function**

In [25]:
def train_eval_model(model, train_loader, val_loader, criterion, optimizer, device):
    model = model.to(device)
    criterion = criterion.to(device)
    dice_loss = DiceLoss()
    learning_rate = 0.001  # Set your desired learning rate value
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # optimizer = optimizer(model.parameters())

    # nicializamos listas para almacenar las pérdidas y las puntuaciones de los dados
    train_losses = []
    val_losses = []
    dice_scores = []

    # Iteramos sobre cada época
    for epoch in range(num_epochs):
        # Ponemos el modelo en modo entrenamiento
        model.train()
        # Inicializamos a 0 la pérdida de entrenamiento para la época
        train_loss = 0.0

        # Iteramos sobre los lotes de datos de entrenamiento
        for i, batch in enumerate(train_loader):
            images = batch["img"].to(device)
            masks = batch["seg"].to(device)

            # Restablecemos los gradientes del optimizador
            optimizer.zero_grad()
            # Generamos salidas pasando las imágenes por el modelo
            outputs = model(images)
            # Calculamos la pérdida entre las salidas y las máscaras
            loss = criterion(outputs, masks)
            # Cálculamos los gradientes por retropropagación
            loss.backward()
            # Actualizamos los parámetros del modelo con el optimizador
            optimizer.step()

            # Añadimos la pérdida del lote actual a la pérdida de entrenamiento
            train_loss += loss.item()

        # Calculamos la pérdida media de entrenamiento para la época
        train_loss /= len(train_loader)
        # Añadimos la pérdida a la lista vacia train_losses
        train_losses.append(train_loss)


        # Ahora ponemos el modelo en modo evaluación
        model.eval()
        # Inicializamos la pérdida de validación para la época a 0
        val_loss = 0.0
        # Creamos un objeto DiceMetric con los parámetros especificados
        dice_metric = DiceMetric(include_background=True, reduction="mean")

        # Desactivamos el cálculo del gradiente durante la validación
        with torch.no_grad():
            # Iteramos sobre los lotes de datos de validación
            for i, batch in enumerate(val_loader):
                images = batch["img"].to(device)
                masks = batch["seg"].to(device)

                # Generamos las salidas pasando las imágenes por el modelo
                outputs = model(images)
                # Calculamos la pérdida entre las salidas y las máscaras
                val_loss += criterion(outputs, masks).item()

                # Actualizamos la métrica de los dados comparando las salidas y las máscaras
                dice_metric(outputs, masks)

        # Calculamos la pérdida media de validación para la época
        val_loss /= len(val_loader)
        # Añadimos la pérdida de validación a la lista
        val_losses.append(val_loss)

        # Calculamos la dice score
        dice_score = dice_metric.aggregate().item()
        # Añadimos la dice score a la lista
        dice_scores.append(dice_score)

        # Imprimimos la pérdida de entrenamiento, la pérdida de validación y la puntuación dada para la época
        print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Dice Score: {dice_score:.4f}")

    # Devolvemos las listas de pérdidas del training, pérdidas de la validación y los dice scores.
    return train_losses, val_losses, dice_scores


### **Training the DeepLabV3+ Model**

In [None]:
# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model configurations
model_configs = [
    {"name": "DeepLabV3", "model": smp.DeepLabV3, "params": {"encoder_name": "resnet34", "in_channels": 3, "classes": 3}},
]

# Define the parameter grid for hyperparameter tuning
param_grid = {
    "num_epochs": [300],  # Train for 300 epochs.
    "optimizer": [optim.Adam],  # Use Adam optimizer
    "lr": [5e-5, 2e-3],  # Learning rate range: [5e-5, 2e-3]
    "loss": [monai.losses.DiceLoss],  # Uso DiceLoss para DeepLabV3+
}

# Create an empty DataFrame to store the results
results_df = pd.DataFrame()

# Iteramos sobre las configuraciones del modelo y el parameter grid, entrenamos y evaluamos cada modelo y almacenamos los resultados en el DataFrame.
for model_config in model_configs:
    for params in ParameterGrid(param_grid):
        # Obtenemos el nombre del modelo y creamos una instancia del modelo con los parámetros que hemos especificado.
        model_name = model_config["name"]
        model = model_config["model"](**model_config["params"])
        # Obtenga el número de épocas, el optimizador y la función de pérdida del parameter grid
        num_epochs = params["num_epochs"]
        optimizer = params["optimizer"]
        learning_rate = params["lr"]
        loss = params["loss"]()

        # Move the model to the appropriate device
        model.to(device)

        # Define the Adam optimizer with the specified learning rate
        optimizer = optimizer(model.parameters(), lr=learning_rate)

        # Train and evaluate the model using the train_eval_model function
        train_losses, val_losses, dice_scores = train_eval_model(
            model, train_dloader, val_dloader, loss, optimizer, device
        )

        # Iterate over each epoch and store the results in the DataFrame
        for epoch in range(num_epochs):
            results_df = results_df.append(
                {
                    "Model": model_name,
                    "Num Epochs": num_epochs,
                    "Optimizer": "Adam",
                    "Loss": loss.__class__.__name__,
                    "Train Loss": train_losses[epoch],
                    "Val Loss": val_losses[epoch],
                    "Dice Score": dice_scores[epoch],
                },
                ignore_index=True,
            )

Epoch 1/300: Train Loss: 0.4964, Val Loss: 0.4567, Dice Score: 0.6242
Epoch 2/300: Train Loss: 0.4457, Val Loss: 0.4443, Dice Score: 0.6576
Epoch 3/300: Train Loss: 0.4313, Val Loss: 0.4162, Dice Score: 0.6019
Epoch 4/300: Train Loss: 0.4089, Val Loss: 0.4004, Dice Score: 0.6345
Epoch 5/300: Train Loss: 0.3786, Val Loss: 0.3700, Dice Score: 0.6234
Epoch 6/300: Train Loss: 0.3664, Val Loss: 0.3202, Dice Score: 0.6243
Epoch 7/300: Train Loss: 0.4123, Val Loss: 0.4011, Dice Score: 0.6365
Epoch 8/300: Train Loss: 0.3970, Val Loss: 0.4218, Dice Score: 0.6300
Epoch 9/300: Train Loss: 0.3964, Val Loss: 0.3843, Dice Score: 0.6123
Epoch 10/300: Train Loss: 0.3852, Val Loss: 0.3794, Dice Score: 0.6014
Epoch 11/300: Train Loss: 0.3832, Val Loss: 0.3818, Dice Score: 0.6454
Epoch 12/300: Train Loss: 0.3805, Val Loss: 0.3751, Dice Score: 0.6295
Epoch 13/300: Train Loss: 0.3739, Val Loss: 0.3604, Dice Score: 0.6447


In [17]:
# Print the results with all rows displayed
pd.set_option("display.max_rows", None)  # Set the maximum number of rows to display as None (show all rows)

# Print the results DataFrame
print(results_df)

Empty DataFrame
Columns: []
Index: []


In [None]:
# Guardamos el modelo
import joblib
filename = 'Ocelot_Base_model.sav'
joblib.dump(model, filename)

In [None]:
# Plot the training loss over epochs
plt.figure()
plt.plot(range(1, num_epochs + 1), train_losses, label="Train Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title(f"{model_name} - Training Loss")
plt.legend()
plt.show()

### **Test of the DeepLabV3+ Model - ocelot**

In [None]:
# Define the loss function
criterion = DiceLoss(include_background=True, reduction="mean")
test_loss = 0.0

# Initialize variables to store the predicted and ground truth masks
pred_masks = []
gt_masks = []

model.eval()
with torch.no_grad():
    for batch in test_dloader:
        images = batch["img"].to(device)
        masks = batch["seg"].to(device)

        outputs = model(images)
        test_loss += criterion(outputs, masks).item()

        pred_masks.extend(torch.argmax(outputs, dim=1).detach().cpu().numpy())
        gt_masks.extend(torch.argmax(masks, dim=1).detach().cpu().numpy())

test_loss /= len(test_dloader)

# Convert the predicted and ground truth masks to binary matrices
pred_masks = np.array(pred_masks)  # y_pred
gt_masks = np.array(gt_masks)  # y_true
pred_masks = np.eye(num_classes)[pred_masks]  # one-hot encoding
gt_masks = np.eye(num_classes)[gt_masks]  # one-hot encoding

# Calculate the Dice coefficient
intersection = np.sum(pred_masks * gt_masks, axis=(1, 2, 3))
sum_u = np.sum(pred_masks, axis=(1, 2, 3)) + np.sum(gt_masks, axis=(1, 2, 3))
dice_coefficient = (2.0 * intersection) / (sum_u + 1e-7)
average_dice_coefficient = np.mean(dice_coefficient)

# Calculate Intersection over Union (IoU)
union = sum_u - intersection
iou = intersection / (union + 1e-7)
average_iou = np.mean(iou)

# Calculate Precision, Recall, and F1-Score
true_positive = np.sum(pred_masks * gt_masks, axis=(1, 2, 3))
false_positive = np.sum(pred_masks * (1 - gt_masks), axis=(1, 2, 3))
false_negative = np.sum((1 - pred_masks) * gt_masks, axis=(1, 2, 3))
precision = true_positive / (true_positive + false_positive + 1e-7)
recall = true_positive / (true_positive + false_negative + 1e-7)
f1_score = 2 * (precision * recall) / (precision + recall + 1e-7)
average_precision = np.mean(precision)
average_recall = np.mean(recall)
average_f1 = np.mean(f1_score)

# Calculate Pixel Accuracy
correct_pixels = np.sum(np.all(pred_masks == gt_masks, axis=1), axis=(1, 2))
total_pixels = np.prod(pred_masks.shape[1:])
pixel_accuracy = correct_pixels / total_pixels
average_pixel_accuracy = np.mean(pixel_accuracy)

# Calculate Mean Absolute Error (MAE) and Mean Squared Error (MSE)
mae = np.mean(np.abs(pred_masks - gt_masks))
mse = np.mean(np.square(pred_masks - gt_masks))

# Print all evaluation metrics
print(f"Test Loss: {test_loss:.4f}")
print(f"Average Dice Coefficient: {average_dice_coefficient:.4f}")
print(f"Average IoU: {average_iou:.4f}")
print(f"Average Precision: {average_precision:.4f}")
print(f"Average Recall: {average_recall:.4f}")
print(f"Average F1-Score: {average_f1:.4f}")
print(f"Average Pixel Accuracy: {average_pixel_accuracy:.4f}")
print(f"Mean Absolute Error (MAE): {mae:.4f}")
print(f"Mean Squared Error (MSE): {mse:.4f}")