# Tensorflow Object Detection

- **Author:** Sakthi Santhosh
- **Created on:** 21/08/2022

## Importing Modules

In [None]:
from cv2 import (
    COLOR_BGR2RGB,
    VideoCapture,
    imwrite,
    imread,
    cvtColor
)
from os import path
from time import sleep

## Global Declarations

In [None]:
PATHS = {
    "capture": "./workspace/camera",
    "testing": (
        "./workspace/testing",
        "./workspace/testing/images",
        "./workspace/testing/annotations"
    ),
    "training": (
        "./workspace/training",
        "./workspace/training/images",
        "./workspace/training/annotations"
    ),
    "export": (
        "./workspace/export",
        "./workspace/export/tflite"
    ),
    "label_map": "./workspace/label_map.pbtxt",
    "pretrained_model": "./models/pretrained",
    "custom_model": "./models/custom",
    "tools": "./tools/research/object_detection",
    "output": "./workspace/output",
    "record": "./workspace/record"
}

CAPTURE_COUNT = 12
DETECTION_THRESHOLD = 0.7

## Create Label and Label Map

In [None]:
with open("./objects.txt", 'r') as file_handle:
    LABELS = tuple(map(str.strip, file_handle.readlines()))

LABEL_MAP = []
for index, label in enumerate(LABELS, start=1):
    LABEL_MAP.append({
        "name": label,
        "id": index
    })

## Setup Folders

In [None]:
if not path.exists("./workspace"):
    !mkdir ./workspace/ {PATHS["export"][0]} {PATHS["export"][1]} \
        {PATHS["output"]} {PATHS["record"]}

if not path.exists("./models"):
    !mkdir ./models/ {PATHS["pretrained_model"]} {PATHS["custom_model"]}

if not path.exists("./tools"):
    !mkdir ./tools/

if not path.exists(PATHS["capture"]):
    !mkdir {PATHS["capture"]}

if not path.exists(PATHS["output"]):
    !mkdir {PATHS["output"]}

if not path.exists(PATHS["testing"][0]):
    !mkdir {PATHS["testing"][0]} {PATHS["testing"][1]} {PATHS["testing"][2]}

if not path.exists(PATHS["training"][0]):
    !mkdir {PATHS["training"][0]} {PATHS["training"][1]} {PATHS["training"][2]}

for label in LABELS:
    folder = path.join(PATHS["capture"], label)
    if not path.exists(folder):
        !mkdir {folder} {path.join(folder, "images")} {path.join(folder, "annotations")}

## Capture Images

In [None]:
camera_handle = VideoCapture(0)
for label in LABELS:
    print("\033[30;01mImage Capture (%s)\033[00m"%(label))
    for counter in range(1, CAPTURE_COUNT + 1):
        success, frame = camera_handle.read()
        if not success:
            print("  \033[31;01mError:\033[00m Image capture failed.")
            break
        print("  %s_image%d.jpg."%(label, counter))
        imwrite(
            path.join(PATHS["capture"], label, "images", label + "_image%d.jpg"%(counter)),
            frame
        )
        sleep(2)
    print()
    sleep(5)

camera_handle.release()

## Annotating Images

- Annotate the images with [Make Sense](https://makesense.ai).
- Set the project's name to "images".
- After downloading the annotations, place them in their respective folders.
- Change the `<path>` tag to location of the corresponding image.

## Copy Files for Training and Testing

In [None]:
for label in LABELS:
    folder1 = path.join(PATHS["capture"], label, "images")
    folder2 = path.join(PATHS["capture"], label, "annotations")

    for counter in range(1, CAPTURE_COUNT + 1, 3):
        !cp {path.join(folder1, label + "_image%d.jpg"%(counter))} \
            {path.join(folder1, label + "_image%d.jpg"%(counter + 1))} \
            {PATHS["training"][1]}
        !cp {path.join(folder2, label + "_image%d.xml"%(counter))} \
            {path.join(folder2, label + "_image%d.xml"%(counter + 1))} \
            {PATHS["training"][2]}

    for counter in range(3, CAPTURE_COUNT + 1, 3):
        !cp {path.join(folder1, label + "_image%d.jpg"%(counter))} \
            {PATHS["testing"][1]}
        !cp {path.join(folder2, label + "_image%d.xml"%(counter))} \
            {PATHS["testing"][2]}

## Download Models and Tools

In [None]:
!wget -O ./models/pretrained/model.tar.gz \
    http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_mobilenet_v2_fpnlite_320x320_coco17_tpu-8.tar.gz
!tar -x -z -f ./models/pretrained/model.tar.gz -C ./models/pretrained/

!git clone https://github.com/tensorflow/models ./tools/
!cd ./tools/research/ && protoc ./object_detection/protos/*.proto --python_out=./

!cp ./tools/research/object_detection/packages/tf2/setup.py ./tools/research/ \
    && python3 -m pip install ./tools/research

!python3 ./tools/research/object_detection/builders/model_builder_tf2_test.py

## Create Label Map File

In [None]:
with open(PATHS["label_map"], 'w') as file_handle:
    for label_map in LABEL_MAP:
        file_handle.write("item {\n")
        file_handle.write("\tname: \"%s\"\n"%(label_map["name"]))
        file_handle.write("\tid: %d\n}\n"%(label_map["id"]))

## Create Tensorflow Records From Images and Annotations

In [None]:
!python3 ./scripts/record.py {PATHS["testing"][0]}/images/ \
    {PATHS["testing"][0]}/annotations/ {PATHS["label_map"]} \
    {PATHS["record"]}/test.record

!python3 ./scripts/record.py {PATHS["training"][0]}/images/ \
    {PATHS["training"][0]}/annotations/ {PATHS["label_map"]} \
    {PATHS["record"]}/train.record

## Copy Pipeline File From Pretrained Model to Custom Model

In [None]:
!cp {PATHS["pretrained_model"]}/ssd_mobilenet_v2_fpnlite_320x320/pipeline.config \
    {PATHS["custom_model"]}

## Modify Config File for Custom Model

### Parameters to Change

- `num_classes`: According to number of objects.
- `batch_size`: 2-4
- `fine_tune_checkpoint`: ./models/pretrained/ssd_mobilenet_v2_fpnlite_320x320/checkpoint/ckpt-0
- `fine_tune_checkpoint_type`: detection
- `label_map_path`: ./workspace/label_map.pbtxt
- `tf_record_input_reader.input_path`: ./workspace/record/train.record
- `eval_input_reader.label_map_path`: ./workspace/label_map.pbtxt
- `eval_input_reader.tf_record_input_reader.input_path`: ./workspace/record/test.record
- Optionally, one can change the parameter `total_steps` to increase/decrease the number of steps to train the model or specify it as a parameter before training.

## Train the Model (Finally!)

In [None]:
!python3 ./tools/research/object_detection/model_main_tf2.py \
    --model_dir={PATHS["custom_model"]} \
    --pipeline_config_path={PATHS["custom_model"]}/pipeline.config \
    --num_train_steps=2000

## Evaluate the Model (Optional)


In [None]:
!python3 ./tools/research/object_detection/model_main_tf2.py \
    --model_dir={PATHS["custom_model"]} \
    --pipeline_config_path={PATHS["custom_model"]}/pipeline.config \
    --checkpoint_dir={PATHS["custom_model"]}

## Load Trained Model From Checkpoint

In [None]:
from matplotlib import pyplot

from numpy import array, expand_dims, int64
from object_detection.builders import model_builder
from object_detection.utils import (
    config_util,
    label_map_util,
    visualization_utils
)
import tensorflow

configs = config_util.get_configs_from_pipeline_file(
    path.join(PATHS["custom_model"], "pipeline.config")
)
detection_model = model_builder.build(model_config=configs["model"], is_training=False)

checkpoint = tensorflow.compat.v2.train.Checkpoint(model=detection_model)
checkpoint.restore(path.join(PATHS["custom_model"], "ckpt-3")).expect_partial()

@tensorflow.function
def detect_fn(image):
    image, shapes = detection_model.preprocess(image)
    prediction_dict = detection_model.predict(image, shapes)
    detections = detection_model.postprocess(prediction_dict, shapes)
    return detections

## Image Detector

In [None]:
def detect_image(image_file):
    count = 0
    image_handle = imread(image_file)
    image_array = array(image_handle)
    image_expanded = expand_dims(image_array, axis=0)

    input_tensor = tensorflow.convert_to_tensor(image_expanded, dtype=tensorflow.float32)
    detection = detect_fn(input_tensor)

    detection_count = int(detection.pop("num_detections"))
    detection = {key: value[0, :detection_count].numpy() for key, value in detection.items()}
    detection["num_detections"] = detection_count

    detection["detection_classes"] = detection["detection_classes"].astype(int64)

    label_id_offset = 1
    image_array_with_detections = image_array.copy()

    visualization_utils.visualize_boxes_and_labels_on_image_array(
        image_array_with_detections,
        detection["detection_boxes"],
        detection["detection_classes"] + label_id_offset,
        detection["detection_scores"],
        category_index,
        use_normalized_coordinates=True,
        max_boxes_to_draw=5,
        min_score_thresh=DETECTION_THRESHOLD,
        line_thickness=20,
        agnostic_mode=False
    )
    pyplot.savefig(path.join(PATHS["output"], path.basename(image_file)))

category_index = label_map_util.create_category_index_from_labelmap(PATHS["label_map"])

## Detecting Objects From Saved Images

In [None]:
for label in LABELS:
    for counter in range(3, CAPTURE_COUNT + 1, 3):
        detect_image(path.join(PATHS["testing"][1], label + "_image%d.jpg"%(counter)))

## Freezing the Trained Model

In [None]:
!python3 ./tools/research/object_detection/exporter_main_v2.py \
    --input_type=image_tensor \
    --pipeline_config_path={PATHS["custom_model"]}/pipeline.config \
    --trained_checkpoint_dir={PATHS["custom_model"]} \
    --output_directory={PATHS["export"][0]}

## Conversion to TFLite

In [None]:
!python3 ./tools/research/object_detection/export_tflite_graph_tf2.py \
    --pipeline_config_path={PATHS["custom_model"]}/pipeline.config \
    --trained_checkpoint_dir={PATHS["custom_model"]} \
    --output_directory={PATHS["export"][1]}

!tflite_convert --saved_model_dir={PATHS["export"][1]}/saved_model/ \
    --output_file={PATHS["export"][1]}/saved_model/custom.tflite \
    --input_shapes=1,300,300,3 --input_arrays=normalized_input_image_tensor \
    --output_arrays='TFLite_Detection_PostProcess','TFLite_Detection_PostProcess:1', \
        'TFLite_Detection_PostProcess:2','TFLite_Detection_PostProcess:3' \
    --inference_type=FLOAT --allow_custom_ops