# Contribute to design of a self-driving car

To introduce the project, we have to perform image segmentation from Gerlany landscapes and town image dataset. Each of RGB image has its annotated images called a mask that identify the next eight classes : void, flat, human, sky, construction, nature, vehicle and object. The main goal is to build a model of Convolution neural networks that will take as input an RGB image to produce a predicted mask. The more the predicted mask will match with the true annotated one, the better the model will perform.
This script should be use with the technical note (in the folder with file name "ETIENNE_Louis_note_technique_11_2022") to better understand the different approaches in building the different models

In [1]:
import os, json 
import pandas as pd
import numpy as np
import cv2
from IPython.display import Image, display
from keras_preprocessing.image import load_img
from PIL import ImageOps, Image
import imgaug.augmenters as iaa
import imgaug as ia
from tensorflow import keras
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras import layers
from keras import metrics
from keras.api._v2.keras.metrics import SparseCategoricalAccuracy
import segmentation_models as sm 
from focal_loss import SparseCategoricalFocalLoss
from datetime import datetime as dt
import random

Segmentation Models: using `keras` framework.


# Images & annotated images Paths

In [2]:
test_cities = ['berlin', 'bielefeld', 'bonn', 'leverkusen', 'mainz', 'munich']
train_cities = ['bochum', 'bremen', 'cologne', 'darmstadt', 'dusseldorf', 'erfurt', 'hamburg', 'hanover', 'jena', 'krefeld', 
                'monchengladbach', 'strasbourg', 'stuttgart', 'tubingen', 'ulm', 'weimar', 'zurich']
val_cities = ['frankfurt', 'lindau', 'munster']

In [3]:
train_img_paths = []
train_ann_paths = []

for cities in train_cities:
    
    train_img_dir = r"D:\Mes Documents\Downloads\leftImg8bit\train/" + cities
    train_ann_dir = r"D:\Mes Documents\Downloads\P8_Cityscapes_gtFine_trainvaltest\gtFine\train/" + cities
    
    train_img_paths = train_img_paths + sorted(
        [
            os.path.join(train_img_dir, fname)
            for fname in os.listdir(train_img_dir)
            if fname.endswith("_leftImg8bit.png")
        ]
    )
    train_ann_paths = train_ann_paths + sorted(
        [
            os.path.join(train_ann_dir, fname)
            for fname in os.listdir(train_ann_dir)
            if fname.endswith("_gtFine_labelIds.png")
        ]
    )
    
print("Number of train images:", len(train_img_paths))
print("Number of train annotations:", len(train_ann_paths))


val_img_paths = []
val_ann_paths = []

for cities in val_cities:
    val_img_dir = r"D:\Mes Documents\Downloads\leftImg8bit\val/" + cities
    val_ann_dir = r"D:\Mes Documents\Downloads\P8_Cityscapes_gtFine_trainvaltest\gtFine\val/" + cities
    


    val_img_paths = val_img_paths + sorted(
        [
            os.path.join(val_img_dir, fname)
            for fname in os.listdir(val_img_dir)
            if fname.endswith("_leftImg8bit.png")
        ]
    )
    val_ann_paths = val_ann_paths + sorted(
        [
            os.path.join(val_ann_dir, fname)
            for fname in os.listdir(val_ann_dir)
            if fname.endswith("_gtFine_labelIds.png")
        ]
    )

print("Number of val images:", len(val_img_paths))

print("Number of val annotations:", len(val_ann_paths))

Number of train images: 2801
Number of train annotations: 2801
Number of val images: 500
Number of val annotations: 500


## Set parameters

In [29]:
input_dir = train_img_paths
target_dir = train_ann_paths
img_size = (160,160)
num_classes = 8
batch_size = 16
augm_multiplier = 2 #coefficient of the number of times the input images will be produced during data augmentation

## Define the 8 classes among the 33 sub-classes from the dataset

In [30]:
cats = {'void': [0, 1, 2, 3, 4, 5, 6],
 'flat': [7, 8, 9, 10],
 'construction': [11, 12, 13, 14, 15, 16],
 'object': [17, 18, 19, 20],
 'nature': [21, 22],
 'sky': [23],
 'human': [24, 25],
 'vehicle': [26, 27, 28, 29, 30, 31, 32, 33, -1]}

def convertCats(x):
    if x in cats['void']:
        return 0
    elif x in cats['flat']:
        return 1
    elif x in cats['construction']:
        return 2
    elif x in cats['object']:
        return 3
    elif x in cats['nature']:
        return 4
    elif x in cats['sky']:
        return 5
    elif x in cats['human']:
        return 6
    elif x in cats['vehicle']:
        return 7

convertCats_v = np.vectorize(convertCats) #to vectorize images array otherwise we can't get class of list of numbers 'x' with convertCats(x)

def preprocessImg(img):
    """ This function is a preprocess function that will make mask usable by numpy operations"""
    image_matrix = np.expand_dims(img, 2)
    converted_image = convertCats_v(image_matrix)

    return converted_image


# Prepare Sequence of data
## Data augmentation parameters

In [31]:
seq = iaa.Sequential([                                     

    iaa.Sometimes(
        0.5, iaa.GaussianBlur(sigma=(0, 0.1))),

    iaa.Affine(
        scale={"x": (0.97, 1.03), "y": (0.97, 1.03)},                # Zoom images to a value of 80 to 120% of their original size
        translate_percent={"x": (-0.03, 0.03), "y": (-0.03, 0.03)},  # Translate images by -20 to 20% on x_axis and y_axis independently
        rotate=(-25, 25))],                                          # Rotate images by -25 to 25% degrees
         random_order=True)                                          # "random_order = True" means order of geometric augmentation is set randomly

## DataGenerator
Thanks to Keras.utils.Sequence class

In [32]:
class OxfordPets(keras.utils.Sequence):
    """Helper to iterate over the data (as Numpy arrays)."""

    def __init__(self, batch_size, img_size, input_img_paths, target_img_paths):
        self.batch_size = batch_size
        self.img_size = img_size
        self.input_img_paths = input_img_paths
        self.target_img_paths = target_img_paths

    def __len__(self):
        return len(self.target_img_paths) // self.batch_size

    def __getitem__(self, idx):
        """Returns tuple (input, target) correspond to batch #idx."""
        i = idx * self.batch_size
        batch_input_img_paths = self.input_img_paths[i : i + self.batch_size]
        batch_target_img_paths = self.target_img_paths[i : i + self.batch_size]
        x = np.zeros((self.batch_size * augm_multiplier,) + self.img_size + (3,), dtype="float32") #la "," après le nombre dans les "()" signifie que c'est un "One element tuple" et non un int
        for j, path in enumerate(batch_input_img_paths):
            img = load_img(path, target_size=self.img_size)
            x[j] = img
        y = np.zeros((self.batch_size * augm_multiplier,) + self.img_size + (1,), dtype="uint8")

        for j, path in enumerate(batch_target_img_paths):
            img = load_img(path, target_size=self.img_size, color_mode="grayscale") #img's shape = (2048,1024) --> without color_mode='grayscale', img would have shape of (2048,1024,3)
            y[j] = preprocessImg(img) # "np.expand_dims(img, 2)" allows dimension expansion from (2048,1024) to (2048,1024,1) to get class number for each pixel

        for multiplier in range(1, augm_multiplier):
            for i in range(0, batch_size):
                ia.seed(i)
                img_augmentation = seq(image=x[i])
                x[batch_size * multiplier + i] = img_augmentation

                ia.seed(i)
                target_augmentation = seq(image=y[i])
                y[batch_size * multiplier + i] = target_augmentation

        return x, y

# Define U_net model
With custom layers

In [33]:
def get_model(img_size, num_classes):
    inputs = keras.Input(shape=img_size + (3,))

    ### [First half of the network: downsampling inputs] ###

    # Entry block
    x = layers.Conv2D(32, 3, strides=2, padding="same")(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    previous_block_activation = x  # Set aside residual

    # Blocks 1, 2, 3 are identical apart from the feature depth.
    for filters in [64, 128, 256]:
        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.SeparableConv2D(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.MaxPooling2D(3, strides=2, padding="same")(x)

        # Project residual
        residual = layers.Conv2D(filters, 1, strides=2, padding="same")(
            previous_block_activation
        )
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    ### [Second half of the network: upsampling inputs] ###

    for filters in [256, 128, 64, 32]:
        x = layers.Activation("relu")(x)
        x = layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.Activation("relu")(x)
        x = layers.Conv2DTranspose(filters, 3, padding="same")(x)
        x = layers.BatchNormalization()(x)

        x = layers.UpSampling2D(2)(x)

        # Project residual
        residual = layers.UpSampling2D(2)(previous_block_activation)
        residual = layers.Conv2D(filters, 1, padding="same")(residual)
        x = layers.add([x, residual])  # Add back residual
        previous_block_activation = x  # Set aside next residual

    # Add a per-pixel classification layer
    outputs = layers.Conv2D(num_classes, 3, activation="softmax", padding="same")(x)

    # Define the model
    model = keras.Model(inputs, outputs)
    return model


# Free up RAM in case the model definition cells were run multiple times
keras.backend.clear_session()

# Build model
model = get_model(img_size, num_classes)
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 160, 160, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 80, 80, 32)   896         ['input_1[0][0]']                
                                                                                                  
 batch_normalization (BatchNorm  (None, 80, 80, 32)  128         ['conv2d[0][0]']                 
 alization)                                                                                       
                                                                                              

# Prepare Data
Kind of train/validation set of data preparation and load train and validation sequences

In [34]:
# Split our img paths into a training and a validation set
val_samples = 400
random.Random(1337).shuffle(train_img_paths)
random.Random(1337).shuffle(train_ann_paths)
train_input_img_paths = train_img_paths[:-val_samples]
train_target_img_paths = train_ann_paths[:-val_samples]
val_input_img_paths = val_img_paths[-val_samples:]
val_target_img_paths = val_ann_paths[-val_samples:]

# Instantiate data Sequences for each split
train_gen = OxfordPets(
    batch_size, img_size, train_input_img_paths, train_target_img_paths
)
val_gen = OxfordPets(batch_size, img_size, val_input_img_paths, val_target_img_paths)

## Define UpdatedMeanIoU
To use with sparse classes

In [35]:
class UpdatedMeanIoU(tf.keras.metrics.MeanIoU):
  def __init__(self,
               y_true=None,
               y_pred=None,
               num_classes=None,
               ignore_class=None,
               sparse_y_true: bool = True,
               sparse_y_pred: bool = True,
               axis: int = -1,
               name=None,
               dtype=None):
    super(UpdatedMeanIoU, self).__init__(num_classes = num_classes,name=name, dtype=dtype)

  def update_state(self, y_true, y_pred, sample_weight=None):
    y_pred = tf.math.argmax(y_pred, axis=-1)
    return super().update_state(y_true, y_pred, sample_weight)

# Compile the model

In [20]:
# Configure the model for training.
# We use the "sparse" version of categorical_crossentropy
# because our target data is integers and not one-hot vectors class

model.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=[UpdatedMeanIoU(num_classes=8), keras.metrics.SparseCategoricalAccuracy()])

callbacks = [
    keras.callbacks.ModelCheckpoint("oxford_segmentation.h5", save_best_only=True)
]

# Train the model, doing validation at the end of each epoch.
epochs = 10
model.fit(train_gen, epochs=epochs, validation_data=val_gen, callbacks=callbacks)

  ia.warn(
  ia.warn(


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x196ef98cf10>

# Segmenation with "Segmentation.models" (Unet / Linknet )

# Function pipeline segmentation models

In [11]:
def prepare_images(
    img_size,
    train_img_paths=train_img_paths,
    train_ann_paths=train_ann_paths,
    val_img_paths=val_img_paths,
    val_ann_paths=val_ann_paths):

    train_img, train_ann, val_img, val_ann = [], [], [], []

    for directory_path in train_img_paths[:]:
        img = cv2.imread(directory_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, img_size)
        train_img.append(img)

    for directory_path in train_ann_paths[:]:
        img = cv2.imread(directory_path, 0)
        img = cv2.resize(img, img_size)
        img = preprocessImg(img)
        train_ann.append(img)

    for directory_path in val_img_paths[:]:
        img = cv2.imread(directory_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, img_size)
        val_img.append(img)

    for directory_path in val_ann_paths[:]:
        img = cv2.imread(directory_path, 0)
        img = cv2.resize(img, img_size)
        img = preprocessImg(img)
        val_ann.append(img)

    train_img = np.array(train_img)
    train_ann = np.array(train_ann)
    val_img = np.array(val_img)
    val_ann = np.array(val_ann)

    return train_img, train_ann, val_img, val_ann

In [12]:
train_img, train_ann, val_img, val_ann = prepare_images(
    img_size=img_size,
    train_img_paths=train_img_paths[:],
    train_ann_paths=train_ann_paths[:],
    val_img_paths=val_img_paths[:],
    val_ann_paths=val_ann_paths[:]
    )

In [13]:
print("train image:", train_img.shape)
print("train_ann", train_ann.shape)
print("val_img", val_img.shape)
print("val_ann", val_ann.shape)

train image: (2801, 160, 160, 3)
train_ann (2801, 160, 160, 1)
val_img (500, 160, 160, 3)
val_ann (500, 160, 160, 1)


# Segmentation Models Library
https://github.com/qubvel/segmentation_models

In [33]:
def segmentation_unet(
    backbone, callbacks, epochs, batch_size, train_img, train_ann,
    val_img, val_ann, encoder_weights=True,
    optimizer='rmsprop', loss="sparse_categorical_crossentropy", 
    metrics=[UpdatedMeanIoU(num_classes=8), SparseCategoricalAccuracy()], 
    activation='softmax'):
    
    BACKBONE = backbone
    preprocess_input = sm.get_preprocessing(BACKBONE)

    X_train = preprocess_input(train_img)
    X_test = preprocess_input(val_img)

    if encoder_weights:
        model = sm.Unet(BACKBONE, encoder_weights='imagenet', classes=num_classes)

    else:
        model = sm.Unet(BACKBONE, classes=num_classes)
    
    model.compile(
        optimizer=optimizer,
        loss=loss,
        metrics=metrics
    )

    callbacks = [
        keras.callbacks.ModelCheckpoint(callbacks, save_best_only=True)
    ]

    print(model.summary())

    history = model.fit(
        X_train, 
        train_ann,
        batch_size=batch_size,
        epochs=epochs,
        verbose=1,
        validation_data=(X_test, val_ann),
        callbacks=callbacks
    )

    return history 

In [None]:
start = dt.now()

history = segmentation_unet(
    backbone='resnet34',
    callbacks='Unet_resnet34_FL_160x160.h5',
    epochs=15,
    batch_size=16, 
    train_img=train_img,
    train_ann=train_ann, 
    val_img=val_img,
    val_ann=val_ann,
)

running_secs = (dt.now() - start).seconds
print(running_secs)

In [36]:
keras.backend.clear_session()

# Training model with image augmentation

In [37]:
def segmentation_unet2(
    backbone, callbacks, epochs, train_gen,
    val_gen, encoder_weights=True,
    optimizer='rmsprop', loss="sparse_categorical_crossentropy", 
    metrics=[UpdatedMeanIoU(num_classes=8), SparseCategoricalAccuracy()], 
    activation='softmax'):
    
    BACKBONE = backbone
    preprocess_input = sm.get_preprocessing(BACKBONE)

    # X_train = preprocess_input(train_img)
    # X_test = preprocess_input(val_img)

    if encoder_weights:
        model = sm.Unet(BACKBONE, encoder_weights='imagenet', classes=num_classes)

    else:
        model = sm.Unet(BACKBONE, classes=num_classes)
    
    model.compile(
        optimizer=optimizer,
        loss=loss,
        metrics=metrics
    )

    callbacks = [
        keras.callbacks.ModelCheckpoint(callbacks, save_best_only=True)
    ]

    # print(model.summary())

    history = model.fit_generator(
        train_gen,
        epochs=epochs,
        verbose=1,
        validation_data=val_gen,
        callbacks=callbacks
    )

    return history 

In [38]:
start = dt.now()

history = segmentation_unet2(
    train_gen=train_gen,
    val_gen=val_gen,
    backbone='resnet34',
    callbacks='Unet_resnet34_FL_160x160.h5',
    epochs=15, 
    
)

running_secs = (dt.now() - start).seconds
print(running_secs)

  history = model.fit_generator(


Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
3582
