In [None]:
!pip install segmentation-models albumentations

In [None]:
%load_ext autoreload
%autoreload 2

# include include folder
import sys
sys.path.append("./miniaturautonomie_lanedetection/include/")
sys.path.append("./include/")

from tensorflow.keras.models import load_model, Model
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import ModelCheckpoint
import tensorflow as tf

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import numpy as np
import os
import fnmatch
import random
import cv2
import json

import supervisely_parser as svp
import render
import evaluation as evl
from DataGenerator import DataGenerator

import segmentation_models as sm # for simple segmentation architecture

import albumentations as Alb

In [None]:
name = 'reference' # should be changed for every new test run

annotation_path = './annotation_v3.0/'
image_path = './data/'
packages = ['knuff_main1', 'knuff_main2', 'knuff_main3', 'knuff_hill', 'uni', 'highway', 'knuff_main5', 'knuff_main6']

output_dir = f"./output/{name}"
model_path = f"{output_dir}/model.h5"

input_img_size = (480, 640)

number_classes = 7 # outer, middle_curb, guide_lane, solid_lane, hold_line, zebra, background
output_width = 640 
output_height = 224 
input_width = 640 
input_height = 224

val_size = 0.2 # percentage
augmentation = True

batch_size = 16
epochs = 5

transform = Alb.Compose([
    Alb.ShiftScaleRotate(p=0.2),
    Alb.RandomContrast(p=0.4),
    Alb.RandomBrightness(limit=[-0.2,0.2], p=0.4)
])

params = {
    'batch_size': batch_size,
    'input_img_size': input_img_size,
    'target_img_size': (output_height, output_width),
    'shuffle': True,
    'n_channels': number_classes,
    'transform': transform,
    'augmentation': augmentation
}

#################################################################
# run only if to rerender all masks or new packages are added
#render.render_packages(packages, annotation_path, input_img_size)
#################################################################

# create output dir for every generated file
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# generate absolute list of all img and masks paths
image_paths = []
mask_paths = []
# for package
for index in range(len(packages)):
    image_base_path = f"{image_path}{packages[index]}/"
    masks_base_path = f"{annotation_path}{packages[index]}/masks/"

    file_list = os.listdir(masks_base_path)
    pattern = '*.png'
    for filename in file_list:
        if fnmatch.fnmatch(filename, pattern):
            mask_paths.append(os.path.join(masks_base_path, filename))
            image_name = filename[:len(filename)-3]+"jpg"
            image_paths.append(os.path.join(image_base_path, image_name))

# split into train and val set
size_all = len(image_paths)
val_samples = int(size_all*val_size)
random.Random(size_all).shuffle(image_paths)
random.Random(size_all).shuffle(mask_paths)

train_input_img_paths = image_paths[:-val_samples]
train_target_mask_paths = mask_paths[:-val_samples]
val_input_img_paths = image_paths[-val_samples:]
val_target_mask_paths = mask_paths[-val_samples:]

print(f"Len train img: {len(train_input_img_paths)} len train mask: {len(train_target_mask_paths)}")
print(f"Len val img: {len(val_input_img_paths)} len val mask: {len(val_target_mask_paths)}")

# Generators
train_gen = DataGenerator(train_input_img_paths, train_target_mask_paths, **params)
val_gen = DataGenerator(val_input_img_paths, val_target_mask_paths, **params)

    

In [None]:
c = 16
f, axs = plt.subplots(c, 8, figsize=(50,c*3))
i = 0

batch_input_test = train_input_img_paths[:c]
batch_target_test = train_target_mask_paths[:c]
images, data = train_gen.data_generation(batch_input_test, batch_target_test)

print(len(images))
for y in range(c):
    axs[y,0].imshow(images[i])
    axs[y,0].title.set_text('input')
    axs[y,1].imshow(data[i][:,:,0], cmap='plasma')
    axs[y,1].title.set_text('outer')
    axs[y,2].imshow(data[i][:,:,1], cmap='plasma')
    axs[y,2].title.set_text('middle_curb')
    axs[y,3].imshow(data[i][:,:,2], cmap='plasma')
    axs[y,3].title.set_text('guide_lane')
    axs[y,4].imshow(data[i][:,:,3], cmap='plasma')
    axs[y,4].title.set_text('solid_lane')
    axs[y,5].imshow(data[i][:,:,4], cmap='plasma')
    axs[y,5].title.set_text('hold_line')
    axs[y,6].imshow(data[i][:,:,5], cmap='plasma')
    axs[y,6].title.set_text('zebra')
    axs[y,7].imshow(data[i][:,:,6], cmap='plasma')
    axs[y,7].title.set_text('background')
    i += 1

In [None]:
import models_backbone as models_b
metrics = [sm.metrics.iou_score, evl.precision,evl.recall,evl.f1_score]

model = models_b.reference('LANESEGMENTATION_REFERENCE', input_height, input_width, number_classes, metrics)
#model = custom_unet()
#model = segmentation_model()

plot_model(model, f"{output_dir}/model.png", show_shapes=True, show_dtype=False, show_layer_names=True)
print(model.summary())

In [None]:
# checkpoint
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
#config.gpu_options.per_process_gpu_memory_fraction = 0.4
sess = tf.compat.v1.Session(config=config)
    
filepath=f"{output_dir}/checkpoint.h5"
checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
callbacks_list = [checkpoint]

tf.get_logger().setLevel('ERROR')

history = model.fit(train_gen,
            validation_data=val_gen,
            epochs=epochs,
            use_multiprocessing=False,
            workers=6,
            callbacks=callbacks_list)
model.save(model_path)

In [None]:
plt.figure(figsize=(20, 10))
# Plot training & validation loss values
plt.subplot(231)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

# Plot training & validation iou_score values
plt.subplot(232)
plt.plot(history.history['iou_score'])
plt.plot(history.history['val_iou_score'])
plt.title('Model iou_score')
plt.ylabel('iou_score')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

plt.subplot(233)
plt.plot(history.history['recall'])
plt.plot(history.history['val_recall'])
plt.title('Model recall mean')
plt.ylabel('recall')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

plt.subplot(234)
plt.plot(history.history['precision'])
plt.plot(history.history['val_precision'])
plt.title('Model precision mean')
plt.ylabel('precision')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

plt.subplot(235)
plt.plot(history.history['f1_score'])
plt.plot(history.history['val_f1_score'])
plt.title('Model F1 score mean')
plt.ylabel('f1')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')


plt.savefig(f"{output_dir}/{name}_plot.png", bbox_inches="tight")
plt.show()

# Evaluation

In [None]:
# load model only if not trained in this session
model = load_model(f"{output_dir}/model.h5", custom_objects={'iou_score':sm.metrics.iou_score, 
                                                            'f1-score':sm.metrics.f1_score, 
                                                            'precision':sm.metrics.precision, 
                                                            'recall':sm.metrics.recall})
# load test images
packages = ['knuff_main1']

# generate absolute list of all img and masks paths
image_paths = []
mask_paths = []
# for package
for index in range(len(packages)):
    image_base_path = f"{image_path}{packages[index]}/"
    masks_base_path = f"{annotation_path}{packages[index]}/masks/"

    file_list = os.listdir(masks_base_path)
    pattern = '*.png'
    for filename in file_list:
        if fnmatch.fnmatch(filename, pattern):
            mask_paths.append(os.path.join(masks_base_path, filename))
            image_name = filename[:len(filename)-3]+"jpg"
            image_paths.append(os.path.join(image_base_path, image_name))

test_batch_size = 16
params = {
    'batch_size': test_batch_size,
    'input_img_size': input_img_size,
    'target_img_size': (output_height, output_width),
    'shuffle': False,
    'n_channels': number_classes,
    'augmentation': False
}

test_gen = DataGenerator(image_paths, mask_paths, **params)

In [None]:
loss, iou, f1_score, precision, recall = model.evaluate(test_gen, verbose=0)
print("Evaluation result:")
print(f"Loss: {loss} \nIoU: {iou} \nF1: {f1_score} \nPrecision: {precision} \nRecall: {recall} \n")

testbatch_imgs = image_paths[:test_batch_size]
testbatch_masks = mask_paths[:test_batch_size]

imgs, masks = test_gen.data_generation(testbatch_imgs, testbatch_masks)

predictions = model.predict(imgs)

thres_value = 0.1

f, axs = plt.subplots(len(predictions), 3, figsize=(20,len(predictions)*3))
for i, prediction in enumerate(predictions):
    # show input img
    axs[i,0].imshow(imgs[i])
    axs[i,0].title.set_text('Original')
    
    # show prediction
    ## clip to 0 or 1 with thres
    clipped_pred = np.where(prediction > thres_value, 1, 0)
    rgb_pred = render.render_rgb(clipped_pred)
    
    axs[i,1].imshow(rgb_pred)
    axs[i,1].title.set_text('Prediction')
    # show ground truth
    rgb_gt = render.render_rgb(masks[i].astype(int))
    
    axs[i,2].imshow(rgb_gt)
    axs[i,2].title.set_text('Ground Truth')