# **Traffic sign detection, model learning**
*By Maxime **Charrière**.*

More information, explanations and links are available on the **GitHub** page!


This notebook can be run entirely on **Google Colab**; try it so you don't have to install any packages!  

<a href="https://colab.research.google.com/github/maximecharriere/AutonomousRcCar/blob/master/Code/tf_model_training/tf_model_training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
<a href="https://github.com/maximecharriere/AutonomousRcCar" target="_parent"><img src="https://img.shields.io/badge/-GitHub%20page-blue?style=flat&logo=github&label=_" alt="GitHub page"/></a>

# Enable GPU acceleration
Go to: 
`Edit > Notebook settings > Hardware accelerator > GPU`

# Import

In [None]:
%tensorflow_version 1.x
import tensorflow as tf
import os, sys
import urllib.request
import tarfile
import numpy as np
import re

# Report the versions of the key libraries, one per line.
print("Libraries version:", f"Tensorflow: {tf.__version__}", sep="\n")

# Configure Google Drive
Useful when working in Google Colab, to save files.

In [None]:
# When True, Google Drive is mounted and the training directory lives in Drive;
# when False, the local clone of the repository is used instead.
SAVE_IN_DRIVE = True

In [None]:
# Mount Google Drive so that a backup of the trained model can be saved there.
# When prompted, go to the URL, select your Google account and copy the code below.
if SAVE_IN_DRIVE:
    from google.colab import drive
    drive.mount('/content/gdrive')

# Get all files

In [None]:
# Work from /content: the directory checks in the following cells use relative paths.
%cd /content

## Load my Git repo

In [None]:
# Clone the project repository only if it is not already present in the
# current working directory.
if not os.path.isdir("AutonomousRcCar"):
    !git clone https://github.com/maximecharriere/AutonomousRcCar.git

In [None]:
# If we work with Google Drive, copy file from repo in GDrive

if SAVE_IN_DRIVE:
    model_training_dir_path = "/content/gdrive/My Drive/Colab Notebooks/ModelTraining_backup"
    !cp -avr "/content/AutonomousRcCar/autonomouscar/tf_model_training/." "{model_training_dir_path}"
else:
    model_training_dir_path = "/content/AutonomousRcCar/autonomouscar/tf_model_training"

## Load Tensorflow repo

In [None]:
# Clone the TensorFlow models repository only if it is not already present in
# the current working directory.
if not os.path.isdir("models"):
    !git clone https://github.com/tensorflow/models.git

# Pin an older commit: the current version does not work with this notebook.
%cd /content/models
!git checkout 36e786dc77c099fddc6c5c0f6826ce96730d3988

## Load pre-trained model

Choose a tensorflow model:
- [All object detection models](https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/detection_model_zoo.md#coco-trained-models)
- [Models compatible with Coral Edge TPU](https://coral.ai/models/)

In [None]:
%cd /content
model_name = 'ssd_mobilenet_v2_quantized_300x300_coco_2019_01_03'

if not os.path.isdir(model_name):
    base_url = 'http://download.tensorflow.org/models/object_detection/'
    model_file = model_name + '.tar.gz'
    urllib.request.urlretrieve(base_url + model_file, model_file)
    with tarfile.open(model_file) as tar:
        tar.extractall()
    os.remove(model_file)


In [None]:
# Checkpoint prefix inside the downloaded pre-trained model directory; it is
# written to the `fine_tune_checkpoint` field of the pipeline config later on.
base_model_checkpoint = os.path.join("/content", model_name, "model.ckpt")

# Install the Tensorflow Object Detection API
It is required by the scripts that follow.
https://github.com/tensorflow/models/tree/master/research/object_detection

In [None]:
!apt-get install -qq protobuf-compiler python-pil python-lxml python-tk
!pip install -q Cython contextlib2 pillow lxml matplotlib
!pip install -q pycocotools
!pip install -q tf_slim

%cd /content/models/research
!protoc object_detection/protos/*.proto --python_out=.

os.environ['PYTHONPATH'] += ':/content/models/research/:/content/models/research/slim/'

In [None]:
# Check that the API is correctly installed by running its model-builder test
# (the script path is relative to the current working directory).
!python object_detection/builders/model_builder_tf1_test.py

# Install Edge TPU Compiler

In [None]:
# Register Google's package signing key and the Coral apt source, refresh the
# package index, then install the Edge TPU compiler.
!curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
!echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | sudo tee /etc/apt/sources.list.d/coral-edgetpu.list
!sudo apt-get update
!sudo apt-get install edgetpu-compiler

# Generate `tfrecord` files

In [None]:
# The script and data paths used in the following cells are relative to the
# training directory.
%cd {model_training_dir_path}

In [None]:
# Partition the data between a test set and a train set. Skipped when both
# output folders already exist.
# NOTE(review): `--ratio 0.1` is presumably the share of images sent to the
# test set and `--seed 43` the shuffle seed; verify against the script.
if not os.path.isdir("training/images/test") or not os.path.isdir("training/images/train"):
    !python3 scripts/partition_dataset.py -i training/images/all -o training/images --ratio 0.1 --xml --seed 43

In [None]:
# Convert the train folder's annotation xml files to a single csv file.
# The `-l training/annotations` option is presumably the directory in which the
# script also generates `label_map.pbtxt`; verify against `scripts/xml_to_csv.py`.
if not os.path.isfile("training/annotations/train_labels.csv"):
    !python3 scripts/xml_to_csv.py -i training/images/train -o training/annotations/train_labels.csv -l training/annotations

# Convert the test folder's annotation xml files to a single csv file.
if not os.path.isfile("training/annotations/test_labels.csv"):
    !python3 scripts/xml_to_csv.py -i training/images/test -o training/annotations/test_labels.csv

In [None]:
# Generate `train.record` from the train csv, the train images and the label
# map. Skipped when the record file already exists.
if not os.path.isfile("training/annotations/train.record"):
    !python scripts/generate_tfrecord_tf2.py --csv_input=training/annotations/train_labels.csv --output_path=training/annotations/train.record --img_path=training/images/train --label_map training/annotations/label_map.pbtxt

# Generate `test.record` the same way from the test csv and the test images.
if not os.path.isfile("training/annotations/test.record"):
    !python scripts/generate_tfrecord_tf2.py --csv_input=training/annotations/test_labels.csv --output_path=training/annotations/test.record --img_path=training/images/test --label_map training/annotations/label_map.pbtxt


# Configuring the training `pipeline.config`

In [None]:
def get_num_classes(pbtxt_fname, max_num_classes=90):
    """Return the number of classes declared in a label map file.

    Args:
        pbtxt_fname: Path to the `label_map.pbtxt` file.
        max_num_classes: Value forwarded to
            `label_map_util.convert_label_map_to_categories`. Defaults to 90,
            the value that was previously hard-coded.

    Returns:
        The number of entries in the category index built from the label map.
    """
    import sys

    research_dir = "/content/models/research/"
    # Register the Object Detection API location only once, so repeated calls
    # do not keep growing `sys.path`.
    if research_dir not in sys.path:
        sys.path.append(research_dir)
    from object_detection.utils import label_map_util

    label_map = label_map_util.load_labelmap(pbtxt_fname)
    categories = label_map_util.convert_label_map_to_categories(
        label_map, max_num_classes=max_num_classes, use_display_name=True)
    category_index = label_map_util.create_category_index(categories)
    return len(category_index)

In [None]:
%cd {model_training_dir_path}
test_record_fname = model_training_dir_path + '/training/annotations/test.record'
train_record_fname = model_training_dir_path + '/training/annotations/train.record'
label_map_pbtxt_fname = model_training_dir_path + '/training/annotations/label_map.pbtxt'
batch_size = 12
num_steps = 10000
trainset_size = len([name for name in os.listdir('training/images/train') if name.endswith((".png",".jpg",".jpeg"))])
num_classes = get_num_classes(label_map_pbtxt_fname)
ssd_anchor_generator = '''{
        num_layers: 6
        min_scale: 0.04
        max_scale: 0.4
        aspect_ratios: 1.0
        aspect_ratios: 0.5
        aspect_ratios: 0.3333
      }'''

#   include_metrics_per_category : true

eval_config = f'''{{
  num_examples: {trainset_size}
  metrics_set: "coco_detection_metrics"
}}'''

In [None]:
%cd /content
# Use the sample config shipped with the Object Detection API as the template;
# the trailing comment keeps the alternative of using the `pipeline.config`
# from the downloaded model directory.
base_pipeline_fname = '/content/models/research/object_detection/samples/configs/ssd_mobilenet_v2_quantized_300x300_coco.config' #os.path.join('/content', model_name, 'pipeline.config')
# Fail early if the template is missing.
assert os.path.isfile(base_pipeline_fname), f'`{base_pipeline_fname}` not exist'

In [None]:
# Output directory of the training run; created on first use.
trained_model_path = os.path.join(model_training_dir_path, "training/model")
os.makedirs(trained_model_path, exist_ok=True)

In [None]:
# Location of the customised pipeline config, stored next to the trained model.
dest_pipeline_fname = os.path.join(trained_model_path, "pipeline.config")

In [None]:
# Show the template config path and the destination config path.
print(base_pipeline_fname)
print(dest_pipeline_fname)

In [None]:
# Read the template pipeline config, patch the fields specific to this training
# run, then write the result. Every substitution is done BEFORE the destination
# file is opened, so a failing substitution cannot leave a truncated config.
# Path replacements are passed as callables so that `re.sub` never interprets
# backslashes or group references contained in a path.
with open(base_pipeline_fname) as f:
    s = f.read()

# fine_tune_checkpoint
s = re.sub(r'fine_tune_checkpoint: ".*?"',
           lambda _m: f'fine_tune_checkpoint: "{base_model_checkpoint}"', s)

# tfrecord files train and test.
s = re.sub(r'(input_path: ".*?)(train\.record)(.*?")',
           lambda _m: f'input_path: "{train_record_fname}"', s)
s = re.sub(r'(input_path: ".*?)(val\.record)(.*?")',
           lambda _m: f'input_path: "{test_record_fname}"', s)

# label_map_path
s = re.sub(r'label_map_path: ".*?"',
           lambda _m: f'label_map_path: "{label_map_pbtxt_fname}"', s)

# Set training batch_size.
s = re.sub(r'batch_size: [0-9]+',
           f'batch_size: {batch_size}', s)

# Set training steps, num_steps
s = re.sub(r'num_steps: [0-9]+',
           f'num_steps: {num_steps}', s)

# Set number of classes num_classes.
s = re.sub(r'num_classes: [0-9]+',
           f'num_classes: {num_classes}', s)

# Set anchors. DOTALL lets `.` span the multi-line section body.
s = re.sub(r'ssd_anchor_generator \{.*?\}',
           lambda _m: f'ssd_anchor_generator {ssd_anchor_generator}', s, flags=re.DOTALL)

# Set eval config.
# s = re.sub('eval_config: \{.*?\}',
#            f'eval_config {eval_config}', s, flags = re.DOTALL)

# Remove the deprecated parameter.
s = re.sub(r'max_evals: 10', '', s)

with open(dest_pipeline_fname, 'w') as f:
    f.write(s)

In [None]:
# Display the generated pipeline config.
# NOTE(review): the output is also written to `training_log_tensorflow.txt`,
# the same file name the training command logs to.
!cat "{dest_pipeline_fname}" 2>&1 | tee  "{trained_model_path}/training_log_tensorflow.txt"

# Start TensorBoard for monitoring

In [None]:
%load_ext tensorboard
%tensorboard --logdir "/content/gdrive/My Drive/Colab Notebooks/ModelTraining_backup/training/model"

# Train the model

In [None]:
# Run the training script as a subprocess and time it: `-r 1 -n 1` executes it
# exactly once and `-o` makes %timeit return its result object, kept in
# `training_time`. stderr is merged into stdout and the log is copied to a file.
training_time  = %timeit -r 1 -n 1 -o !python /content/models/research/object_detection/model_main.py \
  --model_dir='{trained_model_path}' \
  --pipeline_config_path='{dest_pipeline_fname}' \
  --alsologtostderr \
  2>&1 | tee '{trained_model_path}/training_log_tensorflow.txt'

In [None]:
# Persist the measured training time to a file, so the notebook can be left
# unattended during training.
training_time_fname = f"{trained_model_path}/training_time.txt"
with open(training_time_fname, 'w') as time_file:
    time_file.write(str(training_time.best))

In [None]:
# Find the last model checkpoint prefix, e.g. `model.ckpt-1000`.
# A checkpoint is identified by its `model.ckpt-<step>.meta` file; the whole
# file name must match, so unrelated files containing `.meta` are ignored.
ckpt_meta_pattern = re.compile(r'model\.ckpt-(\d+)\.meta')

lst = [l for l in os.listdir(trained_model_path) if ckpt_meta_pattern.fullmatch(l)]
if not lst:
    # Fail with an explicit message instead of an opaque `argmax` error.
    raise FileNotFoundError(
        f"No `model.ckpt-<step>.meta` file found in `{trained_model_path}`; "
        "run the training step first.")

steps = np.array([int(ckpt_meta_pattern.fullmatch(l).group(1)) for l in lst])
# Strip the `.meta` suffix to obtain the checkpoint prefix.
last_model = lst[steps.argmax()][:-len('.meta')]

last_chekpoint = os.path.join(trained_model_path, last_model)
print(last_chekpoint)

In [None]:
# Freeze the model in the default format: export an inference graph from the
# last checkpoint into the trained-model directory, using `image_tensor` as the
# input type.

!python /content/models/research/object_detection/export_inference_graph.py \
    --input_type=image_tensor \
    --pipeline_config_path='{dest_pipeline_fname}' \
    --output_directory='{trained_model_path}' \
    --trained_checkpoint_prefix='{last_chekpoint}'

In [None]:
# Create the TensorFlow Lite graph from the last checkpoint, with the
# post-processing op added (`--add_postprocessing_op=true`).
!python /content/models/research/object_detection/export_tflite_ssd_graph.py \
    --pipeline_config_path='{dest_pipeline_fname}' \
    --trained_checkpoint_prefix='{last_chekpoint}' \
    --output_directory='{trained_model_path}' \
    --add_postprocessing_op=true

In [None]:
# Quantize the graph: convert `tflite_graph.pb` into `model_quantized.tflite`
# with uint8 inference, a fixed 1x300x300x3 input and the four
# `TFLite_Detection_PostProcess` outputs. Custom ops are allowed.
!tflite_convert \
  --output_file='{trained_model_path}/model_quantized.tflite' \
  --graph_def_file='{trained_model_path}/tflite_graph.pb' \
  --inference_type=QUANTIZED_UINT8 \
  --input_arrays='normalized_input_image_tensor' \
  --output_arrays='TFLite_Detection_PostProcess,TFLite_Detection_PostProcess:1,TFLite_Detection_PostProcess:2,TFLite_Detection_PostProcess:3' \
  --mean_values=128 \
  --std_dev_values=128 \
  --input_shapes=1,300,300,3 \
  --change_concat_input_ranges=false \
  --allow_nudging_weights_to_use_fast_gemm_kernel=true \
  --allow_custom_ops

# Convert model to be compatible with Edge TPU

In [None]:
# Compile the quantized TFLite model for the Edge TPU; the output is written to
# the trained-model directory (`-o`).
!edgetpu_compiler -o '{trained_model_path}' '{trained_model_path}/model_quantized.tflite'

# Test the model

In [None]:
import os
import glob

# Path to the frozen detection graph: the actual model used for the object detection.
pb_fname = os.path.join(model_training_dir_path, 'training/model/frozen_inference_graph.pb')
PATH_TO_CKPT = pb_fname
print(PATH_TO_CKPT)

# Label map used to attach the correct label to each box.
PATH_TO_LABELS = label_map_pbtxt_fname

# To test the code with your own images, add image files to PATH_TO_TEST_IMAGES_DIR.
PATH_TO_TEST_IMAGES_DIR = os.path.join(model_training_dir_path, 'training/images/test')

# Fail early when a required input file is missing.
for required_fname in (pb_fname, PATH_TO_LABELS):
    assert os.path.isfile(required_fname)

# Collect the `.jpg` test images and make sure there is at least one.
jpg_pattern = os.path.join(PATH_TO_TEST_IMAGES_DIR, "*.jpg")
TEST_IMAGE_PATHS = glob.glob(jpg_pattern)
assert len(TEST_IMAGE_PATHS) > 0, 'No image found in `{}`.'.format(PATH_TO_TEST_IMAGES_DIR)
print(TEST_IMAGE_PATHS)




#_______________________________________________________________________________



%cd /content/models/research/object_detection

import numpy as np
import os
import six.moves.urllib as urllib
import sys
import tarfile
import tensorflow as tf
import zipfile

from collections import defaultdict
from io import StringIO
from matplotlib import pyplot as plt
from PIL import Image

# This is needed since the notebook is stored in the object_detection folder.
sys.path.append("..")
from object_detection.utils import ops as utils_ops


# This is needed to display the images.
%matplotlib inline


from object_detection.utils import label_map_util

from object_detection.utils import visualization_utils as vis_util


# Load the serialized frozen graph from disk, then import it into a dedicated
# `tf.Graph` with no name prefix.
detection_graph = tf.Graph()
with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
    serialized_graph = fid.read()

od_graph_def = tf.GraphDef()
od_graph_def.ParseFromString(serialized_graph)

with detection_graph.as_default():
    tf.import_graph_def(od_graph_def, name='')


# Build the category index from the label map; it is passed to the
# visualization helper to label each detected box.
label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
categories = label_map_util.convert_label_map_to_categories(
    label_map,
    max_num_classes=num_classes,
    use_display_name=True,
)
category_index = label_map_util.create_category_index(categories)


def load_image_into_numpy_array(image):
    """Convert a PIL image into a `(height, width, 3)` uint8 numpy array.

    The image is converted to RGB first, so grayscale, palette or RGBA images
    also yield three channels instead of failing on a fixed 3-channel reshape.

    Args:
        image: A PIL image (any object exposing `convert('RGB')` whose result
            numpy can turn into an array).

    Returns:
        A new, writable `numpy.ndarray` of dtype `uint8` and shape
        `(height, width, 3)`.
    """
    # `np.array` copies the pixel data, so the result can be drawn on in place.
    return np.array(image.convert('RGB'), dtype=np.uint8)

# Size, in inches, of the output images.
IMAGE_SIZE = (12, 8)


def run_inference_for_single_image(image, graph):
    """Run the detection graph on one image and return its detections.

    A new `tf.Session` is opened on `graph` for every call.

    Args:
        image: Image array; its first two dimensions are used as the image
            size and a batch dimension is added before it is fed to the
            graph. Presumably a `(height, width, 3)` uint8 array (confirm
            against the caller).
        graph: `tf.Graph` containing the `image_tensor:0` input and some or
            all of the `num_detections`, `detection_boxes`,
            `detection_scores`, `detection_classes` and `detection_masks`
            output tensors.

    Returns:
        A dict holding, for each output tensor present in the graph, its value
        with the batch dimension removed: `num_detections` as an `int`,
        `detection_classes` cast to `uint8`, plus `detection_boxes`,
        `detection_scores` and, when available, `detection_masks`.
    """
    with graph.as_default():
        with tf.Session() as sess:
            # Get handles to input and output tensors
            ops = tf.get_default_graph().get_operations()
            all_tensor_names = {
                output.name for op in ops for output in op.outputs}
            tensor_dict = {}
            # Keep only the output tensors that actually exist in this graph.
            for key in [
                'num_detections', 'detection_boxes', 'detection_scores',
                'detection_classes', 'detection_masks'
            ]:
                tensor_name = key + ':0'
                if tensor_name in all_tensor_names:
                    tensor_dict[key] = tf.get_default_graph().get_tensor_by_name(
                        tensor_name)
            if 'detection_masks' in tensor_dict:
                # The following processing is only for single image
                detection_boxes = tf.squeeze(
                    tensor_dict['detection_boxes'], [0])
                detection_masks = tf.squeeze(
                    tensor_dict['detection_masks'], [0])
                # Reframe is required to translate mask from box coordinates to image coordinates and fit the image size.
                real_num_detection = tf.cast(
                    tensor_dict['num_detections'][0], tf.int32)
                # Keep only the first `real_num_detection` boxes and masks.
                detection_boxes = tf.slice(detection_boxes, [0, 0], [
                                           real_num_detection, -1])
                detection_masks = tf.slice(detection_masks, [0, 0, 0], [
                                           real_num_detection, -1, -1])
                detection_masks_reframed = utils_ops.reframe_box_masks_to_image_masks(
                    detection_masks, detection_boxes, image.shape[0], image.shape[1])
                # Binarize the masks with a 0.5 threshold.
                detection_masks_reframed = tf.cast(
                    tf.greater(detection_masks_reframed, 0.5), tf.uint8)
                # Follow the convention by adding back the batch dimension
                tensor_dict['detection_masks'] = tf.expand_dims(
                    detection_masks_reframed, 0)
            image_tensor = tf.get_default_graph().get_tensor_by_name('image_tensor:0')

            # Run inference
            output_dict = sess.run(tensor_dict,
                                   feed_dict={image_tensor: np.expand_dims(image, 0)})

            # all outputs are float32 numpy arrays, so convert types as appropriate
            # (index [0] drops the batch dimension of each output)
            output_dict['num_detections'] = int(
                output_dict['num_detections'][0])
            output_dict['detection_classes'] = output_dict[
                'detection_classes'][0].astype(np.uint8)
            output_dict['detection_boxes'] = output_dict['detection_boxes'][0]
            output_dict['detection_scores'] = output_dict['detection_scores'][0]
            if 'detection_masks' in output_dict:
                output_dict['detection_masks'] = output_dict['detection_masks'][0]
    return output_dict


#_______________________________________________________________________________



# running inferences.  This should show images with bounding boxes
%matplotlib inline

print('Running inferences on %s' % TEST_IMAGE_PATHS)
for image_path in TEST_IMAGE_PATHS:
    image = Image.open(image_path)
    # the array based representation of the image will be used later in order to prepare the
    # result image with boxes and labels on it.
    image_np = load_image_into_numpy_array(image)
    # Expand dimensions since the model expects images to have shape: [1, None, None, 3]
    image_np_expanded = np.expand_dims(image_np, axis=0)
    # Actual detection.
    output_dict = run_inference_for_single_image(image_np, detection_graph)
    # Visualization of the results of a detection.
    vis_util.visualize_boxes_and_labels_on_image_array(
        image_np,
        output_dict['detection_boxes'],
        output_dict['detection_classes'],
        output_dict['detection_scores'],
        category_index,
        instance_masks=output_dict.get('detection_masks'),
        use_normalized_coordinates=True,
        line_thickness=2)
    plt.figure(figsize=IMAGE_SIZE)
    plt.imshow(image_np)