# Multiple Object Tracking

### Single object tracking: mainly by finding the most similar area around the ball to complete the tracking
### Multi-object tracking: use yolov5 to complete target detection, and do matching by Kalman filters to predict position and Hungarian algorithm to complete tracking

##  Basic environment

In [1]:
# anaconda3
# opencv-python
# filterpy

## Basic heads


In [2]:

import time 
import cv2
import numpy as np
import matplotlib.pyplot as plt

from collections import deque

## yoloV5

In [3]:

class Yolov5():
    def __init__(self, modelpath, confThreshold=0.3, nmsThreshold=0.5, objThreshold=0.5):
        with open('class.names', 'rt') as f:
            self.classes = f.read().rstrip('\n').split('\n')
        self.num_classes = len(self.classes)
        if modelpath.endswith('6.onnx'):
            self.inpHeight, self.inpWidth = 1280, 1280
            anchors = [[19, 27, 44, 40, 38, 94], [96, 68, 86, 152, 180, 137], [140, 301, 303, 264, 238, 542],
                       [436, 615, 739, 380, 925, 792]]
            self.stride = np.array([8., 16., 32., 64.])
        else:
            self.inpHeight, self.inpWidth = 640, 640

            anchors = [[10, 13, 16, 30, 33, 23], [30, 61, 62, 45, 59, 119], [116, 90, 156, 198, 373, 326]]
            self.stride = np.array([8., 16., 32.])
        self.nl = len(anchors)
        self.na = len(anchors[0]) // 2
        self.grid = [np.zeros(1)] * self.nl
        self.anchor_grid = np.asarray(anchors, dtype=np.float32).reshape(self.nl, -1, 2)
        self.net = cv2.dnn.readNet(modelpath)
        self.confThreshold = confThreshold
        self.nmsThreshold = nmsThreshold
        self.objThreshold = objThreshold
        self._inputNames = ''

    def resize_image(self, srcimg, keep_ratio=True, dynamic=False):
        top, left, newh, neww = 0, 0, self.inpWidth, self.inpHeight
        if keep_ratio and srcimg.shape[0] != srcimg.shape[1]:
            hw_scale = srcimg.shape[0] / srcimg.shape[1]
            if hw_scale > 1:
                newh, neww = self.inpHeight, int(self.inpWidth / hw_scale)
                img = cv2.resize(srcimg, (neww, newh), interpolation=cv2.INTER_AREA)
                if not dynamic:
                    left = int((self.inpWidth - neww) * 0.5)
                    img = cv2.copyMakeBorder(img, 0, 0, left, self.inpWidth - neww - left, cv2.BORDER_CONSTANT,
                                             value=(114, 114, 114))  # add border
            else:
                newh, neww = int(self.inpHeight * hw_scale), self.inpWidth
                img = cv2.resize(srcimg, (neww, newh), interpolation=cv2.INTER_AREA)
                if not dynamic:
                    top = int((self.inpHeight - newh) * 0.5)
                    img = cv2.copyMakeBorder(img, top, self.inpHeight - newh - top, 0, 0, cv2.BORDER_CONSTANT,
                                             value=(114, 114, 114))
        else:
            img = cv2.resize(srcimg, (self.inpWidth, self.inpHeight), interpolation=cv2.INTER_AREA)
        return img, newh, neww, top, left

    def _make_grid(self, nx=20, ny=20):
        xv, yv = np.meshgrid(np.arange(ny), np.arange(nx))
        return np.stack((xv, yv), 2).reshape((-1, 2)).astype(np.float32)

    def preprocess(self, img):
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = img.astype(np.float32) / 255.0
        return img

    def postprocess(self, frame, outs, padsize=None):
        frameHeight = frame.shape[0]
        frameWidth = frame.shape[1]
        newh, neww, padh, padw = padsize
        ratioh, ratiow = frameHeight / newh, frameWidth / neww
        # Scan through all the bounding boxes output from the network and keep only the
        # ones with high confidence scores. Assign the box's class label as the class with the highest score.

        confidences = []
        boxes = []
        classIds = []
        for detection in outs:
            if detection[4] > self.objThreshold:
                scores = detection[5:]
                classId = np.argmax(scores)
                confidence = scores[classId] * detection[4]
                if confidence > self.confThreshold:
                    center_x = int((detection[0] - padw) * ratiow)
                    center_y = int((detection[1] - padh) * ratioh)
                    width = int(detection[2] * ratiow)
                    height = int(detection[3] * ratioh)
                    left = int(center_x - width * 0.5)
                    top = int(center_y - height * 0.5)

                    confidences.append(float(confidence))
                    boxes.append([left, top, width, height])
                    classIds.append(classId)
        if len(boxes)==0:
            return frame,[]
        # Perform non maximum suppression to eliminate redundant overlapping boxes with
        # lower confidences.
        indices = cv2.dnn.NMSBoxes(boxes, confidences, self.confThreshold, self.nmsThreshold).flatten()
        out=[]
        for i in indices:
            box = boxes[i]
            left = box[0]
            top = box[1]
            width = box[2]
            height = box[3]
            
            out.append((classIds[i],boxes[i]))
            # frame = self.drawPred(frame, classIds[i], confidences[i], left, top, left + width, top + height)
        return frame,out

    def drawPred(self, frame, classId, conf, left, top, right, bottom):
        # Draw a bounding box.
        cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), thickness=4)

        label = '%.2f' % conf
        label = '%s:%s' % (self.classes[classId], label)

        # Display the label at the top of the bounding box
        labelSize, baseLine = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
        top = max(top, labelSize[1])
        # cv.rectangle(frame, (left, top - round(1.5 * labelSize[1])), (left + round(1.5 * labelSize[0]), top + baseLine), (255,255,255), cv.FILLED)
        cv2.putText(frame, label, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), thickness=2)
        return frame

    def detect(self, srcimg):
        drawImg=srcimg.copy()
        
        img, newh, neww, padh, padw = self.resize_image(srcimg)
        blob = cv2.dnn.blobFromImage(img, scalefactor=1 / 255.0, swapRB=True)
        # blob = cv2.dnn.blobFromImage(self.preprocess(img))
        # Sets the input to the network
        self.net.setInput(blob, self._inputNames)

        # Runs the forward pass to get output of the output layers
        outs = self.net.forward(self.net.getUnconnectedOutLayersNames())[0].squeeze(axis=0)

        # inference output
        row_ind = 0
        for i in range(self.nl):
            h, w = int(self.inpHeight / self.stride[i]), int(self.inpWidth / self.stride[i])
            length = int(self.na * h * w)
            if self.grid[i].shape[2:4] != (h, w):
                self.grid[i] = self._make_grid(w, h)

            outs[row_ind:row_ind + length, 0:2] = (outs[row_ind:row_ind + length, 0:2] * 2. - 0.5 + np.tile(
                self.grid[i], (self.na, 1))) * int(self.stride[i])
            outs[row_ind:row_ind + length, 2:4] = (outs[row_ind:row_ind + length, 2:4] * 2) ** 2 * np.repeat(
                self.anchor_grid[i], h * w, axis=0)
            row_ind += length
        
        
        drawImg,out = self.postprocess(drawImg, outs, padsize=(newh, neww, padh, padw))
        return drawImg,out


## sort algorithm

In [4]:
from __future__ import print_function
# from numba import jit
import numpy as np
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter

# @jit
def iou(bb_test, bb_gt):
    """
    calculate IOU between 2 boxes
    :param bb_test: box1 = [x1y1x2y2]
    :param bb_gt: box2 = [x1y1x2y2]
    :return: Intersection-over-Union, IoU
    """
    xx1 = np.maximum(bb_test[0], bb_gt[0])
    yy1 = np.maximum(bb_test[1], bb_gt[1])
    xx2 = np.minimum(bb_test[2], bb_gt[2])
    yy2 = np.minimum(bb_test[3], bb_gt[3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    o = wh / ((bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) + (bb_gt[2] - bb_gt[0]) * (
            bb_gt[3] - bb_gt[1]) - wh)
    return o

def convert_bbox_to_z(bbox):
    """
    The detection frame of the form [x1,y1,x2,y2] is converted into the state representation of the filter in the form [x,y,s,r]. 
        where x, y are the center coordinates of the box, s is the area, scale, and r is the aspect ratio
    :param bbox: [x1,y1,x2,y2] are the top-left and bottom-right coordinates, respectively
    :return: [ x, y, s, r ] 4 rows and 1 column, 
        where x,y are the coordinates of the box center position, s is the area, r is the aspect ratio w/h
    """
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    x = bbox[0] + w / 2.
    y = bbox[1] + h / 2.
    s = w * h
    r = w / float(h)
    return np.array([x, y, s, r]).reshape((4, 1))

def convert_x_to_bbox(x, score=None):
    """
    Convert the target box representation of [cx, cy, s, r] to the form of [x_min, y_min, x_max, y_max]
    :param x:[ x, y, s, r ],where x,y are the coordinates of the box center position, s is the area, r
    :param score: confidence level
    :return:[x1,y1,x2,y2],Top-left and bottom-right coordinates
    """
    w = np.sqrt(x[2] * x[3])
    h = x[2] / w
    if score is None:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
    else:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))

class KalmanBoxTracker(object):
    count = 0

    def __init__(self, bbox):
        """
        Initialize bounding boxes and trackers
        :param bbox:
        """
        # constant velocity definition
        # use KalmanFilter inside，7 state variables and 4 observation inputs
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        self.kf.F = np.array(
            [[1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 1, 0], [0, 0, 1, 0, 0, 0, 1], [0, 0, 0, 1, 0, 0, 0],
             [0, 0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 0, 1]])
        self.kf.H = np.array(
            [[1, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0]])
        self.kf.R[2:, 2:] *= 10.
        self.kf.P[4:, 4:] *= 1000.  # give high uncertainty to the unobservable initial velocities
        self.kf.P *= 10.
        self.kf.Q[-1, -1] *= 0.01
        self.kf.Q[4:, 4:] *= 0.01
        self.kf.x[:4] = convert_bbox_to_z(bbox)
        self.time_since_update = 0  # record the number of predictions from the last update to the current frame, and clear 0 after each update (in the update function)
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0  # record number of tracked, directly clear to 0 once a frame is missed(in predict function)
        self.age = 0

    def update(self, bbox):
        """
        update state vector with the observed target box
        filterpy.kalman.KalmanFilter.update will modify internal state, estimate self.kf.x based on observations
        reset self.time_since_update, clear self.history。
        :param bbox: target box
        :return:
        """
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(convert_bbox_to_z(bbox))

    def predict(self):
        """
        Advance state vector and return predicted bounding box estimate
        append the prediction to self.history
        since get_state accesses self.kf.x directly, self.history isn't used
        :return:
        """
        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        # predition times
        self.age += 1
        # if there's no updates during tracking, put hit_streak = 0
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        # append prediction results to history
        self.history.append(convert_x_to_bbox(self.kf.x))
        return self.history[-1]

    def get_state(self):
        """
        return current bounding box estimation
        :return:
        """
        return convert_x_to_bbox(self.kf.x)

def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """
    Match the detection box bbox with the tracking box of the Kalman filter by association
    :param detections:detection box
    :param trackers:tracking box, or tracking target
    :param iou_threshold:IOU threshold
    :return:matrix for tracking successful targets: matchs
            Matrix of added targets: unmatched_detections
            Tracking failed(leave the screen) target matrix: unmatched_trackers
    """
    # if the number of tracking targets = 0, construct results directly
    if (len(trackers) == 0) or (len(detections) == 0):
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)

    # iou doesn't support array calculation. Calculate IOU in pairs one by one and call linear_assignment for matching
    iou_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)
    # traverse bbox sets of target detection, each identification of detection box is d
    for d, det in enumerate(detections):
        # Traverse tracking box (Kalman filter prediction) bbox set，each tracking box is identified by t
        for t, trk in enumerate(trackers):
            iou_matrix[d, t] = iou(det, trk)
    # store tracking and detection boxes as [[d,t]...] 2-d arrays in match_indices by Hungarian algorithm
    result = linear_sum_assignment(-iou_matrix)
    matched_indices = np.array(list(zip(*result)))

    # Record unmatched detection boxes and tracking boxes
    # put unmatched detection boxed into unmatched_detections，represent there's new target enter the screen, need to add tracker
    unmatched_detections = []
    for d, det in enumerate(detections):
        if d not in matched_indices[:, 0]:
            unmatched_detections.append(d)
    # put unmatched tracking boxed into unmatched_trackers, represent target leaves the screen and the corresponding tracker should be deleted
    unmatched_trackers = []
    for t, trk in enumerate(trackers):
        if t not in matched_indices[:, 1]:
            unmatched_trackers.append(t)
    # put tracking boxes matched successfully into matches
    matches = []
    for m in matched_indices:
        # filter mataches with low IOU, put them into unmatched_detections and unmatched_trackers
        if iou_matrix[m[0], m[1]] < iou_threshold:
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        # put the ones meet conditions into matches in form of [[d,t]...]
        else:
            matches.append(m.reshape(1, 2))
    # Initialize matches, return in the format of np.array
    if len(matches) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)

    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)


class Sort(object):
    def __init__(self, max_age=1, min_hits=3):
        """
        Initialization: set the key parameters of the SORT algorithm       
        """
        # max of detections: number of object frames that didn't detected, will be deleted when it exceed
        self.max_age = max_age
        # min of target hits, won't return if less than this
        self.min_hits = min_hits  
        # kalman filter
        self.trackers = []  
        # frame count
        self.frame_count = 0
    
    def update(self, dets):
        """_summary_

        dets  [[x1,y1 ,x2,y2]]
        """
        self.frame_count += 1
        # Predict trajectory positions one by one in the current frame and record the tracker index for state anomalies
        # create 2-d arrays according to the numbers of current all kalman trackers（Number of targets tracked in the previous frame）
        # ：row is the identification index of the Kalman filter, column is the position and ID of the tracking frame
        trks = np.zeros((len(self.trackers), 5))  # Store prediction of trackers
        to_del = []   # store target boxes to be deleted
        ret = []    # store tracking boxes to be returned
        # Loop traverse Kalman tracker list
        for t, trk in enumerate(trks):
            # use Kalman tracker t to generate tracking frame of corresponding target
            pos = self.trackers[t].predict()[0]
            # after traverse，trk stores predicted tracking box of the target tracked in previous frame
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            # if contains null in tracking box, add this tracking box to the list to be delete
            if np.any(np.isnan(pos)):
                to_del.append(t)
        # numpy.ma.masked_invalid shield arrays with invalid values（NaN or inf）
        # numpy.ma.compress_rows compress entire line contains 2-d array with mask value，will remove entire line with mask value
        # trks stores object tracked in previous frame and the predicted tracking frame in current frame
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        # Reverse delete abnormal trackers to prevent index corruption
        for t in reversed(to_del):
            self.trackers.pop(t)
        # associate target detection box and tracking box Kalman filter predicted with targets tracked successfully, added and leaved
        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks)

        # Update the successfully tracked target frame to the corresponding Kalman filter
        for t, trk in enumerate(self.trackers):
            if t not in unmatched_trks:
                d = matched[np.where(matched[:, 1] == t)[0], 0]
                # update state vector using observed bounding box
                trk.update(dets[d, :][0])

        # create new kalman filter object for the added target to track 
        for i in unmatched_dets:
            trk = KalmanBoxTracker(dets[i, :])
            self.trackers.append(trk)

        # Back-to-Forward Traverse，only return results appear in current frame and hit period more than self.min_hits（unless tracking just begins）；if unhit time more than self.max_age, delete the tracker.
        # hit_streak ignore some initial frames of the target
        i = len(self.trackers)
        for trk in reversed(self.trackers):
            # return the estimated value of the current bounding box
            d = trk.get_state()[0]
            # put box and id of targets successfully tracked into ret list
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                ret.append(np.concatenate((d, [trk.id + 1])).reshape(1, -1))  # +1 as MOT benchmark requires positive
            i -= 1
            # Targets that fail to track or leave the screen are removed from the Kalman tracker
            if trk.time_since_update > self.max_age:
                self.trackers.pop(i)
        # Return box and id of all targets in the current screen, as a two-dimensional matrix
        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 5))

## Program entry

### Initialization

In [5]:

yolo = Yolov5('yolov5n.onnx')
# create tracker  max_age  number of shades
tracker = Sort(max_age=10)
# generate different colors
np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(200, 3), dtype='uint8')
# store center point
pts = [deque(maxlen=30) for _ in range(9999)]
# frame rate
fps = 0
cap = cv2.VideoCapture("./multiObject.avi")

## Frame by frame detection and tracking

In [6]:
while True:
    # read the video
    (ret, frame) = cap.read()
    if not ret:
        break
    t1 = time.time()
    # object detection
    srcimg,outs = yolo.detect(frame)
    if len(outs)==0:
        continue
    dets=[x for id,x in outs ]
    ddets=[]
    for box in dets:
        x,y,w,h=box
        ddets.append([x,y,w+x,h+y])
    dets=np.array(ddets)
    if len(dets) == 0:
        continue
    else:
        # multiple object tracking
        # dets  [[x1,y1 ,x2,y2]]
        tracks = tracker.update(dets)
    # process and display tracking results
    num = 0
    for track in tracks:
        bbox = track[:4] # Tracking box coordinates
        indexID = int(track[4]) # Tracking ID
        # Randomly assigned colors
        color = [int(c) for c in COLORS[indexID % len(COLORS)]]
        # The parameters in order are: photo / (upper left corner, lower right corner) / color / line width
        cv2.rectangle(frame, (int(bbox[0]),int(bbox[1])), (int(bbox[2]), int(bbox[3])), color, 2)
        # The parameters in order are: photo / added text / top left corner coordinates / font / font size / color / font thickness
        cv2.putText(frame, str(indexID), (int(bbox[0]), int(bbox[1] - 10)), 0, 5e-1, color, 1)
        num += 1
        # center of detection frame(x,y)
        center = (int(((bbox[0]) + (bbox[2])) / 2), int(((bbox[1]) + (bbox[3])) / 2))
        pts[indexID].append(center)
        cv2.circle(frame, (center), 1, color, 2)
        # Display motion trajectory
        for j in range(1, len(pts[indexID])):
            if pts[indexID][j - 1] is None or pts[indexID][j] is None:
                continue
            cv2.line(frame, (pts[indexID][j -1]), (pts[indexID][j]), color, 2)
   
    #show results


    # cv2.putText(frame, "FPS: %f" %(fps), (int(20),int(20)), 0, 5e-1, (0,255,0), 2)
    cv2.namedWindow("track", 0)
    cv2.resizeWindow('track', 1280, 720)
    # # Calculate frame rate
    # fps = (fps + (1. / (time.time() - t1))) / 2

    cv2.imshow('track', frame)
    # Q to stop
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()