## ME5413: Autonomous Mobile Robot  

### Homework 1: Perception  
Due date: 22 February 2024 (Thurs) - 2359 


### Task 1.1 Single-Object Tracking 


Using Template Matching


In [None]:
import os
import sys
import time
import copy

import cv2
import json
import yaml
import numpy as np
import matplotlib.pyplot as plt

sys.path.append(os.path.join(sys.path[0], "../"))
from utils import (
    load_data,
    draw_demo_img,
    draw_polygon,
    restrain_bbox,
    draw_in_plt,
    calcu_iou,
    calcu_center_dist,
    calcu_p_norm,
    get_obj_in_img_hsv,
    calcu_temp_delta_hsv,
)


class KalmanFilter:
    """Minimal linear Kalman filter operating on NumPy arrays.

    The filter keeps a state estimate ``x`` with covariance ``P`` and exposes
    the two classic steps: ``predict`` (propagate through the transition
    model) and ``update`` (correct with a measurement).
    """

    def __init__(self, x, P, A, Q, H, R):
        """Store the initial estimate and the model matrices.

        Args:
            x: state vector.
            P: state covariance matrix.
            A: state transition matrix.
            Q: process noise covariance matrix.
            H: measurement matrix.
            R: measurement noise covariance matrix.
        """
        # current estimate and its uncertainty
        self.x, self.P = x, P
        # process model: transition and process noise
        self.A, self.Q = A, Q
        # observation model: measurement matrix and measurement noise
        self.H, self.R = H, R

    def predict(self):
        """Propagate the state and covariance one step and return the new state."""
        transition = self.A
        self.x = transition @ self.x
        self.P = transition @ self.P @ transition.T + self.Q
        return self.x

    def update(self, z):
        """Correct the current estimate in place with measurement ``z``."""
        observe = self.H
        innovation = z - observe @ self.x  # measurement residual
        innovation_cov = observe @ self.P @ observe.T + self.R
        gain = self.P @ observe.T @ np.linalg.inv(innovation_cov)  # kalman gain
        self.x = self.x + gain @ innovation
        self.P = self.P - gain @ observe @ self.P


def _best_template_match(img_search, img_temp, scales, method):
    """Match ``img_temp`` against ``img_search`` at several scales and keep the best hit.

    Args:
        img_search: image region that is searched.
        img_temp: template image; it is resized by each factor in ``scales``.
        scales: iterable of resize factors to try.
        method: OpenCV template matching method passed to ``cv2.matchTemplate``.

    Returns:
        ``(loc_best, result_best)`` where ``loc_best`` is the location returned
        by ``cv2.minMaxLoc`` for the best-scoring scale and ``result_best`` is
        the corresponding response map. Both are ``None`` when no scale could
        be evaluated (empty template, or template larger than the search
        region at every scale).
    """
    lower_is_better = method in [cv2.TM_SQDIFF, cv2.TM_SQDIFF_NORMED]
    score_best = None
    loc_best = None
    result_best = None
    for scale in scales:
        width = int(img_temp.shape[1] * scale)
        height = int(img_temp.shape[0] * scale)
        if width < 1 or height < 1:
            continue  # nothing to resize to
        if width > img_search.shape[1] or height > img_search.shape[0]:
            continue  # the scaled template does not fit into the search region
        img_temp_scaled = cv2.resize(img_temp, (width, height))
        match_result = cv2.matchTemplate(img_search, img_temp_scaled, method)
        min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(match_result)
        score, loc = (min_val, min_loc) if lower_is_better else (max_val, max_loc)
        # Accept the first evaluated scale unconditionally so that a valid
        # location exists whatever the score range of ``method`` is.
        if score_best is None:
            improved = True
        elif lower_is_better:
            improved = score < score_best
        else:
            improved = score > score_best
        if improved:
            score_best, loc_best, result_best = score, loc, match_result
    return loc_best, result_best


def tm_final(data_all, param, demo_output_type="video", write_log=False, realtime=True, log_prefix=""):
    """Track one object per sequence with template matching and report accuracy.

    For every sequence the template is initialised from the first track box,
    matched inside a search region around the previous template, optionally
    refined with HSV based centring/resizing and Kalman filtering, and then
    compared with the ground truth of the frame.

    Box dictionaries use the keys ``x``/``y``/``w``/``h`` where ``x`` indexes
    the first image axis (with extent ``h``) and ``y`` the second image axis
    (with extent ``w``); that is how they are used to slice the images below.

    The template matching method is read from the module-level global
    ``match_method``, which must be defined before this function is called.

    Args:
        data_all: mapping from sequence name to a dict with the keys ``"img"``
            (frames), ``"ground_truth"`` (one box per frame) and
            ``"fir_track"`` (initial box). Boxes are indexed as
            ``[y, x, w, h]`` in the convention above.
        param: configuration dict. ``param["task1"]`` holds the global
            template adjustment settings plus one entry per sequence name.
            The per-sequence flags ``enable_hsv_finetune``, ``enable_kalman``
            and ``enable_multi_scale`` select the pipeline stages; each one
            defaults to ``True`` when absent.
        demo_output_type: ``"video"`` writes an ``.avi`` per sequence,
            ``"plt"`` draws every frame through ``draw_in_plt``.
        write_log: when true, write the metrics as JSON and the template and
            centre trajectories as text files into the results directory.
        realtime: currently unused; kept so existing callers keep working.
        log_prefix: sub-directory of ``results`` that receives the output.

    Returns:
        None. Results are printed and optionally written to disk.
    """
    dt = 1 / 30
    temp_adj_max = param["task1"]["temp_size_adjust_ratio_limit"]
    temp_adj_coef = param["task1"]["temp_adjust_coef"]

    kalman = KalmanFilter(
        x=np.array([0, 0, 0, 0], dtype=float),
        P=np.eye(4),
        A=np.array([[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]], dtype=float),  # x(k+1) = x(k) + v(k)*dt
        H=np.array([[1, 0, 0, 0], [0, 1, 0, 0]], dtype=float),
        Q=np.eye(4),
        R=np.eye(2) * 1,
    )
    kalman4size = KalmanFilter(
        x=np.array([0, 0, 0, 0], dtype=float),
        P=np.eye(4),
        A=np.array([[1, 0, dt, 0], [0, 1, 0, dt], [0, 0, 1, 0], [0, 0, 0, 1]], dtype=float),
        H=np.array([[1, 0, 0, 0], [0, 1, 0, 0]], dtype=float),
        Q=np.eye(4),
        R=np.eye(2) * 1,
    )

    # Videos and logs share one directory; create it whenever either is
    # requested so that logging also works with the "plt" output type.
    result_dir = os.path.join(sys.path[0], f"./results/{log_prefix}/")
    if demo_output_type == "video" or write_log:
        os.makedirs(result_dir, exist_ok=True)

    for seq in data_all.keys():  # traverse all the sequences
        print(f"Start to process {seq}...")
        data = data_all[seq]
        imgs, gts, fir_track = data["img"], data["ground_truth"], data["fir_track"]
        if len(imgs) == 0:
            print(f"No images in {seq}, skipping.")
            continue
        seq_param = param["task1"][seq]

        # Honour the per-sequence switches set by the caller instead of
        # forcing every stage on; a missing flag keeps the stage enabled.
        enable_hsv_finetune = seq_param.get("enable_hsv_finetune", True)
        enable_kalman = seq_param.get("enable_kalman", True)
        enable_multi_scale = seq_param.get("enable_multi_scale", True)
        if enable_kalman:
            if enable_hsv_finetune:
                temp_delta = {"dx": 0, "dy": 0, "dw": 0, "dh": 0}
                search_region_delta = {"dx": -50, "dy": -50, "dw": 50, "dh": 50}
            else:
                temp_delta = seq_param["kalman"]["temp_delta"]
                search_region_delta = seq_param["kalman"]["search_region_delta"]
        else:
            temp_delta = seq_param["temp_delta"]
            search_region_delta = seq_param["search_region_delta"]
        if enable_multi_scale:
            tp_adap_scale = (0.9, 0.95, 1, 1.05, 1.1)
        else:
            tp_adap_scale = [1]
        obj_hsv = seq_param["hsv"]

        template = {
            "x": fir_track[1] + temp_delta["dx"],
            "y": fir_track[0] + temp_delta["dy"],
            "w": fir_track[2] + temp_delta["dw"],
            "h": fir_track[3] + temp_delta["dh"],
        }
        init_temp = copy.deepcopy(template)
        img_temp = imgs[0][template["x"] : template["x"] + template["h"], template["y"] : template["y"] + template["w"]]
        _, _, w, h = get_obj_in_img_hsv(img_temp, obj_hsv)
        obj_in_temp_ratio = {
            "w": w / template["w"],
            "h": h / template["h"],
        }  # the initial ratio of the object size to the template size

        if enable_kalman:
            # Every sequence is an independent tracking problem, so restart
            # both filters from the initial covariance as well as a new state.
            kalman.Q = np.array(seq_param["kalman"]["Q"])
            kalman.R = np.array(seq_param["kalman"]["R"])
            vx = seq_param["kalman"]["init"]["vx"]
            vy = seq_param["kalman"]["init"]["vy"]
            kalman.P = np.eye(4)
            kalman.x = np.array([template["x"], template["y"], vx, vy], dtype=float)
            kalman4size.Q = np.diag([1, 1, 0.01, 0.01])
            kalman4size.R = np.diag([100, 100])
            kalman4size.P = np.eye(4)
            kalman4size.x = np.array([template["w"], template["h"], 0, 0], dtype=float)

        iou = []
        center_dist = []
        center_dist_norm = []
        template_list = [
            [template["y"], template["x"], template["w"], template["h"]]
        ]  # align to the original coordinate
        track_center_list = [[template["y"] + template["w"] / 2, template["x"] + template["h"] / 2]]

        video_out = None
        if demo_output_type == "video":
            file_path = os.path.join(result_dir, seq + ".avi")
            video_out = cv2.VideoWriter(
                file_path, cv2.VideoWriter_fourcc(*"MJPG"), 30, (imgs[0].shape[1], imgs[0].shape[0])
            )

        try:
            for j, img in enumerate(imgs):  # traverse all the images in the sequence
                gt = {"x": gts[j][1], "y": gts[j][0], "w": gts[j][2], "h": gts[j][3]}

                search_region = {
                    "x": template["x"] + search_region_delta["dx"],
                    "y": template["y"] + search_region_delta["dy"],
                    "w": template["w"] + search_region_delta["dw"] * 2,
                    "h": template["h"] + search_region_delta["dh"] * 2,
                }
                restrain_bbox(search_region, img.shape)  # restrain search region according to the image size

                img_search = img[
                    search_region["x"] : search_region["x"] + search_region["h"],
                    search_region["y"] : search_region["y"] + search_region["w"],
                ]

                loc_best, match_result = _best_template_match(img_search, img_temp, tp_adap_scale, match_method)
                if loc_best is None:
                    # No scale could be matched in this frame: keep the
                    # previous position rather than failing on a missing
                    # location.
                    new_x = template["x"]
                    new_y = template["y"]
                else:
                    new_x = search_region["x"] + loc_best[1]
                    new_y = search_region["y"] + loc_best[0]
                new_w = template["w"]
                new_h = template["h"]

                temp_before = copy.deepcopy(template)

                # adjust template to center the object
                if enable_hsv_finetune:
                    img_temp_tmp = img[new_x : new_x + new_h, new_y : new_y + new_w]
                    delta_center, delta_size = calcu_temp_delta_hsv(
                        img_temp_tmp, obj_hsv, size_ratio=obj_in_temp_ratio
                    )
                    delta_center["x"] *= temp_adj_coef["pos"]
                    delta_center["y"] *= temp_adj_coef["pos"]
                    delta_size["w"] *= temp_adj_coef["size"]
                    delta_size["h"] *= temp_adj_coef["size"]
                    new_x += delta_center["x"]
                    new_y += delta_center["y"]
                    new_w += delta_size["w"]
                    new_h += delta_size["h"]
                    # keep the template size within the configured ratio of its initial size
                    max_w = temp_adj_max["max"] * init_temp["w"]
                    min_w = temp_adj_max["min"] * init_temp["w"]
                    max_h = temp_adj_max["max"] * init_temp["h"]
                    min_h = temp_adj_max["min"] * init_temp["h"]
                    new_w = np.clip(new_w, min_w, max_w)
                    new_h = np.clip(new_h, min_h, max_h)

                # update the template using kalman: correct with the measurement,
                # then use the one-step prediction as the next template box
                if enable_kalman:
                    z = np.array([new_x, new_y], dtype=float)
                    kalman.update(z)
                    pos_pred = kalman.predict()
                    new_x = int(pos_pred[0])
                    new_y = int(pos_pred[1])
                    if enable_hsv_finetune:
                        z = np.array([new_w, new_h], dtype=float)
                        kalman4size.update(z)
                        size_pred = kalman4size.predict()
                        new_w = int(size_pred[0])
                        new_h = int(size_pred[1])

                template["x"] = int(new_x)
                template["y"] = int(new_y)
                template["w"] = int(new_w)
                template["h"] = int(new_h)
                restrain_bbox(template, img.shape)

                img_temp = img[
                    template["x"] : template["x"] + template["h"], template["y"] : template["y"] + template["w"]
                ]

                # evaluate
                iou.append(calcu_iou(template, gt))
                center_dist.append(calcu_center_dist(template, gt))
                center_dist_norm.append(calcu_p_norm(template, gt))
                template_list.append([template["y"], template["x"], template["w"], template["h"]])
                track_center_list.append([template["y"] + template["w"] / 2, template["x"] + template["h"] / 2])

                # visualize the result
                if demo_output_type == "plt":
                    # match_result is the response map of the best scale, or None
                    # when no scale could be matched in this frame
                    # NOTE(review): confirm draw_in_plt accepts match_result=None
                    draw_in_plt(img, temp_before, gt, search_region=search_region, match_result=match_result)
                if video_out is not None:
                    demo_img = img.copy()
                    demo_img = draw_demo_img(
                        demo_img, temp_before, gt, search_region=search_region, iou=iou[-1], center_dist=center_dist[-1]
                    )
                    video_out.write(demo_img)
        finally:
            # release the writer even when a frame fails
            if video_out is not None:
                video_out.release()

        # result post-processing
        iou = np.array(iou)
        center_dist = np.array(center_dist)
        iou_sum = np.sum(iou)
        center_dist_sum = np.sum(center_dist)
        iou_mean = np.mean(iou)
        center_dist_mean = np.mean(center_dist)
        iou_std = np.std(iou)
        center_dist_std = np.std(center_dist)
        center_dist_norm = np.array(center_dist_norm)
        center_dist_norm_mean = np.mean(center_dist_norm)
        print(f"iou_mean: {iou_mean}, center_dist_mean: {center_dist_mean}")
        print(f"iou_std: {iou_std}, center_dist_std: {center_dist_std}")
        print(f"center_dist_norm_mean: {center_dist_norm_mean}")

        if write_log:
            # float() keeps the scalars JSON serialisable whatever NumPy dtype
            # the metric helpers produced
            result_json = {
                "iou_sum": float(iou_sum),
                "center_dist_sum": float(center_dist_sum),
                "iou_mean": float(iou_mean),
                "center_dist_mean": float(center_dist_mean),
                "iou_std": float(iou_std),
                "center_dist_std": float(center_dist_std),
                "center_dist_norm_mean": float(center_dist_norm_mean),
                "param": seq_param,
                "precision": center_dist.tolist(),
                "success": iou.tolist(),
                "norm_precision": center_dist_norm.tolist(),
            }
            time_stamp = time.strftime("%m-%d-%H-%M-%S", time.localtime())
            with open(os.path.join(result_dir, seq + f"_{time_stamp}.json"), "w") as log_file:
                log_file.write(json.dumps(result_json, indent=2))
            np.savetxt(
                os.path.join(result_dir, seq + f"_{time_stamp}_template.txt"), template_list, delimiter=",", fmt="%d"
            )
            np.savetxt(
                os.path.join(result_dir, seq + f"_{time_stamp}_center.txt"), track_center_list, delimiter=",", fmt="%d"
            )
        print("-" * 50)


# Baseline experiment: request plain template matching by switching the
# Kalman, HSV fine-tuning and multi-scale flags off for every sequence.
seq_to_run = [1, 2, 3, 4, 5]
data_all = load_data(seq_to_run)
# Read the configuration inside a context manager so the file handle is closed.
with open(os.path.join(sys.path[0], "../config/param.yaml"), "r") as param_file:
    param = yaml.load(param_file, Loader=yaml.FullLoader)

# Read by tm_final as a module-level global.
match_method = cv2.TM_CCOEFF_NORMED
# "plt" or "video". recommend "video" because showing animation in plt is too slow
#   If "video", the result will be saved in the Task1/results/nonkalman
demo_output_type = "video"

for seq in seq_to_run:
    seq_key = "seq_" + str(seq)
    param["task1"][seq_key]["enable_hsv_finetune"] = False
    param["task1"][seq_key]["enable_kalman"] = False
    param["task1"][seq_key]["enable_multi_scale"] = False
tm_final(data_all, param, demo_output_type=demo_output_type, realtime=True, write_log=True, log_prefix="nonkalman")

Using Kalman Filter


In [None]:
# Kalman experiment: request Kalman filtering only, with HSV fine-tuning and
# multi-scale search switched off for every sequence.
seq_to_run = [1, 2, 3, 4, 5]
data_all = load_data(seq_to_run)
# Read the configuration inside a context manager so the file handle is closed.
with open(os.path.join(sys.path[0], "../config/param.yaml"), "r") as param_file:
    param = yaml.load(param_file, Loader=yaml.FullLoader)

# Read by tm_final as a module-level global.
match_method = cv2.TM_CCOEFF_NORMED

# "plt" or "video". recommend "video" because showing animation in plt is too slow
#   If "video", the result will be saved in the Task1/results/kalman
demo_output_type = "video"

for seq in seq_to_run:
    seq_key = "seq_" + str(seq)
    param["task1"][seq_key]["enable_kalman"] = True
    param["task1"][seq_key]["enable_hsv_finetune"] = False
    param["task1"][seq_key]["enable_multi_scale"] = False
tm_final(data_all, param, demo_output_type=demo_output_type, realtime=True, write_log=True, log_prefix="kalman")

Evaluate the performance of the single-object tracking algorithm.

In [None]:
# Please see the json files in the Task1/results/nonkalman and Task1/results/kalman


Visualise the results as well. 


In [None]:
# Please see the .avi files in the Task1/results/nonkalman and Task1/results/kalman

Propose Improvements to the work if possible:

In [None]:
# Improved experiment: request Kalman filtering, HSV fine-tuning and
# multi-scale search together for every sequence.
seq_to_run = [1, 2, 3, 4, 5]
data_all = load_data(seq_to_run)
# Read the configuration inside a context manager so the file handle is closed.
with open(os.path.join(sys.path[0], "../config/param.yaml"), "r") as param_file:
    param = yaml.load(param_file, Loader=yaml.FullLoader)

# Read by tm_final as a module-level global.
match_method = cv2.TM_CCOEFF_NORMED

# "plt" or "video". recommend "video" because showing animation in plt is too slow
#   If "video", the result will be saved in the Task1/results/improve
demo_output_type = "video"

for seq in seq_to_run:
    seq_key = "seq_" + str(seq)
    param["task1"][seq_key]["enable_kalman"] = True
    param["task1"][seq_key]["enable_hsv_finetune"] = True
    param["task1"][seq_key]["enable_multi_scale"] = True
tm_final(data_all, param, demo_output_type=demo_output_type, realtime=True, write_log=True, log_prefix="improve")