# Romuald Ricard 261194253
# Faiyad Irfan Hares 260914739

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math
import cv2
from google.colab.patches import cv2_imshow
import random
from tqdm import tqdm
import torch
import torchvision

from google.colab import drive
# Mount Google Drive at /content/drive (Colab) so the project videos can be read.
drive.mount("/content/drive")

# Root folder of the project inside Google Drive; an alternative location is
# kept commented out below.
#PATH = "/content/drive/MyDrive/ECSE415/"
PATH = "/content/drive/MyDrive/McGill/ECSE415 FINAL PROJECT RICARD HARES/"

# Full paths of the two input videos.
# NOTE(review): pathstcat is hard-coded to a different Drive folder than PATH
# (it matches the commented-out PATH above) - confirm this is intended.
pathmcgill = PATH + "mcgill_drive.mp4"
pathstcat = '/content/drive/MyDrive/ECSE415/st-catherines_drive.mp4'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# TRACKING WITH BOUNDING-BOXES OVERLAP AREA AND OPTICAL FLOW
The idea here is to get a metric of how likely a detected bounding-box corresponds to a certain tracked object.

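We use the overlapping-area ratio to match detections to tracked objects. To do so, we compute the Intersection Area Matrix (IAM) between every tracked bounding box $T_i$ and every detected bounding box $D_j$, which takes the following form: $\mathrm{IAM}_{ij} = \dfrac{\mathrm{area}(T_i \cap D_j)}{\mathrm{area}(T_i)}$.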

A tracked object has a "confidence score" from 0 to 1. Once it exceeds a threshold (0.5), it is counted as a valid tracked object and added to the count. We increment the "confidence score" every time the object is successfully tracked, and decrement it otherwise. Once it drops below 0, we end the tracker.


In [None]:
class RectangleIntersectionTracker:
  """
  Tracks a single detected object through its bounding box.

  The tracker keeps a confidence score that grows by `confidence_rate` each
  time a detection is matched to it (`update`) and shrinks by the same amount
  when no detection is matched (`update_no_measure`). Once the score exceeds
  `confidence_confirm_threshold` the tracker is confirmed; for the labels
  "car" and "person" it then receives an id and increments the module-level
  counter CARS / PEDESTRIANS. For cars, a second score driven by the relative
  velocity `RV` decides once whether the car is flagged as parked
  (incrementing PARKED_CARS) or not.

  The globals CARS, PEDESTRIANS and PARKED_CARS must exist before a tracker
  can be confirmed / flagged as parked.
  """

  def __init__(self, bb, label):
    """
    INPUTS:
      - bb: bounding box in xyxy format (x1, y1, x2, y2)
      - label: detected class name ("car", "person", ...)
    """

    # Initialize boundingbox
    self.bb = bb

    # Initialize confidence score and confirm flag
    self.confidence = 0
    self.confidence_rate = 0.1
    self.confirmed = False
    self.confidence_confirm_threshold = 0.5

    # Identifier, assigned on confirmation for "car" and "person" trackers.
    # Defined here so that draw_on_mask never meets a missing attribute when
    # a tracker with another label gets confirmed.
    self.id = None

    # Relative velocity of the object w.r.t. the scene (set from outside)
    # and the threshold under which the object is considered slow
    self.RV = 0
    self.RV_threshold = 0.25

    # Score that goes up while RV is under RV_threshold and down otherwise
    self.moving_score = 0
    self.moving_score_rate = 0.1
    self.moving_score_threshold = 1.5

    # None until decided, then True (parked) or False (not parked)
    self.parked = None

    # Tracker label & color (per-channel multipliers used by draw_on_mask)
    self.label = label
    if label == "car":
      self.col = [0, 0, 1]
    elif label == "person":
      self.col = [1, 0, 0]
    else:
      self.col = [0, 0, 0]

  def update(self, bb):
    """Registers a matched detection: new bounding box, higher confidence."""

    # Update bb
    self.bb = bb

    # Update confidence (capped at 1)
    self.confidence = min(1, self.confidence + self.confidence_rate)
    self.check_confirm()

    # Update parked flag
    if self.label == "car":
      self.check_parked()

  def check_confirm(self):
    """Confirms the tracker the first time its confidence exceeds the threshold."""
    if self.confirmed or self.confidence <= self.confidence_confirm_threshold:
      return

    # Confirm flag
    self.confirmed = True

    # Assign id and update global count
    if self.label == "car":
      global CARS
      CARS += 1
      self.id = CARS

    elif self.label == "person":
      global PEDESTRIANS
      PEDESTRIANS += 1
      self.id = PEDESTRIANS

  def check_parked(self):
    """Updates the velocity-based score and decides once if the car is parked."""

    # Update parked confidence
    if self.RV < self.RV_threshold:
      self.moving_score += self.moving_score_rate
    else:
      self.moving_score -= self.moving_score_rate

    # Check if parked (decision is taken only once, for confirmed trackers)
    if self.parked is None and self.confirmed:
      if self.moving_score > self.moving_score_threshold:
        self.col = [0, 1, 0]
        self.parked = True
        global PARKED_CARS
        PARKED_CARS += 1

      if self.moving_score < - self.moving_score_threshold:
        self.col = [0, 0, 1]
        self.parked = False

  def update_no_measure(self):
    """Lowers the confidence when no detection was matched to this tracker."""
    self.confidence = self.confidence - self.confidence_rate

  def is_dead(self):
    """Returns True once the confidence has dropped below 0."""
    return self.confidence < 0

  def draw_on_mask(self, mask):
    """
    Draws the bounding box, the label/id and the relative velocity on `mask`
    (only for confirmed trackers) and returns the mask.
    """

    if self.confirmed:
      # Color, scaled by the current confidence
      col = [int(255*self.confidence)*c for c in self.col]

      # Draw rectangle
      x1 = int(self.bb[0])
      y1 = int(self.bb[1])
      x2 = int(self.bb[2])
      y2 = int(self.bb[3])
      mask = cv2.rectangle(mask, (x1, y1), (x2, y2), col, 1)

      # Display id (trackers without an id only show their label)
      caption = self.label if self.id is None else self.label + " " + str(self.id)
      mask = cv2.putText(mask, caption, (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.4, col, 1, cv2.LINE_AA)

      # Display RV
      mask = cv2.putText(mask, str(round(self.RV, 2)), (int((x1+x2)/2), y1), cv2.FONT_HERSHEY_SIMPLEX, 0.4, col, 1, cv2.LINE_AA)

    return mask

In [None]:
# ------------------------------------------------------------------------------
# COMPUTES THE INTERSECTION AREA OF 2 RECTANGLES
# ------------------------------------------------------------------------------

def compute_rectangle_intersection_area(bb1, bb2):
  """
  Returns the area of the intersection of bb1 and bb2, expressed as a
  fraction of the area of bb1.

  INPUTS:
    - bb1, bb2: bounding boxes in xyxy format (x1, y1, x2, y2)
  OUTPUT:
    - intersection area / area of bb1, or 0 when the boxes do not intersect
      or when bb1 has no area (the ratio is undefined in that case)
  """

  # Area of bb1
  area = abs(bb1[2] - bb1[0])*abs(bb1[3] - bb1[1])

  # A degenerate bb1 cannot be covered by anything; avoid dividing by zero
  if area == 0:
    return 0

  # Compute intersection sides (negative when the boxes are disjoint)
  dx = min(bb1[2], bb2[2]) - max(bb1[0], bb2[0])
  dy = min(bb1[3], bb2[3]) - max(bb1[1], bb2[1])

  # If intersection
  if dx >= 0 and dy >= 0:
    return dx*dy/area
  return 0

# ------------------------------------------------------------------------------
# RETURNS ALL LABELED BOUNDING-BOXES FOUND IN FRAME
# WITH CONFIDENCE GREATER THAN THRESHOLD
# ------------------------------------------------------------------------------

def detect_bbox_label(frame, label, model, threshold = 0.3):
  """
  Runs the detector on a frame and keeps the boxes of one class.

  INPUTS:
    - frame: image of the current frame, passed as-is to the model
    - label: class name to keep ("car", "person"...)
    - model: callable detector whose result exposes .pandas().xyxy[0]
      with the columns name, confidence, xmin, ymin, xmax, ymax
    - threshold: detections must have a confidence strictly above it
  OUTPUT:
    - list of [xmin, ymin, xmax, ymax] boxes, in detection order
  """

  # Run model and grab the detections table of the (single) image
  detections = model(frame).pandas().xyxy[0]

  # Rows with the requested label and a high enough confidence
  keep = (detections["name"] == label) & (detections["confidence"] > threshold)
  corners = detections.loc[keep, ["xmin", "ymin", "xmax", "ymax"]]

  return [list(row) for row in corners.to_numpy()]

# ------------------------------------------------------------------------------
# DIVIDES TRACKERS AND BOUNDING-BOXES INTO 3 GROUPS (SEE REPORT)
# ------------------------------------------------------------------------------

def match_trackers_bboxes(trackers, bboxes, threshold = 0.5):
  """
  Greedily pairs trackers with detected bounding boxes.

  The overlap of every (tracker, bbox) pair is computed with
  compute_rectangle_intersection_area(tracker.bb, bbox). The pair with the
  largest overlap is matched and taken out, and this repeats while the best
  remaining overlap is strictly above `threshold`.

  OUTPUT:
    - list of (tracker, bbox) matches, in the order they were found
    - list of trackers left without a bbox
    - list of bboxes left without a tracker
  """

  # Original positions of the trackers/bboxes that are still available
  free_rows = list(range(len(trackers)))
  free_cols = list(range(len(bboxes)))
  pairs = []

  if free_rows and free_cols:

    # Intersection area matrix: one row per tracker, one column per bbox
    overlap = np.array(
      [[compute_rectangle_intersection_area(trk.bb, box) for box in bboxes] for trk in trackers],
      dtype = float)

    while overlap.size:
      # Best remaining pair (first one in row-major order on ties)
      r, c = divmod(int(overlap.argmax()), overlap.shape[1])
      if not overlap[r, c] > threshold:
        break

      # Record the match and retire its tracker and bbox
      pairs.append((trackers[free_rows.pop(r)], bboxes[free_cols.pop(c)]))
      overlap = np.delete(np.delete(overlap, r, axis = 0), c, axis = 1)

  # Output
  leftover_trackers = [trackers[i] for i in free_rows]
  leftover_bboxes = [bboxes[j] for j in free_cols]
  return (pairs, leftover_trackers, leftover_bboxes)

# ------------------------------------------------------------------------------
# COMPUTES GLOBAL VELOCITY AND TRACKERS RELATIVE VELOCITIES TO THE SCENE
# ------------------------------------------------------------------------------

def velocities_in_scene(current_frame, prev_frame, trackers):
    """
    Estimates the motion of the scene and of every tracker between two frames.

    Dense optical flow (Farneback) is computed from prev_frame to
    current_frame. The mean flow magnitude over the whole frame is the scene
    velocity; for each tracker, the mean flow magnitude inside its bounding
    box divided by the scene velocity is stored in `tracker.RV`.

    INPUTS:
      - current_frame, prev_frame: BGR images of identical size
      - trackers: objects with a `bb` attribute (xyxy) and a writable `RV`
    OUTPUT:
      - scene velocity (mean flow magnitude over the frame)

    A tracker keeps its previous RV when its box does not cover any pixel of
    the frame or when the scene velocity is 0 (the ratio is undefined).
    """

    # Convert frames to grayscale
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    current_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)

    # Calculate dense optical flow using Farneback method
    flow = cv2.calcOpticalFlowFarneback(prev_gray, current_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

    # Magnitude of the flow vectors across whole frame (angle is not needed)
    magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    # Scene average velocity
    scene_velocity = np.mean(magnitude)

    # Get average magnitude in bb of trackers
    frame_height, frame_width = magnitude.shape[:2]
    for t in trackers:

      # Clamp the box to the frame so negative coordinates cannot wrap around
      x1 = min(max(int(t.bb[0]), 0), frame_width)
      y1 = min(max(int(t.bb[1]), 0), frame_height)
      x2 = min(max(int(t.bb[2]), 0), frame_width)
      y2 = min(max(int(t.bb[3]), 0), frame_height)

      # Images are indexed [row, column], i.e. [y, x]
      magnitude_in_bb = magnitude[y1:y2, x1:x2]

      # Compute relative velocity in bb
      if magnitude_in_bb.size > 0 and scene_velocity > 0:
        t.RV = float(np.mean(magnitude_in_bb)/scene_velocity)

    return scene_velocity

In [None]:
def analyse_video(model, cap, QUALITY = 30, total_frames = 1399, output_path = 'output.avi'):
  """
  Tracks and counts cars and pedestrians in a video, and records an annotated
  copy of it.

  INPUTS:
    - model: detector passed to detect_bbox_label
    - cap: opened cv2.VideoCapture; it is released before returning
    - QUALITY: size of the processed frames, in percent of the original size
    - total_frames: number of frames to read after the first one
    - output_path: file the annotated video is written to (MJPG, 30 FPS)
  OUTPUT:
    - None. The counts are stored in the globals CARS, PARKED_CARS and
      PEDESTRIANS (AVERAGE_SCENE_SPEED holds the last scene velocity) and a
      summary line is printed.
  """
  global AVERAGE_SCENE_SPEED, CARS, PARKED_CARS, PEDESTRIANS

  # Read first frame
  success, prev_frame = cap.read()
  if not success:
    cap.release()
    print("Error reading first frame of video")
    return

  # Scale factor to reduce frame quality, resize first frame
  width = int(prev_frame.shape[1] * QUALITY/100)
  height = int(prev_frame.shape[0] * QUALITY/100)
  dim = (width, height)
  prev_frame = cv2.resize(prev_frame, dim, interpolation = cv2.INTER_AREA)

  # Initialize count of labels
  AVERAGE_SCENE_SPEED = 0
  CARS = 0
  PARKED_CARS = 0
  PEDESTRIANS = 0

  # Initialize tracker list
  car_trackers = []
  pedestrian_trackers = []

  # Create output video with new FPS and dimensions
  vid = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'MJPG'), 30, dim)

  try:
    for _ in tqdm(range(total_frames)):

      # Read frame (frames that cannot be read are skipped)
      success, frame = cap.read()
      if not success:
        continue

      # Reduce size of image
      frame = cv2.resize(frame, dim, interpolation = cv2.INTER_AREA)

      # Compute optical flow to estimate velocity of scene, update trackers rv
      AVERAGE_SCENE_SPEED = velocities_in_scene(frame, prev_frame, car_trackers)

      # Detect cars & pedestrians
      # NOTE(review): each call runs the model on the same frame; a single
      # inference per frame would halve the detection cost.
      # NOTE(review): frames are BGR here; the YOLOv5 hub documentation shows
      # OpenCV images converted to RGB before inference - confirm.
      car_bboxes = detect_bbox_label(frame, "car", model, threshold = 0.5)
      pedestrian_bboxes = detect_bbox_label(frame, "person", model, threshold = 0.5)

      # Find matches, un-matched trackers and un-matched bboxes
      [match_cars, unmatched_car_trackers, unmatched_car_bboxes] = match_trackers_bboxes(car_trackers, car_bboxes)
      [match_pedestrians, unmatched_pedestrian_trackers, unmatched_pedestrian_bboxes] = match_trackers_bboxes(pedestrian_trackers, pedestrian_bboxes)

      # Update matched trackers
      for tracker, bb in match_cars + match_pedestrians:
        tracker.update(bb)

      # Update un-matched trackers
      for t in unmatched_car_trackers + unmatched_pedestrian_trackers:
        t.update_no_measure()

      # Remove "dead" trackers. The lists are rebuilt: removing items from a
      # list while iterating over it skips the element after each removal.
      car_trackers = [t for t in car_trackers if not t.is_dead()]
      pedestrian_trackers = [t for t in pedestrian_trackers if not t.is_dead()]

      # Create trackers for un-matched bboxes
      for bb in unmatched_car_bboxes:
        car_trackers.append(RectangleIntersectionTracker(bb, "car"))
      for bb in unmatched_pedestrian_bboxes:
        pedestrian_trackers.append(RectangleIntersectionTracker(bb, "person"))

      # Update previous frame
      prev_frame = frame

      # Add info to frame
      mask = np.zeros_like(frame)
      for t in car_trackers + pedestrian_trackers:
        mask = t.draw_on_mask(mask)
      img = cv2.add(frame, mask)

      # Display and record
      vid.write(img)
      #cv2_imshow(img)

  finally:
    # Release the writer so the output file is finalized, even on interruption
    vid.release()
    cap.release()

  print("MOVING CARS " + str(CARS - PARKED_CARS) + " | PARKED CARS " + str(PARKED_CARS) + " | PEDESTRIANS " + str(PEDESTRIANS))

In [None]:
# Create object detection model (YOLOv5s)
# Loaded through torch.hub from the "ultralytics/yolov5" repository.
model = torch.hub.load("ultralytics/yolov5", "yolov5s", _verbose = False)

# Open video
# NOTE(review): both captures are built from PATH, whereas `pathstcat` defined
# in the first cell points to another Drive folder - confirm which one holds
# st-catherines_drive.mp4.
cap1 = cv2.VideoCapture(PATH + "st-catherines_drive.mp4")
cap2 = cv2.VideoCapture(PATH + "mcgill_drive.mp4")

# Analyse video
# Each call prints its own counts. Both calls record to 'output.avi', so the
# second recording replaces the first one.
analyse_video(model, cap1)
analyse_video(model, cap2)

Using cache found in /root/.cache/torch/hub/ultralytics_yolov5_master
 79%|███████▉  | 1107/1399 [13:24<03:32,  1.38it/s]


KeyboardInterrupt: ignored

# Alternative Approach

In [None]:
import torch
import cv2
import numpy as np

# Function to calculate the centroid of a bounding box
def get_centroid(xmin, ymin, xmax, ymax):
    """Return the (x, y) center of the box with corners (xmin, ymin) and (xmax, ymax)."""
    center_x = (xmin + xmax) / 2
    center_y = (ymin + ymax) / 2
    return center_x, center_y

# Function to find the closest tracked object
def find_closest_tracked(centroid, tracked_objects, distance_threshold=95):
    """Return the id of the tracked object nearest to `centroid`.

    Only objects whose stored 'centroid' lies strictly closer than
    `distance_threshold` (Euclidean distance) are considered. Returns None
    when there is no such object; on equal distances the object that comes
    first in `tracked_objects` wins.
    """
    target = np.array(centroid)

    # (distance, id) of every tracked object within reach
    in_range = []
    for obj_id, obj_data in tracked_objects.items():
        gap = np.linalg.norm(np.array(obj_data['centroid']) - target)
        if gap < distance_threshold:
            in_range.append((gap, obj_id))

    if not in_range:
        return None
    return min(in_range, key=lambda pair: pair[0])[1]

# Function to check if the centroid is near the edge of the frame
def is_near_edge(centroid, frame_width, frame_height, edge_threshold=10000):
    """Tell whether `centroid` lies within `edge_threshold` of a frame border.

    The point is near an edge when x is below `edge_threshold` or above
    `frame_width - edge_threshold`, or likewise for y with `frame_height`.
    """
    x, y = centroid
    right_limit = frame_width - edge_threshold
    bottom_limit = frame_height - edge_threshold

    outside_horizontal = x < edge_threshold or x > right_limit
    outside_vertical = y < edge_threshold or y > bottom_limit
    return outside_horizontal or outside_vertical

def process_cars(model, video_path, movement_threshold):
    """Count parked and moving cars in a video.

    Every 'car' detection is associated with the closest tracked centroid
    (see find_closest_tracked) or registered as a new tracked object. The
    first time a detection is matched to an already tracked car, the distance
    between the car's centroid at the start of the frame and the new centroid
    decides whether it is counted as moving (distance above
    `movement_threshold`) or parked. Each tracked car is counted once.

    Args:
        model: detector; called on each frame, its result must expose
            `.xyxy[0]` rows of (xmin, ymin, xmax, ymax, conf, cls) and the
            model must expose `names` mapping class indices to labels.
        video_path: path of the video file to process.
        movement_threshold: displacement above which a car is counted as
            moving.

    Returns:
        (parked_car_count, moving_car_count), or None when the video cannot
        be opened (a message is printed in that case).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error opening video file")
        return

    parked_car_count = 0
    moving_car_count = 0
    tracked_objects = {}
    next_id = 1
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Perform detection using YOLOv5
            results = model(frame)

            # Remember where every tracked object was before this frame
            for obj_data in tracked_objects.values():
                obj_data['previous_centroid'] = obj_data['centroid']

            # Process detections for cars
            for *xyxy, _conf, cls in results.xyxy[0]:
                if model.names[int(cls)] != 'car':
                    continue

                xmin, ymin, xmax, ymax = map(int, xyxy)
                centroid = get_centroid(xmin, ymin, xmax, ymax)
                if not is_near_edge(centroid, frame_width, frame_height):
                    continue

                closest_id = find_closest_tracked(centroid, tracked_objects)
                if closest_id is None:
                    # First sighting: start tracking, count it later
                    tracked_objects[next_id] = {'centroid': centroid, 'previous_centroid': centroid, 'label': 'car'}
                    next_id += 1
                    continue

                tracked = tracked_objects[closest_id]

                # Check if the car has moved
                distance_moved = np.linalg.norm(np.array(tracked['previous_centroid']) - np.array(centroid))

                # Follow the car: without this the stored centroid stays at the
                # first sighting, and a car drifting out of the matching radius
                # would be registered (and counted) again as a new object.
                tracked['centroid'] = centroid

                if not tracked.get('counted'):
                    if distance_moved > movement_threshold:
                        # Car is moving
                        moving_car_count += 1
                    else:
                        # Car is parked
                        parked_car_count += 1
                    tracked['counted'] = True
    finally:
        # Release the capture even if detection fails midway
        cap.release()

    return parked_car_count, moving_car_count

# Function to process video for pedestrians
def process_pedestrians(model, video_path):
    """Count pedestrians in a video.

    Every 'person' detection is associated with the closest tracked centroid
    (see find_closest_tracked). A detection that matches no tracked object is
    registered as a new pedestrian and counted; a matched detection only
    refreshes the position of its tracked object.

    Args:
        model: detector; called on each frame, its result must expose
            `.xyxy[0]` rows of (xmin, ymin, xmax, ymax, conf, cls) and the
            model must expose `names` mapping class indices to labels.
        video_path: path of the video file to process.

    Returns:
        The number of pedestrians counted, or None when the video cannot be
        opened (a message is printed in that case).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error opening video file")
        return

    pedestrian_count = 0
    tracked_objects = {}
    next_id = 1
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Perform detection using YOLOv5
            results = model(frame)

            # Process detections for pedestrians
            for *xyxy, _conf, cls in results.xyxy[0]:
                if model.names[int(cls)] != 'person':
                    continue

                xmin, ymin, xmax, ymax = map(int, xyxy)
                centroid = get_centroid(xmin, ymin, xmax, ymax)
                if not is_near_edge(centroid, frame_width, frame_height):
                    continue

                closest_id = find_closest_tracked(centroid, tracked_objects)
                if closest_id is None:
                    # First sighting: start tracking and count the pedestrian
                    tracked_objects[next_id] = {'centroid': centroid, 'label': 'person'}
                    pedestrian_count += 1
                    next_id += 1
                else:
                    # Follow the pedestrian: without this the stored centroid
                    # stays at the first sighting, and a pedestrian walking out
                    # of the matching radius would be counted again.
                    tracked_objects[closest_id]['centroid'] = centroid
    finally:
        # Release the capture even if detection fails midway
        cap.release()

    return pedestrian_count

In [None]:
# Load the YOLOv5 model
# (this rebinds `model`, replacing the one created for the first approach)
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)



#process
# Each video is read twice: once to count cars, once to count pedestrians.
# The third argument of process_cars is the movement threshold used to tell
# moving cars from parked ones; a different value is used for each video.
# NOTE(review): process_cars returns None when the video cannot be opened,
# in which case the tuple unpacking below raises a TypeError.
parkedMcgill, movingMcGill=process_cars(model, pathmcgill, 10)
pedestrianMcgill = process_pedestrians(model, pathmcgill)


parkedStcat, movingStCat=process_cars(model, pathstcat, 88)
pedestrianStcat = process_pedestrians(model, pathstcat)

# Report the counts for both videos
print(f"Parked cars mcgill_drive.mp4: {parkedMcgill}")
print(f"Moving cars mcgill_drive.mp4: {movingMcGill}")
print(f"Total pedestrians mcgill_drive.mp4: {pedestrianMcgill}")

print(f"Parked cars st-catherines_drive.mp4: {parkedStcat}")
print(f"Moving cars st-catherines_drive.mp4: {movingStCat}")
print(f"Total pedestrians st-catherines_drive.mp4: {pedestrianStcat}")

Using cache found in /root/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2023-12-6 Python-3.10.12 torch-2.1.0+cu118 CUDA:0 (Tesla T4, 15102MiB)

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


Parked cars mcgill_drive.mp4: 11
Moving cars mcgill_drive.mp4: 21
Total pedestrians mcgill_drive.mp4: 30
Parked cars st-catherines_drive.mp4: 45
Moving cars st-catherines_drive.mp4: 0
Total pedestrians st-catherines_drive.mp4: 55
