<a href="https://colab.research.google.com/github/EmanAlajrami/LV-Segmentation-using-U-Net-/blob/main/Unet_kfold_cross_val.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
""" 
The U-Net model is from the reposotory 
https://github.com/intsav 

@ auther Eman Alajrami

My expiremnt with U-Net segmentation model using KFold cross validation

"""


In [None]:
import tensorflow as tf
import numpy as np
from tensorflow.python.keras import losses
from IPython.display import clear_output
import matplotlib.pyplot as plt
import os
import sys
from sklearn.model_selection import KFold

In [None]:

# ***********/Trying to use K_Fold cross validation /*******

EPOCHS = 50
OUTPUT_CHANNELS = 1 # no of classes
input_shape_image = [512,512,1] 
BATCH_SIZE = 8
BUFFER_SIZE = 1000
do_training = True

In [None]:
from google.colab import drive
drive.mount('/content/drive',force_remount=True)
sys.path.append('/content/drive/My Drive')
%cd /content/gdrive/My Drive/Colab Notebooks
print('os.path = ', os.path)
!#ls

Mounted at /content/drive
[Errno 2] No such file or directory: '/content/gdrive/My Drive/Colab Notebooks'
/content
os.path =  <module 'posixpath' from '/usr/lib/python3.7/posixpath.py'>


In [None]:
def normalize(input_image, input_mask):
  input_image = tf.cast(input_image, tf.float32) / 255.0
  #input_mask -= 1
  return input_image, input_mask

In [None]:
def load_image_train(datapoint):
  SIZE = 512  #1024
  input_image = tf.image.resize(datapoint['image'], (SIZE, SIZE))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (SIZE, SIZE))

  # if tf.random.uniform(()) > 0.5:
  #   input_image = tf.image.flip_left_right(input_image)
  #   input_mask = tf.image.flip_left_right(input_mask)

  input_image, input_mask = normalize(input_image, input_mask)

  return input_image, input_mask

In [None]:
def load_image_test(datapoint):
  SIZE = 512 #1024
  input_image = tf.image.resize(datapoint['image'], (SIZE, SIZE))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (SIZE, SIZE))

  input_image, input_mask = normalize(input_image, input_mask)

  return input_image, input_mask

In [None]:
def load_image_val(datapoint):
  SIZE = 512 #1024
  input_image = tf.image.resize(datapoint['image'], (SIZE, SIZE))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (SIZE, SIZE))

  input_image, input_mask = normalize(input_image, input_mask)

  return input_image, input_mask

In [None]:
def display(display_list):
  plt.figure(figsize=(15, 15))

  title = ['Input Image', 'True Mask', 'Predicted Mask']

  for i in range(len(display_list)):
    plt.subplot(1, len(display_list), i+1)
    plt.title(title[i])
    plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
    plt.axis('off')
  plt.show()

## Define the model


In [None]:
def conv_block(input_tensor, num_filters):
  encoder = tf.keras.layers.Conv2D(num_filters, (3, 3), padding='same')(input_tensor)
  encoder = tf.keras.layers.BatchNormalization()(encoder)
  encoder = tf.keras.layers.Activation('relu')(encoder)
  encoder = tf.keras.layers.Conv2D(num_filters, (3, 3), padding='same')(encoder)
  encoder = tf.keras.layers.BatchNormalization()(encoder)
  encoder = tf.keras.layers.Activation('relu')(encoder)
  return encoder

def encoder_block(input_tensor, num_filters):
  encoder = conv_block(input_tensor, num_filters)
  encoder_pool = tf.keras.layers.MaxPooling2D((2, 2), strides=(2, 2))(encoder)
  
  return encoder_pool, encoder

def decoder_block(input_tensor, concat_tensor, num_filters):
  decoder = tf.keras.layers.Conv2DTranspose(num_filters, (3, 3), strides=(2, 2), padding='same')(input_tensor)
  decoder = tf.keras.layers.concatenate([concat_tensor, decoder], axis=-1)
  decoder = tf.keras.layers.BatchNormalization()(decoder)
  decoder = tf.keras.layers.Activation('relu')(decoder)
  decoder = tf.keras.layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
  decoder = tf.keras.layers.BatchNormalization()(decoder)
  decoder = tf.keras.layers.Activation('relu')(decoder)
  decoder = tf.keras.layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
  decoder = tf.keras.layers.BatchNormalization()(decoder)
  decoder = tf.keras.layers.Activation('relu')(decoder)
  return decoder

##Defining custom metrics and loss functions


In [None]:
def dice_coeff(y_true, y_pred, loss_type='sorensen', smooth=1.):
    """Soft dice (Sørensen or Jaccard) coefficient for comparing the similarity
    of two batch of data, usually be used for binary image segmentation
    i.e. labels are binary. The coefficient between 0 to 1, 1 means totally match.
    Parameters
    -----------
    y_true : Tensor
        A distribution with shape: [batch_size, ....], (any dimensions).
    y_pred : Tensor
        The target distribution, format the same with `output`.
    loss_type : str
        ``jaccard`` or ``sorensen``, default is ``jaccard``.
    smooth : float
        This small value will be added to the numerator and denominator.
            - If both output and target are empty, it makes sure dice is 1.
            - If either output or target are empty (all pixels are background),
            dice = ```smooth/(small_value + smooth)``,
            then if smooth is very small, dice close to 0 (even the image values lower than the threshold),
            so in this case, higher smooth can have a higher dice.
    References
    -----------
    - `Wiki-Dice <https://en.wikipedia.org/wiki/Sørensen–Dice_coefficient>`__
    """
    
    # https://lars76.github.io/neural-networks/object-detection/losses-for-segmentation/
    #numerator = 2 * tf.reduce_sum(y_true * y_pred, axis=-1)
    #denominator = tf.reduce_sum(y_true + y_pred, axis=-1)
    #score = (numerator + 1) / (denominator + 1)
   
    # Flatten
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    #y_true_f = tf.layers.flatten(y_true)
    #y_pred_f = tf.layers.flatten(y_pred)

    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    
    if loss_type == 'jaccard':
        union = tf.reduce_sum(tf.square(y_pred_f)) + tf.reduce_sum(tf.square(y_true_f))

    elif loss_type == 'sorensen':
        union = tf.reduce_sum(y_pred_f) + tf.reduce_sum(y_true_f)

    else:
        raise ValueError("Unknown `loss_type`: %s" % loss_type)

    score = (2. * intersection + smooth) / (union + smooth)
    return score

In [None]:
def dice_loss(y_true, y_pred):
    loss = 1 - dice_coeff(y_true, y_pred)
    return loss

In [None]:
def bce_dice_loss(y_true, y_pred):
    #y_pred = tf.argmax(y_pred, axis=-1)
    #y_pred = tf.dtypes.cast(y_pred, tf.int64)
    loss = losses.binary_crossentropy(y_true, y_pred) + dice_loss(y_true, y_pred)
    return loss

In [None]:
from scipy.ndimage.morphology import distance_transform_edt, binary_erosion, generate_binary_structure
from scipy.ndimage import _ni_support

def __surface_distances(result, reference, voxelspacing=None, connectivity=1):
    """
    The distances between the surface voxel of binary objects in result and their
    nearest partner surface voxel of a binary object in reference.
    """
    result = np.array(result, dtype=np.bool)
    result = np.atleast_1d(result)
    reference = np.array(reference, dtype=np.bool)
    reference = np.atleast_1d(reference)
    
    #result = np.atleast_1d(result.astype(np.bool))
    #reference = np.atleast_1d(reference.astype(np.bool))
    
    if voxelspacing is not None:
        voxelspacing = _ni_support._normalize_sequence(voxelspacing, result.ndim)
        voxelspacing = np.asarray(voxelspacing, dtype=np.float64)
        if not voxelspacing.flags.contiguous:
            voxelspacing = voxelspacing.copy()
            
    # binary structure
    footprint = generate_binary_structure(result.ndim, connectivity)
    
    # test for emptiness
    if 0 == np.count_nonzero(result): 
        raise RuntimeError('The first supplied array does not contain any binary object.')
    if 0 == np.count_nonzero(reference): 
        raise RuntimeError('The second supplied array does not contain any binary object.')    
            
    # extract only 1-pixel border line of objects
    result_border = result ^ binary_erosion(result, structure=footprint, iterations=1)
    reference_border = reference ^ binary_erosion(reference, structure=footprint, iterations=1)
    
    # compute average surface distance        
    # Note: scipys distance transform is calculated only inside the borders of the
    #       foreground objects, therefore the input has to be reversed
    dt = distance_transform_edt(~reference_border, sampling=voxelspacing)
    sds = dt[result_border]
    
    return sds

In [None]:
def Hausdorff_Distance(reference, result, voxelspacing=None, connectivity=1):
    """
    Hausdorff Distance.
    
    Computes the (symmetric) Hausdorff Distance (HD) between the binary objects in two
    images. It is defined as the maximum surface distance between the objects.
    
    Parameters
    ----------
    result : array_like
        Input data containing objects. Can be any type but will be converted
        into binary: background where 0, object everywhere else.
    reference : array_like
        Input data containing objects. Can be any type but will be converted
        into binary: background where 0, object everywhere else.
    voxelspacing : float or sequence of floats, optional
        The voxelspacing in a distance unit i.e. spacing of elements
        along each dimension. If a sequence, must be of length equal to
        the input rank; if a single number, this is used for all axes. If
        not specified, a grid spacing of unity is implied.
    connectivity : int
        The neighbourhood/connectivity considered when determining the surface
        of the binary objects. This value is passed to
        `scipy.ndimage.morphology.generate_binary_structure` and should usually be :math:`> 1`.
        Note that the connectivity influences the result in the case of the Hausdorff distance.
        
    Returns
    -------
    hd : float
        The symmetric Hausdorff Distance between the object(s) in ```result``` and the
        object(s) in ```reference```. The distance unit is the same as for the spacing of 
        elements along each dimension, which is usually given in mm.
        
    See also
    --------
    :func:`assd`
    :func:`asd`
    
    Notes
    -----
    This is a real metric. The binary images can therefore be supplied in any order.
    """
    hd1 = __surface_distances(result, reference, voxelspacing, connectivity).max()
    hd2 = __surface_distances(reference, result, voxelspacing, connectivity).max()
    hd = max(hd1, hd2)
    return hd

In [None]:
def Dice_Coefficient(reference, result):
    """
    Computes the Dice coefficient (also known as Sorensen index) between the binary objects in two images.

    result : Input data containing objects. Can be any type but will be converted
        into binary: background where 0, object everywhere else.
    reference : Input data containing objects. Can be any type but will be converted
        into binary: background where 0, object everywhere else.
    """
    result = np.array(result, dtype=np.bool)
    result = np.atleast_1d(result)
    result = tf.reshape(result, [-1])

    reference = np.array(reference, dtype=np.bool)
    reference = np.atleast_1d(reference)
    reference = tf.reshape(reference, [-1])
    
    intersection = np.count_nonzero(result & reference)
    
    size_i1 = np.count_nonzero(result)
    size_i2 = np.count_nonzero(reference)
    
    try:
        dc = 2. * intersection / float(size_i1 + size_i2)
    except ZeroDivisionError:
        dc = 0.0
    
    return dc

In [None]:
def create_mask(pred_mask):
 
  pred_mask = tf.greater(pred_mask, 0.5)
  pred_mask = tf.dtypes.cast(pred_mask, tf.float32)
  pred_mask = pred_mask[0]
  return pred_mask

In [None]:
def show_predictions(dataset=None, num=1):
  if dataset:
    for image, mask in dataset.take(num):
      pred_mask = model.predict(image)
      display([image[0], mask[0], create_mask(pred_mask)])
  else:
    display([sample_image, sample_mask, create_mask(model.predict(sample_image[tf.newaxis, ...]))])

## Callbacks 


In [None]:
save_model_path = '/content/drive/MyDrive/Unet/weights-binary_lvnew.hdf5'
#cp = tf.keras.callbacks.ModelCheckpoint(filepath=save_model_path, monitor='val_dice_loss', mode='auto', save_best_only=True)
cp = tf.keras.callbacks.ModelCheckpoint(filepath=save_model_path, monitor='val_loss', mode='auto', save_best_only=True)
early_stop = tf.keras.callbacks.EarlyStopping(monitor = 'val_loss', min_delta=0,patience=10,verbose =1,restore_best_weights=True)

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    clear_output(wait=True)
    show_predictions()
    print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

In [None]:
x_train_path="/content/drive/MyDrive/MyDataset/dataset/images/training"
y_train_path="/content/drive/MyDrive/MyDataset/dataset/annotations_binary/training"
x_test_path="/content/drive/MyDrive/MyDataset/dataset/images/testing/"
y_test_path="/content/drive/MyDrive/MyDataset/dataset/annotations_binary/testing/"
X = []
Y = []


In [None]:
import os
import cv2

for filename in os.listdir(x_train_path):
  path=os.path.join(x_train_path, filename)
  im = cv2.imread(path)
  X.append(im)
for filename in os.listdir(x_test_path):
  path=os.path.join(x_test_path, filename)
  im = cv2.imread(path)
  X.append(im)
for filename in os.listdir(y_train_path):
  path=os.path.join(y_train_path, filename)
  im = cv2.imread(path)
  Y.append(im)
for filename in os.listdir(y_test_path):
  path=os.path.join(y_test_path, filename)
  im = cv2.imread(path)
  Y.append(im)


In [None]:
# Convert to Numpy Array
X = np.array(X)
Y = np.array(Y)

# Normalize data
X  = X/255
Y  = Y/255

# Define total folds
num_folds = 5

# Define per-fold score containers

loss_per_fold = []
acc_per_fold  = []


  This is separate from the ipykernel package so we can avoid doing imports until


In [None]:
# Using K-Fold Cross validation 

# Define per-fold score containers
# Merge inputs and targets
#train_dataset = np.concatenate((train_dataset, val_dataset), axis=0)
#inputs = np.concatenate((input_train, input_test), axis=0)
#targets = np.concatenate((target_train, target_test), axis=0)


# Define the K-fold Cross Validator
#kfold = KFold(n_splits=5, shuffle=True)

# K-fold Cross Validation model evaluation
n_split =5
fold_no = 1
kfold = KFold(n_splits=num_folds, shuffle=True)

for train, test in kfold.split(X, Y):
  x_train = X[train]
  y_train = Y[train]
  x_test  = X[test]
  y_test  = Y[test]
  

  # build the model 
  inputs = tf.keras.layers.Input(shape=input_shape_image)

  encoder0_pool, encoder0 = encoder_block(inputs, 32)
  encoder1_pool, encoder1 = encoder_block(encoder0_pool, 64)
  encoder2_pool, encoder2 = encoder_block(encoder1_pool, 128)
  encoder3_pool, encoder3 = encoder_block(encoder2_pool, 256)
  encoder4_pool, encoder4 = encoder_block(encoder3_pool, 512)

  center = conv_block(encoder4_pool, 1024)

  decoder4 = decoder_block(center, encoder4, 512)
  decoder3 = decoder_block(decoder4, encoder3, 256)
  decoder2 = decoder_block(decoder3, encoder2, 128)
  decoder1 = decoder_block(decoder2, encoder1, 64)
  decoder0 = decoder_block(decoder1, encoder0, 32)

  outputs = tf.keras.layers.Conv2D(OUTPUT_CHANNELS, (1, 1), activation='sigmoid')(decoder0)

  model = tf.keras.Model(inputs=[inputs], outputs=[outputs])

#Compile 
  model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=TRAIN_SEG_LEARNING_RATE), loss=tf.keras.losses.BinaryCrossentropy(from_logits=True), metrics=['accuracy'])
## Generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  # Fit data to model-  Train  
  if do_training:
     model_history =   model.fit(x_train, y_train, validation_data=(x_test, y_test), epochs=EPOCHS, batch_size=BATCH_SIZE, callbacks=[DisplayCallback(),cp,early_stop ])
   # Generate generalization metrics
  scores = model.evaluate(x_test, verbose=0)
  acc_per_fold.append(scores[1] * 100)
  loss_per_fold.append(scores[0])
  # Increase fold number
  fold_no = fold_no + 1



In [None]:
# == Provide average scores ==
print('------------------------------------------------------------------------')
print('Score per fold')
for i in range(0, len(acc_per_fold)):
  print('------------------------------------------------------------------------')
  print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} - Accuracy: {acc_per_fold[i]}%')
print('------------------------------------------------------------------------')
print('Average scores for all folds:')
print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
print(f'> Loss: {np.mean(loss_per_fold)}')
print('------------------------------------------------------------------------')

# Visualize training process

In [None]:
if do_training:
  loss     = model_history.history['loss']
  val_loss = model_history.history['val_loss']

  epochs = range(EPOCHS)

  plt.figure()
  plt.subplot(1, 2, 1)
  plt.plot(epochs, loss, 'r', label='Training loss')
  plt.plot(epochs, val_loss, 'bo', label='Validation loss')
  plt.title('Training and Validation Loss')
  plt.xlabel('Epoch')
  plt.ylabel('Loss Value')
  plt.ylim([0, 1])
  plt.legend(loc='upper right')


  plt.show()

# Visualize actual performance 


In [None]:

if not do_training:
  model.load_weights(save_model_path)
 

## Make predictions

In [None]:
show_predictions(test_dataset, 2)

In [None]:
for image, mask in test_dataset.take(num_test_examples):
  pred_mask = model.predict(image)

  print('Hausdorff_Distance = ', Hausdorff_Distance(mask[0], create_mask(pred_mask)))
  DC = Dice_Coefficient(mask[0], create_mask(pred_mask))
  print(DC/(2-DC))

 

In [None]:

model_history.history
print(np.mean(model_history.history['accuracy']))
print(np.mean(model_history.history['val_accuracy']))

In [None]:
DC_list= []
HD_list =[]
for image, mask in test_dataset.take(num_test_examples):
  pred_mask = model.predict(image)
  DC = Dice_Coefficient(mask[0], create_mask(pred_mask))
  DC_list.append ((DC/(2-DC)))
  HD = Hausdorff_Distance(mask[0], create_mask(pred_mask))
  HD_list.append(HD)

In [None]:
# print the average Dice and average HD
print('DC= ',np.mean(DC_list))
print('HD =', np.mean(HD_list))