In [1]:
# to use YOLOv8
! pip install ultralytics
! pip install deep-sort-realtime

# needed libraries
! pip install torch torchvision torchaudio
! pip install opencv-python
! pip install Cython

^C


In [3]:
"""
    SORT: A Simple, Online and Realtime Tracker
    Copyright (C) 2016-2020 Alex Bewley alex@bewley.ai

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
from __future__ import print_function

import os
import numpy as np
import matplotlib
# Backend is selected before pyplot is imported below.
# NOTE(review): TkAgg presumably needs a display and a Tk installation — confirm
# this is wanted when the code runs inside a notebook kernel.
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from skimage import io

import glob
import time
import argparse
# Kalman filter implementation used by KalmanBoxTracker.
from filterpy.kalman import KalmanFilter

# Seeds NumPy's global random generator.
# NOTE(review): none of the visible cells draw random numbers — confirm whether
# the seed is still needed.
np.random.seed(0)


def linear_assignment(cost_matrix):
  """
  Solves the assignment problem for cost_matrix and returns the chosen
  pairs as an array of [row, column] rows.

  Uses the `lap` package when it can be imported and falls back to
  scipy.optimize.linear_sum_assignment otherwise.
  """
  try:
    import lap
    _, row_to_col, col_to_row = lap.lapjv(cost_matrix, extend_cost=True)
    pairs = []
    for col in row_to_col:
      # negative entries are skipped
      if col >= 0:
        pairs.append([col_to_row[col], col])
    return np.array(pairs)
  except ImportError:
    from scipy.optimize import linear_sum_assignment
    row_idx, col_idx = linear_sum_assignment(cost_matrix)
    return np.array([pair for pair in zip(row_idx, col_idx)])


def iou_batch(bb_test, bb_gt):
  """
  From SORT: pairwise IOU between two sets of boxes whose rows start with
  [x1,y1,x2,y2].

  bb_test gains a new axis 1 and bb_gt a new axis 0, so for 2-D inputs with
  M and N rows the result has shape (M, N); entry [i, j] is the IOU of
  bb_test[i] with bb_gt[j].
  """
  tests = np.expand_dims(bb_test, 1)
  gts = np.expand_dims(bb_gt, 0)

  # corners of the intersection rectangle
  left = np.maximum(tests[..., 0], gts[..., 0])
  top = np.maximum(tests[..., 1], gts[..., 1])
  right = np.minimum(tests[..., 2], gts[..., 2])
  bottom = np.minimum(tests[..., 3], gts[..., 3])

  # non-overlapping pairs are clamped to zero width/height
  inter = np.maximum(0., right - left) * np.maximum(0., bottom - top)

  area_test = (tests[..., 2] - tests[..., 0]) * (tests[..., 3] - tests[..., 1])
  area_gt = (gts[..., 2] - gts[..., 0]) * (gts[..., 3] - gts[..., 1])
  return inter / (area_test + area_gt - inter)


def convert_bbox_to_z(bbox):
  """
  Converts a corner-form box [x1,y1,x2,y2] into the measurement column
  vector [x,y,s,r]: x,y is the box centre, s is the area (width*height) and
  r is the aspect ratio (width/height). The result has shape (4, 1).
  """
  width = bbox[2] - bbox[0]
  height = bbox[3] - bbox[1]
  centre_x = bbox[0] + width/2.
  centre_y = bbox[1] + height/2.
  area = width * height
  aspect = width / float(height)
  z = np.array([centre_x, centre_y, area, aspect])
  return z.reshape((4, 1))


def convert_x_to_bbox(x,score=None):
  """
  Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
    [x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right.

  x may be a flat sequence or a column vector (such as a Kalman state of
  shape (7, 1)); only its first four entries are used.

  Returns an array of shape (1, 4), or of shape (1, 5) with the score
  appended as the last column when score is given.
  """
  # Flatten first so a column-vector state yields scalars; otherwise mixing
  # shape-(1,) pieces with a scalar score produces a ragged array.
  cx, cy, s, r = np.asarray(x, dtype=float).ravel()[:4]
  w = np.sqrt(s * r)   # s = w*h and r = w/h, hence s*r = w**2
  h = s / w
  corners = [cx - w/2., cy - h/2., cx + w/2., cy + h/2.]
  if score is None:
    return np.array(corners).reshape((1, 4))
  # np.append flattens both arguments, so a scalar or one-element score works
  return np.append(corners, score).reshape((1, 5))


class KalmanBoxTracker(object):
  """
  Per-object track state: a Kalman filter over the bounding box plus the
  hit/age counters that the Sort class reads.
  """
  count = 0  # class-wide counter; each new tracker takes the current value as its id

  def __init__(self,bbox):
    """
    Creates a tracker whose initial state is taken from bbox.
    """
    # constant velocity model: 7 state entries, 4 measured entries
    kf = KalmanFilter(dim_x=7, dim_z=4)
    kf.F = np.array([
      [1,0,0,0,1,0,0],
      [0,1,0,0,0,1,0],
      [0,0,1,0,0,0,1],
      [0,0,0,1,0,0,0],
      [0,0,0,0,1,0,0],
      [0,0,0,0,0,1,0],
      [0,0,0,0,0,0,1],
    ])
    kf.H = np.array([
      [1,0,0,0,0,0,0],
      [0,1,0,0,0,0,0],
      [0,0,1,0,0,0,0],
      [0,0,0,1,0,0,0],
    ])

    kf.R[2:,2:] *= 10.
    # the initial velocities are unobservable, so start them with high uncertainty
    kf.P[4:,4:] *= 1000.
    kf.P *= 10.
    kf.Q[-1,-1] *= 0.01
    kf.Q[4:,4:] *= 0.01

    # first four state entries come from the measured box
    kf.x[:4] = convert_bbox_to_z(bbox)
    self.kf = kf

    self.id = KalmanBoxTracker.count
    KalmanBoxTracker.count += 1

    self.time_since_update = 0
    self.history = []
    self.hits = 0
    self.hit_streak = 0
    self.age = 0

  def update(self,bbox):
    """
    Feeds an observed bbox to the filter and refreshes the hit counters.
    """
    self.kf.update(convert_bbox_to_z(bbox))
    self.time_since_update = 0
    self.history = []
    self.hits += 1
    self.hit_streak += 1

  def predict(self):
    """
    Steps the filter forward once and returns the predicted bounding box.
    """
    state = self.kf.x
    # zero state entry 6 when adding it to entry 2 would not stay positive
    if state[6] + state[2] <= 0:
      state[6] *= 0.0
    self.kf.predict()
    self.age += 1
    # a prediction without an intervening update breaks the hit streak
    if self.time_since_update > 0:
      self.hit_streak = 0
    self.time_since_update += 1
    predicted = convert_x_to_bbox(self.kf.x)
    self.history.append(predicted)
    return predicted

  def get_state(self):
    """
    Returns the bounding box for the filter's current state.
    """
    return convert_x_to_bbox(self.kf.x)


def associate_detections_to_trackers(detections,trackers,iou_threshold = 0.3):
  """
  Pairs detections with tracked objects (both given as bounding boxes) by IOU.

  Returns three arrays: matches (rows of [detection index, tracker index]),
  unmatched_detections and unmatched_trackers.
  """
  # nothing to match against: every detection is unmatched
  if len(trackers) == 0:
    return np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,5),dtype=int)

  iou_matrix = iou_batch(detections, trackers)

  if min(iou_matrix.shape) > 0:
    above = (iou_matrix > iou_threshold).astype(np.int32)
    # when thresholding already yields a one-to-one pairing, take it directly
    one_to_one = above.sum(1).max() == 1 and above.sum(0).max() == 1
    if one_to_one:
      matched_indices = np.stack(np.where(above), axis=1)
    else:
      matched_indices = linear_assignment(-iou_matrix)
  else:
    matched_indices = np.empty(shape=(0,2))

  unmatched_detections = [
    d for d in range(len(detections)) if d not in matched_indices[:,0]
  ]
  unmatched_trackers = [
    t for t in range(len(trackers)) if t not in matched_indices[:,1]
  ]

  # pairs whose IOU is below the threshold are returned as unmatched instead
  kept = []
  for pair in matched_indices:
    det_idx, trk_idx = pair[0], pair[1]
    if iou_matrix[det_idx, trk_idx] < iou_threshold:
      unmatched_detections.append(det_idx)
      unmatched_trackers.append(trk_idx)
    else:
      kept.append(pair)

  if kept:
    matches = np.stack(kept, axis=0)
  else:
    matches = np.empty((0,2),dtype=int)

  return matches, np.array(unmatched_detections), np.array(unmatched_trackers)


class Sort(object):
  """
  Multi-object tracker: keeps a list of KalmanBoxTracker instances and matches
  them to each frame's detections by IOU.
  """
  def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
    """
    Sets key parameters for SORT

    Params:
      max_age - a track is dropped once its time_since_update exceeds this value
      min_hits - hit streak a track needs before it is reported (not enforced
        while frame_count <= min_hits)
      iou_threshold - minimum IOU for a detection/track match
    """
    self.max_age = max_age
    self.min_hits = min_hits
    self.iou_threshold = iou_threshold
    self.trackers = []
    self.frame_count = 0

  def update(self, dets=np.empty((0, 5))):
    """
    Params:
      dets - a numpy array of detections in the format [[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]
    Requires: this method must be called once for each frame even with empty detections (use np.empty((0, 5)) for frames without detections).
    Returns the a similar array, where the last column is the object ID.

    Any empty input (for example np.array([]) built from an empty list) is
    treated as np.empty((0, 5)).

    NOTE: The number of objects returned may differ from the number of detections provided.
    """
    self.frame_count += 1

    # An empty list converts to shape (0,), which cannot be indexed by column
    # during IOU matching; normalise it to the documented (0, 5) shape.
    dets = np.asarray(dets)
    if dets.size == 0:
      dets = np.empty((0, 5))

    # get predicted locations from existing trackers.
    trks = np.zeros((len(self.trackers), 5))
    to_del = []
    ret = []
    for t, trk in enumerate(trks):
      pos = self.trackers[t].predict()[0]
      trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
      if np.any(np.isnan(pos)):
        to_del.append(t)
    # drop rows with invalid predictions, and the trackers that produced them,
    # so row t of trks still corresponds to self.trackers[t]
    trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
    for t in reversed(to_del):
      self.trackers.pop(t)
    matched, unmatched_dets, _ = associate_detections_to_trackers(dets, trks, self.iou_threshold)

    # update matched trackers with assigned detections
    for m in matched:
      self.trackers[m[1]].update(dets[m[0], :])

    # create and initialise new trackers for unmatched detections
    for i in unmatched_dets:
      trk = KalmanBoxTracker(dets[i,:])
      self.trackers.append(trk)

    # walk the list backwards so popping index i does not shift unvisited entries
    i = len(self.trackers)
    for trk in reversed(self.trackers):
      d = trk.get_state()[0]
      if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
        ret.append(np.concatenate((d,[trk.id+1])).reshape(1,-1)) # +1 as MOT benchmark requires positive
      i -= 1
      # remove dead tracklet
      if(trk.time_since_update > self.max_age):
        self.trackers.pop(i)
    if(len(ret)>0):
      return np.concatenate(ret)
    return np.empty((0,5))

def parse_args(argv=None):
    """Parse input arguments.

    Params:
      argv - optional list of argument strings. When None (the default),
        argparse reads sys.argv[1:]. Pass an explicit list (for example [])
        when the process arguments are not meant for this parser, such as
        inside a notebook kernel.
    Returns the argparse.Namespace holding the parsed options.
    """
    parser = argparse.ArgumentParser(description='SORT demo')
    parser.add_argument('--display', dest='display', help='Display online tracker output (slow) [False]',action='store_true')
    parser.add_argument("--seq_path", help="Path to detections.", type=str, default='data')
    parser.add_argument("--phase", help="Subdirectory in seq_path.", type=str, default='train')
    parser.add_argument("--max_age",
                        help="Maximum number of frames to keep alive a track without associated detections.",
                        type=int, default=1)
    parser.add_argument("--min_hits",
                        help="Minimum number of associated detections before track is initialised.",
                        type=int, default=3)
    parser.add_argument("--iou_threshold", help="Minimum IOU for match.", type=float, default=0.3)
    return parser.parse_args(argv)

In [5]:
import datetime
from ultralytics import YOLO
import cv2
from deep_sort_realtime.deepsort_tracker import DeepSort
# from google.colab.patches import cv2_imshow

# define some constants for later use
CONFIDENCE_THRESHOLD = 0.8
# NOTE(review): OpenCV drawing calls take colours as (B, G, R), so the tuples
# named BLUE and RED below render as red and blue respectively, and MAGENTA
# renders as cyan. Values are left unchanged in case other cells rely on them.
GREEN = (0, 255, 0)
BLUE = (0, 0, 255)
RED = (255, 0, 0)
MAGENTA = (255, 255, 0)
WHITE = (255, 255, 255)

COLOR = None

VIDEO_PATH = "./TrafficJunction8.mp4"

# initialize the video capture object and fail loudly if it cannot be opened,
# instead of silently reporting zero vehicles
video_cap = cv2.VideoCapture(VIDEO_PATH)
if not video_cap.isOpened():
  raise RuntimeError(f"Could not open video source: {VIDEO_PATH}")

# load the pre-trained YOLOv8x model
model = YOLO("yolov8x.pt") # Replace with yolov8m.pt (or) yolov8l.pt (or) yolov8x.pt (or) yolov8n.pt as needed
# tracker = DeepSort(max_age=50)
tracker = Sort()

# Get original frame rate and frame count
fps = video_cap.get(cv2.CAP_PROP_FPS)
frame_count = int(video_cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Get the frame width and height from the video
frame_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

output_frame_count = 0

class_names = model.names
print("class_names:", class_names)

distinct_ids_seen = set()

vehicle_class_names = ['car'] #, 'bicycle', 'motorcycle', 'bus', 'train', 'truck']
total_vehicle_count = 0
class_labels = []
avg_fps = 0
N = 0  # number of frames processed so far

total_start = datetime.datetime.now()
try:
  while True:
    # start time to compute the fps
    start = datetime.datetime.now()

    ret, frame = video_cap.read()

    # if there are no more frames to process, break out of the loop
    if not ret:
      break
    N += 1

    # run the YOLO model on the frame
    detections = model(frame)[0]

    # bounding boxes and confidences kept for this frame
    results = []

    ######################################
    # DETECTION
    ######################################

    # loop over the detections
    for data in detections.boxes.data.tolist():
      # extract the confidence (i.e, probability) associated
      # with the detection
      confidence = data[4]

      # filter out weak detections by ensuring the
      # confidence is greater than the minimum confidence
      if float(confidence) < CONFIDENCE_THRESHOLD:
        continue

      # if the confidence is greater than the minimum confidence,
      # get the bounding box and class id
      xmin, ymin, xmax, ymax = int(data[0]), int(data[1]), int(data[2]), int(data[3])
      print("x1, y1, x2, y2", xmin, ymin, xmax, ymax)
      class_id = int(data[5])

      # Get class name
      class_name = class_names[class_id]

      if class_name in vehicle_class_names:
        # add the bounding box (x1, y1, x2, y2) and confidence to the results list
        results.append([xmin, ymin, xmax, ymax, confidence]) # class_id]
        class_labels.append(class_name)

    ######################################
    # TRACKING
    ######################################
    # Update the SORT tracker. np.array([]) has shape (0,), which the IOU
    # matching cannot index by column, so pass an explicit (0, 5) array for
    # frames without detections.
    dets = np.array(results) if results else np.empty((0, 5))
    tracked_objects = tracker.update(dets)

    # Count distinct vehicles: collect every track ID seen so far
    tracked_ids = {int(obj[4]) for obj in tracked_objects}
    distinct_ids_seen.update(tracked_ids)
    total_vehicle_count = len(distinct_ids_seen)

    # Display tracked vehicles on the frame (consider visual improvements)
    for obj in tracked_objects:
      x1, y1, x2, y2, track_id = obj
      cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
      # IDs come back as floats in the tracker's output array; show them as integers
      cv2.putText(frame, str(int(track_id)), (int(x1), int(y1 - 5)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

    # # update the tracker with the new detections
    # tracks = tracker.update_tracks(results, frame=frame)
    # # loop over the tracks
    # for track in tracks:
    #     # if the track is not confirmed, ignore it
    #     if not track.is_confirmed():
    #         continue

    #     # get the track id and the bounding box
    #     track_id = track.track_id
    #     ltrb = track.to_ltrb()

    #     xmin, ymin, xmax, ymax = int(ltrb[0]), int(
    #         ltrb[1]), int(ltrb[2]), int(ltrb[3])
    #     # draw the bounding box and the track id
    #     cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), GREEN, 2)
    #     cv2.putText(frame, str(track_id), (xmin + 5, ymin - 8),
    #                 cv2.FONT_HERSHEY_SIMPLEX, 0.5, WHITE, 2)

    # end time to compute the fps
    end = datetime.datetime.now()

    # show the time it took to process 1 frame
    total = (end - start).total_seconds()
    print(f"Time to process 1 frame: {total *  1000:.0f} milliseconds")

    # frames per second for this frame; guard against a zero-length interval
    frame_fps = 1 / total if total > 0 else 0.0

    # draw the processing rate and running count on the frame; kept in its own
    # name so `fps` still holds the source video's frame rate
    overlay_text = f"FPS: {frame_fps:.2f}  Vehicle Count: {total_vehicle_count}"
    cv2.putText(frame, overlay_text, (20, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

    # running mean of the per-frame processing rate
    avg_fps = float(avg_fps*(N-1) + frame_fps)/float(N)
    # No video writer is set up in this cell; this only counts displayed frames
    output_frame_count += 1

    # show the frame to our screen
    cv2.imshow("Object Tracking YOLO_SORT", frame)

    if cv2.waitKey(1) == ord("q"):
      break
finally:
  # runs on normal completion, on 'q', and on errors/interrupts alike
  total_end = datetime.datetime.now()
  total_time = (total_end - total_start).total_seconds()

  video_cap.release()
  cv2.destroyAllWindows()

class_names: {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone'

In [None]:
print(class_labels)

['truck', 'car', 'car', 'car', 'car', 'bus', 'truck', 'car', 'car', 'car', 'car', 'bus', 'motorcycle', 'truck', 'car', 'car', 'car', 'car', 'bus', 'truck', 'car', 'car', 'car', 'car', 'bus', 'motorcycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'car', 'truck', 'car', 'car', 'bus', 'bicycle', 'motorcycle', 'car', 'truck', 'car', 'car', 'bicycle', 'bus', 'truck', 'car', 'car', 'car', 'bicycle', 'bus', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car', 'car', 'bus', 'car', 'bicycle', 'truck', 'car', 'car', 'car', 'bus', 'bicycle', 'truck', 'car

In [None]:
print(total_vehicle_count)
print(avg_fps)

31
1.2610802473147997


TypeError: unsupported operand type(s) for *: 'builtin_function_or_method' and 'float'