<a href="https://colab.research.google.com/github/VishnuRathore98/Machine_Learning/blob/master/Speed_Estimation_and_Vehicle_tracking.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Show the GPU (if any) attached to this runtime before doing any work
!nvidia-smi

In [None]:
!pip install -q supervision inference tqdm opencv-python numpy collection typing ultralytics

In [None]:
# Importing libraries

# For computations
import numpy as np

# For computer vision tasks
import cv2
from google.colab.patches import cv2_imshow

# For tracking, zones, annotation and video I/O utilities
import supervision as sv
from supervision.assets import VideoAssets, download_assets
# For displaying video
from IPython.display import Video

# For getting the model for detection
from inference.models.utils import get_roboflow_model

from tqdm import tqdm

# For gpu acceleration
import torch

from ultralytics import YOLO

In [None]:

# Select CUDA when torch can see a GPU, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NOTE(review): `device` is not referenced by any other visible cell - confirm it is still needed.
# Last expression of the cell, so the selected device is actually displayed
device

In [None]:
# Downloading the video: fetch the VEHICLES sample asset via supervision's asset helper.
# NOTE(review): presumably saved as "vehicles.mp4" in the working directory, which is
# the path SOURCE_VIDEO points at - confirm against the helper's return value.
download_assets(VideoAssets.VEHICLES)

In [None]:
# Configuration constants

# Input and output video files
SOURCE_VIDEO = "vehicles.mp4"
TARGET_VIDEO = "vehicles-result.mp4"

# Four (x, y) pixel corners of the region of interest; used both as the
# detection zone and as the source quadrilateral of the perspective transform
SOURCE = np.array([
    [1252, 787],
    [2298, 803],
    [5039, 2159],
    [-550, 2159],
])

# Size of the rectified target plane. The speed computation multiplies
# distance/time by 3.6 and labels it km/h, so these units are treated as metres.
TARGET_WIDTH = 25
TARGET_HEIGHT = 250

# Corners of the target rectangle, listed in the same order as SOURCE:
# unit-square corners scaled to (width - 1, height - 1)
TARGET = np.array([[0, 0], [1, 0], [1, 1], [0, 1]]) * np.array(
    [TARGET_WIDTH - 1, TARGET_HEIGHT - 1]
)

In [None]:
# Playing the video
# display(Video(SOURCE_VIDEO, embed=True))

In [None]:
# For resolving perspective distortion and get the coordinates for objects as per their position in the frame
class ViewTransformer:
  """Maps points from a source quadrilateral in the image onto a target rectangle.

  The mapping is the perspective (homography) matrix computed once from four
  source/target corner pairs and then applied to arbitrary points.
  """

  def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
    """Compute and store the perspective matrix taking `source` to `target`.

    Args:
      source: array of (x, y) corner points in the image; converted to float32.
      target: array of the corresponding (x, y) corner points in the target
        plane, in the same order as `source`; converted to float32.
    """
    source = source.astype(np.float32)
    target = target.astype(np.float32)
    self.matrix = cv2.getPerspectiveTransform(source, target)

  def transformed_points(self, points: np.ndarray) -> np.ndarray:
    """Project `points` through the stored perspective matrix.

    Args:
      points: array of (x, y) points; any shape that reshapes to (-1, 2).

    Returns:
      A float32 array of shape (N, 2) with the transformed coordinates.
      An empty input yields an empty (0, 2) float32 array.
    """
    # cv2.perspectiveTransform rejects empty input, which happens on every
    # frame where no detection falls inside the zone - short-circuit instead.
    if points.size == 0:
      return np.empty((0, 2), dtype=np.float32)

    # OpenCV expects points shaped (N, 1, 2) in floating point
    reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
    transformed_points = cv2.perspectiveTransform(reshaped_points, self.matrix)
    return transformed_points.reshape(-1, 2)

In [None]:
from collections import defaultdict, deque
from typing import DefaultDict

# Getting information about the video (its resolution, fps and frame count are used below)
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO)

# Get the model from roboflow
model = get_roboflow_model("yolov11x-seg-640")

# Using byte track to track individual object to get its id using video frames
byte_track = sv.ByteTrack(frame_rate=video_info.fps)

# Getting bounding box line and text thickness, scaled to the video resolution
thickness = sv.calculate_optimal_line_thickness(resolution_wh=video_info.resolution_wh)
text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)

# Bounding boxes, coloured per tracker id
# NOTE(review): newer supervision releases appear to rename this class to BoxAnnotator -
# confirm against the installed version, since the install is not pinned.
bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=thickness, color_lookup=sv.ColorLookup.TRACK)

# Labelling the bounding box
label_annotator = sv.LabelAnnotator(text_scale=text_scale, text_thickness=thickness, text_position=sv.Position.BOTTOM_CENTER, color_lookup=sv.ColorLookup.TRACK)

# For tracing moving vehicle's track; the trace spans two seconds' worth of frames
trace_annotator = sv.TraceAnnotator(thickness=thickness, trace_length=video_info.fps*2, position=sv.Position.BOTTOM_CENTER, color_lookup=sv.ColorLookup.TRACK)

# Plotting the polygon box to limit the detection boundary, and calculating speed
# NOTE(review): `frame_resolution_wh` looks deprecated in recent supervision releases - confirm.
polygon_zone = sv.PolygonZone(SOURCE, frame_resolution_wh=video_info.resolution_wh)

# Calling the ViewTransformer: maps image points inside SOURCE onto the TARGET rectangle
view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

# Getting video frames (a single-use generator: once iterated it yields nothing more)
frame_generator = sv.get_video_frames_generator(SOURCE_VIDEO)

# Getting the coordinate history: tracker id -> most recent transformed y values,
# capped at one second's worth of frames. `collections.defaultdict` is the runtime
# class; `typing.DefaultDict` is only an annotation alias and is used as such here.
coordinates: DefaultDict[int, deque] = defaultdict(lambda: deque(maxlen=video_info.fps))

In [None]:

# # ----------------------------------------------------------------------------------------------------------------------
# # Annotating a single frame
# frame = iter(frame_generator)
# frame = next(frame)

# result = model.infer(frame)[0]
# detections = sv.Detections.from_inference(result)

# # Detecting only inside polygon zone
# detections = detections[polygon_zone.trigger(detections)]

# # Labelling objects with id's
# detections = byte_track.update_with_detections(detections=detections)

# # Projecting each detection's bottom-center anchor into the target plane
# points = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
# points = view_transformer.transformed_points(points).astype(int)

# # Labels list
# labels = []

# for tracker_id, [_, y] in zip(detections.tracker_id, points):
#   coordinates[tracker_id].append(y)
#   if len(coordinates[tracker_id]) < video_info.fps/2:
#     labels.append(f"#{tracker_id}")
#   else:
#     coordinate_start = coordinates[tracker_id][-1]
#     coordinate_end = coordinates[tracker_id][0]
#     distance = abs(coordinate_start - coordinate_end)
#     time = len(coordinates[tracker_id]) / video_info.fps
#     speed = distance / time * 3.6
#     labels.append(f"#{tracker_id} {int(speed)}km/h")

# # Annotating the frame

# annotated_frame = frame.copy()

# # Drawing the polygon onto the frame
# annotated_frame = sv.draw_polygon(scene=annotated_frame, polygon=SOURCE, color=sv.Color.RED)

# annotated_frame = bounding_box_annotator.annotate(scene=annotated_frame, detections=detections)
# annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)

# sv.plot_image(annotated_frame)
# # ---------------------------------------------------------------------------------------------------------------------

In [None]:

# Annotate the whole source video and write the result to TARGET_VIDEO.

# Build a fresh frame generator for this run. Generators are single-use, so relying
# on the one created in the setup cell would make a second run of this cell (or a
# run after anything else pulled frames from it) silently process zero frames.
# NOTE: `byte_track` and `coordinates` still carry state from earlier runs;
# re-run the setup cell first for a completely clean pass.
frame_generator = sv.get_video_frames_generator(SOURCE_VIDEO)

# Open the target video; the sink is closed when the `with` block exits
with sv.VideoSink(TARGET_VIDEO, video_info) as sink:
    # Loop over the source video frame by frame
    for frame in tqdm(frame_generator, total=video_info.total_frames):
        result = model.infer(frame)[0]
        detections = sv.Detections.from_inference(result)

        # Detecting only inside polygon zone
        detections = detections[polygon_zone.trigger(detections)]

        # Labelling objects with id's
        detections = byte_track.update_with_detections(detections=detections)

        # Bottom-center anchor of every box, projected into the target plane
        points = detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
        points = view_transformer.transformed_points(points).astype(int)

        # One label per detection, in the same order as `detections`
        labels = []
        for tracker_id, [_, y] in zip(detections.tracker_id, points):
            history = coordinates[tracker_id]
            history.append(y)

            # Too few samples for a stable estimate (and at least two are
            # needed to measure any displacement): show the id only.
            if len(history) < video_info.fps / 2 or len(history) < 2:
                labels.append(f"#{tracker_id}")
                continue

            # Displacement along the target plane's y axis between the oldest
            # and newest samples in the window
            distance = abs(history[-1] - history[0])
            # N samples span N - 1 frame intervals, not N
            elapsed = (len(history) - 1) / video_info.fps
            # 3.6 converts m/s to km/h (target-plane units are treated as metres)
            speed = distance / elapsed * 3.6
            labels.append(f"#{tracker_id} {int(speed)}km/h")

        # Annotating a copy so the decoded frame itself is left untouched
        annotated_frame = frame.copy()

        # # Drawing the polygon onto the frame
        # annotated_frame = sv.draw_polygon(scene=annotated_frame, polygon=SOURCE, color=sv.Color.RED)

        annotated_frame = trace_annotator.annotate(scene=annotated_frame, detections=detections)
        annotated_frame = bounding_box_annotator.annotate(scene=annotated_frame, detections=detections)
        annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)

        # Writing the annotated frame to the target video
        sink.write_frame(annotated_frame)
