In [None]:
import os
import pickle
import tqdm

import cv2
import keras as K
import keras.layers as KL
import numpy as np
import pandas as pd
import tensorflow as tf

from matplotlib import pyplot as plt
from tensorflow.keras.utils import Sequence
from tqdm.keras import TqdmCallback

## Variables that need to be changed if you are training on your data

In [None]:
# name of root directory of the dataset (name of the zip file with your dataset)
root_dir = "full-track"

# lower and upper bound for capturing color mask
hsv_lower = (94, 38, 65)
hsv_upper = (136, 141, 172)

# height of the crop area (number of pixels, starting from upper edge of image that will be cropped)
crop_height = 220

# path to test image for preprocessing (can be any image)
preprocess_path = "../input/full-track/data/21420221938-img1.jpg"

# save directory for trained model
save_dir = "./model.keras"

# Chose the name of saved tflite model
tflite_model_name = "tflite_model"

# Reading prepared data labels and partition

In [None]:
labels = None
partition = None


with open('../input/' + root_dir + '/labels.pickle', 'rb') as handle:
    labels = pickle.load(handle)
    
with open('../input/' + root_dir + '/partition.pickle', 'rb') as handle:
    partition = pickle.load(handle)

In [None]:
{elem.split("-")[0] for elem in partition['validation']}

# Neural Network model parts

In [None]:
GPUs_num = len(tf.config.list_physical_devices('GPU'))
CPUs_num = len(tf.config.list_physical_devices('CPU'))
print("Num GPUs Available: ", GPUs_num)
print("gpu_devices: ", tf.config.list_physical_devices('GPU'))
print("Num CPUs Available: ", CPUs_num)
print("cpu_devices: ", tf.config.list_physical_devices('CPU'))
device = None
device = '/GPU:0' if GPUs_num > 0 else '/CPU:0'
device

## Preprocess function

In [None]:
def preprocess(img, dim):
    # converting to hsv
    hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    # croping the img
    crop_img = hsv_img[crop_height:hsv_img.shape[0], :]
    # catching color mask
    color_mask = cv2.inRange(crop_img, hsv_lower, hsv_upper)
    # conveting values to float
    float_img = color_mask.astype(np.float32)
    # resizing
    resized_img = cv2.resize(float_img, (dim[1], dim[0]))
    # normalizing
    final_img = resized_img / 255.0
    
    return final_img[:,:,np.newaxis]
    

### Preprocessing test

In [None]:
preprocess_test_img = cv2.imread(preprocess_path)
print(f"Image shape before preprocess: {preprocess_test_img.shape}")
plt.imshow(cv2.cvtColor(preprocess_test_img, cv2.COLOR_BGR2RGB))
plt.show()
processed = preprocess(preprocess_test_img, (120,160))
print(f"Image shape after preprocess: {processed.shape}")
plt.imshow(processed, cmap="gray")
plt.show()

## Keras Data Generator

In [None]:
class DataGenerator(Sequence):
    def __init__(self, list_IDs, labels, batch_size=32, dim=(32,32,32), n_channels=1, shuffle=True, **kwargs):
        'Initialization'
        super().__init__(**kwargs)
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.on_epoch_end()
        
    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)
        
    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))
    
    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples'
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y_linear = np.empty((self.batch_size, 1), dtype=float)
        y_angular = np.empty((self.batch_size, 1), dtype=float)

        for i, ID in enumerate(list_IDs_temp):
            img = cv2.imread('../input/' + root_dir + '/data/' + ID)
            preprocess_img = preprocess(img, self.dim)
            X[i,:] = preprocess_img

            y_linear[i] = self.labels['linear'][ID]
            y_angular[i] = self.labels['angular'][ID]
        
        # return  {'img_in': X}, {'linear': y_linear, 'angular': y_angular}
        return {'img_in': X}, (y_linear, y_angular)

    def __getitem__(self, index):
        'Generate one batch of data'
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        X, y = self.__data_generation(list_IDs_temp)

        return X, y

### Keras model


In [None]:
img_in = KL.Input(shape=(120, 160, 1), name='img_in')
x = img_in

x = KL.Convolution2D(filters=24, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=32, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(3, 3), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(3, 3), strides=(1, 1), activation='relu')(x)

x = KL.Flatten(name='flattened')(x)
x = KL.Dense(units=100, activation='linear')(x)
x = KL.Dropout(rate=.1)(x)
x = KL.Dense(units=50, activation='linear')(x)
x = KL.Dropout(rate=.1)(x)

linear = KL.Dense(units=1, activation='linear', name='linear')(x)

angular = KL.Dense(units=1, activation='linear', name='angular')(x)

model = K.Model(inputs=[img_in], outputs=[linear, angular])

with tf.device(device):
    model.compile(optimizer='adam',
                  loss={'linear': 'mean_squared_error', 'angular': 'mean_squared_error'},
                  # loss=['mean_squared_error', 'mean_squared_error'],
                  loss_weights={'linear': 0.3, 'angular': 0.7})
                  # loss_weights=[0.3, 0.7])

In [None]:
params = {'dim': (120, 160),
          'batch_size': 64,
          'n_channels': 1,
          'shuffle': True}


with tf.device(device):
    training_generator = DataGenerator(partition['train'], labels, **params)
    params['shuffle'] = False
    validation_generator = DataGenerator(partition['validation'], labels, **params)

## Training the model

In [None]:
callbacks = [
        K.callbacks.ModelCheckpoint(save_dir, save_best_only=True),
        K.callbacks.EarlyStopping(monitor='val_angular_loss',
                                  min_delta=.0001,
                                  patience=15,
                                  verbose=True,
                                  mode='min',
                                  restore_best_weights=True),
        TqdmCallback(verbose=1),
        K.callbacks.ReduceLROnPlateau(monitor='val_angular_loss', factor=0.8,
                              patience=5, min_lr=0.001, mode='min', verbose=1),
        K.callbacks.TensorBoard(log_dir='./logs', profile_batch=(0, 10))


    ]
with tf.device(device):
    hist = model.fit(training_generator,
                     validation_data=validation_generator,
                     callbacks=callbacks,
                     epochs=100,
                     verbose=0)

## Converting Keras model to tflite

TensorFlow Lite reverses the order of multi-output tensors during conversion.
To maintain backward compatibility with the legacy ROS1 model (where the first output was linear speed and the second was angular speed), we create a temporary Keras model with its outputs intentionally swapped before converting it to TFLite.
After conversion, TFLite reverses the outputs again, restoring the correct order for the ROS node while still supporting the old models.

Therefore, the current ROS2 node can use both the legacy model and any new models trained with this notebook.

In [None]:
reorder_model = K.Model(inputs=[img_in], outputs=[model.output[1], model.output[0]])

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(reorder_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT] 
tflite_model = converter.convert()

with open(tflite_model_name + '.tflite', 'wb') as f:
    f.write(tflite_model)

# Custom tests

In [None]:
# get prefixes of images names
files = os.listdir("../input/" + root_dir + "/data")
image_amount_dict = dict()
for file in files:
    div = file.split("-")
    current_max = image_amount_dict.get(div[0], -1)
    current_index = int(div[1][3:-4])
    image_amount_dict[div[0]] = max(current_max, current_index)
print("Available prefixes and number of images for each prefix:")
image_amount_dict

In [None]:
test_file = "21420221946-img750.jpg" # can be any image of the data directory
test_dimension = (120, 160)
test_img = cv2.imread("../input/" + root_dir + "/data/" + test_file)

plt.imshow(cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB))
plt.show()
print("Shape of the test image: {}".format(test_img.shape))

preprocessed_test = preprocess(test_img, test_dimension)
print("Shape of the preprocessed test image: {}".format(preprocessed_test.shape))
plt.imshow(preprocessed_test, cmap="gray")
plt.show()

## Comparing predictions

In [None]:
# KERAS
keras_input = np.empty((1, 120, 160, 1))
keras_input[0,:] = preprocessed_test
keras_prediction = model.predict({'img_in': keras_input})

# REORDERED KERAS 
reord_keras_prediction = reorder_model.predict({'img_in': keras_input})


# TFLITE
interpreter = tf.lite.Interpreter(model_path=tflite_model_name+'.tflite')

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

interpreter.allocate_tensors()

interpreter.set_tensor(input_details[0]['index'], [preprocessed_test])
    
interpreter.invoke()

tflite_prediction = [interpreter.get_tensor(output_details[0]['index'])[0][0],
                     interpreter.get_tensor(output_details[1]['index'])[0][0]]

# GROUND TRUTH
ground_truth = [labels['linear'][test_file], labels['angular'][test_file]]

In [None]:
data = {
    'Keras prediction': [out.item() for out in keras_prediction],
    'Reordered Keras prediction': [out.item() for out in reord_keras_prediction],
    'TFLite prediction': tflite_prediction,
    'Ground Truth': ground_truth
}
index_labels = ['Linear', 'Angular']

# Create the DataFrame
df = pd.DataFrame(data, index=index_labels)

# Display with some styling
styled_df = df.style.format("{:.2f}").set_table_styles([
    {'selector': 'th', 'props': [('background-color', '#4C72B0'),
                                 ('color', 'white'),
                                 ('font-weight', 'bold'),
                                 ('text-align', 'center')]},
    {'selector': 'td', 'props': [('text-align', 'center'),
                                 ('padding', '8px'),
                                 ('border', '1px solid #ddd')]},
    {'selector': 'tr:nth-child(even)', 'props': [('background-color', '#f9f9f9')]}
]).set_caption("Results")

styled_df

## Visualizing Model features

In [None]:
def get_conv_layers(trained_model):
    num = 0
    nums = []
    for layer in trained_model.layers:
        if 'conv' in layer.name:
            nums.append(num)
        num += 1
    return nums

In [None]:
def visualize_kernels(layer_num=1, trained_model=model, filters_num=6):
    '''
        layer_num - number of convolutional layer which kernels will be visualized
        trained_model - name of the model that we want to visualize filters from
        filters_num - number of filters to visualize
    '''
    conv_layers_ids = get_conv_layers(trained_model)
    
    if layer_num not in conv_layers_ids:
        print("layer_num is not an index of convolutional layer!")
        print("Indexes of conv layers:")
        print(conv_layers_ids)
        return
    filters, biases = trained_model.layers[layer_num].get_weights()
    filter_min, filter_max = filters.min(), filters.max()
    
    filters = (filters - filter_min) / (filter_max - filter_min)
    max_filters_num = filters.shape[3]
    if filters_num > max_filters_num:
        print("Number of filters to visualize greater than number of filters in chosen conv layer")
        print(f"Number of filters in chosen layer: {max_filters_num}")
        return
        
    ix = 1
    for i in range(filters_num):
        f = filters[:,:,:,i]
        for j in range(1):
            ax = plt.subplot(filters_num, 3, ix)
            ax.set_xticks([])
            ax.set_yticks([])
            plt.imshow(f[:,:,j], cmap='gray')
            ix += 1
        
    plt.show()

In [None]:
def visualize_feature_maps(layer_num=1, base_model=model, input_img=keras_input, row=8, column=3):
    '''
        layer_num - number of convolutional layer which features will be visualized
        base_model - name of the model that we want to visualize features from
        input_img - input img for the base model to show its features
        row - number of maps in a row
        column - number of maps in a column
    '''
    conv_layers_ids = get_conv_layers(base_model)
    if layer_num not in conv_layers_ids:
        print("layer_num is not an conv layer id")
        print("Conv layers ids:", conv_layers_ids)
        return
    
    visualization_model = K.models.Model(inputs=base_model.inputs, outputs=model.layers[layer_num].output)
    feature_maps = visualization_model.predict(input_img)
    maps_num = feature_maps.shape[3]
    
    if row * column > maps_num:
        print("Specified plot row and column exceed number of feature maps from chosen layer")
        print(f"Number of maps in conv layer {layer_num} is {maps_num}")
        print(f"row * column = {row*column}")
        return
    
    ix = 1
    for _ in range(row):
        for _ in range(column):
            ax = plt.subplot(column, row, ix)
            ax.set_xticks([])
            ax.set_yticks([])
            plt.imshow(feature_maps[0, :, :, ix-1], cmap='gray')
            ix += 1
    plt.show()

In [None]:
visualize_kernels(layer_num=2)

In [None]:
visualize_feature_maps(column=4, row=2)