In [None]:
# Install the third-party dependencies into the kernel's own environment.
# `-q` keeps the output short; the earlier `> null` redirect silenced pip by
# writing its output to a stray file literally named "null" in the working
# directory.
%pip install -q ultralytics
%pip install -q filterpy

In [None]:
import os
import numpy as np
from skimage import io
import glob
import time
import argparse
from filterpy.kalman import KalmanFilter
import cv2
from ultralytics import YOLO
import tensorflow as tf
from scipy.optimize import linear_sum_assignment
import random

In [None]:
from collections import deque

In [None]:
# Seed every random number generator the notebook uses so that reruns are
# reproducible. The per-track colors are drawn with the stdlib `random`
# module, which is not affected by NumPy's seed.
np.random.seed(0)
random.seed(0)

In [None]:
def linear_assignment(cost_matrix):
  """Solve the linear assignment problem for a cost matrix.

  Args:
    cost_matrix: 2-D array of assignment costs (rows x columns).

  Returns:
    Integer array of shape (k, 2). Each row is a (row_index, column_index)
    pair selected by scipy's linear_sum_assignment. The result keeps the
    two-column shape, (0, 2), when nothing is assigned.
  """
  rows, cols = linear_sum_assignment(cost_matrix)
  # column_stack preserves the (k, 2) layout even for k == 0, whereas
  # np.array(list(zip(rows, cols))) collapses an empty result to shape (0,)
  # and breaks callers that index the result with [:, 0] / [:, 1].
  return np.column_stack((rows, cols))

In [None]:
def iou_batch(bb_test, bb_gt):
  """Compute the pairwise intersection-over-union of two sets of boxes.

  Boxes are read as [x1, y1, x2, y2, ...]; only the first four columns are
  used.

  Args:
    bb_test: array of boxes, one per row.
    bb_gt: array of boxes, one per row.

  Returns:
    Array of shape (len(bb_test), len(bb_gt)) where entry [i, j] is the IoU
    of bb_test[i] with bb_gt[j].
  """
  # Broadcast to (n_test, 1, cols) against (1, n_gt, cols) so that every
  # pair of boxes is compared in one vectorised pass.
  gt = np.expand_dims(bb_gt, 0)
  test = np.expand_dims(bb_test, 1)

  # Corners of the overlapping rectangle for every pair.
  left = np.maximum(test[..., 0], gt[..., 0])
  top = np.maximum(test[..., 1], gt[..., 1])
  right = np.minimum(test[..., 2], gt[..., 2])
  bottom = np.minimum(test[..., 3], gt[..., 3])

  # Clamp at zero: disjoint boxes have no overlap.
  overlap_w = np.maximum(0., right - left)
  overlap_h = np.maximum(0., bottom - top)
  intersection = overlap_w * overlap_h

  area_test = (test[..., 2] - test[..., 0]) * (test[..., 3] - test[..., 1])
  area_gt = (gt[..., 2] - gt[..., 0]) * (gt[..., 3] - gt[..., 1])
  return intersection / (area_test + area_gt - intersection)

In [None]:
def convert_bbox_to_z(bbox):
  """Convert a corner-format box into the Kalman filter measurement vector.

  Args:
    bbox: sequence whose first four entries are [x1, y1, x2, y2].

  Returns:
    Array of shape (4, 1) holding [x, y, s, r]: the box centre (x, y), the
    scale s (box area) and the aspect ratio r (width / height).
  """
  x1, y1, x2, y2 = bbox[0], bbox[1], bbox[2], bbox[3]
  width = x2 - x1
  height = y2 - y1
  center_x = x1 + width/2.
  center_y = y1 + height/2.
  area = width * height    # scale is just area
  aspect = width / float(height)
  measurement = np.array([center_x, center_y, area, aspect])
  return measurement.reshape((4, 1))

In [None]:
def convert_x_to_bbox(x,score=None):
  """Convert a [x, y, s, r, ...] state vector into a corner-format box.

  Args:
    x: state whose first four entries are the box centre (x, y), the scale
      s (area) and the aspect ratio r (width / height).
    score: optional value appended as a fifth column.

  Returns:
    Array of shape (1, 4) holding [x1, y1, x2, y2], or of shape (1, 5)
    holding [x1, y1, x2, y2, score] when a score is given.
  """
  # area * (width / height) == width ** 2
  w = np.sqrt(x[2] * x[3])
  h = x[2] / w
  corners = [x[0]-w/2., x[1]-h/2., x[0]+w/2., x[1]+h/2.]
  # Identity check rather than `== None`: equality is evaluated element-wise
  # for NumPy values, which makes the condition ambiguous for array scores.
  if score is None:
    return np.array(corners).reshape((1,4))
  return np.array(corners + [score]).reshape((1,5))


In [None]:
class KalmanBoxTracker(object):
  """Tracks a single bounding box with a constant-velocity Kalman filter.

  The filter state has seven entries: the measured quantities [x, y, s, r]
  produced by convert_bbox_to_z, followed by velocities for x, y and s.
  """
  count = 0  # class-wide counter used to hand out tracker ids

  def __init__(self,bbox):
    """Create a tracker whose state is initialised from `bbox`.

    Args:
      bbox: box whose first four entries are [x1, y1, x2, y2].
    """
    kf = KalmanFilter(dim_x=7, dim_z=4)

    # Constant velocity model: identity, plus each of x, y, s advancing by
    # its own velocity component (state indices 4, 5, 6) every step.
    transition = np.eye(7, dtype=int)
    for pos_idx in range(3):
      transition[pos_idx, pos_idx + 4] = 1
    kf.F = transition
    # Only the first four state entries are observed.
    kf.H = np.eye(4, 7, dtype=int)

    kf.R[2:,2:] *= 10.
    kf.P[4:,4:] *= 1000. # give high uncertainty to the unobservable initial velocities
    kf.P *= 10.
    kf.Q[-1,-1] *= 0.01
    kf.Q[4:,4:] *= 0.01

    kf.x[:4] = convert_bbox_to_z(bbox)
    self.kf = kf

    self.id = KalmanBoxTracker.count
    KalmanBoxTracker.count += 1

    self.time_since_update = 0
    self.history = []
    self.hits = 0
    self.hit_streak = 0
    self.age = 0

  def update(self,bbox):
    """Correct the filter state with an observed box and reset the miss counter."""
    self.time_since_update = 0
    self.history = []
    self.hits += 1
    self.hit_streak += 1
    measurement = convert_bbox_to_z(bbox)
    self.kf.update(measurement)

  def predict(self):
    """Advance the filter one step and return the predicted box.

    Returns:
      Array of shape (1, 4) holding the predicted [x1, y1, x2, y2].
    """
    state = self.kf.x
    # Zero the scale velocity if the next step would make the scale
    # non-positive.
    if (state[6] + state[2]) <= 0:
      state[6] *= 0.0
    self.kf.predict()
    self.age += 1
    # A prediction without an intervening update breaks the hit streak.
    if self.time_since_update > 0:
      self.hit_streak = 0
    self.time_since_update += 1
    predicted_box = convert_x_to_bbox(self.kf.x)
    self.history.append(predicted_box)
    return predicted_box

  def get_state(self):
    """Return the current box estimate as an array of shape (1, 4)."""
    return convert_x_to_bbox(self.kf.x)


In [None]:
def associate_detections_to_trackers(detections,trackers,iou_threshold = 0.3):
  """Assign detections to tracked boxes based on their IoU.

  Args:
    detections: array of detected boxes, one per row.
    trackers: array of predicted tracker boxes, one per row.
    iou_threshold: pairs whose IoU is below this value are not matched.

  Returns:
    A tuple (matches, unmatched_detections, unmatched_trackers) where
    matches has shape (k, 2) with rows (detection_index, tracker_index) and
    the other two hold the indices left without a partner.
  """
  if len(trackers) == 0:
    no_matches = np.empty((0,2),dtype=int)
    every_detection = np.arange(len(detections))
    no_trackers = np.empty((0,5),dtype=int)
    return no_matches, every_detection, no_trackers

  iou_matrix = iou_batch(detections, trackers)

  if min(iou_matrix.shape) > 0:
    above = (iou_matrix > iou_threshold).astype(np.int32)
    # When thresholding already yields a one-to-one pairing, the assignment
    # solver is not needed.
    one_to_one = above.sum(1).max() == 1 and above.sum(0).max() == 1
    if one_to_one:
      matched_indices = np.stack(np.where(above), axis=1)
    else:
      matched_indices = linear_assignment(-iou_matrix)
  else:
    matched_indices = np.empty(shape=(0,2))

  matched_dets = matched_indices[:,0]
  matched_trks = matched_indices[:,1]
  unmatched_detections = [d for d in range(len(detections)) if d not in matched_dets]
  unmatched_trackers = [t for t in range(len(trackers)) if t not in matched_trks]

  # filter out matched with low IOU
  kept_pairs = []
  for pair in matched_indices:
    det_idx, trk_idx = pair[0], pair[1]
    if iou_matrix[det_idx, trk_idx] < iou_threshold:
      unmatched_detections.append(det_idx)
      unmatched_trackers.append(trk_idx)
    else:
      kept_pairs.append(pair)

  if kept_pairs:
    matches = np.vstack(kept_pairs)
  else:
    matches = np.empty((0,2),dtype=int)

  return matches, np.array(unmatched_detections), np.array(unmatched_trackers)


In [None]:
class Sort(object):
  """SORT multi-object tracker: Kalman-filtered boxes associated by IoU."""

  def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
    """Configure the tracker.

    Args:
      max_age: a tracker is dropped once it has gone more than this many
        frames without a matched detection.
      min_hits: consecutive matched frames a tracker needs before it is
        reported (not enforced while frame_count <= min_hits).
      iou_threshold: minimum IoU for a detection/tracker match.
    """
    self.max_age = max_age
    self.min_hits = min_hits
    self.iou_threshold = iou_threshold
    self.trackers = []
    self.frame_count = 0

  def update(self, dets=np.empty((0, 5))):
    """Advance the tracker by one frame.

    Must be called once per frame, even when there are no detections.

    Args:
      dets: array with one detection per row; the first four columns are
        [x1, y1, x2, y2]. An empty input of any shape (including
        np.array([]) or None) means "no detections"; a single 1-D detection
        is treated as one row.

    Returns:
      Array of shape (M, 5) with rows [x1, y1, x2, y2, track_id] for the
      trackers reported this frame; shape (0, 5) when there are none.
      Track ids start at 1.
    """
    # Normalise the input to 2-D. np.array([]) has shape (0,), which makes
    # the column indexing done during association raise an IndexError as
    # soon as at least one tracker exists.
    if dets is None:
      dets = np.empty((0, 5))
    dets = np.asarray(dets)
    if dets.size == 0:
      dets = np.empty((0, 5))
    elif dets.ndim == 1:
      dets = dets[np.newaxis, :]

    self.frame_count += 1
    # get predicted locations from existing trackers.
    trks = np.zeros((len(self.trackers), 5))
    to_del = []
    ret = []
    for t, trk in enumerate(trks):
      pos = self.trackers[t].predict()[0]
      trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
      if np.any(np.isnan(pos)):
        to_del.append(t)
    # Drop the rows of invalid predictions and, in step, their trackers so
    # that row indices keep lining up with self.trackers.
    trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
    for t in reversed(to_del):
      self.trackers.pop(t)
    matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets,trks, self.iou_threshold)

    # update matched trackers with assigned detections
    for m in matched:
      self.trackers[m[1]].update(dets[m[0], :])

    # create and initialise new trackers for unmatched detections
    for i in unmatched_dets:
      self.trackers.append(KalmanBoxTracker(dets[i,:]))

    # Walk backwards so that removing a tracker does not shift the indices
    # still to be visited.
    for i in range(len(self.trackers) - 1, -1, -1):
      trk = self.trackers[i]
      d = trk.get_state()[0]
      if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
        ret.append(np.concatenate((d,[trk.id+1])).reshape(1,-1)) # +1 as MOT benchmark requires positive
      # remove dead tracklet
      if trk.time_since_update > self.max_age:
        self.trackers.pop(i)

    if len(ret) > 0:
      return np.concatenate(ret)
    return np.empty((0,5))

In [None]:
# Run YOLOv8 detection + SORT tracking over the input video and write an
# annotated copy (boxes, ids and short trajectory trails) next to it.
namefile = "MOT1614.mp4"
markfile = "stk"
model = YOLO("yolov8n.pt")
tracker = Sort()
colors = [(random.randint(0, 255), random.randint(0, 255),
           random.randint(0, 255)) for _ in range(10)]
detection_threshold = 0.37254
cap = cv2.VideoCapture(namefile)
if not cap.isOpened():
  # Without this check the loop below is silently skipped and an empty
  # output file is produced.
  raise IOError(f"Could not open input video: {namefile}")
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Keep the source frame rate so the annotated video plays at the original
# speed; fall back to 20 fps when the container does not report one.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps or fps <= 0:
  fps = 20.0
# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v') #codec
out = cv2.VideoWriter(markfile+namefile, fourcc, fps, (width,  height))
counter=0
# Recent centre points per track id. A dict grows on demand, so there is no
# upper bound on the track ids that can be drawn.
pts = {}
try:
  while cap.isOpened():

    ret, frame = cap.read()
    if not ret:
      print("Fineshed ...")
      break

    results = model(frame)

    for result in results:
      detections = []
      for x1, y1, x2, y2, score, class_id in result.boxes.data.tolist():
        if score > detection_threshold:
          detections.append([int(x1), int(y1), int(x2), int(y2)])

      # reshape keeps the array two-dimensional even when nothing was
      # detected; np.array([]) alone has shape (0,), which the tracker's
      # IoU computation cannot index.
      tracks = tracker.update(np.array(detections).reshape(-1, 4))
      tracks = tracks.astype(int)

      for x1, y1, x2, y2, track_id in tracks:
        x1, y1, x2, y2, track_id = int(x1), int(y1), int(x2), int(y2), int(track_id)
        color = colors[track_id % len(colors)]
        frame = cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
        name = f"Id: {track_id}"
        # Filled label background sized from the label length.
        frame = cv2.rectangle(frame, (x1, y1-30),
         (x1+(len(name))*17, y1), color, -1)
        frame = cv2.putText(frame, name, (x1, y1-10),
         cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2)
        # MOT Exercise, trajectory history
        center = (int((x1+x2)/2), int((y1+y2)/2))
        trail = pts.setdefault(track_id, deque(maxlen=30))
        trail.append(center)
        for j in range(1, len(trail)):
          thickness = int(np.sqrt(64/float(j+1))*2)
          frame = cv2.line(frame, trail[j-1], trail[j], color, thickness)

    # Banner drawn once per frame (previously it was redrawn for every
    # track and missing on frames without tracks).
    frame = cv2.rectangle(frame, (0, 0), (260, 40), (45,232,101), -1)
    frame = cv2.putText(frame, "SORT, Tracking ", (5, 25),
                        cv2.FONT_HERSHEY_DUPLEX, 0.75, (255, 255, 255 ), 2)

    counter+=1
    print("Progress: ",counter,"/",length)
    out.write(frame)
finally:
  # Release the capture and finalise the output file even if a frame fails.
  cap.release()
  out.release()
  cv2.destroyAllWindows()

In [None]:
# Save frame number `framewant` (1-based) of the annotated video as a JPEG.
# If the video has fewer frames than that, the last frame read is saved.
cap = cv2.VideoCapture(markfile+namefile)
framewant = 28
counter = 0
last_frame = None
try:
  while cap.isOpened():

    ret, frame = cap.read()
    if not ret:
      print("Fineshed ...")
      break
    last_frame = frame
    counter+=1
    if counter == framewant:
      break
    # NOTE: `length` is the frame count computed in the tracking cell.
    print("Progress: ",counter,"/",length)
finally:
  cap.release()
  cv2.destroyAllWindows()

# Write the image once, after the loop, instead of re-encoding and
# overwriting the same file on every frame read.
if last_frame is not None:
  # splitext drops the extension whatever its length (not only ".mp4").
  cv2.imwrite(os.path.splitext(markfile+namefile)[0]+'frame'+str(framewant)+'.jpg', last_frame)