In [None]:
import os
import random
import shutil
import glob
import cv2
from glob import glob
import numpy as np
import pandas as pd
import gc
import pickle

from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Input, Reshape, concatenate, Concatenate, Conv2D, Conv2DTranspose, MaxPooling2D, UpSampling2D
from tensorflow.keras.layers import Input, Activation, BatchNormalization, Dropout, Lambda, Conv2D, Conv2DTranspose, MaxPooling2D, concatenate
from tensorflow.keras.models import Model, load_model, save_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping,TensorBoard,LearningRateScheduler

from glob import glob
import matplotlib.pyplot as plt
from tqdm import tqdm
from skimage.color import rgb2gray as rtg
from skimage.io import imread, imshow
from skimage.transform import resize
from PIL import Image
from datetime import datetime
import time
from keras.callbacks import CSVLogger
from deeplab import deeplabModel
from deeplabv1 import deeplab
from unet import unet

# PreProcessing


In [None]:
def load_data(img_height, img_width, images_to_be_loaded):
    """Load CVC-ClinicDB images and binary masks resized to a common shape.

    Args:
        img_height: Target height in pixels.
        img_width: Target width in pixels.
        images_to_be_loaded: Maximum number of images to load; ``-1`` loads
            every ``*.tif`` file found in the image directory. Values larger
            than the number of available files are clamped to that number.

    Returns:
        Tuple ``(X_train, Y_train)``: float32 images of shape
        ``(n, img_height, img_width, 3)`` divided by 255, and uint8 masks of
        shape ``(n, img_height, img_width, 1)`` holding 0/1.
    """
    IMAGES_PATH = "CVC-ClinicDB/TIF/Original/"
    MASKS_PATH = "CVC-ClinicDB/TIF/Ground Truth/"

    # Sorted so the sample order (and therefore the later split) is reproducible.
    train_ids = sorted(glob(IMAGES_PATH + "*.tif"))

    # Clamp so no zero-filled placeholder rows are returned when fewer files exist.
    if images_to_be_loaded == -1 or images_to_be_loaded > len(train_ids):
        images_to_be_loaded = len(train_ids)

    X_train = np.zeros((images_to_be_loaded, img_height, img_width, 3), dtype=np.float32)
    Y_train = np.zeros((images_to_be_loaded, img_height, img_width), dtype=np.uint8)

    # PIL expects (width, height); the resulting array is (height, width[, channels]).
    pil_size = (img_width, img_height)

    print('Resizing training images and masks: ' + str(images_to_be_loaded))
    for n, image_path in enumerate(tqdm(train_ids[:images_to_be_loaded])):
        # The mask shares the image's file name inside the ground-truth directory.
        mask_path = os.path.join(MASKS_PATH, os.path.basename(image_path))

        image = imread(image_path)
        mask_ = imread(mask_path)

        image = np.array(Image.fromarray(image).resize(pil_size))
        X_train[n] = image / 255

        mask_ = np.array(Image.fromarray(mask_).resize(pil_size, resample=Image.LANCZOS))

        # A pixel is foreground when any of its channels reaches the threshold.
        foreground = mask_ >= 127
        if foreground.ndim == 3:
            foreground = foreground.any(axis=-1)

        Y_train[n] = foreground

    Y_train = np.expand_dims(Y_train, axis=-1)

    return X_train, Y_train

In [None]:
# CVC-ClinicDB is resized to (256, 256); use (352, 352) for Kvasir-SEG & CVC-ColonDB.
img_size = 256
X, Y = load_data(img_height=img_size, img_width=img_size, images_to_be_loaded=-1)

In [None]:
# Split the data into train:validation:test with an 8:1:1 ratio:
# hold out `test_size` of the samples first, then divide that hold-out
# between validation and test using `validation_size`.
test_size = 0.2
validation_size = 0.5

X_train, X_test, y_train, y_test = train_test_split(
    X, Y, test_size=test_size, random_state=0
)
X_val, X_test, y_val, y_test = train_test_split(
    X_test, y_test, test_size=validation_size, random_state=0
)

In [None]:
import random

# Show five randomly chosen validation images next to their ground-truth masks.
for i in range(0,5):
    # Bound the index by the actual validation set size instead of a hard-coded 60,
    # so the preview works for any dataset / split ratio.
    image_x = random.randint(0, len(X_val) - 1)

    plt.subplot(1,2,1)
    imshow(X_val[image_x])

    plt.subplot(1,2,2)
    imshow(y_val[image_x], cmap='gray')
    plt.show()

## Augmentation

In [None]:
import albumentations as A
from albumentations.augmentations import functional as F

# Training-time augmentation pipeline, applied jointly to an image and its mask.
aug_train = A.Compose(
    [
        # Random mirroring along either axis.
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        # Photometric jitter, applied to every sample.
        A.ColorJitter(
            brightness=(0.6,1.6),
            contrast=0.2,
            saturation=0.1,
            hue=0.01,
            always_apply=True,
        ),
        # Geometric jitter (scale / shift / rotation / shear), applied to every sample.
        A.Affine(
            scale=(0.5,1.5),
            translate_percent=(-0.125,0.125),
            rotate=(-180,180),
            shear=(-22.5,22),
            always_apply=True,
        ),
        # Blur with a fixed 25-pixel kernel on half of the samples.
        A.GaussianBlur(
            blur_limit=(25, 25),
            sigma_limit=(0.001, 2.0),
            always_apply=False,
            p=0.5,
        ),
    ]
)

def augment_images():
    """Run `aug_train` once over every training sample.

    Reads the notebook-level `X_train` / `y_train` arrays and the `aug_train`
    pipeline.

    Returns:
        Tuple of arrays ``(augmented_images, augmented_masks)`` in the same
        order as `X_train`.
    """
    augmented = [
        aug_train(image=X_train[idx], mask=y_train[idx])
        for idx in range(len(X_train))
    ]

    images = np.array([sample['image'] for sample in augmented])
    masks = np.array([sample['mask'] for sample in augmented])

    return images, masks

## Evaluation Metrics

In [None]:
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.losses import binary_crossentropy
import numpy as np
def dice_coeff(y_true, y_pred):
    """Soft Dice coefficient computed over all elements of the inputs.

    Both inputs are cast to float32 first so the function can also be called
    directly with integer masks (mixed-dtype tensors cannot be multiplied).

    Args:
        y_true: Ground-truth mask.
        y_pred: Predicted mask / probabilities, same shape as `y_true`.

    Returns:
        Scalar tensor ``(2 * sum(t * p) + eps) / (sum(t + p) + eps)``.
    """
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)

    _epsilon = 10 ** -7  # keeps the ratio defined when both masks are empty
    intersections = tf.reduce_sum(y_true * y_pred)
    # Despite the name this is the Dice denominator sum(t) + sum(p), not a set union.
    unions = tf.reduce_sum(y_true + y_pred)
    dice_scores = (2.0 * intersections + _epsilon) / (unions + _epsilon)

    return dice_scores

def dice_loss(y_true, y_pred):
    """Dice loss: one minus `dice_coeff` of the two masks."""
    return 1 - dice_coeff(y_true, y_pred)

def total_loss(y_true, y_pred):
    """Half-weighted binary cross-entropy plus the Dice loss."""
    bce_term = 0.5*binary_crossentropy(y_true, y_pred)
    dice_term = dice_loss(y_true, y_pred)
    return bce_term + dice_term

def IoU(y_true, y_pred, eps=1e-6):
    """Mean soft intersection-over-union across the batch.

    Inputs are cast to float32 so integer masks can be passed directly.
    The sums run over axes 1-3, so both inputs must be 4-D.

    Args:
        y_true: Ground-truth masks.
        y_pred: Predicted masks / probabilities, same shape as `y_true`.
        eps: Smoothing term added to numerator and denominator.

    Returns:
        Scalar tensor: per-sample IoU averaged over axis 0.
    """
    y_true = K.cast(y_true, tf.float32)
    y_pred = K.cast(y_pred, tf.float32)

    intersection = K.sum(y_true * y_pred, axis=[1,2,3])
    union = K.sum(y_true, axis=[1,2,3]) + K.sum(y_pred, axis=[1,2,3]) - intersection

    return K.mean( (intersection + eps) / (union + eps), axis=0)

def zero_IoU(y_true, y_pred):
    """IoU of the complemented masks, i.e. `IoU` evaluated on ``1 - mask``."""
    inverted_true = 1-y_true
    inverted_pred = 1-y_pred
    return IoU(inverted_true, inverted_pred)

def bce_dice_loss(y_true, y_pred):
    """Binary cross-entropy plus Dice loss, both with unit weight."""
    bce_term = binary_crossentropy(y_true, y_pred)
    dice_term = dice_loss(y_true, y_pred)
    return bce_term + dice_term

def tversky(y_true, y_pred, smooth=1, alpha=0.7):
    """Tversky index over all elements of the inputs.

    Inputs are cast to float32 so integer masks can be passed directly.

    Args:
        y_true: Ground-truth mask.
        y_pred: Predicted mask / probabilities, same number of elements.
        smooth: Smoothing term added to numerator and denominator.
        alpha: Weight of false negatives; false positives get ``1 - alpha``.

    Returns:
        Scalar tensor ``(TP + smooth) / (TP + alpha*FN + (1-alpha)*FP + smooth)``.
    """
    y_true_pos = tf.reshape(tf.cast(y_true, tf.float32), [-1])
    y_pred_pos = tf.reshape(tf.cast(y_pred, tf.float32), [-1])
    true_pos = tf.reduce_sum(y_true_pos * y_pred_pos)
    false_neg = tf.reduce_sum(y_true_pos * (1 - y_pred_pos))
    false_pos = tf.reduce_sum((1 - y_true_pos) * y_pred_pos)

    return (true_pos + smooth) / (true_pos + alpha * false_neg + (1 - alpha) * false_pos + smooth)


def tversky_loss(y_true, y_pred):
    """Tversky loss: one minus the `tversky` index with its default parameters."""
    index = tversky(y_true, y_pred)
    return 1 - index

def focal_tversky_loss(y_true, y_pred, gamma=0.75):
    """Focal Tversky loss: ``(1 - tversky) ** gamma``."""
    miss = 1 - tversky(y_true, y_pred)
    return K.pow(miss, gamma)

def f1_score(y_true, y_pred):
    """F1 score of the rounded predictions, pooled over every element of the batch.

    Args:
        y_true: Ground-truth mask.
        y_pred: Predicted probabilities; rounded to 0/1 before counting.

    Returns:
        Scalar float32 tensor.
    """
    eps = tf.keras.backend.epsilon()
    truth = tf.cast(y_true, tf.float32)
    hard_pred = tf.cast(tf.math.round(y_pred), tf.float32)

    true_pos = tf.reduce_sum(truth * hard_pred)
    false_pos = tf.reduce_sum((1 - truth) * hard_pred)
    false_neg = tf.reduce_sum(truth * (1 - hard_pred))

    precision = true_pos / (true_pos + false_pos + eps)
    recall = true_pos / (true_pos + false_neg + eps)

    return 2 * precision * recall / (precision + recall + eps)

def dice_metric_loss(ground_truth, predictions, smooth=1e-6):
    """Dice loss on flattened float32 copies of the inputs.

    Args:
        ground_truth: Ground-truth mask (any numeric dtype).
        predictions: Predicted mask / probabilities.
        smooth: Smoothing term added to numerator and denominator.

    Returns:
        Scalar tensor ``1 - (2 * overlap + smooth) / (total + smooth)``.
    """
    truth_flat = K.flatten(K.cast(ground_truth, tf.float32))
    pred_flat = K.flatten(K.cast(predictions, tf.float32))

    overlap = K.sum(pred_flat * truth_flat)
    total = K.sum(pred_flat) + K.sum(truth_flat)

    return 1 - (2. * overlap + smooth) / (total + smooth)

# Layer 1 (Deeplabv3+ and DoubleUNet)

## Train Deeplabv3+ Layer 1

In [None]:
import os

# TensorFlow runtime switches (log verbosity and on-demand GPU memory allocation).
# NOTE(review): TensorFlow is already imported in the first cell of this notebook; these
# variables are presumably only honoured when set before TensorFlow initialises — confirm
# they take effect here or move them above the first `import tensorflow`.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
os.environ["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"

from keras.layers import (
    Conv2D,
    BatchNormalization,
    Activation,
    Concatenate,
    Input,
    Dropout,
)
from keras.layers import (
    AveragePooling2D,
    GlobalAveragePooling2D,
    UpSampling2D,
    Reshape,
    Dense,
)
from keras.models import Model
from keras.applications import ResNet50, ResNet101
from keras.regularizers import l2
from tensorflow.python.keras.engine.keras_tensor import KerasTensor


def squeeze_and_excite(inputs: KerasTensor, ratio: int = 8) -> KerasTensor:
    """Re-weight the channels of a feature map with a Squeeze-and-Excitation gate.

    Args:
        inputs (KerasTensor): Feature map; its last axis is treated as channels.
        ratio (int, optional): Channel reduction factor of the first dense layer.
            Defaults to 8.

    Returns:
        KerasTensor: `inputs` multiplied by the per-channel sigmoid gate.
    """
    channels = inputs.shape[-1]

    # Squeeze: one value per channel, kept broadcastable against the input.
    gate = GlobalAveragePooling2D()(inputs)
    gate = Reshape((1, 1, channels))(gate)

    # Excite: bottleneck dense layer followed by a sigmoid gate.
    for units, activation in ((channels // ratio, "relu"), (channels, "sigmoid")):
        gate = Dense(
            units,
            activation=activation,
            kernel_initializer="he_normal",
            use_bias=False,
        )(gate)

    return inputs * gate


def ASPP(inputs: KerasTensor) -> KerasTensor:
    """Atrous Spatial Pyramid Pooling over backbone features.

    Five parallel branches (global pooling, a 1x1 convolution and three 3x3
    convolutions with dilation rates 6, 12 and 18) are concatenated and fused
    by a final 1x1 convolution. Every convolution has 256 filters.

    Args:
        inputs (KerasTensor): Features from the backbone.

    Returns:
        KerasTensor: Fused multi-scale features.
    """

    def _conv_bn_relu(tensor, kernel_size, dilation_rate=1):
        # Bias-free convolution -> batch norm -> ReLU, shared by every branch.
        tensor = Conv2D(
            256,
            kernel_size,
            padding="same",
            dilation_rate=dilation_rate,
            use_bias=False,
            kernel_initializer="he_normal",
        )(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation("relu")(tensor)

    height, width = inputs.shape[1], inputs.shape[2]

    # Image-level branch: pool to 1x1, project, then stretch back to the input size.
    pooled = AveragePooling2D(pool_size=(height, width))(inputs)
    pooled = _conv_bn_relu(pooled, 1)
    pooled = UpSampling2D((height, width), interpolation="bilinear")(pooled)

    branches = [pooled, _conv_bn_relu(inputs, 1)]
    for rate in (6, 12, 18):
        branches.append(_conv_bn_relu(inputs, 3, dilation_rate=rate))

    # 1x1 convolution on the concatenated feature map.
    merged = Concatenate()(branches)
    return _conv_bn_relu(merged, 1)


def createModel(modelType: str, shape: tuple[int] = (256, 256, 3)) -> Model:
    """Build a DeepLabV3+-style segmentation model on a ResNet backbone.

    Args:
        modelType (str): "ResNet101" selects ResNet101; any other value
            selects ResNet50.
        shape (tuple[int]): Input shape. Defaults to (256, 256, 3).

    Returns:
        Model: Keras model with a single-channel sigmoid output.
    """
    inputs = Input(shape)

    if modelType == "ResNet101":
        encoder = ResNet101(weights="imagenet", include_top=False, input_tensor=inputs)
        high_level_layer = "conv4_block23_out"
    else:
        encoder = ResNet50(weights="imagenet", include_top=False, input_tensor=inputs)
        high_level_layer = "conv4_block6_out"

    # High-level branch: ASPP, upsample by 4, dropout.
    x_a = ASPP(encoder.get_layer(high_level_layer).output)
    x_a = UpSampling2D((4, 4), interpolation="bilinear")(x_a)
    x_a = Dropout(0.5)(x_a)

    # Low-level branch: 1x1 projection of an early backbone stage.
    x_b = encoder.get_layer("conv2_block2_out").output
    x_b = Conv2D(
        filters=48, kernel_size=1, padding="same", use_bias=False, kernel_initializer="he_normal"
    )(x_b)
    x_b = BatchNormalization()(x_b)
    x_b = Activation("relu")(x_b)

    # Fuse both branches.
    x = Concatenate()([x_a, x_b])
    x = Dropout(0.5)(x)

    # Two identical refinement stages: 3x3 conv -> BN -> ReLU -> squeeze-and-excite.
    for _ in range(2):
        x = Conv2D(
            filters=256,
            kernel_size=3,
            padding="same",
            use_bias=False,
            kernel_initializer="he_normal",
        )(x)
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
        x = squeeze_and_excite(x)

    # Final upsampling by 4 and per-pixel sigmoid.
    x = UpSampling2D((4, 4), interpolation="bilinear")(x)
    x = Conv2D(1, 1)(x)
    x = Activation("sigmoid")(x)

    return Model(inputs, x)

In [None]:
seed_value = 58800
# Actually apply the seed: it was previously defined but never handed to any RNG,
# so augmentation and weight initialisation differed on every run.
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)

loss = 'binary_crossentropy'
ct = datetime.now()
EPOCHS = 600
dataset_type="cvcclinicdb"
model_type = "deeplabv3p"

# Location the best layer-1 DeepLabV3+ model is saved to.
model_path = 'ModelSaveTensorFlow/' + dataset_type + '_' + model_type + 'newaug'

In [None]:
from keras.optimizers import AdamW

# Layer-1 DeepLabV3+ on a ResNet101 backbone, trained with AdamW and binary cross-entropy.
deeplabv3p_model = createModel(modelType="ResNet101",shape=(256,256,3))
optimizer = AdamW(learning_rate = 0.00001, weight_decay = 0.004)
loss = 'binary_crossentropy'
deeplabv3p_model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=[
        'accuracy',
        f1_score,
        dice_coeff,
        dice_loss,
        total_loss,
        IoU,
        zero_IoU,
        tf.keras.metrics.Precision(),
        tf.keras.metrics.Recall(),
    ],
)

In [None]:
# Manual training loop for the layer-1 DeepLabV3+ model: every epoch draws a fresh
# augmentation of the training set, fits for one epoch, scores validation and test
# with `dice_metric_loss`, and saves the model whenever the validation loss improves.
step = 0
# NOTE(review): `learning_rate` is only printed below; the optimizer's learning rate
# is whatever was passed when the optimizer was built.
learning_rate = 1e-5
# A model is only saved once the validation Dice loss drops below this starting value.
min_loss_for_saving = 0.2

start_time = time.time()

for epoch in range(0, EPOCHS):

    print(f'Training, epoch {epoch}')
    print('Learning Rate: ' + str(learning_rate))

    step += 1
    tf.keras.backend.clear_session()

    # Re-augment the whole training set so every epoch sees different samples.
    image_augmented, mask_augmented = augment_images()

    deeplabv3p_model.fit(x=image_augmented, y=mask_augmented, epochs=1, batch_size=16, validation_data=(X_val, y_val), verbose=1)

    prediction_valid = deeplabv3p_model.predict(X_val, verbose=1, batch_size=16)
    loss_valid = dice_metric_loss(y_val, prediction_valid)
    loss_valid = loss_valid.numpy()
    print("Loss Validation: " + str(loss_valid))

    # Test loss is only reported; checkpoint selection below uses the validation loss.
    prediction_test = deeplabv3p_model.predict(X_test, verbose=0, batch_size=16)
    loss_test = dice_metric_loss(y_test, prediction_test)
    loss_test = loss_test.numpy()
    print("Loss Test: " + str(loss_test))

    if min_loss_for_saving > loss_valid:
        min_loss_for_saving = loss_valid
        print("Saved model with val_loss: ", loss_valid)
        deeplabv3p_model.save(model_path)

    # Release the per-epoch augmented arrays before the next iteration.
    del image_augmented
    del mask_augmented
    tf.keras.backend.clear_session()
    tf.compat.v1.reset_default_graph()
    gc.collect()


# Report wall-clock training time as hours / minutes / seconds.
elapsed_time = time.time() - start_time
hours, rem = divmod(elapsed_time, 3600)
minutes, seconds = divmod(rem, 60)
print(f"Elapsed Time: {int(hours)}h {int(minutes)}m {int(seconds)}s")

In [None]:
# Drop the in-memory training model; the saved copy is reloaded from disk in a later cell.
del deeplabv3p_model

## Train DoubleUNet Layer 1

In [None]:
def f1_score(y_true, y_pred):
    """F1 score of the rounded predictions, pooled over every element of the batch.

    This repeats the `f1_score` metric already defined in the evaluation-metrics
    cell, so the name is bound to the Keras-compatible metric at this point.

    Args:
        y_true: Ground-truth mask.
        y_pred: Predicted probabilities; rounded to 0/1 before counting.

    Returns:
        Scalar float32 tensor.
    """
    eps = tf.keras.backend.epsilon()
    truth = tf.cast(y_true, tf.float32)
    hard_pred = tf.cast(tf.math.round(y_pred), tf.float32)

    true_pos = tf.reduce_sum(truth * hard_pred)
    false_pos = tf.reduce_sum((1 - truth) * hard_pred)
    false_neg = tf.reduce_sum(truth * (1 - hard_pred))

    precision = true_pos / (true_pos + false_pos + eps)
    recall = true_pos / (true_pos + false_neg + eps)

    return 2 * precision * recall / (precision + recall + eps)

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model
from tensorflow.keras.applications import *

def squeeze_excite_block(inputs, ratio=8):
    """Squeeze-and-Excitation gate: rescale each channel of `inputs`.

    Args:
        inputs: Feature map whose last axis is the channel axis.
        ratio: Channel reduction factor of the bottleneck dense layer.

    Returns:
        `inputs` multiplied element-wise by the per-channel sigmoid gate.
    """
    n_channels = inputs.shape[-1]

    gate = GlobalAveragePooling2D()(inputs)
    gate = Reshape((1, 1, n_channels))(gate)
    for units, activation in ((n_channels // ratio, 'relu'), (n_channels, 'sigmoid')):
        gate = Dense(units, activation=activation, kernel_initializer='he_normal', use_bias=False)(gate)

    return Multiply()([inputs, gate])

def conv_block(inputs, filters):
    """Two (3x3 conv -> batch norm -> ReLU) stages followed by squeeze-and-excite.

    Args:
        inputs: Input feature map.
        filters: Number of filters of both convolutions.

    Returns:
        The re-calibrated feature map.
    """
    features = inputs
    for _ in range(2):
        features = Conv2D(filters, (3, 3), padding="same")(features)
        features = BatchNormalization()(features)
        features = Activation('relu')(features)

    return squeeze_excite_block(features)

def encoder1(inputs):
    """First encoder: an ImageNet-initialised VGG19 built on `inputs`.

    Returns:
        Tuple ``(output, skip_connections)``: the ``block5_conv4`` activation and
        the list of skip tensors from blocks 1-4, shallowest first.
    """
    backbone = VGG19(include_top=False, weights='imagenet', input_tensor=inputs)

    skip_layer_names = ("block1_conv2", "block2_conv2", "block3_conv4", "block4_conv4")
    skip_connections = [backbone.get_layer(name).output for name in skip_layer_names]

    return backbone.get_layer("block5_conv4").output, skip_connections

def decoder1(inputs, skip_connections):
    """First decoder: four upsample -> concatenate-skip -> conv_block stages.

    Note that `skip_connections` is reversed IN PLACE, so the caller's list is
    deepest-first after this call.

    Args:
        inputs: Bottleneck feature map.
        skip_connections: Encoder skip tensors, shallowest first on entry.

    Returns:
        The decoded feature map.
    """
    skip_connections.reverse()  # in-place on purpose: the caller's list is modified

    num_filters = [256,128,64,32]
    x = inputs
    for level in range(len(num_filters)):
        x = UpSampling2D((2, 2), interpolation='bilinear')(x)
        x = Concatenate()([x, skip_connections[level]])
        x = conv_block(x, num_filters[level])

    return x

def encoder2(inputs):
    """Second encoder: four conv_block + 2x2 max-pool stages.

    NOTE(review): the filter counts decrease with depth (256 -> 32); confirm this
    ordering is intended.

    Returns:
        Tuple ``(output, skip_connections)``: the tensor after the last pooling
        and the pre-pooling output of every stage, shallowest first.
    """
    skip_connections = []
    x = inputs

    for filters in (256, 128, 64, 32):
        x = conv_block(x, filters)
        skip_connections.append(x)
        x = MaxPool2D((2, 2))(x)

    return x, skip_connections

def decoder2(inputs, skip_1, skip_2):
    """Second decoder: each stage concatenates skips from both encoders.

    `skip_2` is reversed IN PLACE here; `skip_1` is indexed as given, so it must
    already be ordered deepest-first when passed in.

    Args:
        inputs: Bottleneck feature map of the second network.
        skip_1: Skip tensors of the first encoder, deepest first.
        skip_2: Skip tensors of the second encoder, shallowest first on entry.

    Returns:
        The decoded feature map.
    """
    skip_2.reverse()  # in-place on purpose: the caller's list is modified

    num_filters = [256,128,64,32]
    x = inputs
    for level in range(len(num_filters)):
        x = UpSampling2D((2, 2), interpolation='bilinear')(x)
        x = Concatenate()([x, skip_1[level], skip_2[level]])
        x = conv_block(x, num_filters[level])

    return x

def output_block(inputs):
    """Project a feature map to a single-channel sigmoid mask with a 1x1 convolution."""
    head = Conv2D(1, (1, 1), padding="same", activation='sigmoid')
    return head(inputs)

def Upsample(tensor, size):
    """Resize `tensor` to `size` with `tf.image.resize`, wrapped in a Lambda layer."""
    resize_layer = Lambda(
        lambda x: tf.image.resize(images=x, size=size),
        output_shape=size,
    )
    return resize_layer(tensor)

def ASPP(x, filter):
    """Atrous Spatial Pyramid Pooling with `filter` channels per branch.

    Branches: global average pooling, a 1x1 convolution and three 3x3
    convolutions with dilation rates 6, 12 and 18. They are concatenated and
    fused with a final 1x1 convolution.

    Args:
        x: Input feature map.
        filter: Number of filters of every convolution.

    Returns:
        The fused feature map.
    """

    def _conv_bn_relu(tensor, kernel_size, **conv_kwargs):
        # Convolution -> batch norm -> ReLU; the conv options differ per branch.
        tensor = Conv2D(filter, kernel_size, padding="same", **conv_kwargs)(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation("relu")(tensor)

    height, width = x.shape[1], x.shape[2]

    # Image-level branch (this convolution keeps its bias and has no dilation).
    pooled = AveragePooling2D(pool_size=(height, width))(x)
    pooled = _conv_bn_relu(pooled, 1)
    pooled = UpSampling2D((height, width), interpolation='bilinear')(pooled)

    branches = [pooled, _conv_bn_relu(x, 1, dilation_rate=1, use_bias=False)]
    for rate in (6, 12, 18):
        branches.append(_conv_bn_relu(x, 3, dilation_rate=rate, use_bias=False))

    merged = Concatenate()(branches)

    return _conv_bn_relu(merged, 1, dilation_rate=1, use_bias=False)

def doubleunet(shape=(256,256,3)):
    """Build the DoubleU-Net: two stacked encoder/decoder networks.

    The first network (VGG19 encoder) predicts a mask that gates the input
    image; the gated image is fed to the second network, whose decoder also
    receives the first encoder's skip connections. Only the second network's
    mask is exposed as the model output.

    Args:
        shape: Input shape. Defaults to (256, 256, 3).

    Returns:
        Keras `Model` mapping the input image to the second network's
        single-channel sigmoid mask.
    """
    inputs = Input(shape)

    # Network 1
    x, skip_1 = encoder1(inputs)
    x = ASPP(x, 64)
    x = decoder1(x, skip_1)  # reverses skip_1 in place; decoder2 relies on that order
    outputs1 = output_block(x)

    # Gate the input image with the first mask.
    x = inputs * outputs1

    # Network 2
    x, skip_2 = encoder2(x)
    x = ASPP(x, 64)
    x = decoder2(x, skip_1, skip_2)
    outputs2 = output_block(x)

    # The former concatenation of outputs1/outputs2 was never used by the model
    # and has been dropped.
    model = Model(inputs, outputs2)
    return model

In [None]:
seed_value = 58800
# Actually apply the seed: it was previously defined but never handed to any RNG,
# so augmentation and weight initialisation differed on every run.
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)

loss = 'binary_crossentropy'
ct = datetime.now()
EPOCHS = 600
dataset_type="cvcclinicdb"
model_type = "doubleunet"

# Location the best layer-1 DoubleU-Net model is saved to.
model_path = 'ModelSaveTensorFlow/' + dataset_type + '_' + model_type + 'newaug'

In [None]:
from keras.optimizers import AdamW

# Layer-1 DoubleU-Net, trained with AdamW and binary cross-entropy (eager execution).
doubleunet_model = doubleunet(shape=(256,256,3))
optimizer = AdamW(learning_rate = 0.00001, weight_decay = 0.004)
loss = 'binary_crossentropy'
doubleunet_model.compile(
    optimizer=optimizer,
    loss=loss,
    run_eagerly=True,
    metrics=[
        'accuracy',
        f1_score,
        dice_coeff,
        dice_loss,
        total_loss,
        IoU,
        zero_IoU,
        tf.keras.metrics.Precision(),
        tf.keras.metrics.Recall(),
    ],
)

In [None]:
# Manual training loop for the layer-1 DoubleU-Net: every epoch draws a fresh
# augmentation of the training set, fits for one epoch, scores validation and test
# with `dice_metric_loss`, and saves the model whenever the validation loss improves.
step = 0
learning_rate = 1e-5  # only printed below; the optimizer holds the effective value
# A model is only saved once the validation Dice loss drops below this starting value.
min_loss_for_saving = 0.2
batch_size=4
epoch_step = len(X_train)//batch_size
val_step = len(X_val)//batch_size
start_time = time.time()

for epoch in range(0, EPOCHS):

    print(f'Training, epoch {epoch}')
    print('Learning Rate: ' + str(learning_rate))

    step += 1
    tf.keras.backend.clear_session()

    # Re-augment the whole training set so every epoch sees different samples.
    image_augmented, mask_augmented = augment_images()

    # `batch_size` must be passed explicitly: without it Keras batches array inputs
    # with its default size, which does not match the step counts derived from
    # `batch_size` above.
    doubleunet_model.fit(
        x=image_augmented,
        y=mask_augmented,
        epochs=1,
        batch_size=batch_size,
        steps_per_epoch=epoch_step,
        validation_steps=val_step,
        validation_data=(X_val, y_val),
        verbose=1,
    )

    prediction_valid = doubleunet_model.predict(X_val, verbose=0, batch_size=batch_size)
    loss_valid = dice_metric_loss(y_val, prediction_valid)
    loss_valid = loss_valid.numpy()
    print("Loss Validation: " + str(loss_valid))

    # Test loss is only reported; checkpoint selection below uses the validation loss.
    prediction_test = doubleunet_model.predict(X_test, verbose=0, batch_size=batch_size)
    loss_test = dice_metric_loss(y_test, prediction_test)
    loss_test = loss_test.numpy()
    print("Loss Test: " + str(loss_test))

    if min_loss_for_saving > loss_valid:
        min_loss_for_saving = loss_valid
        print("Saved model with val_loss: ", loss_valid)
        doubleunet_model.save(model_path)

    # Release the per-epoch augmented arrays before the next iteration.
    del image_augmented
    del mask_augmented
    tf.keras.backend.clear_session()
    tf.compat.v1.reset_default_graph()
    gc.collect()


# Report wall-clock training time as hours / minutes / seconds.
elapsed_time = time.time() - start_time
hours, rem = divmod(elapsed_time, 3600)
minutes, seconds = divmod(rem, 60)
print(f"Elapsed Time: {int(hours)}h {int(minutes)}m {int(seconds)}s")

In [None]:
# Drop the in-memory training model; the saved copy is reloaded from disk in a later cell.
del doubleunet_model

## Load Layer 1 Deeplabv3+ Model and Print the evaluation metrics

In [None]:
print("Loading the model")

# Reload the saved layer-1 DeepLabV3+ model; the custom metrics must be supplied by name.
deeplabv3p_model = tf.keras.models.load_model(
    "ModelSaveTensorFlow/cvcclinicdb_deeplabv3pnewaug",
    custom_objects={
        'f1_score': f1_score,
        'dice_coeff': dice_coeff,
        'dice_loss': dice_loss,
        'total_loss': total_loss,
        'IoU': IoU,
        'zero_IoU': zero_IoU,
        'dice_metric_loss':dice_metric_loss,
    },
)

# Predict on train, validation and test (in that order) for the next ensemble layer.
deeplabv3p_preds_tr, deeplabv3p_preds_val, deeplabv3p_preds_t = (
    deeplabv3p_model.predict(split, batch_size=4)
    for split in (X_train, X_val, X_test)
)
del deeplabv3p_model

In [None]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, jaccard_score
# Imported under an alias on purpose: a plain `import f1_score` would rebind the
# Keras metric `f1_score` that other cells pass to `compile` / `load_model`.
from sklearn.metrics import f1_score as sk_f1_score

# Pixel-wise evaluation of the layer-1 DeepLabV3+ predictions on the test set.
ground_truth_flat = y_test.flatten()
predictions_flat = np.round(deeplabv3p_preds_t).flatten()

# Calculate metrics
accuracy = accuracy_score(ground_truth_flat, predictions_flat)
precision = precision_score(ground_truth_flat, predictions_flat)
recall = recall_score(ground_truth_flat, predictions_flat)
f1 = sk_f1_score(ground_truth_flat, predictions_flat)
jaccard = jaccard_score(ground_truth_flat, predictions_flat)

# Print or log the evaluation metrics
print("Deeplab V3 Plus 1: ")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1}")
print(f"IoU: {jaccard}")

## Load Layer 1 DoubleUNet Model and Print the evaluation metrics

In [None]:
print("Loading the model")

# Reload the saved layer-1 DoubleU-Net model; the custom metrics must be supplied by name.
doubleunet_model = tf.keras.models.load_model(
    "ModelSaveTensorFlow/cvcclinicdb_doubleunetnewaug",
    custom_objects={
        'f1_score': f1_score,
        'dice_coeff': dice_coeff,
        'dice_loss': dice_loss,
        'total_loss': total_loss,
        'IoU': IoU,
        'zero_IoU': zero_IoU,
        'dice_metric_loss':dice_metric_loss,
    },
)

# Predict on train, validation and test (in that order) for the next ensemble layer.
doubleunet_preds_tr, doubleunet_preds_val, doubleunet_preds_t = (
    doubleunet_model.predict(split, batch_size=4)
    for split in (X_train, X_val, X_test)
)
del doubleunet_model

In [None]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, jaccard_score
# Imported under an alias on purpose: a plain `import f1_score` would rebind the
# Keras metric `f1_score` that other cells pass to `compile` / `load_model`.
from sklearn.metrics import f1_score as sk_f1_score

# Pixel-wise evaluation of the layer-1 DoubleU-Net predictions on the test set.
ground_truth_flat = y_test.flatten()
predictions_flat = np.round(doubleunet_preds_t).flatten()

# Calculate metrics
accuracy = accuracy_score(ground_truth_flat, predictions_flat)
precision = precision_score(ground_truth_flat, predictions_flat)
recall = recall_score(ground_truth_flat, predictions_flat)
f1 = sk_f1_score(ground_truth_flat, predictions_flat)
jaccard = jaccard_score(ground_truth_flat, predictions_flat)

# Print or log the evaluation metrics
print("Double U-Net 1: ")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1}")
print(f"IoU: {jaccard}")

## Average Layer 1

In [None]:
# Average the two layer-1 models' probability maps for every split ...
average_preds_tr = 0.5 * (doubleunet_preds_tr + deeplabv3p_preds_tr)
average_preds_val = 0.5 * (doubleunet_preds_val + deeplabv3p_preds_val)
average_preds_t = 0.5 * (doubleunet_preds_t + deeplabv3p_preds_t)

# ... and append the averaged map to the image as an extra trailing channel,
# which becomes the input of the layer-2 models.
train_with_predictions = np.concatenate((X_train, average_preds_tr), axis=-1)
val_with_predictions = np.concatenate((X_val, average_preds_val), axis=-1)
test_with_predictions = np.concatenate((X_test, average_preds_t), axis=-1)

# Layer 2 (U-Net and Deeplabv1)

## Train U-Net Layer 2

In [None]:
from datetime import datetime

# Output locations for the layer-2 U-Net run.
model_path = 'ModelSaveTensorFlow/cvcclinicdb_unet2_newaug.h5'
log_dir = './logs/fit' + datetime.now().strftime('%d.%m.%Y--%H-%M-%S')

# Keep only the checkpoint with the lowest validation Dice loss.
checkpoint = ModelCheckpoint(
    model_path,
    monitor='val_dice_metric_loss',
    verbose = 1,
    save_best_only=True,
    mode='min',
    save_freq='epoch',
)
# Stop after 25 epochs without improvement of the validation Dice loss.
early = EarlyStopping(
    monitor='val_dice_metric_loss',
    min_delta=0,
    patience = 25,
    verbose = 1,
    mode='min',
)
board = TensorBoard(log_dir=log_dir,histogram_freq = 1)

# Despite its name this is the full callback list, not just the TensorBoard callback.
tensorboard_callback = [checkpoint,early,board]

In [None]:
# Layer-2 U-Net: the input is the image plus one extra channel (hence 4 channels).
# NOTE(review): `f1_score` below must still be the Keras metric defined earlier; an
# unaliased `from sklearn.metrics import f1_score` run before this cell would shadow it.
optimizer = Adam(learning_rate = 0.00001)
loss = 'binary_crossentropy'
unet_model2 = unet(input_shape=(256,256,4))
unet_model2.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=[
        'accuracy',
        f1_score,
        dice_metric_loss,
        dice_coeff,
        dice_loss,
        total_loss,
        IoU,
        zero_IoU,
        tf.keras.metrics.Precision(),
        tf.keras.metrics.Recall(),
    ],
)
# `tensorboard_callback` already is the list of callbacks; pass it directly instead
# of wrapping it in another list.
unet_model2.fit(train_with_predictions, y_train,
                  batch_size=8,epochs=400,
                  validation_data=(val_with_predictions, y_val),
                  callbacks=tensorboard_callback)

## Train Deeplabv1 Layer 2

In [None]:
def deeplab(input_shape=(256,256,3)):
    """Build a small DeepLab-v1-style segmentation network.

    A five-stage convolutional encoder (four 2x2 poolings, i.e. 1/16 resolution)
    is followed by four parallel atrous convolutions whose outputs are
    concatenated, fused by a 1x1 convolution and upsampled by 16 with a
    transposed convolution.

    Args:
        input_shape: Shape of the model input. Defaults to (256, 256, 3).

    Returns:
        Keras `Model` producing a single-channel sigmoid mask.
    """
    atrous_rates = [6, 12, 18, 24]
    inputs = Input(input_shape)

    # Encoder: two 3x3 convolutions per stage, max-pooling between stages.
    x = inputs
    for stage, filters in enumerate((32, 64, 128, 256, 512)):
        if stage > 0:
            x = MaxPooling2D((2, 2))(x)
        x = Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
        x = Conv2D(filters, (3, 3), activation='relu', padding='same')(x)

    # Atrous (dilated) convolutions with different rates on the deepest features.
    atrous_layers = [
        Conv2D(512, 3, activation='relu', padding='same', dilation_rate=rate)(x)
        for rate in atrous_rates
    ]

    # Concatenated atrous layers
    concat = Concatenate(axis=-1)(atrous_layers)

    # Decoder: fuse, upsample back by 16, then per-pixel sigmoid.
    conv6 = Conv2D(512, 1, activation='relu', padding='same')(concat)

    upsample = Conv2DTranspose(1, kernel_size=16, strides=16, padding='same')(conv6)
    outputs = Conv2D(1, 1, activation='sigmoid')(upsample)

    model = Model(inputs=[inputs], outputs=[outputs])

    return model

In [None]:
from datetime import datetime

# Output locations for the layer-2 DeepLab run.
model_path = 'ModelSaveTensorFlow/cvcclinicdb_deeplab2_newaug.h5'
log_dir = './logs/fit' + datetime.now().strftime('%d.%m.%Y--%H-%M-%S')

# Keep only the checkpoint with the lowest validation Dice loss.
checkpoint = ModelCheckpoint(
    model_path,
    monitor='val_dice_metric_loss',
    verbose = 1,
    save_best_only=True,
    mode='min',
    save_freq='epoch',
)
# Stop after 25 epochs without improvement of the validation Dice loss.
early = EarlyStopping(
    monitor='val_dice_metric_loss',
    min_delta=0,
    patience = 25,
    verbose = 1,
    mode='min',
)
board = TensorBoard(log_dir=log_dir,histogram_freq = 1)

# Despite its name this is the full callback list, not just the TensorBoard callback.
tensorboard_callback = [checkpoint,early,board]

In [None]:
# Layer-2 DeepLab: the input is the image plus one extra channel (hence 4 channels).
# NOTE(review): `f1_score` below must still be the Keras metric defined earlier; an
# unaliased `from sklearn.metrics import f1_score` run before this cell would shadow it.
optimizer = AdamW(learning_rate = 0.00001,weight_decay=0.004)
loss = 'binary_crossentropy'
deeplab_model2 = deeplab(input_shape=(256,256,4))
deeplab_model2.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=[
        'accuracy',
        f1_score,
        dice_metric_loss,
        dice_coeff,
        dice_loss,
        total_loss,
        IoU,
        zero_IoU,
        tf.keras.metrics.Precision(),
        tf.keras.metrics.Recall(),
    ],
)
# `tensorboard_callback` already is the list of callbacks; pass it directly instead
# of wrapping it in another list.
deeplab_model2.fit(train_with_predictions, y_train,
                  batch_size=8,epochs=400,
                  validation_data=(val_with_predictions, y_val),
                  callbacks=tensorboard_callback)

## Load U-Net Layer 2 and print the evaluation metrics

In [None]:
# Reload the best layer-2 U-Net checkpoint; the custom metrics must be supplied by name.
unet2_model = tf.keras.models.load_model(
    'ModelSaveTensorFlow/cvcclinicdb_unet2_newaug.h5',
    custom_objects={
        'f1_score': f1_score,
        'dice_coeff': dice_coeff,
        'dice_loss': dice_loss,
        'total_loss': total_loss,
        'IoU': IoU,
        'zero_IoU': zero_IoU,
        'dice_metric_loss':dice_metric_loss,
    },
)
unet2_preds_t = unet2_model.predict(test_with_predictions,batch_size=4)

In [None]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, jaccard_score
# Imported under an alias on purpose: a plain `import f1_score` would rebind the
# Keras metric `f1_score` that other cells pass to `compile` / `load_model`.
from sklearn.metrics import f1_score as sk_f1_score

# Pixel-wise evaluation of the layer-2 U-Net predictions on the test set.
ground_truth_flat = y_test.flatten()
predictions_flat = np.round(unet2_preds_t).flatten()

# Calculate metrics
accuracy = accuracy_score(ground_truth_flat, predictions_flat)
precision = precision_score(ground_truth_flat, predictions_flat)
recall = recall_score(ground_truth_flat, predictions_flat)
f1 = sk_f1_score(ground_truth_flat, predictions_flat)
jaccard = jaccard_score(ground_truth_flat, predictions_flat)

# Print or log the evaluation metrics
print("U-Net 2: ")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1}")
print(f"IoU: {jaccard}")

## Load Deeplabv1 Layer 2 and print the evaluation metrics

In [None]:
# Reload the best layer-2 DeepLab checkpoint; the custom metrics must be supplied by name.
deeplab2_model = tf.keras.models.load_model(
    "ModelSaveTensorFlow/cvcclinicdb_deeplab2_newaug.h5",
    custom_objects={
        'f1_score': f1_score,
        'dice_coeff': dice_coeff,
        'dice_loss': dice_loss,
        'total_loss': total_loss,
        'IoU': IoU,
        'zero_IoU': zero_IoU,
        'dice_metric_loss':dice_metric_loss,
    },
)
deeplab2_preds_t = deeplab2_model.predict(test_with_predictions,batch_size=4)

In [None]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, jaccard_score
# Imported under an alias on purpose: a plain `import f1_score` would rebind the
# Keras metric `f1_score` that other cells pass to `compile` / `load_model`.
from sklearn.metrics import f1_score as sk_f1_score

# Pixel-wise evaluation of the layer-2 DeepLab predictions on the test set.
ground_truth_flat = y_test.flatten()
predictions_flat = np.round(deeplab2_preds_t).flatten()

# Calculate metrics
accuracy = accuracy_score(ground_truth_flat, predictions_flat)
precision = precision_score(ground_truth_flat, predictions_flat)
recall = recall_score(ground_truth_flat, predictions_flat)
f1 = sk_f1_score(ground_truth_flat, predictions_flat)
jaccard = jaccard_score(ground_truth_flat, predictions_flat)

# Print or log the evaluation metrics
print("Deeplab 2: ")
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-Score: {f1}")
print(f"IoU: {jaccard}")

In [None]:
# Flattened ground truth for the pixel-wise ensemble metrics.
y_test_flat = y_test.flatten()

# Average the two layer-2 probability maps; keep the un-flattened average for
# plotting and a rounded, flattened copy for scoring.
average2_preds_test = 0.5 * (unet2_preds_t + deeplab2_preds_t)
average2_preds_t = np.round(average2_preds_test).flatten()

In [None]:
# Import the scorers here so this cell does not depend on an earlier evaluation cell
# having been run; sklearn's F1 is aliased so the Keras metric `f1_score` keeps its name.
from sklearn.metrics import accuracy_score, recall_score, precision_score, jaccard_score
from sklearn.metrics import f1_score as sk_f1_score

avg_ensemble_accuracy = accuracy_score(y_test_flat, average2_preds_t)
avg_ensemble_f1 = sk_f1_score(y_test_flat, average2_preds_t)
avg_ensemble_precision = precision_score(y_test_flat, average2_preds_t)
avg_ensemble_recall = recall_score(y_test_flat, average2_preds_t)
avg_ensemble_iou = jaccard_score(y_test_flat, average2_preds_t)

# Display ensemble metrics
print("Averaged Ensemble Accuracy:", avg_ensemble_accuracy)
print("Averaged Ensemble F1 Score:", avg_ensemble_f1)
print("Averaged Ensemble Precision:", avg_ensemble_precision)
print("Averaged Ensemble Recall:", avg_ensemble_recall)
print("Averaged Ensemble IoU:", avg_ensemble_iou)

# Visualization of Models from layer 1, layer 2, and final output

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Never ask for more samples than the test set holds (sampling is without replacement).
num_samples_to_visualize = min(20, len(X_test))
random_indices = np.random.choice(len(X_test), num_samples_to_visualize, replace=False)

# Visualize each selected sample
for sample_index in random_indices:
    # Original image and corresponding ground-truth mask
    original_image = X_test[sample_index]
    actual_mask = y_test[sample_index, ..., 0]

    # Binarised predictions of every model / ensemble stage for this sample.
    doubleunet_prediction = np.round(doubleunet_preds_t[sample_index, ..., 0])
    deeplabv3p_prediction = np.round(deeplabv3p_preds_t[sample_index, ..., 0])
    # The layer-1 average on the test split is stored in `average_preds_t`
    # (the previously referenced `average_preds_test` is never defined).
    avg_prediction = np.round(average_preds_t[sample_index, ..., 0])
    unet2_prediction = np.round(unet2_preds_t[sample_index, ..., 0])
    deeplab2_prediction = np.round(deeplab2_preds_t[sample_index, ..., 0])
    avg2_prediction = np.round(average2_preds_test[sample_index, ..., 0])

    # (title, image, colormap) for every panel, left to right.
    panels = [
        ("Original Image", original_image, None),
        ("Red Channel", original_image[:, :, 0], 'Reds'),
        ("Green Channel", original_image[:, :, 1], 'Greens'),
        ("Blue Channel", original_image[:, :, 2], 'Blues'),
        ("Actual Mask", actual_mask, 'gray'),
        ("DoubleU-Net", doubleunet_prediction, 'gray'),
        ("Deeplab V3 Plus", deeplabv3p_prediction, 'gray'),
        ("Avg Layer 1", avg_prediction, 'gray'),
        ("U-Net 2", unet2_prediction, 'gray'),
        ("Deeplab 2", deeplab2_prediction, 'gray'),
        ("Avg Layer 2", avg2_prediction, 'gray'),
    ]

    # Plot the images and predictions
    plt.figure(figsize=(20, 20))
    for position, (title, image, cmap) in enumerate(panels, start=1):
        plt.subplot(1, len(panels), position)
        plt.imshow(image, cmap=cmap)
        plt.title(title)

    plt.tight_layout()
    plt.show()