# Info

  The base dataset link from kaggle: 

      https://www.kaggle.com/datasets/olekslu/makeup-lips-segmentation-28k-samples

The method for generating the lips mask dataset:
    https://github.com/RemainAplomb/Generate-Dataset/blob/main/Generate_Lips_Mask.ipynb

The base code that I employed and used can be found here: 
    https://www.tensorflow.org/tutorials/images/segmentation

This notebook is for the easy implementation of the semantic segmentation method provided by Tensorflow.

I have modified and explored it so that you will have an easier time training the model.

Below I have provided multiple specifications for the model that you might want to train. The TFLite model size and accuracy depend on which specification you choose.

# Imports and Dependencies

In [None]:
# Install kaggle to google colab
!pip install -q kaggle
!pip install git+https://github.com/tensorflow/examples.git
!pip install tensorflow-model-optimization

# Import tensorflow
import tensorflow as tf
from IPython.display import clear_output
import matplotlib.pyplot as plt
from tensorflow_examples.models.pix2pix import pix2pix

import cv2

from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dropout, Dense, Conv1D, Flatten,InputLayer,BatchNormalization
from tensorflow.keras.callbacks import ProgbarLogger
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.losses import SparseCategoricalCrossentropy

# Import necessary modules
import os
import time
from pathlib import Path

import numpy as np
import pandas as pd
from glob import glob

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt

# Mount the google drive
from google.colab import files
from google.colab import drive
drive.mount('/content/drive')


import tensorflow_model_optimization as tfmot

In [None]:
# Upload the kaggle.json
# Make kaggle directory
!mkdir /kaggle
files.upload()
!ls -lha kaggle.json


!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 /root/.kaggle/kaggle.json

!pwd


# Download the lips segmentation dataset and
# unzip it
!kaggle datasets download -d remainaplomb/lips-segmentation-dataset-2
!unzip /content/lips-segmentation-dataset-2.zip

# Our lips segmentation dataset




In [None]:
# Root directory of the unzipped lips segmentation dataset.
DATASET_FILEPATH = Path('/content/Lips_Dataset')

# Sub-folder names (relative to the dataset root) holding the original
# photographs and their masks.
IMAGES, MASKS = 'original/', 'mask/'

# Resize and batching settings, plus the labels of the two folders.
batch_size = 32
img_height = img_width = 128
class_names = ["original", "mask"]

# Full paths of the two sub-folders.
IMAGES_FILEPATH = DATASET_FILEPATH / IMAGES
MASKS_FILEPATH = DATASET_FILEPATH / MASKS

In [None]:
# Reduce every file name in each folder to the digits it contains, so that an
# image and its mask can be matched on that numeric id.
imgs_set = {''.join(ch for ch in name if ch.isdigit())
            for name in os.listdir(IMAGES_FILEPATH)}
masks_set = {''.join(ch for ch in name if ch.isdigit())
             for name in os.listdir(MASKS_FILEPATH)}

# Size of each id set, then the number of ids present on one side only
# (the notebook echoes just the last expression).
len(imgs_set), len(masks_set)
len(imgs_set - masks_set), len(masks_set - imgs_set)

In [None]:
# File names of the images whose numeric id has no counterpart among the masks.
not_mask = [f'image{image_id}.jpg' for image_id in imgs_set - masks_set]
not_mask

# Data pipeline

In [None]:
# 1. To load the dataset: image and mask paths
# 2. Building the TensorFlow Input Data Pipeline using tf.data API

def load_data(path):
    """Collect the image and mask file paths found under a dataset root.

    Args:
        path: Directory containing the ``original/`` and ``mask/`` sub-folders.

    Returns:
        A tuple ``(images, masks)`` of two lists of paths, each sorted
        lexicographically. A missing sub-folder yields an empty list.
    """
    images, masks = (
        sorted(glob(os.path.join(path, pattern)))
        for pattern in ("original/*", "mask/*")
    )
    return images, masks

def read_image(path):
    """Load an image file as a 256x256 RGB float32 array scaled to [0, 1].

    Args:
        path: Filesystem path of the image file.

    Returns:
        ``np.ndarray`` of shape ``(256, 256, 3)`` and dtype ``float32``.

    Raises:
        OSError: If OpenCV cannot read the file. ``cv2.imread`` reports a
            missing or undecodable file by returning ``None`` instead of
            raising, which would otherwise only surface as an opaque error
            inside ``cv2.cvtColor``.
    """
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    if x is None:
        raise OSError(f"Could not read image file: {path!r}")
    # OpenCV decodes to BGR channel order; the rest of the pipeline uses RGB.
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = cv2.resize(x, (256, 256))
    x = x / 255.0
    return x.astype(np.float32)

def read_mask(path):
    """Load a mask file as a 256x256x1 float32 array of 0/1 labels.

    Args:
        path: Filesystem path of the mask image.

    Returns:
        ``np.ndarray`` of shape ``(256, 256, 1)`` and dtype ``float32`` in
        which every non-zero pixel of the resized grayscale mask is 1.

    Raises:
        OSError: If OpenCV cannot read the file. ``cv2.imread`` reports a
            missing or undecodable file by returning ``None`` instead of
            raising.
    """
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if x is None:
        raise OSError(f"Could not read mask file: {path!r}")
    x = cv2.resize(x, (256, 256))
    # Binarise after resizing: any non-zero intensity counts as foreground.
    x[x > 0] = 1
    x = np.expand_dims(x, axis=-1)
    return x.astype(np.float32)

def preprocess(x, y):
    """Turn an (image path, mask path) tensor pair into decoded tensors.

    The file reading runs as plain Python through ``tf.numpy_function``, so
    the static shapes are re-attached by hand afterwards.

    Args:
        x: Scalar string tensor holding the image path.
        y: Scalar string tensor holding the mask path.

    Returns:
        ``(images, masks)`` float32 tensors with static shapes
        ``[256, 256, 3]`` and ``[256, 256, 1]``.
    """
    def _decode_and_load(image_path, mask_path):
        # Inside numpy_function the paths arrive as bytes objects.
        return read_image(image_path.decode()), read_mask(mask_path.decode())

    images, masks = tf.numpy_function(
        _decode_and_load, [x, y], [tf.float32, tf.float32])

    images.set_shape([256, 256, 3])
    masks.set_shape([256, 256, 1])
    return images, masks

def tf_dataset(x, y):
    """Build a ``tf.data`` pipeline of decoded (image, mask) pairs.

    Args:
        x: Sequence of image file paths.
        y: Sequence of mask file paths, parallel to ``x``.

    Returns:
        A ``tf.data.Dataset`` that maps every path pair through
        ``preprocess``. No shuffling, batching or prefetching is applied here.
    """
    path_pairs = tf.data.Dataset.from_tensor_slices((x, y))
    return path_pairs.map(preprocess)


path = '/content/Lips_Dataset/'

images, masks = load_data(path)
#images = np.delete( images, np.arange( 15000, 28594, 1))
#masks = np.delete( masks, np.arange( 15000, 28540, 1))

print(f"Images: {len(images)} - Masks: {len(masks)}")

Images: 5066 - Masks: 5066


In [None]:
# Replace each bare file name with its full path inside the images folder,
# updating the list in place.
for index, file_name in enumerate(not_mask):
  not_mask[index] = str(IMAGES_FILEPATH / file_name)

In [None]:
# Drop the images that have no mask. Looking paths up in a set keeps each
# membership test O(1) instead of scanning the whole `not_mask` list.
unmasked_paths = set(not_mask)
images = [imagePath for imagePath in images if imagePath not in unmasked_paths]

In [None]:
dataset = tf_dataset(images, masks)

# Write the first (image, mask) pair to disk as a quick visual sanity check.
# `dataset` is unbatched, so each element is already one full sample; indexing
# it with [0] would keep only the first pixel row.
for x, y in dataset.take(1):
    # Both tensors hold floats in [0, 1]; convert them to 8-bit for PNG output.
    x = np.clip(np.rint(x.numpy() * 255), 0, 255).astype(np.uint8)
    y = np.clip(np.rint(y.numpy() * 255), 0, 255).astype(np.uint8)

    # read_image yields RGB, whereas cv2.imwrite expects BGR channel order.
    cv2.imwrite("image.png", cv2.cvtColor(x, cv2.COLOR_RGB2BGR))

    y = np.squeeze(y, axis=-1)
    cv2.imwrite("mask.png", y)

# Preprocessing

In [None]:
def normalize(input_image, input_mask):
  """Shift the mask labels down by one and return the (image, mask) pair.

  The image is returned unchanged; its rescaling to [0, 1] is commented out
  below.

  NOTE(review): no active call to this function is visible in this notebook,
  so it appears to be unused. The masks built by `read_mask` are already
  0/1, so applying this shift to them would yield -1/0 labels -- confirm
  before re-enabling it.
  """
  #input_image = tf.cast(input_image, tf.float32) / 255.0
  # Augmented assignment: objects that support in-place subtraction (e.g.
  # NumPy arrays) are modified in place, so the caller's mask changes too.
  input_mask -= 1
  return input_image, input_mask

def load_image(original_datapoint, mask_datapoint):
  """Resize an (image, mask) pair to 256x256.

  Args:
    original_datapoint: Image tensor.
    mask_datapoint: Mask tensor holding per-pixel class ids.

  Returns:
    `(input_image, input_mask)`, both resized to 256x256.
  """
  input_image = tf.image.resize(original_datapoint, (256, 256))
  # Masks hold class ids, so they must not be interpolated: the default
  # bilinear method blends neighbouring labels into fractional values whenever
  # the size actually changes. Nearest-neighbour keeps every value an exact id.
  input_mask = tf.image.resize(
      mask_datapoint, (256, 256),
      method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

  # Debug output: prints the symbolic mask tensor when tf.data traces this
  # function.
  print(input_mask)

  return input_image, input_mask

In [None]:
# Specify the parameters for training
TRAIN_LENGTH = 5066
BATCH_SIZE = 64
BUFFER_SIZE = 1000

In [None]:
# Partition the data into training and testing sets

# Take the training dataset and validation dataset
val_size = int(TRAIN_LENGTH * 0.2)
train_ds = dataset.skip(val_size)
val_ds = dataset.take(val_size)

train_images = train_ds.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
test_images = val_ds.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)

Tensor("resize_1/Squeeze:0", shape=(256, 256, 1), dtype=float32)
Tensor("resize_1/Squeeze:0", shape=(256, 256, 1), dtype=float32)


In [None]:
# Check the partitioning of the data
print(tf.data.experimental.cardinality(train_images).numpy())
print(tf.data.experimental.cardinality(test_images).numpy())

4053
1013


# Augment 

In [None]:
# This is for the augmentation of the data
class Augment(tf.keras.layers.Layer):
  """Layer that applies a random horizontal flip to images and to their masks.

  Two separate `RandomFlip` layers are used, one for the inputs and one for
  the labels. They are built with the same seed with the intent that both
  draw the same flip decisions, keeping each mask aligned with its image.
  """
  def __init__(self, seed=42):
    """Create the two flip layers.

    Args:
      seed: Seed passed to both `RandomFlip` layers.
    """
    super().__init__()
    # both use the same seed, so they'll make the same random changes.
    self.augment_inputs = tf.keras.layers.RandomFlip(mode="horizontal", seed=seed)
    self.augment_labels = tf.keras.layers.RandomFlip(mode="horizontal", seed=seed)
  
  def call(self, inputs, labels):
    """Flip `inputs` and `labels` and return them as an `(inputs, labels)` pair."""
    # The two layers must be invoked once each per call, in lockstep, for
    # their random streams to stay in step.
    inputs = self.augment_inputs(inputs)
    labels = self.augment_labels(labels)
    return inputs, labels

# Function for plotting and displaying the original image as well as the mask
def display(display_list):
  """Show up to three images side by side in a single matplotlib row.

  Args:
    display_list: Images in the order input image, true mask, predicted
      mask. Each entry is converted with `tf.keras.utils.array_to_img`.
      More than three entries raise IndexError when the title is looked up.
  """
  plt.figure(figsize=(15, 15))

  title = ['Input Image', 'True Mask', 'Predicted Mask']
  panel_count = len(display_list)

  for index, panel in enumerate(display_list):
    plt.subplot(1, panel_count, index + 1)
    plt.title(title[index])
    plt.imshow(tf.keras.utils.array_to_img(panel))
    plt.axis('off')
  plt.show()

In [None]:
# Initialize the training batch
train_batches = (
    train_images
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .repeat()
    .map(Augment())
    .prefetch(buffer_size=tf.data.AUTOTUNE))

test_batches = test_images.batch(BATCH_SIZE)

In [None]:
# Test the training batch
for images, masks in train_batches.take(2):
  sample_image, sample_mask = images[0], masks[0]
  display([sample_image, sample_mask])

# Define the model
The model being used here is a modified [U-Net](https://arxiv.org/abs/1505.04597). A U-Net consists of an encoder (downsampler) and decoder (upsampler). To learn robust features and reduce the number of trainable parameters, use a pretrained model—[MobileNetV2](https://arxiv.org/abs/1801.04381)—as the encoder. For the decoder, you will use the upsample block, which is already implemented in the [pix2pix](https://github.com/tensorflow/examples/blob/master/tensorflow_examples/models/pix2pix/pix2pix.py) example in the TensorFlow Examples repo. (Check out the [pix2pix: Image-to-image translation with a conditional GAN](https://www.tensorflow.org/tutorials/generative/pix2pix) tutorial.)


As mentioned, the encoder is a pretrained MobileNetV2 model. You will use the model from `tf.keras.applications`. The encoder consists of specific outputs from intermediate layers in the model. Note that the encoder will not be trained during the training process.

In [None]:
# Pretrained MobileNetV2 without its classification head, built for the
# 256x256 RGB inputs produced by the data pipeline.
base_model = tf.keras.applications.MobileNetV2(input_shape= [256, 256, 3], include_top=False)

# Use the activations of these layers
# (spatial sizes below are for the 256x256 input configured above; the
# upstream tutorial's figures are for a 128x128 input and are half as large)
layer_names = [
    'block_1_expand_relu',   # 128x128
    'block_3_expand_relu',   # 64x64
    'block_6_expand_relu',   # 32x32
    'block_13_expand_relu',  # 16x16
    #'block_16_project',      # 8x8
]
base_model_outputs = [base_model.get_layer(name).output for name in layer_names]

# Create the feature extraction model
# (one output per entry in layer_names, in the same order)
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)

In [None]:
#down_stack = quantize_model(down_stack)
down_stack.trainable = False
down_stack.summary()

In [None]:
layers = [l for l in down_stack.layers]
print(len(layers))

150


The decoder/upsampler is simply a series of upsample blocks implemented in TensorFlow examples:

In [None]:
# Four-stage decoder variant.
# NOTE(review): the next cell rebinds `up_stack` to a three-stage list, so this
# definition only takes effect if that cell is skipped. The size comments
# assume the 256x256 input with 'block_16_project' (8x8) enabled as the
# deepest encoder output -- confirm before using this variant.
up_stack = [
    pix2pix.upsample(512, 3),  # 8x8 -> 16x16
    pix2pix.upsample(256, 3),  # 16x16 -> 32x32
    pix2pix.upsample(128, 3),  # 32x32 -> 64x64
    pix2pix.upsample(64, 3),   # 64x64 -> 128x128
]

In [None]:
# Three-stage decoder, one upsample block per skip connection in `unet_model`.
# Sizes are for the 256x256 input, where the deepest encoder output used
# ('block_13_expand_relu') is 16x16.
up_stack = [
    pix2pix.upsample(128, 3),  # 16x16 -> 32x32
    pix2pix.upsample(64, 3),  # 32x32 -> 64x64
    pix2pix.upsample(32, 3),   # 64x64 -> 128x128
]

In [None]:
def unet_model(output_channels:int):
  """Assemble the U-Net from the module-level `down_stack` and `up_stack`.

  Args:
    output_channels: Number of filters in the final layer, i.e. one output
      channel (logit) per class.

  Returns:
    A `tf.keras.Model` taking a `[256, 256, 3]` input and producing
    `output_channels` channels per pixel.
  """
  inputs = tf.keras.layers.Input(shape=[256, 256, 3])

  # Encoder: the last output is the bottleneck, the earlier ones become the
  # skip connections, visited from deepest to shallowest.
  encoder_outputs = down_stack(inputs)
  x = encoder_outputs[-1]

  # Decoder: upsample, then concatenate with the matching skip connection.
  for up, skip in zip(up_stack, encoder_outputs[-2::-1]):
    upsampled = up(x)
    x = tf.keras.layers.Concatenate()([upsampled, skip])

  # Final stride-2 transposed convolution: doubles the spatial resolution
  # once more and maps the features to one channel per class.
  x = tf.keras.layers.Conv2DTranspose(
      filters=output_channels, kernel_size=3, strides=2,
      padding='same')(x)

  return tf.keras.Model(inputs=inputs, outputs=x)

Note that the number of filters on the last layer is set to the number of `output_channels`. This will be one output channel per class.

# Train the model

Now, all that is left to do is to compile and train the model. 

Since this is a multiclass classification problem, use the `tf.keras.losses.SparseCategoricalCrossentropy` loss function with the `from_logits` argument set to `True`, since the labels are scalar integers instead of vectors of scores for each pixel of every class.

When running inference, the label assigned to the pixel is the channel with the highest value. This is what the `create_mask` function is doing.

In [None]:
OUTPUT_CLASSES = 3

model = unet_model(output_channels=OUTPUT_CLASSES)

model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

In [None]:
model.summary()

Plot the resulting model architecture:

In [None]:
tf.keras.utils.plot_model(model, show_shapes=True)

Try out the model to check what it predicts before training:

In [None]:
def create_mask(pred_mask):
  """Convert a batch of per-class scores into the mask of its first sample.

  Args:
    pred_mask: Model output whose last axis holds one score per class.

  Returns:
    The first element of the batch, with each pixel replaced by the index of
    its highest-scoring class and a trailing channel axis of size 1.
  """
  class_ids = tf.math.argmax(pred_mask, axis=-1)
  return class_ids[..., tf.newaxis][0]

In [None]:
def show_predictions(dataset=None, num=1):
  """Display model predictions next to their inputs and true masks.

  Args:
    dataset: Optional batched dataset yielding `(image_batch, mask_batch)`.
      When given, the first sample of each of the first `num` batches is
      shown. When omitted, the module-level `sample_image` / `sample_mask`
      pair is used instead.
    num: Number of batches to take from `dataset`.
  """
  # Test for None explicitly: whether a dataset was supplied should not
  # depend on the truthiness of the dataset object.
  if dataset is not None:
    for image, mask in dataset.take(num):
      pred_mask = model.predict(image)
      display([image[0], mask[0], create_mask(pred_mask)])
  else:
    # The model expects a batch, so add a leading batch axis to the sample.
    display([sample_image, sample_mask,
             create_mask(model.predict(sample_image[tf.newaxis, ...]))])

In [None]:
show_predictions()

The callback defined below is used to observe how the model improves while it is training:

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
  """Keras callback that redraws the sample prediction after every epoch."""

  def on_epoch_end(self, epoch, logs=None):
    """Clear the cell output, show the prediction and print the epoch number.

    Args:
      epoch: Zero-based index of the epoch that just finished.
      logs: Metrics dictionary supplied by Keras (unused).
    """
    clear_output(wait=True)
    show_predictions()
    finished_epoch = epoch + 1
    print ('\nSample Prediction after epoch {}\n'.format(finished_epoch))

In [None]:
# Epoch 5 of removed512
#checkpoint_path = "/content/drive/MyDrive/Trained-Models/ConfigTest/Lips.ckpt"
checkpoint_path = "/content/drive/MyDrive/Trained-Models/1-66mb/Epoch30/Lips.ckpt"
checkpoint_dir = os.path.dirname(checkpoint_path)

# Checkpoint callback
cp_callback = tf.keras.callbacks.ModelCheckpoint( checkpoint_path,
                                                save_weights_only=True,
                                                verbose= 1)

In [None]:
model.load_weights( "/content/drive/MyDrive/Trained-Models/1-66mb/Epoch15/Lips.ckpt")
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])


In [None]:
import tempfile
logdir = tempfile.mkdtemp()

In [None]:
def cal_steps(num_images, batch_size):
    """Return how many batches are needed to cover `num_images` samples.

    Args:
        num_images: Total number of samples.
        batch_size: Number of samples per batch.

    Returns:
        The number of full batches, plus one when a partial batch is left
        over.
    """
    full_batches, leftover = divmod(num_images, batch_size)
    if leftover > 0:
        return full_batches + 1
    return full_batches

EPOCHS = 5
#VAL_SUBSPLITS = 5

# Derive the step counts from the same constants that produced the
# train/validation split, so they cannot drift from the real dataset sizes.
# With TRAIN_LENGTH = 5066 and BATCH_SIZE = 64 this gives 64 and 16 steps, the
# same as the former hard-coded cal_steps(4053, 64) and cal_steps(1013, 64).
STEPS_PER_EPOCH = cal_steps(TRAIN_LENGTH - val_size, BATCH_SIZE)
VALIDATION_STEPS = cal_steps(val_size, BATCH_SIZE)

# train_batches repeats indefinitely, so steps_per_epoch defines an epoch.
model_history = model.fit(train_batches, epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=test_batches,
                          callbacks=[DisplayCallback(), cp_callback])

Test Prediction After Training

In [None]:
show_predictions(test_batches, 3)

In [None]:
def get_file_size(file_path):
    """Return the size in bytes of the file at `file_path`.

    Raises:
        OSError: If the file does not exist or is inaccessible.
    """
    return os.stat(file_path).st_size
    
def convert_bytes(size, unit=None):
    """Print a file size in the requested unit.

    Args:
        size: Size in bytes.
        unit: ``"KB"`` or ``"MB"`` to print the size in that unit rounded to
            three decimals; any other value prints the raw byte count.

    Returns:
        None. The formatted size is only printed.
    """
    if unit == "KB":
        amount, label = round(size / 1024, 3), ' Kilobytes'
    elif unit == "MB":
        amount, label = round(size / (1024 * 1024), 3), ' Megabytes'
    else:
        amount, label = size, ' bytes'
    print('File size: ' + str(amount) + label)
    return None

TF2 (pb) and Keras (.h5) Export

In [None]:
TF_MODEL_PATH = "/content/drive/MyDrive/Trained-Models/Epoch30/TF2/Lips_Segmentation_Model_Epoch30_ND_5k_1-66mb"
# Export as a TensorFlow SavedModel. The documented `save_format` values are
# 'tf' and 'h5'; the former '.tf' (with a leading dot) is not one of them and
# is rejected as unknown by Keras versions that validate the argument.
model.save(TF_MODEL_PATH, save_format='tf')

In [None]:
KERAS_MODEL_NAME = "/content/drive/MyDrive/Trained-Models/Epoch30/Keras/Lips_Segmentation_Model_Epoch30_ND_5k_1-66mb.h5"
model.save(KERAS_MODEL_NAME)
convert_bytes(get_file_size(KERAS_MODEL_NAME), "MB")

File size: 13.062 Megabytes


Regular Tflite Export

In [None]:
TF_LITE_MODEL_FILE_NAME = "/content/drive/MyDrive/Trained-Models/Epoch30/Tflite/Lips_Segmentation_Model_Epoch30_ND_5k.tflite"
tf_lite_converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = tf_lite_converter.convert()

In [None]:
tflite_model_name = TF_LITE_MODEL_FILE_NAME
# Write through a context manager so the file is flushed and closed before
# its size is measured below.
with open(tflite_model_name, "wb") as tflite_file:
    tflite_file.write(tflite_model)
convert_bytes(get_file_size(TF_LITE_MODEL_FILE_NAME), "KB")

File size: 25256.395 Kilobytes


Check the Stats of the Training

In [None]:
# NOTE(review): `model_for_pruning` is not defined anywhere in the visible
# notebook, so this cell raises NameError when the notebook is run top to
# bottom. The recorded output presumably comes from an earlier session in
# which a pruned model existed -- confirm, then define it or remove this cell.
test_loss, test_acc = model_for_pruning.evaluate(test_batches, verbose=2)
print('\nTest accuracy:', test_acc)

16/16 - 179s - loss: 0.0180 - accuracy: 0.9940 - 179s/epoch - 11s/step

Test accuracy: 0.9939634799957275


In [None]:
test_loss, test_acc = model.evaluate(test_batches, verbose=2)
print('\nTest accuracy:', test_acc)

16/16 - 169s - loss: 0.0045 - accuracy: 0.9982 - 169s/epoch - 11s/step

Test accuracy: 0.998233437538147


In [None]:
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']

plt.figure()
plt.plot(model_history.epoch, loss, 'r', label='Training loss')
plt.plot(model_history.epoch, val_loss, 'bo', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.ylim([0, 1])
plt.legend()
plt.show()

In [None]:
model.summary()

# TF Lite Model (Optimize)

In [None]:
TF_LITE_MODEL_FILE_NAME = "/content/drive/MyDrive/Trained-Models/Epoch30/Tflite/Lips_Segmentation_Model_Epoch30_ND_5k_1-66mb.tflite"

In [None]:
tf_lite_converter = tf.lite.TFLiteConverter.from_keras_model(model)
tf_lite_converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = tf_lite_converter.convert()

In [None]:
tflite_model_name = TF_LITE_MODEL_FILE_NAME
# Write through a context manager so the file is flushed and closed before
# the next cell reads its size.
with open(tflite_model_name, "wb") as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
convert_bytes(get_file_size(TF_LITE_MODEL_FILE_NAME), "KB")

File size: 1699.727 Kilobytes
