In [1]:
import os
from PIL import Image
from tqdm import tqdm
import nibabel as nib
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import transforms, datasets
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from google.colab import drive
import torch.nn.functional as F

drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
class DiceLoss(nn.Module):
    def __init__(self):
        super(DiceLoss, self).__init__()

    def forward(self, outputs, targets, smooth=1e-6):
        # Convert to float tensors
        outputs = outputs.float()
        targets = targets.float()

        intersection = (outputs * targets).sum()
        union = outputs.sum() + targets.sum()

        dice_coeff = (2. * intersection + smooth) / (union + smooth)
        return 1 - dice_coeff

In [3]:
class UNetVGG16(nn.Module):
    def __init__(self, pretrained=True):
        super(UNetVGG16, self).__init__()

        # Carica VGG16 senza BatchNorm
        vgg16 = models.vgg16(weights=models.VGG16_Weights.DEFAULT if pretrained else None).features

        # Encoder (senza BatchNorm)
        self.enc1 = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            *vgg16[1:6]   #Usa il resto dei layer di vgg16[0:6]
        )
        self.enc2 = nn.Sequential(*vgg16[6:13])  # 160x160 -> 80x80
        self.enc3 = nn.Sequential(*vgg16[13:23]) # 80x80 -> 40x40
        self.enc4 = nn.Sequential(*vgg16[23:33]) # 40x40 -> 20x20
        self.enc5 = nn.Sequential(*vgg16[33:43]) # 20x20 -> 10x10

        # Bottleneck senza BatchNorm
        self.bottleneck = nn.Sequential(
            nn.Conv2d(512, 1024, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(1024, 1024, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

        # Decoder con upsampling tramite ConvTranspose2d
        self.dec5 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2, padding=0)  # 10x10 -> 20x20
        self.dec4 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2, padding=0)   # 20x20 -> 40x40
        self.dec3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2, padding=0)   # 40x40 -> 80x80
        self.dec2 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2, padding=0)    # 80x80 -> 160x160
        self.dec1 = nn.ConvTranspose2d(64, 64, kernel_size=2, stride=2, padding=0)     # 80x80 -> 160x160

        # Ultima convoluzione con Sigmoid
        self.final_conv = nn.Conv2d(64, 1, kernel_size=1)

    def forward(self, x):
        # Encoder
        enc1_out = self.enc1(x)  # 160x160
        enc2_out = self.enc2(enc1_out)  # 80x80
        enc3_out = self.enc3(enc2_out)  # 40x40
        enc4_out = self.enc4(enc3_out)  # 20x20
        enc5_out = self.enc5(enc4_out)  # 10x10

        # Bottleneck
        bottleneck_out = self.bottleneck(enc5_out)

        # Decoder con upsampling
        dec5_out = self.dec5(bottleneck_out)  # 10x10 -> 20x20
        dec4_out = self.dec4(dec5_out)       # 20x20 -> 40x40
        dec3_out = self.dec3(dec4_out)       # 40x40 -> 80x80
        dec2_out = self.dec2(dec3_out)       # 80x80 -> 160x160
        dec1_out = self.dec1(dec2_out)       # 80x80 -> 160x160

        # Output finale con Sigmoid
        out = self.final_conv(dec1_out)
        return torch.sigmoid(out)

In [10]:
def load_nii(file_path):
    return nib.load(file_path).get_fdata()

class MRI_Dataset(Dataset):
    def __init__(self, image_paths, mask_paths, num_slices=5, transform=None):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.num_slices = num_slices  # Numero di slice da prendere (centrale ±2)
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Carica il volume MRI e la maschera
        img_nii = nib.load(self.image_paths[idx])
        mask_nii = nib.load(self.mask_paths[idx])

        img = img_nii.get_fdata()  # Shape (H, W, D)
        mask = mask_nii.get_fdata()  # Shape (H, W, D)

        # Prendi la slice centrale ±2
        mid_slice = img.shape[2] // 2
        slice_idxs = list(range(mid_slice - 2, mid_slice + 3))  # Es. [78, 79, 80, 81, 82]

        img_slices = img[:, :, slice_idxs]  # (H, W, 5)
        mask_slices = mask[:, :, slice_idxs]  # (H, W, 5)


        # Converte in tensori PyTorch (5 canali per UNet)
        img_tensor = torch.tensor(img_slices, dtype=torch.float32).permute(2, 0, 1)  # (C=5, H, W)
        mask_tensor = torch.tensor(mask_slices, dtype=torch.float32).permute(2, 0, 1)  # (C=5, H, W)

        img_tensor = img_tensor.unsqueeze(1)  # (num_slices, 1, Width, Height)
        mask_tensor = mask_tensor.unsqueeze(1)

        # Se necessario, applica le trasformazioni (es. resize a 160x160)
        if self.transform:
            img_tensor = self.transform(img_tensor)
            mask_tensor = self.transform(mask_tensor)

        return img_tensor, mask_tensor


def train_unet(model, train_loader, val_loader, epochs=10, lr=0.001, device="cuda"):
    model.to(device)
    criterion = DiceLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        print(f'Entro nel ciclo di addestramento della epoca: {epoch}')
        model.train()
        train_loss = 0.0

        for imgs, masks in train_loader:
            imgs, masks = imgs.to(device), masks.to(device)
            # imgs ha forma [batch_size, N_slice, 1, 160, 160]
            # masks ha forma [batch_size, N_slice, 1, 160, 160]

            # Iterate over the batch size
            for j in range(imgs.shape[0]):
                # Iterate over the number of slices
                for i in range(imgs.shape[1]):
                    img_slice = imgs[j, i, :, :, :].unsqueeze(0)  # Select the slice and add channel dimension [1, 1, 160, 160]
                    mask_slice = masks[j, i, :, :, :].unsqueeze(0)   # Select the slice and convert to long [1, 1, 160, 160]

                    optimizer.zero_grad()
                    output_slice = model(img_slice)  # Process the single slice
                    loss = criterion(output_slice, mask_slice)
                    loss.backward()
                    optimizer.step()
                    train_loss += loss.item()


         # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for imgs, masks in val_loader:
                imgs, masks = imgs.to(device), masks.to(device)

                # Iterate over the batch size
                for j in range(imgs.shape[0]):
                    # Iterate over the number of slices
                    for i in range(imgs.shape[1]):
                        img_slice = imgs[j, i, :, :, :].unsqueeze(0)  # Select the slice and add channel dimension [1, 1, 160, 160]
                        mask_slice = masks[j, i, :, :, :].unsqueeze(0)   # Select the slice and convert to long [1, 1, 160, 160]

                        #optimizer.zero_grad()
                        output_slice = model(img_slice)  # Process the single slice
                        loss = criterion(output_slice, mask_slice)

                        val_loss += loss.item()

        val_loss /= (len(val_loader) * imgs.shape[0] * imgs.shape[1]) # update the divisor

        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {train_loss:.4f} - Val Loss: {val_loss:.4f}")

    return model

# Funzione per salvare le segmentazioni
def save_segmentations(model, test_loader):
    model.eval()
    os.makedirs("results", exist_ok=True)

    with torch.no_grad():
        for idx, (image, mask) in enumerate(test_loader):
            image = image.to(device)
            output = model(image)
            pred = torch.argmax(output, dim=1).cpu().numpy()

            # Salva come file NIfTI
            nifti_img = nib.Nifti1Image(pred, np.eye(4))
            nib.save(nifti_img, f"results/segmentation_{idx}.nii")



In [5]:
sample_mask = load_nii('/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/training/patient001/patient001_frame01_gt_preprocessed.nii.gz')  # Sostituisci con un percorso valido

# Trova i valori unici presenti nella maschera
unique_classes = np.unique(sample_mask)

print("Classi presenti nella maschera:", unique_classes)

Classi presenti nella maschera: [0. 1. 2. 3.]


In [6]:
# Preparazione dataset

base_path = '/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/training'

image_paths = []
mask_paths = []

for root, dirs, files in os.walk(base_path):
    for file in files:
        if "preprocessed" in file and "_gt" not in file:
            image_paths.append(os.path.join(root, file))
        elif "preprocessed" in file and "_gt" in file:
            mask_paths.append(os.path.join(root, file))

image_paths = sorted(image_paths)
mask_paths = sorted(mask_paths)

print(image_paths)
print(mask_paths)

['/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/training/patient001/patient001_frame01_preprocessed.nii.gz', '/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/training/patient001/patient001_frame12_preprocessed.nii.gz', '/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/training/patient002/patient002_frame01_preprocessed.nii.gz', '/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/training/patient002/patient002_frame12_preprocessed.nii.gz', '/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/training/patient003/patient003_frame01_preprocessed.nii.gz', '/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/training/patient003/patient003_frame15_preprocessed.nii.gz', '/content/drive/MyDrive/Colab Notebooks/Analisi_Cardiac_MRI/primo_blocco_analisi/database/tra

In [7]:
image_mask_pairs = list(zip(image_paths, mask_paths))
train_pairs, val_pairs = train_test_split(image_mask_pairs, test_size=0.2, random_state=42)
train_imgs, train_masks = zip(*train_pairs)
val_imgs, val_masks = zip(*val_pairs)

transform = transforms.Compose([
    transforms.Resize((160, 160)),
])

# Creazione dataset
train_dataset = MRI_Dataset(list(train_imgs), list(train_masks), num_slices=5, transform=transform)
val_dataset = MRI_Dataset(list(val_imgs), list(val_masks), num_slices=5, transform=transform)

# Creazione DataLoader
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)

In [8]:
from torchinfo import summary
num_slices = 6
num_classes = 4
batch_size=4
model = UNetVGG16()
summary(model)


Layer (type:depth-idx)                   Param #
UNetVGG16                                --
├─Sequential: 1-1                        --
│    └─Conv2d: 2-1                       640
│    └─ReLU: 2-2                         --
│    └─Conv2d: 2-3                       36,928
│    └─ReLU: 2-4                         --
│    └─MaxPool2d: 2-5                    --
│    └─Conv2d: 2-6                       73,856
├─Sequential: 1-2                        --
│    └─ReLU: 2-7                         --
│    └─Conv2d: 2-8                       147,584
│    └─ReLU: 2-9                         --
│    └─MaxPool2d: 2-10                   --
│    └─Conv2d: 2-11                      295,168
│    └─ReLU: 2-12                        --
│    └─Conv2d: 2-13                      590,080
├─Sequential: 1-3                        --
│    └─ReLU: 2-14                        --
│    └─Conv2d: 2-15                      590,080
│    └─ReLU: 2-16                        --
│    └─MaxPool2d: 2-17                   -

In [11]:
trained_model = train_unet(model, train_loader, val_loader, epochs=100)

RuntimeError: Found no NVIDIA driver on your system. Please check that you have an NVIDIA GPU and installed a driver from http://www.nvidia.com/Download/index.aspx

In [None]:
save_segmentations(trained_model, val_loader)