In [5]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
import re
import imageio
import cv2
import matplotlib.pyplot as plt
import os
import yaml
from sklearn.metrics import mean_squared_error, mean_absolute_error, accuracy_score

# Settings

In [6]:
train_path = './TrafficSignLocalizationandDetection/train'
valid_path = './TrafficSignLocalizationandDetection/valid'
test_path = './TrafficSignLocalizationandDetection/test'

In [7]:
num_categories = 36

In [8]:
reduced_image_size = (208, 208)

# Preprocessing

## Create Data Frames For Loading Data

In [9]:
raw_train_labels = pd.DataFrame(np.zeros((0, 4 + num_categories)), 
                                columns=['x', 'y', 'w', 'h'] + list(range(1, num_categories + 1)))
raw_train_images = []
raw_test_labels = pd.DataFrame(np.zeros((0, 4 + num_categories)), 
                               columns=['x', 'y', 'w', 'h'] + list(range(1, num_categories + 1)))
raw_test_images = []

## Resize And Denoise Images

In [24]:
def load_images(path):
    images = []
    
    for file in os.listdir(path):
        filename = os.path.join(path, os.fsdecode(file))
        image = cv2.imread(filename)
        # takes a long time to denoise images, only run if you have a lot of time / 
        # extremely fast CPU
        # image = cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21) 
        image = cv2.resize(image, reduced_image_size)
        images.append(image)
    return np.array(images)

## Loading Images

In [23]:
raw_train_images = np.concatenate([
    load_images(os.path.join(train_path, 'images')),
    load_images(os.path.join(valid_path, 'images')),
])
raw_test_images = load_images(os.path.join(test_path, 'images'))

KeyboardInterrupt: 

In [21]:
load_images(os.path.join(train_path, 'images'))

KeyboardInterrupt: 

## Read Labels From File

In [12]:
def load_labels(path):
    labels = pd.DataFrame(np.zeros((0, 40)), columns=['x', 'y', 'w', 'h'] + list(range(1, 37)))
    idx = 0

    for file in os.listdir(path):
        filename = os.path.join(path, os.fsdecode(file))
        file_data = pd.read_csv(filename, sep=' ', header=None)
        file_data.columns = ['cat', 'x', 'y', 'w', 'h']
        cat = file_data['cat'][0]
        new_inst = np.zeros((1, 40))
        new_inst[0][0] = file_data['x'][0]
        new_inst[0][1] = file_data['y'][0]
        new_inst[0][2] = file_data['w'][0]
        new_inst[0][3] = file_data['h'][0]
        new_inst[0][file_data['cat'][0] + 3] = 1
        labels = pd.concat([labels, 
                   pd.DataFrame((new_inst), columns=['x', 'y', 'w', 'h'] + list(range(1, 37)))
                  ])
    return labels

## Load Labels

In [13]:
raw_train_labels = pd.concat([
    load_labels(os.path.join(train_path, 'labels')),
    load_labels(os.path.join(valid_path, 'labels')),
])
raw_test_labels = load_labels(os.path.join(test_path, 'labels'))

# Model Creation

## Creating The Model

Implement the model as originally proposed by the Xception paper by Francois Chollet

In [15]:
input_layer = keras.layers.Input(shape=[reduced_image_size[0], reduced_image_size[1], 3])

conv_1 = keras.layers.Conv2D(32, (3,3), strides=(2,2), activation='relu')(input_layer)
conv_2 = keras.layers.Conv2D(64, (3,3), activation='relu')(conv_1)
drop_1 = keras.layers.Dropout(0.5)(conv_2)

# channel 1/1
conv_3 = keras.layers.Conv2D(128, (1,1), strides=(2,2), activation='relu', padding='same')(drop_1)
# channel 1/2
conv_4 = keras.layers.SeparableConv2D(128, (3,3), padding='same')(drop_1)
conv_5 = keras.layers.SeparableConv2D(128, (3,3), activation='relu', padding='same')(conv_4)
max_pool_1 = keras.layers.MaxPooling2D((3,3), strides=(2,2), padding='same')(conv_5)

concat_1 = keras.layers.add([conv_3, max_pool_1])
bn_2 = keras.layers.BatchNormalization()(concat_1)

# channel 2/1
conv_6 = keras.layers.Conv2D(128, (1,1), strides=(2,2), activation='relu', padding='same')(bn_2)
# channel 2/2
conv_7 = keras.layers.SeparableConv2D(128, (3,3), activation='relu', padding='same')(bn_2)
conv_8 = keras.layers.SeparableConv2D(128, (3,3), activation='relu', padding='same')(conv_7)
max_pool_2 = keras.layers.MaxPooling2D((3,3), strides=(2,2), padding='same')(conv_8)

concat_2 = keras.layers.add([conv_6, max_pool_2])
bn_3 = keras.layers.BatchNormalization()(concat_2)

# channel 3/1
conv_9 = keras.layers.Conv2D(128, (1,1), strides=(2,2), activation='relu', padding='same')(bn_3)
# channel 3/2
conv_10 = keras.layers.SeparableConv2D(128, (3,3), activation='relu', padding='same')(bn_3)
conv_11 = keras.layers.SeparableConv2D(128, (3,3), activation='relu', padding='same')(conv_10)
max_pool_3 = keras.layers.MaxPooling2D((3,3), strides=(2,2), padding='same')(conv_11)

concat_3 = keras.layers.add([conv_9, max_pool_3])
concat_3 = keras.layers.BatchNormalization()(concat_3)

In [16]:
for i in range(8):
    conv_12 = keras.layers.SeparableConv2D(128, (3,3), activation='relu', padding='same')(concat_3)
    conv_13 = keras.layers.SeparableConv2D(128, (3,3), activation='relu', padding='same')(conv_12)
    conv_14 = keras.layers.SeparableConv2D(128, (3,3), activation='relu', padding='same')(conv_13)
    concat_3 = keras.layers.add([concat_3, conv_14])

In [17]:
# channel 4/1
conv_15 = keras.layers.Conv2D(1024, (1,1), (2,2))(concat_3)
# channel 4/2
conv_16 = keras.layers.SeparableConv2D(728, (3,3), activation='relu', padding='same')(concat_3)
conv_17 = keras.layers.SeparableConv2D(1024, (3,3), activation='relu', padding='same')(conv_16)
max_pool_4 = keras.layers.MaxPooling2D((3,3), strides=(2,2), padding='same')(conv_17)

concat_4 = keras.layers.add([conv_15, max_pool_4])

conv_18 = keras.layers.SeparableConv2D(1536, (3,3), activation='relu', padding='same')(concat_4)
conv_19 = keras.layers.SeparableConv2D(2048, (3,3), activation='relu', padding='same')(conv_18)
avg_pool = keras.layers.GlobalAveragePooling2D()(conv_19)

dense_1 = keras.layers.Dense(128, activation='relu')(avg_pool)
dense_2 = keras.layers.Dense(128, activation='relu')(dense_1)
dense_3 = keras.layers.Dense(128)(dense_2)

class_output = keras.layers.Dense(num_categories, activation="softmax")(dense_3)

concat_5 = keras.layers.concatenate([dense_3, class_output])

loc_output = keras.layers.Dense(4)(concat_5)

In [18]:
model = keras.models.Model(inputs=input_layer, outputs=[loc_output, class_output])

model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 208, 208, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_5 (Conv2D)              (None, 103, 103, 32  896         ['input_2[0][0]']                
                                )                                                                 
                                                                                                  
 conv2d_6 (Conv2D)              (None, 101, 101, 64  18496       ['conv2d_5[0][0]']               
                                )                                                             

                                                                                                  
 add_7 (Add)                    (None, 13, 13, 128)  0           ['add_6[0][0]',                  
                                                                  'separable_conv2d_17[0][0]']    
                                                                                                  
 separable_conv2d_18 (Separable  (None, 13, 13, 128)  17664      ['add_7[0][0]']                  
 Conv2D)                                                                                          
                                                                                                  
 separable_conv2d_19 (Separable  (None, 13, 13, 128)  17664      ['separable_conv2d_18[0][0]']    
 Conv2D)                                                                                          
                                                                                                  
 separable

                                                                                                  
 max_pooling2d_6 (MaxPooling2D)  (None, 7, 7, 1024)  0           ['separable_conv2d_37[0][0]']    
                                                                                                  
 add_14 (Add)                   (None, 7, 7, 1024)   0           ['conv2d_10[0][0]',              
                                                                  'max_pooling2d_6[0][0]']        
                                                                                                  
 separable_conv2d_38 (Separable  (None, 7, 7, 1536)  1583616     ['add_14[0][0]']                 
 Conv2D)                                                                                          
                                                                                                  
 separable_conv2d_39 (Separable  (None, 7, 7, 2048)  3161600     ['separable_conv2d_38[0][0]']    
 Conv2D)  

## Train the Model

In [19]:
model.compile(loss="mse", 
              optimizer='adam',
              metrics=["mse", "categorical_crossentropy"])
model.fit(raw_train_images, 
          [raw_train_labels.iloc[:,:4], raw_train_labels.iloc[:,4:]],
          epochs=40,
          validation_split=0.1,
          verbose=1,
         )

Epoch 1/40


ValueError: in user code:

    File "C:\Users\Armin\anaconda3\envs\tfdml_plugin\lib\site-packages\keras\engine\training.py", line 1160, in train_function  *
        return step_function(self, iterator)
    File "C:\Users\Armin\anaconda3\envs\tfdml_plugin\lib\site-packages\keras\engine\training.py", line 1146, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\Armin\anaconda3\envs\tfdml_plugin\lib\site-packages\keras\engine\training.py", line 1135, in run_step  **
        outputs = model.train_step(data)
    File "C:\Users\Armin\anaconda3\envs\tfdml_plugin\lib\site-packages\keras\engine\training.py", line 993, in train_step
        y_pred = self(x, training=True)
    File "C:\Users\Armin\anaconda3\envs\tfdml_plugin\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\Users\Armin\anaconda3\envs\tfdml_plugin\lib\site-packages\keras\engine\input_spec.py", line 216, in assert_input_compatibility
        raise ValueError(

    ValueError: Layer "model" expects 1 input(s), but it received 0 input tensors. Inputs received: []


## Make Predictions

In [None]:
predictions_loc, predictions_class = model.predict(raw_test_images)
predictions_loc = pd.DataFrame(predictions_loc, columns=['x', 'y', 'w', 'h'])
predictions_class = pd.DataFrame(predictions_class, columns=list(range(1, 37)))

In [None]:
predictions_class

## Compute The IoU Score

In [None]:
def intersection_over_union(prediction, actual):
    center_x_pred = prediction[0]
    center_y_pred = prediction[1]
    width_pred = prediction[2]
    height_pred = prediction[3]
    start_point_pred = (center_x_pred - width_pred / 2, center_y_pred - height_pred / 2)
    end_point_pred = (center_x_pred + width_pred / 2, center_y_pred + height_pred / 2)
    
    center_x_actual = actual[0]
    center_y_actual = actual[1]
    width_actual = actual[2]
    height_actual = actual[3]
    start_point_actual = (center_x_actual - width_actual / 2, center_y_actual - height_actual / 2)
    end_point_actual = (center_x_actual + width_actual / 2, center_y_actual + height_actual / 2)
    
    # determine the (x, y)-coordinates of the intersection rectangle
    xA = max(start_point_pred[0], start_point_actual[0])
    yA = max(start_point_pred[1], start_point_actual[1])
    xB = min(end_point_pred[0], end_point_pred[0])
    yB = min(end_point_pred[1], end_point_pred[1])
    
    # compute the area of intersection rectangle
    interArea = max(0, xB - xA) * max(0, yB - yA)
    
    # compute the area of both the prediction and ground-truth
    # rectangles
    boxAArea = width_pred * height_pred
    boxBArea = width_actual * height_actual
    
    # compute the intersection over union by taking the intersection
    # area and dividing it by the sum of prediction + ground-truth
    # areas - the interesection area
    iou = interArea / float(boxAArea + boxBArea - interArea)
    
    # return the intersection over union value
    return iou

## Evaluate Model

In [None]:
mean_absolute_error(predictions_loc, raw_test_labels.iloc[:,:4])

In [None]:
ious = list(map(intersection_over_union, predictions_loc.to_numpy(), 
         raw_test_labels.iloc[:,:4].to_numpy()
        ))
np.mean(ious)

In [None]:
act_label = list(map(np.argmax, raw_test_labels.iloc[:,4:].to_numpy().tolist()))
pred_label = list(map(np.argmax, predictions_class.to_numpy().tolist()))

In [None]:
accuracy_score(act_label, pred_label)

## Create System To Display Prediction Bounding Boxes

In [None]:
pred = 9
center_x = predictions_loc['x'][pred]
center_y = predictions_loc['y'][pred]
width = predictions_loc['w'][pred]
height = predictions_loc['h'][pred]
start_point = (int((center_x - width / 2) * 208), int((center_y - height / 2) * 208))
end_point = (int((center_x + width / 2) * 208), int((center_y + height / 2) * 208))
color = (255, 0, 0)
thickness = 2
plt.imshow(cv2.rectangle(raw_test_images[pred], start_point, end_point, color, thickness))

In [None]:
model.save('model.keras')