In [1]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt

In [2]:
# Fetch the Oxford-IIIT Pet dataset (any 3.x.x release) together with its metadata.
dataset, info = tfds.load(
    'oxford_iiit_pet:3.*.*',
    with_info=True,
)

In [3]:
# Show which splits the loaded dataset dictionary provides.
print(dataset.keys())

dict_keys(['train', 'test'])


In [4]:
# preprocessing functions
def random_flip(image, mask):
  """Mirror an image and its mask left-to-right together, about half of the time.

  One uniform draw decides the flip for both tensors, so the image and the
  mask always stay aligned with each other.

  Returns the (image, mask) pair, flipped or untouched.
  """
  should_flip = tf.random.uniform(()) > 0.5
  if should_flip:
    image, mask = tf.image.flip_left_right(image), tf.image.flip_left_right(mask)

  return image, mask

def normalize(image, mask):
  """Cast the image to float32 and divide it by 255; subtract 1 from the mask.

  NOTE(review): the mask shift presumably turns 1-based labels into 0-based
  class ids - confirm against the dataset documentation.

  Returns the (image, mask) pair.
  """
  image_as_float = tf.cast(image, tf.float32)
  image = image_as_float / 255.
  mask -= 1
  return image, mask

def preprocess_train(data):
  """
  Prepare one training example: resize to 128x128, randomly flip, normalize.

  `data` is indexed with the "image" and "segmentation_mask" keys.
  Returns the processed (image, mask) pair.
  """
  target_size = (128, 128)
  resized_image = tf.image.resize(data["image"], target_size, method="nearest")
  resized_mask = tf.image.resize(data["segmentation_mask"], target_size, method="nearest")

  # flip first, then normalize, exactly in that order
  flipped_image, flipped_mask = random_flip(resized_image, resized_mask)

  return normalize(flipped_image, flipped_mask)

def preprocess_test(data):
  """
  Prepare one test example: resize to 128x128 and normalize (no flipping).

  `data` is indexed with the "image" and "segmentation_mask" keys.
  Returns the processed (image, mask) pair.
  """
  target_size = (128, 128)
  resized_image = tf.image.resize(data["image"], target_size, method="nearest")
  resized_mask = tf.image.resize(data["segmentation_mask"], target_size, method="nearest")

  return normalize(resized_image, resized_mask)

In [5]:
# Apply the per-example preprocessing to each split.
# Only the train mapping is parallelized; the test mapping runs with defaults.
train_dataset = dataset["train"].map(
    preprocess_train,
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)
test_dataset = dataset["test"].map(preprocess_test)

In [6]:
BATCH_SIZE = 64
BUFFER_SIZE = 1024

# Training pipeline: cache -> shuffle -> augment -> batch -> repeat.
# preprocess_train already flips each example, but that happens upstream of
# cache(), so the cache stores one flip decision per example and replays it
# on every later epoch. Applying random_flip again after the cache gives each
# example a fresh 50/50 flip every epoch (a fixed flip followed by a random
# flip is still a random flip).
train_dataset = (
    train_dataset
    .cache()
    .shuffle(BUFFER_SIZE)
    .map(random_flip, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(BATCH_SIZE)
    .repeat()
)
# prefetching overlaps input preparation with model execution
train_dataset = train_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

# grouping the test set
test_dataset = test_dataset.batch(BATCH_SIZE)

In [7]:
# DEFINING THE UNET MODEL

# Encoder utilities
class Conv2D_Block(tf.keras.Model):
  """Two stacked convolutions with identical settings.

  Both layers use `filters` output channels, a `kernel_size` kernel,
  he_normal initialization, "same" padding and a ReLU activation.
  """

  def __init__(self, filters, kernel_size = 3):
    super().__init__()
    # settings shared by both convolutions
    conv_options = dict(kernel_initializer="he_normal",
                        padding="same",
                        activation="relu")
    self.conv2d_1 = tf.keras.layers.Conv2D(filters, kernel_size, **conv_options)
    self.conv2d_2 = tf.keras.layers.Conv2D(filters, kernel_size, **conv_options)

  def call(self, input):
    """Pass `input` through the first convolution, then the second."""
    return self.conv2d_2(self.conv2d_1(input))

class Encoder_Block(tf.keras.Model):
  """One encoder stage: convolution block, then max pooling, then dropout.

  Args:
    filters: number of filters for the convolution block.
    kernel_size: kernel size for the convolution block.
    pool_size: window of the max-pooling layer.
    dropout_rate: rate of the dropout applied to the pooled features.
  """

  def __init__(self, filters, kernel_size=3, pool_size=(2,2), dropout_rate=0.3):
    super(Encoder_Block, self).__init__()
    self.conv2d_block = Conv2D_Block(filters, kernel_size)
    # Use the caller's pool_size; it used to be ignored in favour of a
    # hard-coded 2. The default (2,2) pools exactly like the old literal.
    self.pool = tf.keras.layers.MaxPooling2D(pool_size = pool_size)
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

  def call(self, input):
    """
    f - the output features of the convolution block
    p - the maxpooled features with dropout
    """
    f = self.conv2d_block(input)
    p = self.pool(f)
    p = self.dropout(p)

    return f, p

class Encoder(tf.keras.Model):
  """Chain of four encoder blocks with 64, 128, 256 and 512 filters."""

  def __init__(self):
    super().__init__()
    self.encoder_block_1 = Encoder_Block(filters = 64)
    self.encoder_block_2 = Encoder_Block(filters = 128)
    self.encoder_block_3 = Encoder_Block(filters = 256)
    self.encoder_block_4 = Encoder_Block(filters = 512)

  def call(self, input):
    """
    Feed each block with the pooled output of the previous one.

    Returns the pooled output of the last block, followed by a tuple holding
    the convolution features of all four blocks in block order.
    """
    features_1, pooled_1 = self.encoder_block_1(input)
    features_2, pooled_2 = self.encoder_block_2(pooled_1)
    features_3, pooled_3 = self.encoder_block_3(pooled_2)
    features_4, pooled_4 = self.encoder_block_4(pooled_3)

    skip_features = (features_1, features_2, features_3, features_4)
    return pooled_4, skip_features

In [8]:
# Bottleneck utilities
class Bottleneck(tf.keras.Model):
  """A single 1024-filter convolution block placed between encoder and decoder."""

  def __init__(self):
    super().__init__()

    self.conv2d_block = Conv2D_Block(filters=1024)

  def call(self, input):
    """Return the convolution block's output for `input`."""
    return self.conv2d_block(input)

In [9]:
# Decoder utilities
class Decoder_Block(tf.keras.Model):
  """One decoder stage: transposed convolution, skip concatenation, dropout, conv block.

  NOTE(review): the default `strides` is 3, while every Decoder_Block built
  in this file passes strides=2 - confirm the default is intentional.
  """

  def __init__(self, filters=64, kernel_size=3, strides=3, dropout_rate=0.3):
    super().__init__()
    self.conv2d_tp = tf.keras.layers.Conv2DTranspose(
        filters,
        kernel_size,
        strides=strides,
        padding="same",
    )
    self.dropout = tf.keras.layers.Dropout(dropout_rate)
    self.conv2d_block = Conv2D_Block(filters)

  def call(self, input, conv_output):
    """Upsample `input`, join it with `conv_output`, then apply dropout and the conv block."""
    upsampled = self.conv2d_tp(input)
    merged = tf.keras.layers.concatenate([upsampled, conv_output])
    merged = self.dropout(merged)

    return self.conv2d_block(merged)

class Decoder(tf.keras.Model):
  """Four decoder blocks (512, 256, 128, 64 filters) followed by a 1x1 softmax convolution.

  `output_channels` sets the number of filters of the final convolution.
  """

  def __init__(self,output_channels):
    super().__init__()

    # every stage shares the same kernel size, stride and dropout rate
    self.decoder_block_1 = Decoder_Block(filters=512, kernel_size=3,
                                         strides=2, dropout_rate=0.3)
    self.decoder_block_2 = Decoder_Block(filters=256, kernel_size=3,
                                         strides=2, dropout_rate=0.3)
    self.decoder_block_3 = Decoder_Block(filters=128, kernel_size=3,
                                         strides=2, dropout_rate=0.3)
    self.decoder_block_4 = Decoder_Block(filters=64, kernel_size=3,
                                         strides=2, dropout_rate=0.3)
    self.conv2d = tf.keras.layers.Conv2D(output_channels, (1,1),
                                         activation="softmax")

  def call(self,inputs):
    """Decode `inputs`, a pair of (features to upsample, four skip features).

    The skip features are consumed last-to-first: the fourth one feeds the
    first decoder block and the first one feeds the last decoder block.
    """
    bottleneck_features, skip_features = inputs
    skip_1, skip_2, skip_3, skip_4 = skip_features

    decoded = self.decoder_block_1(bottleneck_features, skip_4)
    decoded = self.decoder_block_2(decoded, skip_3)
    decoded = self.decoder_block_3(decoded, skip_2)
    decoded = self.decoder_block_4(decoded, skip_1)

    return self.conv2d(decoded)

In [10]:
# Defining UNet
# Number of channels produced by the decoder's final convolution.
OUTPUT_CHANNELS = 3
def UNet():
  """Assemble the encoder, bottleneck and decoder into one Keras model.

  The model takes inputs of shape (128, 128, 3) and returns the decoder output.
  """
  image_input = tf.keras.layers.Input(shape=(128,128,3,))

  downsampled, skip_features = Encoder()(image_input)
  bottleneck_features = Bottleneck()(downsampled)
  segmentation = Decoder(output_channels=OUTPUT_CHANNELS)([bottleneck_features, skip_features])

  return tf.keras.Model(inputs=image_input, outputs = segmentation)

In [11]:
# Build the model instance used by the compile, fit and predict cells below.
unet = UNet()

In [12]:
# Print the layer-by-layer overview of the assembled model.
unet.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 encoder (Encoder)              ((None, 8, 8, 512),  4685376     ['input_1[0][0]']                
                                 ((None, 128, 128,                                                
                                64),                                                              
                                 (None, 64, 64, 128                                               
                                ),                                                            

In [13]:
# compiling and training
# Training configuration: Adam optimizer, sparse categorical cross-entropy, accuracy metric.
unet.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Write the whole model (not just weights) to disk whenever val_accuracy improves.
callback = tf.keras.callbacks.ModelCheckpoint(
    "best_val.h5",
    monitor="val_accuracy",
    save_best_only=True,
    save_weights_only=False,
)

In [14]:
# Split sizes are read from the dataset metadata.
TRAIN_LENGTH = info.splits["train"].num_examples
EPOCHS = 20
VAL_SUBSPLITS = 5
# whole training batches per epoch
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE
# whole test batches, divided by VAL_SUBSPLITS, are used for validation each epoch
VALIDATION_STEPS = info.splits["test"].num_examples//BATCH_SIZE//VAL_SUBSPLITS

history = unet.fit(
    train_dataset,
    epochs=EPOCHS,
    steps_per_epoch=STEPS_PER_EPOCH,
    validation_data=test_dataset,
    validation_steps=VALIDATION_STEPS,
    callbacks=[callback],
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [15]:
def get_test_image_and_annotation_arrays():
  """Collect the test images and annotations as two NumPy arrays.

  The global `test_dataset` is unbatched and regrouped into one batch sized
  to the test split, and that batch is converted to NumPy. Both arrays are
  then cut down to the largest multiple of BATCH_SIZE that fits in the split.

  Returns (images, y_true_segments).
  """
  num_test_examples = info.splits['test'].num_examples
  # number of examples that fill complete batches of BATCH_SIZE
  usable_count = num_test_examples - (num_test_examples % BATCH_SIZE)

  single_batch_ds = test_dataset.unbatch().batch(num_test_examples)

  images = []
  y_true_segments = []

  for image_batch, annotation_batch in single_batch_ds.take(1):
    y_true_segments = annotation_batch.numpy()
    images = image_batch.numpy()

  return images[:usable_count], y_true_segments[:usable_count]

def create_mask(pred_mask):
  """Reduce `pred_mask` to its argmax over the last axis and return the first item.

  A trailing axis of size one is added after the argmax, and the first
  element along the leading axis is returned as a NumPy array.
  """
  class_ids = tf.argmax(pred_mask, axis=-1)[..., tf.newaxis]
  first_mask = class_ids[0]
  return first_mask.numpy()


def make_predictions(image, mask, num=1):
  """Predict a mask for `image` with the global `unet` model.

  `image` is reshaped to (num, image.shape[0], image.shape[1], image.shape[2])
  before prediction, and the prediction is passed through create_mask.
  `mask` is accepted but not used.
  """
  batch_shape = (num, image.shape[0], image.shape[1], image.shape[2])
  batched_image = np.reshape(image, batch_shape)

  return create_mask(unet.predict(batched_image))

In [16]:
def class_wise_metrics(y_true, y_pred, num_classes=3):
  """Compute the IOU and Dice score of every class id in range(num_classes).

  Args:
    y_true: array-like of ground-truth class ids.
    y_pred: array-like of predicted class ids, compared element-wise with y_true.
    num_classes: how many class ids (0 .. num_classes - 1) to score.

  Returns:
    (class_wise_iou, class_wise_dice_score): two lists with one float per class.
    A class that is absent from both arrays scores ~1.0 on both metrics.
  """
  y_true = np.asarray(y_true)
  y_pred = np.asarray(y_pred)

  class_wise_iou = []
  class_wise_dice_score = []

  # small constant that keeps the ratios defined when a class never occurs
  smoothening_factor = 0.00001
  for class_id in range(num_classes):
    true_pixels = (y_true == class_id)
    pred_pixels = (y_pred == class_id)

    intersection = np.sum(np.logical_and(pred_pixels, true_pixels))
    combined_area = np.sum(true_pixels) + np.sum(pred_pixels)

    iou = (intersection + smoothening_factor) / (combined_area - intersection + smoothening_factor)
    class_wise_iou.append(iou)

    # The smoothing term goes inside the ratio once, not doubled with the
    # intersection: otherwise an absent class would score 2.0 instead of 1.0.
    dice_score = (2 * intersection + smoothening_factor) / (combined_area + smoothening_factor)
    class_wise_dice_score.append(dice_score)

  return class_wise_iou, class_wise_dice_score

In [17]:
# ground-truth images and masks from the test set
y_true_images, y_true_segments = get_test_image_and_annotation_arrays()

# feed the test set to the model to get the predicted masks
# (only whole batches are predicted)
results = unet.predict(
    test_dataset,
    steps=info.splits['test'].num_examples//BATCH_SIZE,
)
# pick the highest-scoring class per pixel and restore a trailing axis of size one
results = np.argmax(results, axis=3)[..., tf.newaxis]



In [18]:
# compute the per-class IOU and Dice score of the predicted masks against the ground truth
cls_wise_iou, cls_wise_dice_score = class_wise_metrics(y_true_segments, results)

In [21]:
# class list of the mask pixels
class_names = ['pet', 'background', 'outline']
# show the IOU for each class, padding every name out to a 12-character column
for idx, iou in enumerate(cls_wise_iou):
  label = class_names[idx]
  padding = ' ' * (12 - len(label))
  print("{}{}{} ".format(label, padding, iou))

pet         0.7390193584173999 
background  0.849992509611345 
outline     0.3968899396172031 


In [22]:
# show the Dice Score for each class, padding every name out to a 12-character column
for idx, dice_score in enumerate(cls_wise_dice_score):
  label = class_names[idx]
  padding = ' ' * (12 - len(label))
  print("{}{}{} ".format(label, padding, dice_score))

pet         0.8499265460622979 
background  0.918914541756756 
outline     0.5682479748208263 
