## Road Lane Detection with a U-Net-style Encoder–Decoder (ResNet50 encoder, FCN decoder)

In [20]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import cv2

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models, layers, Sequential

In [None]:
# Enable memory growth on every GPU that TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices("GPU")
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
    # Report the problem and carry on with the default allocation behaviour.
    print(e)
else:
    if gpus:
        print("Memory growth enabled for GPUs.")


In [22]:
# Root folder handed to flow_from_directory below. The loading cell treats label 0
# as road images and label 1 as masks, so this presumably holds one sub-folder
# per kind — TODO confirm the directory layout.
train_path = "tusimple_preprocessed/training"

Create a generator and load the images from the directory

In [None]:
# Plain generator: no augmentation or rescaling options are set.
img_generator = keras.preprocessing.image.ImageDataGenerator()
seed = 10  # defined here but not passed to the generator (shuffling is off)

# Read the files in a fixed order, 64 per batch, resized to 256x320,
# with a 0/1 label per file.
flow_options = dict(
    target_size=(256, 320),
    class_mode="binary",
    batch_size=64,
    shuffle=False,
)
images_set = img_generator.flow_from_directory(train_path, **flow_options)


In [None]:
# Split the images yielded by `images_set` into two separate arrays:
# road images go to `X`, ground-truth masks go to `Y`.

# Take the sizes from the iterator instead of hard-coding them: the iterator
# loops forever, so asking for more batches than one pass contains would
# silently append duplicates.
num_images = images_set.samples  # number of files found in the directory
num_batches = len(images_set)  # batches in exactly one pass over the data

# Start from the first batch even if this cell is re-run after a partial pass.
images_set.reset()

X = []
Y = []
for i in range(num_batches):
    batch_images, batch_labels = next(images_set)
    # Label 0 marks a road image; any other label marks a ground-truth mask.
    is_mask = batch_labels != 0
    X.extend(batch_images[~is_mask])
    # Average the colour channels so each mask becomes one (256, 320) plane.
    Y.extend(batch_images[is_mask].mean(axis=3))
    if i % 10 == 0:
        print(f"Batch {i}")

# convert the lists to numpy arrays
X = np.array(X)
Y = np.array(Y)

# Every image needs exactly one mask; fail early with a clear message otherwise.
if len(X) != len(Y):
    raise ValueError(
        f"Found {len(X)} road images but {len(Y)} masks; the two sets must pair up."
    )


Shuffle the Dataset

In [None]:
# Shapes of the image array and the mask array.
display(X.shape, Y.shape)

In [26]:
from sklearn.utils import shuffle

# Shuffle images and masks with the same permutation so every image stays
# paired with its mask; the fixed random_state makes the order repeatable.
X, Y = shuffle(X, Y, random_state=100)


In [27]:
# Because RAM is limited, keep only the first 2000 samples.
# np.array() copies each slice, so the result no longer references the full array.
keep = slice(None, 2000)
X = np.array(X[keep])
Y = np.array(Y[keep])

In [None]:
# Shapes after truncation.
display(X.shape, Y.shape)


# Modelling

In [29]:
# Binarize the mask set (pixel >= 100 -> 1, otherwise 0) and add a channel axis.
# The raw masks are 3-D (m, 256, 320) and the converted ones are 4-D, so the
# guard keeps a re-run of this cell from thresholding the 0/1 values against
# 100, which would erase every lane pixel.
if Y.ndim == 3:
    Y = (Y >= 100).astype("int").reshape(-1, 256, 320, 1)

In [None]:
# Sanity check: after thresholding, the mask values should span 0 to 1.
Y.min(), Y.max()

In [31]:
# Keep 2000 samples for training and evaluation.
# When the cells run in order, X and Y were already cut to 2000 samples
# earlier in the notebook, so this only re-copies the same data.
keep = slice(None, 2000)
X = np.array(X[keep])
Y = np.array(Y[keep])

Split Dataset into Train and Val sets

In [32]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the samples for validation; the fixed random_state makes
# the split repeatable.
X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.1, random_state=100)

In [None]:
# Report the shape of each split.
for label, arr in (
    ("X_train", X_train),
    ("X_val", X_val),
    ("Y_train", Y_train),
    ("Y_val", Y_val),
):
    print(f"Shape of {label}:", arr.shape)

In [None]:
# Drop the full arrays and the generator so their memory can be reclaimed;
# only the train/val splits are used from here on.
import gc

# pop() instead of `del`, so running this cell a second time does not raise
# NameError for names that are already gone.
for _name in ("X", "Y", "images_set"):
    globals().pop(_name, None)
gc.collect()

In [None]:
# Show a few training images next to their ground-truth masks.
plt.figure(figsize=(10, 40))
first, last = 80, 84

for pair_no, (image, mask) in enumerate(zip(X_train[first:last], Y_train[first:last])):
    left_cell = 2 * pair_no + 1  # 1-based position on the 10x2 subplot grid

    plt.subplot(10, 2, left_cell)
    plt.imshow(image / 255.0)
    plt.title("Ground truth image")

    plt.subplot(10, 2, left_cell + 1)
    plt.imshow(mask, cmap="gray")
    plt.title("Ground truth mask")


Defining the Model architecture

In [36]:
# model structure
from tensorflow.keras.layers import Input, Conv2DTranspose, Conv2D, MaxPooling2D
from tensorflow.keras.applications.resnet50 import ResNet50

# Define the input shape of the images
input_shape = (256, 320, 3)

# Pretrained ResNet50 backbone without its classification head.
# NOTE(review): the images appear to reach the model as raw 0-255 RGB values,
# while the ImageNet weights expect inputs passed through
# resnet50.preprocess_input — consider adding that step (TODO confirm).
backbone = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)

# Cut the backbone at conv3_block4_out, whose feature map is 1/8 of the input
# resolution (32x40 for a 256x320 input). get_layer raises ValueError when no
# layer has that name, so a wrong name cannot go unnoticed.
encoder_output = backbone.get_layer('conv3_block4_out').output
encoder = tf.keras.Model(inputs=backbone.inputs, outputs=encoder_output)

# Freeze only the first 50 layers of the truncated encoder; the layers after
# them stay trainable and are updated during training.
for layer in encoder.layers[:50]:
    layer.trainable = False

# Decoder: a small fully convolutional upsampling stack
def decoder(inputs):
    """Upsample the encoder features and predict a one-channel mask.

    Three stages run in sequence with 256, 128 and 64 filters. Each stage is a
    3x3 ReLU convolution followed by a stride-2 transposed convolution, so the
    spatial size doubles three times. A final 1x1 sigmoid convolution produces
    the per-pixel output.
    """
    x = inputs
    for filters in (256, 128, 64):
        x = Conv2D(filters, (3, 3), activation='relu', padding='same')(x)
        x = Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(x)

    return Conv2D(1, (1, 1), activation='sigmoid')(x)

# Input tensor with the image shape
inputs = Input(shape=input_shape)

# Encoder features first, then the decoder turns them into the output mask
encoded_features = encoder(inputs)
outputs = decoder(encoded_features)

# Wrap input and output into the trainable model
model = tf.keras.Model(inputs=inputs, outputs=outputs)

Compile Model

In [None]:
# Compile with Adam, binary focal cross-entropy loss, and accuracy as the metric
focal_loss = keras.losses.BinaryFocalCrossentropy()
model.compile(optimizer="adam", loss=focal_loss, metrics=["accuracy"])
model.summary()

In [None]:
from tensorflow.keras.callbacks import Callback


class MetricsLogger(Callback):
    """Keras callback that scores the model on a fixed validation set after each epoch.

    Accuracy, precision, recall, F1 score and IoU (class 1 only) are computed
    from predictions thresholded at 0.5 and appended to ``epoch_metrics``, one
    value per epoch, so they can be plotted after training.
    """

    def __init__(self, X_val, Y_val):
        """Store the validation data and create the empty per-epoch history.

        Args:
            X_val: Validation inputs passed to ``model.predict``.
            Y_val: Ground-truth masks compared against the thresholded predictions.
        """
        super().__init__()
        self.X_val = X_val
        self.Y_val = Y_val
        self.epoch_metrics = {
            "accuracy": [],
            "precision": [],
            "recall": [],
            "f1_score": [],
            "iou": [],
        }

    @staticmethod
    def _f1(precision_value, recall_value):
        """Return the harmonic mean of precision and recall, or 0.0 when both are 0."""
        denominator = precision_value + recall_value
        if denominator == 0:
            # No true positives at all: avoid dividing by zero.
            return 0.0
        return 2 * precision_value * recall_value / denominator

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate on the validation set, record the metrics and print a summary.

        Args:
            epoch: Zero-based epoch index supplied by Keras.
            logs: Metric dict supplied by Keras (unused).
        """
        # verbose=0 keeps the prediction progress bar out of the training log.
        preds = (self.model.predict(self.X_val, verbose=0) >= 0.5).astype("int")

        # Fresh metric objects every epoch, so no state carries over.
        accuracy = tf.keras.metrics.Accuracy()
        precision = tf.keras.metrics.Precision()
        recall = tf.keras.metrics.Recall()
        iou = tf.keras.metrics.IoU(num_classes=2, target_class_ids=[1])

        accuracy.update_state(self.Y_val, preds)
        precision.update_state(self.Y_val, preds)
        recall.update_state(self.Y_val, preds)
        iou.update_state(self.Y_val, preds)

        accuracy_value = accuracy.result().numpy()
        precision_value = precision.result().numpy()
        recall_value = recall.result().numpy()
        f1_score_value = self._f1(precision_value, recall_value)
        iou_value = iou.result().numpy()

        # Store metrics
        self.epoch_metrics["accuracy"].append(accuracy_value)
        self.epoch_metrics["precision"].append(precision_value)
        self.epoch_metrics["recall"].append(recall_value)
        self.epoch_metrics["f1_score"].append(f1_score_value)
        self.epoch_metrics["iou"].append(iou_value)

        print(
            f"Epoch {epoch + 1} - Accuracy: {accuracy_value:.4f}, Precision: {precision_value:.4f}, Recall: {recall_value:.4f}, "
            f"F1 Score: {f1_score_value:.4f}, IoU: {iou_value:.4f}"
        )


# Instantiate the custom metrics logger
metrics_logger = MetricsLogger(X_val, Y_val)

Train Model

In [None]:
# Training configuration
epochs = 32
batch_size = 8

# The checkpoint file name contains the epoch number; the metrics logger
# records validation scores after each epoch.
checkpoint = keras.callbacks.ModelCheckpoint("save_at_{epoch}.keras")
callbacks = [checkpoint, metrics_logger]

fit_options = dict(
    validation_data=(X_val, Y_val),
    callbacks=callbacks,
    batch_size=batch_size,
    epochs=epochs,
)
model.fit(X_train, Y_train, **fit_options)

In [None]:
# Predict masks for the validation set, then show the largest and smallest
# predicted values as a quick range check.
preds = model.predict(X_val)
preds.max(), preds.min()

In [None]:
# make a directory to store some predicted lane masks
!mkdir out

In [None]:
# Visualize some results from the val set and write them to ./out as well.
plt.figure(figsize=(10, 45))
s, e = 90, 98
index = 1

# Threshold the model outputs into a 0/1 mask (harmless if already binarized).
preds = (preds >= 0.5).astype("int")
for i, j, k in zip(X_val[s:e], preds[s:e], Y_val[s:e]):
    # The images are held in RGB order (they are shown with plt.imshow), while
    # cv2.imwrite expects BGR. Swap the channels so red and blue are not
    # exchanged in the saved file.
    image_bgr = cv2.cvtColor(
        np.clip(np.rint(i), 0, 255).astype(np.uint8), cv2.COLOR_RGB2BGR
    )
    cv2.imwrite(f"./out/img-{index}.jpg", image_bgr)
    # Masks hold 0/1; scale to 0/255 and store them as 8-bit images.
    cv2.imwrite(f"./out/pred-{index}.jpg", (j * 255).astype(np.uint8))
    cv2.imwrite(f"./out/ground-{index}.jpg", (k * 255).astype(np.uint8))

    plt.subplot(10, 2, index)
    plt.imshow(i / 255.0)
    plt.title("Ground truth image")

    plt.subplot(10, 2, index + 1)
    plt.imshow(j, cmap="gray")
    plt.title("Pred mask")
    index += 2

In [None]:
!zip out.zip out -r -q

In [None]:
# Save the trained model next to the notebook in the native .keras format,
# the same format the per-epoch checkpoints use. A path relative to the
# working directory avoids writing to the filesystem root, and the explicit
# extension is required by newer Keras versions.
model.save("lane-detection-model-fcn.keras")

Create Metrics

In [None]:
# Final validation metrics.
# Threshold here as well, so this cell gives the same numbers whether or not
# the visualization cell (which binarizes `preds`) has been run; thresholding
# an already-binary mask at 0.5 leaves it unchanged.
preds_binary = (preds >= 0.5).astype("int")

# create metrics
accuracy = tf.keras.metrics.Accuracy()
precision = tf.keras.metrics.Precision()
recal = tf.keras.metrics.Recall()
iou = tf.keras.metrics.IoU(num_classes=2, target_class_ids=[1])


# accuracy
accuracy.update_state(Y_val, preds_binary)
accuracy_value = accuracy.result().numpy()
# precision
precision.update_state(Y_val, preds_binary)
precision_value = precision.result().numpy()
# recall
recal.update_state(Y_val, preds_binary)
recal_value = recal.result().numpy()
# f1 score: harmonic mean of precision and recall, defined as 0 when both
# are 0 so that no division by zero occurs
pr_sum = precision_value + recal_value
f1_score = 2 * precision_value * recal_value / pr_sum if pr_sum > 0 else 0.0

# Intersection over union (IoU)
iou.update_state(Y_val, preds_binary)
iou_value = iou.result().numpy()

print("Accuracy:", accuracy_value)
print("Precision:", precision_value)
print("Recal:", recal_value)
print("F1 Score: ", f1_score)
print("IoU: ", iou_value)

# Visualization

In [None]:
# Pull the per-epoch series recorded by the metrics logger
history = metrics_logger.epoch_metrics
epoch_list = range(1, len(history["accuracy"]) + 1)
accuracy = history["accuracy"]
precision = history["precision"]
recall = history["recall"]
f1_score = history["f1_score"]
iou = history["iou"]


def _plot_progress(values, name, color):
    """Draw one metric against the epoch number in its own figure."""
    plt.figure(figsize=(10, 6))
    plt.plot(epoch_list, values, label=name, color=color, linewidth=2)
    plt.title(f"{name} Progress Over Epochs")
    plt.xlabel("Epochs")
    plt.ylabel(name)
    plt.legend()
    plt.grid(True)
    plt.show()


# One figure per metric, in the order: accuracy, precision, recall, F1, IoU
for series, name, color in (
    (accuracy, "Accuracy", "blue"),
    (precision, "Precision", "green"),
    (recall, "Recall", "orange"),
    (f1_score, "F1 Score", "red"),
    (iou, "IoU", "purple"),
):
    _plot_progress(series, name, color)