In [1]:
import os
import sys
import cv2
import numpy as np
from glob import glob
from scipy.io import loadmat
import matplotlib.pyplot as plt
import pandas as pd

import tensorflow as tf
import tensorflow_addons as tfa

In [2]:
# Number of samples per batch produced by data_generator.
BATCH_SIZE = 2

# Side length (in pixels) that images and masks are resized to in read_image.
IMG_SIZE = MASK_SIZE = 512

# Number of segmentation classes (model output classes and MeanIoU classes).
NUM_CLASSES = 5

# Split sizes; they match the dataset sizes printed further below.
NUM_TRAIN_IMAGES = 7470
NUM_VAL_IMAGES = NUM_TEST_IMAGES = 1602

# Datasets

In [3]:
def get_absolute_path_to_project():
    """Return the directory two levels above the current working directory.

    Returns:
        str: absolute path of the grandparent of the current working
        directory, without a trailing separator (unless it is a filesystem
        root).
    """
    # os.getcwd() is plain Python, so the function also works outside IPython
    # (the `%pwd` magic is not valid Python syntax), and os.path.dirname uses
    # the platform's separator instead of searching for '/' by hand, which
    # raised IndexError on paths containing fewer than two slashes.
    return os.path.dirname(os.path.dirname(os.getcwd()))

In [4]:
# Base directory from which the data, model and source paths below are built.
ABS_PATH = get_absolute_path_to_project()

In [5]:
# Root folder that holds the train / val / test splits.
DATA_DIR = f'{ABS_PATH}/data/processed'

In [6]:
# One sub-folder of DATA_DIR per dataset split.
TRAIN_PATH, VAL_PATH, TEST_PATH = (
    DATA_DIR + '/train',
    DATA_DIR + '/val',
    DATA_DIR + '/test',
)

In [7]:
# Sorted file lists per split. Images and masks are sorted independently, so
# pairing them by position relies on both folders sorting into the same order.
train_images, train_masks = (
    sorted(glob(os.path.join(TRAIN_PATH, pattern)))
    for pattern in ("images/img/*", "masks/img/*")
)
val_images, val_masks = (
    sorted(glob(os.path.join(VAL_PATH, pattern)))
    for pattern in ("images/img/*", "masks/img/*")
)
test_images, test_masks = (
    sorted(glob(os.path.join(TEST_PATH, pattern)))
    for pattern in ("images/img/*", "masks/img/*")
)


def read_image(image_path, mask=False):
    """Read an image or a mask file from disk and return it as a float32 tensor.

    Args:
        image_path: path of the file to read (passed to tf.io.read_file).
        mask: if True the file is decoded as a PNG mask, otherwise as a
            JPEG image.

    Returns:
        mask=False: tensor of shape (IMG_SIZE, IMG_SIZE, 3), float32, with
            values divided by 255.
        mask=True: tensor of shape (MASK_SIZE, MASK_SIZE, 1), float32, holding
            the values of the first PNG channel (presumably the class ids --
            confirm against how the masks were generated).
    """
    raw = tf.io.read_file(image_path)
    if mask:
        image = tf.image.decode_png(raw, channels=3)
        # Keep only the first channel but preserve the channel axis. Slicing
        # (instead of reshaping to a fixed size) lets masks of any resolution
        # reach the resize step below.
        image = image[..., :1]
        # Nearest-neighbour keeps the values unchanged; the default bilinear
        # interpolation would blend neighbouring label values whenever the
        # mask actually has to be rescaled.
        image = tf.image.resize(images=image, size=[MASK_SIZE, MASK_SIZE], method='nearest')
        # Nearest-neighbour resize preserves the uint8 dtype; cast so the
        # returned dtype stays float32 as before.
        image = tf.cast(image, tf.float32)
    else:
        image = tf.image.decode_jpeg(raw, channels=3)
        image.set_shape([None, None, 3])
        image = tf.image.resize(images=image, size=[IMG_SIZE, IMG_SIZE])
        image = image / 255.
    return image

def load_data(image_list, mask_list):
    """Load one (image, mask) pair.

    Despite the parameter names, each argument is a single file path: the
    function is mapped element-wise over a dataset of path pairs.

    Returns:
        tuple: (image, mask) as produced by read_image.
    """
    return read_image(image_list), read_image(mask_list, mask=True)

def load_augmented_data(image_list, mask_list):
    """Load one (image, mask) pair and pass it through `augmentation`.

    Each argument is a single file path (the function is mapped element-wise
    over a dataset of path pairs).

    Returns:
        tuple: the (image, mask) pair returned by `augmentation`.
    """
    raw_image = read_image(image_list)
    raw_mask = read_image(mask_list, mask=True)
    # NOTE(review): `augmentation` is not defined in the visible part of this
    # notebook -- confirm it exists before building a dataset with
    # augmentation=True.
    augmented_image, augmented_mask = augmentation(raw_image, raw_mask)
    return augmented_image, augmented_mask


def data_generator(image_list, mask_list, augmentation=False, factor=1):
    """Build a batched tf.data pipeline of (image, mask) pairs.

    Args:
        image_list: list of paths to the images.
        mask_list: list of paths to the corresponding masks, in the same order.
        augmentation: if truthy, samples are loaded with load_augmented_data
            instead of load_data.
        factor: only used when augmentation is enabled; when greater than 1
            the augmented pipeline is concatenated with itself so the dataset
            holds `factor` passes over the file lists.

    Returns:
        tf.data.Dataset yielding batches of BATCH_SIZE samples; an incomplete
        final batch of each pass is dropped.
    """
    loader = load_augmented_data if augmentation else load_data

    def _batched_pipeline():
        # One full pass over the path lists: load, then batch.
        paths = tf.data.Dataset.from_tensor_slices((image_list, mask_list))
        samples = paths.map(loader, num_parallel_calls=tf.data.AUTOTUNE)
        return samples.batch(BATCH_SIZE, drop_remainder=True)

    dataset = _batched_pipeline()

    if factor > 1 and augmentation:
        for _ in range(factor - 1):
            dataset = dataset.concatenate(_batched_pipeline())

    return dataset

In [8]:
# Build the (non-augmented) datasets for every split and report their sizes.
train_dataset = data_generator(train_images, train_masks)
val_dataset = data_generator(val_images, val_masks)
test_dataset = data_generator(test_images, test_masks)

# len(dataset) counts batches, so multiply by BATCH_SIZE to get samples.
print("Train Dataset:", train_dataset)
print("Number of images in Train Dataset:", BATCH_SIZE * len(train_dataset))
print("Val Dataset:", val_dataset)
print("Number of images in Val Dataset:", BATCH_SIZE * len(val_dataset))
print("Test Dataset:", test_dataset)
# The label previously said "Val Dataset" although the test split is counted.
print("Number of images in Test Dataset:", BATCH_SIZE * len(test_dataset))


Train Dataset: <BatchDataset shapes: ((2, 512, 512, 3), (2, 512, 512, 1)), types: (tf.float32, tf.float32)>
Number of images in Train Dataset: 7470
Val Dataset: <BatchDataset shapes: ((2, 512, 512, 3), (2, 512, 512, 1)), types: (tf.float32, tf.float32)>
Number of images in Val Dataset: 1602
Test Dataset: <BatchDataset shapes: ((2, 512, 512, 3), (2, 512, 512, 1)), types: (tf.float32, tf.float32)>
Number of images in Val Dataset: 1602


# Image utils

In [8]:
def infer(model, image_tensor):
    """Predict a per-pixel class map for a single image.

    Args:
        model: object exposing `predict(batch)`.
        image_tensor: a single image without a batch axis.

    Returns:
        Array of class indices: the argmax over axis 2 of the squeezed
        prediction.
    """
    batch = np.expand_dims(image_tensor, axis=0)  # add the batch axis
    scores = np.squeeze(model.predict(batch))     # drop singleton axes again
    return np.argmax(scores, axis=2)


def decode_segmentation_masks(mask, colormap, n_classes):
    """Turn a class-index mask into an RGB image.

    Args:
        mask: array of class indices.
        colormap: sequence where colormap[c] is the (r, g, b) colour of class c.
        n_classes: classes 0 .. n_classes-1 are coloured; pixels holding any
            other value stay black.

    Returns:
        uint8 array with the three colour planes stacked along axis 2.
    """
    planes = [np.zeros_like(mask).astype(np.uint8) for _ in range(3)]
    for label in range(n_classes):
        selected = mask == label
        for channel, plane in enumerate(planes):
            plane[selected] = colormap[label][channel]
    return np.stack(planes, axis=2)


def get_overlay(image, colored_mask):
    """Blend an image with a colour-coded mask using equal weights.

    Args:
        image: 3-channel image accepted by array_to_img.
        colored_mask: RGB mask whose first two axes are height and width,
            e.g. the output of decode_segmentation_masks.

    Returns:
        The result of tfa.image.blend(image, colored_mask, 0.5), with the
        image resized to the mask's height and width.
    """
    # Round-trip through a PIL image to obtain a uint8 array.
    image = tf.keras.preprocessing.image.array_to_img(image)
    image = np.array(image).astype(np.uint8)
    # Resize to the mask's own resolution instead of a hard-coded 512 so the
    # two blend inputs always share a shape. The former set_shape/reshape
    # calls only restated the shape the resize already produces.
    height, width = colored_mask.shape[:2]
    image = tf.image.resize(image, [height, width])
    return tfa.image.blend(image, colored_mask, 0.5)


def plot_samples_matplotlib(display_list, figsize=(5, 3)):
    """Show the given arrays side by side in a single row.

    Args:
        display_list: arrays/tensors to display. Items whose last axis has
            size 3 are converted with array_to_img first; anything else is
            passed to imshow directly.
        figsize: size of the matplotlib figure.
    """
    # squeeze=False keeps `axes` two-dimensional even for a single column;
    # without it plt.subplots returns a bare Axes and indexing it fails when
    # display_list has exactly one element.
    _, axes = plt.subplots(nrows=1, ncols=len(display_list), figsize=figsize, squeeze=False)
    for ax, sample in zip(axes[0], display_list):
        if sample.shape[-1] == 3:
            ax.imshow(tf.keras.preprocessing.image.array_to_img(sample))
        else:
            ax.imshow(sample)
    plt.show()
    

def plot_predictions(images_list, masks_list, colormap, model):
    """Plot image, ground-truth overlay, prediction overlay and predicted mask.

    Args:
        images_list: paths of the images to run through the model.
        masks_list: paths of the matching ground-truth masks (same order).
        colormap: per-class RGB colours used to colour the masks.
        model: model handed to infer.
    """
    for image_path, mask_path in zip(images_list, masks_list):
        image_tensor = read_image(image_path)
        # Drop the channel axis before colouring the ground truth.
        truth_labels = read_image(mask_path, mask=True)[..., 0]
        truth_rgb = decode_segmentation_masks(truth_labels, colormap, NUM_CLASSES)

        predicted_labels = infer(image_tensor=image_tensor, model=model)
        predicted_rgb = decode_segmentation_masks(predicted_labels, colormap, NUM_CLASSES)

        predicted_overlay = get_overlay(image_tensor, predicted_rgb)
        truth_overlay = get_overlay(image_tensor, truth_rgb)

        panels = [image_tensor, truth_overlay, predicted_overlay, predicted_rgb]
        plot_samples_matplotlib(panels, figsize=(18, 14))


In [9]:
# Entry i is the RGB colour that decode_segmentation_masks paints class i with.
custom_colormap = [
    [0, 0, 0],        # class 0: black
    [255, 0, 0],      # class 1: red
    [0, 255, 0],      # class 2: green
    [0, 0, 255],      # class 3: blue
    [255, 255, 255],  # class 4: white
]

# Load model

In [10]:
# Folder with the stored checkpoints and the weight file loaded below.
saved_weights_dir = ABS_PATH + '/models/saved_weights'
saved_weights_path = saved_weights_dir + '/deeplabv3plus_v5.10.h5'

In [11]:
# Put the project's model sources on the import path so the `deeplabv3plus`
# import in the next cell can be resolved (presumably the module lives in
# this folder -- confirm).

module_path = ABS_PATH + '/src/models'
# Only append once so re-running the cell does not add duplicate entries.
if module_path not in sys.path:
    sys.path.append(module_path)

In [12]:
from deeplabv3plus import Deeplabv3

In [13]:
def get_deeplab_model(weights=None, freeze_conv_base=True, freeze_border=359, activation=None):
    """Create a Deeplabv3 model (xception backbone) for NUM_CLASSES classes.

    Args:
        weights: one of 'pascal_voc' (pre-trained on pascal voc),
            'cityscapes' (pre-trained on cityscape) or None (random
            initialization).
        freeze_conv_base: if True, the first `freeze_border` layers are made
            non-trainable; if False, no layer is touched.
        freeze_border: index of the first layer that stays trainable when
            freezing is enabled. NOTE(review): the default of 359 is
            presumably the end of the convolutional base -- confirm against
            the model definition.
        activation: optional activation to add to the top of the network.
            One of 'softmax', 'sigmoid' or None.

    Returns:
        The constructed model.
    """
    model = Deeplabv3(
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
        classes=NUM_CLASSES,
        backbone='xception',
        OS=16,
        weights=weights,
        activation=activation,
    )

    if not freeze_conv_base:
        return model

    for position, layer in enumerate(model.layers):
        if position < freeze_border:
            layer.trainable = False

    return model

In [15]:
# Build the network without freezing any layers.
# NOTE(review): the 'cityscapes' weights requested here are followed by
# model.load_weights(saved_weights_path) in the next cell -- confirm the
# pre-trained initialisation is still needed.
model = get_deeplab_model(weights='cityscapes', freeze_conv_base=False)

In [16]:
# Load the weights stored at saved_weights_path into the model.
model.load_weights(saved_weights_path)

# Find which predicted masks have a low meanIoU

In [17]:
# Create one output folder per dataset split for the saved evaluation figures.
ds_split = ['train', 'test', 'val']

for split in ds_split:
    path = ABS_PATH + f'/data/evaluation_of_masks/{split}'
    # exist_ok=True replaces the separate existence check, so the cell can be
    # re-run safely and there is no gap between checking and creating.
    os.makedirs(path, exist_ok=True)

In [18]:
def save_samples_matplotlib(display_list, miou_score, save_path, figsize=(5, 3)):
    """Save a one-row figure of the given panels, titled with the meanIoU.

    Args:
        display_list: arrays/tensors to draw, in the order of the panel
            titles below. Items whose last axis has size 3 are converted
            with array_to_img first.
        miou_score: meanIoU as a fraction; shown as a percentage in the
            figure title.
        save_path: file path the figure is written to.
        figsize: size of the matplotlib figure.
    """
    sub_names = ['Image', 'Image-Ground truth overlay', 'Image-predicted mask overlay', 'Ground truth mask', 'Predicted mask']
    # squeeze=False keeps `axes` two-dimensional, so a single panel works too.
    fig, axes = plt.subplots(nrows=1, ncols=len(display_list), figsize=figsize, squeeze=False)

    try:
        for i, (ax, sample) in enumerate(zip(axes[0], display_list)):
            # Extra panels beyond the known titles are drawn without a title
            # instead of raising IndexError.
            if i < len(sub_names):
                ax.set_title(sub_names[i], size=16)
            ax.axis('off')
            if sample.shape[-1] == 3:
                ax.imshow(tf.keras.preprocessing.image.array_to_img(sample))
            else:
                ax.imshow(sample)

        fig.suptitle('meanIoU = {0:.2f}%'.format(miou_score*100), fontsize=20)
        fig.savefig(save_path)
    finally:
        # Always release the figure: this function runs in a loop over many
        # samples and an exception must not leave open figures behind.
        plt.close(fig)

In [19]:
def save_image(image_tensor, mask_tensor, prediction_mask, miou_score, save_path, colormap):
    """Render and save the comparison figure for one sample.

    Args:
        image_tensor: the input image.
        mask_tensor: ground-truth class-index mask.
        prediction_mask: predicted class-index mask.
        miou_score: meanIoU of the prediction, shown in the figure title.
        save_path: file path the figure is written to.
        colormap: per-class RGB colours used to colour both masks.
    """
    truth_rgb = decode_segmentation_masks(mask_tensor, colormap, NUM_CLASSES)
    predicted_rgb = decode_segmentation_masks(prediction_mask, colormap, NUM_CLASSES)

    predicted_overlay = get_overlay(image_tensor, predicted_rgb)
    truth_overlay = get_overlay(image_tensor, truth_rgb)

    # Panel order must match the titles used by save_samples_matplotlib.
    panels = [image_tensor, truth_overlay, predicted_overlay, truth_rgb, predicted_rgb]
    save_samples_matplotlib(panels, miou_score, save_path, figsize=(32, 7))

In [26]:
def get_and_save_image_mask_miou_below_threshold(images_list,
                                                masks_list,
                                                which_ds_part,
                                                predictive_model,
                                                threshold=0.2):
    """Collect samples whose predicted mask scores a meanIoU below `threshold`.

    Every image is run through the model individually and its meanIoU
    against the ground-truth mask is computed. For each sample below the
    threshold a comparison figure is saved under
    ABS_PATH/data/evaluation_of_masks/<which_ds_part>/<image file name>.

    Args:
        images_list: paths of the images.
        masks_list: paths of the matching ground-truth masks (same order).
        which_ds_part: name of the sub-folder the figures are saved in.
        predictive_model: model handed to infer.
        threshold: samples with a meanIoU strictly below this value are kept.

    Returns:
        list of three lists: image paths, mask paths and meanIoU scores of
        the samples below the threshold (aligned by position).
    """
    m = tf.keras.metrics.MeanIoU(NUM_CLASSES)

    image_mask_miou = [[], [], []]

    for image_file, mask_file in zip(images_list, masks_list):

        # The metric accumulates across calls; reset it to score each sample alone.
        m.reset_state()

        image_tensor = read_image(image_file)

        mask_tensor = read_image(mask_file, mask=True)
        mask_tensor = mask_tensor[..., 0]  # drop the channel axis

        prediction_mask = infer(image_tensor=image_tensor, model=predictive_model)

        m.update_state([mask_tensor],
                       [prediction_mask])

        miou_score = m.result().numpy()

        if miou_score < threshold:

            for i, element in enumerate([image_file, mask_file, miou_score]):
                image_mask_miou[i].append(element)

            # os.path.basename replaces the manual search for the last '/',
            # which raised IndexError for a path without '/' and picked the
            # wrong part when other separators were present.
            f_name = os.path.basename(image_file)

            save_path = ABS_PATH + f'/data/evaluation_of_masks/{which_ds_part}/{f_name}'

            save_image(image_tensor,
                       mask_tensor,
                       prediction_mask,
                       miou_score,
                       save_path,
                       custom_colormap)

    return image_mask_miou

In [28]:
# Score every training sample; figures for low scorers go to the 'train' folder.
image_mask_miou_train_dataset = get_and_save_image_mask_miou_below_threshold(
    images_list=train_images,
    masks_list=train_masks,
    which_ds_part='train',
    predictive_model=model,
)

In [29]:
# Score every validation sample; figures for low scorers go to the 'val' folder.
image_mask_miou_val_dataset = get_and_save_image_mask_miou_below_threshold(
    images_list=val_images,
    masks_list=val_masks,
    which_ds_part='val',
    predictive_model=model,
)

In [30]:
# Score every test sample; figures for low scorers go to the 'test' folder.
image_mask_miou_test_dataset = get_and_save_image_mask_miou_below_threshold(
    images_list=test_images,
    masks_list=test_masks,
    which_ds_part='test',
    predictive_model=model,
)

In [39]:
def save_file_paths_to_txt(image_mask_miou_all_parts):
    """Write the collected image paths, mask paths and scores to text files.

    Args:
        image_mask_miou_all_parts: one entry per split, in the order
            train, val, test. Each entry holds three lists (image paths,
            mask paths, meanIoU scores).

    Each list is written, one item per line, to
    ABS_PATH/data/evaluation_of_masks/<split>_<images|masks|mious>_to_exclude.txt.
    """
    split_names = ['train', 'val', 'test']
    kind_names = ['images', 'masks', 'mious']

    for split, lists in zip(split_names, image_mask_miou_all_parts):
        for d_type, list_to_save in zip(kind_names, lists):
            save_path = ABS_PATH + f'/data/evaluation_of_masks/{split}_{d_type}_to_exclude.txt'
            with open(save_path, 'w') as f:
                f.writelines("%s\n" % item for item in list_to_save)

In [40]:
# The order (train, val, test) must match the split order assumed inside
# save_file_paths_to_txt.
save_file_paths_to_txt([image_mask_miou_train_dataset, image_mask_miou_val_dataset, image_mask_miou_test_dataset])

In [42]:
# Percentage of training images whose meanIoU is below the threshold.
len(image_mask_miou_train_dataset[0]) / len(train_images) * 100

1.5127175368139223

In [43]:
# Percentage of validation images whose meanIoU is below the threshold.
len(image_mask_miou_val_dataset[0]) / len(val_images) * 100

1.8726591760299627

In [44]:
# Percentage of test images whose meanIoU is below the threshold.
len(image_mask_miou_test_dataset[0]) / len(test_images) * 100

1.8726591760299627