## Norfair Tracking

In [None]:
#!/usr/bin/env python
# -*-coding:utf-8 -*-
"""
  ████
██    ██   Datature
  ██  ██   Powering Breakthrough AI
    ██
 
@File    :   Norfair.ipynb
@Author  :   Keechin Goh
@Version :   1.0
@Contact :   hello@datature.io
@License :   Apache License 2.0
@Desc    :   Norfair Tracking Script
"""

### Introduction
This Jupyter notebook runs the Norfair tracking algorithm on top of an object detection (bounding box) model to track objects in video files.

## Install Python Dependencies

Python version used: 3.7 <= version <= 3.9

In [None]:
!pip install -U tensorflow==2.5.0
!pip install -U numpy==1.18.5
!pip install -U Pillow==7.2.0
!pip install -U opencv-python==4.5.1.48
!pip install -U norfair

## Import Python Libraries


In [None]:
import os
import cv2
import time
import norfair
import os.path
import numpy as np
import tensorflow as tf

from PIL import Image
from norfair import Detection, Tracker


## Load Label

In [None]:
def load_label(model_path):
    """Read ``label_map.pbtxt`` from a model directory and parse it into a dictionary.

    The parser expects every ``id: <int>`` line to be immediately followed by
    the line that holds the label name (e.g. ``name: 'cat'``).

    Args:
        model_path: directory that contains the ``label_map.pbtxt`` file.

    Returns:
        dictionary with the format of {label_index: {'id': label_index, 'name': label_name}}

    Raises:
        FileNotFoundError: if ``label_map.pbtxt`` is not a file inside ``model_path``.
        ValueError: if an ``id`` line is the last line of the file (no name
            line follows it), or if the id value is not an integer.
    """
    label_map_path = os.path.join(model_path, "label_map.pbtxt")

    if not os.path.isfile(label_map_path):
        raise FileNotFoundError("No valid label map found.")

    label_map = {}

    with open(label_map_path, "r") as label_file:
        for line in label_file:
            key, separator, value = line.partition(":")
            # Match the `id` key exactly: a plain substring test would also
            # fire on unrelated lines that merely contain the letters "id".
            if not separator or key.strip() != "id":
                continue

            label_index = int(value)

            # The name lives on the line right after the id; consume it here
            # so the outer loop does not see it again.
            name_line = next(label_file, None)
            if name_line is None:
                raise ValueError(
                    "Label map entry with id "
                    + str(label_index)
                    + " has no name line."
                )

            # Split on the first colon only so names containing ":" stay
            # intact, and drop either kind of surrounding quote.
            label_name = name_line.split(":", 1)[-1].strip().strip("'\"")
            label_map[label_index] = {"id": label_index, "name": label_name}

    return label_map


## Load Image

In [None]:
def load_image_into_numpy_array(img, height, width):
    """Resize an image to the model input size and convert it into a numpy array.

    Puts image into numpy array to feed into tensorflow graph.
    Note that by convention we put it into a numpy array with shape
    (height, width, channels), where channels=3 for RGB.

    Args:
        img: image object providing a PIL-style ``resize((width, height))``
            method and convertible with ``np.asarray``.
        height: target height of the resized image
        width: target width of the resized image

    Returns:
        tuple of (resized image as a numpy array,
        (original_height, original_width) of the image before resizing)
    """
    original_height, original_width = np.asarray(img).shape[:2]

    # PIL's Image.resize takes its size as (width, height); passing
    # (height, width) transposes the target for non-square model inputs.
    image_resized = img.resize((width, height))
    return np.array(image_resized), (original_height, original_width)


## Prediction Functions

### Bounding Box Tensorflow Prediction

In [None]:
def bbox_tf_predition(img, trained_model, size, threshold):
    """Detect objects in one image with a trained tf bounding box model.

    Args:
        img: input image; it is resized through load_image_into_numpy_array.
        trained_model: loaded tensorflow model, called on the batched input tensor.
        size: model input size given as a "height,width" string.
        threshold: confidence threshold; only detections scoring strictly
            above it are kept.

    Returns:
        list of norfair Detection objects (two corner points per box, scaled
        to the original image size), or None when no detection is kept.
    """
    target_height, target_width = size.split(",")

    ## source_shape is (height, width) of the image before it was resized
    model_input, source_shape = load_image_into_numpy_array(
        img, int(target_height), int(target_width)
    )
    source_height, source_width = source_shape[0], source_shape[1]

    ## Convert to a tensor and prepend the batch axis the model expects
    batch = tf.convert_to_tensor(model_input)[tf.newaxis, ...]

    ## Feed image into model
    raw_output = trained_model(batch)

    ## Keep only the valid detections of the single image in the batch
    num_detections = int(raw_output.pop("num_detections"))
    detections = {}
    for name, tensor in raw_output.items():
        detections[name] = tensor[0, :num_detections].numpy()
    detections["num_detections"] = num_detections

    ## Filter out predictions below threshold
    keep = np.where(detections["detection_scores"] > float(threshold))
    kept_boxes = detections["detection_boxes"][keep].tolist()
    kept_classes = detections["detection_classes"][keep].astype(np.int64).tolist()
    kept_scores = detections["detection_scores"][keep].tolist()

    if not kept_boxes:
        return None

    ## Scale each box to original-image pixels.
    ## NOTE(review): assumes boxes are normalized [ymin, xmin, ymax, xmax]
    ## (TF Object Detection API convention) - confirm against the model.
    norfair_detections = []
    for box, class_id, score in zip(kept_boxes, kept_classes, kept_scores):
        corners = [
            [box[1] * source_width, box[0] * source_height],
            [box[3] * source_width, box[2] * source_height],
        ]
        norfair_detections.append(
            Detection(
                points=np.array(corners),
                scores=np.array([score, score]),
                label=int(class_id),
            )
        )

    return norfair_detections


## Norfair Tracking

In [None]:
def bbox_distance(detected_pose, tracked_pose):
    """Normalised centroid distance between a detection and a tracked object.

    Reads the module-level ``KEYPOINT_DIST_THRESHOLD``, which is assigned by
    ``Norfair_tracking`` before the tracker processes its first frame.

    Args:
        detected_pose: object whose ``points`` attribute holds the detection's points.
        tracked_pose: object whose ``estimate`` attribute holds the tracked
            object's estimated points.

    Returns:
        ``d / (KEYPOINT_DIST_THRESHOLD + d)`` where ``d`` is the Euclidean
        distance between the two centroids.
    """
    detection_centroid = np.mean(detected_pose.points, axis=0)
    # Average over the estimate's own points rather than the detection's
    # point count, so the centroid is right even if the counts differ.
    tracked_centroid = np.mean(tracked_pose.estimate, axis=0)
    distance = np.linalg.norm(detection_centroid - tracked_centroid, axis=0)
    return distance / (KEYPOINT_DIST_THRESHOLD + distance)


In [None]:
def _draw_track(frame, bbox, class_name, track_id, color, width, height):
    """Draw one track's box, filled label strip, class name and ID onto frame in place."""
    # All thicknesses, offsets and the font scale grow with the frame size.
    scale = width + height

    # Bounding box outline
    cv2.rectangle(
        frame,
        (int(bbox[0]), int(bbox[1])),
        (int(bbox[2]), int(bbox[3])),
        color,
        int(scale / 600),
    )
    # Filled label background directly below the box
    cv2.rectangle(
        frame,
        (int(bbox[0]), int(bbox[3])),
        (int(bbox[2]), int(bbox[3] + int(scale / 108))),
        color,
        -1,
    )
    # Two text rows inside the label background: class name, then track ID
    for text, offset_divisor in ((class_name, 300), ("ID: " + str(track_id), 136)):
        cv2.putText(
            frame,
            text,
            (int(bbox[0]), int(bbox[3] + int(scale / offset_divisor))),
            cv2.FONT_HERSHEY_SIMPLEX,
            scale / 7500,
            (0, 0, 0),
            int(scale / 3000),
            cv2.LINE_AA,
        )


def Norfair_tracking(
    video_path,
    model_path,
    size,
    threshold,
    output_format,
    output_vid_trk_path,
    COLORS,
    DISTANCE_THRESHOLD,
    DETECTION_THRESHOLD,
    HIT_COUNTER_MAX,
    INITIALIZATION_DELAY,
    POINTWISE_HIT_COUNTER_MAX,
):
    """Detect and track objects in a video and write an annotated copy of it.

    Every frame is passed through the detection model, the detections are fed
    to a Norfair tracker, and tracks matched in the current frame are drawn
    with their class name and track ID before the frame is written out.

    Side effect: assigns the module-level ``KEYPOINT_DIST_THRESHOLD``
    (``frame height / 40``), which ``bbox_distance`` reads.

    Args:
        video_path: path of the input video.
        model_path: directory containing ``saved_model`` and ``label_map.pbtxt``.
        size: model input size as a "height,width" string.
        threshold: detection confidence threshold, forwarded to ``bbox_tf_predition``.
        output_format: four-character codec code for the video writer (e.g. "mp4v").
        output_vid_trk_path: path of the annotated output video.
        COLORS: sequence of 3-value colours; a track uses ``COLORS[id % len(COLORS)]``.
        DISTANCE_THRESHOLD: forwarded to ``Tracker(distance_threshold=...)``.
        DETECTION_THRESHOLD: forwarded to ``Tracker(detection_threshold=...)``.
        HIT_COUNTER_MAX: forwarded to ``Tracker(hit_counter_max=...)``.
        INITIALIZATION_DELAY: forwarded to ``Tracker(initialization_delay=...)``.
        POINTWISE_HIT_COUNTER_MAX: forwarded to ``Tracker(pointwise_hit_counter_max=...)``.

    Raises:
        IOError: if the input video cannot be opened.
    """
    global KEYPOINT_DIST_THRESHOLD
    # load model
    trained_model = tf.saved_model.load(os.path.join(model_path, "saved_model"))
    # load map
    category_index = load_label(model_path)

    # read in video
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
        vid.release()
        raise IOError("Unable to open video: " + str(video_path))

    out = None
    try:
        # initialize video writer
        width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the fractional frame rate (e.g. 29.97) so the output does not
        # drift in duration relative to the input.
        fps = vid.get(cv2.CAP_PROP_FPS)
        codec = cv2.VideoWriter_fourcc(*output_format)
        out = cv2.VideoWriter(output_vid_trk_path, codec, fps, (width, height))

        # initialize tracker
        tracker = Tracker(
            distance_function=bbox_distance,
            distance_threshold=DISTANCE_THRESHOLD,
            detection_threshold=DETECTION_THRESHOLD,
            hit_counter_max=HIT_COUNTER_MAX,
            initialization_delay=INITIALIZATION_DELAY,
            pointwise_hit_counter_max=POINTWISE_HIT_COUNTER_MAX,
        )
        KEYPOINT_DIST_THRESHOLD = height / 40

        idx = 0
        # Hit counter seen for each track on its previous appearance; a dict
        # imposes no upper bound on track IDs.
        last_hit_counter = {}
        while True:
            # Read Video
            grabbed, frame = vid.read()
            if not grabbed:
                break

            # Do detection on every video frame. The model gets an RGB copy;
            # `frame` itself stays BGR for drawing and writing.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            prediction = bbox_tf_predition(
                Image.fromarray(rgb_frame), trained_model, size, threshold
            )

            # Update the tracker on every frame, including frames without
            # detections (prediction is None), so track state keeps advancing.
            tracks = tracker.update(detections=prediction)
            for track in tracks:
                track_id = int(track.id)
                previous_hits = last_hit_counter.get(track_id, 0)
                last_hit_counter[track_id] = track.hit_counter
                # A falling hit counter means the track was not matched in
                # this frame, so its stale box is not drawn.
                if previous_hits > track.hit_counter:
                    continue

                corners = track.last_detection.points
                bbox = [corners[0][0], corners[0][1], corners[1][0], corners[1][1]]
                class_name = category_index[int(track.label)]["name"].strip('"')
                color = tuple(int(c) for c in COLORS[track_id % len(COLORS)])

                _draw_track(frame, bbox, class_name, track.id, color, width, height)

            # save video
            out.write(frame)
            print("Frame " + str(idx) + " is written!")
            idx += 1
    finally:
        # Release the capture and writer even if inference or drawing fails,
        # so the output file is finalised and handles are not leaked.
        vid.release()
        if out is not None:
            out.release()


## Run Norfair for Video

In [None]:
video_path = "./sample_video.mp4"
model_path = "./model"
# Build the output name from the input's base name, so inputs without a "./"
# prefix, inside sub-directories, or with dots in their name still work.
output_vid_trk_path = (
    "./output_" + os.path.splitext(os.path.basename(video_path))[0] + ".mp4"
)
size = "320,320"
threshold = "0.7"
output_format = "mp4v"
# A fixed seed keeps each track ID's colour identical across re-runs; the
# upper bound is exclusive, so 256 covers the full 0-255 range.
COLORS = np.random.default_rng(42).integers(0, 256, size=(200, 3))

# Define constants
DETECTION_THRESHOLD = 0.6
HIT_COUNTER_MAX = 45
INITIALIZATION_DELAY = 4
POINTWISE_HIT_COUNTER_MAX = 10
DISTANCE_THRESHOLD = 0.8

# perf_counter is monotonic, so the measured duration cannot be skewed by
# system clock adjustments.
start = time.perf_counter()
output_vid = Norfair_tracking(
    video_path,
    model_path,
    size,
    threshold,
    output_format,
    output_vid_trk_path,
    COLORS,
    DISTANCE_THRESHOLD,
    DETECTION_THRESHOLD,
    HIT_COUNTER_MAX,
    INITIALIZATION_DELAY,
    POINTWISE_HIT_COUNTER_MAX,
)
end = time.perf_counter()
print(
    "Tracking by Norfair for " + video_path + " takes " + str(int(end - start)) + "s."
)


## Display Video

In [None]:
from IPython.display import display, HTML
from base64 import b64encode


def display_video(path):
    """Embed a video file inline in the notebook output.

    The whole file is read and inlined as a base64 ``data:`` URL inside an
    HTML ``<video>`` element.

    Args:
        path: path of the video file to display.
    """
    # The context manager closes the file handle as soon as the bytes are read.
    with open(path, "rb") as video_file:
        mp4 = video_file.read()
    data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
    display(
        HTML(
            """
          <video width=400 controls>
                <source src="%s" type="video/mp4">
          </video>
      """
            % data_url
        )
    )


In [None]:
# Show the original input video (the file that was passed to Norfair_tracking).
display_video(video_path)

In [None]:
# Show the annotated video that Norfair_tracking wrote to output_vid_trk_path.
display_video(output_vid_trk_path)