## VGG16-UNet with DL-Kfold (build_vgg16_unet)

In [1]:
import os
import random
import numpy as np
import cv2
import glob
import matplotlib.pyplot as plt
plt.style.use("ggplot")
%matplotlib inline

import tensorflow as tf
from tensorflow.keras import backend as K
from sklearn.model_selection import KFold
from tensorflow.keras.models import Model
from keras_unet_collection.losses import focal_tversky
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.applications import VGG16 
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import img_to_array
from tensorflow.keras.metrics import MeanIoU


tf.keras.backend.clear_session()



def IoU(y_true, y_pred, dtype=tf.float32):
    y_pred = tf.cast(y_pred, dtype)
    y_true = tf.cast(y_true, y_pred.dtype)

    y_pred = tf.squeeze(y_pred)
    y_true = tf.squeeze(y_true)

    y_true_pos = tf.reshape(y_true, [-1])
    y_pred_pos = tf.reshape(y_pred, [-1])

    area_intersect = tf.reduce_sum(tf.multiply(y_true_pos, y_pred_pos))
    
    area_true = tf.reduce_sum(y_true_pos)
    area_pred = tf.reduce_sum(y_pred_pos)
    area_union = area_true + area_pred - area_intersect
    
    # Return the IoU score
    return tf.math.divide_no_nan(area_intersect, area_union)

def dice_score(y_true, y_pred, smooth=1):
    
    # Flatten
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    score = (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)
    return score

def tversky(y_true, y_pred, smooth=1, alpha=0.7):
    y_true_pos = K.flatten(y_true)
    y_pred_pos = K.flatten(y_pred)
    true_pos = K.sum(y_true_pos * y_pred_pos)
    false_neg = K.sum(y_true_pos * (1 - y_pred_pos))
    false_pos = K.sum((1 - y_true_pos) * y_pred_pos)
    return (true_pos + smooth) / (true_pos + alpha * false_neg + (1 - alpha) * false_pos + smooth)


def tversky_loss(y_true, y_pred):
    return 1 - tversky(y_true, y_pred)


def focal_tversky_loss(y_true, y_pred, gamma=0.75):
    tv = tversky(y_true, y_pred)
    return K.pow((1 - tv), gamma)


# Plot sample of model prediction
def plot_sample(X, y, preds, binary_preds, ix=None):
    if ix is None:
        ix = random.randint(0, len(X))

    fig, ax = plt.subplots(1, 4, figsize=(30, 20))
    ax[0].imshow(X[ix, ..., 0], cmap='Greys_r')
    
    ax[0].set_title('US-image', c="white" )
    ax[0].grid(False)

    ax[1].imshow(y[ix].squeeze(), cmap='Greys_r')
    ax[1].set_title('Aponeurosis', c="white")
    ax[1].grid(False)

    ax[2].imshow(preds[ix].squeeze(), vmin=0, vmax=1, cmap="Greys_r")
    
    ax[2].set_title('Apo-Predicted', c="white")
    ax[2].grid(False)
    
    ax[3].imshow(binary_preds[ix].squeeze(), vmin=0, vmax=0.5, cmap="Greys_r")
    
    ax[3].set_title('Apo-Picture binary', c="white")
    ax[3].grid(False)
    
    plt.savefig(str(ix)+"Pred_area.tif")


def conv_block(inputs, num_filters): # found in model_training
    x = Conv2D(num_filters, 3, padding = "same")(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    
    x = Conv2D(num_filters, 3, padding = "same")(x) #NOTE Carla was right, this should be x. This is a bug! 
    x = BatchNormalization()(x)
    x = Activation("relu")(x)
    
    return x

def decoder_block(inputs, skip_features, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(inputs) #32
    x = Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    
    return x

def build_vgg16_unet(input_shape):
    inputs = Input(input_shape)
    
    vgg16 = VGG16(include_top=False, weights="imagenet", input_tensor = inputs)
    #vgg16.summary()
    
    """ Encoder """
    
    # skip connections
    s1 = vgg16.get_layer("block1_conv2").output # 256
    s2 = vgg16.get_layer("block2_conv2").output # 128
    s3 = vgg16.get_layer("block3_conv3").output # 64
    s4 = vgg16.get_layer("block4_conv3").output # 32

    """ Bottleneck/Bridge """
    
    b1 = vgg16.get_layer("block5_conv3").output # 16
    
    """ Decoder """

    d1 = decoder_block(b1, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)
    
    """ Outputs """
    outputs = Conv2D(1, (1, 1), padding = "same", activation="sigmoid")(d4) #binary segmentation
    model = Model(inputs, outputs, name = "VGG16_U-Net")
    return model

# Images will be re-scaled
im_width = 256
im_height = 256
border = 5

image_directory = "C:/Users/diego/Bac Sport and Computer Science/BA-arbeit/RepoBA/BA-IFSS/data/analyzed_data/008/MZP1/volume"
mask_directory = "C:/Users/diego/Bac Sport and Computer Science/BA-arbeit/RepoBA/BA-IFSS/data/analyzed_data/008/MZP1/mask"  

# list of all images in the path
ids = os.listdir(image_directory)
print("Total no. of aponeurosis images = ", len(ids))

image_dataset = []
for path in glob.glob(image_directory):
    for img_path in glob.glob(os.path.join(path, "*.tif")):
        img = cv2.imread(img_path, 1)
        img = cv2.resize(img, (256,256))
        img = img_to_array(img)
        img = img/255.0
        image_dataset.append(img)  
image_dataset = np.array(image_dataset)

mask_dataset = []
for path in glob.glob(mask_directory):
    for mask_path in glob.glob(os.path.join(path, "*.tif")):
        mask = cv2.imread(mask_path, 0)
        mask = cv2.resize(mask, (256,256))
        mask = img_to_array(mask)
        mask = mask/255.0
        mask_dataset.append(mask)        
mask_dataset = np.array(mask_dataset)

# Define some hyperparameters
batch_size = 2
epochs = 100
num_folds = 5

# Define the K-fold Cross Validator
kfold = KFold(n_splits=num_folds, shuffle=False)

#compile the model
VGG16_UNet = build_vgg16_unet((256,256,3)) #input_shape is (256, 256, 3)
model= VGG16_UNet
model.compile(optimizer=Adam(learning_rate=1e-5), loss="binary_crossentropy", metrics= ["accuracy", IoU])

# K-fold Cross Validation model evaluation
fold_no = 1
for train, test in kfold.split(image_dataset, mask_dataset):

    callbacks = [
        EarlyStopping(patience=20, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=10, min_lr=0.0000001, verbose=1),
        ModelCheckpoint(f'K-foldno{fold_no}-VGG16-V1-VL-BFRHL-256.keras', verbose=1, save_best_only=True, save_weights_only=False), # Give the model a name (the .h5 part)
        CSVLogger(f'K-foldno{fold_no}-VGG16-V1-VL-BFRHL-256.csv', separator=',', append=False)
    ]

    # # Create a generator
    train_datagen = ImageDataGenerator()
    val_datagen = ImageDataGenerator()

    # Create flow from directory or from arrays
    train_generator = train_datagen.flow(image_dataset[train], mask_dataset[train], batch_size=batch_size)
    val_generator = val_datagen.flow(image_dataset[test], mask_dataset[test], batch_size=batch_size)
    

    # Generate a print
    print('------------------------------------------------------------------------')
    print(f'Training for fold {fold_no} ...')

    # Fit data to model
    results = model.fit(train_generator, batch_size=batch_size, epochs=epochs,
                        callbacks=callbacks, validation_data=val_generator)

    
    # Increase fold number
    fold_no = fold_no + 1

    clear_session()

fold_no = 1

Total no. of aponeurosis images =  314


KeyboardInterrupt: 