In [None]:
from google.colab import drive
drive.mount('/content/drive')

# INSERIRE IL PATH DEL DATASET DA ESTRARRE
!unzip "/content/drive/MyDrive/CONSEGNA CHALLENGE/TEST(1).zip"

# INSTALLAZIONE DI MONAI

In [None]:
!pip install 'monai[all]'
!pip install pynrrd

# LIBRERIE UTILIZZATE

In [None]:
import os
from monai.inferers import SimpleInferer
import nrrd

from monai.utils import set_determinism
from monai.networks.nets import UNet
from monai.networks.layers import Norm
from monai.config import print_config
from monai.losses.dice import DiceCELoss
from monai.metrics import DiceMetric,ConfusionMatrixMetric
from skimage import measure
from scipy.stats import ks_2samp
import matplotlib.patches as mpatches

import monai
import matplotlib.pyplot as plt
import numpy as np
import torch 

from tqdm import tqdm
from skimage.io import imread, imshow, imsave
from skimage.transform import resize

from skimage.transform import rotate
from scipy.ndimage import binary_fill_holes
from scipy.ndimage import binary_dilation, binary_erosion


import skimage.io as io
from skimage.measure import label, regionprops, regionprops_table
from skimage import morphology,exposure


import torch
from torchvision import transforms
from monai.utils import first
from monai.transforms import (
    AsDiscrete,
    EnsureChannelFirst,
    DataStatsd,
    AddChanneld,
    Compose,
    Activations,
    LoadImage,
    LoadImaged,
    Resize,
    Resized,
    DataStats,
    AsChannelFirstd,
    AsDiscreted,
    ToTensord,
    ToTensor,
    EnsureType,
    EnsureChannelFirstd,
    DataStatsd,
    AddChannel,
    AdjustContrastd,
)

from monai.data import (
    DataLoader,
    CacheDataset,
    PILReader,
    ITKReader,
    NrrdReader,
    IterableDataset,
    decollate_batch,
    ArrayDataset
)

from scipy.ndimage import median_filter
from scipy.ndimage import gaussian_filter


# DEFINIZIONE DEI PATH

In [None]:
current_dir = os.getcwd()
Nome_cartella = 'CODICI FINALI' #nome della cartella caricata contenente lo script submission e il modello
Nome_best_metric_model = 'best_metric_model.pth' #nome del modello

Nome_cartella_immagine = 'volumes' #inserire il nome della cartella contenente le immagini di test
Nome_cartella_maschere = 'mask'   #inserire il nome della cartella contenente le maschere di test

path = os.path.join(current_dir,'drive/MyDrive', Nome_cartella)  #creazione del path utile all'interno dello script

TEST_volumes_path = os.path.join(current_dir,Nome_cartella_immagine) #path associato alle immagini di test
TEST_mask_path    = os.path.join(current_dir,Nome_cartella_maschere) #path associato alle maschere correlate alle immagini del test

# creazione delle liste ordinate 
test_volumes = sorted(os.listdir(TEST_volumes_path))
test_masks =   sorted(os.listdir(TEST_mask_path))

Creazione del test set dict

In [None]:
test_data = [] # Inizializzazione della lista di tris di immagini di test
for i in range(len(test_volumes)): #ciclo sul primo set di immagini di test 
    tempDict = {
        'image': os.path.join(TEST_volumes_path, test_volumes[i]),
        'segmentation': os.path.join(TEST_mask_path, test_masks[i])}
    
    test_data.append(tempDict)

# MODELLO DELLA RETE

In [None]:
# Definiamo il device da utilizzare: GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = UNet(
    spatial_dims=2,
    in_channels=1,
    out_channels=1,
    channels=(16, 32, 64, 128, 256),
    strides=(2, 2, 2, 2),
    num_res_units=2,
    norm=Norm.BATCH, 
).to(device)

# Definizione della loss function
loss_function = DiceCELoss(sigmoid = True)
torch.backends.cudnn.benchmark = True

# Definizione dell'ottimizatore
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)

# Definizione delle trasformazioni di post-processing
post_pred = Compose([EnsureType(), Activations(sigmoid=True), AsDiscrete(threshold=0.5)]) 
post_label = Compose([EnsureType(), AsDiscrete(threshold=0.5)])
post_image = Compose([EnsureType()])

# Definizione del tipo di Inferenza da applicare con il modello allenato
inferer = SimpleInferer()

# Definizione un seed per renderere ripetibili tutte le opearazioni che andiamo ad effettuare
# Questa operazione non è sempre possibile in quanto alcune operazioni hanno condizioni aleatorie intrinseche,
# in quel caso possono essere segnalati errori e warning che risolviamo eliminando questa condizione
set_determinism(seed=46)

# CARICAMENTO DEL MODELLO

In [None]:
model.load_state_dict(torch.load(os.path.join(path, Nome_best_metric_model), map_location=torch.device('cpu')))

<All keys matched successfully>

# DIMENSIONI ORIGINALI DELLE SLICES

In [None]:
dim_originali_test=[] #Inizializzazione di una lista che conterra le dimensioni originali delle immagini del test set

for ind in range(len(test_data)): #iterazione sulla lunghezza del test set e, per ogni iterazione, si effettua:
  input_image = nrrd.read(test_data[ind]["image"]) #lettura dell'immagine volumetrica
  input_image=input_image[0]
  dim_originali_test.append((input_image.shape[1],input_image.shape[2]))  #lista che contiene le dimensioni originali delle slice di ogni volume

# PRE PROCESSING e POST PROCESSING

Funzione che permette di generare una maschera binaria, tramite la tecnica del global thresholding, a partire dall'immagine pre elaborata.


*   x: immagine 2D pre-processata
*   Threshold: soglia per il global thresholding





In [None]:
def img_to_mask(x,threshold):
  mask= x.copy()
  mask[mask<threshold] = 0
  mask[mask>threshold] = 1
  return mask

Funzione che permette di gestire gli artefatti di tipo lineare

*   x: immagine 2D pre-processata
*   mask: maschera ottenuta dalla funzione img_to_mask



In [None]:
def delartifact(x,mask):
  kernel = np.ones((3,3)) #Definizione dell'elemento strutturale (kernel 3x3)
  dilated = binary_dilation(mask, structure=kernel) # Dilatazione degli elementi con intensità di pixel=1
  mask = binary_fill_holes(mask) #riempimento degli spazi vuoti all'interno delle segmentazioni
  mask = binary_erosion(dilated,structure=kernel) # Erosione degli elementi con intensità di pixel=1

  image_mask = x*mask #Maschera sovrapposta all'immagine
  regions = regionprops(label(mask),image_mask) # Calcola le proprietà delle regioni

  for region in regions:
    tumor_coords = region.coords # Estrazione delle coordinate dei pixel all'interno della regione del tumore      
    minr, minc, maxr, maxc = region.bbox #Estrazione delle coordinate della bounding box

    if ((maxr-minr)>50 and (maxc-minc)<35) or ((maxr-minr)>150 and (maxc-minc)<37) or ((maxr-minr)<37 and (maxc-minc)>150) or ((maxr-minr)>20 and (maxc-minc)<8): #Condizione per eliminare l'artefatto
      x[tumor_coords[:, 0], tumor_coords[:, 1]]=np.min(x) #eliminazione dell'artefatto dall'immagine

  return x

Definizione di una funzione di pre processing


*   x: immagine 2D originale
*   y: maschera manuale associata all'immagine x

In [None]:
def preprocessing(x,y):
  
  #Eliminazione del canale --> (256,256)
  x = x.squeeze()
  y = y.squeeze()

  #Conversione delle immagini in formato numpy
  x = x.cpu().numpy()
  y = y.cpu().numpy()
  
  #Applicazione del filtro mediano con dimensione del kernel pari a 3
  x = median_filter(x,3)

  #Gestione degli artefatti
  mask = img_to_mask(x,1700)
  condizione = np.count_nonzero(mask)
  if condizione !=0: #se la maschera ha almeno un elemento uguale a 1
    x = delartifact(x,mask)
  
  return x,y

Definizione di una funzione di post processing

*   x: maschera automatica in output al modello 
*   y: maschera manuale associata all'immagine z
*   z: immagine 2D pre-processata
*   dim: lista composta da tuple contenenti le dimensioni originali delle immagini
*   step: indice utilizzato per iterare nel set di immagini



In [None]:
def postprocessing(x,y,z,dim,step):

  x = x.cpu().numpy() # x: test_outputs_post (maschera automatica)
  y = y.cpu().numpy() # y: test_labels_post (maschera manuale)
  z = z.cpu().numpy() # z: test_inputs_post (immagine)

  x = binary_fill_holes(x) # Rifinimento della segmentazione (riempimento dei buchi)   

  # Ridimensionamento delle immagini e maschere con dimensioni originali 
  x = np.resize(x,dim[step-1])
  y = np.resize(y,dim[step-1])
  z = np.resize(z,dim[step-1])
  
  return x,y,z

# FUNZIONE PER IL SALVATAGGIO



Funzione per il salvataggio delle immagini in formato .nrrd

*   list_val_output: Tensore con shape [256,1,256,256]
*   stepFd: path della cartella all'interno della quale salvare i volumi
*   fdName: path dell'immagine

In [None]:
expFd = os.path.join(path,'Test-image') #creazione della cartella in cui verranno salvate le maschere automatiche volumetriche

if not os.path.isdir(expFd):
  os.mkdir(expFd) #crea la directory

def Savenrrd(list_val_output,stepFd,fdName):
  
  #Estrazione del nome dell'immagine a partire da fdName
  parts = fdName.split('/') # suddivide la stringa in base al separatore '/'
  parts = parts[3:] # elimina la prima parte della lista
  string = '/'.join(parts) # ricostruisce la stringa eliminando la parte specificata

  #per creare il file path dell'intero volume 3D 
  file_path = os.path.join(stepFd, '{}.nrrd'.format(string + '_output')) # crea il percorso completo del file in cui salvare l'immagine

  image_3d_numpy = list_val_output.squeeze().cpu().numpy().transpose(1,2,0) # trasforma image_3d in un array numpy - image_3d_numpy.shape --> (256,256,256)
  nrrd.write(file_path, image_3d_numpy) # scrive l'array NumPy in un file .nrrd

# IMPUTAZIONE

Trasformazioni da applicare al set di Test

In [None]:
test_transforms = Compose(
    [
        LoadImaged(keys=["image","segmentation"],image_only=False, reader=ITKReader()), # caricamento dell'immagine e della segmentazione
        EnsureChannelFirstd(keys=["image","segmentation"]), # aggiunge il canale all' immagine e alla segmentazione ottenendo il formato channel-first da (NxN) a (1xNxN)
        AdjustContrastd(keys=["image"],gamma=1.5), # modifica il contrasto dell'immagine 
        Resized(keys=["image", "segmentation"], spatial_size=[256,256,256]), # resize dell'immagine a 256x256
        ToTensord(keys=["image", "segmentation"]), # per portare immagini e segmentazioni da formato PIL a un torch tensor da dare in input alla rete
    ]
)

# Creazione di dataset iterabili utili durante l'imputazione
test_ds = IterableDataset(data = test_data, transform = test_transforms)
test_loader = DataLoader(test_ds, batch_size=1, num_workers=0, pin_memory=True, shuffle=False)

Funzione che permette di trasformare le matrici (256,256) numpy in tensori con dimensioni [batch_size, channels, height, width]

*   x2: immagine 2D pre-processata (formato numpy) 
*   y2: maschera manuale 2D (formato numpy) 
*   i: indica l'i-esima immagine del batch
*   d: per indicare se si tratta di un operazione che viene svolta nel processo di pre o post-processing
*   x: tensore di immagini creato nell'iterazione precedente nel formato nel formato [batch_size, channels, height, width]
*   y: tensore di maschere creato nell'iterazione precedente nel formato [batch_size, channels, height, width]

In [None]:
def NumTensor(x2, y2, i, d, x=None, y=None):
  Totensor = transforms.ToTensor() # Questa operazione ruota l'immagine

  if d == 'pre': #operazioni che vengono applicate sulle immagini di pre-processing
    x3 = Totensor(x2).unsqueeze(0).float().to('cuda') #si utilizza la gpu
    y3 = Totensor(y2).unsqueeze(0).to('cuda')

    if i == 0: # Prima iterazione
      x, y = x3, y3
    else: # Iterazioni successive
      x = torch.cat((x, x3), dim=0) #concatena i tensori x3 a x
      y = torch.cat((y, y3), dim=0)
  else: #operazioni che vengono applicate sulle immagini di post-processing
    x3 = Totensor(x2).unsqueeze(0).float().to('cpu') #si utilizza la cpu
    y3 = Totensor(y2).unsqueeze(0).to('cpu')

    if i == 0: # Prima iterazione
      x, y = x3, y3
    else: # Iterazioni successive
      x = torch.cat((x, x3), dim=0) #concatena i tensori x3 a x
      y = torch.cat((y, y3), dim=0)
  
  return x, y

Definizione delle metriche

In [None]:
dice_metric_3D = DiceMetric(include_background=True, reduction="mean", get_not_nans=False)
recall_metric_3D = ConfusionMatrixMetric(include_background=True, metric_name='sensitivity', compute_sample=False, reduction="mean", get_not_nans=False) 
precision_metric_3D = ConfusionMatrixMetric(include_background=True, metric_name='precision', compute_sample=False, reduction="mean", get_not_nans=False)

In [None]:
epoch_iterator = tqdm(test_loader, desc="Testing (X / X Steps)", dynamic_ncols=True,total=len(test_data), position=0, leave=True) # crea un oggetto tqdm per tracciare l'avenzamento delle epoche

dice_test_3D = []
precision_test_3D = []
recall_test_3D = []

test_inputs = torch.empty(256, 1, 256, 256, device='cuda') #inizializzazione di un tensore vuoto nel formato [256,1,256,256] per le immagini in output alla funzione di pre-processing
test_labels = torch.empty(256, 1, 256, 256, device='cuda') #inizializzazione di un tensore vuoto nel formato [256,1,256,256] per le maschere manuali in output alla funzione di pre-processing
test_outputs_def = torch.empty(256, 1, 256, 256, device='cpu') #inizializzazione di un tensore vuoto nel formato [256,1,256,256] per le immagini in output alla funzione di post-processing
test_labels_def = torch.empty(256, 1, 256, 256, device='cpu') #inizializzazione di un tensore vuoto nel formato [256,1,256,256] per le maschere manuali in output alla funzione di post-processing

with torch.no_grad():
  for step, batch in enumerate(epoch_iterator): # ad ogni ciclo for estrae una coppia step (numero dell'iterazione), batch (immagine e segmentazione) dal train loader
      step += 1

      x0, y0 = (batch["image"].to('cuda'), batch["segmentation"].to('cuda')) # manda i tensori di immagine e maschera alla GPU --> (1,1,256,256,256)
      fdName = batch['image_meta_dict']['filename_or_obj'][-1][:-5] #nome dell'immagine

      #PRE-PROCESSING
      for i in range(x0.shape[2]):
        test_inputs1,test_labels1 = x0[:,:,:,:,i], y0[:,:,:,:,i] # --> test_inputs1/test_labels1.shape (1,1,256,256)
        
        #Applicazione della funzione pre-processing
        test_inputs_pre,test_labels_pre = preprocessing(test_inputs1,test_labels1) #test_inputs_pre/test_labels_pre.shape (256,256)
        
        #Applicazione della funzione NumTensor
        test_inputs,test_labels = NumTensor(test_inputs_pre, test_labels_pre, i,d='pre', x=test_inputs, y=test_labels) # --> test_inputs.shape (256,1,256,256)  
      
      #INFERENCE: applicazione del modello all'input
      test_outputs0 = inferer(test_inputs, model) #--> (256,1,256,256)

      test_outputs_dec = [post_pred(i) for i in decollate_batch(test_outputs0.squeeze())] #per riportare i tensori nella forma--> (256,256,256) 
      test_labels_dec  = [post_label(i) for i in decollate_batch(test_labels.squeeze())] # scompone il tenosore nelle varie slice 
      test_inputs_dec  = [post_image(i) for i in decollate_batch(test_inputs.squeeze())] # lista di 256 item contenente un MetaTensor con shape [256,256]


      #POST-PROCESSING
      for i in range(test_outputs0.shape[0]):
        test_outputs1,test_labels1,test_inputs1 = test_outputs_dec[i], test_labels_dec[i], test_inputs_dec[i] # tensori con shape [256,256]
        
        #Applicazione della funzione post-processing
        test_outputs_post,test_labels_post,test_inputs_post = postprocessing(test_outputs1,test_labels1, test_inputs1,dim_originali_test,step) #test_outputs_post.shape --> (256,256)

        #Applicazione della funzione NumTensor
        test_outputs_def,test_labels_def = NumTensor(test_outputs_post, test_labels_post, i, d='post', x=test_outputs_def, y=test_labels_def) # --> test_inputs.shape (256,1,256,256)
        

      #Calcolo del Dice di un intera immagine 3D
      dice_metric_3D(y_pred=test_outputs_def, y=test_labels_def)
      dice_3D = dice_metric_3D.aggregate().item() #Valore di Dice su 256 slices
      dice_test_3D.append(dice_3D) 

      #Calcolo della precision di un intera immagine 3D
      precision_metric_3D(y_pred=test_outputs_def, y=test_labels_def)
      precision_3D = precision_metric_3D.aggregate() #Valore di Precision su 256 slices
      precision_3D = precision_3D[0].item()
      precision_test_3D.append(precision_3D)

      #Calcolo della recall di un intera immagine 3D
      recall_metric_3D(y_pred=test_outputs_def, y=test_labels_def)
      recall_3D = recall_metric_3D.aggregate() #Valore di Recall su 256 slices
      recall_3D = recall_3D[0].item()
      recall_test_3D.append(recall_3D) 


      #SALVATAGGIO intera immagine 3D
      Savenrrd(test_outputs_def,expFd,fdName)
      epoch_iterator.set_description("Testing (%d / %d Steps)" % (step, len(test_data))) # aggiorna il counter tqdm