# Implementation of SORT
Based on the reference implementation at https://github.com/abewley/sort

#### Import Dependencies

In [74]:
import numpy as np
from scipy.optimize import linear_sum_assignment # Hungarian Method
from filterpy.kalman import KalmanFilter

#### Bounding Box Conversions

In [21]:
def convert_x_to_bbox(x,score=None):
    """
    Convert a bounding box from centre form to corner form.

    Parameters
    ----------
    x : array-like
        State whose first four entries are [x, y, s, r]: centre (x, y),
        scale/area s and aspect ratio r (width / height). Further entries are
        ignored. Flat vectors and column vectors are both accepted.
    score : scalar, optional
        If given, it is appended as a fifth column of the result.

    Returns
    -------
    numpy.ndarray
        Shape (1, 4) as [x1, y1, x2, y2] (top left, bottom right), or shape
        (1, 5) as [x1, y1, x2, y2, score] when a score is supplied.
    """
    # Flatten so that x[i] is always a scalar, whatever layout the caller used.
    x = np.asarray(x, dtype=float).ravel()
    w = np.sqrt(x[2] * x[3])   # s * r = (w * h) * (w / h) = w ** 2
    h = x[2] / w
    corners = [x[0] - w/2., x[1] - h/2., x[0] + w/2., x[1] + h/2.]
    # Identity test: `== None` would compare element-wise for array scores.
    if score is None:
        return np.array(corners).reshape((1, 4))
    return np.append(corners, score).reshape((1, 5))


In [44]:
def convert_bbox_to_z(bbox):
    """
    Convert a corner-form box into the measurement vector used by the filter.

    bbox is [x1,y1,x2,y2] (a trailing score entry is allowed and ignored).
    The result is a (4, 1) column [x,y,s,r]: box centre (x, y), scale s
    (the box area) and aspect ratio r (width / height).
    """
    left, top, right, bottom = bbox[0], bbox[1], bbox[2], bbox[3]
    width = right - left
    height = bottom - top
    centre_x = left + width/2.
    centre_y = top + height/2.
    area = width * height              # the "scale" is simply the area
    aspect = width / float(height)
    measurement = np.array([centre_x, centre_y, area, aspect])
    return measurement.reshape((4, 1))


#### Hungarian Method

In [2]:
def linear_assignment(cost_matrix):
    """
    Solve the assignment problem for cost_matrix.

    Returns an array with one [row, col] pair per assignment chosen by
    scipy's linear_sum_assignment.
    """
    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    pairs = [(r, c) for r, c in zip(row_ind, col_ind)]
    return np.array(pairs)

In [3]:
def iou_batch(bb_test, bb_gt):
    """
    Pairwise intersection-over-union of two sets of [x1,y1,x2,y2] boxes.

    The result is a matrix whose rows follow bb_test and whose columns
    follow bb_gt.
    """
    # Insert singleton axes so the element-wise operations broadcast to every pair.
    gt = np.expand_dims(bb_gt, 0)
    test = np.expand_dims(bb_test, 1)

    # Overlap rectangle; a negative extent means no overlap and is clamped to 0.
    inter_w = np.maximum(0., np.minimum(test[..., 2], gt[..., 2])
                         - np.maximum(test[..., 0], gt[..., 0]))
    inter_h = np.maximum(0., np.minimum(test[..., 3], gt[..., 3])
                         - np.maximum(test[..., 1], gt[..., 1]))
    intersection = inter_w * inter_h

    area_test = (test[..., 2] - test[..., 0]) * (test[..., 3] - test[..., 1])
    area_gt = (gt[..., 2] - gt[..., 0]) * (gt[..., 3] - gt[..., 1])
    union = area_test + area_gt - intersection
    return intersection / union

#### Detection to Tracker Association

In [4]:
def associate_detections_to_tracker(detections, trackers, iou_threshold=0.3):
    """
    Assign detections to tracked objects (both given as bounding boxes).

    Parameters
    ----------
    detections, trackers : array-like
        One box per row; they are passed to iou_batch, which reads the first
        four columns as [x1, y1, x2, y2].
    iou_threshold : float
        A pairing whose IoU is below this value is rejected.

    Returns
    -------
    matches : numpy.ndarray, shape (M, 2)
        Rows of [detection index, tracker index].
    unmatched_detections : numpy.ndarray, shape (K,)
        Indices of detections without an accepted tracker.
    unmatched_trackers : numpy.ndarray, shape (L,)
        Indices of trackers without an accepted detection.
    """
    if len(trackers) == 0:
        # Nothing to match against: every detection is unmatched. The empty
        # results use the same shapes as the general path below.
        return (np.empty((0, 2), dtype=int),
                np.arange(len(detections)),
                np.empty((0,), dtype=int))

    # rows: detections, cols: trackers
    iou_matrix = iou_batch(detections, trackers)

    if min(iou_matrix.shape) > 0:
        a = (iou_matrix > iou_threshold).astype(np.int32)   # potential matches
        if a.sum(1).max() == 1 and a.sum(0).max() == 1:
            # At most one candidate per row and per column: the assignment
            # can be read off directly without running the solver.
            matched_indices = np.stack(np.where(a), axis=1)
        else:
            # Hungarian method on the negated IoU, i.e. maximising total IoU.
            matched_indices = linear_assignment(-iou_matrix)
    else:
        matched_indices = np.empty(shape=(0, 2), dtype=int)

    # Sets give constant-time membership tests for the two scans below.
    matched_dets = set(matched_indices[:, 0].tolist())
    matched_trks = set(matched_indices[:, 1].tolist())
    unmatched_detections = [d for d in range(len(detections)) if d not in matched_dets]
    unmatched_trackers = [t for t in range(len(trackers)) if t not in matched_trks]

    # The solver pairs up as many rows/columns as it can; drop the pairs
    # whose overlap is too small and report both members as unmatched.
    matches = []
    for det_idx, trk_idx in matched_indices:
        if iou_matrix[det_idx, trk_idx] < iou_threshold:
            unmatched_detections.append(det_idx)
            unmatched_trackers.append(trk_idx)
        else:
            matches.append((det_idx, trk_idx))

    # Explicit integer dtype keeps empty results usable as indices.
    matches = np.array(matches, dtype=int).reshape(-1, 2)
    return (matches,
            np.array(unmatched_detections, dtype=int),
            np.array(unmatched_trackers, dtype=int))
        

#### Class Sort

In [73]:
class Sort(object):
    """
    Multi-object tracker that keeps one KalmanBoxTracker per tracked object
    and associates new detections to the predicted boxes by IoU.
    """

    def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
        """
        Parameters
        ----------
        max_age : int
            A tracker is removed once its time_since_update exceeds this value.
        min_hits : int
            Minimum hit_streak before a tracker is reported (not enforced
            while frame_count <= min_hits).
        iou_threshold : float
            Minimum IoU for a detection/tracker pairing to be accepted.
        """
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []       # list of KalmanBoxTracker objects
        self.frame_count = 0

    def update(self, dets=np.empty((0,5))):
        """
        Advance the tracker by one frame.

        Parameters
        ----------
        dets : numpy.ndarray
            Detections, one per row, in the format [x1,y1,x2,y2,score].
            Call with the (empty) default for frames without detections.
            The default array is only read, never modified.

        Returns
        -------
        numpy.ndarray
            One row [x1, y1, x2, y2, id] per reported tracker, or an empty
            (0, 5) array. The number of rows may differ from len(dets).
        """
        self.frame_count += 1

        # Predict the new location of every existing tracker. A tracker whose
        # prediction is not finite is dropped together with its row, so row
        # indices of `trks` always line up with indices of self.trackers.
        predicted = []
        survivors = []
        for trk in self.trackers:
            pos = trk.predict()[0]
            if np.all(np.isfinite(pos)):
                predicted.append([pos[0], pos[1], pos[2], pos[3], 0])
                survivors.append(trk)
        self.trackers[:] = survivors
        trks = np.array(predicted, dtype=float).reshape(-1, 5)

        # matched: rows of [detection index, tracker index];
        # unmatched_*: plain index arrays.
        matched, unmatched_dets, unmatched_trks = associate_detections_to_tracker(
            dets, trks, self.iou_threshold)

        # update matched trackers with their assigned detections
        for m in matched:
            self.trackers[m[1]].update(dets[m[0], :])

        # create and initialise new trackers for unmatched detections
        for i in unmatched_dets:
            self.trackers.append(KalmanBoxTracker(dets[i, :]))

        # Collect the trackers to report, walking from newest to oldest.
        ret = []
        for trk in reversed(self.trackers):
            updated_this_frame = trk.time_since_update < 1
            established = (trk.hit_streak >= self.min_hits or   # hits in a row
                           self.frame_count <= self.min_hits)   # warm-up phase
            if updated_this_frame and established:
                d = trk.get_state()[0]   # current box [x1, y1, x2, y2]
                # +1 as the MOT benchmark requires positive ids
                ret.append(np.concatenate((d, [trk.id+1])).reshape(1, -1))

        # remove trackers that have gone too long without an update
        self.trackers[:] = [trk for trk in self.trackers
                            if trk.time_since_update <= self.max_age]

        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0,5))


#### Kalman-Filter Class-Definition

In [72]:
class KalmanBoxTracker(object):
    """
    State of a single tracked object, observed as a bounding box.

    The filter state is [u, v, s, r, du, dv, ds]: box centre (u, v), scale s,
    aspect ratio r and the per-step change of u, v and s (r has no rate term).
    """
    count = 0   # class-wide counter used to hand out tracker ids

    def __init__(self, bbox):
        """Start a tracker from an initial box [x1, y1, x2, y2]."""
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        kf = self.kf

        # Transition: [u v s r du dv ds] -> [u+du, v+dv, s+ds, r, du, dv, ds]
        transition = np.eye(7, dtype=int)
        transition[0, 4] = 1
        transition[1, 5] = 1
        transition[2, 6] = 1
        kf.F = transition

        # Measurement: [u v s r du dv ds] -> [u, v, s, r]
        kf.H = np.eye(4, 7, dtype=int)

        # Measurement variance: scale and ratio weighted 10x the centre.
        kf.R = np.diag([1., 1., 10., 10.])

        # State variance: high uncertainty on the unobservable initial velocities.
        kf.P = np.diag([10.] * 4 + [10000.] * 3)

        # Process covariance: the ds entry is scaled by both statements.
        kf.Q[-1, -1] *= 0.01
        kf.Q[4:, 4:] *= 0.01

        # Initial state x0 = [u0, v0, s0, r0, 0, 0, 0]
        kf.x[:4] = convert_bbox_to_z(bbox)

        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1

        self.time_since_update = 0   # predictions since the last update
        self.history = []            # predicted boxes since the last update
        self.hits = 0                # total number of updates
        self.hit_streak = 0          # updates in a row; reset by a second prediction without update
        self.age = 0                 # total number of predictions

    def predict(self):
        """Advance the state one step and return the predicted box [x1, y1, x2, y2]."""
        kf = self.kf
        # Zero the scale rate if the step would make the scale non-positive.
        if (kf.x[6] + kf.x[2]) <= 0:
            kf.x[6] *= 0.0

        kf.predict()
        self.age += 1

        # A prediction that follows another prediction breaks the streak.
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1

        predicted_box = convert_x_to_bbox(kf.x)
        self.history.append(predicted_box)
        return self.history[-1]

    def update(self, bbox):
        """Correct the state with an observed box [x1, y1, x2, y2]."""
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        measurement = convert_bbox_to_z(bbox)
        self.kf.update(measurement)

    def get_state(self):
        """Return the current box estimate as [x1, y1, x2, y2]."""
        return convert_x_to_bbox(self.kf.x)
        

## Experiments

#### Linear Sum Assignment

In [None]:
# Two sets of three boxes each, one box per row as [x1, y1, x2, y2].
bb1 = np.array([[0., 0, 4, 4], [5, 4, 7, 5], [0, 0, 6, 5]])
bb2 = np.array([[1., 1., 3, 3], [0, 0, 8, 2.], [5, 4, 6, 5]])


#### Bounding Box Matches

In [7]:
# Pairwise IoU of two sets of [x1, y1, x2, y2] boxes:
# rows of the result follow bb1, columns follow bb2.
bb1 = np.array([[0., 0, 4, 4], [5, 4, 7, 5], [0, 0, 6, 5]])
bb2 = np.array([[1., 1., 3, 3], [0, 0, 8, 2.], [5, 4, 6, 5]])
iou_matrix = iou_batch(bb1, bb2); iou_matrix

array([[0.25      , 0.33333333, 0.        ],
       [0.        , 0.        , 0.5       ],
       [0.13333333, 0.35294118, 0.03333333]])

In [8]:
# The IoU is negated so that the cost-minimising assignment maximises total IoU.
matched_indices = linear_assignment(-iou_matrix); matched_indices

array([[0, 0],
       [1, 2],
       [2, 1]])

In [9]:
# Column 0 holds the row (bb1) index of every matched pair.
matched_indices[:,0]

array([0, 1, 2])

In [10]:
# A single pair reshaped into a 1x2 row.
matched_indices[0].reshape(1, 2)

array([[0, 0]])

In [11]:
# Iterating yields one 1-D pair of length 2 per row, so min(m.shape) is 2.
for m in matched_indices:
    print(min(m.shape))

2
2
2


In [12]:
# `a` is not defined at notebook level (it only exists as a local inside
# associate_detections_to_tracker), hence the NameError recorded below.
np.stack(np.where(a), axis=1)

NameError: name 'a' is not defined

#### Kalman Filter

In [71]:
# Build a tracker from the box [x1, y1, x2, y2] = [0, 0, 2, 1] and display its underlying filter.
kf = KalmanBoxTracker([0, 0, 2, 1]); kf.kf

KalmanFilter object
dim_x = 7
dim_z = 4
dim_u = 0
x = [[1.  0.5 2.  2.  0.  0.  0. ]].T
P = [[   10.     0.     0.     0.     0.     0.     0.]
     [    0.    10.     0.     0.     0.     0.     0.]
     [    0.     0.    10.     0.     0.     0.     0.]
     [    0.     0.     0.    10.     0.     0.     0.]
     [    0.     0.     0.     0. 10000.     0.     0.]
     [    0.     0.     0.     0.     0. 10000.     0.]
     [    0.     0.     0.     0.     0.     0. 10000.]]
x_prior = [[0. 0. 0. 0. 0. 0. 0.]].T
P_prior = [[1. 0. 0. 0. 0. 0. 0.]
           [0. 1. 0. 0. 0. 0. 0.]
           [0. 0. 1. 0. 0. 0. 0.]
           [0. 0. 0. 1. 0. 0. 0.]
           [0. 0. 0. 0. 1. 0. 0.]
           [0. 0. 0. 0. 0. 1. 0.]
           [0. 0. 0. 0. 0. 0. 1.]]
x_post = [[0. 0. 0. 0. 0. 0. 0.]].T
P_post = [[1. 0. 0. 0. 0. 0. 0.]
          [0. 1. 0. 0. 0. 0. 0.]
          [0. 0. 1. 0. 0. 0. 0.]
          [0. 0. 0. 1. 0. 0. 0.]
          [0. 0. 0. 0. 1. 0. 0.]
          [0. 0. 0. 0. 0. 1. 0.]
          [0. 0. 0. 0. 0. 0. 1.]]
... (remaining output truncated)

In [63]:
# State covariance of the tracker's filter, as assigned in KalmanBoxTracker.__init__.
kf.kf.P

[[10.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
 [0.0, 10.0, 0.0, 0.0, 0.0, 0.0, 0.0],
 [0.0, 0.0, 10.0, 0.0, 0.0, 0.0, 0.0],
 [0.0, 0.0, 0.0, 10.0, 0.0, 0.0, 0.0],
 [0.0, 0.0, 0.0, 0.0, 10000.0, 0.0, 0.0],
 [0.0, 0.0, 0.0, 0.0, 0.0, 10000.0, 0.0],
 [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 10000.0]]

In [66]:
# A fresh, unconfigured filter for comparison; its initial P is displayed.
kf2 = KalmanFilter(dim_x=7, dim_z=4); kf2.P

array([[1., 0., 0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0., 0., 0.],
       [0., 0., 1., 0., 0., 0., 0.],
       [0., 0., 0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1., 0., 0.],
       [0., 0., 0., 0., 0., 1., 0.],
       [0., 0., 0., 0., 0., 0., 1.]])

In [67]:
# Container type of the unconfigured filter's P.
type(kf2.P)

numpy.ndarray

In [69]:
# Container type of the tracker's P.
# NOTE(review): the recorded `list` output looks like it predates the np.array
# assignment in KalmanBoxTracker.__init__ (cell execution counts are out of
# order) — re-run to confirm.
type(kf.kf.P)

list

In [34]:
# Current box estimate as [x1, y1, x2, y2].
kf.get_state()

array([[0., 0., 2., 1.]])

In [35]:
# Advance the filter by one step and show the predicted box.
kf.predict()

array([[0., 0., 2., 1.]])

#### Masked Arrays

In [39]:
# 3x4 float array used to try out masking of invalid entries.
x = np.arange(12, dtype=float).reshape(3, 4); x

array([[ 0.,  1.,  2.,  3.],
       [ 4.,  5.,  6.,  7.],
       [ 8.,  9., 10., 11.]])

In [40]:
# Put a NaN into the middle row.
x[1, 1] = np.nan

In [42]:
# Mask the invalid entry (see the mask in the output below).
x = np.ma.masked_invalid(x); x

masked_array(
  data=[[0.0, 1.0, 2.0, 3.0],
        [4.0, --, 6.0, 7.0],
        [8.0, 9.0, 10.0, 11.0]],
  mask=[[False, False, False, False],
        [False,  True, False, False],
        [False, False, False, False]],
  fill_value=1e+20)

In [43]:
# Drop every row that contains a masked entry; Sort.update uses the same
# two-step pattern on the predicted tracker boxes.
x = np.ma.compress_rows(x); x

array([[ 0.,  1.,  2.,  3.],
       [ 8.,  9., 10., 11.]])

#### Sandbox

In [60]:
# Checks that a parenthesised, multi-line condition with inline comments
# evaluates as expected: True and (False or False) is False.
if(
    (True) and   # comment
    (False or 
     False)
):
    print('true')
else:
    print('false')

false


In [None]:
# NOTE(review): unexecuted scratch cell; np.concatenate needs a sequence of
# arrays, so this call would raise as written.
np.concatenate()