In [None]:
# # build uNet model
# from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
# from tensorflow.keras.models import Model

# def conv_block(inputs, num_filters):
#     x = Conv2D(num_filters, 3, padding="same")(inputs)
#     x = BatchNormalization()(x)
#     x = Activation("relu")(x)

#     x = Conv2D(num_filters, 3, padding="same")(x)
#     x = BatchNormalization()(x)
#     x = Activation("relu")(x)

#     return x

# def encoder_block(inputs, num_filters):
#     x = conv_block(inputs, num_filters)
#     p = MaxPool2D((2, 2))(x)
#     return x, p

# def decoder_block(inputs, skip_features, num_filters):
#     x = Conv2DTranspose(num_filters, 2, strides=2, padding="same")(inputs)
#     x = Concatenate()([x, skip_features])
#     x = conv_block(x, num_filters)
#     return x

# def build_unet(input_shape):
#     inputs = Input(input_shape)

#     s1, p1 = encoder_block(inputs, 64)
#     s2, p2 = encoder_block(p1, 128)
#     s3, p3 = encoder_block(p2, 256)
#     # s4, p4 = encoder_block(p3, 512)

#     # print(s1.shape, s2.shape, s3.shape, s4.shape)
#     # print(p1.shape, p2.shape, p3.shape, p4.shape)
    
#     b1 = conv_block(p3, 512)

#     # d1 = decoder_block(b1, s4, 512)
#     d2 = decoder_block(b1, s3, 256)
#     d3 = decoder_block(d2, s2, 128)
#     d4 = decoder_block(d3, s1, 64)

#     outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(d4)

#     model = Model(inputs, outputs, name="UNET")
#     return model

# if __name__ == "__main__":
#     input_shape = (256, 256, 3)
#     model = build_unet(input_shape)
#     model.summary()

In [5]:
import tensorflow as tf
from tensorflow.keras.applications import MobileNet
from tensorflow.keras.layers import (
    Input, Conv2D, Conv2DTranspose, MaxPooling2D, concatenate, BatchNormalization, Activation
)
from tensorflow.keras.models import Model


def unet_mobilenet(input_shape=(512, 512, 3), num_classes=1):
    """
    UNet model with MobileNet as the encoder backbone.

    Args:
        input_shape (tuple): Shape of the input image (height, width, channels).
        num_classes (int): Number of output classes for segmentation.

    Returns:
        keras.Model: UNet model with MobileNet backbone.
    """
    inputs = Input(input_shape)

    mobilenet = MobileNet(input_tensor=inputs, include_top=False, weights="imagenet")

    skip1 = mobilenet.get_layer("conv_pw_1_relu").output  
    skip2 = mobilenet.get_layer("conv_pw_3_relu").output 
    skip3 = mobilenet.get_layer("conv_pw_5_relu").output 
    bottleneck = mobilenet.get_layer("conv_pw_11_relu").output  

    # Decoder: Up-sampling and concatenation
    up1 = Conv2DTranspose(256, (3, 3), strides=(2, 2), padding="same")(bottleneck)
    up1 = concatenate([up1, skip3])
    up1 = Conv2D(256, (3, 3), padding="same")(up1)
    up1 = BatchNormalization()(up1)
    up1 = Activation("relu")(up1)

    up2 = Conv2DTranspose(128, (3, 3), strides=(2, 2), padding="same")(up1)
    up2 = concatenate([up2, skip2])
    up2 = Conv2D(128, (3, 3), padding="same")(up2)
    up2 = BatchNormalization()(up2)
    up2 = Activation("relu")(up2)

    up3 = Conv2DTranspose(64, (3, 3), strides=(2, 2), padding="same")(up2)
    up3 = concatenate([up3, skip1])
    up3 = Conv2D(64, (3, 3), padding="same")(up3)
    up3 = BatchNormalization()(up3)
    up3 = Activation("relu")(up3)

    up4 = Conv2DTranspose(32, (3, 3), strides=(2, 2), padding="same")(up3)
    up4 = Conv2D(32, (3, 3), padding="same")(up4)
    up4 = BatchNormalization()(up4)
    up4 = Activation("relu")(up4)

    outputs = Conv2D(num_classes, (1, 1), activation="sigmoid")(up4)
    model = Model(inputs, outputs, name="UNet-MobileNet")
    return model


input_shape = (256, 256, 3)
num_classes = 1
model = unet_mobilenet(input_shape=input_shape, num_classes=num_classes)
model.summary()

Model: "UNet-MobileNet"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1 (Conv2D)                 (None, 128, 128, 32  864         ['input_2[0][0]']                
                                )                                                                 
                                                                                                  
 conv1_bn (BatchNormalization)  (None, 128, 128, 32  128         ['conv1[0][0]']                  
                                )                                                    

In [6]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K

# Smoothing factor to avoid division by zero
smooth = 1e-15

# Dice Coefficient
def dice_coef(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

# Dice Loss
def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

# Intersection over Union (IoU)
def iou(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - intersection
    return (intersection + smooth) / (union + smooth)

# Precision
def precision(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    true_positives = tf.reduce_sum(y_true * y_pred)
    predicted_positives = tf.reduce_sum(y_pred)
    return (true_positives + smooth) / (predicted_positives + smooth)

# Recall
def recall(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    true_positives = tf.reduce_sum(y_true * y_pred)
    possible_positives = tf.reduce_sum(y_true)
    return (true_positives + smooth) / (possible_positives + smooth)

# F1 Score
def f1_score(y_true, y_pred):
    prec = precision(y_true, y_pred)
    rec = recall(y_true, y_pred)
    return (2. * prec * rec + smooth) / (prec + rec + smooth)

In [None]:
import os
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

""" Global parameters """
H = 256
W = 256

def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

def load_dataset(path, split=0.2):
    images = sorted(glob(os.path.join(path, "images", "*.png")))
    masks = sorted(glob(os.path.join(path, "masks", "*.png")))
    split_size = int(len(images) * split)
    train_x, valid_x = train_test_split(images, test_size=split_size, random_state=42)
    train_y, valid_y = train_test_split(masks, test_size=split_size, random_state=42)

    train_x, test_x = train_test_split(train_x, test_size=split_size, random_state=42)
    train_y, test_y = train_test_split(train_y, test_size=split_size, random_state=42)
    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = cv2.resize(x, (W, H))
    x = x / 255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)  ## (h, w)
    x = cv2.resize(x, (W, H))   ## (h, w)
    x = x / 255.0              
    x = x.astype(np.float32)  
    x = np.expand_dims(x, axis=-1)
    return x

def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([H, W, 3])
    y.set_shape([H, W, 1])
    return x, y

def tf_dataset(X, Y, batch=2):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(10)
    return dataset

if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Directory for storing files """
    create_dir("files")

    """ Hyperparameters """
    batch_size = 4
    lr = 1e-4
    num_epochs = 200
    model_path = os.path.join("files", "model.h5")
    csv_path = os.path.join("files", "log.csv")

    """ Dataset """
    dataset_path = "./DATASET/"
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)

    print(f"Train: {len(train_x)} - {len(train_y)}")
    print(f"Valid: {len(valid_x)} - {len(valid_y)}")
    print(f"Test : {len(test_x)} - {len(test_y)}")

    train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

    """Model"""
    model = unet_mobilenet((H, W, 3))
    model.compile(loss=dice_loss, optimizer=Adam(lr), metrics=[dice_coef])

    callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        CSVLogger(csv_path),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False),
    ]

    model.fit(
        train_dataset,
        epochs=num_epochs,
        validation_data=valid_dataset,
        callbacks=callbacks,
        verbose=1
    )

Train: 1840 - 1840
Valid: 612 - 612
Test : 612 - 612
Epoch 1/200
Epoch 1: val_loss improved from inf to 0.67992, saving model to files\model.h5
Epoch 2/200
Epoch 2: val_loss improved from 0.67992 to 0.51271, saving model to files\model.h5
Epoch 3/200
Epoch 3: val_loss improved from 0.51271 to 0.36164, saving model to files\model.h5
Epoch 4/200
Epoch 4: val_loss improved from 0.36164 to 0.29763, saving model to files\model.h5
Epoch 5/200
Epoch 5: val_loss improved from 0.29763 to 0.24356, saving model to files\model.h5
Epoch 6/200
Epoch 6: val_loss improved from 0.24356 to 0.23177, saving model to files\model.h5
Epoch 7/200
Epoch 7: val_loss did not improve from 0.23177
Epoch 8/200
Epoch 8: val_loss improved from 0.23177 to 0.21109, saving model to files\model.h5
Epoch 9/200
Epoch 9: val_loss improved from 0.21109 to 0.20593, saving model to files\model.h5
Epoch 10/200
Epoch 10: val_loss improved from 0.20593 to 0.20142, saving model to files\model.h5
Epoch 11/200
Epoch 11: val_loss imp

In [None]:

import os

import numpy as np
import cv2
import pandas as pd
from glob import glob
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import f1_score, jaccard_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

""" Global parameters """
H = 256
W = 256

""" Creating a directory """
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

def save_results(image, mask, y_pred, save_image_path):
    mask = np.expand_dims(mask, axis=-1)
    mask = np.concatenate([mask, mask, mask], axis=-1)

    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)
    y_pred = y_pred * 255

    line = np.ones((H, 10, 3)) * 255

    cat_images = np.concatenate([image, line, mask, line, y_pred], axis=1)
    cv2.imwrite(save_image_path, cat_images)


if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Directory for storing files """
    create_dir("results")

    """ Load the model """
    with CustomObjectScope({"dice_coef": dice_coef, "dice_loss": dice_loss}):
        model = tf.keras.models.load_model(os.path.join("files", "model.h5"))

    """ Dataset """
    dataset_path = "./DATASET/"
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)

    """ Prediction and Evaluation """
    SCORE = []
    for x, y in tqdm(zip(test_x, test_y), total=len(test_y)):
        """ Extracting the name """
        name = x.split("/")[-1]

        """ Reading the image """
        image = cv2.imread(x, cv2.IMREAD_COLOR) ## [H, w, 3]
        image = cv2.resize(image, (W, H))       ## [H, w, 3]
        x = image/255.0                         ## [H, w, 3]
        x = np.expand_dims(x, axis=0)           ## [1, H, w, 3]

        """ Reading the mask """
        mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (W, H))

        """ Prediction """
        y_pred = model.predict(x, verbose=0)[0]
        y_pred = np.squeeze(y_pred, axis=-1)
        y_pred = y_pred >= 0.5
        y_pred = y_pred.astype(np.int32)

        """ Saving the prediction """
        save_image_path = os.path.join("results", name)
        save_results(image, mask, y_pred, save_image_path)

        """ Flatten the array """
        mask = mask/255.0
        mask = (mask > 0.5).astype(np.int32).flatten()
        y_pred = y_pred.flatten()

        """ Calculating the metrics values """
        f1_value = f1_score(mask, y_pred, labels=[0, 1], average="binary")
        jac_value = jaccard_score(mask, y_pred, labels=[0, 1], average="binary")
        recall_value = recall_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
        precision_value = precision_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
        SCORE.append([name, f1_value, jac_value, recall_value, precision_value])

    """ Metrics values """
    score = [s[1:]for s in SCORE]
    score = np.mean(score, axis=0)
    print(f"F1: {score[0]:0.5f}")
    print(f"Jaccard: {score[1]:0.5f}")
    print(f"Recall: {score[2]:0.5f}")
    print(f"Precision: {score[3]:0.5f}")

    df = pd.DataFrame(SCORE, columns=["Image", "F1", "Jaccard", "Recall", "Precision"])
    df.to_csv("files/score.csv")

ValueError: Unknown metric function: iou. Please ensure this object is passed to the `custom_objects` argument. See https://www.tensorflow.org/guide/keras/save_and_serialize#registering_the_custom_object for details.