# Simple Online Realtime Tracking (SORT)

### Visión Artificial. GITT-GIEC
#### Luis M. Bergasa, Carlos Gómez Huélamo. Department of Electronics. University of Alcalá. Spain

## Goal

The goal of this practice is to use SORT algorithm to track multiple objects across images in order to associate them with a unique identity. As detector we use YOLO algorithm than next to SORT will allow the correct tracking of the objects.

## Preliminaries

Before designing the system, the corresponding libraries to be used must be imported.

In [2]:
from __future__ import print_function

import os
import numpy as np
import matplotlib
matplotlib.use('Agg') # In order to avoid: RuntimeError: main thread is not in main loop exception
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from skimage import io

import glob
import time
import argparse
from filterpy.kalman import KalmanFilter
import cv2

from ultralytics import YOLO

np.random.seed(0)

  from .autonotebook import tqdm as notebook_tqdm


## SORT Foundations 

SORT is an algorithm to object tracking where classical approaches like Kalman filters and Hungarian methods are used to track objects better than many online trackers. SORT is made of 4 key components which are as follows:

- **Detection**: This is the first step in the tracking module. In this step, an object detector detects the objects in the frame that are to be tracked. These detections are then passed on to the next step. In this practice we use YOLO (You Only Look Once) because it is an efficient state-of-the-art open source algorithm for real-time object detection, which makes use of a single convolutional neural network to detect objects in images.

- **Estimation**: In this step, the detections are propagated from the current frame to the next, which is estimating the position of the target in the next frame using a constant velocity model. When a detection is associated with a target, the detected bounding box is used to update the target state where the velocity components are optimally solved via the Kalman filter framework.

- **Data association**: We now have the target bounding box and the detected bounding box. So, a cost matrix is computed as the intersection-over-union (IOU) distance between each detection and all predicted bounding boxes from the existing targets. The assignment is solved optimally using the Hungarian algorithm. If the IOU of detection and target is less than a certain threshold value called IOUmin then that assignment is rejected. This technique solves the occlusion problem and helps maintain the IDs.

- **Creation and Deletion of Track Identities**: This module is responsible for the creation and deletion of IDs. Unique identities are created and destroyed according to the `iou_Th`. If the overlap of detection and target is less than  `iou_Th` then it signifies the untracked object. Tracks are created when the number of consecutive detections are higher than `min_hits`. Tracks are terminated if they are not detected for `max_age` frames. If an object reappears after `max_age` frames, tracking will implicitly resume under a new identity.

## SORT without detection

In this section we are going to focus only on tracking. To do this, the detections are given directly in a `det.txt` file found in the `\det` directory of the `KITTI-17` folder.

Taking into account that the state vector and the measurement vector of the tracker is:

$$\mathbf{x} = 
\begin{bmatrix}x & y & s & r & \dot{x}  & \dot{y} & \dot{s}\end{bmatrix}^\mathsf{T}$$
$$\mathbf{z} = 
\begin{bmatrix}x & y & s & r\end{bmatrix}^\mathsf{T}$$


1. Indicate the meaning of each of the components of the state vector

2. Complete the state transition matrix **F** and the observation matrix **H**.

3. Run the code shown below

4. Obtain the value of the following matrices: **P**, **Q**, **R**


In [3]:
def linear_assignment(cost_matrix):
  try:
    import lap
    _, x, y = lap.lapjv(cost_matrix, extend_cost=True)
    return np.array([[y[i],i] for i in x if i >= 0]) #
  except ImportError:
    from scipy.optimize import linear_sum_assignment
    x, y = linear_sum_assignment(cost_matrix)
    return np.array(list(zip(x, y)))


def iou_batch(bb_test, bb_gt):
  """
  From SORT: Computes IOU between two bboxes in the form [x1,y1,x2,y2]
  """
  bb_gt = np.expand_dims(bb_gt, 0)
  bb_test = np.expand_dims(bb_test, 1)
  
  xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
  yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
  xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
  yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
  w = np.maximum(0., xx2 - xx1)
  h = np.maximum(0., yy2 - yy1)
  wh = w * h
  o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])                                      
    + (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)                                              
  return(o)  


def convert_bbox_to_z(bbox):
  """
  Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form
    [x,y,s,r] where x,y is the centre of the box and s is the scale/area and r is
    the aspect ratio
  """
  w = bbox[2] - bbox[0]
  h = bbox[3] - bbox[1]
  x = bbox[0] + w/2.
  y = bbox[1] + h/2.
  s = w * h    #scale is just area
  r = w / float(h)
  return np.array([x, y, s, r]).reshape((4, 1))


def convert_x_to_bbox(x,score=None):
  """
  Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
    [x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right
  """
  w = np.sqrt(x[2] * x[3])
  h = x[2] / w
  if(score==None):
    return np.array([x[0]-w/2.,x[1]-h/2.,x[0]+w/2.,x[1]+h/2.]).reshape((1,4))
  else:
    return np.array([x[0]-w/2.,x[1]-h/2.,x[0]+w/2.,x[1]+h/2.,score]).reshape((1,5))


class KalmanBoxTracker(object):
  """
  This class represents the internal state of individual tracked objects observed as bbox.
  """
  count = 0
  def __init__(self,bbox):
    """
    Initialises a tracker using initial bounding box.
    """
    #define constant velocity model
    self.kf = KalmanFilter(dim_x=7, dim_z=4) 
    self.kf.F = np.array([[1,0,0,0,1,0,0],[0,1,0,0,0,1,0],[0,0,1,0,0,0,1],[0,0,0,1,0,0,0],  [0,0,0,0,1,0,0],[0,0,0,0,0,1,0],[0,0,0,0,0,0,1]])
    self.kf.H = np.array([[1,0,0,0,0,0,0],[0,1,0,0,0,0,0],[0,0,1,0,0,0,0],[0,0,0,1,0,0,0]])

    self.kf.R[2:,2:] *= 10.
    self.kf.P[4:,4:] *= 1000. #give high uncertainty to the unobservable initial velocities
    self.kf.P *= 10.
    self.kf.Q[-1,-1] *= 0.01
    self.kf.Q[4:,4:] *= 0.01

    self.kf.x[:4] = convert_bbox_to_z(bbox)
    self.time_since_update = 0
    self.id = KalmanBoxTracker.count
    KalmanBoxTracker.count += 1
    self.history = []
    self.hits = 0
    self.hit_streak = 0
    self.age = 0

  def update(self,bbox):
    """
    Updates the state vector with observed bbox.
    """
    self.time_since_update = 0
    self.history = []
    self.hits += 1
    self.hit_streak += 1
    self.kf.update(convert_bbox_to_z(bbox))

  def predict(self):
    """
    Advances the state vector and returns the predicted bounding box estimate.
    """
    if((self.kf.x[6]+self.kf.x[2])<=0):
      self.kf.x[6] *= 0.0
    self.kf.predict()
    self.age += 1
    if(self.time_since_update>0):
      self.hit_streak = 0
    self.time_since_update += 1
    self.history.append(convert_x_to_bbox(self.kf.x))
    return self.history[-1]

  def get_state(self):
    """
    Returns the current bounding box estimate.
    """
    return convert_x_to_bbox(self.kf.x)


def associate_detections_to_trackers(detections,trackers,frame,iou_threshold = 0.3):
  """
  Assigns detections to tracked object (both represented as bounding boxes)

  Returns 3 lists of matches, unmatched_detections and unmatched_trackers
  """
  if(len(trackers)==0):
    return np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,5),dtype=int)

  iou_matrix = iou_batch(detections, trackers)
  # Show the Association matrix for the frame 100
  if frame == 100:
      print("Print Association matrix: ", iou_matrix)

  if min(iou_matrix.shape) > 0:
    a = (iou_matrix > iou_threshold).astype(np.int32)
    if a.sum(1).max() == 1 and a.sum(0).max() == 1:
        matched_indices = np.stack(np.where(a), axis=1)
    else:
      matched_indices = linear_assignment(-iou_matrix)
  else:
    matched_indices = np.empty(shape=(0,2))

  unmatched_detections = []
  for d, det in enumerate(detections):
    if(d not in matched_indices[:,0]):
      unmatched_detections.append(d)
  unmatched_trackers = []
  for t, trk in enumerate(trackers):
    if(t not in matched_indices[:,1]):
      unmatched_trackers.append(t)

  #filter out matched with low IOU
  matches = []
  for m in matched_indices:
    if(iou_matrix[m[0], m[1]]<iou_threshold):
      unmatched_detections.append(m[0])
      unmatched_trackers.append(m[1])
    else:
      matches.append(m.reshape(1,2))
  if(len(matches)==0):
    matches = np.empty((0,2),dtype=int)
  else:
    matches = np.concatenate(matches,axis=0)

  return matches, np.array(unmatched_detections), np.array(unmatched_trackers)


class Sort(object):
  def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
    """
    Sets key parameters for SORT
    """
    self.max_age = max_age
    self.min_hits = min_hits
    self.iou_threshold = iou_threshold
    self.trackers = []
    self.frame_count = 0

  def update(self, dets=np.empty((0, 5))):
    """
    Params:
      dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
    Requires: this method must be called once for each frame even with empty detections (use np.empty((0, 5)) for frames without detections).
    Returns the a similar array, where the last column is the object ID.

    NOTE: The number of objects returned may differ from the number of detections provided.
    """
    self.frame_count += 1
    # get predicted locations from existing trackers.
    trks = np.zeros((len(self.trackers), 5))
    to_del = []
    ret = []
    for t, trk in enumerate(trks):
      pos = self.trackers[t].predict()[0]
      trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
      if np.any(np.isnan(pos)):
        to_del.append(t)
    trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
    for t in reversed(to_del):
      self.trackers.pop(t)
      
    matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks, self.frame_count, self.iou_threshold)

    # update matched trackers with assigned detections
    for m in matched:
      self.trackers[m[1]].update(dets[m[0], :])

    # create and initialise new trackers for unmatched detections
    for i in unmatched_dets:
        trk = KalmanBoxTracker(dets[i,:])
        self.trackers.append(trk)
    i = len(self.trackers)
    for trk in reversed(self.trackers):
        d = trk.get_state()[0]
        if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
          ret.append(np.concatenate((d,[trk.id+1])).reshape(1,-1)) # +1 as MOT benchmark requires positive
        i -= 1
        # remove dead tracklet
        if(trk.time_since_update > self.max_age):
          self.trackers.pop(i)
    if(len(ret)>0):
      return np.concatenate(ret)
    return np.empty((0,5))

def parse_args():
    """Parse input arguments."""
    parser = argparse.ArgumentParser(description='SORT demo')
    parser.add_argument('--display', dest='display', help='Display online tracker output (slow) [False]',action='store_true', default=True)
    parser.add_argument("--seq_path", help="Path to detections.", type=str, default='mot_benchmark')
    parser.add_argument("--phase", help="Subdirectory in seq_path.", type=str, default='train')
    parser.add_argument("--max_age", 
                        help="Maximum number of frames to keep alive a track without associated detections.", 
                        type=int, default=5)
    parser.add_argument("--min_hits", 
                        help="Minimum number of associated detections before track is initialised.", 
                        type=int, default=3)
    parser.add_argument("--iou_threshold", help="Minimum IOU for match.", type=float, default=0.3)
    args = parser.parse_args(args=[])
    return args

In [6]:
############################################################################################
SAVE_VIDEO = True # If True, save the qualitative results as a video file. Otherwise, plot in real-time
USE_OWN_DETECTIONS = False # Control flaw for using our own objects detector
############################################################################################

args = parse_args()
display = args.display
phase = args.phase
total_time = 0.0
total_frames = 0
colours = np.random.rand(32, 3) #used only for display

output_resolution = (864,576)

if(display):
  if not os.path.exists('mot_benchmark'):
    print('\n\tERROR: mot_benchmark link not found!\n\n    Create a symbolic link to the MOT benchmark\n    (https://motchallenge.net/data/2D_MOT_2015/#download). E.g.:\n\n    $ ln -s /path/to/MOT2015_challenge/2DMOT2015 mot_benchmark\n\n')
    exit()
  plt.ion()
  fig = plt.figure()
  ax1 = fig.add_subplot(111, aspect='equal')

if not os.path.exists('output'):
  os.makedirs('output')
pattern = os.path.join(args.seq_path, phase, '*', 'det', 'det.txt')

# Initialize detector if required

if USE_OWN_DETECTIONS:
  detection_model = YOLO("yolov8n.pt")  # load a pretrained model (recommended for training)
  
for seq_dets_fn in glob.glob(pattern):
  if SAVE_VIDEO:
    if USE_OWN_DETECTIONS: detections = "YOLO-detections"
    else: detections = "default-detections"
    video_writer = cv2.VideoWriter(f'output/KITTI-17-qualitative-results-{detections}.avi',cv2.VideoWriter_fourcc(*'XVID'),20.0,output_resolution)
    
  # Call to the tracker
  mot_tracker = Sort(max_age=args.max_age, 
                      min_hits=args.min_hits,
                      iou_threshold=args.iou_threshold) #create instance of the SORT tracker
  seq_dets = np.loadtxt(seq_dets_fn, delimiter=',')
  seq = seq_dets_fn[pattern.find('*'):].split(os.path.sep)[0]

  print(f"Analyzing {seq}")

  with open(os.path.join('output', '%s.txt'%(seq)),'w') as out_file:
    print("Processing %s."%(seq))
    for frame in range(int(seq_dets[:,0].max())):
      if frame % 20 == 0:
        print("Files remaining: ", int(seq_dets[:,0].max()) - frame)

      frame += 1 #detection and frame numbers begin at 1
      if not USE_OWN_DETECTIONS: # Detections come from a file
        dets = seq_dets[seq_dets[:, 0]==frame, 2:7]
        dets[:, 2:4] += dets[:, 0:2] #convert to [x1,y1,w,h] to [x1,y1,x2,y2]
      total_frames += 1

      if(display):
        fn = os.path.join('mot_benchmark', phase, seq, 'img1', '%06d.jpg'%(frame))
        im =io.imread(fn)
        ax1.imshow(im)
        plt.title(seq + ' Tracked Targets')

      if USE_OWN_DETECTIONS: # Detections come from YOLO
        detection_results = detection_model(fn, verbose=False)[0] # 0 since batch_size = 1 in this case
        
        # OBS: Set verbose to True if you want to activate the detector output info
        # detection_results is an object derived from the class 'ultralytics.yolo.engine.results.Results'
        # Attributes:
          # orig_img (numpy.ndarray): The original image as a numpy array.
          # orig_shape (tuple): The original image shape in (height, width) format.
          # boxes (Boxes, optional): A Boxes object containing the detection bounding boxes.
          # masks (Masks, optional): A Masks object containing the detection masks.
          # probs (numpy.ndarray, optional): A 2D numpy array of detection probabilities for each class.
          # names (dict): A dictionary of class names.
          # path (str): The path to the image file.
          # keypoints (List[List[float]], optional): A list of detected keypoints for each object.
          # speed (dict): A dictionary of preprocess, inference and postprocess speeds in milliseconds per image.
          # _keys (tuple): A tuple of attribute names for non-empty attributes.
          
        # 1. Get coordinates (boxes have different formats: xywh, xywhn, xyxy, xyxyn (n means normalized)). 
        # Since in this lesson we require the xy coordinates of the top-left and bottom-right corner -> xyxy

        dets = detection_results.boxes.xyxy.cpu().detach().numpy()
        
        # 2. In this lesson we will only track pedestrians. To check the classes of our DL model (YOLOv8 with this configuration):
        
        # detection_results.names -> You can check 0 is person
        
        # 2.1. Get object types:
        
        object_types = detection_results.boxes.cls.cpu().detach().numpy()
        
        # 2.2. Filter 2D bounding boxes coordinates according to our class of interest (Pedestrian == 0)
        
        type_of_interest_id = 0 # Introduce Pedestrian ID
        dets = dets[object_types == type_of_interest_id,:] # Chose the files corresponding with Pedestrians

      start_time = time.time()
      trackers = mot_tracker.update(dets)
      cycle_time = time.time() - start_time
      total_time += cycle_time

      for d in trackers:
        print('%d,%d,%.2f,%.2f,%.2f,%.2f,1,-1,-1,-1'%(frame,d[4],d[0],d[1],d[2]-d[0],d[3]-d[1]),file=out_file)
        if(display):
          d = d.astype(np.int32)
          tracker_colour = colours[d[4]%32,:]
          ax1.add_patch(patches.Rectangle((d[0],d[1]),d[2]-d[0],d[3]-d[1],fill=False,lw=3,ec=tracker_colour))
          label = 'ID %0d'%d[4].astype(int) # Generate a label with the tracker ID
          ax1.text(d[0]+20,d[1],label,color=tracker_colour) # Show the label for each box using ax1.text(). Hint: use tracker_color

      fig.canvas.draw()

      # Now we can save it to a numpy array.
      data = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
      data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,))
      
      if display:
        if SAVE_VIDEO:
          data = cv2.resize(data,output_resolution, interpolation = cv2.INTER_AREA)
          video_writer.write(data)
        else:
          data1 = cv2.resize(data,output_resolution, interpolation = cv2.INTER_AREA) # To see better the image
          cv2.imshow("Output Image", data1)
          ax1.cla()
          
          # Exit if ESC pressed
          k = cv2.waitKey(1) & 0xff
          if k == 27 : break
          
        fig.canvas.flush_events()
        plt.draw()
        ax1.cla()

cv2.destroyAllWindows()

print("Total Tracking took: %.3f seconds for %d frames or %.1f FPS" % (total_time, total_frames, total_frames / total_time))

if(display):
  print("Note: to get real runtime results run without the option: --display")

Analyzing KITTI-17
Processing KITTI-17.
Files remaining:  145
Files remaining:  125
Files remaining:  105
Files remaining:  85
Files remaining:  65
Print Association matrix:  [[   0.069304     0.79653           0           0           0]
 [          0           0     0.87982           0           0]
 [          0           0           0     0.89269           0]
 [    0.83658     0.04047           0           0           0]]
Files remaining:  45
Files remaining:  25
Files remaining:  5
Total Tracking took: 0.147 seconds for 145 frames or 983.3 FPS
Note: to get real runtime results run without the option: --display


## Additional Questions

**1-Which value of dt should be selected in this type of system for the Kalman fi

Run the code shown below, where the following state vector has been used:


Specify default values for iou_th, max_age and min_hits

Modify the value of iou_th = 0.7 and observe the results obtained
Modify the value of min_hits = 10 and observe the results obtained
Modify the value of max_age = 10 and observe the results obtained
Add a text to each box that includes the tracker identifier number


## YOLOv8 implementation

The following code cell contains the heart of YOLO, meaning that it includes both the YOLO feature extractor and its base class, which in turn defines the initialisation, weight loading and prediction methods.

In [8]:
from ultralytics import YOLO

# Load a model
model = YOLO("yolov8n.yaml")  # build a new model from scratch
model = YOLO("yolov8n.pt")  # load a pretrained model (recommended for training)

# Use the model
model.train(data="coco128.yaml", epochs=3)  # train the model
metrics = model.val()  # evaluate model performance on the validation set
# results = model("https://ultralytics.com/images/bus.jpg")  # predict on an image
# success = model.export(format="onnx")  # export the model to ONNX format
# results = model("mot_benchmark/train/KITTI-17/img1/000001.jpg")


                   from  n    params  module                                       arguments                     
  0                  -1  1       464  ultralytics.nn.modules.Conv                  [3, 16, 3, 2]                 
  1                  -1  1      4672  ultralytics.nn.modules.Conv                  [16, 32, 3, 2]                
  2                  -1  1      7360  ultralytics.nn.modules.C2f                   [32, 32, 1, True]             
  3                  -1  1     18560  ultralytics.nn.modules.Conv                  [32, 64, 3, 2]                
  4                  -1  2     49664  ultralytics.nn.modules.C2f                   [64, 64, 2, True]             
  5                  -1  1     73984  ultralytics.nn.modules.Conv                  [64, 128, 3, 2]               
  6                  -1  2    197632  ultralytics.nn.modules.C2f                   [128, 128, 2, True]           
  7                  -1  1    295424  ultralytics.nn.modules.Conv                  [128

## SORT with YOLO 

In [5]:
# Switch the control flaw USE_OWN_DETECTIONS to True in the previous code and compare the obtained results