In [None]:
!pip3 install imgaug

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.optimizers import Adam, Adadelta
from tensorflow.keras.layers import Dense, Flatten, Dropout, Convolution2D, MaxPooling2D, Lambda, Reshape, Activation
from tensorflow.keras.layers import Convolution2DTranspose, concatenate, Input, UpSampling2D, BatchNormalization, Add
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, TensorBoard
from tensorflow.keras.utils import to_categorical
from sklearn.utils import shuffle
import cv2
import pandas as pd
import random
import os
import ntpath
import matplotlib.image as mpimg
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from imgaug import augmenters as iaa

In [None]:
# Set Keras to use GPUs
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
# Check if GPU is available
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

In [None]:
# Define the image resolution and the number of classes
HEIGHT = 224
WIDTH = 224
NUM_CLASSES = 11
# Define the checkpoint name and the final model name
CHECKPOINT_NAME = 'model_checkpoint.h5'
MODEL_NAME = 'model.h5'

MODEL_NAME = "models/" + MODEL_NAME

In [None]:
# Specify the path to the data
datadir = 'data'
# Specify the column names for the data
cols = ['image_name', 'mask_name', 'binary_mask_name']
# Read the data into a pandas dataframe
data = pd.read_csv(os.path.join(datadir, 'labels.csv'), names=cols)
data.head()

In [None]:
def load_img_mask(datadir, df):
    """Load the images and mask path into memory"""
    image_paths = []
    mask_paths = []
    for i in range(len(df)):
        index_data = df.iloc[i]
        image = index_data[0]
        mask = index_data[2]
        image_paths.append(os.path.join(datadir,image))
        mask_paths.append(os.path.join(datadir,mask))
    image_paths = np.asarray(image_paths)
    mask_paths = np.asarray(mask_paths)
    return image_paths, mask_paths

image_paths, mask_paths = load_img_mask(datadir, data)

In [None]:
# Split the data into training and validation sets. You can change the seed to generate a different training set.
X_train, X_valid, y_train, y_valid = train_test_split(image_paths, mask_paths, test_size=0.2, random_state=420)
print('Training Samples: {}\nValidation Samples: {}'.format(len(X_train), len(X_valid)))

In [None]:
# load 5 random images and masks to visualize the data
fig, ax = plt.subplots(2, 5, figsize=(20, 8))
for i in range(5):
    ax[0, i].imshow(mpimg.imread(X_train[i]))
    ax[0, i].set_title('Image: {}'.format(ntpath.basename(X_train[i])))
    ax[1, i].imshow(mpimg.imread(y_train[i]))
    ax[1, i].set_title('Mask: {}'.format(ntpath.basename(y_train[i])))
plt.show()

In [None]:
def change_contrast(image):
    """Change the contrast of the image"""
    lab = cv2.cvtColor(image, cv2.COLOR_RGB2LAB)
    l_channel, a_channel, b_channel = cv2.split(lab)

    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    l_channel = clahe.apply(l_channel)
    limg = cv2.merge((l_channel, a_channel, b_channel))

    image = cv2.cvtColor(limg, cv2.COLOR_LAB2RGB)

    return image

def img_random_brightness(image):
    """Change the brightness of the image"""
    brightness = iaa.Multiply((0.8, 1.2))
    image = brightness.augment_image(image)
    # print("Brightness", image.dtype)
    return image

def random_horizontal_flip(image, mask):
    """Flip the image and mask horizontally"""
    horizontal_flip = iaa.Fliplr(1)
    image = horizontal_flip.augment_image(image)
    mask = horizontal_flip.augment_image(mask)
    # print("Horizontal Flip", image.dtype)
    return image, mask

def random_noise(image):
    """Add random 20 % noise to the image"""
    #add 5% of noise to the image
    noise = np.random.normal(0, 0.2, image.shape)
    image = image + noise
    # convert back to uint8
    image = np.clip(image, 0, 255)
    image = image.astype('uint8')
    return image

def crop_bottom(image, mask):
    """Crop the image and mask bottom"""
    crop_value = np.random.randint(1, 200)
    image = image[:-crop_value, :, :]
    mask = mask[:-crop_value, :]
    return image, mask

def crop_top(image, mask):
    """Crop the image and mask top"""
    crop_value = np.random.randint(1, 200)
    image = image[crop_value:, :, :]
    mask = mask[crop_value:, :]
    return image, mask

def change_saturation(image):
    """Change the saturation of the image"""
    image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
    image[:, :, 1] = image[:, :, 1] * np.random.uniform(0.7, 1.3)
    image = cv2.cvtColor(image, cv2.COLOR_HSV2RGB)
    return image

def change_hue(image):
    """Change the hue of the image"""
    image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
    image[:, :, 0] = image[:, :, 0] + np.random.randint(-18, 18)
    image = cv2.cvtColor(image, cv2.COLOR_HSV2RGB)
    return image


In [None]:
# Show a all augmentation techniques on a random image
def show_sample(image, mask):
    image_contrast = change_contrast(image)
    image_brightness = img_random_brightness(image)
    image_noise = random_noise(image)
    image_flip, mask_flip = random_horizontal_flip(image, mask)
    image_crop_top, mask_crop_top = crop_top(image, mask)
    image_crop_bottom, mask_crop_bottom = crop_bottom(image, mask)
    image_saturation = change_saturation(image)
    image_hue = change_hue(image)

    fig, ax = plt.subplots(9, 2, figsize=(8, 20))
    fig.tight_layout()
    ax[0, 0].imshow(image)
    ax[0, 0].set_title('Original Image')
    ax[0, 1].imshow(mask)
    ax[0, 1].set_title('Original Mask')
    ax[1, 0].imshow(image_contrast)
    ax[1, 0].set_title('Contrast Image')
    ax[1, 1].imshow(mask)
    ax[1, 1].set_title('Contrast Mask')
    ax[2, 0].imshow(image_brightness)
    ax[2, 0].set_title('Brightness Image')
    ax[2, 1].imshow(mask)
    ax[2, 1].set_title('Brightness Mask')
    ax[3, 0].imshow(image_noise)
    ax[3, 0].set_title('Noise Image')
    ax[3, 1].imshow(mask)
    ax[3, 1].set_title('Noise Mask')
    ax[4, 0].imshow(image_flip)
    ax[4, 0].set_title('Flipped Image')
    ax[4, 1].imshow(mask_flip)
    ax[4, 1].set_title('Flipped Mask')
    ax[5, 0].imshow(image_crop_top)
    ax[5, 0].set_title('Cropped Top Image')
    ax[5, 1].imshow(mask_crop_top)
    ax[5, 1].set_title('Cropped Top Mask')
    ax[6, 0].imshow(image_crop_bottom)
    ax[6, 0].set_title('Cropped Bottom Image')
    ax[6, 1].imshow(mask_crop_bottom)
    ax[6, 1].set_title('Cropped Bottom Mask')
    ax[7, 0].imshow(image_saturation)
    ax[7, 0].set_title('Saturation Image')
    ax[7, 1].imshow(mask)
    ax[7, 1].set_title('Saturation Mask')
    ax[8, 0].imshow(image_hue)
    ax[8, 0].set_title('Hue Image')
    ax[8, 1].imshow(mask)
    ax[8, 1].set_title('Hue Mask')
    plt.show()

image, mask = random.choice(list(zip(X_train, y_train)))
# print(image, mask)
image, mask = cv2.imread(image,cv2.IMREAD_COLOR), cv2.imread(mask,0)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# print(np.unique(mask))
show_sample(image, mask)



In [None]:
# Randomly augment the images and masks
def random_augment(image, mask):
    path_img, path_mask = image, mask
    image = cv2.imread(image)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    mask = cv2.imread(mask,0)
    # Augment 75% of the images
    should_augment = np.random.rand() < 0.75
    
    if should_augment:
        if np.random.rand() < 0.5:
            image, mask = random_horizontal_flip(image, mask)
            if image is None or mask is None:
                print("horizontal flip failed")
                print(path_img, path_mask)
                
        if np.random.rand() < 0.5:
            image = random_noise(image)
            if image is None or mask is None:
                print("noise failed")
                print(path_img, path_mask)

        if np.random.rand() < 0.5:
            image, mask = crop_top(image, mask)
            if image is None or mask is None:
                print("crop top failed")
                print(path_img, path_mask)

        if np.random.rand() < 0.5:
            image = img_random_brightness(image)
            if image is None or mask is None:
                print("brightness failed")
                print(path_img, path_mask)

        #if np.random.rand() < 0.5:
            #image = change_brightness(image)
            #if image is None or mask is None:
                #print("brightness failed")
                #print(path_img, path_mask)

        if np.random.rand() < 0.5:
            image = change_hue(image)
            if image is None or mask is None:
                print("hue failed")
                print(path_img, path_mask)

        if np.random.rand() < 0.5:
            image = change_saturation(image)
            if image is None or mask is None:
                print("saturation failed")
                print(path_img, path_mask)

        if np.random.rand() < 0.5:
            image = change_contrast(image)
            if image is None or mask is None:
                print("contrast failed")
                print(path_img, path_mask)

    # Crop the bottom in 50% of the images. This is required to avoid overfitting
    # for the hood of the car.
    if np.random.rand() < 0.50:
        image, mask = crop_bottom(image, mask)
        if image is None or mask is None:
            print("crop bottom failed")
            print(path_img, path_mask)

    if image is None or mask is None:
        print("failed")
        print(path_img, path_mask)
    return image, mask

In [None]:
def img_preprocess(image):
    """Preprocess the image"""
    image = cv2.GaussianBlur(image, (3, 3), 0)
    image = cv2.resize(image, (WIDTH, HEIGHT))
    return image

In [None]:
#----------------------------------------------------------------------------------------------------------------------
# The data generator is required to ease the memory load on the GPU. Only the images and masks required for training
# step are loaded in memory. This happens dynamically when the generator is called.
#----------------------------------------------------------------------------------------------------------------------
def batch_generator(image_paths, mask_paths, batch_size, is_train=True):
    """A generator that returns images and masks."""
    while True:
        batch_img = []
        batch_mask = []
        one_hot_mask = np.zeros((HEIGHT,WIDTH,NUM_CLASSES))
        for i in range(batch_size):
            random_index = random.randint(0, len(image_paths) - 1)
            
            if is_train:
                img, mask = random_augment(image_paths[random_index], mask_paths[random_index])
            else:
                img = cv2.imread(image_paths[random_index])
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                mask = cv2.imread(mask_paths[random_index], 0)
            try:
                img = img_preprocess(img)
            except Exception as e:
                print(e)
                print(image_paths[random_index])

            mask = cv2.resize(mask, (WIDTH, HEIGHT), interpolation=cv2.INTER_NEAREST)
            # One hot encoding of the mask
            one_hot_mask = to_categorical(mask, num_classes=NUM_CLASSES)
            batch_img.append(img)
            batch_mask.append(one_hot_mask)
        yield (np.asarray(batch_img), np.asarray(batch_mask))


In [None]:
"""Scaled down Unet model. Works ok with an image resolution of 160x120"""

def unet_model(input_image, num_classes):

    n0 = Lambda(lambda x: x/255 - 0.5)(input_image)

    c1 = Convolution2D(8, (3, 3), activation='elu', padding='same')(n0)
    c1 = Dropout(0.1)(c1)
    c1 = Convolution2D(8, (3, 3), activation='elu', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)

    c2 = Convolution2D(16, (3, 3), activation='elu', padding='same')(p1)
    c2 = Dropout(0.1)(c2)
    c2 = Convolution2D(16, (3, 3), activation='elu', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)

    c3 = Convolution2D(32, (3, 3), activation='elu', padding='same')(p2)
    c3 = Dropout(0.2)(c3)
    c3 = Convolution2D(32, (3, 3), activation='elu', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)

    c4 = Convolution2D(64, (3, 3), activation='elu', padding='same')(p3)
    c4 = Dropout(0.2)(c4)
    c4 = Convolution2D(64, (3, 3), activation='elu', padding='same')(c4)

    u5 = Convolution2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c4)
    u5 = concatenate([u5, c3])
    c5 = Convolution2D(32, (3, 3), activation='elu', padding='same')(u5)
    c5 = Dropout(0.2)(c5)
    c5 = Convolution2D(32, (3, 3), activation='elu', padding='same')(c5)

    u6 = Convolution2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(c5)
    u6 = concatenate([u6, c2])
    c6 = Convolution2D(16, (3, 3), activation='elu', padding='same')(u6)
    c6 = Dropout(0.1)(c6)
    c6 = Convolution2D(16, (3, 3), activation='elu', padding='same')(c6)

    u7 = Convolution2DTranspose(16, (2, 2), strides=(2, 2), padding='same')(c6)
    u7 = concatenate([u7, c1])
    c7 = Convolution2D(8, (3, 3), activation='elu', padding='same')(u7)
    c7 = Dropout(0.1)(c7)
    c7 = Convolution2D(8, (3, 3), activation='elu', padding='same')(c7)

    output = Convolution2D(num_classes, (1, 1), activation='softmax')(c7)

    model = Model(inputs=[input_image], outputs=[output])

    model.compile(optimizer=Adam(learning_rate=1e-3), loss='categorical_crossentropy', metrics=['accuracy'])

    return model

In [None]:
"""Unet with VGG16 encoder. Requires an image resolution of at least 224x224"""
def vgg16_unet(input_image, num_classes):

    n0 = Lambda(lambda x: x/255 - 0.5)(input_image)

    c1 = Convolution2D(64, (3, 3), activation='relu', padding='same')(n0)
    c1 = Convolution2D(64, (3, 3), activation='relu', padding='same')(c1)

    p1 = MaxPooling2D((2, 2))(c1)
    c2 = Convolution2D(128, (3, 3), activation='relu', padding='same')(p1)
    c2 = Convolution2D(128, (3, 3), activation='relu', padding='same')(c2)
   
    p2 = MaxPooling2D((2, 2))(c2)
    c3 = Convolution2D(256, (3, 3), activation='relu', padding='same')(p2)
    c3 = Convolution2D(256, (3, 3), activation='relu', padding='same')(c3)
    c3 = Convolution2D(256, (3, 3), activation='relu', padding='same')(c3)

    p3 = MaxPooling2D((2, 2))(c3)
    c4 = Convolution2D(512, (3, 3), activation='relu', padding='same')(p3)
    c4 = Convolution2D(512, (3, 3), activation='relu', padding='same')(c4)
    c4 = Convolution2D(512, (3, 3), activation='relu', padding='same')(c4)

    u5 = Convolution2DTranspose(512, (2, 2), strides=(2, 2), padding='same')(c4)
    u5 = concatenate([u5, c3])
    c5 = Convolution2D(256, (3, 3), activation='relu', padding='same')(u5)
    c5 = Convolution2D(256, (3, 3), activation='relu', padding='same')(c5)

    u6 = Convolution2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(c5)
    u6 = concatenate([u6, c2])
    c6 = Convolution2D(128, (3, 3), activation='relu', padding='same')(u6)
    c6 = Convolution2D(128, (3, 3), activation='relu', padding='same')(c6)

    u7 = Convolution2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c6)
    u7 = concatenate([u7, c1])
    c7 = Convolution2D(64, (3, 3), activation='relu', padding='same')(u7)
    c7 = Convolution2D(64, (3, 3), activation='relu', padding='same')(c7)

    output = Convolution2D(num_classes, (1, 1), activation='softmax')(c7)

    model = Model(inputs=[input_image], outputs=[output])

    model.compile(optimizer=Adam(learning_rate=1e-3), loss='categorical_crossentropy', metrics=['accuracy'])
    
    return model

In [None]:
input_image = Input((HEIGHT, WIDTH, 3))
model = vgg16_unet(input_image, NUM_CLASSES)
model.summary()

In [None]:
# Define the callback to reduce the learning rate and stop training if the model does not improve after 3 epochs
# Also initialize tensorboard for logging
callbacks = [ModelCheckpoint(CHECKPOINT_NAME, monitor='val_loss', save_best_only=True),
             EarlyStopping(monitor='val_loss', patience=5, verbose=1, restore_best_weights=True),
             ReduceLROnPlateau(monitor='val_loss', factor=0.05, patience=4, verbose=1),
             TensorBoard(log_dir='logs/', histogram_freq=5, write_graph=True, write_images=True)]

In [None]:
# Train the model, this might take a while
history = model.fit(batch_generator(X_train, y_train, batch_size=8, is_train=True),
                    steps_per_epoch=100,
                    epochs=40,
                    validation_data=batch_generator(X_valid, y_valid, batch_size=8, is_train=False),
                    validation_steps=60,
                    callbacks=callbacks,
                    verbose=1,
                    shuffle=True)

In [None]:
# Plot the loss function. Generally the validation loss should be less than the training loss
# Both the training and validation loss should decrease over time
# Traning and validation loss should be close to each other
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['train', 'val'])
plt.title('loss')
plt.xlabel('epoch')

In [None]:
# Save the model
model.save(MODEL_NAME)

In [None]:
# Test the model
model = load_model(MODEL_NAME)
test_image = cv2.imread("psaf2_circle.png")
test_image = cv2.cvtColor(test_image, cv2.COLOR_BGR2RGB)
test_image = cv2.resize(test_image, (WIDTH, HEIGHT))
test_image = img_preprocess(test_image)
test_image = np.expand_dims(test_image, axis=0)
pred = model.predict(test_image)
pred = np.argmax(pred, axis=-1)
pred = pred[0]
pred = pred.astype(np.uint8)
pred = cv2.resize(pred, (WIDTH, HEIGHT))
print(np.unique(pred))
# plot
fig, ax = plt.subplots(1, 2, figsize=(10, 5))
ax[0].imshow(test_image[0])
ax[0].set_title('input image')
ax[1].imshow(pred)
ax[1].set_title('predicted mask')

In [None]:
from google.colab import files
files.download(MODEL_NAME)