In [27]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import SGD,Adam
import os 
import keras
from keras.preprocessing.image import load_img, smart_resize
import tensorflow as tf
from tensorflow.keras import layers
import skimage.io as io
import skimage.transform as trans
from keras.models import *
from keras.layers import *
from keras.optimizers import *
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras import backend as keras
from google.colab import drive
drive.mount('drive')
!mkdir -p drive -v
cwd = os.getcwd()

Drive already mounted at drive; to attempt to forcibly remount, call drive.mount("drive", force_remount=True).


In [36]:
# PARAMS
VALIDATION_DATASET_SIZE = 10
DATASET_SIZE = 100
DATASET_PATH = 'drive/My Drive/projet_thematique/training'
TRAINING_IMAGE_SIZE = VALIDATION_IMAGE_SIZE = (216,256,10)
TRAINING_BATCH_SIZE = 3
VALIDATION_BATCH_SIZE = 3
NUMBER_OF_CHANNELS = 1
SHUFFLE_DATA = True
TRANSFORM = True
BATCH_SIZE = 10
CLASSES = 5
LEARNING_RATE = 0.001
PRETRAINED_WEIGHTS = None
NBR_CLASSES = 5

DESCRIPTION :
The script provide helpers functions to handle nifti image format:
    - load_nii()
    - save_nii()

to generate metrics for two images:
    - metrics()

And it is callable from the command line (see below).
Each function provided in this script has comments to understand
how they works.

HOW-TO:

This script was tested for python 3.4.

First, you need to install the required packages with
    pip install -r requirements.txt

After the installation, you have two ways of running this script:
    1) python metrics.py ground_truth/patient001_ED.nii.gz prediction/patient001_ED.nii.gz
    2) python metrics.py ground_truth/ prediction/

The first option will print in the console the dice and volume of each class for the given image.
The second option wiil ouput a csv file where each images will have the dice and volume of each class.


Link: http://acdc.creatis.insa-lyon.fr


In [37]:
"""
author: Clément Zotti (clement.zotti@usherbrooke.ca)
date: April 2017

DESCRIPTION :
The script provide helpers functions to handle nifti image format:
    - load_nii()
    - save_nii()

to generate metrics for two images:
    - metrics()

And it is callable from the command line (see below).
Each function provided in this script has comments to understand
how they works.

HOW-TO:

This script was tested for python 3.4.

First, you need to install the required packages with
    pip install -r requirements.txt

After the installation, you have two ways of running this script:
    1) python metrics.py ground_truth/patient001_ED.nii.gz prediction/patient001_ED.nii.gz
    2) python metrics.py ground_truth/ prediction/

The first option will print in the console the dice and volume of each class for the given image.
The second option wiil ouput a csv file where each images will have the dice and volume of each class.


Link: http://acdc.creatis.insa-lyon.fr

"""

import os
from glob import glob
import time
import re
import argparse
import nibabel as nib
import pandas as pd
#from medpy.metric.binary import hd, dc
import numpy as np
import matplotlib.pyplot as plt


HEADER = ["Name", "Dice LV", "Volume LV", "Err LV(ml)",
          "Dice RV", "Volume RV", "Err RV(ml)",
          "Dice MYO", "Volume MYO", "Err MYO(ml)"]

#
# Utils functions used to sort strings into a natural order
#
def conv_int(i):
    return int(i) if i.isdigit() else i


def natural_order(sord):
    """
    Sort a (list,tuple) of strings into natural order.

    Ex:

    ['1','10','2'] -> ['1','2','10']

    ['abc1def','ab10d','b2c','ab1d'] -> ['ab1d','ab10d', 'abc1def', 'b2c']

    """
    if isinstance(sord, tuple):
        sord = sord[0]
    return [conv_int(c) for c in re.split(r'(\d+)', sord)]


#
# Utils function to load and save nifti files with the nibabel package
#
def load_nii(img_path):
    """
    Function to load a 'nii' or 'nii.gz' file, The function returns
    everyting needed to save another 'nii' or 'nii.gz'
    in the same dimensional space, i.e. the affine matrix and the header

    Parameters
    ----------

    img_path: string
    String with the path of the 'nii' or 'nii.gz' image file name.

    Returns
    -------
    Three element, the first is a numpy array of the image values,
    the second is the affine transformation of the image, and the
    last one is the header of the image.
    """
    nimg = nib.load(img_path)
    return nimg.get_data(), nimg.affine, nimg.header


def save_nii(img_path, data, affine, header):
    """
    Function to save a 'nii' or 'nii.gz' file.

    Parameters
    ----------

    img_path: string
    Path to save the image should be ending with '.nii' or '.nii.gz'.

    data: np.array
    Numpy array of the image data.

    affine: list of list or np.array
    The affine transformation to save with the image.

    header: nib.Nifti1Header
    The header that define everything about the data
    (pleasecheck nibabel documentation).
    """
    nimg = nib.Nifti1Image(data, affine=affine, header=header)
    nimg.to_filename(img_path)


#
# Functions to process files, directories and metrics
#
def metrics(img_gt, img_pred, voxel_size):
    """
    Function to compute the metrics between two segmentation maps given as input.

    Parameters
    ----------
    img_gt: np.array
    Array of the ground truth segmentation map.

    img_pred: np.array
    Array of the predicted segmentation map.

    voxel_size: list, tuple or np.array
    The size of a voxel of the images used to compute the volumes.

    Return
    ------
    A list of metrics in this order, [Dice LV, Volume LV, Err LV(ml),
    Dice RV, Volume RV, Err RV(ml), Dice MYO, Volume MYO, Err MYO(ml)]
    """

    if img_gt.ndim != img_pred.ndim:
        raise ValueError("The arrays 'img_gt' and 'img_pred' should have the "
                         "same dimension, {} against {}".format(img_gt.ndim,
                                                                img_pred.ndim))

    res = []
    # Loop on each classes of the input images
    for c in [3, 1, 2]:
        # Copy the gt image to not alterate the input
        gt_c_i = np.copy(img_gt)
        gt_c_i[gt_c_i != c] = 0

        # Copy the pred image to not alterate the input
        pred_c_i = np.copy(img_pred)
        pred_c_i[pred_c_i != c] = 0

        # Clip the value to compute the volumes
        gt_c_i = np.clip(gt_c_i, 0, 1)
        pred_c_i = np.clip(pred_c_i, 0, 1)

        # Compute the Dice
        dice = dc(gt_c_i, pred_c_i)

        # Compute volume
        volpred = pred_c_i.sum() * np.prod(voxel_size) / 1000.
        volgt = gt_c_i.sum() * np.prod(voxel_size) / 1000.

        res += [dice, volpred, volpred-volgt]

    return res


def compute_metrics_on_files(path_gt, path_pred):
    """
    Function to give the metrics for two files

    Parameters
    ----------

    path_gt: string
    Path of the ground truth image.

    path_pred: string
    Path of the predicted image.
    """
    gt, _, header = load_nii(path_gt)
    pred, _, _ = load_nii(path_pred)
    zooms = header.get_zooms()

    name = os.path.basename(path_gt)
    name = name.split('.')[0]
    res = metrics(gt, pred, zooms)
    res = ["{:.3f}".format(r) for r in res]

    formatting = "{:>14}, {:>7}, {:>9}, {:>10}, {:>7}, {:>9}, {:>10}, {:>8}, {:>10}, {:>11}"
    print(formatting.format(*HEADER))
    print(formatting.format(name, *res))


def compute_metrics_on_directories(dir_gt, dir_pred):
    """
    Function to generate a csv file for each images of two directories.

    Parameters
    ----------

    path_gt: string
    Directory of the ground truth segmentation maps.

    path_pred: string
    Directory of the predicted segmentation maps.
    """
    lst_gt = sorted(glob(os.path.join(dir_gt, '*')), key=natural_order)
    lst_pred = sorted(glob(os.path.join(dir_pred, '*')), key=natural_order)

    res = []
    for p_gt, p_pred in zip(lst_gt, lst_pred):
        if os.path.basename(p_gt) != os.path.basename(p_pred):
            raise ValueError("The two files don't have the same name"
                             " {}, {}.".format(os.path.basename(p_gt),
                                               os.path.basename(p_pred)))

        gt, _, header = load_nii(p_gt)
        pred, _, _ = load_nii(p_pred)
        zooms = header.get_zooms()
        res.append(metrics(gt, pred, zooms))

    lst_name_gt = [os.path.basename(gt).split(".")[0] for gt in lst_gt]
    res = [[n,] + r for r, n in zip(res, lst_name_gt)]
    df = pd.DataFrame(res, columns=HEADER)
    df.to_csv("results_{}.csv".format(time.strftime("%Y%m%d_%H%M%S")), index=False)

def main(path_gt, path_pred):
    """
    Main function to select which method to apply on the input parameters.
    """
    if os.path.isfile(path_gt) and os.path.isfile(path_pred):
        compute_metrics_on_files(path_gt, path_pred)
    elif os.path.isdir(path_gt) and os.path.isdir(path_pred):
        compute_metrics_on_directories(path_gt, path_pred)
    else:
        raise ValueError(
            "The paths given needs to be two directories or two files.")

# main('./training/patient001/patient001_4d.nii.gz', './training/patient001/patient001_4d.nii.gz')

def discover_dataset(path_gt, path_pred):
    gt, _, header = load_nii(path_gt)
    pred, _, _ = load_nii(path_pred)
    zooms = header.get_zooms()
#   print(len(pred))
#   print(len(pred[0]))
#   print(len(pred[0][0]))
#   print(len(pred[0][0][0]))
    return(pred)

def read_file(path):

    a=load_nii(path)
    plt.imshow(a[0][:,:,5])
    print(a[0])
    print(len(a[0]))
    print(len(a[0][0]))
    print(len(a[0][0][0]))
    

# compute_metrics_on_files('./training/patient001/patient001_4d.nii.gz', './training/patient001/patient001_4d.nii.gz')

# for i in range(10):
#     plt.figure() 
#     plt.imshow(discover_dataset('./training/patient001/patient001_4d.nii.gz', './training/patient001/patient001_4d.nii.gz')[:,:,i,20])

# import

parametres

# générateurs

In [77]:
def create_generators(data_path=DATASET_PATH):
    'Returns three generators'
    image_paths = []
    label_paths = []
    for i in range(1,DATASET_SIZE + 1):
      if i == 100:
        patient = 'patient' + str(i)
      elif i > 9:
        patient = 'patient0' + str(i)
      else:
        patient = 'patient00' + str(i)

      folder_path = os.path.join(data_path, patient)
      image_paths.append(os.path.join(folder_path, patient+'_frame01.nii.gz'))
      image_paths.append(os.path.join(folder_path, patient+'_frame02.nii.gz'))
      label_paths.append(os.path.join(folder_path, patient+'_frame01_gt.nii.gz'))
      label_paths.append(os.path.join(folder_path, patient+'_frame02_gt.nii.gz'))
          

    x_train_list, y_train_list, x_val_list, y_val_list = data_split(image_paths, label_paths)

    train_data_generator = DataGeneratorClassifier(x_train_list, y_train_list,TRAINING_BATCH_SIZE, TRAINING_IMAGE_SIZE)
    validation_data_generator = DataGeneratorClassifier(x_val_list, y_val_list, VALIDATION_BATCH_SIZE, VALIDATION_IMAGE_SIZE, transform=False)
    return train_data_generator, validation_data_generator


def data_split(x_paths_list, y_paths_list):
    'Splits the paths list into three splits'
    # np.random.seed(0)
    # np.random.shuffle(paths_list)
    return x_paths_list[VALIDATION_DATASET_SIZE:], y_paths_list[VALIDATION_DATASET_SIZE:], x_paths_list[:VALIDATION_DATASET_SIZE], y_paths_list[:VALIDATION_DATASET_SIZE]




class DataGeneratorClassifier(tf.keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, list_labels, batch_size, image_size, data_path=DATASET_PATH, n_channels=NUMBER_OF_CHANNELS, shuffle=SHUFFLE_DATA, transform=TRANSFORM):
        'Initialisation'
        self.classes = os.listdir(data_path)
        self.image_size = image_size
        self.batch_size = BATCH_SIZE
        self.list_IDs = list_IDs
        self.list_labels = list_labels
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.on_epoch_end()
        self.data_path=data_path
        self.transform=transform

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        list_IDs_temp = []
        list_labels_temp = []
        for i in range(index*self.batch_size, (index+1)*self.batch_size):
          list_IDs_temp.append(self.list_IDs[i])
          list_labels_temp.append(self.list_labels[i])


        # indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        # list_IDs_temp = self.list_IDs[indexes]
        # list_labels_temp = self.list_labels[indexes]
        X, y = self.__data_generation(list_IDs_temp, list_labels_temp)
        
        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)
    # TODO
    def __data_generation(self, list_IDs_temp, list_labels_temp):
        'Generates data containing batch_size samples' # X : (n_samples, *image_size, n_channels)
        #X = np.empty((self.batch_size, *self.image_size, self.n_channels))
        #y = np.empty((self.batch_size, *self.image_size, self.n_channels), dtype=int)

        X = []
        y = []
        for i, ID in enumerate(list_IDs_temp):
            
            Xi = load_nii(ID)
            # Xi = smart_resize(np.asarray(Xi), self.image_size)
            X.append(Xi)

        for i, ID in enumerate(list_labels_temp):
            
            yi = load_nii(ID)
            y.append(yi)

        # if self.transform:
        #     data_augmentation = tf.keras.Sequential([layers.experimental.preprocessing.RandomFlip("horizontal_and_vertical"),
        #     layers.experimental.preprocessing.RandomRotation(0.8)])
        #     X=data_augmentation(X)

        return X,y

def show_batch(generator, batch_number=0):
    images, labels = generator.__getitem__(batch_number)
    fig,ax=plt.subplots(nrows=1, ncols=10)
    slices_indices=[1,2,3,4,5,6,7,8,9,10]
    for i in range(len(images)):
        for j in range(len(slices_indices)):
            images_slice=images[i,:,:,j]
            ax[j].imshow(images_slice,cmap='gray')
            ax[j].axis('off')
    plt.show

In [79]:
gt,gv=create_generators(DATASET_PATH)
images,labels=gt.__getitem__(0)
print(images)

#show_batch(gt,0)


* deprecated from version: 3.0
* Will raise <class 'nibabel.deprecator.ExpiredDeprecationError'> as of version: 5.0


[(array([[[ 0,  0,  0, ...,  0,  0,  0],
        [ 0,  0,  0, ...,  0,  0,  0],
        [ 0,  0,  0, ...,  0,  0,  0],
        ...,
        [ 0,  0,  0, ...,  0,  0,  0],
        [ 0,  0,  0, ...,  0,  0,  0],
        [ 0,  0,  0, ...,  0,  0,  0]],

       [[ 0,  1,  0, ...,  0,  0,  0],
        [ 0,  1,  2, ...,  0,  0,  1],
        [ 0,  1,  0, ...,  0,  2,  1],
        ...,
        [ 5,  0, 12, ...,  1, 17, 14],
        [ 8,  0, 12, ..., 12,  6, 10],
        [ 2,  0,  8, ...,  8,  6,  6]],

       [[ 0,  3,  0, ...,  0,  0,  0],
        [ 0,  1,  0, ...,  2,  0,  1],
        [ 0,  1,  0, ...,  2,  1,  2],
        ...,
        [39,  0, 19, ..., 11, 15,  4],
        [29,  0,  7, ...,  1,  8,  6],
        [ 0,  0, 15, ..., 12, 13,  2]],

       ...,

       [[ 0, 38,  0, ...,  0,  0,  0],
        [13, 80, 47, ..., 13,  9, 14],
        [ 2, 94, 17, ..., 13, 22, 33],
        ...,
        [ 2,  0,  0, ...,  1,  1,  2],
        [ 2,  0,  2, ...,  1,  1,  2],
        [ 1,  0,  1, ...,  5, 

# model

In [None]:
def unet(pretrained_weights = PRETRAINED_WEIGHTS,input_size = (TRAINING_IMAGE_SIZE[0],TRAINING_IMAGE_SIZE[1],TRAINING_IMAGE_SIZE[2],NUMBER_OF_CHANNELS)):
#     inputs = tf.random.normal(input_size)
    inputs = Input(input_size, batch_size = BATCH_SIZE)
    conv1 = Conv3D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(inputs)
    conv1 = Conv3D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv1)
    pool1 = MaxPooling3D(pool_size=(2, 2, 1))(conv1)
    conv2 = Conv3D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool1)
    conv2 = Conv3D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv2)
    pool2 = MaxPooling3D(pool_size=(2, 2, 1))(conv2)
    conv3 = Conv3D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool2)
    conv3 = Conv3D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv3)
    drop3 = Dropout(0.5)(conv3)
    pool3 = MaxPooling3D(pool_size=(2, 2, 1))(drop3)

    conv4 = Conv3D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool3)
    conv4 = Conv3D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv4)
    drop4 = Dropout(0.5)(conv4)
    # pool4 = MaxPooling3D(pool_size=(2, 2, 1))(drop4)
    # conv5 = Conv3D(1024, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool4)
    # conv5 = Conv3D(1024, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv5)
    # drop5 = Dropout(0.5)(conv5)

    up6 = Conv3D(512, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling3D(size = (2,2,1))(drop4))
    merge6 = concatenate([drop3,up6], axis = 4)
    conv6 = Conv3D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge6)
    conv6 = Conv3D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv6)

    up7 = Conv3D(256, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling3D(size = (2,2,1))(conv6))
    merge7 = concatenate([conv2,up7], axis = 4)
    conv7 = Conv3D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge7)
    conv7 = Conv3D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv7)

    up8 = Conv3D(128, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling3D(size = (2,2,1))(conv7))
    merge8 = concatenate([conv1,up8], axis = 4)
    conv8 = Conv3D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge8)
    conv8 = Conv3D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv8)

    # up9 = Conv3D(64, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling3D(size = (2,2,1))(conv8))
    # merge9 = concatenate([conv1,up9], axis = 4)
    # conv9 = Conv3D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge9)
    # conv9 = Conv3D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv9)
    conv9 = Conv3D(2, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv8)
    conv10 = Conv3D(1, 1, activation = 'sigmoid')(conv9)

    model = Model(inputs = inputs, outputs = conv10)

    model.compile(optimizer = Adam(lr = LEARNING_RATE), loss = 'binary_crossentropy', metrics = tf.keras.metrics.MeanIoU(num_classes=NBR_CLASSES))
    
    model.summary()

    if(pretrained_weights):
        model.load_weights(pretrained_weights)

    return model

unet()