# Install Libraries

In [None]:
!pip install pytube ultralytics filterpy

# Import Dependencies

In [2]:
from pytube import YouTube
import os
import cv2
from filterpy.kalman import KalmanFilter
from google.colab import drive
from google.colab import files
from google.colab import userdata
# google patch for cv2.imshow
from google.colab.patches import cv2_imshow
import numpy as np
from ultralytics import YOLO

# Download Videos

Convenience function which uses the PyTube API to download the YouTube videos specified in the assignment and store them in my Google Drive

In [3]:
def download_videos(video_id_list, output_path="."):
    """
    Download videos from YouTube based on a list of video IDs using PyTube API

    Args:
        video_id_list (list): A list of YouTube video IDs to download.
        output_path (str, optional): The directory where the downloaded videos will be saved. Defaults to ./videos
    """
    for video_id in video_id_list:
      try:
        yt = YouTube(f"https://www.youtube.com/watch?v={video_id}")
        video = yt.streams.filter(file_extension='mp4', resolution='360p').first()
        video.download(output_path)
        print(f"Video downloaded successfully: {yt.title}")
      except Exception as e:
        print(f"Error downloading video: {e}")

In [4]:
# Video IDs specified in assignment
video_id_list = ["WeF4wpw7w9k", "2NFwY15tRtA", "5dRramZVu2Q"]

# Get current directory, define subdirectory for videos, and define full output path
current_directory = os.getcwd()
videos_directory = "videos"
output_path = os.path.join(current_directory, videos_directory)

# Join the current directory with the videos directory and download videos
download_videos(video_id_list, output_path)

# save all mp4 paths to video_paths list
video_paths = []
for root, dirs, files in os.walk(output_path):
  for file in files:
    full_path = os.path.join(root, file)
    video_paths.append(full_path)

# keep in sorted order for consistent IDs in PG instance
video_paths.sort()

for video_path in video_paths:
  print(video_path)

Video downloaded successfully: Cyclist and vehicle Tracking - 1
Video downloaded successfully: Cyclist and vehicle tracking - 2
Video downloaded successfully: Drone Tracking Video
/content/videos/Cyclist and vehicle Tracking - 1.mp4
/content/videos/Cyclist and vehicle tracking - 2.mp4
/content/videos/Drone Tracking Video.mp4


## Task 3: Kalman Filter (50 points)

Use the  [filterpy](https://filterpy.readthedocs.io/en/latest/kalman/KalmanFilter.html) library to implement Kalman filters that will track the cyclist and the vehicle (if present) in the video. You will need to use the detections from the previous task to initialize and run the Kalman filter.

You need to deliver a video that contains the trajectory of the objects as a line that connects the pixels that the tracker indicated. You can use the `ffmpeg` command line tool and OpenCV to superpose the bounding box of the drone on the video as well as plot its trajectory.

Suggest methods that you can use to address  false positives and how the tracker can help you in this regard.

You will need to have one Kalman filter to track each of the required and present objects (cyclist and vehicle).

In [123]:
def euclidean_distance(array_1, array_2):
    """
    Compute the Euclidean distance between two 2D arrays.

    Parameters:
    array1 (numpy.ndarray): First 2D array.
    array2 (numpy.ndarray): Second 2D array.

    Returns:
    float: Euclidean distance between the two arrays.
    """

    # Compute the squared differences between corresponding elements
    squared_diff = (array_1 - array_2) ** 2

    # Sum the squared differences along the last axis (assuming arrays are 2D)
    squared_diff_sum = np.sum(squared_diff, axis=-1)

    # Take the square root to get the Euclidean distance
    distance = np.sqrt(squared_diff_sum)

    return distance

def find_nearest_neighbor(results, class_id, prev_pred):
  tracked_car_centroids = []
  ret_z = []
  for result in results:
    boxes = result.boxes
    for box in boxes:
      # check if detected object is a car object (class 1), if it is, add to list of tracked car centroids
      if int(box.cls.item()) == class_id:
        x, y, w, h = box.xywh[0].tolist()
        cur_z = np.array([x + w/2, y + h/2])
        if len(cur_z) > 1:
          tracked_car_centroids.append(cur_z)

  # find NN car, then update KF
  min_dist = 999999999
  for centroid in tracked_car_centroids:
    dist_from_pred = euclidean_distance(centroid, prev_pred)
    if dist_from_pred < min_dist:
      min_dist = dist_from_pred
      ret_z = centroid

  return ret_z if len(ret_z) > 1 else prev_pred # return prev_pred if no cars detected

def get_num_cars(results):
  num_cars  = 0
  for result in results:
    boxes = result.boxes
    for box in boxes:
      # Check if detected object is a car object (class 1), if it is, add centroid. naive, but works because only one car is initially detected in this video
      if int(box.cls.item() == 1):
        num_cars += 1

  return num_cars

In [107]:
def initialize_kalman_filter_vid1():
    """
    Initialize Kalman Filter for an object moving initially upwards and towards the right,
    but eventually turns and moves towards the top-left.

    Returns:
    KalmanFilter: Initialized Kalman Filter object.
    """
    # Initialize Kalman Filter
    kf = KalmanFilter(dim_x=6, dim_z=2)
    kf.F = np.array([[1, 0, 1, 0, 0.5, 0],
                     [0, 1, 0, 1, 0, 0.5],
                     [0, 0, 1, 0, 1, 0],
                     [0, 0, 0, 1, 0, 1],
                     [0, 0, 0, 0, 1, 0],
                     [0, 0, 0, 0, 0, 1]])
    kf.H = np.array([[1, 0, 0, 0, 0, 0],
                     [0, 1, 0, 0, 0, 0]])
    kf.R = np.eye(2) * 0.01  # Measurement uncertainty

    # Initial state vector [x, y, vx, vy, ax, ay]
    # Assuming the object starts at the bottom-left corner and moves upwards and towards the right initially
    kf.x = np.array([0, 500, 10, -10, 0, 0])

    # Process uncertainty covariance matrix
    kf.Q = np.array([[0.25, 0.5, 0, 0, 0, 0],
                     [0.5, 1, 0, 0, 0, 0],
                     [0, 0, 0.1, 0.2, 0, 0],
                     [0, 0, 0.2, 0.4, 0, 0],
                     [0, 0, 0, 0, 0.1, 0.2],
                     [0, 0, 0, 0, 0.2, 0.4]])

    return kf

# First Video Results

In [None]:
# Load the YOLOv8 model
model_path = "/content/drive/MyDrive/Colab Notebooks/yolov8m_150_epochs_best.pt"
detection_model = YOLO(model_path)
prev_pred = []
kf = initialize_kalman_filter_vid1()
car_id = 1

video_path = video_paths[0]
sample_rate = 10

cap = cv2.VideoCapture(video_path)
# Variable to count frames
frame_count = 0
prev_pred = []
# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    cur_pred = []
    z = []

    if success:
      # Increment the frame count
      frame_count += 1

      # Display every sample_rate'th frame
      if frame_count % sample_rate == 0:
        # Run YOLOv8 inference on the frame
        results = detection_model(frame)

        if(len(prev_pred) < 1 and len(results) < 1):  # Do nothing until first object is detected
          continue
        elif len(prev_pred) < 1:  # First detected results
          for result in results:
            boxes = result.boxes
            for box in boxes:
              # Check if detected object is a car object (class 1), if it is, add centroid. naive, but works because other objects leave z = []
              if int(box.cls.item()) == car_id:
                x, y, w, h = box.xywh[0].tolist()
                z = np.array([x + w/2, y + h/2])
        elif(len(results) < 1):
          # display new predicted position based on old predicted position
          z = prev_pred
        else:
          # update KF with position of NN and set prev_pred
          z = find_nearest_neighbor(results, car_id, prev_pred)

        # Visualize the results on the frame
        annotated_frame = results[0].plot()

        if len(z) > 0:
          # Update Kalman Filter and generate prediction
          kf.update(z)
          kf.predict()
          cur_pred = kf.x[:2]

          # Extract coordinates from z and cur_pred
          x_cur, y_cur = map(int, z)        # z coordinates
          x_pred, y_pred = map(int, cur_pred) # predicted position

          # reinitialize filter if predicted path goes off frame
          if(x_pred > 352 or y_pred > 640 or x_pred < 0 or y_pred < 0):
            kf = initialize_kalman_filter_vid1()

          # Draw purple line from z to cur_pred
          annotated_frame = cv2.line(annotated_frame, (x_cur, y_cur), (x_pred, y_pred), (255, 0, 255), 2)

        # Display the annotated frame
        cv2_imshow(annotated_frame)
        prev_pred = cur_pred
    else:
      # Release the video capture object
      cap.release()
      # Break the loop if the end of the video is reached
      break


# Save mp4

In [None]:
# Load the YOLOv8 model
model_path = "/content/drive/MyDrive/Colab Notebooks/yolov8m_150_epochs_best.pt"
detection_model = YOLO(model_path)
prev_pred = []
kf = initialize_kalman_filter_vid1()
car_id = 1

video_path = video_paths[0]
output_video_path = "/content/drive/MyDrive/Colab Notebooks/test_kf_results.mp4"
sample_rate = 1

cap = cv2.VideoCapture(video_path)
# Get video frame dimensions
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, 30, (frame_width, frame_height))

# Variable to count frames
frame_count = 0
prev_pred = []

# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    cur_pred = []
    z =[]

    if success:
      # Increment the frame count
      frame_count += 1

      # Write every sample_rate'th frame
      if frame_count % sample_rate == 0:
        # Run YOLOv8 inference on the frame
        results = detection_model(frame)

        if(len(prev_pred) < 1 and len(results) < 1):  # Do nothing until first car object is detected
          continue
        elif len(prev_pred) < 1:  # First detected results
          for result in results:
            boxes = result.boxes
            for box in boxes:
              # Check if detected object is a car object (class 1), if it is, add centroid. naive, but works because only one car is initially detected in this video
              if int(box.cls.item()) == car_id:
                x, y, w, h = box.xywh[0].tolist()
                z = np.array([x + w/2, y + h/2])
        elif(len(results) < 1):
          # display new predicted position based on old predicted position
          z = prev_pred
        else:
          # set z to position of NN
          z = find_nearest_neighbor(results, car_id, prev_pred)

        # Visualize the results on the frame
        annotated_frame = results[0].plot()

        if len(z) > 0:
          # Update Kalman Filter and generate prediction
          kf.update(z)
          kf.predict()
          cur_pred = kf.x[:2]

          # Extract coordinates from z and cur_pred
          x_cur, y_cur = map(int, z)        # z coordinates
          x_pred, y_pred = map(int, cur_pred) # predicted position

          # reinitialize filter if predicted path goes off frame
          if(x_pred > 352 or y_pred > 640 or x_pred < 0 or y_pred < 0):
            kf = initialize_kalman_filter_vid1()

          # Draw purple line from z to cur_pred
          annotated_frame = cv2.line(annotated_frame, (x_cur, y_cur), (x_pred, y_pred), (255, 0, 255), 2)

        # Write the annotated frame to the output video
        out.write(annotated_frame)

        prev_pred = cur_pred
    else:
      # Release the video capture object and VideoWriter object
      cap.release()
      out.release()
      break

# Third Video Results

In [110]:
def initialize_kalman_filter_vid3():
    """
    Initialize Kalman Filter for an object consistently moving from bottom-right to top-left.

    Returns:
    KalmanFilter: Initialized Kalman Filter object.
    """
    # Initialize Kalman Filter
    kf = KalmanFilter(dim_x=6, dim_z=2)
    kf.F = np.array([[1, 0, -1, 0, 0.5, 0],
                     [0, 1, 0, -1, 0, 0.5],
                     [0, 0, 1, 0, 1, 0],
                     [0, 0, 0, 1, 0, 1],
                     [0, 0, 0, 0, 1, 0],
                     [0, 0, 0, 0, 0, 1]])
    kf.H = np.array([[1, 0, 0, 0, 0, 0],
                     [0, 1, 0, 0, 0, 0]])
    kf.R = np.eye(2) * 0.01  # Measurement uncertainty

    # Initial state vector [x, y, vx, vy, ax, ay]
    # Assuming the object starts at the bottom-right corner and moves towards the top-left
    kf.x = np.array([640, 320, -10, -10, 0, 0])

    # Process uncertainty covariance matrix
    kf.Q = np.array([[0.25, 0.5, 0, 0, 0, 0],
                     [0.5, 1, 0, 0, 0, 0],
                     [0, 0, 0.1, 0.2, 0, 0],
                     [0, 0, 0.2, 0.4, 0, 0],
                     [0, 0, 0, 0, 0.1, 0.2],
                     [0, 0, 0, 0, 0.2, 0.4]])

    return kf

In [None]:
# Load the YOLOv8 model
model_path = "/content/drive/MyDrive/Colab Notebooks/yolov8m_150_epochs_best.pt"
detection_model = YOLO(model_path)
prev_pred = []
kf = initialize_kalman_filter_vid3()
biker_id = 0

video_path = video_paths[2]
sample_rate = 10

cap = cv2.VideoCapture(video_path)
# Variable to count frames
frame_count = 0
prev_pred = []
# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    cur_pred = []
    z = []

    if success:
      # Increment the frame count
      frame_count += 1

      # Display every sample_rate'th frame
      if frame_count % sample_rate == 0:
        # Run YOLOv8 inference on the frame
        results = detection_model(frame)

        if(len(prev_pred) < 1 and len(results) < 1):  # Do nothing until first object is detected
          continue
        elif len(prev_pred) < 1:  # First detected results
          for result in results:
            boxes = result.boxes
            for box in boxes:
              # Check if detected object is a car object (class 1), if it is, add centroid. naive, but works because other objects leave z = []
              if int(box.cls.item()) == biker_id:
                x, y, w, h = box.xywh[0].tolist()
                z = np.array([x + w/2, y + h/2])
        elif(len(results) < 1):
          # display new predicted position based on old predicted position
          z = prev_pred
        else:
          # update KF with position of NN and set prev_pred
          z = find_nearest_neighbor(results, biker_id, prev_pred)

        # Visualize the results on the frame
        annotated_frame = results[0].plot()

        if len(z) > 0:
          # Update Kalman Filter and generate prediction
          kf.update(z)
          kf.predict()
          cur_pred = kf.x[:2]

          # Extract coordinates from z and cur_pred
          x_cur, y_cur = map(int, z)        # z coordinates
          x_pred, y_pred = map(int, cur_pred) # predicted position

          # reinitialize filter if predicted path goes off frame
          if(x_pred > 352 or y_pred > 640 or x_pred < 0 or y_pred < 0):
            kf = initialize_kalman_filter_vid1()

          # Draw purple line from z to cur_pred
          annotated_frame = cv2.line(annotated_frame, (x_cur, y_cur), (x_pred, y_pred), (255, 0, 255), 2)

        # Display the annotated frame
        cv2_imshow(annotated_frame)
        prev_pred = cur_pred
    else:
      # Release the video capture object
      cap.release()
      # Break the loop if the end of the video is reached
      break


In [None]:
# Load the YOLOv8 model
model_path = "/content/drive/MyDrive/Colab Notebooks/yolov8m_150_epochs_best.pt"
detection_model = YOLO(model_path)
prev_pred = []
kf = initialize_kalman_filter_vid3()
biker_id = 0

video_path = video_paths[2]
output_video_path = "/content/drive/MyDrive/Colab Notebooks/test2_kf_results.mp4"
sample_rate = 1

cap = cv2.VideoCapture(video_path)
# Get video frame dimensions
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, 30, (frame_width, frame_height))

# Variable to count frames
frame_count = 0
prev_pred = []

# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    cur_pred = []
    z =[]

    if success:
      # Increment the frame count
      frame_count += 1

      # Write every sample_rate'th frame
      if frame_count % sample_rate == 0:
        # Run YOLOv8 inference on the frame
        results = detection_model(frame)

        if(len(prev_pred) < 1 and len(results) < 1):  # Do nothing until first car object is detected
          continue
        elif len(prev_pred) < 1:  # First detected results
          for result in results:
            boxes = result.boxes
            for box in boxes:
              # Check if detected object is a car object (class 1), if it is, add centroid. naive, but works because only one car is initially detected in this video
              if int(box.cls.item()) == biker_id:
                x, y, w, h = box.xywh[0].tolist()
                z = np.array([x + w/2, y + h/2])
        elif(len(results) < 1):
          # display new predicted position based on old predicted position
          z = prev_pred
        else:
          # set z to position of NN
          z = find_nearest_neighbor(results, biker_id, prev_pred)

        # Visualize the results on the frame
        annotated_frame = results[0].plot()

        if len(z) > 0:
          # Update Kalman Filter and generate prediction
          kf.update(z)
          kf.predict()
          cur_pred = kf.x[:2]

          # Extract coordinates from z and cur_pred
          x_cur, y_cur = map(int, z)        # z coordinates
          x_pred, y_pred = map(int, cur_pred) # predicted position

          # reinitialize filter if predicted path goes off frame
          #if(x_pred > 352 or y_pred > 640 or x_pred < 0 or y_pred < 0):
           # kf = initialize_kalman_filter_vid1()
          if(x_pred > 35200 or y_pred > 64000 or x_pred < 0 or y_pred < 0):
            kf = initialize_kalman_filter_vid1()

          # Draw purple line from z to cur_pred
          annotated_frame = cv2.line(annotated_frame, (x_cur, y_cur), (x_pred, y_pred), (255, 0, 255), 2)

        # Write the annotated frame to the output video
        out.write(annotated_frame)

        prev_pred = cur_pred
    else:
      # Release the video capture object and VideoWriter object
      cap.release()
      out.release()
      break

## Extra Bonus (20 points)

```{eval-rst}
.. youtube:: 2hQx48U1L-Y
```

The cyclist in the video goes in and out of occlusions. In addition the object is small making detections fairly problematic without finetuning and other optimizations.  Fintetuning involves using the pretrained model and training it further using images of cyclists from a training dataset such as [VisDrone](https://github.com/VisDrone/VisDrone-Dataset). At the same time,  reducing the number of classes to a much smaller number such as person & bicycle may help.  Also some 2 stage detectors may need to be further optimized in terms of parameters for small objects. See [this paper](https://www.mdpi.com/1424-8220/23/15/6887) for ideas around small object tracking.


```{note}
The extra points can only be awarded in the category of `assignments` and cannot be used to compensate for any other category such as `exams`.
```

In [None]:
# Download the extra credit video using given video id
ec_video_id = ["2hQx48U1L-Y"]
download_videos(ec_video_id, output_path)
# Save full video path
ec_video_path = "/content/videos/Dji Mavic air 2 drone using litchi app with follow me mode on a bike occluded by trees.mp4"

# Load the YOLOv8 model
model_path = "/content/drive/MyDrive/Colab Notebooks/yolov8m_150_epochs_best.pt"
detection_model = YOLO(model_path)
prev_pred = []
kf = initialize_kalman_filter_vid3()
biker_id = 0

video_path = ec_video_path
output_video_path = "/content/drive/MyDrive/Colab Notebooks/test3_kf_results.mp4"
sample_rate = 1

cap = cv2.VideoCapture(video_path)
# Get video frame dimensions
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, 30, (frame_width, frame_height))

# Variable to count frames
frame_count = 0
prev_pred = []

# Loop through the video frames
while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()
    cur_pred = []
    z =[]

    if success:
      # Increment the frame count
      frame_count += 1

      # Write every sample_rate'th frame
      if frame_count % sample_rate == 0:
        # Run YOLOv8 inference on the frame
        results = detection_model(frame)

        if(len(prev_pred) < 1 and len(results) < 1):  # Do nothing until first car object is detected
          continue
        elif len(prev_pred) < 1:  # First detected results
          for result in results:
            boxes = result.boxes
            for box in boxes:
              # Check if detected object is a car object (class 1), if it is, add centroid. naive, but works because only one car is initially detected in this video
              if int(box.cls.item()) == biker_id:
                x, y, w, h = box.xywh[0].tolist()
                z = np.array([x + w/2, y + h/2])
        elif(len(results) < 1):
          # display new predicted position based on old predicted position
          z = prev_pred
        else:
          # set z to position of NN
          z = find_nearest_neighbor(results, biker_id, prev_pred)

        # Visualize the results on the frame
        annotated_frame = results[0].plot()

        if len(z) > 0:
          # Update Kalman Filter and generate prediction
          kf.update(z)
          kf.predict()
          cur_pred = kf.x[:2]

          # Extract coordinates from z and cur_pred
          x_cur, y_cur = map(int, z)        # z coordinates
          x_pred, y_pred = map(int, cur_pred) # predicted position

          # reinitialize filter if predicted path goes off frame
          if(x_pred > 999999 or y_pred > 999999 or x_pred < -999999 or y_pred < -999999):
           kf = initialize_kalman_filter_vid1()

          # Draw purple line from z to cur_pred
          annotated_frame = cv2.line(annotated_frame, (x_cur, y_cur), (x_pred, y_pred), (255, 0, 255), 2)

        # Write the annotated frame to the output video
        out.write(annotated_frame)

        prev_pred = cur_pred
    else:
      # Release the video capture object and VideoWriter object
      cap.release()
      out.release()
      break