In [1]:
# Install ultralytics for YOLOv8n
%pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.1.47-py3-none-any.whl (750 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m750.4/750.4 kB[0m [31m7.5 MB/s[0m eta [36m0:00:00[0m
Collecting thop>=0.1.1 (from ultralytics)
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch>=1.8.0->ultralytics)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1

In [4]:
# Imports
from ultralytics import YOLO
import cv2
import numpy as np
import math
from ultralytics.solutions import object_counter
# Setup: mount Google Drive so the files stored there are reachable under /content/drive
# (google.colab is only available when running inside Colab)
from google.colab import drive
drive.mount('/content/drive')

# Folder on the mounted Drive; video file names are appended to this prefix
base_folder = '/content/drive/My Drive/Colab Notebooks/ECSE415/Assignment6/'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [106]:
# Provides initial analytics of the video
class Analyzer:
  """Collects per-track statistics (passed cars, passed people, flow) from a video.

  Each frame is run through a YOLOv8n tracker and a dense Farneback optical
  flow computation. Vehicles and people whose box centre drops below a
  class-specific image row are recorded as "passed", and for every tracked
  vehicle the largest per-frame mean flow magnitude inside its box is kept.
  A line-based ObjectCounter is fed the same tracks; its `count_ids` are
  returned by `get_results` as a first guess of which vehicles are moving.
  """

  def __init__(self, video_path):
    """Open the video and configure the tracker, thresholds and line counter.

    Args:
      video_path: path of the video file handed to cv2.VideoCapture.
    """
    self.cap = cv2.VideoCapture(video_path)
    w, h = (int(self.cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT))
    self.frame_shape = (h, w)

    self.tracking_model = YOLO('yolov8n.pt')
    self.names = self.tracking_model.names

    # Track ids of vehicles / people whose box centre crossed the bottom threshold
    self.passed_cars = set()
    self.passed_people = set()
    # Track id -> largest mean flow magnitude seen inside that vehicle's box
    self.tracked_flows = {}

    # Image rows (as a fraction of the frame height) below which an object counts as passed
    self.bottom_threshold_cars = int(0.735*h)
    self.bottom_threshold_person = int(0.6*h)
    # Minimum detection confidence for a box to be considered at all
    self.car_confidence_threshold = 0.8
    self.person_confidence_threshold = 0.6

    self.classes_to_count = [0, 2, 5, 7] # 0: person, 2: car, 5: bus, 7: truck
    self.count_names = {i: self.tracking_model.names[i] for i in self.classes_to_count}

    # Counting line across the lower part of the frame
    self.counter_moving_cars = object_counter.ObjectCounter()
    line_points = [(int(w*0.2), int(0.762*h)), (int(w*0.8), int(0.9125*h))]
    self.counter_moving_cars.set_args(view_img = True, reg_pts = line_points, classes_names = self.count_names, draw_tracks=True)


  def compute_flow(self, img1, img2, flow):
    """Compute dense Farneback optical flow from img1 to img2.

    Args:
      img1: previous (preprocessed) frame.
      img2: current (preprocessed) frame.
      flow: flow field of the previous step, used as the initial estimate,
        or None for the first pair of frames.

    Returns:
      The flow field returned by cv2.calcOpticalFlowFarneback.
    """
    return cv2.calcOpticalFlowFarneback(prev=img1,
                                        next=img2,
                                        flow=flow,
                                        pyr_scale=0.5,
                                        levels=3,
                                        winsize=15,
                                        iterations=3,
                                        poly_n=5,
                                        poly_sigma=1.2,
                                        flags=0 if flow is None else cv2.OPTFLOW_USE_INITIAL_FLOW
                                        )


  def check_passed_car(self, id, cls, box, conf):
    """Report whether a detection is a confident vehicle and record it if passed.

    Args:
      id: track id of the detection, or None if it has none.
      cls: class index of the detection (looked up in self.names).
      box: (x1, y1, x2, y2) corner coordinates of the bounding box.
      conf: detection confidence.

    Returns:
      True if the detection is a bus, car or truck above the confidence
      threshold. As a side effect the id is added to `passed_cars` when the
      vertical centre of the box lies below `bottom_threshold_cars`.
    """
    # Confidence is rounded up to two decimals before thresholding
    conf = math.ceil(conf*100)/100

    is_car = self.names[int(cls)] in ('bus', 'car', 'truck') and conf > self.car_confidence_threshold
    # Compare against None so that a track id of 0 is not mistaken for "no id"
    if is_car and id is not None:
      _, y_top, _, y_bottom = box
      if (y_top + abs(y_top - y_bottom)//2) > self.bottom_threshold_cars:
        self.passed_cars.add(id)
    return is_car


  def check_passed_person(self, id, cls, box, conf):
    """Add a confident person detection to `passed_people` once it is low enough.

    Args:
      id: track id of the detection.
      cls: class index of the detection (looked up in self.names).
      box: (x1, y1, x2, y2) corner coordinates of the bounding box.
      conf: detection confidence.
    """
    if self.names[int(cls)] == 'person' and conf > self.person_confidence_threshold:
      _, y_top, _, y_bottom = box
      if (y_top + abs(y_top - y_bottom)//2) > self.bottom_threshold_person:
        self.passed_people.add(id)


  def analyse_frame_data(self, tracks, flow_mags, flow_angles):
    """Update passed-object sets and per-vehicle flow statistics for one frame.

    Args:
      tracks: tracker results for the frame; only `tracks[0].boxes` is read.
      flow_mags: per-pixel optical flow magnitude of the frame.
      flow_angles: per-pixel optical flow angle (currently unused).
    """
    detections = tracks[0].boxes
    # Without any tracked object the id field is None and there is nothing to update
    if detections.id is None:
      return

    boxes = detections.xyxy.cpu()
    classes = detections.cls.cpu().tolist()
    confs = detections.conf.cpu().tolist()
    trk_ids = detections.id.int().cpu().tolist()

    for box, cls, trk_id, conf in zip(boxes, classes, trk_ids, confs):
      trk_id = int(trk_id)
      if self.check_passed_car(trk_id, cls, box, conf):
        # Flow magnitudes inside the tracking box (slicing clips at the frame border)
        x1, y1, x2, y2 = (int(v) for v in box)
        region = flow_mags[max(y1, 0):y2, max(x1, 0):x2]
        # Average over the pixels that actually carry flow
        n_flows_in_box = np.count_nonzero(region > 0)
        if n_flows_in_box:
          mean_flow_mag = float(region.sum(dtype=np.float64)) / n_flows_in_box
        else:
          # No flow in the box: use 0 instead of dividing by zero, which would
          # store a NaN that max() can never replace afterwards
          mean_flow_mag = 0.0
        # Keep the largest mean magnitude observed for this track
        self.tracked_flows[trk_id] = max(self.tracked_flows.get(trk_id, mean_flow_mag), mean_flow_mag)
      else:
        # Count passed persons
        self.check_passed_person(trk_id, cls, box, conf)


  def preprocess_frame(self, frame):
    """Convert a BGR frame to an equalised grayscale image for the flow computation."""
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    frame = cv2.equalizeHist(frame) # To improve robustness against lighting changes
    return frame


  def analyse(self):
    """Process the whole video frame by frame and release the capture afterwards.

    Raises:
      IOError: if not even a first frame can be read from the video.
    """
    try:
      # Get first frame
      ret, prev_frame = self.cap.read()
      if not ret:
        raise IOError('Could not read a first frame from the video')
      prev_processed_frame = self.preprocess_frame(prev_frame)

      flow = None
      while True:
        # Get the next frame
        ret, frame = self.cap.read()
        if not ret:
          break
        # Preprocessing
        processed_frame = self.preprocess_frame(frame)
        # Compute dense optical flow
        flow = self.compute_flow(prev_processed_frame, processed_frame, flow)
        magnitude, angle = cv2.cartToPolar(flow[:, :, 0], flow[:, :, 1])
        # Track objects
        results = self.tracking_model.track(frame, classes=self.classes_to_count, persist=True, verbose=False, show=False)
        # Get first assumption of driving/parked
        self.counter_moving_cars.start_counting(frame, results)
        # Get corresponding flows
        self.analyse_frame_data(results, magnitude, angle)
        # Update previous frame
        prev_processed_frame = processed_frame
    finally:
      # Release the capture even if a frame fails to process
      self.cap.release()

  def get_results(self):
    """Return (passed car ids, line-counter ids, per-car max mean flow, passed people ids)."""
    return (self.passed_cars, self.counter_moving_cars.count_ids, self.tracked_flows, self.passed_people)

In [120]:
def evaluate_results(video_name, analyzer):
  """Print how many parked cars, moving cars and people were passed in a video.

  Cars start with the label suggested by the line counter (1 = moving,
  0 = parked). When at least two cars are available, the labels are refined
  with a 2-means clustering of the normalised per-car flow feature.

  Args:
    video_name: name printed in the report header.
    analyzer: object whose `get_results()` returns
      (passed car ids, moving ids, {car id: flow feature}, passed people ids).
  """
  passed_cars, moving_ids, tracked_flows, passed_people = analyzer.get_results()

  # Freeze one ordering so that id, feature and label of a car share an index
  ordered_ids = list(passed_cars)
  car_ids = np.array(ordered_ids, dtype=np.float64)
  car_features = np.array([tracked_flows[c] for c in ordered_ids], dtype=np.float64)
  # A NaN feature would poison both the norm and the clustering
  car_features = np.nan_to_num(car_features, nan=0.0)

  # Normalize (skipped when every feature is zero to avoid 0/0)
  norm = np.linalg.norm(car_features)
  norm_car_features = car_features / norm if norm > 0 else car_features

  # Initial car labels: 1 for every car the line counter saw as moving, else 0
  labels = np.isin(car_ids, list(moving_ids)).astype(np.int32)

  # Apply clustering based on flow in order to refine results.
  # k-means with K=2 needs at least two samples; otherwise keep the initial labels.
  if len(ordered_ids) >= 2:
    criteria = (cv2.TERM_CRITERIA_MAX_ITER+cv2.TERM_CRITERIA_EPS, 20, 0.05)
    _, refined_labels, _ = cv2.kmeans(norm_car_features.astype(np.float32).reshape(-1, 1), K=2,
                                      bestLabels=labels.reshape(-1, 1), criteria=criteria,
                                      attempts=1, flags=cv2.KMEANS_USE_INITIAL_LABELS)
    # Use the returned labels rather than relying on in-place modification
    labels = np.asarray(refined_labels).ravel()

  parked_car_ids = car_ids[labels == 0]
  moving_car_ids = car_ids[labels == 1]

  # Output
  print('Results: ', video_name)
  print('# Passed Parked Cars: ', parked_car_ids.shape[0])
  print('# Passed Moving Cars: ', moving_car_ids.shape[0])
  print('# Passed People     : ', len(passed_people))

In [107]:
# Run the full per-frame analysis on the McGill video; results stay on the analyzer object
analyzer_mcgill = Analyzer(base_folder + 'mcgill_drive.mp4')
analyzer_mcgill.analyse()


Line Counter Initiated.


In [111]:
# Run the full per-frame analysis on the St-Catherine video; results stay on the analyzer object
analyzer_st_cath = Analyzer(base_folder + 'st-catherines_drive.mp4')
analyzer_st_cath.analyse()


Line Counter Initiated.


In [121]:
# Print the parked/moving/people counts for both analysed videos
evaluate_results('mcgill_drive.mp4', analyzer_mcgill)
print()  # blank line between the two reports
evaluate_results('st-catherines_drive.mp4', analyzer_st_cath)

Results:  mcgill_drive.mp4
# Passed Parked Cars:  10
# Passed Moving Cars:  23
# Passed People     :  21

Results:  st-catherines_drive.mp4
# Passed Parked Cars:  50
# Passed Moving Cars:  4
# Passed People     :  61
