# Imports

In [1]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import numpy as np
import matplotlib.pyplot as plt

import os

import tensorflow as tf
import tensorflow.keras.backend as K

In [2]:
# how many GPUs are available for tensorflow-gpu
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

Num GPUs Available:  1


# Constants

In [3]:
# for creating reproducible results
SEED = 22
np.random.seed = SEED

# values for images
IMG_WIDTH = 512
IMG_HEIGHT = 512
IMG_CHANNELS = 3

# number of classes
NUM_CLASSES = 4

# necessary colours
CLASS_BLACK = (0,0,0)
CLASS_RED = (255,0,0)
CLASS_GREEN = (0,255,0)
CLASS_BLUE = (0,0,255)
# all colours which identify a class
PALETTE = [CLASS_BLACK, CLASS_RED, CLASS_GREEN, CLASS_BLUE]

# directories that contain the results from pre-processing, should already exist
IMAGE_DIRECTORY = "./data/images/"
MASK_DIRECTORY = "./data/masks/"
FILENAME_DIRECTORY = './data/filenames/'
# directory to save the results, should be created
RESULT_DIRECTORY = './data/results/'

In [4]:
# create the essential directory if it does not exist
if not os.path.exists(RESULT_DIRECTORY):
    os.makedirs(RESULT_DIRECTORY)

# Load the training and test data

In [5]:
X_train = np.load(FILENAME_DIRECTORY + 'X_train.npy')
y_train = np.load(FILENAME_DIRECTORY + 'y_train.npy')

X_test = np.load(FILENAME_DIRECTORY + 'X_test.npy')
y_test = np.load(FILENAME_DIRECTORY + 'y_test.npy')

# One Hot Encoding and the reverse operation

In [6]:
def rgbToOnehot(rgbImage, palette=PALETTE):
    """Implements One Hot Encoding (OHE) for RGB images, 
    where each colour represents a class

    Parameters
    ----------
    rgbImage : numpy-array
        RGB-image to which OHE should be applied
    palette : list
        All colours that define a class
        
    Returns
    -------
    numpy-array
        One Hot encoded image
    """
        
    # create new image
    labelImage = np.zeros((rgbImage.shape[:2]), dtype=np.uint8)
    # label the classes correctly
    labelImage[(rgbImage==palette[0]).all(axis=2)] = 0
    labelImage[(rgbImage==palette[1]).all(axis=2)] = 1
    labelImage[(rgbImage==palette[2]).all(axis=2)] = 2
    labelImage[(rgbImage==palette[3]).all(axis=2)] = 3
    # transform them into binary values
    onehotImage = tf.keras.utils.to_categorical(labelImage, 4)
    return onehotImage

def onehotToRgb(onehotImage, palette=PALETTE):
    """Reverts OHE and gives the image its initial
    colours back

    Parameters
    ----------
    onehotImage : numpy-array
        Image with OHE which should get its colour back
    palette : list
        All colours that define a class
        
    Returns
    -------
    numpy-array
        RGB image
    """
        
    # returns indices of the highest elements
    indices = np.argmax(onehotImage, axis=2)
    # use the initial image shape plus three channels for rgb
    rgbImage = np.zeros(onehotImage.shape[:2]+(3,))
    for i, colours in enumerate(palette):
        # get the colours back
        rgbImage[indices==i] = colours
    return np.uint8(rgbImage)

## Test the One Hot Encoding and the reverse operation

In [7]:
# load one mask and apply OHE
test = np.load(MASK_DIRECTORY + y_train[0])
test = rgbToOnehot(test)
print(test.shape)

(512, 512, 4)


In [8]:
# check if all red pixels are transformed in the correct binary value
l = []
for x in test:
    for y in x:
        if np.all(y == (0.,1.,0.,0.)):
            l.append(y)
print(len(l))

267


In [9]:
# revert OHE
test = onehotToRgb(test)
print(test.shape)

(512, 512, 3)


In [10]:
# check if all red pixels are still available
l = []
for x in test:
    for y in x:
        if np.all(y == (255,0,0)):
            l.append(y)
print(len(l))

267


# Custom generator

In [11]:
class IMT_Generator(tf.keras.utils.Sequence):
    """A class for the custom generators of this application
    
    Attributes
    ----------
    filenames_images : list
        List of all file names of the images
    filenames_masks : list
        List of all file names of the masks
    batch_size :  int
        The for the training specified batch size

    Methods
    -------
    __getitem__(index)
        Returns the from the U-Net model wanted images and 
        masks, while OHE gets applied on the masks
    __len__()
        Specifies the total number of batches
    """
    
    def __init__(self, filenames_images, filenames_masks, batch_size=32):
        """
        Parameters
        ----------
        filenames_images : list
            List of all file names of the images
        filenames_masks : list
            List of all file names of the masks
        batch_size :  int
            Defines the for the training specified batch size
        """
            
        self.batch_size = batch_size
        self.filenames_images = filenames_images
        self.filenames_masks = filenames_masks
        
    def __len__(self):
        """ Specifies the total number of batches
        
        Returns
        -------
        int
            number of batches
        """
        return int(np.floor(len(self.filenames_images) / float(self.batch_size)))
    
    def __getitem__(self, index):        
        """Returns the from the U-Net model wanted images and 
        masks, while OHE gets applied on the masks

        Parameters
        ----------
        index : int
            Index of the requested batch from the U-Net model,
            after the processing of one batch the index is
            raised
        
        Returns
        -------
        list
            The images of the current batch
        list
            The masks of the current batch
        """
        
        # initialize both batches
        batchX = self.filenames_images[index * self.batch_size : (index+1) * self.batch_size]
        batchY = self.filenames_masks[index * self.batch_size : (index+1) * self.batch_size]
        # load the images
        returnX = np.array([np.load(IMAGE_DIRECTORY + str(fileName)) for fileName in batchX])
        # load the masks and apply OHE
        returnY = np.array([rgbToOnehot(np.load(MASK_DIRECTORY + str(fileName))) for fileName in batchY])
        
        return returnX, returnY 

# Implementation of focal loss function, Dice coefficient and intersection over union

In [12]:
def dice_coef(y_true, y_pred, smooth=1.0):
    """Calculates Dice coefficient as a evaluation
    score for the U-Net model

    Parameters
    ----------
    y_true : numpy-arrays
        Groundtruth of the current image
    y_pred : numpy-arrays
        Prediction result of the current image
    smooth : float
        For smoothing the result
        
    Returns
    -------
    float
        Dice coefficient
    """
    
    # flat the arrays
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    # area of overlap
    intersection = K.sum(K.abs(y_true * y_pred), axis=[1,2,3])
    # calculate and return dice
    return (2. * intersection + smooth) / (K.sum(y_true_f*y_true_f) + K.sum(y_pred_f*y_pred_f) + smooth)

def iou_coef(y_true, y_pred, smooth=1.0):
    """Calculates intersection over union as a evaluation
    score for the U-Net model

    Parameters
    ----------
    y_true : numpy-arrays
        Groundtruth of the current image
    y_pred : numpy-arrays
        Prediction result of the current image
    smooth : float
        For smoothing the result
        
    Returns
    -------
    float
        Intersection over Union
    """
        
    # area of overlap
    intersection = K.sum(K.abs(y_true * y_pred), axis=[1,2,3])
    # area of union
    union = K.sum(y_true,[1,2,3])+K.sum(y_pred,[1,2,3])-intersection
    # calculate intersection over union
    iou = K.mean((intersection + smooth) / (union + smooth), axis=0)
    return iou

In [13]:
def categorical_focal_loss(alpha, gamma=2.):
    """Implementation of the focal loss function as a custom loss 
    function for keras, this function was used from a GitHub project 
    from Umberto Griffo: 
    https://github.com/umbertogriffo/focal-loss-keras
    link acessed at: 15.12.2020 

    Parameters
    ----------
    alpha : numpy-arrays
        Alpha is used to specify the weights of different categories/labels, 
        the size of the array needs to be consistent with the number of 
        classes
    gamma : float
        Focusing parameter for modulating factor
        
    Returns
    -------
    float
        Value of the calculated focal loss function for given inputs
    """
    
    # weights
    alpha = np.array(alpha, dtype=np.float32)

    def categorical_focal_loss_fixed(y_true, y_pred):
        """
        Parameters
        ----------
        y_true : numpy-arrays
            Groundtruth of the current image
        y_pred : numpy-arrays
            Prediction result of the current image
        """
        # clip the prediction value to prevent NaN's and Inf's
        epsilon = K.epsilon()
        y_pred = K.clip(y_pred, epsilon, 1. - epsilon)
        # calculate cross entropy
        cross_entropy = -y_true * K.log(y_pred)
        # calculate focal loss
        loss = alpha * K.pow(1 - y_pred, gamma) * cross_entropy
        # compute mean loss in mini_batch
        return K.mean(K.sum(loss, axis=-1))

    return categorical_focal_loss_fixed

# Create the U-Net model

In [14]:
def createUNetModel(nFilters=32, bias_neuron=True):
    """Create the U-Net model after the specified architecture 
    with a few improvements, the link below explains the 
    architecture in detail
    https://arxiv.org/pdf/1505.04597.pdf
    link acessed at: 15.12.2020 

    Parameters
    ----------
    nFilters : int
        Used amount of filters for the first layer
    bias_neuron : bool
        Are bias neurons used across the model or not
        
    Returns
    -------
    float
        Created U-Net model
    """
    # input layer
    inputs = tf.keras.layers.Input((IMG_WIDTH, IMG_HEIGHT, IMG_CHANNELS))
    
    # normalize the values
    n = tf.keras.layers.Lambda(lambda x: x/255)(inputs)
    
    # --- down-sampling path --- 
    # first part
    c1 = tf.keras.layers.Conv2D(nFilters, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(n)
    c1 = tf.keras.layers.BatchNormalization()(c1)
    c1 = tf.keras.layers.Dropout(0.1)(c1)
    c1 = tf.keras.layers.Conv2D(nFilters, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c1)
    c1 = tf.keras.layers.BatchNormalization()(c1)
    p1 = tf.keras.layers.MaxPooling2D((2,2), data_format='channels_last')(c1)
    
    # second part
    c2 = tf.keras.layers.Conv2D(nFilters*2, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(p1)
    c2 = tf.keras.layers.BatchNormalization()(c2)
    c2 = tf.keras.layers.Dropout(0.1)(c2)
    c2 = tf.keras.layers.Conv2D(nFilters*2, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c2)
    c2 = tf.keras.layers.BatchNormalization()(c2)
    p2 = tf.keras.layers.MaxPooling2D((2,2), data_format='channels_last')(c2)
    
    # third part
    c3 = tf.keras.layers.Conv2D(nFilters*4, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(p2)
    c3 = tf.keras.layers.BatchNormalization()(c3)
    c3 = tf.keras.layers.Dropout(0.1)(c3)
    c3 = tf.keras.layers.Conv2D(nFilters*4, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c3)
    c3 = tf.keras.layers.BatchNormalization()(c3)
    p3 = tf.keras.layers.MaxPooling2D((2,2), data_format='channels_last')(c3)
    
    # fourth part
    c4 = tf.keras.layers.Conv2D(nFilters*8, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(p3)
    c4 = tf.keras.layers.BatchNormalization()(c4)
    c4 = tf.keras.layers.Dropout(0.1)(c4)
    c4 = tf.keras.layers.Conv2D(nFilters*8, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c4)
    c4 = tf.keras.layers.BatchNormalization()(c4)
    p4 = tf.keras.layers.MaxPooling2D((2,2), data_format='channels_last')(c4)
    
    # fifth part
    c5 = tf.keras.layers.Conv2D(nFilters*16, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(p4)
    c5 = tf.keras.layers.BatchNormalization()(c5)
    c5 = tf.keras.layers.Dropout(0.1)(c5)
    c5 = tf.keras.layers.Conv2D(nFilters*16, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c5)
    c5 = tf.keras.layers.BatchNormalization()(c5)
    
    # --- up-sampling path --- 
    # first part
    u6 = tf.keras.layers.Conv2DTranspose(nFilters*8, (2,2), strides=(2,2), use_bias=bias_neuron, padding='same')(c5)
    u6 = tf.keras.layers.concatenate([u6, c4])
    c6 = tf.keras.layers.Conv2D(nFilters*8, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(u6)
    c6 = tf.keras.layers.BatchNormalization()(c6)
    c6 = tf.keras.layers.Dropout(0.1)(c6)
    c6 = tf.keras.layers.Conv2D(nFilters*8, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c6)
    c6 = tf.keras.layers.BatchNormalization()(c6)
    
    # second part
    u7 = tf.keras.layers.Conv2DTranspose(nFilters*4, (2,2), strides=(2,2), use_bias=bias_neuron, padding='same')(c6)
    u7 = tf.keras.layers.concatenate([u7, c3])
    c7 = tf.keras.layers.Conv2D(nFilters*4, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(u7)
    c7 = tf.keras.layers.BatchNormalization()(c7)
    c7 = tf.keras.layers.Dropout(0.1)(c7)
    c7 = tf.keras.layers.Conv2D(nFilters*4, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c7)
    c7 = tf.keras.layers.BatchNormalization()(c7)
    
    # third part
    u8 = tf.keras.layers.Conv2DTranspose(nFilters*2, (2,2), strides=(2,2), use_bias=bias_neuron, padding='same')(c7)
    u8 = tf.keras.layers.concatenate([u8, c2])
    c8 = tf.keras.layers.Conv2D(nFilters*2, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(u8)
    c8 = tf.keras.layers.BatchNormalization()(c8)
    c8 = tf.keras.layers.Dropout(0.1)(c8)
    c8 = tf.keras.layers.Conv2D(nFilters*2, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c8)
    c8 = tf.keras.layers.BatchNormalization()(c8)
    
    # fourth part
    u9 = tf.keras.layers.Conv2DTranspose(nFilters, (2,2), strides=(2,2), use_bias=bias_neuron, padding='same')(c8)
    u9 = tf.keras.layers.concatenate([u9, c1])
    c9 = tf.keras.layers.Conv2D(nFilters, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(u9)
    c9 = tf.keras.layers.BatchNormalization()(c9)
    c9 = tf.keras.layers.Dropout(0.1)(c9)
    c9 = tf.keras.layers.Conv2D(nFilters, (3,3), activation='relu', use_bias=bias_neuron, 
                                kernel_initializer='he_normal', padding='same')(c9)
    c9 = tf.keras.layers.BatchNormalization()(c9)
    
    # output layer
    outputs = tf.keras.layers.Conv2D(NUM_CLASSES, (1,1), activation='softmax')(c9)
    
    # create and compile the model
    model = tf.keras.Model(inputs=[inputs], outputs=[outputs])
    model.compile(optimizer='adam', 
                  loss=[categorical_focal_loss(alpha=[[.1, .35, .35, .35]], gamma=0.1)], 
                  metrics=[dice_coef, iou_coef, 'accuracy'])
    # print summary
    model.summary()
    return model

In [16]:
model = createUNetModel(32, True)

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 512, 512, 3) 0                                            
__________________________________________________________________________________________________
lambda_1 (Lambda)               (None, 512, 512, 3)  0           input_2[0][0]                    
__________________________________________________________________________________________________
conv2d_19 (Conv2D)              (None, 512, 512, 16) 448         lambda_1[0][0]                   
__________________________________________________________________________________________________
batch_normalization_18 (BatchNo (None, 512, 512, 16) 64          conv2d_19[0][0]                  
____________________________________________________________________________________________

# Train the U-Net model

In [17]:
# create the generators
batchSize = 6

train_generator = IMT_Generator(X_train, y_train, batchSize)
test_generator = IMT_Generator(X_test, y_test, batchSize)

In [18]:
# create callbacks

# checkpoints so the model is saved
modelCheckpoint = tf.keras.callbacks.ModelCheckpoint('unet_imt_1px.h5', mode='max', verbose=1, monitor='val_iou_coef', save_best_only=True)
# early stopping against overfitting
earlyStopping = tf.keras.callbacks.EarlyStopping(mode='max', monitor='val_iou_coef', patience=10, verbose=1)
# tensorboard for graphics and statistics
tensorBoard = tf.keras.callbacks.TensorBoard(log_dir='logs', write_graph=True)

callbacks = [modelCheckpoint, earlyStopping, tensorBoard]

In [19]:
# final call for training
model.fit_generator(generator=train_generator,
                    epochs=100,
                    verbose=1,
                    callbacks=callbacks,
                    validation_data=test_generator,
                    shuffle=False)

Epoch 1/10
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
Epoch 00001: val_iou_coef improved from -inf to 0.92301, saving model to unet_imt_1px.h5
Epoch 2/10
Epoch 00002: val_iou_coef improved from 0.92301 to 0.97469, saving model to unet_imt_1px.h5
Epoch 3/10
Epoch 00003: val_iou_coef improved from 0.97469 to 0.98624, saving model to unet_imt_1px.h5
Epoch 4/10
Epoch 00004: val_iou_coef improved from 0.98624 to 0.99021, saving model to unet_imt_1px.h5
Epoch 5/10
Epoch 00005: val_iou_coef improved from 0.99021 to 0.99141, saving model to unet_imt_1px.h5
Epoch 6/10
Epoch 00006: val_iou_coef improved from 0.99141 to 0.99287, saving model to unet_imt_1px.h5
Epoch 7/10
Epoch 00007: val_iou_coef improved from 0.99287 to 0.99296, saving model to unet_imt_1px.h5
Epoch 8/10
Epoch 00008: val_iou_coef improved from 0.99296 to 0.99361, saving model to unet_imt_1px.h5
Epoch 9/10
Epoch 00009: val_iou_coef improved from 0.99361 to 0.99381, saving model t

<tensorflow.python.keras.callbacks.History at 0x169d8488>

# Test the U-Net model

In [20]:
# load the best performing model
model = tf.keras.models.load_model('./unet_imt_1px.h5', compile=False)
# compile it separately else problems occur with the custom loss function / custom evaluation score
model.compile(optimizer='adam', 
              loss=[categorical_focal_loss(alpha=[[.1, .35, .35, .35]], gamma=0.1)], 
              metrics=[dice_coef, iou_coef, 'accuracy'])

Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


In [21]:
# calculate and save the achieved scores
scores = model.evaluate_generator(test_generator, verbose=1)
np.save(RESULT_DIRECTORY + 'scores.npy', scores)



In [22]:
# create and save the results
results = model.predict_generator(test_generator, verbose=1)
# the file 'results.npy' is really big (multiple GBs) since it contains float values
np.save(RESULT_DIRECTORY + 'results.npy', results)

