In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import cv2
from glob import glob
import tensorflow as tf
import tf_keras as keras
import keras.backend as K
import tf_keras.layers as L
from tf_keras import backend as K
from tf_keras.models import Model
from tf_keras.layers import Input, Conv2D, MaxPooling2D, MaxPool2D, Add, Dropout, Concatenate,concatenate, Conv2DTranspose, Dense, Reshape, Flatten, Softmax, Lambda, UpSampling2D, AveragePooling2D, Activation, BatchNormalization, GlobalAveragePooling2D, SeparableConv2D,Multiply
from tf_keras.optimizers import Adam
from tf_keras.metrics import MeanIoU
from tf_keras.utils import to_categorical
from tf_keras.metrics import BinaryAccuracy, Precision, Recall
from tf_keras.applications import ResNet50
from sklearn.model_selection import train_test_split
from tf_keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split

Building the proposed model: a U-Net with a MobileNetV2 encoder, the Effective Edge Detection (EED) technique, the Bit-Plane Attention (BPA) technique, and a combined loss (Dice loss + BCE loss)

In [None]:
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Conv2D, Input, UpSampling2D, concatenate, GlobalAveragePooling2D, Reshape, Dense, Multiply, Activation, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.layers import ReLU, Add
import tensorflow as tf
from tensorflow.keras import layers

# BitPlaneAttention Block
def BitPlaneAttention(input_tensor, reduction=4):
    """Re-weight the channels of a feature map with a learned per-channel gate.

    The input is globally average-pooled, passed through a two-layer dense
    bottleneck (ReLU, then sigmoid), reshaped to ``(1, 1, channels)`` and
    multiplied back onto the input, so every channel is scaled by one value
    produced by the sigmoid layer.

    Args:
        input_tensor: 4-D Keras tensor whose last axis is the channel axis.
            The channel count must be statically known.
        reduction: Divisor applied to the channel count to size the hidden
            dense layer. Defaults to 4, which matches the previous behaviour.

    Returns:
        A tensor produced by ``Multiply`` of the input and the channel gate.

    Raises:
        ValueError: If ``reduction`` is not a positive integer or the channel
            dimension of ``input_tensor`` is unknown.
    """
    if not isinstance(reduction, int) or reduction <= 0:
        raise ValueError(f"reduction must be a positive integer, got {reduction!r}")

    channels = input_tensor.shape[-1]
    if channels is None:
        raise ValueError("BitPlaneAttention requires a statically known channel dimension")

    # Never let the bottleneck collapse to zero units for narrow inputs
    # (channels < reduction); a Dense layer needs at least one unit.
    hidden_units = max(channels // reduction, 1)

    x = GlobalAveragePooling2D()(input_tensor)
    x = Dense(hidden_units, activation='relu')(x)
    x = Dense(channels, activation='sigmoid')(x)
    x = Reshape((1, 1, channels))(x)
    return Multiply()([input_tensor, x])

# Edge Prediction Network
class EdgePredictionNet(tf.keras.Model):
    """Three-convolution head that maps a feature map to a one-channel edge map.

    Two 3x3 convolutions (64 and 32 filters), each followed by ReLU, feed a
    1x1 convolution with a single filter whose output is squashed by a sigmoid.

    Args:
        in_channels: Channel count of the expected input; it is only used to
            declare ``input_shape`` on the first convolution.
    """

    def __init__(self, in_channels):
        super().__init__()
        # Attribute names and creation order are kept stable on purpose:
        # saved weights may be keyed by them.
        self.conv1 = layers.Conv2D(64, kernel_size=3, padding='same', input_shape=(None, None, in_channels))
        self.conv2 = layers.Conv2D(32, kernel_size=3, padding='same')
        self.conv3 = layers.Conv2D(1, kernel_size=1, padding='same')  # one output channel: the edge map
        self.relu = layers.ReLU()

    def call(self, x):
        """Return the sigmoid-activated edge map for feature map ``x``."""
        hidden = self.relu(self.conv1(x))
        hidden = self.relu(self.conv2(hidden))
        edge_logits = self.conv3(hidden)
        # Sigmoid keeps every edge response inside [0, 1].
        return tf.nn.sigmoid(edge_logits)

class EdgeRefinementModule(tf.keras.Model):
    """Refines a coarse edge map using the feature map it was predicted from.

    The feature map and the edge map are concatenated along the channel axis
    and passed through two 3x3 convolutions with ReLU (64 and 32 filters) and
    a final single-filter 1x1 convolution followed by a sigmoid.

    Args:
        in_channels: Accepted for a uniform constructor signature; the layers
            created here do not use it.
    """

    def __init__(self, in_channels):
        super().__init__()
        self.conv1 = layers.Conv2D(64, kernel_size=3, padding='same')
        self.conv2 = layers.Conv2D(32, kernel_size=3, padding='same')
        self.conv3 = layers.Conv2D(1, kernel_size=1, padding='same')  # one output channel: the refined edge map
        self.relu = layers.ReLU()

    def call(self, x, edge_map):
        """Return the refined, sigmoid-activated edge map.

        Args:
            x: Feature map.
            edge_map: Coarse edge map to refine; it is concatenated with ``x``
                on the last axis, so the leading dimensions must match.
        """
        stacked = tf.concat([x, edge_map], axis=-1)
        hidden = self.relu(self.conv1(stacked))
        hidden = self.relu(self.conv2(hidden))
        return tf.nn.sigmoid(self.conv3(hidden))

class NeuralEdgeRefinement(tf.keras.Model):
    """Two-step edge estimator: coarse prediction followed by refinement.

    Args:
        in_channels: Channel count forwarded to both sub-networks.
    """

    def __init__(self, in_channels):
        super().__init__()
        self.epn = EdgePredictionNet(in_channels)
        self.refinement = EdgeRefinementModule(in_channels)

    def call(self, x):
        """Return the refined edge map computed from feature map ``x``."""
        # The coarse map from the prediction net is refined together with
        # the same features it was derived from.
        coarse = self.epn(x)
        return self.refinement(x, coarse)

class EdgeGuidedAttention(tf.keras.Model):
    """Spatial attention whose weights are derived from a refined edge map.

    Args:
        in_channels: Channel count forwarded to the edge-refinement network.
        out_channels: Accepted by the constructor but not used by any layer
            created here.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.refinement = NeuralEdgeRefinement(in_channels)
        self.spatial_attention = layers.Conv2D(1, kernel_size=3, padding='same')

    def call(self, x):
        """Scale ``x`` by edge-derived attention weights.

        Returns:
            A ``(output, refined_edge_map)`` pair, where ``output`` is ``x``
            multiplied by the sigmoid of a 3x3 convolution over the edge map.
        """
        edges = self.refinement(x)
        # One weight per spatial position, broadcast across all channels of x.
        gate = tf.nn.sigmoid(self.spatial_attention(edges))
        return x * gate, edges


# U-Net with MobileNetV2 and BitPlaneAttention
def _decoder_stage(x, skip, filters, activation, batch_norm):
    """Run one decoder stage: upsample x2, merge the skip, attend, then convolve."""
    x = UpSampling2D(size=(2, 2))(x)
    x = concatenate([x, skip], axis=-1)
    x = BitPlaneAttention(x)
    # EdgeGuidedAttention also returns the refined edge map; only the
    # attended features are used by the decoder.
    x, _ = EdgeGuidedAttention(in_channels=x.shape[-1], out_channels=x.shape[-1])(x)
    x = Conv2D(filters, (3, 3), padding='same', activation=activation)(x)
    if batch_norm:
        x = BatchNormalization()(x)
    return x


def unet_with_mobilenet_and_bitplane(input_shape=(512, 512, 3), num_classes=1, activation='relu',
                                     output_activation='sigmoid', batch_norm=True,
                                     encoder_weights='imagenet'):
    """Build a U-Net style segmentation model on a MobileNetV2 encoder.

    The encoder's intermediate activations are used as skip connections. The
    bottleneck and every decoder stage pass through ``BitPlaneAttention`` and
    ``EdgeGuidedAttention``. A final x2 upsampling and a 1x1 convolution
    produce the output map.

    Args:
        input_shape: ``(height, width, channels)`` of the input images.
            Presumably height and width must be multiples of 32 so that the
            upsampled maps line up with the skip connections - TODO confirm.
        num_classes: Number of output channels.
        activation: Activation of the 3x3 decoder convolutions.
        output_activation: Activation of the final 1x1 convolution.
        batch_norm: Whether each decoder convolution is followed by
            ``BatchNormalization``.
        encoder_weights: Passed to ``MobileNetV2(weights=...)``. The default
            ``'imagenet'`` keeps the previous behaviour; ``None`` builds a
            randomly initialised encoder.

    Returns:
        An uncompiled Keras ``Model`` mapping ``inputs`` to the segmentation map.

    NOTE(review): the data loaders in this notebook scale images to [0, 1];
    ImageNet MobileNetV2 weights are normally paired with
    ``mobilenet_v2.preprocess_input`` - confirm the intended input range.
    """
    inputs = Input(shape=input_shape)

    # Use MobileNetV2 as the encoder (backbone)
    mobilenet = MobileNetV2(input_tensor=inputs, include_top=False, weights=encoder_weights)

    # Skip connections, shallowest first (sizes are for a 512x512 input).
    skip_layer_names = (
        'block_1_expand_relu',   # 256x256
        'block_3_expand_relu',   # 128x128
        'block_6_expand_relu',   # 64x64
        'block_13_expand_relu',  # 32x32
    )
    conv1, conv2, conv3, conv4 = (mobilenet.get_layer(name).output for name in skip_layer_names)
    center = mobilenet.get_layer('block_16_project').output  # 16x16

    # Attention on the bottleneck
    center = BitPlaneAttention(center)
    center, _ = EdgeGuidedAttention(in_channels=center.shape[-1], out_channels=center.shape[-1])(center)

    # Decoder: deepest skip first, halving the filter count at each stage.
    x = center
    for skip, filters in ((conv4, 512), (conv3, 256), (conv2, 128), (conv1, 64)):
        x = _decoder_stage(x, skip, filters, activation, batch_norm)

    # Final upsampling to match the input size
    final_upsample = UpSampling2D(size=(2, 2))(x)
    outputs = Conv2D(num_classes, (1, 1), activation=output_activation)(final_upsample)

    return Model(inputs=inputs, outputs=outputs)

# Example usage
#model = unet_with_mobilenet_and_bitplane()


Definition of Evaluation metrics

In [None]:
def dice_score(y_true, y_pred):
    """Soft Dice coefficient computed over all elements of the batch at once.

    ``y_true`` is cast to float32; both tensors are flattened, so the score is
    a single scalar. ``K.epsilon()`` is added to numerator and denominator so
    that two empty masks score 1 instead of dividing by zero.
    """
    eps = K.epsilon()
    truth = K.flatten(K.cast(y_true, 'float32'))
    pred = K.flatten(y_pred)
    overlap = K.sum(truth * pred)
    total = K.sum(truth) + K.sum(pred)
    return (2. * overlap + eps) / (total + eps)

def iou(y_true, y_pred):
    """Soft intersection-over-union computed over the flattened batch.

    ``y_true`` is cast to float32. ``K.epsilon()`` is added to both the
    intersection and the union to avoid a zero denominator.
    """
    eps = K.epsilon()
    truth = K.flatten(K.cast(y_true, 'float32'))
    pred = K.flatten(y_pred)
    overlap = K.sum(truth * pred)
    # union = |truth| + |pred| - |overlap|
    denominator = K.sum(truth) + K.sum(pred) - overlap + eps
    return (overlap + eps) / denominator

def recall(y_true, y_pred):
    """Recall (TP / (TP + FN)) of the thresholded prediction over the batch.

    Predictions are clipped to [0, 1] and rounded to hard 0/1 labels before
    counting. ``K.epsilon()`` smooths numerator and denominator.
    """
    eps = K.epsilon()
    hard_pred = K.flatten(K.round(K.clip(y_pred, 0, 1)))
    truth = K.flatten(K.cast(y_true, 'float32'))
    true_pos = K.sum(truth * hard_pred)
    false_neg = K.sum(truth * (1 - hard_pred))
    return (true_pos + eps) / (true_pos + false_neg + eps)

def precision(y_true, y_pred):
    """Precision (TP / (TP + FP)) of the thresholded prediction over the batch.

    Predictions are clipped to [0, 1] and rounded to hard 0/1 labels before
    counting. ``K.epsilon()`` smooths numerator and denominator.
    """
    eps = K.epsilon()
    hard_pred = K.flatten(K.round(K.clip(y_pred, 0, 1)))
    truth = K.flatten(K.cast(y_true, 'float32'))
    true_pos = K.sum(truth * hard_pred)
    false_pos = K.sum((1 - truth) * hard_pred)
    return (true_pos + eps) / (true_pos + false_pos + eps)

Definition of loss function

In [None]:
def dice_loss(y_true, y_pred):
    """Dice loss: one minus ``dice_score`` of the same arguments."""
    return 1 - dice_score(y_true, y_pred)

def bce_loss(y_true, y_pred):
    """Mean binary cross-entropy between a mask and predicted probabilities.

    ``y_pred`` is treated as probabilities in [0, 1]. The model in this
    notebook ends in a sigmoid layer, and ``dice_score`` consumes the same
    tensor as probabilities. The earlier implementation fed these values to
    ``sigmoid_cross_entropy_with_logits``, which applied a second sigmoid and
    compressed the usable prediction range.

    Args:
        y_true: Ground-truth mask; cast to float32.
        y_pred: Predicted probabilities with the same shape as ``y_true``.

    Returns:
        Scalar tensor: the cross-entropy averaged over every element.
    """
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    # Keep probabilities strictly inside (0, 1) so neither log() sees zero.
    eps = 1e-7
    y_pred = tf.clip_by_value(y_pred, eps, 1.0 - eps)
    per_element = -(y_true * tf.math.log(y_pred) + (1.0 - y_true) * tf.math.log(1.0 - y_pred))
    return tf.reduce_mean(per_element)

# Hybrid Loss
def combined_loss(y_true, y_pred):
    """Sum of ``dice_loss`` and ``bce_loss``, both with equal (unit) weight."""
    return dice_loss(y_true, y_pred) + bce_loss(y_true, y_pred)
    

In [None]:

# Define the input shape and number of classes for the model
input_shape = (512, 512, 3)  # (height, width, channels): 512x512 three-channel images, not grayscale
num_classes = 1  # Binary segmentation

Building the model and compiling it

In [None]:
#model = unet_model()
from tensorflow.keras.optimizers import Adam

# Build the network, then compile it with the hybrid Dice + BCE loss and the
# segmentation metrics defined above.
model = unet_with_mobilenet_and_bitplane(input_shape=input_shape, num_classes=num_classes)
optimizer = tf.keras.optimizers.Nadam(learning_rate=0.00001)
model.compile(
    optimizer=optimizer,
    loss=combined_loss,
    metrics=["accuracy", dice_score, recall, precision, iou],
)
model.summary()

Dataloader for ISIC 2016

In [None]:

class ISIC_2016:
    """Batch generator over the ISIC 2016 lesion-segmentation images and masks.

    Iterating over an instance yields ``(images, masks)`` NumPy batches in CSV
    order. Every ``iter()`` call starts a fresh pass from the first row, so an
    abandoned iterator never affects a later one.

    Args:
        data_dir: Directory containing the image and ground-truth folders.
        image_size: ``target_size`` passed to ``load_img`` for images and masks.
        batch_size: Number of samples per batch (must be positive).
        mode: One of ``'train'``, ``'val'`` or ``'test'``.

    Raises:
        ValueError: If ``mode`` is not supported or ``batch_size`` is not positive.
    """

    # CSV file listing the image names for each supported mode.
    _CSV_BY_MODE = {
        'train': "/kaggle/input/isic-2016-dataset/train_ISIC_2016.csv",
        'val': "/kaggle/input/isic-2016-dataset/val_ISIC_2016.csv",
        'test': "/kaggle/input/isic-2016-dataset/test_ISIC_2016.csv",
    }

    def __init__(self, data_dir, image_size=(256, 256), batch_size=8, mode='train'):
        if mode not in self._CSV_BY_MODE:
            raise ValueError(f"mode must be one of {sorted(self._CSV_BY_MODE)}, got {mode!r}")
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size!r}")

        self.data_dir = data_dir
        self.image_size = image_size
        self.batch_size = batch_size
        self.mode = mode

        # The CSV names above are absolute paths, so os.path.join returns them
        # unchanged; data_dir only determines the image/mask folders below.
        self.csv_path = os.path.join(self.data_dir, self._CSV_BY_MODE[mode])
        self.df = pd.read_csv(self.csv_path)

        # Train and validation rows share the training folders; only the test
        # split has its own image and ground-truth folders.
        split = "Test" if self.mode == 'test' else "Training"
        self.image_path = os.path.join(self.data_dir, f"ISBI2016_ISIC_Part1_{split}_Data")
        self.mask_path = os.path.join(self.data_dir, f"ISBI2016_ISIC_Part1_{split}_GroundTruth")

        # Row offset just past the most recently yielded batch (0 between
        # passes). Informational only: iteration does not read it.
        self.current_batch_index = 0

    def __len__(self):
        """Return the number of batches in one pass (the last may be partial)."""
        return int(np.ceil(len(self.df) / float(self.batch_size)))

    def __iter__(self):
        """Yield ``(images, masks)`` batches, both scaled to [0, 1] by dividing by 255."""
        total = len(self.df)
        for start in range(0, total, self.batch_size):
            stop = min(start + self.batch_size, total)
            batch_images = []
            batch_masks = []

            for image_name in self.df['Image_Id'].iloc[start:stop]:
                # Drops the last four characters of the image name (presumably
                # a three-letter extension such as ".jpg" - verify against the CSV).
                mask_name = image_name[:-4] + '_Segmentation' + '.png'

                image = load_img(os.path.join(self.image_path, image_name), target_size=self.image_size)
                mask = load_img(os.path.join(self.mask_path, mask_name), target_size=self.image_size, color_mode='grayscale')

                batch_images.append(img_to_array(image) / 255.0)
                batch_masks.append(img_to_array(mask) / 255.0)

            self.current_batch_index = start + self.batch_size
            yield np.array(batch_images), np.array(batch_masks)

        # One full pass finished.
        self.current_batch_index = 0

Dataloader for the cross-dataset experiment (ISIC 2016 training set for training, ISIC 2016 validation set for validation, and PH2 test set for testing)

In [None]:

class ISIC_2016_PH2:
    """Batch generator for the cross-dataset setup: ISIC 2016 train/val, PH2 test.

    Iterating over an instance yields ``(images, masks)`` NumPy batches in CSV
    order. Every ``iter()`` call starts a fresh pass from the first row, so an
    abandoned iterator never affects a later one.

    In ``'test'`` mode, rows are read from the ``Image_Name`` column, images
    from ``<data_dir>/Images`` and masks (``*_lesion.bmp``) from
    ``<data_dir>/Masks``. In the other modes the ISIC 2016 training folders,
    the ``Image_Id`` column and ``*_Segmentation.png`` masks are used.

    Args:
        data_dir: Directory containing the image and mask folders.
        image_size: ``target_size`` passed to ``load_img`` for images and masks.
        batch_size: Number of samples per batch (must be positive).
        mode: One of ``'train'``, ``'val'`` or ``'test'``.

    Raises:
        ValueError: If ``mode`` is not supported or ``batch_size`` is not positive.
    """

    # CSV file listing the image names for each supported mode.
    _CSV_BY_MODE = {
        'train': "/kaggle/input/isic-2016-dataset/train_ISIC_2016.csv",
        'val': "/kaggle/input/isic-2016-dataset/val_ISIC_2016.csv",
        'test': "/kaggle/input/isic-2017-segmentation/test_ph2.csv",
    }

    def __init__(self, data_dir, image_size=(256, 256), batch_size=8, mode='train'):
        if mode not in self._CSV_BY_MODE:
            raise ValueError(f"mode must be one of {sorted(self._CSV_BY_MODE)}, got {mode!r}")
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size!r}")

        self.data_dir = data_dir
        self.image_size = image_size
        self.batch_size = batch_size
        self.mode = mode

        # The CSV names above are absolute paths, so os.path.join returns them
        # unchanged; data_dir only determines the image/mask folders below.
        self.csv_path = os.path.join(self.data_dir, self._CSV_BY_MODE[mode])
        self.df = pd.read_csv(self.csv_path)

        if self.mode == 'test':
            # PH2 layout
            self.image_path = os.path.join(self.data_dir, "Images")
            self.mask_path = os.path.join(self.data_dir, "Masks")
        else:
            # ISIC 2016 layout (train and val share the training folders)
            self.image_path = os.path.join(self.data_dir, "ISBI2016_ISIC_Part1_Training_Data")
            self.mask_path = os.path.join(self.data_dir, "ISBI2016_ISIC_Part1_Training_GroundTruth")

        # Row offset just past the most recently yielded batch (0 between
        # passes). Informational only: iteration does not read it.
        self.current_batch_index = 0

    def __len__(self):
        """Return the number of batches in one pass (the last may be partial)."""
        return int(np.ceil(len(self.df) / float(self.batch_size)))

    def __iter__(self):
        """Yield ``(images, masks)`` batches, both scaled to [0, 1] by dividing by 255."""
        # The two datasets differ only in the CSV column holding the image
        # name and in how the mask file name is derived from it.
        if self.mode == 'test':
            name_column, mask_suffix = 'Image_Name', '_lesion' + '.bmp'
        else:
            name_column, mask_suffix = 'Image_Id', '_Segmentation' + '.png'

        total = len(self.df)
        for start in range(0, total, self.batch_size):
            stop = min(start + self.batch_size, total)
            batch_images = []
            batch_masks = []

            for image_name in self.df[name_column].iloc[start:stop]:
                # Drops the last four characters of the image name (presumably
                # a three-letter extension - verify against the CSV).
                mask_name = image_name[:-4] + mask_suffix

                image = load_img(os.path.join(self.image_path, image_name), target_size=self.image_size)
                mask = load_img(os.path.join(self.mask_path, mask_name), target_size=self.image_size, color_mode='grayscale')

                batch_images.append(img_to_array(image) / 255.0)
                batch_masks.append(img_to_array(mask) / 255.0)

            self.current_batch_index = start + self.batch_size
            yield np.array(batch_images), np.array(batch_masks)

        # One full pass finished.
        self.current_batch_index = 0

Calling the dataset loader function

In [None]:
import os
import pandas as pd
import tensorflow as tf
#from some_module import ISIC_2016  # Replace with the actual module if different

# Initialize the data loaders
train_data_loader = ISIC_2016_PH2(data_dir='/kaggle/input/isic-2016-dataset/ISIC_2016_dataset/ISIC_2016_dataset', image_size=(512, 512), batch_size=4, mode='train')
val_data_loader = ISIC_2016_PH2(data_dir='/kaggle/input/isic-2016-dataset/ISIC_2016_dataset/ISIC_2016_dataset', image_size=(512, 512), batch_size=4, mode='val')
test_data_loader = ISIC_2016_PH2(data_dir='/kaggle/input/isic-2017-segmentation/Arranged_PH2_dataset/Arranged_PH2_dataset', image_size=(512, 512), batch_size=4, mode='test')


def _as_repeating_dataset(loader):
    """Wrap a batch loader in an endlessly repeating ``tf.data.Dataset``.

    The declared element shapes follow ``loader.image_size``: images carry
    3 channels and masks 1 channel. The batch dimension is left open because
    the last batch of a pass may be smaller.
    """
    height, width = loader.image_size
    return tf.data.Dataset.from_generator(
        lambda: loader,
        output_signature=(
            tf.TensorSpec(shape=(None, height, width, 3), dtype=tf.float32),
            tf.TensorSpec(shape=(None, height, width, 1), dtype=tf.float32),
        ),
    ).repeat()  # repeat() lets Keras draw a fixed number of steps every epoch


# Create the datasets
train_dataset = _as_repeating_dataset(train_data_loader)
val_dataset = _as_repeating_dataset(val_data_loader)
test_dataset = _as_repeating_dataset(test_data_loader)

# Calculate steps per epoch.
# len(loader) is already the number of batches per pass (rows / batch_size,
# rounded up); dividing by batch_size again would make every epoch cover only
# a fraction of the data.
train_steps_per_epoch = len(train_data_loader)
val_steps_per_epoch = len(val_data_loader)

# Define the model (example)
#model = tf.keras.models.Sequential([
#    tf.keras.layers.InputLayer(input_shape=(512, 512, 3)),
#    tf.keras.layers.Conv2D(32, (3, 3), activation='relu'),
#    tf.keras.layers.MaxPooling2D((2, 2)),
#    tf.keras.layers.Flatten(),
#    tf.keras.layers.Dense(10, activation='softmax')3])

# Compile the model
#model.compile(optimizer='Nadam', loss=combined_loss, metrics=["accuracy", dice_score, recall, precision, iou])

#model.summary()

Model Training

In [None]:
# Checkpointing: write the weights to disk only when the validation Dice
# score improves on the best value seen so far.
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath='/kaggle/working/model_weight.weights.h5',
    monitor='val_dice_score',
    mode='max',
    save_best_only=True,
    save_weights_only=True,
)

# Train on the repeating datasets; the step counts define what one epoch is.
history = model.fit(
    train_dataset,
    epochs=500,
    steps_per_epoch=train_steps_per_epoch,
    validation_data=val_dataset,
    validation_steps=val_steps_per_epoch,
    callbacks=[checkpoint_callback],
)


Loading the best weights of the proposed model after training

In [None]:
# Restore the weights written by the ModelCheckpoint callback during training
# (the file holds the epoch with the highest val_dice_score).
model.load_weights('/kaggle/working/model_weight.weights.h5')

In [None]:
# len(test_data_loader) already counts batches (rows / batch_size, rounded up),
# so it is the number of steps needed to cover the whole test set once.
# Dividing by batch_size again would evaluate only a fraction of it.
test_steps_per_epoch = len(test_data_loader)

Evaluation on the test set

In [None]:
# Evaluate the model on test data. The result is kept as a dict keyed by
# metric name instead of being discarded, so later cells can report it.
test_metrics = model.evaluate(test_dataset, steps=test_steps_per_epoch, return_dict=True)

# Make predictions on the same test batches
predictions = model.predict(test_dataset, steps=test_steps_per_epoch)

In [None]:
import matplotlib.pyplot as plt

Displaying the accuracy curve

In [None]:
# Training vs. validation accuracy per epoch
fig, ax = plt.subplots(figsize=(12, 6))
for series in ('accuracy', 'val_accuracy'):
    ax.plot(history.history[series])
ax.set_title('Model accuracy')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epoch')
ax.legend(['Train', 'Validation'], loc='upper left')
ax.grid(True)
plt.show()

Displaying the loss curve

In [None]:
# Training vs. validation loss per epoch
fig, ax = plt.subplots(figsize=(12, 6))
for series in ('loss', 'val_loss'):
    ax.plot(history.history[series])
ax.set_title('Model loss')
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
ax.legend(['Train', 'Validation'], loc='upper left')
ax.grid(True)
plt.show()