# **# Import packages and libraries**

In [None]:
# Import packages and libraries
import os
import numpy as np
import cv2
from glob import glob
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Activation, BatchNormalization, LeakyReLU
from tensorflow.keras.layers import UpSampling2D, Input, Concatenate
from tensorflow.keras.models import Model
from tensorflow.keras.applications import MobileNetV2,ResNet50

from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras import backend as K

# **# Import datasets**

# **# Pre-processing**

In [None]:
# Fix the NumPy and TensorFlow global random seeds (both 42) for reproducibility.
np.random.seed(42)
tf.random.set_seed(42)

In [None]:
# Side length (pixels) that images and masks are resized to before batching.
IMAGE_SIZE = 256 
# Number of training epochs (NOTE(review): the fit call below hard-codes 30).
EPOCHS = 30       
# Batch size passed to tf_dataset.
BATCH = 8         
# Learning rate passed to the Nadam optimizer.
LR = 1e-4         

# Placeholder dataset location; it is passed to load_data, which ignores it.
PATH = "Dataset_Path"

In [None]:
# Build the image / annotation path lists for items numbered 1 through 1469.
path = "Path to folder contain dataset"
images_arr = [f"{path}/images/{idx}.JPG" for idx in range(1, 1470)]
annotations_arr = [f"{path}/annotations/{idx}.PNG" for idx in range(1, 1470)]

# **# Data-loading functions**

In [None]:
def load_data(path, split=0.1):
    """Split the module-level image/mask path lists into train, validation and test sets.

    Args:
        path: Unused. The paths come from the module-level ``images_arr`` and
            ``annotations_arr`` lists; the parameter is kept so existing
            callers keep working.
        split: Fraction of the whole dataset reserved for validation, and the
            same number of samples again for testing.

    Returns:
        ``(train_x, train_y), (valid_x, valid_y), (test_x, test_y)`` where each
        ``*_x`` is a list of image paths and each ``*_y`` the matching mask paths.
    """
    images = sorted(images_arr)
    masks = sorted(annotations_arr)

    # Validation and test share one absolute size computed from the full
    # dataset, so the second split removes the same number of samples again.
    holdout_size = int(split * len(images))

    # Splitting images and masks in a single call applies the same shuffle to
    # both, which keeps every image paired with its own mask.
    train_x, valid_x, train_y, valid_y = train_test_split(
        images, masks, test_size=holdout_size, random_state=42
    )
    train_x, test_x, train_y, test_y = train_test_split(
        train_x, train_y, test_size=holdout_size, random_state=42
    )

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

In [None]:
def read_image(path):
    """Load an image file as a normalised colour array.

    Args:
        path: File path as ``bytes`` (it is decoded to ``str`` before use).

    Returns:
        Float array of shape ``(IMAGE_SIZE, IMAGE_SIZE, 3)`` with values scaled
        to ``[0, 1]``, in OpenCV's default channel order.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    path = path.decode()
    image = cv2.imread(path, cv2.IMREAD_COLOR)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cv2.resize.
        raise FileNotFoundError(f"Could not read image: {path}")
    image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
    return image / 255.0

def read_mask(path):
    """Load a segmentation mask file as a binary single-channel array.

    Args:
        path: File path as ``bytes`` (it is decoded to ``str`` before use).

    Returns:
        Float array of shape ``(IMAGE_SIZE, IMAGE_SIZE, 1)`` whose values are
        0 or 1; every pixel that is non-zero after resizing becomes 1.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    path = path.decode()
    mask = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if mask is None:
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cv2.resize.
        raise FileNotFoundError(f"Could not read mask: {path}")
    mask = cv2.resize(mask, (IMAGE_SIZE, IMAGE_SIZE))
    mask = mask / 255.0
    # Threshold at 0: any strictly positive value is mapped to 1.
    _, mask = cv2.threshold(mask, 0, 1, cv2.THRESH_BINARY)
    return np.expand_dims(mask, axis=-1)

def tf_parse(x, y):
    """Turn an (image path, mask path) tensor pair into decoded tensors.

    The Python loaders ``read_image`` / ``read_mask`` are run through
    ``tf.numpy_function``; both outputs are declared as ``tf.float64``.

    Returns:
        ``(image, mask)`` tensors with static shapes
        ``[IMAGE_SIZE, IMAGE_SIZE, 3]`` and ``[IMAGE_SIZE, IMAGE_SIZE, 1]``.
    """
    def _load_pair(image_path, mask_path):
        return read_image(image_path), read_mask(mask_path)

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float64, tf.float64])
    # Declare the static shapes explicitly on the returned tensors.
    image.set_shape([IMAGE_SIZE, IMAGE_SIZE, 3])
    mask.set_shape([IMAGE_SIZE, IMAGE_SIZE, 1])
    return image, mask

def tf_dataset(x, y, batch=8):
    """Build an endlessly repeating, batched ``tf.data`` pipeline.

    Args:
        x: Sequence of image paths.
        y: Sequence of mask paths, aligned with ``x``.
        batch: Number of samples per batch.

    Returns:
        A dataset of ``(images, masks)`` batches that repeats indefinitely, so
        callers must bound it with an explicit number of steps.
    """
    return (
        tf.data.Dataset.from_tensor_slices((x, y))
        .map(tf_parse)  # preprocessing: decode each (image, mask) pair
        .batch(batch)
        .repeat()
    )

In [None]:
# Split the path lists into train / validation / test sets and report their sizes.
(train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_data(PATH)

print("Training data: ", len(train_y))
print("Validation data: ", len(valid_x))
print("Testing data: ", len(test_x))

Training data:  1177
Validation data:  146
Testing data:  146


In [None]:
# One repeating, batched tf.data pipeline per split.
train_dataset = tf_dataset(train_x, train_y, batch=BATCH)
valid_dataset = tf_dataset(valid_x, valid_y, batch=BATCH)
test_dataset = tf_dataset(test_x, test_y, batch=BATCH)

# **# Model**

In [None]:
import tensorflow as tf
import tensorflow.keras.layers as L
from tensorflow.keras.models import Model
from tensorflow.keras.applications import VGG16
from keras.models import Model
from keras.layers import Input, Concatenate, Conv2D, MaxPooling2D

In [None]:
def conv_block(x, num_filters):
    """Apply two successive 3x3 Conv -> BatchNorm -> ReLU stages.

    Args:
        x: Input feature tensor.
        num_filters: Number of filters used by both convolutions.

    Returns:
        The transformed feature tensor ("same" padding is used throughout).
    """
    for _ in range(2):
        x = L.Conv2D(num_filters, 3, padding="same")(x)
        x = L.BatchNormalization()(x)
        x = L.Activation("relu")(x)
    return x

def encoder_block(x, num_filters):
    """Run a conv block and 2x2 max-pool its output.

    Returns:
        ``(features, pooled)``: the conv-block output and its 2x2 max-pooled
        version.
    """
    features = conv_block(x, num_filters)
    pooled = L.MaxPool2D((2, 2))(features)
    return features, pooled

def attention_gate(g, s, num_filters):
    """Re-weight the skip features ``s`` using the gating signal ``g``.

    Both inputs are projected with a 1x1 convolution plus batch norm, summed,
    passed through ReLU, a further 1x1 convolution and a sigmoid; the result
    multiplies ``s`` element-wise.

    Args:
        g: Gating tensor (added to the projection of ``s``, so the two
            projections must have compatible shapes).
        s: Skip-connection tensor to be gated.
        num_filters: Number of filters for all three 1x1 convolutions.

    Returns:
        ``s`` scaled by the sigmoid attention coefficients.
    """
    def _project(tensor):
        projected = L.Conv2D(num_filters, 1, padding="same")(tensor)
        return L.BatchNormalization()(projected)

    gate_proj = _project(g)
    skip_proj = _project(s)

    coeffs = L.Activation("relu")(gate_proj + skip_proj)
    coeffs = L.Conv2D(num_filters, 1, padding="same")(coeffs)
    coeffs = L.Activation("sigmoid")(coeffs)
    return coeffs * s

def decoder_block(x, s, num_filters):
    """Upsample ``x``, fuse it with the attention-gated skip ``s`` and convolve.

    Args:
        x: Decoder input tensor; it is upsampled bilinearly with the layer's
            default scale factor.
        s: Skip-connection tensor from the encoder.
        num_filters: Filter count for the attention gate and the conv block.

    Returns:
        The decoded feature tensor.
    """
    upsampled = L.UpSampling2D(interpolation="bilinear")(x)
    gated_skip = attention_gate(upsampled, s, num_filters)
    merged = L.Concatenate()([upsampled, gated_skip])
    return conv_block(merged, num_filters)

def SPPF_module(x, k = 5):
    """Pooling-pyramid block: three chained stride-1 max-pools, concatenated.

    A 1x1 convolution halves the channel count, three ``k`` x ``k`` max-pools
    (stride 1, "same" padding) are applied one after another, and the reduced
    tensor plus all three pooled tensors are concatenated on the channel axis
    and projected back to the input channel count by a second 1x1 convolution.
    Each 1x1 convolution is followed by batch norm and SiLU.

    Args:
        x: Input feature tensor; its last dimension must be statically known.
        k: Side length of the max-pooling window.

    Returns:
        A tensor with the same number of channels as ``x``.
    """
    # Only the channel count is needed; layers come from the same ``L``
    # namespace used by the other building blocks in this file.
    channels = x.shape[-1]

    reduced = L.Conv2D(channels // 2, (1, 1), strides=(1, 1), padding="valid")(x)
    reduced = L.BatchNormalization()(reduced)
    reduced = tf.nn.silu(reduced)

    # Each pool is applied to the previous pool's output.
    pyramid = [reduced]
    for _ in range(3):
        pooled = L.MaxPooling2D(pool_size=(k, k), strides=(1, 1), padding="same")(pyramid[-1])
        pyramid.append(pooled)

    out = L.Concatenate(axis=-1)(pyramid)
    out = L.Conv2D(channels, (1, 1), strides=(1, 1), padding="valid")(out)
    out = L.BatchNormalization()(out)
    out = tf.nn.silu(out)
    return out

def build_vgg16_attention_unet(input_shape):
    """Build an attention U-Net on top of a VGG16 encoder with ImageNet weights.

    Skip connections are taken from four VGG16 layers, the ``block5_conv3``
    output is passed through ``SPPF_module`` to form the bridge, and four
    attention-gated decoder blocks (512, 256, 128, 64 filters) lead to a
    single-channel sigmoid output.

    Args:
        input_shape: Shape of one input image, e.g. ``(256, 256, 3)``.

    Returns:
        The assembled (uncompiled) Keras model.
    """
    inputs = Input(input_shape)

    # Pre-trained VGG16 backbone, without its classification head.
    backbone = VGG16(include_top=False, weights="imagenet", input_tensor=inputs)

    # Encoder: skip connections, listed from the first block to the fourth.
    skip_names = ("block1_conv2", "block2_conv2", "block3_conv3", "block4_conv3")
    s1, s2, s3, s4 = [backbone.get_layer(name).output for name in skip_names]

    # Bridge: last VGG16 conv output passed through the pooling pyramid.
    bridge = SPPF_module(backbone.get_layer("block5_conv3").output, k=5)

    # Decoder: consume the skips from the fourth block back to the first.
    decoded = bridge
    for skip, filters in ((s4, 512), (s3, 256), (s2, 128), (s1, 64)):
        decoded = decoder_block(decoded, skip, filters)

    # Output: one sigmoid value per pixel.
    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(decoded)
    return Model(inputs, outputs)

In [None]:
# Named ``input_shape`` rather than ``input`` so the builtin ``input()`` is not
# shadowed; the size is derived from IMAGE_SIZE so the model always matches the
# shapes produced by the data pipeline.
input_shape = (IMAGE_SIZE, IMAGE_SIZE, 3)
model = build_vgg16_attention_unet(input_shape)  # OUTPUT (IMAGE_SIZE, IMAGE_SIZE, 1)
model.summary()

In [None]:
import tensorflow as tf
import keras.backend as K
from keras.losses import binary_crossentropy, BinaryCrossentropy

# Module-level constants for the loss/metric helpers below.
# beta sets the positive-class weight in weighted_cross_entropyloss.
beta = 0.25
# NOTE(review): alpha, gamma and epsilon are not referenced by the visible
# code (FocalLoss defines its own local alpha/gamma) -- confirm before removing.
alpha = 0.25
gamma = 2
epsilon = 1e-5
# Smoothing term added to numerators and denominators to avoid 0/0.
smooth = 1e-15

def dice_coef(y_true, y_pred):
    """Return the Dice coefficient of the flattened tensors.

    Computed as ``(2 * sum(t * p) + eps) / (sum(t) + sum(p) + eps)`` with
    ``eps = K.epsilon()``.
    """
    truth = K.flatten(y_true)
    pred = K.flatten(y_pred)
    overlap = K.sum(truth * pred)
    numerator = 2. * overlap + K.epsilon()
    denominator = K.sum(truth) + K.sum(pred) + K.epsilon()
    return numerator / denominator

def sensitivity(y_true, y_pred):
    """Return true positives divided by all positives in ``y_true``.

    The product ``y_true * y_pred`` and ``y_true`` itself are clipped to
    ``[0, 1]`` and rounded before summing.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1))
    positives = K.round(K.clip(y_true, 0, 1))
    return K.sum(hits) / (K.sum(positives) + K.epsilon())

def specificity(y_true, y_pred):
    """Return true negatives divided by all negatives in ``y_true``.

    The product ``(1 - y_true) * (1 - y_pred)`` and ``1 - y_true`` are clipped
    to ``[0, 1]`` and rounded before summing.
    """
    correct_rejections = K.round(K.clip((1 - y_true) * (1 - y_pred), 0, 1))
    negatives = K.round(K.clip(1 - y_true, 0, 1))
    return K.sum(correct_rejections) / (K.sum(negatives) + K.epsilon())

def convert_to_logits(y_pred):
    """Map probabilities to logits, ``log(p / (1 - p))``.

    Probabilities are first clipped to ``[eps, 1 - eps]`` (Keras backend
    epsilon) so the logarithm stays finite.
    """
    eps = tf.keras.backend.epsilon()
    clipped = tf.clip_by_value(y_pred, eps, 1 - eps)
    return tf.math.log(clipped / (1 - clipped))

def weighted_cross_entropyloss(y_true, y_pred):
    """Return the mean weighted cross-entropy between labels and probabilities.

    ``y_pred`` is converted to logits first; the positive-class weight is
    ``beta / (1 - beta)`` using the module-level ``beta``.
    """
    logits = convert_to_logits(y_pred)
    positive_weight = beta / (1 - beta)
    per_element = tf.nn.weighted_cross_entropy_with_logits(
        logits=logits, labels=y_true, pos_weight=positive_weight
    )
    return tf.reduce_mean(per_element)

def generalized_dice_coefficient(y_true, y_pred):
    """Return the Dice score of the flattened tensors.

    Same formula as ``dice_coef`` but smoothed with the module-level
    ``smooth`` constant.
    """
    truth = K.flatten(y_true)
    pred = K.flatten(y_pred)
    overlap = K.sum(truth * pred)
    return (2. * overlap + smooth) / (K.sum(truth) + K.sum(pred) + smooth)

def dice_loss(y_true, y_pred):
    """Return ``1 - generalized_dice_coefficient(y_true, y_pred)``."""
    return 1 - generalized_dice_coefficient(y_true, y_pred)

def confusion(y_true, y_pred):
    """Return soft ``(precision, recall)`` computed without rounding.

    Both tensors are clipped to ``[0, 1]``; true/false positives and false
    negatives are sums of products of the clipped values, and both ratios are
    smoothed with the module-level ``smooth``.
    """
    pred_pos = K.clip(y_pred, 0, 1)
    true_pos = K.clip(y_true, 0, 1)

    tp = K.sum(true_pos * pred_pos)
    fp = K.sum((1 - true_pos) * pred_pos)
    fn = K.sum(true_pos * (1 - pred_pos))

    precision = (tp + smooth) / (tp + fp + smooth)
    recall_value = (tp + smooth) / (tp + fn + smooth)
    return precision, recall_value

def true_positive(y_true, y_pred):
    """Return the true-positive rate after rounding both tensors.

    Both inputs are clipped to ``[0, 1]`` and rounded; the ratio is smoothed
    with the module-level ``smooth``.
    """
    predicted = K.round(K.clip(y_pred, 0, 1))
    actual = K.round(K.clip(y_true, 0, 1))
    return (K.sum(actual * predicted) + smooth) / (K.sum(actual) + smooth)

def true_negative(y_true, y_pred):
    """Return the true-negative rate after rounding both tensors.

    Unlike ``true_positive``, this function smooths the ratio with a local
    constant of 1 rather than the module-level ``smooth``.
    """
    laplace = 1
    predicted_neg = 1 - K.round(K.clip(y_pred, 0, 1))
    actual_neg = 1 - K.round(K.clip(y_true, 0, 1))
    return (K.sum(actual_neg * predicted_neg) + laplace) / (K.sum(actual_neg) + laplace)

def jacard_similarity(y_true, y_pred):
    """Return the Jaccard index (intersection over union) of the flattened tensors.

    The ratio is smoothed with ``K.epsilon()``, as ``dice_coef`` does, so an
    empty union yields 1 instead of a 0/0 NaN that would poison the loss.
    """
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    union = K.sum((y_true_f + y_pred_f) - (y_true_f * y_pred_f))
    return (intersection + K.epsilon()) / (union + K.epsilon())

def jacard_loss(y_true, y_pred):
    """Return ``1 - jacard_similarity(y_true, y_pred)``."""
    similarity = jacard_similarity(y_true, y_pred)
    return 1 - similarity

def ssim_loss(y_true, y_pred):
    """Return ``1 - SSIM`` between the two inputs, with ``max_val=1``."""
    structural_similarity = tf.image.ssim(y_true, y_pred, max_val=1)
    return 1 - structural_similarity

def binary_dice(y_true, y_pred):
    """Return ``0.5 * binary cross-entropy + dice_loss``."""
    bce_term = 0.5 * binary_crossentropy(y_true, y_pred)
    dice_term = dice_loss(y_true, y_pred)
    return bce_term + dice_term

def FocalLoss(y_true, y_pred):
    """Return the mean focal loss over all elements.

    Per element: ``alpha * (1 - exp(-BCE)) ** gamma * BCE`` with the local
    constants ``alpha = 0.26`` and ``gamma = 2.3`` (these shadow the
    module-level values of the same names).
    """
    alpha = 0.26
    gamma = 2.3

    targets = K.flatten(K.cast(y_true, 'float32'))
    inputs = K.flatten(K.cast(y_pred, 'float32'))

    bce = K.binary_crossentropy(targets, inputs)
    modulating_factor = K.pow(1 - K.exp(-bce), gamma)
    return K.mean(alpha * modulating_factor * bce)
def joint_loss1(y_true, y_pred):
    """Return the unweighted average of the focal, SSIM and Jaccard losses."""
    focal_term = FocalLoss(y_true, y_pred)
    ssim_term = ssim_loss(y_true, y_pred)
    jaccard_term = jacard_loss(y_true, y_pred)
    return (focal_term + ssim_term + jaccard_term) / 3

# **# Compile Model**

In [None]:
from tensorflow.keras.callbacks import ModelCheckpoint

# Weights-only checkpoint written at the end of every epoch
# (save_best_only=False keeps one file per epoch).
checkpoint_path = "Save-epoch-path.{epoch:02d}.h5"
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path, monitor='val_loss', verbose=1,
    save_best_only=False, save_weights_only=True,
    mode='auto', save_freq='epoch',
)

# Multiply the learning rate by 0.1 after 4 epochs without val_loss improvement.
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=4)
# Stop after 10 epochs without val_loss improvement; the best weights are NOT restored.
early_stopper = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=False)

callbacks = [lr_scheduler, early_stopper, checkpoint_callback]

In [None]:
# Steps per epoch = number of batches needed to see every sample once
# (ceiling division, so a final partial batch still counts as a step).
train_steps = (len(train_x) + BATCH - 1) // BATCH
valid_steps = (len(valid_x) + BATCH - 1) // BATCH

print(train_steps)
print(valid_steps)

148
19


In [None]:
# Compile with the Nadam optimizer, the combined focal/SSIM/Jaccard loss and
# overlap + recall/precision metrics.
opt = tf.keras.optimizers.Nadam(LR)
metrics = [dice_coef, jacard_similarity, tf.keras.metrics.Recall(), tf.keras.metrics.Precision()]

model.compile(loss=joint_loss1, optimizer=opt, metrics=metrics)

In [None]:
# Compile the model (fresh optimizer and metric objects) and train it.
opt = tf.keras.optimizers.Nadam(LR)
metrics = [dice_coef, jacard_similarity, tf.keras.metrics.Recall(), tf.keras.metrics.Precision()]

model.compile(loss=joint_loss1, optimizer=opt, metrics=metrics)

# The datasets repeat forever, so the step counts bound each epoch.
# EPOCHS replaces the previously hard-coded 30 so the notebook-level
# setting is actually honoured.
history = model.fit(
    train_dataset,
    validation_data=valid_dataset,
    epochs=EPOCHS,
    steps_per_epoch=train_steps,
    validation_steps=valid_steps,
    callbacks=callbacks
)

In [None]:
# Load previously saved weights into the model.
# NOTE(review): 'Best-weight' looks like a placeholder path -- point it at a
# real checkpoint file before running.
model_path = 'Best-weight'
model.load_weights(model_path)

In [None]:
# Evaluate on the held-out test split.
test_dataset = tf_dataset(test_x, test_y, batch=BATCH)

# Ceiling division: a final partial batch still needs its own step.
test_steps = (len(test_x) + BATCH - 1) // BATCH
model.evaluate(test_dataset, steps=test_steps)

In [None]:
# NOTE: despite its name, ``img_path`` holds the decoded image array returned
# by cv2.imread; the name is kept so any later cell that uses it still works.
img_path = cv2.imread('ultrasound-image-path')
if img_path is None:
    # cv2.imread returns None instead of raising when the file is unreadable.
    raise FileNotFoundError("Could not read image: 'ultrasound-image-path'")

# Apply the same preprocessing the training pipeline uses (resize to
# IMAGE_SIZE and scale to [0, 1]); the network was never shown raw
# 0-255 pixels or other resolutions.
img_input = cv2.resize(img_path, (IMAGE_SIZE, IMAGE_SIZE)) / 255.0

# Boolean mask: True where the predicted probability exceeds 0.5.
img_pred = model.predict(np.expand_dims(img_input, axis = 0))[0] >0.5