In [None]:
# Mount Google Drive at /content/drive so files stored there (the dataset
# archive read in the next cell) are reachable from this Colab session.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
import zipfile

# Dataset archive stored in the mounted Google Drive.
local_zip = '/content/drive/MyDrive/human-img-seg.zip'

# The context manager closes the archive even when extraction fails part-way,
# which the previous explicit open/extract/close sequence did not guarantee.
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall('dataset')

# Import dependencies

In [None]:
import os 
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tqdm import tqdm
import tensorflow.data as tfd
import matplotlib.pyplot as plt
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.utils import plot_model
from typing import List, Tuple, Union

# Load and prepare Dataset 

In [None]:
image_train_dir = "/content/dataset/training/sample"
mask_train_dir = "/content/dataset/training/mask"

# Images and masks are paired purely by position after sorting, so both
# listings must apply the same filter: PNG files only, hidden dot-files
# excluded. Previously only the mask listing skipped dot-files, so a stray
# hidden PNG in the image folder would shift every subsequent pair.
# NOTE(review): pairing also assumes images and masks share file names — confirm.
train_img_paths = sorted(
    os.path.join(image_train_dir, fname)
    for fname in os.listdir(image_train_dir)
    if fname.endswith(".png") and not fname.startswith("."))
train_mask_paths = sorted(
    os.path.join(mask_train_dir, fname)
    for fname in os.listdir(mask_train_dir)
    if fname.endswith(".png") and not fname.startswith("."))

In [None]:
image_test_dir = "/content/dataset/testing/sample"
mask_test_dir = "/content/dataset/testing/mask"

# Images and masks are paired purely by position after sorting, so both
# listings must apply the same filter: PNG files only, hidden dot-files
# excluded. Previously only the mask listing skipped dot-files, so a stray
# hidden PNG in the image folder would shift every subsequent pair.
# NOTE(review): pairing also assumes images and masks share file names — confirm.
test_img_paths = sorted(
    os.path.join(image_test_dir, fname)
    for fname in os.listdir(image_test_dir)
    if fname.endswith(".png") and not fname.startswith("."))
test_mask_paths = sorted(
    os.path.join(mask_test_dir, fname)
    for fname in os.listdir(mask_test_dir)
    if fname.endswith(".png") and not fname.startswith("."))

In [None]:
# Channel counts requested when decoding: colour images, single-channel masks.
N_IMAGE_CHANNELS = 3
N_MASK_CHANNELS = 1

# Spatial size every image and mask is resized to.
IMAGE_WIDTH = 256
IMAGE_HEIGHT = 256

# Per-sample array shapes used for the model input and the in-memory buffers.
# NOTE(review): the tuples are ordered (width, height, channels) while
# TensorFlow image tensors are (height, width, channels). This is harmless
# while width == height, but every use site must be revisited before
# switching to a non-square size.
IMAGE_SIZE = (IMAGE_WIDTH, IMAGE_HEIGHT, N_IMAGE_CHANNELS)
MASK_SIZE = (IMAGE_WIDTH, IMAGE_HEIGHT, N_MASK_CHANNELS)

# Number of samples per batch produced by the dataset pipeline.
BATCH_SIZE = 32

# Channels of the final model output (one sigmoid map for the binary mask).
OUTPUT_CHANNELS = 1

# Training length and the base filter count that each U-Net level multiplies.
EPOCHS = 10
FILTERS = 32

In [None]:
def load_image_and_mask(image_path, mask_path):
    '''
    Load one image/mask pair from disk as float32 tensors scaled to [0, 1].

    Both files go through the same steps: read, decode, convert to float32,
    resize to `(IMAGE_WIDTH, IMAGE_HEIGHT)` and clip to [0, 1].

    Args:
        image_path: Path of the image file.
        mask_path: Path of the matching mask file.

    Returns:
        Tuple `(image, mask)` of float32 tensors with `N_IMAGE_CHANNELS` and
        `N_MASK_CHANNELS` channels respectively.
    '''

    def _read_scaled(path, channels):
        # Read the raw file bytes.
        raw = tf.io.read_file(filename = path)

        # The dataset files are PNGs, so decode with the format-detecting op
        # rather than the JPEG-named one. `expand_animations=False` keeps the
        # result a rank-3 (height, width, channels) tensor that resize accepts.
        pixels = tf.io.decode_image(
            contents = raw, channels = channels, expand_animations = False)

        # uint8 [0, 255] -> float32 [0, 1].
        pixels = tf.image.convert_image_dtype(image = pixels, dtype = tf.float32)

        # Resize to the configured size; the result is already float32, so no
        # further cast is needed.
        pixels = tf.image.resize(images = pixels, size = (IMAGE_WIDTH, IMAGE_HEIGHT))

        # Keep values inside [0, 1] after interpolation.
        return tf.clip_by_value(pixels, clip_value_min = 0.0, clip_value_max = 1.0)

    image = _read_scaled(image_path, N_IMAGE_CHANNELS)
    mask  = _read_scaled(mask_path, N_MASK_CHANNELS)

    return image, mask

In [None]:
def load_dataset(
    image_paths, mask_paths, batch_size=BATCH_SIZE, shuffle=True, 
    buffer_size=1000, n_repeat=1, reshuffle_each_iteration=False) :
    '''
    Load every image/mask pair into memory and wrap them in a batched
    tensorflow dataset.

    Args:
        image_paths: Sequence of image file paths.
        mask_paths: Sequence of mask file paths, aligned index-by-index with
            `image_paths`.
        batch_size: Number of samples per batch; an incomplete final batch is
            dropped.
        shuffle: Whether to shuffle the samples before batching.
        buffer_size: Size of the shuffle buffer.
        n_repeat: How many times the data is repeated.
        reshuffle_each_iteration: Whether a new shuffle order is drawn on every
            pass over the dataset. Defaults to False so the order is fixed once
            per dataset object; positional `take`/`skip` splits of the result
            then stay disjoint across epochs instead of mixing samples between
            the two subsets.

    Returns:
        A `tf.data.Dataset` yielding `(images, masks)` float32 batches.

    Raises:
        ValueError: If `image_paths` and `mask_paths` differ in length.
    '''

    n_samples = len(image_paths)
    if n_samples != len(mask_paths):
        # zip() would silently truncate and leave uninitialised rows in the
        # preallocated arrays, so fail loudly instead.
        raise ValueError(
            f"Got {n_samples} image paths but {len(mask_paths)} mask paths; "
            "every image needs exactly one mask.")
    
    # Preallocate the arrays that hold the decoded data.
    images = np.empty(shape=(n_samples, *IMAGE_SIZE), dtype=np.float32)
    masks  = np.empty(shape=(n_samples, *MASK_SIZE),  dtype=np.float32)
    
    # `total` is required for a real progress bar because zip() has no length.
    pairs = tqdm(zip(image_paths, mask_paths), total=n_samples, desc='Loading')
    for index, (image_path, mask_path) in enumerate(pairs):
        images[index], masks[index] = load_image_and_mask(
            image_path = image_path, mask_path = mask_path)
    
    # Create a Tensorflow data set.
    data_set = tfd.Dataset.from_tensor_slices((images, masks)).repeat(n_repeat)
    
    # Shuffle the data set.
    if shuffle:
        data_set = data_set.shuffle(
            buffer_size, reshuffle_each_iteration=reshuffle_each_iteration)
    
    # Convert data into batches and overlap loading with consumption.
    data_set = data_set.batch(batch_size, drop_remainder=True).prefetch(tfd.AUTOTUNE)
    
    return data_set

In [None]:
# Training and Testing Data
train_ds = load_dataset(
    image_paths = train_img_paths,
    mask_paths = train_mask_paths,
    shuffle = True,
    n_repeat=1,
)
# The test set is not shuffled: evaluation gains nothing from a random order,
# and because incomplete batches are dropped, shuffling would make each
# evaluation run score a different subset of the test images.
test_ds = load_dataset(
    image_paths = test_img_paths,
    mask_paths = test_mask_paths,
    shuffle = False,
    n_repeat=1,
)

In [None]:
# cardinality() counts batches, so multiply by BATCH_SIZE to report samples.
# Incomplete final batches are dropped by load_dataset and are not counted.
print(f"{' '*30}Training Data Size : {train_ds.cardinality().numpy() * BATCH_SIZE}")
print(f"{' '*30}Testing Data Size  : {test_ds.cardinality().numpy() * BATCH_SIZE}")

In [None]:
# Split Data 
# All sizes below are measured in batches, not samples.
full_train_size = train_ds.cardinality().numpy()

# Fraction of the training batches held out for validation.
train_val_split = 0.1
valid_size = int(full_train_size * train_val_split)
train_size = full_train_size - valid_size

# Positional split: the first `train_size` batches train, the rest validate.
# NOTE(review): the two subsets are only disjoint if `train_ds` yields its
# batches in the same order on every pass. A dataset that reshuffles on each
# iteration lets the same sample appear in both subsets — verify how
# `train_ds` is shuffled.
training_ds = train_ds.take(train_size)
valid_ds = train_ds.skip(train_size).take(valid_size)

In [None]:
# cardinality() counts batches, so multiply by BATCH_SIZE to report samples.
print(f"{' '*30}Training Data Size   : {training_ds.cardinality().numpy() * BATCH_SIZE}")
print(f"{' '*30}Validation Data Size : {valid_ds.cardinality().numpy() * BATCH_SIZE}")
print(f"{' '*30}Testing Data Size    : {test_ds.cardinality().numpy() * BATCH_SIZE}")

In [None]:
def show_images_and_masks(data, n_images=10, FIGSIZE=(25, 5), model=None):
    '''
    Plot samples taken from the first batch of `data`, one figure per sample.

    Each figure shows the image, its mask and the mask overlaid on the image.
    When `model` is given, two more panels show the predicted mask and the
    image multiplied by that prediction.

    Args:
        data: Iterable yielding `(images, masks)` batches.
        n_images: Maximum number of samples to plot; capped at the batch size.
        FIGSIZE: Matplotlib figure size used for every sample.
        model: Optional model exposing `predict`; enables the prediction panels.
    '''
    n_cols = 3 if model is None else 5
    
    # Only the first batch is used.
    images, masks = next(iter(data))
    
    # Never index past the end of the batch.
    n_images = min(n_images, len(images))
    if n_images <= 0:
        return
    
    # Predict all displayed samples in a single call instead of one call per
    # image inside the plotting loop.
    pred_masks = None
    if model is not None:
        pred_masks = model.predict(images[:n_images])
    
    for n in range(n_images):
        
        fig = plt.figure(figsize=FIGSIZE)
        
        # Plot the image
        plt.subplot(1, n_cols, 1)
        plt.title("Original Image")
        plt.imshow(images[n])
        plt.axis('off')
        
        # Plot the Mask
        plt.subplot(1, n_cols, 2)
        plt.title("Original Mask")
        plt.imshow(masks[n], cmap='gray')
        plt.axis('off')
        
        # Plot image and mask overlay
        plt.subplot(1, n_cols, 3)
        plt.title('Image and Mask overlay')
        plt.imshow(masks[n], alpha=0.8, cmap='binary_r')
        plt.imshow(images[n], alpha=0.5)
        plt.axis('off')
        
        # Model predictions
        if pred_masks is not None:
            pred_mask = pred_masks[n]
            plt.subplot(1, n_cols, 4)
            plt.title('Predicted Mask')
            plt.imshow(pred_mask, cmap='gray')
            plt.axis('off')
            
            plt.subplot(1, n_cols, 5)
            plt.title('Predicted Mask Overlay')
            plt.imshow(pred_mask*images[n], alpha=1, cmap='binary_r')
            plt.axis('off')
    
        # Show final plot, then release the figure so repeated calls do not
        # accumulate open figures.
        plt.show()
        plt.close(fig)

show_images_and_masks(data=train_ds)

# **UNET** architecture

![UNET architecture](https://lmb.informatik.uni-freiburg.de/people/ronneber/u-net/u-net-architecture.png "UNET architecture")

### UNET-Encoder block

In [None]:
# Encoder

def encoder_block(input_tensor, n_filters, kernal_size= 3, max_pool= True,rate= 0.2):
    """
    Batch-normalise the input and apply two same-padded ReLU convolutions.

    Args:
        input_tensor: Tensor fed into the block.
        n_filters: Number of filters in each of the two convolutions.
        kernal_size: Kernel size of both convolutions.
        max_pool: When True, also return a dropped-out, 2x2 max-pooled tensor.
        rate: Dropout rate.

    Returns:
        `(features, pooled)` when `max_pool` is True, where `features` is the
        convolution output before dropout and `pooled` is that output after
        dropout and pooling. Otherwise the convolution output after dropout.
    """
    features = layers.BatchNormalization()(input_tensor)

    # Two identical convolution layers applied back to back.
    for _ in range(2):
        features = layers.Conv2D(
            n_filters,
            kernal_size,
            strides=1,
            padding='same',
            activation='relu',
            kernel_initializer='he_normal',
        )(features)

    regularised = layers.Dropout(rate)(features)
    if not max_pool:
        return regularised

    # Halve the spatial size; the un-dropped features are returned alongside.
    pooled = layers.MaxPool2D(pool_size=(2,2), strides=(2,2))(regularised)
    return features, pooled


def encoder(inputs):
    """
    Chain four pooled encoder blocks with growing filter counts.

    Args:
        inputs: Tensor fed into the first block.

    Returns:
        `(pooled, features)`: the pooled output of the last block and a
        4-tuple with the pre-pooling features of every block, first to last.
    """
    # (filter multiplier, dropout rate) for each level, first to last.
    level_specs = ((1, 0.1), (2, 0.1), (4, 0.2), (8, 0.3))

    level_features = []
    current = inputs
    for multiplier, drop_rate in level_specs:
        kept, current = encoder_block(current, n_filters=multiplier*FILTERS, rate=drop_rate)
        level_features.append(kept)

    return current, tuple(level_features)

### UNET-Decoder block

In [None]:
def decoder_block(inputs, conv_output, filters, rate=0.2):
    """
    Upsample `inputs` by two, merge with `conv_output` and refine the result.

    Args:
        inputs: Tensor coming from the previous (coarser) level.
        conv_output: Encoder features concatenated with the upsampled tensor.
        filters: Filter count of the transposed convolution and of the
            convolutions that follow.
        rate: Dropout rate of the refining block.

    Returns:
        The output of an un-pooled `encoder_block` applied to the merged tensor.
    """
    upsampled = layers.Conv2DTranspose(
        filters,
        3,
        strides=2,
        padding='same',
        activation='relu',
        kernel_initializer='he_normal',
    )(inputs)

    merged = layers.concatenate([upsampled, conv_output])

    # encoder_block starts with its own BatchNormalization, so the merged
    # tensor is normalised here and once more inside that block.
    normalised = layers.BatchNormalization()(merged)

    return encoder_block(normalised, n_filters=filters, rate=rate, max_pool=False)

def decoder(inputs ,convs, output_channels):
    """
    Run four decoder blocks and project to the output channels.

    Args:
        inputs: Tensor fed into the first decoder block.
        convs: Exactly four encoder feature tensors, ordered first to last
            encoder level.
        output_channels: Number of channels of the final 1x1 convolution.

    Returns:
        Sigmoid-activated output tensor.
    """
    f1, f2, f3, f4 = convs

    # (encoder features, filter multiplier, dropout rate); the last encoder
    # level is consumed first.
    stages = (
        (f4, 8, 0.2),
        (f3, 4, 0.2),
        (f2, 2, 0.1),
        (f1, 1, 0.1),
    )

    current = inputs
    for skip, multiplier, drop_rate in stages:
        current = decoder_block(current, skip, rate=drop_rate, filters=multiplier*FILTERS)

    return layers.Conv2D(
        output_channels,
        kernel_size=1,
        strides=1,
        padding='same',
        activation='sigmoid',
    )(current)

### Combine encoder block and decoder block

In [None]:
# Symbolic model input with the configured image shape.
inputs = layers.Input(shape=IMAGE_SIZE)

# Contracting path: returns the final pooled tensor and the per-level features.
encoder_output, convs = encoder(inputs)

# Bottleneck: an un-pooled block with 16x the base filter count.
bottle_neck = encoder_block(encoder_output, n_filters=FILTERS*16, max_pool=False, rate=0.3)

# Expanding path, fed with the encoder features of each level.
decoder_output = decoder(bottle_neck, convs, OUTPUT_CHANNELS)

model = keras.Model(
    inputs= inputs,
    outputs= decoder_output
)

model.summary()

In [None]:
# Render the architecture diagram (with tensor shapes) to a PNG file.
plot_model(
    model,
    to_file="Unet2Model.png",
    show_shapes=True,
    dpi=96,
)

# Custom callback 

In [None]:
class ShowProgress(callbacks.Callback):
    """
    Callback that plots, after every epoch, randomly chosen samples from the
    first batch of `data`: the image, its mask, the predicted mask and the
    image multiplied by the predicted mask.

    Args:
        data: Iterable yielding `(images, masks)` tensor batches.
        cmap: Matplotlib colormap used for the two mask panels.
        num_images: Number of samples (figure rows) to plot; capped at the
            batch size.
        file_format: Stored on the instance but not read by this class.
    """
    def __init__(self, data, cmap = 'gray', num_images = 1, file_format = 'png', **kwargs):
        super().__init__(**kwargs)
      
        self.data = data
        self.cmap = cmap
        self.num_images = num_images
        self.file_format = file_format
    
    def on_epoch_end(self, epoch, logs=None):
        # Fetch the batch once instead of once per plotted sample.
        images, masks = next(iter(self.data))
        images = images.numpy()
        masks = masks.numpy()

        n_rows = min(self.num_images, len(images))
        if n_rows <= 0:
            return

        # Distinct random samples, so the same image is never plotted twice.
        chosen = np.random.choice(len(images), size=n_rows, replace=False)
        pred_masks = self.model.predict(images[chosen])

        # One figure holding every sample as a row. The previous version
        # created a single figure but called plt.show() inside the loop, so
        # from the second sample on the panels landed on new default figures.
        fig, axes = plt.subplots(
            n_rows, 4, figsize=(25, 8 * n_rows), squeeze=False)

        for row_axes, index, pred_mask in zip(axes, chosen, pred_masks):
            image, mask = images[index], masks[index]
            panels = (
                ("Original Image", image, None),
                ("Original Mask", mask, self.cmap),
                ("Predicted Mask", pred_mask, self.cmap),
                # Image weighted by the predicted mask.
                ("masked photo", image * pred_mask, None),
            )
            for ax, (title, picture, cmap) in zip(row_axes, panels):
                ax.set_title(title)
                ax.imshow(picture, cmap=cmap)
                ax.axis('off')

        # Show the figure, then release it so figures do not pile up across
        # epochs.
        plt.show()
        plt.close(fig)

In [None]:
# Callbacks handed to `model.fit`.
CALLBACKS = [
    # Early stopping is currently disabled. Uncomment to stop training once
    # `val_loss` has not improved for `patience` epochs.
    # callbacks.EarlyStopping(
    #     patience = 10, 
    #     monitor="val_loss",
    #     restore_best_weights = True),
    # Plots a prediction on a validation sample at the end of every epoch.
    ShowProgress(
        data = valid_ds
    )
]

# Custom evaluation metric

The Dice coefficient is a measure of the similarity between two samples. In the context of image segmentation, it is used to evaluate the performance of a segmentation algorithm by measuring the overlap between the predicted segmentation and the ground truth segmentation.

The Dice coefficient is calculated as follows:

`dice_coeff = 2 * |X ∩ Y| / (|X| + |Y|)`


In [None]:
def dice_coeff(y_true, y_pred, smooth=1.0):
    """
    Smoothed Dice coefficient, averaged over the batch.

    For every sample the products and sums are taken over axes 1, 2 and 3,
    giving `(2 * sum(true * pred) + smooth) / (sum(true) + sum(pred) + smooth)`.
    The per-sample scores are then averaged.

    Args:
        y_true: Ground-truth masks, rank-4 with the batch on axis 0.
        y_pred: Predicted masks with the same shape.
        smooth: Constant added to numerator and denominator, which keeps the
            ratio defined when both masks are empty.

    Returns:
        Scalar float32 tensor.
    """
    per_sample_axes = [1, 2, 3]

    truth = tf.cast(y_true, tf.float32)
    prediction = tf.cast(y_pred, tf.float32)

    overlap = tf.reduce_sum(truth * prediction, axis=per_sample_axes)
    truth_total = tf.reduce_sum(truth, axis=per_sample_axes)
    prediction_total = tf.reduce_sum(prediction, axis=per_sample_axes)

    per_sample_score = (2.0 * overlap + smooth) / (truth_total + prediction_total + smooth)
    batch_score = tf.reduce_mean(per_sample_score, axis=0)

    return tf.cast(batch_score, tf.float32)


In [None]:
# Register the custom metric
# Adding it to Keras' global custom-object registry lets a saved model that
# references `dice_coeff` by name be deserialised by `load_model` later on.
custom_objects = {'dice_coeff': dice_coeff}
tf.keras.utils.get_custom_objects().update(custom_objects)

# Compile and train model

In [None]:
# Mean Intersection Over Union
# The model outputs sigmoid probabilities. `MeanIoU` casts predictions to
# integer class ids without thresholding, so nearly every pixel would count as
# class 0. `BinaryIoU` thresholds at 0.5 first and, with both class ids
# selected, reports the mean IoU of background and foreground. The metric name
# is unchanged, so the history keys stay 'MeanIoU' / 'val_MeanIoU'.
mean_iou = metrics.BinaryIoU(target_class_ids=(0, 1), threshold=0.5, name="MeanIoU")
# optimizer
optimizer = optimizers.Adam()

# Compile Model
model.compile(
    loss = 'binary_crossentropy',
    optimizer = optimizer,
    metrics = [
        'accuracy',
        mean_iou,
        dice_coeff
    ]
)

In [None]:
# Model Training
# `training_ds` and `valid_ds` are tf.data datasets that already yield
# batches, so `batch_size` must not be passed to fit(): it is ignored at best
# and rejected by some TensorFlow versions.
unet_model_history = model.fit(
    training_ds,
    validation_data = valid_ds,
    epochs = EPOCHS,
    callbacks = CALLBACKS,
)

In [None]:
# Save the trained model to a file in the notebook's working directory.
model.save('FinalModel.h5')

In [None]:
# Prefer the model file written by the save cell above. Fall back to the copy
# on Google Drive only when no local file exists, so a fresh
# "Restart & Run All" does not depend on a file left behind by an earlier
# session.
local_model_path = 'FinalModel.h5'
drive_model_path = '/content/drive/MyDrive/FinalModel.h5'
model_path = local_model_path if os.path.exists(local_model_path) else drive_model_path
model = keras.models.load_model(model_path)

# Evaluate the model

In [None]:
# Per-epoch values recorded during training, keyed by metric name
# ('loss', 'val_loss', ...). The plots below read from this dict.
history = unet_model_history.history

In [None]:
# show model losses and accuracy
# One row per panel: (history key, training label, validation label, y label).
# The validation curve is read from the same key prefixed with 'val_'.
metric_panels = [
    ('loss', 'Training Loss', 'Validation Loss', 'Binary Crossentropy Loss'),
    ('accuracy', 'accuracy', 'Validation accuracy', 'accuracy'),
    ('MeanIoU', 'MeanIoU', 'Validation MeanIoU', 'MeanIoU Score'),
    ('dice_coeff', 'Dice Coeff', 'Validation Dice Coeff', 'Dice Coeff Score'),
]

fig, axes = plt.subplots(2, 2, figsize=(25, 10))

# axes.flat walks the 2x2 grid row by row, matching the previous panel order.
for ax, (key, train_label, val_label, y_label) in zip(axes.flat, metric_panels):
    ax.plot(history[key], label=train_label)
    ax.plot(history[f'val_{key}'], label=val_label)
    ax.set_xlabel('Epochs')
    ax.set_ylabel(y_label)
    ax.legend()
    ax.grid(True)

plt.show()

In [None]:
# Evaluate the model on the test data
# Recompile with a single metric so evaluate() returns exactly two values.
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=[dice_coeff])

# Store the score under its own name. Unpacking into `dice_coeff` would
# replace the metric function with a float, and re-running any cell that
# compiles the model with that metric would then fail.
loss, test_dice_coeff = model.evaluate(test_ds)

# Print the results
print('Test loss:', loss)
print('Test dice_coeff:', test_dice_coeff)

In [None]:
# Ground truth next to the model's predictions for samples of a training batch.
show_images_and_masks(data=training_ds, model=model)

In [None]:
# Ground truth next to the model's predictions for samples of a test batch.
show_images_and_masks(data=test_ds, model=model)

# Make some predictions

In [None]:
# try model predictions
from PIL import Image
from urllib.request import urlopen

def load_img(path, size=(256, 256)):
    """
    Download an image and return it as an RGB float array scaled to [0, 1].

    Args:
        path: URL of the image.
        size: Target `(width, height)` passed to PIL's `resize`.

    Returns:
        Array of shape `(height, width, 3)` with values in [0, 1].
    """
    # Close the HTTP response deterministically instead of leaking it.
    with urlopen(path) as response:
        # convert() reads the pixel data while the response is still open and
        # yields three channels whatever the source mode (RGBA, palette,
        # greyscale), which is what the model input expects.
        img = Image.open(response).convert('RGB')
    img = img.resize(size)
    return np.asarray(img) / 255.0

def pre_image(path, size=(256, 256)):
    """
    Download an image and return it as a single-sample batch.

    Args:
        path: URL of the image.
        size: Target `(width, height)` passed to PIL's `resize`.

    Returns:
        The array produced by `load_img` with a leading batch axis of length 1.
    """
    # Reuse load_img so both helpers preprocess identically.
    return np.expand_dims(load_img(path, size), axis=0)

In [None]:
test_image = 'https://assets.vogue.in/photos/604cd4ac6a9323b082cc4e0f/1:1/w_686,h_686,c_limit/selfies.jpg'
real_img = load_img(test_image)
# Add the batch axis to the array already downloaded. Calling
# pre_image(test_image) would fetch and preprocess the same URL a second time.
procces_img = np.expand_dims(real_img, axis=0)
predicted_mask = model.predict(procces_img)[0]

In [None]:
# show result
fig, axs = plt.subplots(ncols=3, figsize=(25, 8))

# (title, picture, extra imshow keyword arguments) for each panel, left to right.
result_panels = (
    ('Test Image', real_img, {}),
    ('Predicted Mask', predicted_mask, {'cmap': 'gray'}),
    # The single-channel mask broadcasts over the three colour channels.
    ('Masked Image', predicted_mask * real_img, {}),
)

for ax, (title, picture, imshow_kwargs) in zip(axs, result_panels):
    ax.imshow(picture, **imshow_kwargs)
    ax.set_title(title)

plt.show()

# END
**Git repo:** https://github.com/Yousef-Nasr/human-segmentation <br>
**Live demo:**