In [None]:
import numpy as np
import keras as K
import keras.layers as KL
import pickle
import tqdm
import os
import cv2
from matplotlib import pyplot as plt

!pip3 install -U tensorflow-addons
import tensorflow_addons as tfa
import tensorflow as tf

## Variables that need to be changed if you are training on your data

In [None]:
# name of root directory of the dataset (name of the zip file with your dataset)
root_dir = "full-track"

# lower and upper bound for catching color mask
hsv_lower = (94, 38, 65)
hsv_upper = (136, 141, 172)

# height of the crop area (number of pixels, starting from upper edge of image that will be cropped)
crop_height = 220

# path to test image for preprocessing (can be any image)
preprocess_path = "../input/full-track/data/21420221938-img1.jpg"

# save directory for trained model
save_dir = "./model"

# Chose the name of saved tflite model
tflite_model_name = "tflite_model"

# Reading prepared data labels and partition

In [None]:
labels = None
partition = None


with open('../input/' + root_dir + '/labels.pickle', 'rb') as handle:
    labels = pickle.load(handle)
    
with open('../input/' + root_dir + '/partition.pickle', 'rb') as handle:
    partition = pickle.load(handle)

# Neuarl Network model parts

In [None]:
GPUs_num = len(tf.config.list_physical_devices('GPU'))
CPUs_num = len(tf.config.list_physical_devices('CPU'))
print("Num GPUs Available: ", GPUs_num)
print("gpu_devices: ", tf.config.list_physical_devices('GPU'))
print("Num CPUs Available: ", CPUs_num)
print("cpu_devices: ", tf.config.list_physical_devices('CPU'))
device = None
device = '/GPU:0' if GPUs_num > 0 else '/CPU:0'
device

## Preprocess function

In [None]:
def preprocess(img, dim):
    # converting to hsv
    hsv_img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
    # croping the img
    crop_img = hsv_img[crop_height:hsv_img.shape[0], :]
    # catching color mask
    color_mask = cv2.inRange(crop_img, hsv_lower, hsv_upper)
    # conveting values to float
    float_img = color_mask.astype(np.float32)
    # resizing
    resized_img = cv2.resize(float_img, (dim[1], dim[0]))
    # normalizing
    final_img = resized_img / 255.0
    
    return final_img[:,:,np.newaxis]
    

### Preprocessing test

In [None]:
preprocess_test_img = cv2.imread(preprocess_path)
print("Image shape before preprocess: {}".format(preprocess_test_img.shape))
plt.imshow(cv2.cvtColor(preprocess_test_img, cv2.COLOR_BGR2RGB))
plt.show()
processed = preprocess(preprocess_test_img, (120,160))
print("Image shape after preprocess: {}".format(processed.shape))
plt.imshow(processed, cmap="gray")
plt.show()

## Keras Data Generator

In [None]:
class DataGenerator(K.utils.all_utils.Sequence):
    def __init__(self, list_IDs, labels, batch_size=32, dim=(32,32,32), n_channels=1, shuffle=True):
        'Initialization'
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.on_epoch_end()
        
    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)
        
    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_IDs) / self.batch_size))
    
    def __data_generation(self, list_IDs_temp):
        'Generates data containing batch_size samples'
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y_linear = np.empty((self.batch_size, 1), dtype=float)
        y_angular = np.empty((self.batch_size, 1), dtype=float)

        for i, ID in enumerate(list_IDs_temp):
            img = cv2.imread('../input/' + root_dir + '/data/' + ID)
            preprocess_img = preprocess(img, self.dim)
            X[i,:] = preprocess_img

            y_linear[i] = self.labels['linear'][ID]
            y_angular[i] = self.labels['angular'][ID]
        
        return X, {'linear': y_linear, 'angular': y_angular}

    def __getitem__(self, index):
        'Generate one batch of data'
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        X, y = self.__data_generation(list_IDs_temp)

        return X, y

### Keras model


In [None]:
img_in = KL.Input(shape=(120, 160, 1), name='img_in')
x = img_in

x = KL.Convolution2D(filters=24, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=32, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(5, 5), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(3, 3), strides=(2, 2), activation='relu')(x)
x = KL.Convolution2D(filters=64, kernel_size=(3, 3), strides=(1, 1), activation='relu')(x)

x = KL.Flatten(name='flattened')(x)
x = KL.Dense(units=100, activation='linear')(x)
x = KL.Dropout(rate=.1)(x)
x = KL.Dense(units=50, activation='linear')(x)
x = KL.Dropout(rate=.1)(x)

linear = KL.Dense(units=1, activation='linear', name='linear')(x)

angular = KL.Dense(units=1, activation='linear', name='angular')(x)

model = K.Model(inputs=[img_in], outputs=[linear, angular])

with tf.device(device):
    model.compile(optimizer='adam',
                  loss={'linear': 'mean_squared_error', 'angular': 'mean_squared_error'},
                  loss_weights={'linear': 0.3, 'angular': 0.7})

In [None]:
callbacks = [
        K.callbacks.ModelCheckpoint(save_dir, save_best_only=True),
        K.callbacks.EarlyStopping(monitor='val_loss',
                                  min_delta=.0005,
                                  patience=20,
                                  verbose=True,
                                  mode='auto'),
        tfa.callbacks.TQDMProgressBar(),
        K.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                              patience=5, min_lr=0.001),
        K.callbacks.TensorBoard(log_dir='./logs', profile_batch=(0, 10))


    ]
params = {'dim': (120, 160),
          'batch_size': 64,
          'n_channels': 1,
          'shuffle': True}


with tf.device(device):
    training_generator = DataGenerator(partition['train'], labels, **params)
    validation_generator = DataGenerator(partition['validation'], labels, **params)

## Training the model

In [None]:
with tf.device(device):
    hist = model.fit(training_generator,
                           validation_data=validation_generator,
                           use_multiprocessing=True,
                           workers=6,
                           callbacks=callbacks,
                           epochs=100)

## Converting Keras model to tflite

In [None]:
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

with open(tflite_model_name + '.tflite', 'wb') as f:
    f.write(tflite_model)

# Custom tests

In [None]:
# get prefixes of images names
files = os.listdir("../input/" + root_dir + "/data")
dates = set()
for file in files:
    div = file.split("-")
    dates.add(div[0])
print("Available prefixes for the images.")
dates

In [None]:
test_file = "21420221938-img953.jpg" # can be any image of the data directory
test_dimension = (120, 160)
test_img = cv2.imread("../input/" + root_dir + "/data/" + test_file)

plt.imshow(cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB))
plt.show()
print("Shape of the test image: {}".format(test_img.shape))

In [None]:
preprocessed_test = preprocess(test_img, test_dimension)
print("Shape of the preprocessed test image: {}".format(preprocessed_test.shape))
plt.imshow(preprocessed_test, cmap="gray")
plt.show()

### Keras prediction

In [None]:
keras_input = np.empty((1, 120, 160, 1))
keras_input[0,:] = preprocessed_test
prediction = model.predict(keras_input)
prediction

### Tflite prediction

In [None]:
interpreter = tf.lite.Interpreter(model_path=tflite_model_name+'.tflite')

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

interpreter.allocate_tensors()

In [None]:
interpreter.set_tensor(input_details[0]['index'], [preprocessed_test])
    
interpreter.invoke()

linear_predict = interpreter.get_tensor(output_details[0]['index'])
angular_predict = interpreter.get_tensor(output_details[1]['index'])
print(linear_predict)
print(angular_predict)

### Ground truth

In [None]:
print("True values")
print("Linear: " + str(labels['linear'][test_file]))
print("Angular: " + str(labels['angular'][test_file]))

## Visualizing Model features

In [None]:
def get_conv_layers(trained_model):
    num = 0
    nums = []
    for layer in trained_model.layers:
        if 'conv' in layer.name:
            nums.append(num)
        num += 1
    return nums

In [None]:
def visualize_kernels(layer_num=1, trained_model=model, filters_num=6):
    conv_layers_ids = get_conv_layers(trained_model)
    
    if layer_num not in conv_layers_ids:
        print("layer_num is not an index of convolutional layer!")
        print("Indexes of conv layers:")
        print(conv_layers_nums)
        return
    filters, biases = trained_model.layers[1].get_weights()
    filter_min, filter_max = filters.min(), filters.max()
    
    filters = (filters - filter_min) / (filter_max - filter_min)
    max_filters_num = filters.shape[3]
    if filters_num > max_filters_num:
        print("Number of filters to visualize greater than number of filters in chosen conv layer")
        print("Number of filters in chosen layer: " + max_filters_num)
        
    ix = 1
    for i in range(filters_num):
        f = filters[:,:,:,i]
        for j in range(1):
            ax = plt.subplot(filters_num, 3, ix)
            ax.set_xticks([])
            ax.set_yticks([])
            plt.imshow(f[:,:,j], cmap='gray')
            ix += 1
        
    plt.show()

In [None]:
def visualize_feature_maps(layer_num=1, base_model=model, input_img=keras_input, row=8, column=3):
    '''
        layer_num - number of convolutional layer which features will be visualized
        base_model - name of the model that we want to visualize features from
        input_img - input img for the base model to show its features
        row - number of maps in a row
        column - number of maps in a column
    '''
    conv_layers_ids = get_conv_layers(base_model)
    if layer_num not in conv_layers_ids:
        print("layer_num is not an conv layer id")
        print("Conv layers ids:", conv_layers_ids)
        return
    
    visualization_model = K.models.Model(inputs=base_model.inputs, outputs=model.layers[layer_num].output)
    feature_maps = visualization_model.predict(input_img)
    maps_num = feature_maps.shape[3]
    
    if row * column > maps_num:
        print("Specified plot row and column exceeds number of feature maps from chosen layer")
        print("Number of maps in conv layer " + str(layer_num) + " is " + str(maps_num))
        print("cols * row = ", row*column, sep=" ")
        return
    
    ix = 1
    for _ in range(row):
        for _ in range(column):
            ax = plt.subplot(column, row, ix)
            ax.set_xticks([])
            ax.set_yticks([])
            plt.imshow(feature_maps[0, :, :, ix-1], cmap='gray')
            ix += 1
    plt.show()

In [None]:
visualize_kernels(layer_num=2)

In [None]:
visualize_feature_maps(column=4, row=2)