### Semantic segmentation with a U-Net architecture on the CARLA dataset. A solution for the ['Semantic Segmentation for Self Driving Cars'](https://www.kaggle.com/kumaresanmanickavelu/lyft-udacity-challenge) challenge on Kaggle.

In [1]:
import os
import random
import glob
import numpy as np
import matplotlib.pyplot as plt
import cv2
import tensorflow as tf
from sklearn.model_selection import train_test_split
import tensorflow.keras.layers as tkl

In [2]:
# Target size (in pixels) that images and masks are resized to.
WIDTH, HEIGHT = 256, 256
# Number of channels of the network input.
n_channels = 3
# Default number of samples per batch.
BATCH_SIZE = 32

In [3]:
# Root directory of the dataset; the dataA..dataE sub-folders are globbed below it.
dir_path = '../input/lyft-udacity-challenge'

In [4]:
images_names = []
mask_names = []

# glob.glob returns paths in arbitrary (directory-listing) order, so both
# listings are sorted to keep images_names[k] and mask_names[k] paired.
# NOTE(review): this assumes each image and its mask share the same file name
# under CameraRGB/ and CameraSeg/ -- confirm against the dataset layout.
for i in ['A', 'B', 'C', 'D', 'E']:
    images_names.extend(sorted(glob.glob(os.path.join(dir_path, f'data{i}/data*/CameraRGB/*.png'))))
    mask_names.extend(sorted(glob.glob(os.path.join(dir_path, f'data{i}/data*/CameraSeg/*.png'))))

# A count mismatch means the pairing above cannot be trusted.
if len(images_names) != len(mask_names):
    raise ValueError(
        f'Found {len(images_names)} images but {len(mask_names)} masks; '
        'image/mask pairs cannot be aligned.'
    )

n_samples = len(images_names)
print(n_samples)

In [5]:
# Hold out 20% of the image/mask path pairs for validation; the fixed seed
# makes the split reproducible.
train_images, val_images, train_masks, val_masks = train_test_split(
    images_names,
    mask_names,
    test_size=0.2,
    random_state=0,
)

In [6]:
class DataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` that yields batches of resized images and masks.

    Every batch is a tuple ``(X, y)``: ``X`` stacks the images (RGB, resized
    to ``WIDTH`` x ``HEIGHT``, float32 scaled to [0, 1]) and ``y`` stacks the
    matching masks (resized the same way, one channel of the mask PNG).

    Args:
        image_filenames: Paths of the input images.
        mask_filenames: Paths of the segmentation masks, aligned index-wise
            with ``image_filenames``.
        batch_size: Number of samples per batch.
        shuffle: If true, the sample order is shuffled at construction time
            and again at the end of every epoch.

    Raises:
        ValueError: If the two filename lists differ in length.
    """

    def __init__(self, image_filenames, mask_filenames, batch_size = BATCH_SIZE, shuffle = True):
        super().__init__()
        if len(image_filenames) != len(mask_filenames):
            raise ValueError(
                f'Got {len(image_filenames)} images but {len(mask_filenames)} masks.'
            )
        self.image_filenames = image_filenames
        self.mask_filenames = mask_filenames
        self.filenames = list(zip(image_filenames, mask_filenames))
        # Honour the caller's batch size instead of the module-level default.
        self.batch_size = batch_size
        self.shuffle = shuffle
        # Build (and optionally shuffle) the index order used by __getitem__.
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches; a trailing partial batch is dropped."""
        return len(self.filenames) // self.batch_size

    def __getitem__(self, idx):
        """Load and return batch number ``idx`` as ``(X, y)``."""
        start = idx * self.batch_size
        # Go through self.indexes so the per-epoch shuffle actually takes effect.
        batch_indexes = self.indexes[start : start + self.batch_size]
        batch = [self.filenames[i] for i in batch_indexes]
        return self.__data_generation(batch)

    def on_epoch_end(self):
        """Reset the sample order, reshuffling it when ``shuffle`` is enabled."""
        self.indexes = np.arange(len(self.filenames))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, batch):
        """Read, resize and stack the ``(image_path, mask_path)`` pairs in ``batch``.

        Raises:
            FileNotFoundError: If OpenCV cannot read an image or mask file.
        """
        images = []
        segments = []

        for img_file, mask_file in batch:
            image = cv2.imread(img_file)
            if image is None:
                raise FileNotFoundError(f'Could not read image file: {img_file}')
            # OpenCV loads BGR; convert so the arrays display correctly with
            # matplotlib and the network sees conventional RGB ordering.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, (WIDTH, HEIGHT))
            # Scale uint8 pixel values to float32 in [0, 1].
            image = image.astype(np.float32) / 255.0

            mask = cv2.imread(mask_file)
            if mask is None:
                raise FileNotFoundError(f'Could not read mask file: {mask_file}')
            # Nearest-neighbour keeps label values intact; the default bilinear
            # interpolation would blend neighbouring class ids at boundaries.
            mask = cv2.resize(mask, (WIDTH, HEIGHT), interpolation=cv2.INTER_NEAREST)
            # Channel 2 of OpenCV's BGR layout is the red channel of the PNG.
            # NOTE(review): presumably where the dataset stores class ids -- confirm.
            mask = mask[:, :, 2]

            images.append(image)
            segments.append(mask)

        return np.array(images), np.array(segments)

In [7]:
# Batch generators over the training and validation splits, using the
# generator's default batch size and shuffle setting.
train_generator = DataGenerator(image_filenames=train_images, mask_filenames=train_masks)
val_generator = DataGenerator(image_filenames=val_images, mask_filenames=val_masks)

In [8]:
# Fetch one training batch and show its first five image/mask pairs,
# one figure per pair.
x, y = train_generator[124]
for sample in range(5):
    plt.figure(figsize=(12, 6))
    panels = (
        (121, x[sample], 'Image'),
        (122, y[sample], 'Segmentation'),
    )
    for position, picture, caption in panels:
        plt.subplot(position)
        plt.imshow(picture)
        plt.title(caption)

In [9]:
# Side-by-side view of sample 10 of the batch fetched above.
fig, arr = plt.subplots(1, 2, figsize=(15, 6))
for axis, picture, caption in zip(arr, (x[10], y[10]), ('Image', 'Segmentation')):
    axis.imshow(picture)
    axis.set_title(caption)

In [10]:
def conv_block(inputs = None, n_filters = 32, kernel_size = 3, batch_norm = True):
    """Apply two Conv2D -> (BatchNormalization) -> ReLU stages to ``inputs``.

    Args:
        inputs: Keras tensor fed into the first convolution.
        n_filters: Number of filters of both convolutions.
        kernel_size: Kernel size of both convolutions.
        batch_norm: If true, a BatchNormalization layer follows each
            convolution, before the ReLU activation.

    Returns:
        The output tensor of the second ReLU activation.
    """
    features = inputs

    # Both stages are identical: 'same'-padded, He-normal initialised conv,
    # optional batch normalisation, then ReLU.
    for _ in range(2):
        features = tkl.Conv2D(
            filters=n_filters,
            kernel_size=kernel_size,
            padding='same',
            kernel_initializer='he_normal',
        )(features)
        if batch_norm:
            features = tkl.BatchNormalization()(features)
        features = tkl.Activation('relu')(features)

    return features


def conv_transpose_block(n_filters, kernel_size = 3):
    """Create a stride-2, 'same'-padded Conv2DTranspose layer.

    The layer is returned un-called; the caller applies it to a tensor.

    Args:
        n_filters: Number of output filters.
        kernel_size: Kernel size of the transposed convolution.

    Returns:
        The configured ``Conv2DTranspose`` layer instance.
    """
    return tkl.Conv2DTranspose(
        filters=n_filters,
        kernel_size=kernel_size,
        strides=(2, 2),
        padding='same',
    )
    

def u_net(n_filters = 32, dropout_prob = 0.1, batch_norm = True, n_classes = 13):
    """Build a U-Net for per-pixel classification.

    The encoder has four conv-block / max-pool / dropout stages followed by a
    bottleneck conv block. The decoder mirrors it with four transposed-conv
    upsampling stages, each concatenated with the matching encoder output
    (skip connection) before dropout and a conv block.

    Args:
        n_filters: Filter count of the first encoder stage; deeper stages use
            2x, 4x, 8x and 16x this value.
        dropout_prob: Dropout rate applied after every pooling and every
            skip concatenation.
        batch_norm: Whether the conv blocks use batch normalisation.
        n_classes: Number of output classes (channels of the final layer).

    Returns:
        An uncompiled ``tf.keras.Model`` mapping a
        ``(HEIGHT, WIDTH, n_channels)`` image to per-pixel class probabilities
        of shape ``(HEIGHT, WIDTH, n_classes)``.
    """
    # Keras image tensors are (rows, cols, channels), i.e. height first.
    input_size = (HEIGHT, WIDTH, n_channels)

    input_img = tf.keras.Input(input_size, name = 'image' )

    # Encoder: each stage halves the resolution and doubles the filters.
    c1 = conv_block(input_img, n_filters, kernel_size = 3, batch_norm = batch_norm)
    p1 = tkl.MaxPooling2D((2,2))(c1)
    p1 = tkl.Dropout(dropout_prob)(p1)

    c2 = conv_block(p1, n_filters * 2, kernel_size = 3, batch_norm = batch_norm)
    p2 = tkl.MaxPooling2D((2,2))(c2)
    p2 = tkl.Dropout(dropout_prob)(p2)

    c3 = conv_block(p2, n_filters * 4, kernel_size = 3, batch_norm = batch_norm)
    p3 = tkl.MaxPooling2D((2,2))(c3)
    p3 = tkl.Dropout(dropout_prob)(p3)

    c4 = conv_block(p3, n_filters * 8, kernel_size = 3, batch_norm = batch_norm)
    p4 = tkl.MaxPooling2D((2,2))(c4)
    p4 = tkl.Dropout(dropout_prob)(p4)

    # Bottleneck.
    c5 = conv_block(p4, n_filters * 16 , kernel_size = 3, batch_norm = batch_norm)

    # Decoder: upsample, concatenate the skip connection, dropout, conv block.
    u6 = conv_transpose_block(n_filters * 8, kernel_size = 3)(c5)
    u6 = tkl.concatenate([u6, c4])
    u6 = tkl.Dropout(dropout_prob)(u6)
    c6 = conv_block(u6, n_filters * 8, kernel_size = 3, batch_norm = batch_norm)

    u7 = conv_transpose_block(n_filters * 4)(c6)
    u7 = tkl.concatenate([u7, c3])
    u7 = tkl.Dropout(dropout_prob)(u7)
    c7 = conv_block(u7, n_filters * 4, kernel_size = 3, batch_norm = batch_norm)

    u8 = conv_transpose_block(n_filters * 2, kernel_size = 3)(c7)
    u8 = tkl.concatenate([u8, c2])
    u8 = tkl.Dropout(dropout_prob)(u8)
    c8 = conv_block(u8, n_filters * 2, kernel_size = 3, batch_norm = batch_norm)

    u9 = conv_transpose_block(n_filters, kernel_size = 3)(c8)
    u9 = tkl.concatenate([u9, c1])
    u9 = tkl.Dropout(dropout_prob)(u9)
    c9 = conv_block(u9, n_filters , kernel_size = 3, batch_norm = batch_norm)

    # Classes are mutually exclusive per pixel, so use softmax (a proper
    # distribution over classes) rather than independent sigmoids.
    output = tkl.Conv2D(n_classes, (1,1), activation = 'softmax')(c9)

    model = tf.keras.Model(inputs = [input_img], outputs = [output])

    return model

In [11]:
# Build the U-Net with its default hyper-parameters and print the layer summary.
model = u_net()
model.summary()

In [12]:
# The sparse loss variant takes the integer masks from the generator directly,
# so no one-hot encoding of the targets is needed.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [13]:
# Callback that writes the weights to 'model.h5' whenever the validation loss
# improves on the best value seen so far.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath='model.h5',
    monitor='val_loss',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

In [14]:
# One epoch walks every full batch of the respective generator.
train_steps = len(train_generator)
val_steps = len(val_generator)

# The checkpoint callback has to be handed to fit(); without it the best
# weights are never written to disk.
history = model.fit(train_generator, validation_data = val_generator,
                    steps_per_epoch = train_steps, validation_steps = val_steps,
                    epochs = 40, verbose = 1, callbacks = [checkpoint])

In [17]:
# Training vs. validation accuracy per epoch.
plt.figure()
accuracy_curves = (
    ('accuracy', 'Train accuracy'),
    ('val_accuracy', 'Validation accuracy'),
)
for metric_key, curve_label in accuracy_curves:
    plt.plot(history.history[metric_key], label=curve_label)
plt.title('Accuracy')
plt.legend()

In [18]:
# Training vs. validation loss per epoch.
plt.figure()
loss_curves = (
    ('loss', 'Train loss'),
    ('val_loss', 'Validation loss'),
)
for metric_key, curve_label in loss_curves:
    plt.plot(history.history[metric_key], label=curve_label)
plt.title('loss')
plt.legend()