Preparing Environment

In [None]:
!/opt/bin/nvidia-smi

In [None]:
!pip install tensorflow_addons

In [None]:
from glob import glob
import shutil
import argparse
import zipfile
import hashlib
import requests
from tqdm import tqdm
import IPython.display as display
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import datetime, os
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from IPython.display import clear_output
import tensorflow_addons as tfa
import cv2
#from keras.utils import normalize
import keras
from keras import regularizers

# For more information about autotune:
# https://www.tensorflow.org/guide/data_performance#prefetching
AUTOTUNE = tf.data.experimental.AUTOTUNE
print(f"Tensorflow ver. {tf.__version__}")

In [None]:
# important for reproducibility
# this allows to generate the same random numbers
SEED = 42

Update Dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
root = "/content/drive/My Drive/BF_TEM_Dataset/"
dataset_path = root + "images/"
training_data = "training/"
val_data = "validation/"

Create Dataloader

In [None]:
# Image size that we are going to use
IMG_SIZE = 1024
# Our images are gray scale (1 channels)
N_CHANNELS = 1
# Scene Parsing has 2 classes, ie boundary & non-boundary
N_CLASSES = 2

In [None]:
TRAINSET_SIZE = len(glob(dataset_path + training_data + "*.jpg"))
print(f"The Training Dataset contains {TRAINSET_SIZE} images.")

VALSET_SIZE = len(glob(dataset_path + val_data + "*.jpg"))
print(f"The Validation Dataset contains {VALSET_SIZE} images.")

In [None]:
def parse_image(img_path: str) -> dict:
    """Load an image and its annotation (mask) and returning
    a dictionary.

    Parameters
    ----------
    img_path : str
        Image (not the mask) location.

    Returns
    -------
    dict
        Dictionary mapping an image and its annotation.
    """
    image = tf.io.read_file(img_path)
    image = tf.image.decode_jpeg(image, channels=1)
    image = tf.image.convert_image_dtype(image, tf.uint8)

    # For one Image path:
    # ...
    # Its corresponding annotation path is:
    # ...
    mask_path = tf.strings.regex_replace(img_path, "images", "annotations")
    mask_path = tf.strings.regex_replace(mask_path, "jpg", "png")
    mask = tf.io.read_file(mask_path)
    # The masks contain a class index for each pixels
    mask = tf.image.decode_png(mask, channels=1)

    # The tf.image.decode_png has a problem
    # It cannot copy the png pixel value correctly
    # The original png file contains pixel=0 or 1
    # but after the decode_png, the values pixel=254 or 255
    # We need to correct the values back, because the N_CLASS are related to the pixel value range !!!
    mask = tf.where(mask == 255, np.dtype('uint8').type(1), mask)
    mask = tf.where(mask == 254, np.dtype('uint8').type(0), mask)
    # Note that we have to convert the new value (0)
    # With the same dtype than the tensor itself

    return {'image': image, 'segmentation_mask': mask}

In [None]:
train_dataset = tf.data.Dataset.list_files(dataset_path + training_data + "*.jpg", seed=SEED)
train_dataset = train_dataset.map(parse_image)

val_dataset = tf.data.Dataset.list_files(dataset_path + val_data + "*.jpg", seed=SEED)
val_dataset = val_dataset.map(parse_image)

In [None]:
type(train_dataset)

In [None]:
# Here we are using the decorator @tf.function
# if you want to know more about it:
# https://www.tensorflow.org/api_docs/python/tf/function

@tf.function
def normalize(input_image: tf.Tensor, input_mask: tf.Tensor) -> tuple:
    """Rescale the pixel values of the images between 0.0 and 1.0
    compared to [0,255] originally.

    Parameters
    ----------
    input_image : tf.Tensor
        Tensorflow tensor containing an image of size [SIZE,SIZE,3].
    input_mask : tf.Tensor
        Tensorflow tensor containing an annotation of size [SIZE,SIZE,1].

    Returns
    -------
    tuple
        Normalized image and its annotation.
    """
    input_image = tf.cast(input_image, tf.float32) / 255.0
    return input_image, input_mask

@tf.function
def load_image_train(datapoint: dict) -> tuple:
    """Apply some transformations to an input dictionary
    containing a train image and its annotation.

    Notes
    -----
    An annotation is a regular  channel image.
    If a transformation such as rotation is applied to the image,
    the same transformation has to be applied on the annotation also.

    Parameters
    ----------
    datapoint : dict
        A dict containing an image and its annotation.

    Returns
    -------
    tuple
        A modified image and its annotation.
    """
    input_image = tf.image.resize(datapoint['image'], (IMG_SIZE, IMG_SIZE))
    input_mask = tf.image.resize(datapoint['segmentation_mask'], (IMG_SIZE, IMG_SIZE))

    if tf.random.uniform(()) > 0.5:
        input_image = tf.image.flip_left_right(input_image)
        input_mask = tf.image.flip_left_right(input_mask)

    input_image, input_mask = normalize(input_image, input_mask)

    return input_image, input_mask

@tf.function
def load_image_test(datapoint: dict) -> tuple:
    """Normalize and resize a test image and its annotation.

    Notes
    -----
    Since this is for the test set, we don't need to apply
    any data augmentation technique.

    Parameters
    ----------
    datapoint : dict
        A dict containing an image and its annotation.

    Returns
    -------
    tuple
        A modified image and its annotation.
    """
    input_image = tf.image.resize(datapoint['image'], (IMG_SIZE, IMG_SIZE))
    input_mask = tf.image.resize(datapoint['segmentation_mask'], (IMG_SIZE, IMG_SIZE))

    input_image, input_mask = normalize(input_image, input_mask)

    return input_image, input_mask

In [None]:
BATCH_SIZE = 2

# for reference about the BUFFER_SIZE in shuffle:
# https://stackoverflow.com/questions/46444018/meaning-of-buffer-size-in-dataset-map-dataset-prefetch-and-dataset-shuffle
BUFFER_SIZE = 48

dataset = {"train": train_dataset, "val": val_dataset}

# -- Train Dataset --#
dataset['train'] = dataset['train'].map(load_image_train, num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset['train'] = dataset['train'].shuffle(buffer_size=BUFFER_SIZE, seed=SEED)
dataset['train'] = dataset['train'].repeat()
dataset['train'] = dataset['train'].batch(BATCH_SIZE)
dataset['train'] = dataset['train'].prefetch(buffer_size=AUTOTUNE)

#-- Validation Dataset --#
dataset['val'] = dataset['val'].map(load_image_test)
dataset['val'] = dataset['val'].repeat()
dataset['val'] = dataset['val'].batch(BATCH_SIZE)
dataset['val'] = dataset['val'].prefetch(buffer_size=AUTOTUNE)

print(dataset['train'])
print(dataset['val'])

# how shuffle works: https://stackoverflow.com/a/53517848

Visualizing the Dataset

In [None]:
def display_sample(display_list):
    """Show side-by-side an input image,
    the ground truth and the prediction.
    """
    plt.figure(figsize=(18, 18))

    title = ['Input Image', 'True Mask', 'Predicted Mask']

    for i in range(len(display_list)):
        plt.subplot(1, len(display_list), i+1)
        plt.title(title[i])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
        plt.axis('off')
    plt.show()

In [None]:
for image, mask in dataset['train'].take(1):
    sample_image, sample_mask = image, mask

display_sample([sample_image[0], sample_mask[0]])

In [None]:
sample_mask.shape

In [None]:
array_a=np.array(sample_mask)

In [None]:
array_a.shape

In [None]:
imghist=array_a.reshape(array_a.size,1)
plt.hist(imghist, bins = 50)
plt.title("histogram")
plt.show()

In [None]:
array_b=np.array(sample_image)
imghist=array_b.reshape(array_b.size,1)
plt.hist(imghist, bins = 50)
plt.title("histogram")
plt.show()

Developing the Model (UNet) Using Keras Functional API

In [None]:
# -- Keras Functional API -- #
# -- UNet Implementation -- #
# Everything here is from tensorflow.keras.layers
# I imported tensorflow.keras.layers * to make it easier to read
input_size = (IMG_SIZE, IMG_SIZE, N_CHANNELS)

# If you want to know more about why we are using `he_normal`:
# https://stats.stackexchange.com/questions/319323/whats-the-difference-between-variance-scaling-initializer-and-xavier-initialize/319849#319849
# Or the excelent fastai course:
# https://github.com/fastai/course-v3/blob/master/nbs/dl2/02b_initializing.ipynb
initializer = 'he_normal'


# -- Encoder -- #
# Block encoder 1
inputs = Input(shape=input_size)
conv_enc_1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer=initializer, kernel_regularizer=regularizers.l2(0.0001))(inputs)
conv_enc_1 = Conv2D(64, 3, activation = 'relu', padding='same', kernel_initializer=initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_enc_1)

# Block encoder 2
max_pool_enc_2 = MaxPooling2D(pool_size=(2, 2))(conv_enc_1)
conv_enc_2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(max_pool_enc_2)
conv_enc_2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_enc_2)

# Block  encoder 3
max_pool_enc_3 = MaxPooling2D(pool_size=(2, 2))(conv_enc_2)
conv_enc_3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(max_pool_enc_3)
conv_enc_3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_enc_3)

# Block  encoder 4
max_pool_enc_4 = MaxPooling2D(pool_size=(2, 2))(conv_enc_3)
conv_enc_4 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(max_pool_enc_4)
conv_enc_4 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_enc_4)

# -- Encoder -- #

# ----------- #
maxpool = MaxPooling2D(pool_size=(2, 2))(conv_enc_4)
conv = Conv2D(1024, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(maxpool)
conv = Conv2D(1024, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv)

# ----------- #

# -- Dencoder -- #
# Block decoder 1
up_dec_1 = Conv2D(512, 2, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(UpSampling2D(size = (2,2))(conv))
merge_dec_1 = concatenate([conv_enc_4, up_dec_1], axis = 3)
conv_dec_1 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(merge_dec_1)
conv_dec_1 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_dec_1)

# Block decoder 2
up_dec_2 = Conv2D(256, 2, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(UpSampling2D(size = (2,2))(conv_dec_1))
merge_dec_2 = concatenate([conv_enc_3, up_dec_2], axis = 3)
conv_dec_2 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(merge_dec_2)
conv_dec_2 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_dec_2)

# Block decoder 3
up_dec_3 = Conv2D(128, 2, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(UpSampling2D(size = (2,2))(conv_dec_2))
merge_dec_3 = concatenate([conv_enc_2, up_dec_3], axis = 3)
conv_dec_3 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(merge_dec_3)
conv_dec_3 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_dec_3)

# Block decoder 4
up_dec_4 = Conv2D(64, 2, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(UpSampling2D(size = (2,2))(conv_dec_3))
merge_dec_4 = concatenate([conv_enc_1, up_dec_4], axis = 3)
conv_dec_4 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(merge_dec_4)
conv_dec_4 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_dec_4)
conv_dec_4 = Conv2D(2, 3, activation = 'relu', padding = 'same', kernel_initializer = initializer, kernel_regularizer=regularizers.l2(0.0001))(conv_dec_4)

# -- Dencoder -- #

output = Conv2D(1, 1, activation = 'sigmoid')(conv_dec_4)

In [None]:
model = tf.keras.Model(inputs = inputs, outputs = output)

In [None]:
model.compile(optimizer=Adam(learning_rate=0.0001), loss = 'binary_crossentropy',
              metrics= ['accuracy', 'Precision', 'Recall'])
#model.compile(optimizer=Adam(learning_rate=0.0001), loss = 'binary_crossentropy',
              #metrics= ['accuracy'])
# tf.keras.losses.binary_crossentropy, tf.keras.losses.SparseCategoricalCrossentropy()

In [None]:
def create_mask(pred_mask: tf.Tensor) -> tf.Tensor:
    """Return a filter mask with the top 1 predicitons
    only.

    Parameters
    ----------
    pred_mask : tf.Tensor
        A [IMG_SIZE, IMG_SIZE, N_CLASS] tensor. For each pixel we have
        N_CLASS values (vector) which represents the probability of the pixel
        being these classes. Example: A pixel with the vector [0.0, 0.0, 1.0]
        has been predicted class 2 with a probability of 100%.

    Returns
    -------
    tf.Tensor
        A [IMG_SIZE, IMG_SIZE, 1] mask with top 1 predictions
        for each pixels.
    """
    # pred_mask -> [IMG_SIZE, SIZE, N_CLASS]
    # 1 prediction for each class but we want the highest score only
    # so we use argmax
    pred_mask = tf.argmax(pred_mask, axis=-1) # Xing: here -1 may have the problem
    # pred_mask becomes [IMG_SIZE, IMG_SIZE]
    # but matplotlib needs [IMG_SIZE, IMG_SIZE, 1]
    pred_mask = tf.expand_dims(pred_mask, axis=-1)
    return pred_mask

def show_predictions(dataset=None, num=1):
    """Show a sample prediction.

    Parameters
    ----------
    dataset : [type], optional
        [Input dataset, by default None
    num : int, optional
        Number of sample to show, by default 1
    """
    if dataset:
        for image, mask in dataset.take(num):
            pred_mask = model.predict(image)
            display_sample([image[0], true_mask, create_mask(pred_mask)])
    else:
        # The model is expecting a tensor of the size
        # [BATCH_SIZE, IMG_SIZE, IMG_SIZE, 1]
        # but sample_image[0] is [IMG_SIZE, IMG_SIZE, 1]
        # and we want only 1 inference to be faster
        # so we add an additional dimension [1, IMG_SIZE, IMG_SIZE, 1]
        one_img_batch = sample_image[0][tf.newaxis, ...]
        # one_img_batch -> [1, IMG_SIZE, IMG_SIZE, 1]
        inference = model.predict(one_img_batch)
        # inference -> [1, IMG_SIZE, IMG_SIZE, N_CLASS]
        pred_mask = inference
        # pred_mask -> [1, IMG_SIZE, IMG_SIZE, 1]
        display_sample([sample_image[0], sample_mask[0],
                        pred_mask[0]])

In [None]:
for image, mask in dataset['train'].take(1):
    sample_image, sample_mask = image, mask

show_predictions()

Train the Model

In [None]:
EPOCHS = 50

STEPS_PER_EPOCH = TRAINSET_SIZE // BATCH_SIZE
VALIDATION_STEPS = VALSET_SIZE // BATCH_SIZE

In [None]:
# sometimes it can be very interesting to run some batches on cpu
# because the tracing is way better than on GPU
# you will have more obvious error message
# but in our case, it takes A LOT of time

# On CPU
# with tf.device("/cpu:0"):
#     model_history = model.fit(dataset['train'], epochs=EPOCHS,
#                               steps_per_epoch=STEPS_PER_EPOCH,
#                               validation_steps=VALIDATION_STEPS,
#                               validation_data=dataset['val'])

# On GPU
model_history = model.fit(dataset['train'], epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=dataset['val'])


In [None]:
model.save(root+'1024_10_BF_5_training_50_saved_model.h5')

In [None]:
for image, mask in dataset['train'].take(1):
    sample_image, sample_mask = image, mask


show_predictions()

In [None]:
for image, mask in dataset['val'].take(1):
    sample_image, sample_mask = image, mask

show_predictions()

In [None]:
#plot the training and validation accuracy and loss at each epoch
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

#acc = history.history['acc']
acc = model_history.history['accuracy']
#val_acc = history.history['val_acc']
val_acc = model_history.history['val_accuracy']

plt.plot(epochs, acc, 'y', label='Training acc')
plt.plot(epochs, val_acc, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        clear_output(wait=True)
        show_predictions()
        print ('\nSample Prediction after epoch {}\n'.format(epoch+1))

In [None]:
EPOCHS = 200

logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

callbacks = [
    # to show samples after each epoch
    DisplayCallback(),
    # to collect some useful metrics and visualize them in tensorboard
    tensorboard_callback,
    # if no accuracy improvements we can stop the training directly
    tf.keras.callbacks.EarlyStopping(patience=20, verbose=1),
    # to save checkpoints
    tf.keras.callbacks.ModelCheckpoint('best_model_unet.h5', verbose=1, save_best_only=True, save_weights_only=True)
]

model = tf.keras.Model(inputs = inputs, outputs = output)

# # here I'm using a new optimizer: https://arxiv.org/abs/1908.03265
#optimizer=tfa.optimizers.RectifiedAdam(learning_rate=1e-3)

#loss = tf.keras.losses.SparseCategoricalCrossentropy()

In [None]:
model.compile(optimizer=Adam(learning_rate=0.0001), loss = tf.keras.losses.BinaryFocalCrossentropy(),
              metrics= ['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()])
#model.compile(optimizer=optimizer, loss = loss,
                  #metrics=['accuracy'])

In [None]:
model_history = model.fit(dataset['train'], epochs=EPOCHS,
                    steps_per_epoch=STEPS_PER_EPOCH,
                    validation_steps=VALIDATION_STEPS,
                    validation_data=dataset['val'],
                    callbacks=callbacks)

In [None]:
for image, mask in dataset['train'].take(1):
    sample_image, sample_mask = image, mask

show_predictions()

In [None]:
for image, mask in dataset['val'].take(1):
    sample_image, sample_mask = image, mask

show_predictions()

In [None]:
#plot the training and validation accuracy and loss at each epoch
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

#acc = history.history['acc']
acc = model_history.history['accuracy']
#val_acc = history.history['val_acc']
val_acc = model_history.history['val_accuracy']

plt.plot(epochs, acc, 'y', label='Training acc')
plt.plot(epochs, val_acc, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

load model

In [None]:
model_trained = tf.keras.models.load_model(root+'best_model_unet.h5')
model_trained.summary()

In [None]:
train_dataset_test = tf.data.Dataset.list_files(dataset_path + training_data + "*.jpg", shuffle=False)
train_dataset_test = train_dataset_test.map(parse_image)
val_dataset_test = tf.data.Dataset.list_files(dataset_path + val_data + "*.jpg", shuffle=False)
val_dataset_test = val_dataset_test.map(parse_image)
dataset_test = {"train":train_dataset_test, "val": val_dataset_test}
dataset_test['train'] = dataset_test['train'].map(load_image_test)
dataset_test['train'] = dataset_test['train'].batch(1)
dataset_test['val'] = dataset_test['val'].map(load_image_test)
dataset_test['val'] = dataset_test['val'].batch(1)
print(dataset_test['train'])
print(dataset_test['val'])

In [None]:
for image, mask in dataset_test['train'].take(5):
    sample_image, sample_mask = image, mask
    one_img_batch = sample_image[0][tf.newaxis, ...]
    # one_img_batch -> [1, IMG_SIZE, IMG_SIZE, 3]
    inference = model.predict(one_img_batch)
    # inference -> [1, IMG_SIZE, IMG_SIZE, N_CLASS]
    pred_mask = create_mask(inference)
    # pred_mask -> [1, IMG_SIZE, IMG_SIZE, 1]
    display_sample([sample_image[0], sample_mask[0],
                        pred_mask[0]])

In [None]:
for image, mask in dataset_test['val'].take(5):
    sample_image, sample_mask = image, mask
    one_img_batch = sample_image[0][tf.newaxis, ...]
    # one_img_batch -> [1, IMG_SIZE, IMG_SIZE, 3]
    inference = model.predict(one_img_batch)
    # inference -> [1, IMG_SIZE, IMG_SIZE, N_CLASS]
    pred_mask = create_mask(inference)
    # pred_mask -> [1, IMG_SIZE, IMG_SIZE, 1]
    display_sample([sample_image[0], sample_mask[0],
                        pred_mask[0]])

In [None]:
for i in range(321):
    for image, mask in dataset_test['train'].skip(i*10).take(1):
      sample_image, sample_mask = image, mask
      one_img_batch = sample_image[0][tf.newaxis, ...]
      # one_img_batch -> [1, IMG_SIZE, IMG_SIZE, 3]
      inference = model.predict(one_img_batch)
      # inference -> [1, IMG_SIZE, IMG_SIZE, N_CLASS]
      pred_mask = create_mask(inference)
      # pred_mask -> [1, IMG_SIZE, IMG_SIZE, 1]
      #display_sample([sample_image[0], sample_mask[0],pred_mask[0]])

      img_data=tf.keras.preprocessing.image.array_to_img(pred_mask[0])
      plt.imsave(root+'results/.../'+str(i+1)+'.jpg',img_data)

In [None]:
for i in range(70):
    for image, mask in dataset_test['val'].skip(i).take(1):
      sample_image, sample_mask = image, mask
      one_img_batch = sample_image[0][tf.newaxis, ...]
      # one_img_batch -> [1, IMG_SIZE, IMG_SIZE, 3]
      inference = model.predict(one_img_batch)
      # inference -> [1, IMG_SIZE, IMG_SIZE, N_CLASS]
      pred_mask = create_mask(inference)
      # pred_mask -> [1, IMG_SIZE, IMG_SIZE, 1]
      #display_sample([sample_image[0], sample_mask[0],pred_mask[0]])

      img_data=tf.keras.preprocessing.image.array_to_img(pred_mask[0])
      plt.imsave(root+'results/'+str(i+1)+'.jpg',img_data)

In [None]:
def parse_image_test(img_path: str) -> dict:
    """Load an image and its annotation (mask) and returning
    a dictionary.

    Parameters
    ----------
    img_path : str
        Image (not the mask) location.

    Returns
    -------
    dict
        Dictionary mapping an image and its annotation.
    """
    image = tf.io.read_file(img_path)
    image = tf.image.decode_jpeg(image, channels=1)
    image = tf.image.convert_image_dtype(image, tf.uint8)

    return {'image': image}

In [None]:
@tf.function
def normalize_test(input_image: tf.Tensor) -> tuple:
    """Rescale the pixel values of the images between 0.0 and 1.0
    compared to [0,255] originally.

    Parameters
    ----------
    input_image : tf.Tensor
        Tensorflow tensor containing an image of size [SIZE,SIZE,3].
    input_mask : tf.Tensor
        Tensorflow tensor containing an annotation of size [SIZE,SIZE,1].

    Returns
    -------
    tuple
        Normalized image and its annotation.
    """
    input_image = tf.cast(input_image, tf.float32) / 255.0
    return input_image

@tf.function
def load_image_test_test(datapoint: dict) -> tuple:
    """Normalize and resize a test image and its annotation.

    Notes
    -----
    Since this is for the test set, we don't need to apply
    any data augmentation technique.

    Parameters
    ----------
    datapoint : dict
        A dict containing an image and its annotation.

    Returns
    -------
    tuple
        A modified image and its annotation.
    """
    input_image = tf.image.resize(datapoint['image'], (IMG_SIZE, IMG_SIZE))

    input_image = normalize_test(input_image)

    return input_image

In [None]:
test_data = "test/"

test_dataset = tf.data.Dataset.list_files(dataset_path + test_data + "*.jpg", shuffle = False)
test_dataset = test_dataset.map(parse_image_test)

dataset_test = {"test":test_dataset}
dataset_test['test'] = dataset_test['test'].map(load_image_test_test)
dataset_test['test'] = dataset_test['test'].batch(1)
print(dataset_test['test'])

In [None]:
TEST_SIZE = len(glob(dataset_path + test_data + "*.jpg"))
print(f"The Training Dataset contains {TEST_SIZE} images.")

In [None]:
for image in dataset_test['test'].take(3):
    sample_image = image
    one_img_batch = sample_image[0][tf.newaxis, ...]
    # one_img_batch -> [1, IMG_SIZE, IMG_SIZE, 3]
    inference = model_trained.predict(one_img_batch)
    # inference -> [1, IMG_SIZE, IMG_SIZE, N_CLASS]
    #pred_mask = create_mask(inference)
    pred_mask = inference
    # pred_mask -> [1, IMG_SIZE, IMG_SIZE, 1]
    display_sample([sample_image[0],pred_mask[0]])

In [None]:
for i in range(20):
    for image in dataset_test['test'].skip(i).take(1):
      sample_image = image
      one_img_batch = sample_image[0][tf.newaxis, ...]
      # one_img_batch -> [1, IMG_SIZE, IMG_SIZE, 3]
      inference = model.predict(one_img_batch)
      # inference -> [1, IMG_SIZE, IMG_SIZE, N_CLASS]
      pred_mask = create_mask(inference)
      # pred_mask -> [1, IMG_SIZE, IMG_SIZE, 1]
      #display_sample([sample_image[0], sample_mask[0],pred_mask[0]])

      img_data=tf.keras.preprocessing.image.array_to_img(pred_mask[0])
      plt.imsave(root+'results/'+str(i+1)+'.jpg',img_data)