In [None]:
# Import Required Packages
import os
import re
from glob import glob
import cv2
import torch
import numpy as np
from yolox.data.data_augment import ValTransform
from yolox.data.datasets import COCO_CLASSES
from yolox.exp import get_exp
from yolox.utils import  postprocess
import matplotlib.pyplot as plt
from filterpy.kalman import KalmanFilter
from filterpy.common import Q_discrete_white_noise
from scipy.optimize import linear_sum_assignment

In [None]:
# Input locations
# Path of the source video; only referenced by the (commented-out) frame-extraction cell.
input_video = "input_frames/palace.mp4"
# Directory prefix that later cells glob for '*.jpg' frames (trailing slash is required,
# because the cells build the pattern with plain string concatenation).
frames_dir = "input_frames/"

In [None]:
# # Generate Input Frames
# if not os.path.exists(frames_dir):
#     os.mkdir(frames_dir)
# video = cv2.VideoCapture(input_video)
# frame_count = 0
# while True:
#     ret, frame = video.read()
#     if not ret:
#         break
#     filename = re.search(r"(\w*).mp4",input_video).group(1) + "_" + str(frame_count).zfill(3) + ".jpg"
#     cv2.imwrite(os.path.join(frames_dir, filename), frame)
#     frame_count += 1
# video.release()
# cv2.destroyAllWindows()

In [None]:
# Detection Functions
def init_detector(testing=False, exp_name='yolox-nano', ckpt_path='yolox_nano.pth', test_conf=0.35):
    """Build the YOLOX detector and load its weights on the CPU.

    Args:
        testing: When True, the experiment's ``test_conf`` is overridden with
            ``test_conf`` (used by the notebook's smoke-test cells).
        exp_name: Name of the YOLOX experiment passed to ``get_exp``.
        ckpt_path: Path of the checkpoint file; it must contain a ``"model"``
            entry holding the state dict.
        test_conf: Confidence threshold applied only when ``testing`` is True.

    Returns:
        A ``(model, exp)`` tuple: the model in eval mode with weights loaded,
        and the experiment object it was built from.
    """
    exp = get_exp(None, exp_name)
    if testing:
        exp.test_conf = test_conf
    model = exp.get_model()
    model.eval()
    # map_location="cpu" so the checkpoint loads on machines without a GPU.
    ckpt = torch.load(ckpt_path, map_location="cpu")
    model.load_state_dict(ckpt["model"])
    return model, exp

def detect(model, exp, img_path):
    """Run the detector on one image file and return boxes and scores.

    Args:
        model: Detector returned by ``init_detector``.
        exp: Experiment object supplying ``test_size``, ``num_classes``,
            ``test_conf`` and ``nmsthre``.
        img_path: Path of the image to read with OpenCV.

    Returns:
        ``(bboxes, scores)`` where ``bboxes`` is an ``(N, 4)`` tensor taken from
        the first four output columns and rescaled to the original image size,
        and ``scores`` is an ``(N,)`` tensor (product of output columns 4 and 5).
        Both are empty (``N == 0``) when nothing is detected.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise FileNotFoundError(f"Could not read image: {img_path}")
    # Scale factor the preprocessing applies to fit the image into test_size.
    ratio = min(exp.test_size[0] / img.shape[0], exp.test_size[1] / img.shape[1])
    pre_proc = ValTransform(legacy=False)
    img, _ = pre_proc(img, None, exp.test_size)
    img = torch.from_numpy(img).unsqueeze(0).float()
    with torch.no_grad():
        outputs = model(img)
    outputs = postprocess(outputs, exp.num_classes, exp.test_conf, exp.nmsthre, class_agnostic=True)
    if outputs[0] is None:
        # postprocess leaves None for an image with no surviving detections.
        return torch.zeros((0, 4)), torch.zeros((0,))
    outputs = outputs[0].cpu()
    bboxes = outputs[:, 0:4] / ratio  # map boxes back to original image coordinates
    scores = outputs[:, 4] * outputs[:, 5]
    return bboxes, scores

def display(img_path, bboxes, scores=None):
    """Show an image with its bounding boxes (and optional scores) drawn on top.

    Args:
        img_path: Path of the image to show.
        bboxes: Sequence of boxes, each indexable as ``[x1, y1, x2, y2]``.
        scores: Optional sequence with one score per box (floats or scalar
            tensors). When given, the first four characters of each score are
            written next to its box.
    """
    if scores is not None:
        assert len(bboxes) == len(scores)
    n = len(bboxes)
    # One distinct colour per box; max(n, 1) keeps the colormap valid for n == 0.
    colors = plt.get_cmap('tab20', max(n, 1))
    img = plt.imread(img_path)
    fig, ax = plt.subplots()
    ax.imshow(img, origin='upper', interpolation='nearest')
    for i in range(n):
        bb = bboxes[i]
        color = colors(i)
        x, y = int(bb[0]), int(bb[1])
        width = int(np.abs(bb[2] - bb[0]))
        height = int(np.abs(bb[3] - bb[1]))
        ax.add_patch(plt.Rectangle((x, y), width, height, fill=False, edgecolor=color, linewidth=2))
        if scores is not None:
            # float() accepts both plain numbers and scalar tensors.
            score = str(float(scores[i]))[:4]
            ax.text(x, y + 50, score, color=color, fontsize=12)
    ax.axis('off')
    plt.show()
    plt.close(fig)

In [None]:
# Test Detection Functions
# Smoke test: run the detector on the first frame only and show the result.
model, exp = init_detector(testing=True)
for img_path in sorted(glob(frames_dir + '*.jpg'))[:1]:
    # predict detection boxes and scores
    bboxes, scores = detect(model, exp, img_path)
    display(img_path, bboxes, scores)

In [None]:
# Track and Kalman Filter Functions

# Track Class keeps track of an object over time.
# Includes lists for frames and bounding boxes and a kalman filter
# References: * https://www.thinkautonomous.ai/blog/computer-vision-for-tracking/
#             * https://github.com/andrewadare/kalman-tracker
#             * https://filterpy.readthedocs.io/en/latest/kalman/KalmanFilter.html
class Track:
    """One tracked object: its observation history plus a Kalman filter.

    The filter uses a constant-velocity model with state
    ``x = [cx, cy, w, h, vx, vy, vw, vh]`` and measurement ``z = [cx, cy, w, h]``.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, frame, bbox, testing=False):
        """Start a track from a first observation.

        Args:
            frame: Identifier of the frame in which ``bbox`` was observed.
            bbox: Box indexable as ``[x1, y1, x2, y2]``.
            testing: When True, print the filter matrices and state changes.
        """
        self.testing = testing
        self.frames = [frame]  # frames in which this track was observed
        self.bboxes = [bbox]   # observed boxes, parallel to self.frames
        self.kf = KalmanFilter(dim_x=8, dim_z=4)  # Define the Kalman Filter for this track
        # Define the state vector x = [cx cy w h vx vy vw vh] (velocities start at 0)
        cx, cy, w, h = self._bbox_to_measurement(bbox)
        self.kf.x = np.array([cx, cy, w, h, 0, 0, 0, 0], dtype=float)
        if self.testing:
            print('x:\n\t', self.kf.x)
        # Define the uncertainty matrix P (use arbitrary large number to be changed over time)
        self.kf.P *= 100
        self._print_matrix('P', self.kf.P)
        # Define state transition matrix F using constant velocity model
        dt = 1  # time step
        self.kf.F = np.eye(self.kf.dim_x)
        self.kf.F[0, 4] = self.kf.F[1, 5] = self.kf.F[2, 6] = self.kf.F[3, 7] = dt
        self._print_matrix('F', self.kf.F)
        # Define process noise matrix Q (variance 0.1, to be tuned).
        # order_by_dim=False lays Q out as [positions..., velocities...], which is
        # the ordering of the state vector above; the default (True) assumes
        # [x, vx, y, vy, ...] and would couple the wrong state components.
        self.kf.Q = Q_discrete_white_noise(dim=2, dt=dt, var=0.1, block_size=4, order_by_dim=False)
        self._print_matrix('Q', self.kf.Q)
        # Define measurement function H (the measurement is the first four state entries)
        self.kf.H = np.zeros([self.kf.dim_z, self.kf.dim_x])
        self.kf.H[0, 0] = self.kf.H[1, 1] = self.kf.H[2, 2] = self.kf.H[3, 3] = 1
        self._print_matrix('H', self.kf.H)
        # Define measurement noise R (1.5 for cx and cy, 10 for w and h).
        # NOTE(review): these values are written straight onto the diagonal of R
        # although they are named "std" -- confirm whether they should be squared.
        self.kf.R = np.eye(self.kf.dim_z)
        std_center = 1.5
        std_dim = 10
        self.kf.R[0, 0] = self.kf.R[1, 1] = std_center
        self.kf.R[2, 2] = self.kf.R[3, 3] = std_dim
        self._print_matrix('R', self.kf.R)

    @staticmethod
    def _bbox_to_measurement(bbox):
        """Convert ``[x1, y1, x2, y2]`` into ``(cx, cy, w, h)``."""
        cx = (bbox[0] + bbox[2]) / 2
        cy = (bbox[1] + bbox[3]) / 2
        w = np.abs(bbox[0] - bbox[2])
        h = np.abs(bbox[1] - bbox[3])
        return cx, cy, w, h

    def _print_matrix(self, name, matrix):
        """Print ``matrix`` row by row under a ``name:`` header when testing."""
        if self.testing:
            print(name + ':')
            for row in matrix:
                print('\t', row)

    def predict(self):
        """Advance the Kalman filter by one time step."""
        if self.testing:
            print('x before predict:', self.kf.x)
        self.kf.predict()
        if self.testing:
            print('x after predict:', self.kf.x)

    def update(self, frame, bbox):
        """Record a new observation and correct the filter with it.

        Args:
            frame: Identifier of the frame in which ``bbox`` was observed.
            bbox: Box indexable as ``[x1, y1, x2, y2]``.
        """
        self.frames.append(frame)
        self.bboxes.append(bbox)
        if self.testing:
            print('x before update:', self.kf.x)
        self.kf.update(self._bbox_to_measurement(bbox))
        if self.testing:
            print('x after update:', self.kf.x)

In [None]:
# Test Track and Kalman Filter Functions
# Smoke test: build a verbose track, then run one predict/update cycle.
t = Track(frame=0, bbox=(0, 2, 5, 10), testing=True)
print()
t.predict()
t.update(frame=1, bbox=(1, 1, 6, 10))

In [None]:
# Association Functions
def IOU(bbox1, bbox2):
    """Intersection-over-union of two axis-aligned boxes.

    Each box is indexable as ``[x1, y1, x2, y2]`` and must satisfy ``x1 < x2``
    and ``y1 < y2`` (checked with assertions). Returns ``0`` when the boxes do
    not overlap, otherwise intersection area divided by union area.
    """
    for box in (bbox1, bbox2):
        assert box[0] < box[2]
        assert box[1] < box[3]
    # Corners of the overlap rectangle (if any).
    left = max(bbox1[0], bbox2[0])
    top = max(bbox1[1], bbox2[1])
    right = min(bbox1[2], bbox2[2])
    bottom = min(bbox1[3], bbox2[3])
    if left > right or top > bottom:
        return 0
    overlap = (right - left) * (bottom - top)
    area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
    area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
    return overlap / (area1 + area2 - overlap)

def gen_cost_matrix(tracks, detections):
    """Build a square IOU matrix with tracks as rows and detections as columns.

    The matrix side is ``max(len(tracks), len(detections))``; rows or columns
    beyond the real tracks/detections are padding and stay at zero.
    """
    size = max(len(tracks), len(detections))
    cost_matrix = np.zeros((size, size))
    for row, track_box in enumerate(tracks):
        for col, det_box in enumerate(detections):
            cost_matrix[row][col] = IOU(track_box, det_box)
    return cost_matrix

def associate(tracks, detections, rejection_threshold):
    """Match tracks to detections by maximum total IOU and return the leftovers.

    Args:
        tracks: Sequence of track boxes ``[x1, y1, x2, y2]``.
        detections: Sequence of detection boxes ``[x1, y1, x2, y2]``.
        rejection_threshold: A matched pair whose IOU is below this value is
            rejected, i.e. both members are treated as unmatched.

    Returns:
        ``(tracks_remain, detections_remain)``: the tracks and detections that
        were not accepted into any pair, either because the assignment paired
        them with a padding row/column or because their IOU was too low.
    """
    if len(tracks) == 0 or len(detections) == 0:
        # Nothing can be paired; everything remains.
        return (list(tracks), list(detections))
    cost_matrix = gen_cost_matrix(tracks, detections)
    track_inds, detection_inds = linear_sum_assignment(cost_matrix, maximize=True)
    tracks_remain = []
    detections_remain = []
    for track_ind, detection_ind in zip(track_inds, detection_inds):
        # Indices past the real lengths refer to zero-padding in the square matrix.
        is_track = track_ind < len(tracks)
        is_detection = detection_ind < len(detections)
        if is_track and is_detection and cost_matrix[track_ind][detection_ind] >= rejection_threshold:
            continue  # accepted association
        if is_track:
            tracks_remain.append(tracks[track_ind])
        if is_detection:
            detections_remain.append(detections[detection_ind])
    return (tracks_remain, detections_remain)

In [None]:
# Test Association Functions
# IOU tests
assert(IOU([1,1,3,5],[1,1,3,5]) == 1)
assert(IOU([0,2,4,8],[2,5,4,8]) == 0.25)
assert(IOU([0,2,4,8],[2,5,3,7]) == 2/24)
assert(IOU([0,2,4,8],[2,5,6,10]) == 6/38)
model,exp = init_detector(testing=True)
for img_path in sorted(glob(frames_dir+'*.jpg')):
    bboxes, scores = detect(model, exp, img_path)
    i1 = 3
    i2 = 5
    if len(bboxes) > max(i1, i2):
        display(img_path, [bboxes[i1]]+[bboxes[i2]], [scores[i1]]+[scores[i2]])
        # float() handles both a scalar tensor and the plain 0 returned for disjoint boxes
        print('IOU:',float(IOU(bboxes[i1], bboxes[i2])))
    else:
        print('Not enough detections to compare boxes', i1, 'and', i2)
    break
# Cost Matrix tests
tracks = [
    [0,0,10,10],
    [1,1,5,5],
    [9,5,23,6],
]
detections = [
    [1,1,5,5],
    [9,5,23,8],
]
cm = gen_cost_matrix(tracks, detections)
print('Cost Matrix:\n',cm)
ha = linear_sum_assignment(cm,maximize=True)
print('Hungarian Algorithm:\n',ha)
t_remain, det_remain = associate(tracks, detections, 0.3)
print(t_remain)
print(det_remain)

In [None]:
# Track_List to hold all the tracks
class Track_List:
    """Container for every ``Track``, addressed by its position in ``track_list``."""

    def __init__(self, testing=False):
        """Create an empty list; ``testing`` is forwarded to each new ``Track``."""
        self.testing = testing
        self.curr_ind = 0      # index the next added track will receive
        self.track_list = []   # all tracks, in creation order

    def addTrack(self, frame, bbox):
        """Start a new track from ``bbox`` seen in ``frame`` and return its index."""
        self.track_list.append(Track(frame, bbox, testing=self.testing))
        track_ind = self.curr_ind
        self.curr_ind += 1
        return track_ind

    def predictAll(self):
        """Advance every track one step and return the predicted boxes.

        Returns:
            A list with one ``[x1, y1, x2, y2]`` box per track, in track order,
            derived from the first four entries (cx, cy, w, h) of each state.
        """
        bboxes = []
        for track in self.track_list:
            track.predict()
            cx, cy, w, h = track.kf.x[:4]
            bboxes.append([cx-w/2, cy-h/2, cx+w/2, cy+h/2])
        return bboxes

    def updateTrack(self, track_ind, frame, bbox):
        """Feed a new observation to the track stored at ``track_ind``."""
        self.track_list[track_ind].update(frame, bbox)

In [None]:
# Test Track_List Functions

In [None]:
# BYTE Association Algorithm
model,exp = init_detector()
detection_threshold = 0.6 # Detection score threshold for high vs low
rejection_threshold = 0.2 # Reject associations if the IOU is less than this

tracks = Track_List()
frame_num = 0
for img_path in sorted(glob(frames_dir+'*.jpg')):
    # predict detection boxes and scores
    bboxes, scores = detect(model, exp, img_path)
    det_high = []
    det_low = []
    for bbox, score in zip(bboxes.tolist(), scores.tolist()):
        if score > detection_threshold:
            det_high.append((bbox,score))
        else:
            det_low.append((bbox,score))
    # associate() and IOU() work on bare [x1, y1, x2, y2] boxes, not (bbox, score) pairs
    det_high_boxes = [bbox for bbox, _ in det_high]
    det_low_boxes = [bbox for bbox, _ in det_low]

    # predict new locations of tracks
    track_bboxes = tracks.predictAll()

    # first association
    tracks_remain, det_remain = associate(track_bboxes, det_high_boxes, rejection_threshold)
    # TODO update kalman filter for associated track,detection pairs

    # second association
    tracks_re_remain,_ = associate(tracks_remain, det_low_boxes, rejection_threshold)
    # TODO update kalman filter for associated track,detection pairs

    # initialize new tracks from unmatched high-score detections
    for det in det_remain:
        tracks.addTrack(frame_num, det)
    frame_num += 1
    break # work in progress: only the first frame is processed for now