In [2]:
pip install ultralytics supervision

Collecting ultralytics
  Downloading ultralytics-8.3.15-py3-none-any.whl.metadata (34 kB)
Collecting supervision
  Downloading supervision-0.24.0-py3-none-any.whl.metadata (14 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.9-py3-none-any.whl.metadata (9.3 kB)
Downloading ultralytics-8.3.15-py3-none-any.whl (870 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m870.5/870.5 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading supervision-0.24.0-py3-none-any.whl (158 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m158.2/158.2 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.9-py3-none-any.whl (26 kB)
Installing collected packages: ultralytics-thop, supervision, ultralytics
Successfully installed supervision-0.24.0 ultralytics-8.3.15 ultralytics-thop-2.0.9


In [3]:
import supervision as sv
from supervision.assets import VideoAssets, download_assets
from ultralytics import YOLO
import cv2
from google.colab.patches import cv2_imshow
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [4]:
# Fetch the sample "vehicles" clip from supervision's asset collection.
# The call returns the local file name of the downloaded video, which is
# displayed as this cell's output.
download_assets(VideoAssets.VEHICLES)

Downloading vehicles.mp4 assets 



  0%|          | 0/35345757 [00:00<?, ?it/s]

'vehicles.mp4'

In [101]:
# Initialize the frame generator for the video file.
# The frames are produced lazily and, being a generator, the stream can be
# iterated only once.

frame_generator = sv.get_video_frames_generator("vehicles.mp4")
# Debug aid: uncomment to pull a single frame out of the generator
# (note that doing so consumes that frame from the stream).
# frame = next(iter(frame_generator))

In [None]:
# cv2.imwrite("image.jpg",frame)

In [102]:
# Calibration geometry for the perspective transformation.

# `source`: the four corners of a quadrilateral, expressed in pixel
# coordinates of the original video frame.
source = np.array([[1254, 784], [2297, 806], [3350, 1299], [563, 1303]])

# `target`: where each of those corners ends up after the transformation,
# listed in the same order as `source`.
# NOTE(review): presumably metres, since the speed code later multiplies a
# distance-per-second by 3.6 to obtain km/h -- confirm against the scene.
target = np.array([[0, 0], [24, 0], [24, 199], [0, 199]])

# End points of the counting line; they coincide with the last two
# corners of `source`.
start = sv.Point(3350, 1299)
end = sv.Point(563, 1303)



In [103]:
class PerspectiveTransformer:
    """Project 2-D points from one plane onto another.

    The transformation matrix is derived once, at construction time, from
    matching ``source`` and ``target`` coordinates and is then reused for
    every call to :meth:`transform`.
    """

    def __init__(self, source, target):
        """Build the transformation matrix.

        :param source: array of reference points in the original plane
        :param target: array of the corresponding points in the target plane
        """
        # Both point sets are cast to float32 before being handed to OpenCV.
        self.matrix = cv2.getPerspectiveTransform(
            source.astype(np.float32),
            target.astype(np.float32),
        )

    def transform(self, points):
        """Apply the stored perspective matrix to ``points``.

        :param points: array of (x, y) points to transform
        :return: the transformed points reshaped to ``(-1, 2)``; an empty
                 input is returned as-is without touching OpenCV
        """
        # Guard clause: nothing to project for an empty array.
        if points.size == 0:
            return points

        # cv2.perspectiveTransform is fed the points as an (N, 1, 2) float32 array.
        column_form = points.reshape(-1, 1, 2).astype(np.float32)
        projected = cv2.perspectiveTransform(column_form, self.matrix)
        return projected.reshape(-1, 2)

In [106]:
# Retrieve video information (resolution, fps, ...).
# The relative path is the file name that download_assets returned and that
# the frame generator reads, so every cell refers to the same file regardless
# of the working directory.
video_inf = sv.VideoInfo.from_video_path("vehicles.mp4")

# Calculate optimal thickness for annotations based on video resolution
thickness = sv.calculate_optimal_line_thickness(video_inf.resolution_wh)

# Calculate optimal text scale based on video resolution
text_scale = sv.calculate_optimal_text_scale(video_inf.resolution_wh)

# Initialize annotators for bounding boxes, labels, and trace.
# Labels and traces are both anchored at the bottom-center of each box.
bb_ann = sv.BoxAnnotator(thickness=thickness)
lab_ann = sv.LabelAnnotator(
    text_scale=text_scale,
    text_thickness=thickness,
    text_position=sv.Position.BOTTOM_CENTER,
)
# The trace length is set to the video's fps value.
tace_ann = sv.TraceAnnotator(
    thickness=thickness,
    trace_length=video_inf.fps,
    position=sv.Position.BOTTOM_CENTER,
)

# Annotator that draws the counting line.
line_ann = sv.LineZoneAnnotator(
    thickness=thickness,
    text_thickness=thickness,
    text_scale=text_scale,
)

# Define a polygon zone from the source points
polygon_zone = sv.PolygonZone(polygon=source)

# Counting line between `start` and `end`; crossings are evaluated on the
# bottom-center anchor of each detection.
line = sv.LineZone(start, end, triggering_anchors=[sv.Position.BOTTOM_CENTER])




In [105]:
# Detection model: YOLOv8 nano weights.
model = YOLO('yolov8n.pt')

# ByteTrack tracker, configured with the video's frame rate and a track
# activation threshold of 0.3.
tracker = sv.ByteTrack(
    frame_rate=video_inf.fps,
    track_activation_threshold=0.3,
)

# Maps image points onto the target plane defined by `source` / `target`.
pers_trans = PerspectiveTransformer(source, target)

# Per-track history of transformed y-coordinates, keyed by tracker id.
co_or = defaultdict(list)

In [107]:
# Process video frames and save the annotated output.
#
# A fresh frame generator is created here because a generator can only be
# consumed once: iterating one that a previous run already exhausted would
# silently produce an empty output video.
frame_generator = sv.get_video_frames_generator("vehicles.mp4")

with sv.VideoSink("out.mp4", video_inf) as sink:
    for frame in frame_generator:
        # Run YOLO model on the current frame
        result = model(frame, verbose=False)[0]

        # Extract detections from the model result
        detections = sv.Detections.from_ultralytics(result)

        # Filter detections based on confidence threshold
        detections = detections[detections.confidence > 0.3]

        # Apply Non-Maximum Suppression (NMS) to remove redundant detections
        detections = detections.with_nms(0.3)

        # Update tracker with the filtered detections
        detections = tracker.update_with_detections(detections)

        # Get the anchor points of detected objects (bottom-center of bounding boxes)
        points = detections.get_anchors_coordinates(sv.Position.BOTTOM_CENTER)

        # Apply perspective transformation to the points
        points = pers_trans.transform(points)

        # Record the transformed y-coordinate of every tracked object.
        # (`tracker_id` is used instead of `id` so the builtin is not shadowed.)
        for tracker_id, point in zip(detections.tracker_id, points):
            y = point.astype(int)[1]
            co_or[tracker_id].append(y)

        # Prepare one label per tracked object.
        labels = []
        for tracker_id in detections.tracker_id:
            history = co_or[tracker_id]
            if len(history) < (video_inf.fps / 2):  # Not enough frames to calculate speed
                labels.append(f"{tracker_id}")
            else:
                # Local names deliberately differ from the module-level
                # `start` / `end` points that define the counting line, so
                # those globals are no longer overwritten by this loop.
                first_y = history[0]
                last_y = history[-1]
                dist = abs(first_y - last_y)

                # The history is never trimmed, so this is the average speed
                # since the track's first recorded sample.
                elapsed = len(history) / video_inf.fps
                speed = dist / elapsed * 3.6  # Convert to km/h
                labels.append(f"{tracker_id}={int(speed)} km/h")

        # Annotate a copy of the frame: zone outline, boxes, labels, traces.
        scene = sv.draw_polygon(frame.copy(), source, color=sv.Color.RED, thickness=2)
        annotated_frame = bb_ann.annotate(scene, detections=detections)
        annotated_frame = lab_ann.annotate(annotated_frame, detections=detections, labels=labels)
        annotated_frame = tace_ann.annotate(annotated_frame, detections=detections)

        # Update the line-crossing counts, then draw the line and its counters.
        line.trigger(detections=detections)
        annotated_frame = line_ann.annotate(annotated_frame, line_counter=line)

        # Write the annotated frame to the output video
        sink.write_frame(annotated_frame)



In [108]:
from google.colab import files

# Send the annotated video to the browser for download. The relative name is
# the same path that was given to sv.VideoSink, so the file is found wherever
# the notebook's working directory happens to be.
files.download("out.mp4")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>