## **Elaborazione di Immagini Mediche**
### 2021/22 - VESSELWALL SEGMENTATION CHALLENGE


# Import



*   Import delle librerie necessarie



In [1]:
import os
import random
import numpy as np
import shutil
import math
import re
from copy import deepcopy
import statistics

import plotly.express as px
from matplotlib import pyplot as plt

from skimage import util
from skimage.io import imread, imshow, imsave

import glob
import pydicom
import json


## Functions

In [2]:
def sorted_alphanum(data):
    """Return the items of ``data`` sorted in natural (alphanumeric) order.

    Each string is split on runs of ASCII digits; digit chunks are compared
    as integers and the remaining chunks case-insensitively, so that
    ``'pz_2'`` sorts before ``'pz_10'``.

    Parameters
    ----------
    data : iterable of str
        Names to sort.

    Returns
    -------
    list of str
        A new sorted list; ``data`` itself is not modified.
    """
    def _chunk_value(chunk):
        # Numeric chunks compare as numbers, textual chunks ignore case.
        if chunk.isdigit():
            return int(chunk)
        return chunk.lower()

    def _natural_key(name):
        # The capturing group keeps the digit runs in the split result.
        pieces = re.split('([0-9]+)', name)
        return [_chunk_value(piece) for piece in pieces]

    return sorted(data, key=_natural_key)

# Metrics

* Preparazione dei percorsi delle cartelle (DATASET_Intermedio e RESULTS) e lettura degli elenchi dei pazienti

In [3]:
# Folder-name building blocks used to compose the dataset paths.
data_type = ('DATASET_Intermedio','RESULTS')
set_type = ('TRAIN','VALIDATION','TEST')
side_type = ('_Left','_Right')
masks_name = ('ICAL','ECAL','ICAR','ECAR')

# All paths are resolved relative to the directory the notebook runs from.
path = os.getcwd()

# Folders read for the manual masks (DATASET_Intermedio).
train_dir = os.path.join(path,data_type[0],set_type[0])
val_dir = os.path.join(path,data_type[0],set_type[1])

# Folders read for the automatic masks (RESULTS).
newTrain_dir = os.path.join(path,data_type[1],set_type[0])
newVal_dir = os.path.join(path,data_type[1],set_type[1])


def _list_patient_folders(root_dir):
    """Return the sub-directories of ``root_dir`` in natural order.

    Only directories are kept: stray files that the OS or the editor may
    drop in the dataset folder (e.g. ``.DS_Store``) would otherwise be
    treated as patients and make the later ``os.listdir`` calls fail.
    """
    return sorted_alphanum(
        [entry for entry in os.listdir(root_dir)
         if os.path.isdir(os.path.join(root_dir, entry))]
    )


listOfPz_train = _list_patient_folders(train_dir)
listOfPz_val = _list_patient_folders(val_dir)

listOfPz_newTrain = _list_patient_folders(newTrain_dir)
listOfPz_newVal = _list_patient_folders(newVal_dir)

* Calcolo delle metriche (DSC e RVD) per ogni maschera:

In [4]:
#------------------------- Helpers shared by Training and Validation Set:
def _is_mask_file(filename):
    """Return True when ``filename`` names a lumen or a wall mask."""
    return 'lume' in filename or 'wall' in filename


def _new_patient_info():
    """Return an empty per-patient summary record (filled in a later cell)."""
    return [{'DSC':None}, {'RVD':None}, {'cont_Auto_not_Exists':None}, {'cont_Man_not_Exists':None}, {'Tot_Masks':None}, {'Found[%]':None}]


def _dice_coefficient(mask_auto, mask_manual):
    """Dice similarity coefficient between two boolean masks.

    Returns None when both masks are empty, because the ratio would be 0/0
    and the resulting NaN would contaminate every average computed later.
    """
    total = np.sum(mask_manual) + np.sum(mask_auto)
    if total == 0:
        return None
    return float(2 * np.sum(mask_auto * mask_manual) / total)


def _relative_volume_difference(mask_auto, mask_manual):
    """Relative volume difference ``(|auto| - |manual|) / |manual|``.

    Returns None when the manual mask is empty (division by zero).
    """
    manual_volume = np.sum(mask_manual)
    if manual_volume == 0:
        return None
    return float((np.sum(mask_auto) - manual_volume) / manual_volume)


def _evaluate_set(manual_dir, auto_dir, patients_manual, patients_auto):
    """Compare automatic masks against manual masks for one data set.

    Parameters
    ----------
    manual_dir, auto_dir : str
        Root folders holding one sub-folder per patient with the manual and
        the automatic masks respectively.
    patients_manual, patients_auto : list of str
        Patient folder names found in ``manual_dir`` and ``auto_dir``.

    Returns
    -------
    tuple of dict
        ``(manual_masks, metrics, info)`` where
        ``manual_masks[pz][mask]`` is the manual mask image,
        ``metrics[pz][mask]`` is ``[{'DSC': ...}, {'RVD': ...}]`` and
        ``info[pz]`` is an empty summary record.

    Notes
    -----
    A metric is a number when it could be computed, ``'auto_not_exists'`` /
    ``'manual_not_exists'`` when one of the two masks is missing, and None
    when it is not requested (DSC of the lumen) or not computable (empty
    reference mask, or no lumen mask to pair with a wall mask).
    """
    manual_masks = {pz: {} for pz in patients_manual}
    metrics = {pz: {} for pz in patients_manual}
    info = {pz: _new_patient_info() for pz in patients_manual}

    # Load every manual mask in memory; until an automatic mask is found,
    # each one is flagged as 'auto_not_exists'.
    for pz in patients_manual:
        pz_dir = os.path.join(manual_dir, pz)
        names_manual = [name for name in sorted_alphanum(os.listdir(pz_dir)) if _is_mask_file(name)]

        manual_masks[pz] = {name: imread(os.path.join(pz_dir, name)) for name in names_manual}
        metrics[pz] = {name: [{'DSC':'auto_not_exists'}, {'RVD':'auto_not_exists'}] for name in names_manual}

    # Read the automatic masks one at a time and compare them with the
    # manual masks already in memory.
    for pz in patients_auto:
        if pz not in metrics:
            # Patient present only among the automatic results: register it
            # so that all of its masks are reported as 'manual_not_exists'
            # instead of raising a KeyError.
            manual_masks[pz] = {}
            metrics[pz] = {}
            info[pz] = _new_patient_info()

        pz_dir = os.path.join(auto_dir, pz)
        names_auto = [name for name in sorted_alphanum(os.listdir(pz_dir)) if _is_mask_file(name)]

        for mask_ in names_auto:
            if mask_ not in manual_masks[pz]:
                # Automatic mask without a manual counterpart.
                metrics[pz][mask_] = [{'DSC':'manual_not_exists'}, {'RVD':'manual_not_exists'}]
                continue

            mask_auto = imread(os.path.join(pz_dir, mask_)).astype('bool')
            mask_manual = manual_masks[pz][mask_].astype('bool')
            dice = None
            rvd = None

            if 'wall' in mask_:
                dice = _dice_coefficient(mask_auto, mask_manual)

                # Combine wall and lumen masks (XOR) for the RVD computation.
                name_lume = '_'.join(mask_.split('_')[:2]) + '_lume.png'
                if name_lume in names_auto and name_lume in manual_masks[pz]:
                    mask_auto_lume = imread(os.path.join(pz_dir, name_lume)).astype('bool')
                    mask_manual_lume = manual_masks[pz][name_lume].astype('bool')
                    rvd = _relative_volume_difference(
                        np.logical_xor(mask_auto, mask_auto_lume),
                        np.logical_xor(mask_manual, mask_manual_lume),
                    )
                # Otherwise the paired lumen mask is missing on one side and
                # the RVD stays None rather than crashing the whole run.

            if 'lume' in mask_:
                dice = None   # Required by the assignment: DSC only for the wall
                rvd = _relative_volume_difference(mask_auto, mask_manual)

            metrics[pz][mask_][0]['DSC'] = dice
            metrics[pz][mask_][1]['RVD'] = rvd

    return manual_masks, metrics, info


#------------------------- Training Set:
dict_train, metrics_train, info_train = _evaluate_set(train_dir, newTrain_dir, listOfPz_train, listOfPz_newTrain)

#------------------------- Validation Set:
dict_val, metrics_val, info_val = _evaluate_set(val_dir, newVal_dir, listOfPz_val, listOfPz_newVal)

print("""Casi:
1) Maschere sono presenti in entrambe: ottengo valore di DSC e RVD.
2) Dice lume: None (Richiesto dalla consegna, lo calcolo solo del wall)
3) Se non presenti le automatiche: auto_not_exists.
4) Se non presenti le manuali: manual_not_exists.""")

Casi:
1) Maschere sono presenti in entrambe: ottengo valore di DSC e RVD.
2) Dice lume: None (Richiesto dalla consegna, lo calcolo solo del wall)
3) Se non presenti le automatiche: auto_not_exists.
4) Se non presenti le manuali: manual_not_exists.


* Calcolo informazioni riassuntive per ogni paziente:

In [5]:
#------------------------- Helpers shared by Training and Validation Set:
def _is_valid_score(value):
    """Return True for a finite numeric score.

    Status strings ('auto_not_exists', 'manual_not_exists') and None are
    rejected, and so are NaN/inf values produced by a division by zero on an
    empty mask, which would otherwise turn the patient average into NaN/inf.
    """
    return isinstance(value, (int, float)) and math.isfinite(value)


def _fill_patient_summary(metrics, info):
    """Fill ``info`` in place with the per-patient summary of ``metrics``.

    For every patient the record receives the mean DSC and mean RVD over the
    masks with a valid score, the number of masks lacking the automatic or
    the manual version, the total number of masks and ``Found[%]``.
    Entries that cannot be computed are left at their initial value (None).
    """
    for pz, masks in metrics.items():
        dsc_values = []
        rvd_values = []
        n_AnotE = 0
        n_MnotE = 0

        for entry in masks.values():
            dsc = entry[0]['DSC']
            rvd = entry[1]['RVD']
            if _is_valid_score(dsc):
                dsc_values.append(dsc)
            if _is_valid_score(rvd):
                rvd_values.append(rvd)
            if rvd == 'auto_not_exists':
                n_AnotE += 1
            if rvd == 'manual_not_exists':
                n_MnotE += 1

        tot_masks = len(masks)
        record = info[pz]

        if dsc_values:
            record[0]['DSC'] = sum(dsc_values) / len(dsc_values)
        if rvd_values:
            record[1]['RVD'] = sum(rvd_values) / len(rvd_values)
        record[2]['cont_Auto_not_Exists'] = n_AnotE
        record[3]['cont_Man_not_Exists'] = n_MnotE
        record[4]['Tot_Masks'] = tot_masks

        # The percentage is relative to the masks that do have an automatic
        # version, so the guard must be on that count: testing tot_masks
        # alone raised ZeroDivisionError for a patient whose masks are all
        # flagged 'auto_not_exists'.
        n_with_auto = tot_masks - n_AnotE
        if n_with_auto != 0:
            record[5]['Found[%]'] = (n_with_auto - n_MnotE) / n_with_auto * 100


#------------------------- Training Set:
_fill_patient_summary(metrics_train, info_train)

#------------------------- Validation Set:
_fill_patient_summary(metrics_val, info_val)

* Calcolo DSC e RVD medi per Training e Validation Set:

In [6]:
#------------------------- Helpers shared by Training and Validation Set:
def _mean_and_stdev(values):
    """Return ``[mean, sample standard deviation]`` of ``values``.

    ``statistics.mean`` needs at least one value and ``statistics.stdev`` at
    least two; when there are too few the corresponding entry is NaN instead
    of an exception, so the report is still produced. Note that ``json.dump``
    writes such a value as the non-standard token ``NaN``.
    """
    mean = statistics.mean(values) if values else float('nan')
    dev_std = statistics.stdev(values) if len(values) >= 2 else float('nan')
    return [mean, dev_std]


def _summarise_set(info):
    """Aggregate the per-patient summaries of one data set.

    Only patients with a mean DSC take part in the statistics. Their RVD and
    Found[%] are used when available: appending a None (as happened when only
    the DSC was checked) makes ``statistics.mean`` raise a TypeError.

    Returns a dict with the keys 'Mean-DevStd DSC', 'Mean-DevStd RVD' and
    'Found[%]'.
    """
    dsc_tot = []
    rvd_tot = []
    found_tot = []

    for record in info.values():
        dsc, rvd, found = record[0]['DSC'], record[1]['RVD'], record[5]['Found[%]']
        if dsc is None:
            continue
        dsc_tot.append(dsc)
        if rvd is not None:
            rvd_tot.append(rvd)
        if found is not None:
            found_tot.append(found)

    return {
        'Mean-DevStd DSC': _mean_and_stdev(dsc_tot),
        'Mean-DevStd RVD': _mean_and_stdev(rvd_tot),
        'Found[%]': statistics.mean(found_tot) if found_tot else float('nan'),
    }


def _format_summary(title, resume):
    """Return the printable DSC/RVD report for one data set."""
    mean_dsc, devStd_dsc = resume['Mean-DevStd DSC']
    mean_rvd, devStd_rvd = resume['Mean-DevStd RVD']
    return (f"{title}:\n"
            f"DSC [mean +- devStd]: {mean_dsc:.3f} +- {devStd_dsc:.3f}\n"
            f"RVD [mean +- devStd]: {mean_rvd:.3f} +- {devStd_rvd:.3f}")


#------------------------- Training Set:
resume_train = _summarise_set(info_train)

print("Metriche finali per l'intero Training and Validation Set:")
print_toSave_train = _format_summary('Training Set', resume_train)

print(print_toSave_train)

#------------------------- Validation Set:
resume_val = _summarise_set(info_val)

# The leading newline separates the two reports in the printed output.
print_toSave_val = '\n' + _format_summary('Validation Set', resume_val)

print(print_toSave_val)

Metriche finali per l'intero Training and Validation Set:
Training Set:
DSC [mean +- devStd]: 0.722 +- 0.044
RVD [mean +- devStd]: 0.037 +- 0.027

Validation Set:
DSC [mean +- devStd]: 0.696 +- 0.034
RVD [mean +- devStd]: 0.016 +- 0.043


* Saving all metrics' information in JSON files

In [7]:
# Each file is written inside a `with` block so that its handle is flushed
# and closed as soon as the dump completes, rather than being left to the
# garbage collector.

#------------Metrics (one entry per patient and per mask)
filename_train = 'Metrics_train.json'
filename_val = 'Metrics_val.json'

with open(filename_train, 'w') as out_file:
    json.dump(metrics_train, out_file, indent=3)
with open(filename_val, 'w') as out_file:
    json.dump(metrics_val, out_file, indent=3)

#------------Info (per-patient summary followed by the set-level summary)
filename_train = 'Info_train.json'
filename_val = 'Info_val.json'

with open(filename_train, 'w') as out_file:
    json.dump([info_train,resume_train], out_file, indent=3)
with open(filename_val, 'w') as out_file:
    json.dump([info_val,resume_val], out_file, indent=3)