# **Binary Hands Segmentation**

>***Notebook sections:***
1. Dataset
2. Unet model
3. Training phase
4. Testing phase


In [4]:
# Mount Google Drive inside the Colab runtime at /content/drive, so that the
# project folder configured in PROJECT_DIR (under /content/drive/MyDrive) is
# reachable as a regular filesystem path.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


***Program parameters***

In [5]:
# --- Network input ---
# Images dimension in input of the DNN, as (width, height): the tuple is handed
# to cv2.resize unchanged and reversed when building the Keras input shape.
IMAGE_SIZE = (384, 224)
INPUT_CHANNELS = 3
BATCH_SIZE = 20

# --- Locations on the mounted Drive ---
PROJECT_DIR = '/content/drive/MyDrive/Uni/CV Project/'
# Checkpoint and exported-model locations are keyed by the input resolution,
# so runs at different resolutions do not overwrite each other.
CHECKPOINT_PATH = (
    PROJECT_DIR
    + f'Alberto/unet/checkpoints/checkpoint_{IMAGE_SIZE[0]}x{IMAGE_SIZE[1]}/checkpoint.ckpt'
)
SAVE_MODEL_PATH = (
    PROJECT_DIR
    + f'Alberto/unet/saved_models/unet_{IMAGE_SIZE[0]}x{IMAGE_SIZE[1]}'
)

# --- Dataset size ---
# Can't be greater than the size of the dataset; "None" for using all the images
NUM_IMAGES = None

# **1. Dataset**

***Datasets import***

In [6]:
from tqdm.autonotebook import tqdm
import glob

# Datasets to load. Each name is a folder under PROJECT_DIR/datasets/ holding a
# DATA_IMAGES/ and a DATA_MASKS/ subfolder; comment entries in/out to change
# which datasets are used.
dataset_names = ["egohands", 
                 #"handsoverface", 
                 #"eyth",
                 "handsoverface_prof",
                 #"egtea"
                 ]

# Getting images and masks filenames.
# An image and its mask are paired purely by their position in the two sorted
# listings, so every dataset must provide exactly as many masks as images.
# The check is done per dataset: a missing mask in one dataset and a missing
# image in another would otherwise cancel out in the totals while leaving
# every following pair misaligned.
data_images = []
data_masks = []
for dataset in tqdm(dataset_names):
  dataset_dir = PROJECT_DIR + "datasets/" + dataset
  dataset_images = sorted(glob.glob(dataset_dir + "/DATA_IMAGES/*"))
  dataset_masks = sorted(glob.glob(dataset_dir + "/DATA_MASKS/*"))
  if len(dataset_images) != len(dataset_masks):
    raise ValueError(
        f"Dataset '{dataset}': found {len(dataset_images)} images but "
        f"{len(dataset_masks)} masks; image/mask pairs would be misaligned")
  data_images.extend(dataset_images)
  data_masks.extend(dataset_masks)

print("Total number of usable images: ", len(data_images))

  0%|          | 0/2 [00:00<?, ?it/s]

Total number of usable images:  5068


***Filenames processing***

In [7]:
import random
import os

# Simple check. It has to run BEFORE the lists are zipped together: zip()
# silently truncates to the shorter list, so checking afterwards can never fail.
assert len(data_images) == len(data_masks), \
  f"Images/masks count mismatch: {len(data_images)} images, {len(data_masks)} masks"

N_TOTAL_IMAGES = len(data_images)
if N_TOTAL_IMAGES == 0:
  raise ValueError("No images found: check PROJECT_DIR and dataset_names")
if NUM_IMAGES is not None and NUM_IMAGES > N_TOTAL_IMAGES:
  raise ValueError(
      f"NUM_IMAGES ({NUM_IMAGES}) is greater than the dataset size ({N_TOTAL_IMAGES})")

# Shuffling data: images and masks are shuffled as pairs so they stay aligned.
# NOTE: the shuffle is not seeded, so the train/val split is different on every
# run of this cell.
paired_paths = list(zip(data_images, data_masks))
random.shuffle(paired_paths)
data_images = [image_path for image_path, _ in paired_paths]
data_masks = [mask_path for _, mask_path in paired_paths]

# 10% of the images actually used (all of them when NUM_IMAGES is None) go to
# the validation set, the rest to the training set.
n_used_images = N_TOTAL_IMAGES if NUM_IMAGES is None else NUM_IMAGES
NUM_VAL_IMAGES = int(n_used_images * 10/100)
NUM_TRAIN_IMAGES = n_used_images - NUM_VAL_IMAGES

# Splitting data into train/val set
split_end = NUM_TRAIN_IMAGES + NUM_VAL_IMAGES
train_images = data_images[:NUM_TRAIN_IMAGES]
train_masks = data_masks[:NUM_TRAIN_IMAGES]
val_images = data_images[NUM_TRAIN_IMAGES:split_end]
val_masks = data_masks[NUM_TRAIN_IMAGES:split_end]

print("Traininig set size: ", len(train_images))
print("Validation set size: ", len(val_images))

Traininig set size:  4562
Validation set size:  506


***DataGenerator class definition***

* [Code reference](https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly) (by Afshine Amidi and Shervine Amidi)

In [8]:
import numpy as np
import tensorflow as tf
print(f"TensorFlow version: {tf.__version__}")
import cv2

class DataGenerator(tf.keras.utils.Sequence):
    """
    Generates data for Keras
    Sequence based data generator. Suitable for building data generator for training and prediction.

    Every batch is read from disk with OpenCV, resized to ``dim`` and divided
    by 255. Only complete batches are produced: when the number of samples is
    not a multiple of ``batch_size`` the leftover samples of an epoch are skipped.
    """
    def __init__(self, image_paths, mask_paths,
                 to_fit=True, batch_size=16, dim=(512, 288),
                 n_channels=3, n_classes=1, shuffle=True):
        """
        Initialization
        :param image_paths: path of images
        :param mask_paths: paths of masks (same order as image_paths; only read when to_fit is True)
        :param to_fit: True to return X and y, False to return X only
        :param batch_size: batch size at each iteration
        :param dim: tuple indicating image dimension; it is passed unchanged to
                    cv2.resize, which reads it as (width, height)
        :param n_channels: number of image channels (1 or 3)
        :param n_classes: number of output masks (stored, not used by the generator itself)
        :param shuffle: True to shuffle label indexes after every epoch
        """
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.to_fit = to_fit
        self.batch_size = batch_size
        self.dim = dim
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        # Builds self.indexes (and shuffles it if requested) for the first epoch
        self.on_epoch_end()

    def __len__(self):
        """
        Denotes the number of batches per epoch
        :return: number of batches per epoch
        """
        return int(np.floor(len(self.image_paths) / self.batch_size))

    def __getitem__(self, index):
        """
        Generate one batch of data
        :param index: index of the batch
        :return: X and y when fitting. X only when predicting
        """
        # Generate indexes of the batch
        batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        # The paths must come from this generator's own lists, not from the
        # notebook-level training lists: otherwise a generator built on the
        # validation split would silently serve training samples.
        images_ids = [self.image_paths[i] for i in batch_indexes]

        # Generate data
        X = self._generate_X(images_ids)

        if self.to_fit:
            masks_ids = [self.mask_paths[i] for i in batch_indexes]
            y = self._generate_y(masks_ids)
            return X, y
        return X

    def on_epoch_end(self):
        """
        Updates indexes after each epoch
        """
        self.indexes = np.arange(len(self.image_paths))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def _read_and_resize(self, path, read_flag, interpolation=cv2.INTER_LINEAR):
        """
        Reads one file with OpenCV and resizes it to self.dim
        :param path: path of the file to read
        :param read_flag: cv2.IMREAD_* flag selecting color or grayscale decoding
        :param interpolation: cv2 interpolation flag (INTER_LINEAR is cv2.resize's default)
        :return: the resized image
        :raises OSError: if OpenCV cannot read the file
        """
        image = cv2.imread(path, read_flag)
        # cv2.imread does not raise on a missing/unreadable file, it returns None
        if image is None:
            raise OSError(f"Could not read image file: {path}")
        return cv2.resize(image, self.dim, interpolation=interpolation)

    def _generate_X(self, list_IDs_temp):
        """
        Generates data containing batch_size images
        :param list_IDs_temp: list of images ids to load
        :return: batch of images
        """
        if self.n_channels == 3:
            read_flag = cv2.IMREAD_COLOR
        elif self.n_channels == 1:
            read_flag = cv2.IMREAD_GRAYSCALE
        else:
            raise Exception("Number of possibile input channels: 1, 3")
        images = [self._read_and_resize(img, read_flag) for img in list_IDs_temp]
        image_dataset = np.array(images)
        if self.n_channels == 1:
            # Grayscale images are decoded without a channel axis: add it so the
            # batch always carries an explicit channel dimension, like the masks.
            image_dataset = np.expand_dims(image_dataset, axis=-1)
        return image_dataset / 255

    def _generate_y(self, list_IDs_temp):
        """
        Generates data containing batch_size masks
        :param list_IDs_temp: list of masks ids to load
        :return: batch of masks
        """
        # Nearest-neighbour resizing keeps the original mask values instead of
        # blending them at the hand borders.
        masks = [self._read_and_resize(mask, cv2.IMREAD_GRAYSCALE, cv2.INTER_NEAREST)
                 for mask in list_IDs_temp]
        mask_dataset = np.array(masks)
        mask_dataset = np.expand_dims(mask_dataset, axis = 3)
        return mask_dataset / 255

TensorFlow version: 2.8.2


In [9]:
# Creating data generators for the training phase.
# Both generators are configured identically (batch size, target size, number
# of input channels, per-epoch shuffling); only the file lists differ.
generator_options = dict(
    batch_size=BATCH_SIZE,
    dim=IMAGE_SIZE,
    n_channels=INPUT_CHANNELS,
    shuffle=True,
)
train_generator = DataGenerator(train_images, train_masks, **generator_options)
val_generator = DataGenerator(val_images, val_masks, **generator_options)

# **2. Unet model**

***Unet definition*** 

* [Code reference]( https://github.com/bnsreenu/python_for_image_processing_APEER/blob/master/tutorial117_building_unet_using_encoder_decoder_blocks.ipynb) (by Dr. Sreenivas Bhattiprolu)
* [Paper reference](https://arxiv.org/abs/1505.04597) (U-Net: Convolutional Networks for Biomedical Image Segmentation)

In [10]:
from tensorflow import keras
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda
from keras.layers import Activation, MaxPool2D, Concatenate

def conv_block(input, num_filters):
    """
    Two consecutive stages of 3x3 "same" convolution -> batch normalization -> ReLU.
    :param input: tensor the block is applied to
    :param num_filters: number of filters of both convolutions
    :return: output tensor of the second ReLU
    """
    features = input
    for _ in range(2):
        features = Conv2D(num_filters, 3, padding="same")(features)
        features = BatchNormalization()(features)
        features = Activation("relu")(features)
    return features

# Encoder block: Conv block followed by maxpooling
def encoder_block(input, num_filters):
    """
    Applies a conv block and then a 2x2 max pooling.
    :param input: tensor the block is applied to
    :param num_filters: number of filters of the conv block
    :return: (conv block output, to be used as skip connection; pooled output, to feed the next stage)
    """
    skip = conv_block(input, num_filters)
    pooled = MaxPool2D((2, 2))(skip)
    return skip, pooled

# Decoder block
# skip_features gets input from encoder for concatenation
def decoder_block(input, skip_features, num_filters):
    """
    Upsamples the input with a stride-2 transposed convolution, concatenates the
    encoder skip features and applies a conv block.
    :param input: tensor coming from the previous (deeper) stage
    :param skip_features: encoder tensor concatenated after the upsampling
    :param num_filters: number of filters of the transposed convolution and of the conv block
    :return: output tensor of the conv block
    """
    upsampled = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
    merged = Concatenate()([upsampled, skip_features])
    return conv_block(merged, num_filters)

# Build Unet using the blocks
def build_unet(input_shape, n_classes):
    """
    Builds the U-Net: 4 encoder blocks (64 to 512 filters), a 1024-filter bridge,
    4 decoder blocks (512 to 64 filters) and a final 1x1 convolution.
    :param input_shape: shape of one input sample, (height, width, channels)
    :param n_classes: number of output channels; 1 selects a sigmoid output
                      (binary segmentation), any other value a softmax output
    :return: the (uncompiled) Keras model, named "U-Net"
    :raises ValueError: if height or width is known and not a multiple of 16
    """
    # The input is halved by four 2x2 poolings and doubled back four times: any
    # size that is not a multiple of 2**4 makes an upsampled tensor differ from
    # its skip connection, which fails deep inside the graph construction.
    # Fail early with a readable message instead (unknown/None sizes are let through).
    for size in input_shape[:2]:
        if size is not None and size % 16 != 0:
            raise ValueError(
                f"U-Net input height and width must be multiples of 16, got {tuple(input_shape[:2])}")

    inputs = Input(input_shape)

    # Encoder: keep every pre-pooling output for the skip connections
    skips = []
    x = inputs
    for num_filters in (64, 128, 256, 512):
        skip, x = encoder_block(x, num_filters)
        skips.append(skip)

    x = conv_block(x, 1024) #Bridge

    # Decoder: consume the skip connections from the deepest to the shallowest
    for num_filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        x = decoder_block(x, skip, num_filters)

    # Change the activation based on n_classes
    activation = 'sigmoid' if n_classes == 1 else 'softmax'  # sigmoid: binary
    outputs = Conv2D(n_classes, 1, padding="same", activation=activation)(x)

    model = Model(inputs, outputs, name="U-Net")
    return model

***Unet import***

In [11]:
import os

# Keras expects (height, width, channels) while IMAGE_SIZE is (width, height)
input_shape = (IMAGE_SIZE[1], IMAGE_SIZE[0], INPUT_CHANNELS)
model = build_unet(input_shape, 1) # 1 for binary segmentation
model.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-3), 
              loss='binary_crossentropy', 
              metrics=['accuracy'])

# Weights saved to a path that does not end in ".h5" are written in the
# TensorFlow checkpoint format: CHECKPOINT_PATH is then only a prefix and the
# files on disk are CHECKPOINT_PATH + ".index" / ".data-*". Testing the bare
# prefix alone would never find the checkpoint, so the index file is tested too.
if os.path.exists(CHECKPOINT_PATH + ".index") or os.path.exists(CHECKPOINT_PATH):
  model.load_weights(CHECKPOINT_PATH)
  print("Checkpoint found: weights loaded")

model.summary()

Model: "U-Net"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 384, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 224, 384, 64  1792        ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 batch_normalization (BatchNorm  (None, 224, 384, 64  256        ['conv2d[0][0]']                 
 alization)                     )                                                             

# **3. Training phase**

***Training start***

In [None]:
# Defining some callbacks
# Keeps on disk only the weights of the epoch with the lowest validation loss
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=CHECKPOINT_PATH,
    save_weights_only=True,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    verbose=1,
    )

# Stops the training after 5 consecutive epochs without val_loss improvement
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5
    )

# Training the model
history = model.fit(train_generator, 
                    validation_data=val_generator,
                    verbose=1,
                    epochs=50,
                    callbacks=[model_checkpoint, 
                               early_stopping])

# When fit() returns, the model holds the weights of the LAST epoch, which after
# an early stop is 5 epochs past the best one. Reload the best weights kept by
# ModelCheckpoint before exporting, so the saved model is the best model.
# (TensorFlow-format checkpoint: CHECKPOINT_PATH is a prefix, the index file is
# what exists on disk. The guard avoids losing the trained model if no
# checkpoint could be written.)
if os.path.exists(CHECKPOINT_PATH + ".index"):
  model.load_weights(CHECKPOINT_PATH)

model.save(SAVE_MODEL_PATH)
print("Model saved")

Epoch 1/50
Epoch 1: val_loss improved from inf to 0.78969, saving model to /content/drive/MyDrive/Uni/CV Project/Alberto/unet/checkpoints/checkpoint_384x224/checkpoint.ckpt
Epoch 2/50
Epoch 2: val_loss improved from 0.78969 to 0.13887, saving model to /content/drive/MyDrive/Uni/CV Project/Alberto/unet/checkpoints/checkpoint_384x224/checkpoint.ckpt
Epoch 3/50
Epoch 3: val_loss improved from 0.13887 to 0.08123, saving model to /content/drive/MyDrive/Uni/CV Project/Alberto/unet/checkpoints/checkpoint_384x224/checkpoint.ckpt
Epoch 4/50
Epoch 4: val_loss did not improve from 0.08123
Epoch 5/50
Epoch 5: val_loss did not improve from 0.08123
Epoch 6/50
Epoch 6: val_loss did not improve from 0.08123
Epoch 7/50
Epoch 7: val_loss did not improve from 0.08123
Epoch 8/50
Epoch 8: val_loss improved from 0.08123 to 0.06166, saving model to /content/drive/MyDrive/Uni/CV Project/Alberto/unet/checkpoints/checkpoint_384x224/checkpoint.ckpt
Epoch 9/50
Epoch 9: val_loss did not improve from 0.06166
Epoch 

***Training stats***



In [None]:
from matplotlib import pyplot as plt

# Plotting loss/val_loss for each epoch
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Plotting accuracies for each epoch.
# The model is compiled with metrics=['accuracy'], so the History object stores
# the curves under 'accuracy' / 'val_accuracy'; the 'acc' spelling is only kept
# as a fallback for histories that use the short name.
acc_key = 'accuracy' if 'accuracy' in history.history else 'acc'
acc = history.history[acc_key]
val_acc = history.history['val_' + acc_key]
plt.plot(epochs, acc, 'y', label='Training acc')
plt.plot(epochs, val_acc, 'r', label='Validation acc')
plt.title('Training and validation acc')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# **4. Testing phase**

In [None]:
from keras.models import load_model

# Loading the full model that was exported with model.save() to SAVE_MODEL_PATH
# (this is the saved model, not the ModelCheckpoint weights at CHECKPOINT_PATH)
model = load_model(SAVE_MODEL_PATH)

In [None]:
import cv2

def preprocess_image(image_path, resize_dim):
  """
  Transforms an image in order to be usable by the DNN.
  It makes the same transformations you can find in the DataGenerator class.

  :param image_path: path of the image file to read
  :param resize_dim: target size, passed unchanged to cv2.resize which reads it
                     as (width, height)
  :return: tuple (resized image as decoded by OpenCV, the same image divided by 255)
  :raises Exception: if INPUT_CHANNELS is neither 1 nor 3
  :raises OSError: if OpenCV cannot read the file as an image
  """
  if INPUT_CHANNELS == 3:
    read_flag = cv2.IMREAD_COLOR
  elif INPUT_CHANNELS == 1:
    read_flag = cv2.IMREAD_GRAYSCALE
  else:
    raise Exception("Number of possibile input channels: 1, 3")
  image = cv2.imread(image_path, read_flag)
  # cv2.imread returns None instead of raising when the file is missing or is
  # not a decodable image; report the offending path instead of failing later
  # inside cv2.resize.
  if image is None:
    raise OSError(f"Could not read image file: {image_path}")
  image = cv2.resize(image, resize_dim)
  return image, image / 255

***Results on test set***

In [None]:
import glob
import numpy as np
from matplotlib import pyplot as plt

images_name = glob.glob(PROJECT_DIR + "/test_set/*")
threshold = 0.5  # probability above which a pixel is labelled as hand

for image_path in images_name:
  # Reading an image from the test set
  image255, image = preprocess_image(image_path, IMAGE_SIZE)

  # Adding the batch dimension expected by model.predict
  image = np.expand_dims(image, 0)
  print(image.shape)
  # NumPy arrays expose their element type as the `dtype` attribute
  # (there is no `type()` method)
  print(image.dtype)

  # Predicting the mask
  prediction = (model.predict(image)[0,:,:,0] > threshold).astype(np.uint8)

  # Showing the image and its predicted mask
  plt.figure(figsize=(16, 8))
  plt.subplot(231)
  plt.title('Testing Image')
  # OpenCV decodes images as BGR, matplotlib expects RGB
  plt.imshow(cv2.cvtColor(image255, cv2.COLOR_BGR2RGB))
  plt.subplot(232)
  plt.title('Prediction on test image')
  plt.imshow(prediction, cmap='gray')
  plt.show()
  # Only the first image of the test set is shown
  break