In [None]:
#Define Loss function
import tensorflow as tf 

def dice_metric_loss(y_true, y_pred, smooth=1e-6, alpha=0.25, gamma=2):
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.clip_by_value(y_pred, tf.keras.backend.epsilon(), 1 - tf.keras.backend.epsilon())

    intersection = tf.reduce_sum(y_true * y_pred)
    dice_coefficient = (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)
    dice_loss = 1 - dice_coefficient

    logits = tf.math.log(y_pred / (1 - y_pred))
    focal_loss = -alpha * (1 - y_pred) ** gamma * y_true * tf.math.log(y_pred) - (1 - alpha) * y_pred ** gamma * (1 - y_true) * tf.math.log(1 - y_pred)

    return dice_loss + tf.reduce_mean(focal_loss)


In [None]:
#Define the Model Architecture

from tensorflow.keras.layers import Conv2D, UpSampling2D, add, Input, Concatenate, GlobalAveragePooling2D, Dense, Reshape, Multiply, BatchNormalization, ReLU
from tensorflow.keras.models import Model
from tensorflow.keras.applications import EfficientNetB4, MobileNetV2

kernel_initializer = 'he_normal'


def squeeze_excite_module(input_tensor, filters):
    se = GlobalAveragePooling2D()(input_tensor)
    se = Dense(filters // 16, activation='relu')(se)
    se = Dense(filters, activation='sigmoid')(se)
    se = Reshape((1, 1, filters))(se)
    return Multiply()([input_tensor, se])


def resnet_module(x, filters, dilation_rate=1):
    x1 = Conv2D(filters, 3, activation='relu', kernel_initializer=kernel_initializer, padding='same', dilation_rate=dilation_rate)(x)
    x = Conv2D(filters, 3, activation='relu', kernel_initializer=kernel_initializer, padding='same', dilation_rate=dilation_rate)(x)
    x = BatchNormalization()(x)
    x = Conv2D(filters, 3, activation='relu', kernel_initializer=kernel_initializer, padding='same', dilation_rate=dilation_rate)(x)
    x = BatchNormalization()(x)
    return BatchNormalization()(add([x, x1]))

def atrous_module(x, filters):
    x = Conv2D(filters, 3, activation='relu', kernel_initializer=kernel_initializer, padding='same', dilation_rate=1)(x)
    x = BatchNormalization()(x)
    x = Conv2D(filters, 3, activation='relu', kernel_initializer=kernel_initializer, padding='same', dilation_rate=2)(x)
    x = BatchNormalization()(x)
    x = Conv2D(filters, 3, activation='relu', kernel_initializer=kernel_initializer, padding='same', dilation_rate=3)(x)
    return BatchNormalization()(x)

def RAS_module(x, filters):
    x = BatchNormalization()(x)
    x1 = atrous_module(x, filters)
    x2 = resnet_module(x, filters)
    x = add([x1, x2])
    x = squeeze_excite_module(x, filters)
    x = BatchNormalization()(x)
    return x
    
def create_model(img_height, img_width, input_channels, out_classes, starting_filters):
    input_layer = Input(shape=(img_height, img_width, input_channels))
    backbone1 = EfficientNetB4(input_tensor=input_layer, include_top=False, weights="imagenet")
    backbone2 = MobileNetV2(input_tensor=input_layer, include_top=False, weights='imagenet')

    layers1 = [backbone1.get_layer(x).output for x in ['block7a_project_bn', 'block6a_project_bn', 'block4a_project_bn', 'block2a_project_bn']]
    layers2 = [backbone2.get_layer(x).output for x in ['block_16_expand_relu', 'block_13_expand_relu', 'block_6_expand_relu', 'block_3_expand_relu']]

    p1 = Conv2D(starting_filters * 4, 1, padding='same', activation='relu', kernel_initializer=kernel_initializer)(layers1[3])
    p2 = Conv2D(starting_filters * 8, 1, padding='same', activation='relu', kernel_initializer=kernel_initializer)(UpSampling2D(2)(layers1[2]))
    p3 = Conv2D(starting_filters * 16, 1, padding='same', activation='relu', kernel_initializer=kernel_initializer)(UpSampling2D(2)(layers1[1]))

    q1 = Conv2D(starting_filters * 4, 1, padding='same', activation='relu', kernel_initializer=kernel_initializer)(layers2[3])
    q2 = Conv2D(starting_filters * 8, 1, padding='same', activation='relu', kernel_initializer=kernel_initializer)(layers2[2])
    q3 = Conv2D(starting_filters * 16, 1, padding='same', activation='relu', kernel_initializer=kernel_initializer)(layers2[1])

    
    c = Conv2D(starting_filters, 3, strides=2, padding='same', activation='relu', kernel_initializer=kernel_initializer)(input_layer)

    
    l1 = Conv2D(starting_filters * 4, 3, strides=2, padding='same', activation='relu', kernel_initializer=kernel_initializer)(c)
    s1 = add([l1, p1, q1])
    t1 = RAS_module(s1, starting_filters * 4)

    l2 = Conv2D(starting_filters * 8, 2, strides=2, padding='same', activation='relu', kernel_initializer=kernel_initializer)(t1)
    s2 = add([l2, p2, q2])
    t2 = RAS_module(s2, starting_filters * 8)

    l3 = Conv2D(starting_filters * 16, 2, strides=2, padding='same', activation='relu', kernel_initializer=kernel_initializer)(t2)
    s3 = add([l3, p3, q3])
    t3 = RAS_module(s3, starting_filters * 16)


    outd = Concatenate()([UpSampling2D(4)(t3),UpSampling2D(2)(t2),t1])
    outd = Conv2D(32, 3, activation='relu', kernel_initializer=kernel_initializer, padding='same')(outd)
    outd = BatchNormalization()(outd)
    outd = Conv2D(1, 3, activation='relu', kernel_initializer=kernel_initializer, padding='same')(outd)
    
    
    outd = UpSampling2D(4, interpolation='bilinear')(outd)
    output = Conv2D(out_classes, 1, activation='sigmoid')(outd)

    return Model(inputs=input_layer, outputs=output)


In [None]:
import numpy as np
img_size = 352
learning_rate = 1e-4
filters = 17 
b_size = 8

In [None]:
#Initialize the model for training

img_size = 352
learning_rate = 1e-4
filters = 17
b_size = 8


opts = tf.keras.optimizers.AdamW(learning_rate = 1e-4)

model = create_model(img_height=img_size, img_width=img_size, input_channels=3, out_classes=1, starting_filters=filters)  
model.compile(optimizer=opts, loss=dice_metric_loss) 


In [None]:
# #Load weight for further training 

# weights = np.load("path_to_saved_weight_file.npz")

# model = create_model(img_height=img_size, img_width=img_size, input_channels=3, out_classes=1, starting_filters=filters)  
# model.compile(optimizer=tf.keras.optimizers.AdamW(learning_rate = 1e-4), loss=dice_metric_loss) 

# model.set_weights([weights[f"arr_{i}"] for i in range(len(weights))])


In [None]:
#For computation of FLOPs

from tensorflow.python.framework.convert_to_constants import  convert_variables_to_constants_v2_as_graph

def get_flops(model):
    concrete = tf.function(lambda inputs: model(inputs))
    concrete_func = concrete.get_concrete_function(
        [tf.TensorSpec([1, *inputs.shape[1:]]) for inputs in model.inputs])
    frozen_func, graph_def = convert_variables_to_constants_v2_as_graph(concrete_func)
    with tf.Graph().as_default() as graph:
        tf.graph_util.import_graph_def(graph_def, name='')
        run_meta = tf.compat.v1.RunMetadata()
        opts = tf.compat.v1.profiler.ProfileOptionBuilder.float_operation()
        flops = tf.compat.v1.profiler.profile(graph=graph, run_meta=run_meta, cmd="op", options=opts)
        return flops.total_float_ops/1e9

flops = get_flops(model)
print(flops)

In [None]:
trainable_params = sum(tf.keras.backend.count_params(w) for w in model.trainable_weights)
non_trainable_params = sum(tf.keras.backend.count_params(w) for w in model.non_trainable_weights)
total_params = model.count_params()

print(f"Trainable Parameters: {trainable_params}")
print(f"Non-Trainable Parameters: {non_trainable_params}")
print(f"Total Parameters: {total_params}")

In [None]:
#general load_data function
import glob

# import numpy as np
from PIL import Image
from skimage.io import imread
from tqdm import tqdm
import cv2


def load_data(img_height, img_width, data_frame):
   
    images_to_be_loaded = len(data_frame)

    X_train = np.zeros((images_to_be_loaded, img_height, img_width, 3), dtype=np.float32)
    Y_train = np.zeros((images_to_be_loaded, img_height, img_width), dtype=np.uint8)

    print('Resizing training images and masks: ' + str(images_to_be_loaded))
    for n, row in tqdm(enumerate(data_frame.itertuples(index=False)), total=len(data_frame)):
        
        image_path = row.image
        mask_path = row.mask

        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        mask_ = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        mask = np.zeros((img_height, img_width), dtype=np.bool_)

        pillow_image = Image.fromarray(image)

        pillow_image = pillow_image.resize((img_height, img_width))
        image = np.array(pillow_image)

        X_train[n] = image / 255

        pillow_mask = Image.fromarray(mask_)
        pillow_mask = pillow_mask.resize((img_height, img_width), resample=Image.LANCZOS)
        mask_ = np.array(pillow_mask)
        
        for i in range(img_height):
            for j in range(img_width):                
                if mask_[i, j] >= 127:                  
                    mask[i, j] = 1

        Y_train[n] = mask

    Y_train = np.expand_dims(Y_train, axis=-1)

    return X_train, Y_train


In [None]:
#For creation of dataframe for images and masks paths required by training

import os
import pandas as pd
import transformers

def create_image_mask_df(folder_path,image_dir,mask_dir):
    images_path = folder_path + image_dir
    masks_path = folder_path + mask_dir
    images = sorted([images_path + f for f in os.listdir(images_path)])
    masks =sorted([masks_path + f for f in os.listdir(masks_path)])
    data = {'image': images, 'mask': masks}
    return pd.DataFrame(data)

                                                                          

train_df = create_image_mask_df('/kaggle/input/duck-net-splits/Datasets/CVC-ClinicDB/train/',"images/","masks/")
valid_df = create_image_mask_df('/kaggle/input/duck-net-splits/Datasets/CVC-ClinicDB/validation/',"images/","masks/")
test_df = create_image_mask_df('/kaggle/input/duck-net-splits/Datasets/CVC-ClinicDB/test/',"images/","masks/")


print(len(train_df),len(valid_df),len(test_df))


In [None]:

x_train, y_train = load_data(img_size, img_size, train_df)
x_valid, y_valid = load_data(img_size, img_size, valid_df)
x_test, y_test = load_data(img_size, img_size, test_df)


print(x_train.shape,y_train.shape)
print(x_valid.shape,y_valid.shape)
print(x_test.shape,y_test.shape)

In [None]:
import albumentations as albu
from sklearn.metrics import jaccard_score, precision_score, recall_score, accuracy_score, f1_score

aug_train = albu.Compose([
    albu.HorizontalFlip(),
    albu.VerticalFlip(),
    albu.ColorJitter(brightness=(0.6,1.6), contrast=0.2, saturation=0.1, hue=0.01, always_apply=True),
    albu.Affine(scale=(0.5,1.5), translate_percent=(-0.125,0.125), rotate=(-180,180), shear=(-22.5,22), always_apply=True),
])


def augment_images():
    x_train_out = []
    y_train_out = []

    for i in range (len(x_train)):
        ug = aug_train(image=x_train[i], mask=y_train[i])
        x_train_out.append(ug['image'])  
        y_train_out.append(ug['mask'])

    return np.array(x_train_out), np.array(y_train_out)
    



In [None]:
#Train the model

import gc
import time
from tensorflow.keras.models import save_model
from sklearn.metrics import jaccard_score, precision_score, recall_score, accuracy_score, f1_score

EPOCHS = 5
min_loss_for_saving = 100000

for epoch in range(EPOCHS):

   
    print(f'Training, epoch {epoch}')

    image_augmented, mask_augmented = augment_images()    
    
    history = model.fit(x=image_augmented, y=mask_augmented, epochs=1, batch_size=b_size, verbose = 1)
    
    prediction_valid = model.predict(x_valid, verbose=1)
    loss_valid = dice_metric_loss(y_valid, prediction_valid).numpy()
    print("Loss Validation:", loss_valid)

    #loss_array[epoch+offset] = [history.history['loss'][0], loss_valid]
        
    dice_valid = f1_score(
        np.ndarray.flatten(np.array(y_valid, dtype=bool)),
        np.ndarray.flatten(prediction_valid > 0.5)
    )

    print("Dice Valid:", dice_valid)
    
    
    if min_loss_for_saving > loss_valid:
        min_loss_for_saving = loss_valid
        print("Saved model with val_loss:", loss_valid)
       
        np.savez("model_saved_on_best_val_loss.npz", *model.get_weights())
             


    
    del image_augmented, mask_augmented, prediction_valid
    gc.collect()
    tf.keras.backend.clear_session()


In [None]:

#Load the best model weight for inference on test set

weights = np.load("model_saved_on_best_val_loss.npz")

model = create_model(img_height=img_size, img_width=img_size, input_channels=3, out_classes=1, starting_filters=filters)  
model.compile(optimizer=tf.keras.optimizers.AdamW(learning_rate = 1e-4), loss=dice_metric_loss) 

model.set_weights([weights[f"arr_{i}"] for i in range(len(weights))])


In [None]:
from sklearn.metrics import jaccard_score, precision_score, recall_score, accuracy_score, f1_score

prediction_test = model.predict(x_test, batch_size=1)


loss_test = dice_metric_loss(y_test, prediction_test).numpy()
dice_test = f1_score(np.ndarray.flatten(np.array(y_test, dtype=bool)),np.ndarray.flatten(prediction_test > 0.5))
miou_test = jaccard_score(np.ndarray.flatten(np.array(y_test, dtype=bool)),np.ndarray.flatten(prediction_test > 0.5))


print(dice_test)
print(miou_test)
print(loss_test)

In [None]:
#To calculate the Hausdorff Distance
from scipy.spatial.distance import directed_hausdorff
from scipy.ndimage import binary_erosion
import numpy as np

def hausdorff_distance(y_true, y_pred):
    y_true = np.squeeze(y_true)
    y_pred = np.squeeze(y_pred) > 0.5

    def get_boundary(mask):
        return mask & ~binary_erosion(mask)

    distances = []
    for i in range(y_true.shape[0]):
        gt_coords = np.argwhere(get_boundary(y_true[i]))
        pred_coords = np.argwhere(get_boundary(y_pred[i]))
        if len(gt_coords) == 0 or len(pred_coords) == 0:
            distances.append(np.nan)
            continue
        hd1 = directed_hausdorff(gt_coords, pred_coords)[0]
        hd2 = directed_hausdorff(pred_coords, gt_coords)[0]
        distances.append(max(hd1, hd2))

    
    return np.nanmedian(distances)

print(hausdorff_distance(y_test, prediction_test))