# U-Net Segmentation - Camus Dataset

*Made by **Hang Jung Ling** and **Olivier Bernard** from the INSA Lyon, France.*

This notebook shows how to train, test and evaluate a U-Net to segment different cardiac structures on [CAMUS dataset](https://humanheart-project.creatis.insa-lyon.fr/database/#collection/6373703d73e9f0047faa1bc8).

CAMUS is one of the largest public echocardiogaphic datasets, with 500 patients and each patient has 4 echocardiographic images: end-diastolic (ED) and end-systolic (ES) frames acquired in both apical two chamber and apical four chamber views. Each image is annotated by an expert and contains 3 classes + background:</br>
&emsp;1) Left ventricle</br>
&emsp;2) Myocardium</br>
&emsp;3) Left atrium</br>

Summary :</br>
&emsp;I.   [Install dependencies](#install)</br>
&emsp;II.  [Dataset](#dataset)</br>
&emsp;II.  [Train](#train)</br>
&emsp;III. [Visualize learning curves and predictions](#visualize)</br>


# I. Install dependencies <a class="anchor" id="install"></a>

Kindly ignore this step if you have installed your own environment using `environment.yaml`. If not, please execute the following cells to install the dependencies.

In [None]:
%%capture project_path_setup

import sys

if "../" in sys.path:
    print(sys.path)
else:
    sys.path.append("../")
    print(sys.path)

In [None]:
%%capture packages_install

# Make sure the repo's package and its dependencies are installed
%pip install -e ../.

# II. Dataset <a class="anchor" id="dataset"></a>

Once the environment is successfully setup, download the CAMUS dataset by executing the following cell. The dataset will be downloaded to the `data/` folder. 

In [None]:
from pathlib import Path

from tqdm.auto import tqdm

# Make sure the data is downloaded and extracted where it should be
if not Path("../data/camus_64").is_dir():
    import zipfile
    from io import BytesIO
    from urllib.request import urlopen

    zipurl = "https://www.creatis.insa-lyon.fr/~bernard/camus/camus_64.zip"
    with urlopen(zipurl) as zipresp:
        with zipfile.ZipFile(BytesIO(zipresp.read())) as zfile:
            for member in tqdm(
                zfile.infolist(), desc="Downloading and extracting data", position=0, leave=True
            ):
                try:
                    zfile.extract(member, "../data/")
                except zipfile.error as e:
                    pass

Now, let's split these data into training, validation and testing sets. We will use 80% of the data for training, 10% for validation and 10% for testing. The split is done by patient ID, so that the same patient will not appear in different sets.

In [None]:
from pathlib import Path

from sklearn.model_selection import train_test_split

from src.utils.file_and_folder_operations import subdirs

# Specify the data directory
data_dir = Path("../data/camus_64").resolve()

# List all the patients id
keys = subdirs(data_dir, prefix="patient", join=False)

# Split the patients into 80/10/10 train/val/test sets
train_keys, val_and_test_keys = train_test_split(keys, train_size=0.8, random_state=12345)
val_keys, test_keys = train_test_split(val_and_test_keys, test_size=0.5, random_state=12345)

train_keys = sorted(train_keys)
val_keys = sorted(val_keys)
test_keys = sorted(test_keys)

# Create train, val and test datalist
viws_instants = ["2CH_ED", "2CH_ES", "4CH_ED", "4CH_ES"]
train_datalist = [
    {
        "image": str(data_dir / key / f"{key}_{view}.nii.gz"),
        "label": str(data_dir / key / f"{key}_{view}_gt.nii.gz"),
    }
    for key in train_keys
    for view in viws_instants
]

val_datalist = [
    {
        "image": str(data_dir / key / f"{key}_{view}.nii.gz"),
        "label": str(data_dir / key / f"{key}_{view}_gt.nii.gz"),
    }
    for key in val_keys
    for view in viws_instants
]

test_datalist = [
    {
        "image": str(data_dir / key / f"{key}_{view}.nii.gz"),
        "label": str(data_dir / key / f"{key}_{view}_gt.nii.gz"),
    }
    for key in test_keys
    for view in viws_instants
]

print("Example of train keys: ", train_datalist[:2])
print("Example of validation keys: ", val_datalist[:2])
print("Example of test keys: ", test_datalist[:2])

Once the data is split, we will create a `Dataset` object for each set. This object will be used to load the data during training and testing.

In [None]:
import numpy as np
from monai.data import CacheDataset
from monai.transforms import (
    Compose,
    EnsureChannelFirstd,
    LoadImaged,
    NormalizeIntensityd,
    RandAdjustContrastd,
    RandFlipd,
    RandGaussianNoised,
    RandGaussianSmoothd,
    RandRotated,
    RandScaleIntensityd,
    RandZoomd,
)

# Transforms to load data
load_transforms = [
    LoadImaged(keys=["image", "label"], image_only=True),  # Load image and label
    EnsureChannelFirstd(
        keys=["image", "label"]
    ),  # Make sure the first dimension is the channel dimension
    NormalizeIntensityd(keys=["image"]),  # Normalize the intensity of the image
]

# Transforms to augment data
range_x = [-15.0 / 180 * np.pi, 15.0 / 180 * np.pi]
data_augmentation_transforms = [
    RandRotated(
        keys=["image", "label"],
        range_x=range_x,
        range_y=0,
        range_z=0,
        mode=["bicubic", "nearest"],
        padding_mode="zeros",
        prob=0.2,
    ),
    RandZoomd(
        keys=["image", "label"],
        min_zoom=0.7,
        max_zoom=1.4,
        mode=["bicubic", "nearest"],
        padding_mode="constant",
        align_corners=(True, None),
        prob=0.2,
    ),
    RandGaussianNoised(keys=["image"], std=0.01, prob=0.15),
    RandGaussianSmoothd(
        keys=["image"],
        sigma_x=(0.5, 1.15),
        sigma_y=(0.5, 1.15),
        prob=0.15,
    ),
    RandScaleIntensityd(keys=["image"], factors=0.3, prob=0.15),
    RandAdjustContrastd(keys=["image"], gamma=(0.7, 1.5), prob=0.3),
    RandFlipd(keys=["image", "label"], spatial_axis=[0], prob=0.5),
]

# Define transforms for training, validation and testing
train_transforms = Compose(load_transforms + data_augmentation_transforms)
val_transforms = Compose(load_transforms)
test_transforms = Compose(load_transforms)

# Use CacheDataset to accelerate training and validation
train_ds = CacheDataset(data=train_datalist, transform=train_transforms, cache_rate=1.0)
val_ds = CacheDataset(data=val_datalist, transform=val_transforms, cache_rate=1.0)
test_ds = CacheDataset(data=test_datalist, transform=test_transforms, cache_rate=1.0)

Now, let's visualize some images from the training set. The images are displayed with their corresponding ground truth segmentation masks.

In [None]:
from matplotlib import pyplot as plt
from matplotlib.colors import ListedColormap

from src.utils.visualizations import imagesc

# Get a random index to display the image with label from each dataset
train_idx = np.random.randint(len(train_ds))
val_idx = np.random.randint(len(val_ds))
test_idx = np.random.randint(len(test_ds))

# Print the selected indices
print("train_idx: ", train_idx)
print("val_idx: ", val_idx)
print("test_idx: ", test_idx)

# Visualize a random image with label from each dataset
colors = ["black", "red", "green", "blue"]
cmap = ListedColormap(colors)

figure = plt.figure(figsize=(8, 8))
train_sample = train_ds[train_idx]
image = train_sample["image"].detach().cpu().numpy()[0].transpose(1, 0)
label = train_sample["label"].detach().cpu().numpy()[0].transpose(1, 0)
ax = figure.add_subplot(3, 2, 1)
imagesc(ax, image, title="Training image", show_colorbar=False)
ax = figure.add_subplot(3, 2, 2)
imagesc(
    ax, label, title="Training label", show_colorbar=False, colormap=cmap, interpolation="nearest"
)

val_sample = val_ds[val_idx]
image = val_sample["image"].detach().cpu().numpy()[0].transpose(1, 0)
label = val_sample["label"].detach().cpu().numpy()[0].transpose(1, 0)
ax = figure.add_subplot(3, 2, 3)
imagesc(ax, image, title="Validation image", show_colorbar=False)
ax = figure.add_subplot(3, 2, 4)
imagesc(
    ax,
    label,
    title="Validation label",
    show_colorbar=False,
    colormap=cmap,
    interpolation="nearest",
)

test_sample = test_ds[test_idx]
image = test_sample["image"].detach().cpu().numpy()[0].transpose(1, 0)
label = test_sample["label"].detach().cpu().numpy()[0].transpose(1, 0)
ax = figure.add_subplot(3, 2, 5)
imagesc(ax, image, title="Test image", show_colorbar=False)
ax = figure.add_subplot(3, 2, 6)
imagesc(ax, label, title="Test label", show_colorbar=False, colormap=cmap, interpolation="nearest")
figure.tight_layout()
plt.show()

# III. Train <a class="anchor" id="train"></a>
Let's move on to train a U-Net to segment the left ventricle, myocardium and left atrium. We will use the training and validation sets created in the previous section.

### Definition of U-Net architecture

In [None]:
from torchinfo import summary

from src.models.unet import UNet

input_channels = 1  # This is the number of input channels in the image
input_shape = (input_channels, 64, 64)  # This is the shape of the input image to the network
num_classes = 4  # This is the number of output classes
output_shape = (num_classes, 64, 64)  # This is the shape of the output mask
init_channels = 32  # This is the number of channels in the first layer of the network

unet = UNet(input_shape=input_shape, output_shape=output_shape, init_channels=init_channels)

# Print the summary of the network
summary_kwargs = dict(
    col_names=["input_size", "output_size", "kernel_size", "num_params"], depth=3, verbose=0
)
summary(unet, (1, *input_shape), device="cpu", **summary_kwargs)

### Define whole auto-encoder and encoder

In [None]:
import torch
import torch.nn as nn
class DoubleConv(nn.Module):

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)

class Down(nn.Module):

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels),
            #nn.Dropout2d(p=0.2)
        )

    def forward(self, x):
        return self.maxpool_conv(x)

class Up(nn.Module):

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        # if bilinear, use the normal convolutions to modify the number of channels
        if bilinear:
            self.up = nn.Sequential(
                nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True),
                DoubleConv(in_channels, out_channels),
                #nn.Dropout2d(p=0.2)
            )
        else:
            self.up = nn.Sequential(
                nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                #nn.Dropout2d(p=0.2)
            )

    def forward(self, x):
        return self.up(x)

class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

class AE_Conv(nn.Module):
    def __init__(self, in_channels, out_channels, init_channel, bilinear=True):
        super(AE_Conv, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.init_channel = init_channel
        self.bilinear = bilinear

        self.inc = DoubleConv(in_channels, init_channel)
        self.down1 = Down(init_channel, init_channel*2)
        self.down2 = Down(init_channel*2, init_channel*4)
        self.down3 = Down(init_channel*4, init_channel*8)
        #self.down4 = Down(init_channel*8, init_channel*16)
        #self.up1 = Up(init_channel*16, init_channel*8, bilinear)
        self.up2 = Up(init_channel*8, init_channel*4, bilinear)
        self.up3 = Up(init_channel*4, init_channel*2, bilinear)
        self.up4 = Up(init_channel*2, init_channel, bilinear)
        self.outc = OutConv(init_channel, out_channels)
        self.relu = nn.ReLU()

        self._initialize_weights()

    def forward(self, x):
        x = self.inc(x)
        x = self.down1(x)
        x = self.down2(x)
        x = self.down3(x)
        #x = self.down4(x)
        #x = self.up1(x)
        x = self.up2(x)
        x = self.up3(x)
        x = self.up4(x)
        x = self.outc(x)
        #x = self.relu(x)
        return x

    def _initialize_weights(self):
       for m in self.modules():
           if isinstance(m, nn.Conv2d):
               nn.init.kaiming_normal_(m.weight)
               if m.bias is not None:
                   nn.init.zeros_(m.bias)
           elif isinstance(m, nn.BatchNorm2d):
               nn.init.constant_(m.weight, 1)
               nn.init.constant_(m.bias, 0)

class Encoder(nn.Module):
    def __init__(self, in_channels, init_channel):
        super(Encoder, self).__init__()
        self.in_channels = in_channels
        self.init_channel = init_channel

        self.inc = DoubleConv(in_channels, init_channel)
        self.down1 = Down(init_channel, init_channel*2)
        self.down2 = Down(init_channel*2, init_channel*4)
        self.down3 = Down(init_channel*4, init_channel*8)
        #self.down4 = Down(init_channel*8, init_channel*16)

        self._initialize_weights()

    def forward(self, x):
        x = self.inc(x)
        x = self.down1(x)
        x = self.down2(x)
        x = self.down3(x)
        #x = self.down4(x)
        return x
    def _initialize_weights(self):
       for m in self.modules():
           if isinstance(m, nn.Conv2d):
               nn.init.kaiming_normal_(m.weight)
               if m.bias is not None:
                   nn.init.zeros_(m.bias)
           elif isinstance(m, nn.BatchNorm2d):
               nn.init.constant_(m.weight, 1)
               nn.init.constant_(m.bias, 0)

### Get auto-encoder weights and load encoder weights

In [None]:
# Determine the device to run the model on
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

input_channel = 1  # This is the number of input channels in the image
output_channel = 1  # This is the number of output channel
init_channels = 32  # This is the output channel's number of input convolution
ae_input_channels = 1 # This is the number of auto-encoder input channels in the image

ae_conv = AE_Conv(input_channel, output_channel, init_channels)
encoder = Encoder(ae_input_channels, init_channels)

ae_conv = torch.load("../logs/ae_label/best_metric_model.pth", map_location=device)

#Load the 4 first layers parameters of auto-encoder into encoder
encoder.inc.load_state_dict(ae_conv.inc.state_dict())
encoder.down1.load_state_dict(ae_conv.down1.state_dict())
encoder.down2.load_state_dict(ae_conv.down2.state_dict())
encoder.down3.load_state_dict(ae_conv.down3.state_dict())

### Definition of optimizer and loss function
We will use the Adam optimizer. The loss function is the combination of the Dice and cross-entropy (CE) loss, which is a standard loss function for segmentation tasks.
> **&#9432;** The exact computation of the loss function is CE - Dice, which means its minimum value is -1 instead of 0 in the best case scenario.

In [None]:
from functools import partial

from torch.nn import MSELoss

from src.utils.loss_functions.dice_loss import DC_and_CE_loss

# Soft dice and CE loss function
loss_function = DC_and_CE_loss(
    {"batch_dice": True, "smooth": 1e-5, "do_bg": False}, weight_ce=1, weight_dice=1
)

# MSE loss function for comparison between ref (=label) and prediction in the auto-encoder latent space
loss_function_latent = MSELoss();

# Adam optimizer
optimizer = partial(torch.optim.Adam, params=unet.parameters())

### Definition of training function

In [None]:
import os
import time
from typing import Union

import torch.nn.functional as F
from monai.data import DataLoader
from torch import nn
from torch.utils.data import Dataset
from tqdm.auto import tqdm

from src.utils.tensor_utils import sum_tensor


def train_process(
    train_ds: Dataset,
    val_ds: Dataset,
    num_workers: int,
    model: nn.Module,
    encoder: nn.Module,
    loss_function: nn.Module,
    loss_function_latent: nn.Module,
    optimizer: torch.optim.Optimizer,
    num_classes: int,
    batch_size: int = 2,
    lr: float = 0.001,
    max_epochs: int = 30,
    log_dir: Union[Path, str] = Path("../logs/camus_segmentation"),
    val_interval=1,
) -> tuple[float, list[float], list[float], list[int], list[float]]:
    """Trains a neural network model for segmentation on the provided datasets using the specified parameters.

    Args:
        train_ds: Training dataset.
        val_ds: Validation dataset.
        num_workers: Number of workers to use for data loading.
        model: Neural network model.
        loss_function: Loss function.
        optimizer: Optimizer.
        num_classes: Number of classes to segment.
        batch_size: Number of batch size. Defaults to 2.
        lr: Learning rate. Defaults to 0.001.
        max_epochs: Maximum training epochs. Defaults to 30.
        log_dir: Path to the logging directory. Defaults to Path("../logs/camus_segmentation").
        val_interval: Epoch interval to perform evaluation steps. Defaults to 1.

    Returns:
        - Total time taken for training.
        - List containing training loss values for each epoch.
        - List containing validation loss values for each evaluation epoch.
        - List containing epochs where validation is performed.
        - List containing metric values for each evaluation epoch.
    """
    # Create train and validation dataloaders
    train_dataloader = DataLoader(
        train_ds,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),
    )
    val_dataloader = DataLoader(
        val_ds,
        batch_size=batch_size,
        num_workers=num_workers,
        pin_memory=torch.cuda.is_available(),
    )

    # Determine the device to run the model on
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(f"GPU detected, training on: {device}!\n")
    else:
        device = torch.device("cpu")
        print("GPU not detected, training on CPU!\n")

    # Move the models to the device
    model = model.to(device)
    encoder = encoder.to(device)

    # Finalize the creation of the optimizer object with the learning rate
    optimizer = optimizer(lr=lr)

    # Convert log directory to Path object if needed
    if not isinstance(log_dir, Path):
        log_dir = Path(log_dir)
    # Create the log directory if it does not exist
    log_dir.mkdir(parents=True, exist_ok=True)

    # Define some variables to keep track of the best metric values, epoch time and losses
    best_metric = -1
    best_metric_epoch = -1
    epoch_train_loss_values = []
    epoch_val_loss_values = []
    metric_values = []
    epoch_val = []
    metric_per_class = {f"metric/{i}": [] for i in range(1, num_classes)}
    total_start = time.time()
    
    # Losses weights
    k_segm = 0.5
    k_latent = 0.5

    fit_pbar = tqdm(range(max_epochs), desc="Training", unit="epoch", position=0, leave=True)
    pbar_metrics = {"train/loss": None, "val/loss": None, "val/dice": None}

    for epoch in fit_pbar:
        model.train()
        epoch_loss = 0
        epoch_val_loss = 0
        step = 0
        epoch_tp_hard = []
        epoch_fp_hard = []
        epoch_fn_hard = []
        for batch_data in train_dataloader:
            step += 1
            inputs, labels = (
                batch_data["image"].to(device),
                batch_data["label"].to(device),
            )
            optimizer.zero_grad()
            outputs = model(inputs)
            
            #We pass the prediction and the label into the encoder
            outputs_1_ch = F.softmax(outputs, dim=1).argmax(dim=1, keepdim=True).float() #we use argmax to have only 1 channel on output
            outputs_latent = encoder(outputs_1_ch) #encode segmented image into latent space
            labels_latent = encoder(labels) #encode labels (=reference) into latent space
            
            #We compute both losses
            loss_segm = loss_function(outputs, labels)
            loss_latent = loss_function_latent(outputs_latent, labels_latent) #compute MSE loss in the latent space
            loss = k_segm*loss_segm + k_latent*loss_latent #weighted sum of both losses
            
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
            pbar_metrics["loss"] = loss.item()
            pbar_metrics["train_batch"] = f"{step}/{len(train_dataloader)}"
            fit_pbar.set_postfix(pbar_metrics)

        epoch_loss /= step
        epoch_train_loss_values.append(epoch_loss)
        pbar_metrics["train/loss"] = epoch_loss
        pbar_metrics.pop("loss")
        pbar_metrics.pop("train_batch")
        fit_pbar.set_postfix(pbar_metrics)

        if (epoch + 1) % val_interval == 0:
            step = 0
            model.eval()
            with torch.no_grad():
                for val_data in val_dataloader:
                    step += 1
                    val_inputs, val_labels = (
                        val_data["image"].to(device),
                        val_data["label"].to(device),
                    )
                    val_outputs = model(val_inputs)
                    
                    #We pass the prediction and the label into the encoder
                    val_outputs_1_ch = F.softmax(val_outputs, dim=1).argmax(dim=1, keepdim=True).float() #we use argmax to have only 1 channel on output
                    val_outputs_latent = encoder(val_outputs_1_ch) #encode segmented image into latent space
                    val_labels_latent = encoder(val_labels) #encode labels (=reference) into latent space

                    #We compute both losses
                    val_loss_segm = loss_function(outputs, labels)
                    val_loss_latent = loss_function_latent(val_outputs_latent, val_labels_latent) #compute MSE loss in the latent space
                    val_loss = k_segm*val_loss_segm + k_latent*val_loss_latent #weighted sum of both losses
                    
                    epoch_val_loss += val_loss.item()
                    pbar_metrics["loss"] = val_loss.item()
                    pbar_metrics["val_batch"] = f"{step}/{len(val_dataloader)}"
                    fit_pbar.set_postfix(pbar_metrics)

                    # Compute tp, fp, and fn that will be used to compute the hard dice metric per epoch
                    val_labels = val_labels[:, 0]
                    axes = tuple(range(1, len(val_labels.shape)))
                    # Convert the output logits to a segmentation mask
                    val_outputs = F.softmax(val_outputs, dim=1).argmax(dim=1)

                    tp_hard = torch.zeros((val_labels.shape[0], num_classes - 1)).to(
                        val_outputs.device.index
                    )
                    fp_hard = torch.zeros((val_labels.shape[0], num_classes - 1)).to(
                        val_outputs.device.index
                    )
                    fn_hard = torch.zeros((val_labels.shape[0], num_classes - 1)).to(
                        val_outputs.device.index
                    )

                    for c in range(1, num_classes):
                        tp_hard[:, c - 1] = sum_tensor(
                            (val_outputs == c).float() * (val_labels == c).float(), axes=axes
                        )
                        fp_hard[:, c - 1] = sum_tensor(
                            (val_outputs == c).float() * (val_labels != c).float(), axes=axes
                        )
                        fn_hard[:, c - 1] = sum_tensor(
                            (val_outputs != c).float() * (val_labels == c).float(), axes=axes
                        )

                    tp_hard = tp_hard.sum(0, keepdim=False).detach().cpu().numpy()
                    fp_hard = fp_hard.sum(0, keepdim=False).detach().cpu().numpy()
                    fn_hard = fn_hard.sum(0, keepdim=False).detach().cpu().numpy()

                    # Store the tp_hard, fp_hard, and fn_hard per evaluation step
                    epoch_tp_hard.append(list(tp_hard))
                    epoch_fp_hard.append(list(fp_hard))
                    epoch_fn_hard.append(list(fn_hard))

                epoch_val_loss /= step
                epoch_val_loss_values.append(epoch_val_loss)
                epoch_val.append(epoch + 1)
                pbar_metrics.pop("loss")
                pbar_metrics.pop("val_batch")
                pbar_metrics["val/loss"] = float(epoch_val_loss)

                # Compute the hard dice metric per epoch
                epoch_tp_hard = np.sum(epoch_tp_hard, 0)
                epoch_fp_hard = np.sum(epoch_fp_hard, 0)
                epoch_fn_hard = np.sum(epoch_fn_hard, 0)

                # Compute the hard dice metric per class
                global_dc_per_class = [
                    i if not np.isnan(i) else 0.0
                    for i in [
                        2 * i / (2 * i + j + k)
                        for i, j, k in zip(epoch_tp_hard, epoch_fp_hard, epoch_fn_hard)
                    ]
                ]

                val_metric = np.mean(global_dc_per_class)
                metric_values.append(val_metric)
                pbar_metrics["val/dice"] = val_metric
                for i in range(1, num_classes, 1):
                    pbar_metrics[f"val/dice/{i}"] = global_dc_per_class[i - 1]
                    metric_per_class[f"metric/{i}"].append(global_dc_per_class[i - 1])
                fit_pbar.set_postfix(pbar_metrics)

                if val_metric > best_metric:
                    best_metric = val_metric
                    best_metric_epoch = epoch + 1
                    # Save best metric model checkpoint
                    torch.save(
                        {
                            "max_epochs": max_epochs,
                            "current_epoch": epoch + 1,
                            "best_metric_epoch": best_metric_epoch,
                            "train_loss": epoch_train_loss_values,
                            "val_loss": epoch_val_loss_values,
                            "epoch_val": epoch_val,
                            "metric_values": metric_values,
                            "metric_per_class": metric_per_class,
                            "model_state_dict": model.state_dict(),
                            "optimizer_state_dict": optimizer.state_dict(),
                        },
                        str(log_dir / "best_metric_model.pth"),
                    )

        # Save last model checkpoint
        torch.save(
            {
                "max_epochs": max_epochs,
                "current_epoch": epoch + 1,
                "best_metric_epoch": best_metric_epoch,
                "train_loss": epoch_train_loss_values,
                "val_loss": epoch_val_loss_values,
                "epoch_val": epoch_val,
                "metric_values": metric_values,
                "metric_per_class": metric_per_class,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
            },
            str(log_dir / "last_model.pth"),
        )

    print(
        f"train completed, best_metric: {best_metric:.4f}"
        f" at epoch: {best_metric_epoch}"
        f" total time: {(time.time() - total_start):.4f}"
    )
    return (
        time.time() - total_start,
        epoch_train_loss_values,
        epoch_val_loss_values,
        epoch_val,
        metric_values,
        metric_per_class,
    )

### Definition of training hyperparameters
In this section, we will define the hyperparameters for training, such as the number of epochs, the learning rate, the batch size, etc.

In [None]:
batch_size = 2  # Number of batch size
lr = 0.001  # Learning rate
max_epochs = 10  # Number of epochs to train the model
num_workers = os.cpu_count() - 1  # Number of workers to use for data loading

# Train the model
(
    total_time,
    epoch_train_loss_values,
    epoch_val_loss_values,
    epoch_val,
    metric_values,
    metric_per_class,
) = train_process(
    train_ds=train_ds,
    val_ds=val_ds,
    num_workers=num_workers,
    model=unet,
    encoder=encoder,
    loss_function=loss_function,
    loss_function_latent=loss_function_latent,
    optimizer=optimizer,
    num_classes=num_classes,
    batch_size=batch_size,
    lr=lr,
    max_epochs=max_epochs,
    log_dir="../logs/camus_segmentation",
)

# IV. Visualize learning curves and predictions <a class="anchor" id="visualize"></a>

### Plot training losses, validation losses and validation dice over epochs

In [None]:
from matplotlib.ticker import MaxNLocator

trains_epoch = list(range(1, max_epochs + 1, 1))
vals_epochs = epoch_val
plt.figure("train", (16, 8))
ax = plt.subplot(1, 3, 1)
plt.title("Epoch Average Loss")
plt.xlabel("Epoch")
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # Set integer ticks for x-axis
plt.plot(trains_epoch, epoch_train_loss_values, color="red", label="Train")
plt.plot(vals_epochs, epoch_val_loss_values, color="blue", label="Val")
plt.legend(loc="upper right")
ax = plt.subplot(1, 3, 2)
plt.title("Val Mean Dice")
plt.xlabel("Epoch")
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # Set integer ticks for x-axis
plt.plot(vals_epochs, metric_values, color="green")

ax = plt.subplot(1, 3, 3)
plt.title("Val Mean Dice per Class")
plt.xlabel("Epoch")
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # Set integer ticks for x-axis
legend_metric = ["left ventricle", "myocardium", "left atrium"]
for i in range(1, num_classes, 1):
    plt.plot(vals_epochs, metric_per_class[f"metric/{i}"], label=legend_metric[i - 1])
plt.legend(loc="upper left")
plt.show()

### Download the pre-trained model weight for 50 epochs and visualize learning curves and metrics over 50 epochs

In [None]:
from matplotlib.ticker import MaxNLocator

# Path to the logging directory
log_dir = "../logs/camus_segmentation"

# Determine the device to run the model on
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Load the best model weight
best_model = Path(log_dir) / "best_model_50epochs.pth"
if not best_model.is_file():
    import urllib.request

    urllib.request.urlretrieve(
        "https://www.creatis.insa-lyon.fr/~bernard/camus/best_model_50epochs.pth",
        str(best_model),
    )

# Load the weight
weight = torch.load(best_model, map_location=device)
epoch_train_loss_values = weight["train_loss"]
epoch_val_loss_values = weight["val_loss"]
epoch_val = weight["epoch_val"]
metric_values = weight["metric_values"]
metric_per_class = weight["metric_per_class"]
max_epochs = weight["max_epochs"]
best_metric_epoch = weight["best_metric_epoch"]

# Plot the training losses, validation losses and validation dice over epochs
trains_epoch = list(range(1, max_epochs + 1, 1))
vals_epochs = epoch_val

plt.figure("train", (16, 8))
ax = plt.subplot(1, 3, 1)
plt.title("Epoch Average Loss")
plt.xlabel("Epoch")
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # Set integer ticks for x-axis
plt.plot(trains_epoch, epoch_train_loss_values, color="red", label="Train")
plt.plot(vals_epochs, epoch_val_loss_values, color="blue", label="Val")
# Add a vertical line at the best model epoch
plt.axvline(best_metric_epoch, color="gray", linestyle="--")
plt.text(
    best_metric_epoch + 0.5,
    min(epoch_train_loss_values) * 4 / 5,
    f"Best model epoch = {best_metric_epoch}",
    rotation=90,
)
plt.legend(loc="upper right")
ax = plt.subplot(1, 3, 2)
plt.title("Val Mean Dice")
plt.xlabel("Epoch")
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # Set integer ticks for x-axis
plt.plot(vals_epochs, metric_values, color="green")
plt.axvline(best_metric_epoch, color="gray", linestyle="--")

ax = plt.subplot(1, 3, 3)
plt.title("Val Mean Dice per Class")
plt.xlabel("Epoch")
ax.xaxis.set_major_locator(MaxNLocator(integer=True))  # Set integer ticks for x-axis
legend_metric = ["left ventricle", "myocardium", "left atrium"]
for i in range(1, num_classes, 1):
    plt.plot(vals_epochs, metric_per_class[f"metric/{i}"], label=legend_metric[i - 1])
plt.axvline(best_metric_epoch, color="gray", linestyle="--")
plt.legend(loc="upper left")
plt.show()

### Load the pre-trained model weight into the U-Net

In [None]:
# Set to True to use the pretrained model
# Set to False to use the model you just trained
use_pretrained = False

if use_pretrained:
    weight = torch.load(Path(log_dir) / "best_model_50epochs.pth", map_location=device)
else:
    weight = torch.load(Path(log_dir) / "best_metric_model.pth", map_location=device)

# Load the weight into the U-Net
unet.load_state_dict(weight["model_state_dict"])
# Move the U-Net to the correct device
unet.to(device)
# Put the U-Net in evaluation mode
unet.eval()

### Perform the inference on the test set
First, we create the dataloader from the test dataset we defined at the beginning.

In [None]:
# Create the test dataloader
num_workers = os.cpu_count() - 1

test_dataloader = DataLoader(
    test_ds,
    batch_size=1,
    num_workers=num_workers,
    pin_memory=torch.cuda.is_available(),
)

# Create an iterator to iterate over the test dataloader
test_dataloader_iter = iter(test_dataloader)

Then, we perform the inference on a sample from the test dataset and plot the prediction.

In [None]:
# Run inference on a test image
batch_data = next(test_dataloader_iter)
with torch.no_grad():
    image = batch_data["image"].to(device)
    pred = unet(image)
    pred = F.softmax(pred, dim=1).argmax(dim=1)
    pred = pred.squeeze().detach().cpu().numpy().transpose(1, 0)
    label = batch_data["label"].squeeze().detach().cpu().numpy().transpose(1, 0)
    image = image.squeeze().detach().cpu().numpy().transpose(1, 0)

# Plot the image, label and prediction
figure = plt.figure(figsize=(8, 8))
ax = figure.add_subplot(1, 3, 1)
imagesc(ax, image, title="Image", show_colorbar=False)
ax = figure.add_subplot(1, 3, 2)
imagesc(ax, label, title="Label", show_colorbar=False, colormap=cmap, interpolation="nearest")
ax = figure.add_subplot(1, 3, 3)
imagesc(ax, pred, title="Prediction", show_colorbar=False, colormap=cmap, interpolation="nearest")
figure.tight_layout()
plt.show()