<a href="https://colab.research.google.com/github/BondaiKa/lane_line_recognition/blob/main/vl_100_lane_recognition.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import sys
print(sys.version)

3.7.12 (default, Sep 10 2021, 00:21:48) 
[GCC 7.5.0]


In [2]:
import os
from google.colab import drive
import numpy as np
import glob
import json

import cv2
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
import tensorflow as tf

from typing import NamedTuple, Tuple, List

%matplotlib inline

!pip install requests
!pip install unrar

%load_ext tensorboard

Collecting unrar
  Downloading unrar-0.4-py3-none-any.whl (25 kB)
Installing collected packages: unrar
Successfully installed unrar-0.4


In [3]:
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!unrar x -Y "/content/drive/My Drive/Ilmenau/dataset.rar" "/tmp/"

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00150.jpg.json      99%  OK 
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00153.jpg.json      99%  OK 
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00156.jpg.json      99%  OK 
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00159.jpg.json      99%  OK 
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00162.jpg.json      99%  OK 
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00165.jpg.json      99%  OK 
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00168.jpg.json      99%  OK 
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00171.jpg.json      99%  OK 
Extracting  /tmp/dataset/VIL100/Json/25_Road015_Trim006_frames/00174.jpg.json      99%  OK 
Extracting  /tmp/dataset/V

In [5]:
BASE_DIR = "/tmp/dataset/VIL100/"
IMAGE_PATH = BASE_DIR + "JPEGImages/"
JSON_PATH = BASE_DIR + "Json/"
INPUT_SHAPE = (1280,960)
BATCH_SIZE = 32

print(IMAGE_PATH)
print(JSON_PATH)

/tmp/dataset/VIL100/JPEGImages/
/tmp/dataset/VIL100/Json/


In [6]:
# from google.colab import files
# !zip -r "/tmp/dataset/VIL100/JPEGImages/0_Road029_Trim004_frames.zip" "/tmp/dataset/VIL100/JPEGImages/0_Road029_Trim004_frames/"
# files.download("/tmp/dataset/VIL100/JPEGImages/0_Road029_Trim004_frames.zip")

In [7]:
# from google.colab import files
# !zip -r "/tmp/dataset/VIL100/Json/0_Road029_Trim004_frames.zip" "/tmp/dataset/VIL100/Json/0_Road029_Trim004_frames/"
# files.download("/tmp/dataset/VIL100/Json/0_Road029_Trim004_frames.zip")

In [8]:
images = glob.glob(IMAGE_PATH+'/*/*.jpg')
json_files = glob.glob(JSON_PATH+'/*/*.json')
print(images)
print(json_files)

['/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00126.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00054.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00270.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00144.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00108.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00051.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00057.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00258.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00261.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00300.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00180.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00120.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00324.jpg', '/tmp/dataset/VIL100/JPEGImages/6_Road024_Trim001_frames/00243.jpg', '/tmp/dataset/VIL100/JPEGImages/6

In [9]:
def calculate_perspective_transform_matrix(width: int, height: int, reverse_flag=False) -> Tuple[
    np.ndarray]:
    """
    Calculate transformation matrix for perspective transformation
    :param width: frame width
    :param height: frame height
    :param reverse_flag: create reverse matrix for reverting to initial frame
    :return: matrix for transformation the frame
    """
    # TODO @Karim: check on real Audi Q2 input frame
    high_left_crd, high_right_crd = (550, 530), (700, 530)
    down_left_crd, down_right_crd, = (0, height - 150), (width, height - 150)

    initial_matrix = np.float32([[high_left_crd, high_right_crd,
                                  down_left_crd, down_right_crd]])
    final_matrix = np.float32([[(0, 0), (width, 0), (0, height), (width, height)]])

    return cv2.getPerspectiveTransform(initial_matrix, final_matrix) \
        if not reverse_flag else cv2.getPerspectiveTransform(final_matrix, initial_matrix)


def transform_frame(frame: np.ndarray, width: int, height: int, reverse_flag=False) -> np.ndarray:
    """
    Perform perspective transformation
    :param frame: frame
    :param width: frame width
    :param height: frame height
    :param reverse_flag: cancel perspective transformation
    :return: changed (un)transformed frame
    """
    if not reverse_flag:
        initial_matrix = calculate_perspective_transform_matrix(width, height)
        frame = cv2.warpPerspective(frame, initial_matrix, dsize=(width, height))
    else:
        final_matrix = calculate_perspective_transform_matrix(width, height, reverse_flag=True)
        frame = cv2.warpPerspective(frame, final_matrix, dsize=(width, height))
    return frame

In [49]:
from tensorflow.keras.utils import Sequence
from typing import Tuple, List, Dict, Iterable
from skimage.io import imread
from skimage.transform import resize
import logging
import math
from typing import Optional
import random

log = logging.getLogger(__name__)

VIL_100_attributes = {
    7: 6,
    8: 7,
    9: 8,
    10: 9,
    13: 10,
}


def get_valid_attribute(attr: int) -> int:
    return VIL_100_attributes.get(attr, attr)

def one_hot_list_encoder(target_class_idx: int, num_classes: int) -> np.ndarray:
    """One-hot list encoder"""
    target_vector = np.zeros(num_classes)
    target_vector[target_class_idx] = 1
    return target_vector



class SimpleFrameGenerator(Sequence):
    """Sequence of frames generator

    Usage for training NN that could process independent
    frames without context window etc
    """

    def __init__(self,
                 num_type_of_lines=11,
                 max_num_points=91,
                 max_lines_per_frame=6,
                 rescale=1 / 255.,  # TODO @Karim: include and use later
                 batch_size: int = 8,
                 target_shape: Tuple[int, int] = (1280, 960),
                 shuffle: bool = False,
                 nb_channel: int = 3,  # TODO: Use rgb later
                 files: Optional[List[str]] = None,
                 json_files: Optional[List[str]] = None):
        """
        :param subset: training or validation data
        :param max_lines_per_frame: maxinum number of lines per frame
        :param max_num_points: maximum number of points un one polyline
        :param num_type_of_lines: number of possible lines on road
        :param rescale:
        :param batch_size: batch size of the dataset
        :param target_shape: final size for NN input
        :param shuffle: shuffle flag of frames sequences
        :param split: split dataset to train/test
        :param nb_channel: grayscaled or RGB frames
        :param frame_glob_path: glob pattern of frames
        :param json_glob_path: glob pattern path of jsons
        """
        self.max_lines_per_frame = max_lines_per_frame
        self.max_num_points = max_num_points
        self.num_type_of_lines = num_type_of_lines
        self.rescale = rescale
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.target_shape = target_shape
        self.nb_channel = nb_channel
        self.files = files
        self.json_files = json_files
        self.files_count = len(self.files)

        if shuffle:
            temp = list(zip(self.files, self.json_files))
            random.shuffle(temp)
            self.files, self.json_files = zip(*temp)

    def __len__(self):
        return math.ceil(self.files_count / self.batch_size)

    def __get_polyline_with_label(self, lane: dict) -> Tuple[np.ndarray, np.ndarray]:
        """Get array from points list"""
        points = np.array(
            lane["points"]).flatten()
        points = np.pad(points, pad_width=(0, self.max_num_points * 2 - points.shape[0]))
        # TODO @Karim: remember below `label.get(label)` is index 1,2,3,4
        label = get_valid_attribute(lane.get('attribute', 1))
        labels = one_hot_list_encoder(label, self.num_type_of_lines)
        return points, labels

    def __get_polyline_and_label_from_file(self, json_path: str) -> np.ndarray:
        """
        Get all Polygonal chains from json file and label of line
        :param json_path: path of json file
        :return: right points for frame
        """
        with open(json_path) as f:
            lanes: List[Dict[str, int]] = json.load(f)["annotations"]["lane"]
            if lanes:
                polylines, labels = list(), list()
                # TODO @Karim: check another params in json files like "occlusion"
                for lane in lanes:
                    points, label = self.__get_polyline_with_label(lane=lane)
                    # TODO @Karim: debug
                    points = np.hstack([points, np.zeros(shape=(self.max_num_points * 2 - points.shape[0]))])
                    polylines.append(points)
                    labels.append(label)

                #todo fix error
                polylines.append(np.zeros(
                    shape=((self.max_lines_per_frame - len(polylines)) * self.max_num_points * 2)))
                
                label_empty_list = [one_hot_list_encoder(0, self.num_type_of_lines) for x in range(self.max_lines_per_frame - len(labels))]
                if label_empty_list:
                  labels.append(np.concatenate(label_empty_list))

                return np.concatenate(polylines), np.concatenate(labels)
            else:
              empty_label = one_hot_list_encoder(0, self.num_type_of_lines)
              polylines_empty_shape = self.max_lines_per_frame * self.max_num_points * 2
              return np.zeros(shape=(polylines_empty_shape)), \
                     np.tile(empty_label, (1, self.max_lines_per_frame))
                       

    def __getitem__(self, idx) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        batch_frames_path = self.files[idx * self.batch_size:
                                       (idx + 1) * self.batch_size]
        batch_json_path = self.json_files[idx * self.batch_size:
                                          (idx + 1) * self.batch_size]
        polyline_list, label_list = list(), list()

        for json in batch_json_path:
            polylines, labels = self.__get_polyline_and_label_from_file(json)
            polyline_list.append(polylines)
            label_list.append(labels)

        labels_output = np.vstack(label_list)
        polylines_output = np.vstack(polyline_list)

        return np.array([
            resize(imread(file_name) * self.rescale, self.target_shape) for file_name in
            batch_frames_path]), (polylines_output, labels_output)

class SimpleFrameDataGen:
    TRAINING = 'training'
    VALIDATION = 'validation'

    __reverse_dataset_type = {
        TRAINING: VALIDATION,
        VALIDATION: TRAINING
    }
    __dataset = {}

    def __init__(self,
                 rescale=1 / 255.,
                 validation_split: Optional[float] = None,
                 frame_glob_path: str = "",
                 json_glob_path: str = ""):
        """
        :param validation_split: split for train/validation sets
        :param rescale:
        :param frame_glob_path: glob pattern of frames
        :param json_glob_path: glob pattern path of jsons
        """
        self.rescale = rescale
        self.validation_split = validation_split

        self.__frame_glob_path = frame_glob_path
        self.__json_glob_path = json_glob_path

    def flow_from_directory(self, subset: str = TRAINING,
                            shuffle: bool = True, number_files:int = 2000, *args, **kwargs) -> SimpleFrameGenerator:
        """
        Get generator for subset
        :param subset: 'training' or 'validation'
        :param shuffle: flag for shuffling
        :param number_files: rectrict max number of files from dataset
        :param args: args for specific dataset
        :param kwargs: kwargs for specific dataset
        :return: Specific generator for specific subset
        """

        files = sorted(glob.glob(self.__frame_glob_path))
        log.info(f"Number of files in dataset: {len(files)}. Using in training/validation: {number_files}")
        files = files[:number_files]
        
        json_files = sorted(glob.glob(self.__json_glob_path))[:number_files]
        files_count = len(files)
        json_files_count = len(json_files)

        
        if files_count != json_files_count:
            log.error(f"Dataset files error"
                      f"Number of frames: ({files_count}). "
                      f"Number of jsons({json_files_count}")
            raise FileNotFoundError(
                f"Numbers of frames and jsons are not equal!")

        if not self.__reverse_dataset_type.get(subset):
            log.error(f'Wrong subset value: "{subset}"')
            raise ValueError(f'Wrong type of subset - {subset}. '
                             f'Available types: {self.__reverse_dataset_type.keys()}')

        if self.validation_split and 0.0 < self.validation_split < 1.0:
            split = int(files_count * (1 - self.validation_split))
            if subset == self.TRAINING:
                files = files[:split]
                json_files = json_files[:split]
            else:
                files = files[split:]
                json_files = json_files[split:]

        return SimpleFrameGenerator(rescale=self.rescale,
                                    files=files,
                                    shuffle=shuffle,
                                    json_files=json_files,
                                    *args, **kwargs)


In [50]:
data_gen = SimpleFrameDataGen(validation_split=0.2, frame_glob_path=IMAGE_PATH+'/*/*.jpg', json_glob_path=JSON_PATH+'/*/*.json',)
train_generator = data_gen.flow_from_directory(subset='training', shuffle=True, batch_size = BATCH_SIZE, number_files=2000)
validation_generator = data_gen.flow_from_directory(subset='validation', shuffle=True, batch_size = BATCH_SIZE, number_files=2000)


In [51]:
# for image_polylines in validation_generator:
#     print(image_polylines[0].shape)
#     print(image_polylines[1][0].shape)
#     print(image_polylines[1][1].shape)
#     break


In [52]:
from tensorflow.keras import layers, Model


def build_model(polyline_output_shape:int ,label_output_shape:int, input_shape=(1280, 960, 3)):
    pre_trained_model = tf.keras.applications.InceptionResNetV2(input_shape=input_shape,
                            weights='imagenet',
                            include_top=False)
    
    global_max_pool = layers.GlobalMaxPool2D()(pre_trained_model.output)
    dense_polyline = tf.keras.layers.Dense(units=512, activation='relu')(global_max_pool)
    dropout_polyine = layers.Dropout(.2)(dense_polyline)
    dense_polyline_2 = tf.keras.layers.Dense(units=512, activation='relu')(dropout_polyine)
    dropout_polyine_2 = layers.Dropout(.2)(dense_polyline_2)

    dense_label = tf.keras.layers.Dense(units=66, activation='relu')(global_max_pool)
    dropout_label = layers.Dropout(.2)(dense_label)
    dense_label_2 = tf.keras.layers.Dense(units=66, activation='relu')(dropout_label)
    dropout_label_2 = layers.Dropout(.2)(dense_label_2)

    polyline_output = layers.Dense(polyline_output_shape,name='polyline_output')(dropout_polyine_2)
    label_output = layers.Dense(label_output_shape, activation='softmax', name='label_output')(dropout_label_2)

    model = Model(pre_trained_model.input, outputs=[polyline_output,label_output])

    return model, pre_trained_model

In [53]:
logdir = "logs/pretained-InceptionResNetV2-model"


tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)


In [54]:
model, pre_trained_model = build_model(polyline_output_shape=182*6, label_output_shape=6*11)
print(model.summary())

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 1280, 960,   0           []                               
                                3)]                                                               
                                                                                                  
 conv2d_609 (Conv2D)            (None, 639, 479, 32  864         ['input_4[0][0]']                
                                )                                                                 
                                                                                                  
 batch_normalization_609 (Batch  (None, 639, 479, 32  96         ['conv2d_609[0][0]']             
 Normalization)                 )                                                           

In [55]:
pre_trained_model.trainable = False
print(model.summary())

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 1280, 960,   0           []                               
                                3)]                                                               
                                                                                                  
 conv2d_609 (Conv2D)            (None, 639, 479, 32  864         ['input_4[0][0]']                
                                )                                                                 
                                                                                                  
 batch_normalization_609 (Batch  (None, 639, 479, 32  96         ['conv2d_609[0][0]']             
 Normalization)                 )                                                           

In [26]:
# tf.experimental.numpy.experimental_enable_numpy_behavior()
# tf.keras.utils.plot_model(model, "multi_output_model.png", show_shapes=True)

In [56]:
from tensorflow.keras.optimizers import Adam

learning_rate = 3e-4

model.compile(loss= {
      'polyline_output':tf.keras.losses.MeanSquaredLogarithmicError(),
      'label_output':tf.keras.losses.CategoricalCrossentropy(),
    },
    optimizer=Adam(learning_rate=learning_rate),
    metrics={'polyline_output':tf.keras.metrics.MeanSquaredLogarithmicError(),
             'label_output':'accuracy'},
    loss_weights={"polyline_output": 500, "label_output": 0},)
# tf.keras.metrics.MeanSquaredLogarithmicError()

In [None]:
history = model.fit(train_generator,
                    epochs=15,
                    verbose=2,
                    validation_data=validation_generator,
                    callbacks=[tensorboard_callback],
                    )

Epoch 1/15
50/50 - 491s - loss: 2129.2441 - polyline_output_loss: 4.2585 - label_output_loss: 39.8067 - polyline_output_mean_squared_logarithmic_error: 4.2585 - label_output_accuracy: 0.1100 - val_loss: 3197.8271 - val_polyline_output_loss: 6.3957 - val_label_output_loss: 32.1393 - val_polyline_output_mean_squared_logarithmic_error: 6.3957 - val_label_output_accuracy: 0.0150 - 491s/epoch - 10s/step
Epoch 2/15


In [None]:
loss, polyline_output_loss,label_output_loss, polyline_output_mean_squared_logarithmic_error, label_output_accuracy = model.evaluate(validation_generator)
print(f" \
  Loss:{loss}\n \
  polyline_output_loss:{polyline_output_loss}\n \
  label_output_loss:{label_output_loss}\n \
  polyline_output_mean_squared_logarithmic_error:{polyline_output_mean_squared_logarithmic_error}\n \
  label_output_accuracy:{label_output_accuracy}"
)

In [None]:
%tensorboard --logdir logs/

In [42]:
# !mkdir -p saved_model
# model.save('saved_model/my_model')

In [43]:
from google.colab import files

In [44]:
# files.download("saved_model/my_model")

In [45]:
model_name = 'pretrained-InceptionResNetV2-model.h5'

In [46]:
model.save(model_name) 

In [48]:
files.download(model_name)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>