# Semantic Image Segmentation with UNet
---


In this Colab, we will build a well-known model, UNet, that predicts segmentation masks (pixel-wise label maps) for images of pets. We will train the model on the [Oxford-IIIT Pet](https://www.robots.ox.ac.uk/~vgg/data/pets/) dataset, which contains 37 pet categories, although the segmentation masks only distinguish the pet, the background and the pet's outline. [UNet](https://arxiv.org/abs/1505.04597) is a fully convolutional network that uses skip connections to join each encoder stage to the decoder stage at the same resolution. We will evaluate the model's performance using the IoU (intersection over union) and Dice score metrics.

![segmentation](images/segmentation.png)

# Imports and Data Preprocessing

First, we must import the necessary modules to perform the given task.

In [None]:
import random
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt

Next, we need to download the Oxford-IIIT Pet dataset that we will train and test on. Luckily for us, the dataset is already included in TensorFlow Datasets, and we can retrieve it by running the cell below. It includes the pet images, bounding boxes and segmentation masks, but we will ignore the bounding boxes for now. Note that the masks are only included in dataset versions 3 and later, which is why the name ends in "3.\*.\*".

In [None]:
# download the dataset and get info
dataset, info = tfds.load('oxford_iiit_pet:3.*.*', with_info=True)

# class list for the pixel maps
class_names = ['pet', 'background', 'outline']

We define some utility functions to help preprocess the data: normalizing the pixel values to the range [0, 1] for faster convergence, shifting the mask labels from {1, 2, 3} to {0, 1, 2} so they can be used directly as class indices, randomly flipping each image and its mask horizontally as data augmentation to help the model generalize, and resizing the images to the 128x128 input size the model expects.

In [None]:
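def normalize(image, seg_mask):
  # Scale pixel values to [0, 1] and shift mask labels from {1, 2, 3} to {0, 1, 2}.
  image = tf.cast(image, dtype='float32') / 255.0
  seg_mask -= 1
  return image, seg_mask

def simple_augmentation(image, seg_mask):
  # Draw the random number with a TensorFlow op: a Python random.randint() call inside
  # a tf.data map function runs only once, while the function is traced, so every
  # example would get the same flip decision. Values 6-9 out of 0-9 flip about 40% of examples.
  flip = tf.random.uniform((), minval=0, maxval=10, dtype=tf.int32) > 5
  # Flip the image and its mask together so every label stays on its pixel.
  image = tf.cond(flip, lambda: tf.image.flip_left_right(image), lambda: image)
  seg_mask = tf.cond(flip, lambda: tf.image.flip_left_right(seg_mask), lambda: seg_mask)

  return image, seg_mask

def process_training(data_entry):
  # 'nearest' resizing keeps the mask values as valid integer labels.
  image = tf.image.resize(data_entry['image'], (128, 128), 'nearest')
  seg_mask = tf.image.resize(data_entry['segmentation_mask'], (128, 128), 'nearest')
  image, seg_mask = simple_augmentation(image, seg_mask)
  image, seg_mask = normalize(image, seg_mask)

  return image, seg_mask

def process_test(data_entry):
  # Same as process_training, but without augmentation.
  image = tf.image.resize(data_entry['image'], (128, 128), 'nearest')
  seg_mask = tf.image.resize(data_entry['segmentation_mask'], (128, 128), 'nearest')
  image, seg_mask = normalize(image, seg_mask)

  return image, seg_mask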
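
The preprocessing steps can also be illustrated without TensorFlow. The NumPy sketch below is only a stand-in for the TensorFlow code above (the functions `normalize_np` and `flip_pair_np` are hypothetical): it scales pixel values into [0, 1], shifts the Oxford-IIIT Pet trimap labels from {1, 2, 3} to {0, 1, 2}, and flips the image and its mask on the same coin toss, so each label stays aligned with its pixel.

```python
import numpy as np

def normalize_np(image, seg_mask):
    """NumPy analogue of `normalize`: scale pixels to [0, 1], shift labels to start at 0."""
    image = image.astype(np.float32) / 255.0
    # Convert before subtracting so a uint8 mask cannot wrap around below zero.
    seg_mask = seg_mask.astype(np.int64) - 1  # trimap labels {1, 2, 3} -> {0, 1, 2}
    return image, seg_mask

def flip_pair_np(image, seg_mask, rng, p=0.4):
    """Flip image and mask horizontally together with probability p (hypothetical helper)."""
    if rng.random() < p:
        # Axis 1 is the width axis of a (height, width, channels) array.
        image = image[:, ::-1, :]
        seg_mask = seg_mask[:, ::-1, :]
    return image, seg_mask

# A tiny 2x3 RGB "image" and a mask holding the raw dataset labels (1, 2, 3).
image = np.arange(18, dtype=np.uint8).reshape(2, 3, 3) * 10
mask = np.array([[1, 2, 3], [3, 2, 1]], dtype=np.uint8)[..., np.newaxis]

norm_image, norm_mask = normalize_np(image, mask)
print(norm_image.min(), norm_image.max())  # both within [0, 1]
print(np.unique(norm_mask))                # labels are now 0, 1, 2

# p=1.0 forces a flip: the last column moves to column 0 in both arrays.
flipped_image, flipped_mask = flip_pair_np(norm_image, norm_mask, np.random.default_rng(0), p=1.0)
print(flipped_mask[..., 0])
```

Flipping both arrays with one random draw is the important design point: flipping them independently would silently mismatch images and labels.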

We will now map the utility functions defined above over every entry in the dataset. Passing `num_parallel_calls=tf.data.AUTOTUNE` lets tf.data run the map calls in parallel and tune the level of parallelism to the available CPU resources at runtime, which speeds up preprocessing compared with processing the entries one at a time.

In [None]:
train = dataset['train'].map(process_training, num_parallel_calls=tf.data.AUTOTUNE)
test = dataset['test'].map(process_test, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
buffer_size = 1000
batch_size = 64

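# cache() stores the output of process_training, including its random flip, so after the
# first epoch each image keeps the same flip. To re-draw the flip every epoch, apply the
# augmentation after cache() instead.
train_ds = train.cache().shuffle(buffer_size).batch(batch_size).repeat().prefetch(tf.data.AUTOTUNE)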
test_ds = test.batch(batch_size)
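
`shuffle(buffer_size)` does not shuffle the whole dataset at once: according to the tf.data documentation, it fills a buffer with `buffer_size` elements, then repeatedly emits a randomly chosen element from the buffer and refills the freed slot with the next input element. The pure-Python sketch below imitates that behaviour; the generators `buffered_shuffle` and `batches` are hypothetical illustrations, not tf.data internals. It also shows the batch arithmetic: with `batch_size = 64`, the 3,680 examples in the training split give 57 full batches per epoch.

```python
import random

def buffered_shuffle(items, buffer_size, seed=0):
    """Yield items in a locally shuffled order using a fixed-size buffer (hypothetical helper)."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        buffer.append(item)
        if len(buffer) == buffer_size:
            # Emit a random buffered element; its slot is refilled by the next input item.
            idx = rng.randrange(buffer_size)
            buffer[idx], buffer[-1] = buffer[-1], buffer[idx]
            yield buffer.pop()
    # Once the input is exhausted, drain the remaining elements in random order.
    rng.shuffle(buffer)
    yield from buffer

def batches(items, batch_size):
    """Group items into lists of batch_size; the last batch may be smaller."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

shuffled = list(buffered_shuffle(range(10), buffer_size=4))
print(shuffled)  # a permutation of 0..9; a small buffer only mixes nearby elements

num_train_examples = 3680  # size of the Oxford-IIIT Pet train split
print(num_train_examples // 64)  # 57 full batches of 64
```

A buffer of 1 leaves the order unchanged, while a buffer at least as large as the dataset gives a full shuffle; `buffer_size = 1000` sits in between.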

## Define the Model

Below is an image of the overall architecture of the UNet model we will build. It consists of a downsampling encoder and an upsampling decoder, separated by a bottleneck that we call the pipeline stage. The gray arrows represent the skip connections between corresponding encoder and decoder blocks. It may look complex at first glance, but we can implement it with the help of the TensorFlow Functional API.
<img src='https://drive.google.com/uc?export=view&id=1BeQSKL2Eq6Fw9iRXsN1hgunY-CS2nH7V' alt='unet'>



On the left side of the UNet, you can see that a single encoder block consists of two ReLU-activated Conv2D layers followed by a MaxPool and a Dropout layer. Each subsequent block doubles the number of convolutional filters. Since we need to save the output of the Conv2D layers before it passes through MaxPool + Dropout, for use in the decoder later on, we define two separate functions. `conv_block` defines the ReLU-activated convolutional layers, and `encoder_block` calls it and then chains the pooling and dropout layers. This way we can keep the output of `conv_block` for the decoder while passing the pooled output on to the next encoder block.

In [None]:
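def conv_block(input, num_filters):
  # Two 3x3 convolutions, each followed by ReLU; 'same' padding keeps height and width unchanged.
  x = input

  for _ in range(2):
    x = tf.keras.layers.Conv2D(num_filters, kernel_size=(3,3), padding='same', kernel_initializer = 'he_normal')(x)
    x = tf.keras.layers.Activation('relu')(x)

  return x

def encoder_block(input, num_filters):
  # a: conv features saved for the skip connection; b: pooled, half-size features for the next block.
  a = conv_block(input, num_filters)
  b = tf.keras.layers.MaxPooling2D(pool_size=(2,2))(a)
  b = tf.keras.layers.Dropout(0.3)(b)

  return a, b

def encoder(input):
  # The number of filters doubles at each level while the spatial size halves (128 -> 8).
  a1, b1 = encoder_block(input, num_filters=64)
  a2, b2 = encoder_block(b1, num_filters=128)
  a3, b3 = encoder_block(b2, num_filters=256)
  a4, b4 = encoder_block(b3, num_filters=512)

  # Return the deepest pooled output and the four skip-connection outputs.
  return b4, (a1, a2, a3, a4)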
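
To see what the encoder does to tensor shapes, here is a small pure-Python trace. The helper `encoder_shapes` is hypothetical, not part of Keras; it assumes the 128x128 input, 'same' padding and 2x2 pooling used above. Each block saves a full-resolution feature map for the decoder and hands a half-resolution one to the next block (dropout does not change the shape).

```python
def encoder_shapes(input_size=128, filters=(64, 128, 256, 512)):
    """Trace (height, width, channels) through the encoder blocks (hypothetical helper)."""
    skips = []
    size = input_size
    for num_filters in filters:
        # conv_block keeps height/width ('same' padding) and sets channels to num_filters.
        skips.append((size, size, num_filters))  # output `a`, saved for the decoder
        size //= 2                                # output `b`, after 2x2 max pooling
    return (size, size, filters[-1]), skips

bottom, skips = encoder_shapes()
print(skips)   # [(128, 128, 64), (64, 64, 128), (32, 32, 256), (16, 16, 512)]
print(bottom)  # (8, 8, 512)
```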

The pipeline stage (often called the bottleneck) follows the encoder. It is simply another conv block, with 1024 filters, designed to extract higher-level features from the smallest (8x8) feature maps.

In [None]:
def pipeline(input):
  conn_pipeline = conv_block(input, num_filters=1024)
  return conn_pipeline

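The last piece is the decoder, which upsamples the features back to the original resolution and makes the pixel-wise predictions. At each step, we take the output of the previous block, upsample it with a `Conv2DTranspose` layer, and concatenate it with the output of the corresponding encoder block before passing it on to the next block. A final 1x1 convolution with a softmax activation turns the 64 feature channels into per-pixel class probabilities.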

In [None]:
def decoder_block(input, conv_output, num_filters):
  c = tf.keras.layers.Conv2DTranspose(num_filters, kernel_size=(3,3), strides=(2,2), padding='same')(input)
  d = tf.keras.layers.concatenate([c, conv_output])
  d = tf.keras.layers.Dropout(0.3)(d)
  d = conv_block(d, num_filters)

  return d

def decoder(input, convs, output_maps):
  # convs holds the saved encoder outputs, from highest (128x128) to lowest (16x16) resolution.
  b1, b2, b3, b4 = convs

  d1 = decoder_block(input, b4, 512)
  d2 = decoder_block(d1, b3, 256)
  d3 = decoder_block(d2, b2, 128)
  d4 = decoder_block(d3, b1, 64)

  final_outputs = tf.keras.layers.Conv2D(output_maps, kernel_size=(1,1), activation='softmax')(d4)

  return final_outputs
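
The decoder mirrors the encoder's shape changes. With `strides=(2, 2)` and `padding='same'`, a `Conv2DTranspose` layer outputs a feature map exactly twice the input height and width, which is what lets it line up with the saved encoder output; concatenation then adds the two channel counts. The pure-Python sketch below (the helper `decoder_shapes` is hypothetical, using the filter counts from `decoder`) walks through the four decoder blocks.

```python
def decoder_shapes(bottleneck=(8, 8, 1024),
                   skips=((128, 128, 64), (64, 64, 128), (32, 32, 256), (16, 16, 512)),
                   filters=(512, 256, 128, 64), num_classes=3):
    """Return (concatenated shape, block output shape) per decoder block, plus the final shape."""
    h, w, _ = bottleneck
    trace = []
    # The deepest skip connection is consumed first, so walk the skips in reverse.
    for (skip_h, skip_w, skip_c), num_filters in zip(reversed(skips), filters):
        h, w = h * 2, w * 2  # Conv2DTranspose(strides=(2, 2), padding='same') doubles H and W
        if (h, w) != (skip_h, skip_w):
            raise ValueError("upsampled map does not match the skip connection")
        concat = (h, w, num_filters + skip_c)        # concatenate([c, conv_output]) stacks channels
        trace.append((concat, (h, w, num_filters)))  # conv_block maps back to num_filters
    return trace, (h, w, num_classes)  # final 1x1 Conv2D with softmax

trace, output_shape = decoder_shapes()
for concat, out in trace:
    print(concat, "->", out)
print(output_shape)  # (128, 128, 3)
```

The output has the same height and width as the input image, with one probability channel per class.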


Now that we have defined all of our components, we can string together the encoder, pipeline and decoder. Note that we use `len(class_names)` as the number of output maps, since there are 3 possible labels for our use case: `['pet', 'background', 'outline']`.

In [None]:
def unet_model():
  inputs = tf.keras.layers.Input(shape=(128, 128, 3))

  encoder_output, convs = encoder(inputs)

  conn_pipeline = pipeline(encoder_output)

  outputs = decoder(conn_pipeline, convs, len(class_names))

  model = tf.keras.Model(inputs=inputs, outputs=outputs)
  return model

model = unet_model()
model.summary()
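
None of the pooling, dropout, activation or concatenation layers has trainable weights, so the model's size can be worked out by hand: a convolution (or transposed convolution) with a k x k kernel has k·k·in·out weights plus out biases. The pure-Python sketch below (hypothetical helpers `conv_params`, `conv_block_params` and `unet_params`, mirroring the layer choices in the cells above) applies that formula stage by stage; if the arithmetic is right, its total should agree with the parameter count printed by `model.summary()`.

```python
def conv_params(kernel_size, in_channels, out_channels):
    """Weights plus biases of a Conv2D or Conv2DTranspose layer with a square kernel."""
    return kernel_size * kernel_size * in_channels * out_channels + out_channels

def conv_block_params(in_channels, num_filters):
    """Two 3x3 convolutions: in_channels -> num_filters -> num_filters (as in conv_block)."""
    return conv_params(3, in_channels, num_filters) + conv_params(3, num_filters, num_filters)

def unet_params(in_channels=3, filters=(64, 128, 256, 512), pipeline_filters=1024, num_classes=3):
    """Per-stage parameter counts for the UNet built by unet_model (hypothetical helper)."""
    encoder, channels = 0, in_channels
    for num_filters in filters:
        encoder += conv_block_params(channels, num_filters)
        channels = num_filters

    pipeline = conv_block_params(channels, pipeline_filters)

    decoder, channels = 0, pipeline_filters
    for num_filters in reversed(filters):
        decoder += conv_params(3, channels, num_filters)            # Conv2DTranspose upsampling
        decoder += conv_block_params(2 * num_filters, num_filters)  # after concatenating the skip
        channels = num_filters

    head = conv_params(1, channels, num_classes)  # final 1x1 softmax convolution
    total = encoder + pipeline + decoder + head
    return {"encoder": encoder, "pipeline": pipeline, "decoder": decoder,
            "head": head, "total": total}

params = unet_params()
for stage, count in params.items():
    print(f"{stage:>8}: {count:,}")
```

Most of the weights sit in the deepest layers: the 1024-filter pipeline stage alone accounts for about 14.2 million of the roughly 34.5 million parameters, because the weight count grows with the product of input and output channels.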

## Compile, Fit and Analyze the Model

Now that everything is ready, we can begin training our model! Training takes approximately 15-20 minutes, depending on the hardware, and you can leave it running in the background. We use the adaptive Adam optimizer and `sparse_categorical_crossentropy` as our loss, since the network assigns each pixel one of 3 classes (0, 1 or 2) and the masks store these labels as integers rather than one-hot vectors. Expect a validation accuracy (`val_accuracy`) of at least **85%**.

In [None]:
epochs = 15
training_examples = info.splits['train'].num_examples
steps_per_epoch = training_examples // batch_size

model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
history = model.fit(train_ds, epochs=epochs, steps_per_epoch=steps_per_epoch, validation_data=test_ds)
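
Sparse categorical cross-entropy is ordinary cross-entropy where each target is given as an integer class index instead of a one-hot vector; for segmentation it is averaged over every pixel. The NumPy sketch below (hypothetical helper names, not the Keras implementation) computes it for a 2x2 label map, together with the pixel accuracy that the `accuracy` metric should correspond to here (the fraction of pixels whose most probable class matches the label).

```python
import numpy as np

def sparse_categorical_crossentropy_np(labels, probs, eps=1e-7):
    """Mean over all pixels of -log(probability assigned to the true class).

    labels: integer array of shape (height, width) with values in {0, 1, 2}
    probs:  float array of shape (height, width, num_classes), softmax output
    """
    probs = np.clip(probs, eps, 1.0)  # avoid log(0)
    true_class_prob = np.take_along_axis(probs, labels[..., np.newaxis], axis=-1)
    return float(-np.mean(np.log(true_class_prob)))

def pixel_accuracy_np(labels, probs):
    """Fraction of pixels whose argmax prediction equals the label."""
    return float(np.mean(np.argmax(probs, axis=-1) == labels))

labels = np.array([[0, 1], [2, 1]])
probs = np.array([[[0.7, 0.2, 0.1], [0.1, 0.8, 0.1]],
                  [[0.2, 0.2, 0.6], [0.5, 0.4, 0.1]]])

loss = sparse_categorical_crossentropy_np(labels, probs)
acc = pixel_accuracy_np(labels, probs)
print(loss)  # mean of -log([0.7, 0.8, 0.6, 0.4])
print(acc)   # 0.75: the bottom-right pixel predicts class 0 instead of 1
```

Note that the loss still penalizes the three correct pixels a little, because their true-class probabilities are below 1, while accuracy only counts the argmax.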

Running the cell below will create a graph displaying the training (blue) and validation (red) metrics of the model at each epoch during training.

In [None]:
display = ['loss', 'accuracy']

plt.figure(figsize=(12, 3))
for i in range(len(display)):
  plt.subplot(1, 2, i+1)
  plt.title(f"Training and Validation {display[i].title()} per Epoch")
  plt.ylabel(display[i]); plt.xlabel('epoch')
  plt.xticks(np.arange(epochs))
  plt.plot(history.history[display[i]], color='b', label='train_' + display[i])
  plt.plot(history.history['val_' + display[i]], color='r', label='val_' + display[i])
  plt.legend()

plt.show()

# Make a Prediction

Now that the model is trained, we are ready to make some predictions. The utility functions below collect the images and masks from the test dataset we defined earlier and feed the images to the model. Since the model has never seen these examples, they show how well it generalizes to new data.

In [None]:
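def get_image_and_masks():
  num_examples = info.splits['test'].num_examples
  # Re-batch the whole test set into a single batch so it can be read in one step.
  dataset = test_ds.unbatch().batch(num_examples)

  true_images = []
  true_masks = []

  for images, masks in dataset.take(1):
    true_images = images.numpy()
    true_masks = masks.numpy()

  # Keep only the examples that fall in full batches, so the ground truth lines up with
  # model.predict(test_ds, steps=num_examples // batch_size).
  num_kept = num_examples - num_examples % batch_size
  return true_images[:num_kept], true_masks[:num_kept]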

def create_label_map(mask):
  mask = tf.argmax(mask, axis=-1)
  mask = mask[..., tf.newaxis]

  return mask[0].numpy()

def make_prediction(image):
  image = image[tf.newaxis, ...]
  pred_mask = model.predict(image)
  pred_mask = create_label_map(pred_mask)

  return pred_mask
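
`create_label_map` turns the model's softmax output into a label map by taking, at each pixel, the index of the most probable class. The NumPy sketch below (hypothetical helpers `label_map_from_probs` and `usable_test_examples`) shows the same idea on a tiny 2x2 prediction, and also the truncation used in `get_image_and_masks`: `model.predict` with `steps=num_examples // batch_size` only covers full batches, so with the 3,669 test examples and `batch_size = 64`, only 3,648 predictions come back and the ground truth is cut to the same length.

```python
import numpy as np

def label_map_from_probs(probs):
    """(batch, height, width, num_classes) probabilities -> (batch, height, width, 1) labels."""
    return np.argmax(probs, axis=-1)[..., np.newaxis]

def usable_test_examples(num_examples, batch_size):
    """Number of examples covered by full batches only."""
    return num_examples - num_examples % batch_size

probs = np.zeros((1, 2, 2, 3))
probs[0, 0, 0, 2] = 1.0  # top-left pixel: "outline"
probs[0, 0, 1, 0] = 1.0  # top-right pixel: "pet"
probs[0, 1, :, 1] = 1.0  # bottom row: "background"
print(label_map_from_probs(probs)[0, ..., 0])  # labels [[2, 0], [1, 1]]

print(usable_test_examples(3669, 64))  # 3648, so valid indices run from 0 to 3647
```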

In [None]:
y_true_images, y_true_masks = get_image_and_masks()

pred_masks = model.predict(test_ds, steps=info.splits['test'].num_examples // batch_size)
pred_masks = np.argmax(pred_masks, axis=-1)
pred_masks = pred_masks[..., tf.newaxis]

# Calculate Class-Wise Metrics

The cell below defines a function that computes the per-class IoU (intersection over union) and Dice score, which measure how well the predicted mask overlaps the true mask.
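
The two metrics can be written out explicitly. For class $k$, let $T_k$ be the set of pixels labelled $k$ in the true mask and $P_k$ the set of pixels predicted as $k$; in `mask_metrics`, `area_of_overlap` corresponds to $|T_k \cap P_k|$ and `combined_area` to $|T_k| + |P_k|$. Leaving out the small smoothing term, which the code adds only to avoid dividing by zero when a class is absent from both masks:

$$
\mathrm{IoU}_k = \frac{|T_k \cap P_k|}{|T_k \cup P_k|} = \frac{|T_k \cap P_k|}{|T_k| + |P_k| - |T_k \cap P_k|},
\qquad
\mathrm{Dice}_k = \frac{2\,|T_k \cap P_k|}{|T_k| + |P_k|}
$$

Both range from 0 (no overlap) to 1 (identical masks), and they are related by $\mathrm{Dice}_k = 2\,\mathrm{IoU}_k / (1 + \mathrm{IoU}_k)$, so the Dice score is never lower than the IoU for the same prediction.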

In [None]:
def mask_metrics(y_true, y_pred):
  iou_list = []
  dice_score_list = []

  smoothening_factor = 0.00001
  for i in range(3):
    area_of_overlap = np.sum((y_true == i) * (y_pred == i))
    y_true_area = np.sum((y_true == i))
    y_pred_area = np.sum((y_pred == i))
    combined_area = y_true_area + y_pred_area

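    # The smoothing factor avoids division by zero when a class is absent from both masks.
    iou = (area_of_overlap + smoothening_factor) / (combined_area - area_of_overlap + smoothening_factor)
    iou_list.append(iou)

    # Smoothed Dice: like the IoU above, a class absent from both masks scores 1 (not 2).
    dice_score = (2 * area_of_overlap + smoothening_factor) / (combined_area + smoothening_factor)
    dice_score_list.append(dice_score)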

  return iou_list, dice_score_list
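
A small worked example makes the numbers easy to check by hand. The function below, `class_iou_dice`, is a hypothetical standalone NumPy restatement of the metric computation, using the smoothed Dice form (2·overlap + ε) / (combined + ε). Because it only uses elementwise comparisons and sums, the same kind of call should also work on stacked arrays such as `y_true_masks` and `pred_masks` from earlier, giving scores pooled over the whole test set rather than a single image.

```python
import numpy as np

def class_iou_dice(y_true, y_pred, num_classes=3, eps=1e-5):
    """Per-class IoU and Dice for integer label maps of any matching shape (hypothetical helper)."""
    ious, dices = [], []
    for k in range(num_classes):
        true_k = (y_true == k)
        pred_k = (y_pred == k)
        overlap = np.sum(true_k & pred_k)
        combined = np.sum(true_k) + np.sum(pred_k)
        ious.append((overlap + eps) / (combined - overlap + eps))
        dices.append((2 * overlap + eps) / (combined + eps))
    return ious, dices

y_true = np.array([[0, 0, 1], [0, 1, 1], [2, 2, 1]])
y_pred = np.array([[0, 1, 1], [0, 1, 1], [2, 1, 1]])

iou, dice = class_iou_dice(y_true, y_pred)
print([round(float(x), 3) for x in iou])   # [0.667, 0.667, 0.5]
print([round(float(x), 3) for x in dice])  # [0.8, 0.8, 0.667]
```

For class 2, for example, the true mask has 2 pixels, the prediction has 1, and they share 1, giving IoU 1 / (2 + 1 - 1) = 0.5 and Dice 2·1 / (2 + 1) ≈ 0.667.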

# Show Results

Feel free to change the value of the `random_integer` variable to any index from 0 to `len(y_true_images) - 1` (3647 with a batch size of 64) to pick an image from the test dataset and visualize it next to its predicted and true segmentation masks. We also display the per-class metric scores to see how accurate the prediction is.

In [None]:
random_integer = 45

y_pred_mask = make_prediction(y_true_images[random_integer])
iou, dice = mask_metrics(y_true_masks[random_integer], y_pred_mask)



In [None]:
titles = ["Image", "Predicted Mask", "True Mask"]
img_array = [y_true_images[random_integer], y_pred_mask, y_true_masks[random_integer]]
plt.figure(figsize=(15, 15))

display_string = "\n".join(
    "{} IOU: {:.4f} Dice Score: {:.4f}".format(class_names[i], class_iou, class_dice)
    for i, (class_iou, class_dice) in enumerate(zip(iou, dice)))

for i in range(len(titles)):
  plt.subplot(1, 3, i+1)
  plt.title(titles[i])
  plt.xticks([])
  plt.yticks([])
  if i == 1: plt.xlabel(display_string, fontsize=12)
  plt.imshow(img_array[i])

plt.show()