In [None]:
# This code cell was written for Google Colab
# The shell command is wrapped in a string literal, so it is not executed as a
# command here; remove the triple quotes to run it in Colab and print the GPU status.
'''!nvidia-smi'''

Fri Jul 18 09:04:33 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 550.54.15              Driver Version: 550.54.15      CUDA Version: 12.4     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  Tesla T4                       Off |   00000000:00:04.0 Off |                    0 |
| N/A   47C    P8             10W /   70W |       0MiB /  15360MiB |      0%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [None]:
# This code cell was written for Google Colab
# The setup commands below are wrapped in a string literal, so nothing is
# installed when this cell runs; remove the triple quotes to execute them in Colab.
# NOTE(review): the package versions are not pinned -- consider pinning them so
# the notebook stays reproducible.
'''!pip install ultralytics
!pip install supervision[assets]
!yolo settings sync=False

from IPython import display
display.clear_output()'''

In [None]:
import torch
from ultralytics import YOLO
from collections import defaultdict
from typing import Tuple, List, Dict, Set
import supervision as sv
import numpy as np
import time
import os

In [None]:
# Run inference on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'

# YOLOv8-small weights, moved to the selected device.
model = YOLO("yolov8s.pt").to(device)

# Class IDs that are treated as vehicles, paired with their labels.
id_to_label = {
    1: 'bicycle',
    2: 'car',
    3: 'motorcycle',
    4: 'airplane',
    5: 'bus',
    6: 'train',
    7: 'truck',
    8: 'boat',
}
vehicle_ids = list(id_to_label.keys())
vehicle_classes = list(id_to_label.values())

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8s.pt to 'yolov8s.pt'...


100%|██████████| 21.5M/21.5M [00:00<00:00, 424MB/s]


In [None]:
def crop_detections_to_roi(a, b):
    """
    Keep only the detections whose bounding-box centre lies inside the ROI.

    The centre point is used (rather than the full box) so the filter agrees
    with the centroid-based tracking and line-crossing logic in this notebook.

    Args:
        a: Detections container (e.g. ``sv.Detections``). It must expose an
            ``xyxy`` array of shape (N, 4) and support boolean-mask indexing.
        b: Region of interest as ``(x_min, y_min, x_max, y_max)`` in pixel
            coordinates (bounds are inclusive). A falsy value disables cropping.

    Returns:
        The filtered detections, of the same type as ``a``. The input is
        returned unchanged when no ROI is given.
    """
    detections, roi_coords = a, b
    if not roi_coords:
        return detections

    x_min, y_min, x_max, y_max = roi_coords

    # reshape(-1, 4) keeps the empty case well-formed: zero boxes -> empty mask.
    boxes = np.asarray(detections.xyxy, dtype=float).reshape(-1, 4)
    centre_x = (boxes[:, 0] + boxes[:, 2]) / 2
    centre_y = (boxes[:, 1] + boxes[:, 3]) / 2

    inside_roi = (
        (centre_x >= x_min) & (centre_x <= x_max)
        & (centre_y >= y_min) & (centre_y <= y_max)
    )
    return detections[inside_roi]
def draw_roi_on_frame(a, b):
    """
    Draw the outline of the region of interest onto a frame.

    The rectangle is painted directly into the pixel array with NumPy slicing,
    so no extra drawing library is required. The frame is modified in place
    and also returned, so callers can use either convention.

    Args:
        a: Image as a NumPy array of shape (H, W) or (H, W, C).
        b: Region of interest as ``(x_min, y_min, x_max, y_max)`` in pixel
            coordinates. A falsy value leaves the frame untouched.

    Returns:
        The same frame object, with a 2-pixel ROI border drawn on it. Parts of
        the ROI that fall outside the frame are clipped.
    """
    frame, roi_coords = a, b
    if frame is None or not roi_coords:
        return frame

    height, width = frame.shape[:2]
    if height == 0 or width == 0:
        return frame

    x_a, y_a, x_b, y_b = (int(round(value)) for value in roi_coords)
    left, right = min(x_a, x_b), max(x_a, x_b)
    top, bottom = min(y_a, y_b), max(y_a, y_b)

    # Nothing to draw when the ROI does not intersect the frame at all.
    if right < 0 or bottom < 0 or left > width - 1 or top > height - 1:
        return frame

    # Clip to the frame so negative indices never wrap around.
    left, right = max(left, 0), min(right, width - 1)
    top, bottom = max(top, 0), min(bottom, height - 1)

    thickness = 2
    # Green is (0, 255, 0) in both BGR and RGB; other layouts get a bright scalar.
    is_three_channel = frame.ndim == 3 and frame.shape[2] == 3
    color = (0, 255, 0) if is_three_channel else 255

    frame[top:min(top + thickness, bottom + 1), left:right + 1] = color        # top edge
    frame[max(top, bottom - thickness + 1):bottom + 1, left:right + 1] = color  # bottom edge
    frame[top:bottom + 1, left:min(left + thickness, right + 1)] = color        # left edge
    frame[top:bottom + 1, max(left, right - thickness + 1):right + 1] = color   # right edge
    return frame


In [None]:
def analyze_vehicle_flow(model, vehicle_ids,
                         input_video_path: str, sample_frames: int = 200,
                         min_track_points: int = 5) -> Tuple[sv.Point, sv.Point, str]:
    """
    Analyze vehicle movement pattern to determine optimal counting line placement.

    Vehicles are detected and tracked over the first ``sample_frames`` frames.
    For every sufficiently long track the displacement between its first and
    last centroid is measured; the counting line is placed perpendicular to
    the dominant direction of travel, through the centre of the frame.

    Args:
        model: YOLO model instance for vehicle detection
        vehicle_ids: List of class IDs for vehicle types to detect
        input_video_path (str): Path to input video file
        sample_frames (int): Number of frames to analyze. Defaults to 200.
        min_track_points (int): Minimum number of centroids a track needs
            before its direction is trusted. Defaults to 5.

    Returns:
        Tuple[sv.Point, sv.Point, str]: Start point, end point, and orientation
        ("horizontal" or "vertical") of the optimal counting line. When no
        usable track is found, a horizontal line through the centre is returned.

    Raises:
        ValueError: If no frame can be read from the video.
    """
    frames = iter(sv.get_video_frames_generator(input_video_path))

    frame = next(frames, None)
    if frame is None:
        raise ValueError(f"No frames could be read from video: {input_video_path}")
    height, width = frame.shape[:2]

    # Tracking for traffic flow analysis
    byte_tracker = sv.ByteTrack(
        track_activation_threshold=0.25,
        lost_track_buffer=30,
        minimum_matching_threshold=0.8,
        frame_rate=30,
        minimum_consecutive_frames=3
    )

    trajectories = defaultdict(list)
    frame_count = 0

    # The frame read for the size probe is analysed too, so sampling starts at frame 0.
    while frame is not None and frame_count < sample_frames:
        results = model(frame, verbose=False)[0]
        detections = sv.Detections.from_ultralytics(results)
        detections = detections[np.isin(detections.class_id, vehicle_ids)]

        detections = byte_tracker.update_with_detections(detections)

        # Use centroids to track -> less affected by minor variations than bbox
        if detections.tracker_id is not None:
            for bbox, tracker_id in zip(detections.xyxy, detections.tracker_id):
                if tracker_id is None:
                    continue
                centroid = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
                trajectories[tracker_id].append(centroid)

        frame_count += 1
        frame = next(frames, None)

    horizontal_movements = []
    vertical_movements = []

    for points in trajectories.values():
        if len(points) < min_track_points:  # too short for a reliable direction
            continue
        (start_x, start_y), (end_x, end_y) = points[0], points[-1]
        horizontal_movements.append(abs(end_x - start_x))  # horizontal displacement
        vertical_movements.append(abs(end_y - start_y))    # vertical displacement

    # Both lists grow together, so checking one is enough to know whether any
    # track qualified. Without data the default is a centred horizontal line
    # (chosen with the test video "highway.ts" in mind).
    horizontal_dominant = (
        bool(horizontal_movements)
        and np.mean(horizontal_movements) > np.mean(vertical_movements)
    )

    if horizontal_dominant:   # Dominant horizontal movement -> vertical counting line
        x_pos = int(width * 0.5)
        return (sv.Point(x_pos, 0), sv.Point(x_pos, height), "vertical")

    y_pos = int(height * 0.5)   # Dominant vertical movement (or no data) -> horizontal counting line
    return (sv.Point(0, y_pos), sv.Point(width, y_pos), "horizontal")

def create_multiple_counting_lines(width: int, height: int, direction: str, num_lines: int = 3) -> List[sv.LineZone]:
    """
    Build evenly spaced counting lines across the frame.

    The lines split the frame into (num_lines + 1) equal bands; a single line
    therefore lands in the centre.

    Args:
        width (int): Video frame width in pixels
        height (int): Video frame height in pixels
        direction (str): Line orientation, either "horizontal" or "vertical"
            (any value other than "horizontal" is treated as vertical)
        num_lines (int): Number of counting lines to create. Defaults to 3.

    Returns:
        List[sv.LineZone]: One LineZone per counting line, ordered from the
        top (horizontal) or from the left (vertical).
    """
    is_horizontal = direction == "horizontal"

    # Lines are spread along the axis perpendicular to their orientation.
    extent = height if is_horizontal else width
    spacing = extent / (num_lines + 1)
    offsets = [int(spacing * step) for step in range(1, num_lines + 1)]

    if is_horizontal:
        return [
            sv.LineZone(start=sv.Point(0, y_pos), end=sv.Point(width, y_pos))
            for y_pos in offsets
        ]
    return [
        sv.LineZone(start=sv.Point(x_pos, 0), end=sv.Point(x_pos, height))
        for x_pos in offsets
    ]

class LineCrossingTracker:
    """
    Remembers, for every tracked vehicle, which counting lines it has crossed
    and in which direction.
    """
    def __init__(self, num_lines: int):
        self.num_lines = num_lines
        # {tracker_id: {line_id: side}} -> side of each line the vehicle was last seen on
        self.vehicle_line_positions = {}
        # {tracker_id: {(line_id, crossing_direction), ...}} -> crossings already recorded
        self.crossed_vehicles = defaultdict(set)
        # {line_id: {"in": [(tracker_id, class_name), ...], "out": [...]}}
        self.line_crossings = {line_id: {"in": [], "out": []} for line_id in range(num_lines)}

    def get_vehicle_side_of_line(self, centroid: Tuple[float, float], line_zone: sv.LineZone, direction: str) -> str:
        """
        Report the side of a counting line on which a vehicle centre lies.

        Args:
            centroid (Tuple[float, float]): Vehicle center coordinates (x, y)
            line_zone (sv.LineZone): Line zone object to check against
            direction (str): Line orientation ("horizontal" or "vertical")

        Returns:
            str: "top"/"bottom" for a horizontal line, "left"/"right" otherwise.
            A centre exactly on the line counts as "bottom" / "right".
        """
        x, y = centroid
        line_start = line_zone.vector.start

        if direction != "horizontal":
            # Vertical line: only the x coordinate matters
            return "left" if x < line_start.x else "right"

        # Horizontal line: only the y coordinate matters
        return "top" if y < line_start.y else "bottom"

    def update_vehicle_positions(self, detections: sv.Detections, counting_lines: List[sv.LineZone], direction: str, class_names: dict) -> List[Tuple[int, int, str, str]]:
        """
        Refresh the stored side of every line for each tracked vehicle and
        record any crossing that has not been seen before.

        Args:
            detections (sv.Detections): Current frame detections with tracking IDs
            counting_lines (List[sv.LineZone]): List of counting line zones
            direction (str): Line orientation ("horizontal" or "vertical")
            class_names (dict): Mapping of class IDs to class names

        Returns:
            List[Tuple[int, int, str, str]]: (tracker_id, line_id, direction, class_name)
            for every crossing recorded during this call
        """
        newly_crossed = []

        # The one side change that counts as "in"; every other change is "out".
        if direction == "horizontal":
            inbound_transition = ("bottom", "top")
        else:
            inbound_transition = ("left", "right")

        for idx, tracker_id in enumerate(detections.tracker_id):
            if tracker_id is None:
                continue

            class_name = class_names[detections.class_id[idx]]
            box = detections.xyxy[idx]
            centroid = ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)

            sides_by_line = self.vehicle_line_positions.setdefault(tracker_id, {})

            for line_id, line_zone in enumerate(counting_lines):
                current_side = self.get_vehicle_side_of_line(centroid, line_zone, direction)

                # A crossing needs a known previous side that differs from the current one
                if line_id in sides_by_line and sides_by_line[line_id] != current_side:
                    transition = (sides_by_line[line_id], current_side)
                    crossing_direction = "in" if transition == inbound_transition else "out"

                    # Each (line, direction) pair is recorded once per vehicle
                    crossing_key = (line_id, crossing_direction)
                    recorded = self.crossed_vehicles[tracker_id]
                    if crossing_key not in recorded:
                        recorded.add(crossing_key)
                        self.line_crossings[line_id][crossing_direction].append((tracker_id, class_name))
                        newly_crossed.append((tracker_id, line_id, crossing_direction, class_name))

                sides_by_line[line_id] = current_side

        return newly_crossed

    def get_unique_vehicle_count(self) -> int:
        """
        Count the distinct vehicles with at least one recorded crossing.

        Returns:
            int: Number of tracker IDs present in the crossing record
        """
        return len(self.crossed_vehicles.keys())

    def get_line_stats(self, line_id: int) -> Dict:
        """
        Summarise the crossings recorded for one line.

        Args:
            line_id (int): ID of the line to get stats for

        Returns:
            Dict: in_count, out_count, in_vehicles and out_vehicles; all
            zero/empty when the line ID is unknown
        """
        crossings = self.line_crossings.get(line_id)
        if crossings is None:
            return {"in_count": 0, "out_count": 0, "in_vehicles": [], "out_vehicles": []}

        inbound, outbound = crossings["in"], crossings["out"]
        return {
            "in_count": len(inbound),
            "out_count": len(outbound),
            "in_vehicles": inbound,
            "out_vehicles": outbound
        }

def count_vehicles_multiline(model, vehicle_ids,
                             input_video_path, output_video_path, stats_output_path,
                             roi_coords, frame_skip=0,
                             num_lines=3) -> int:
    """
    Count vehicles crossing multiple counting lines with tracking.

    The counting-line orientation is chosen automatically from the observed
    traffic flow. Every processed frame is annotated and written to the output
    video, and a text report with per-line crossing statistics and timing
    information is saved.

    Args:
        model: YOLO model instance for vehicle detection
        vehicle_ids: List of class IDs for vehicle types to detect
        input_video_path (str): Path to input video file
        output_video_path (str): Path to save annotated output video
        stats_output_path (str): Path to save counting statistics
        roi_coords: Optional region of interest as (x_min, y_min, x_max, y_max)
            in pixels. When given, the counting lines are laid out inside the
            ROI and detections are restricted to it. Pass None to use the
            whole frame.
        frame_skip (int): Number of frames to skip after each processed frame
            (0 processes every frame). Skipped frames are written to the
            output unannotated. Defaults to 0.
        num_lines (int): Number of counting lines to create. Defaults to 3.

    Returns:
        int: Total number of unique vehicles that crossed any line

    Raises:
        ValueError: If frame_skip is negative or the video contains no frames.
    """
    if frame_skip < 0:
        # A negative value would make the modulo below meaningless (or divide by zero).
        raise ValueError(f"frame_skip must be >= 0, got {frame_skip}")

    # Make sure both output locations exist before any heavy work starts.
    for target_path in (output_video_path, stats_output_path):
        parent_dir = os.path.dirname(target_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    frame_times = []

    # Only the first frame is needed here, to learn the video dimensions.
    first_frame = next(iter(sv.get_video_frames_generator(input_video_path)), None)
    if first_frame is None:
        raise ValueError(f"No frames could be read from video: {input_video_path}")
    height, width = first_frame.shape[:2]

    # Adaptive sizing based on video dimensions for aesthetic appearance
    base_size = min(width, height)
    line_thickness = max(2, int(base_size / 400))
    line_text_thickness = max(1, int(base_size / 600))
    line_text_scale = max(0.5, base_size / 1200)
    box_thickness = max(1, int(base_size / 500))
    label_text_thickness = max(1, int(base_size / 800))
    label_text_scale = max(0.4, base_size / 1500)

    # vehicle flow patterns (only the orientation of the suggested line is used)
    _, _, direction = analyze_vehicle_flow(model=model, vehicle_ids=vehicle_ids, input_video_path=input_video_path)

    if roi_coords:
        x_min, y_min, x_max, y_max = roi_coords
        print(f"Using ROI: ({x_min}, {y_min}) to ({x_max}, {y_max})")

        # Lay the lines out in ROI-local coordinates, then rebuild each one at
        # its frame position. The zones are re-created rather than mutated
        # because a LineZone may derive internal geometry from its endpoints
        # when it is constructed; editing the points afterwards would leave
        # that state stale.
        roi_local_lines = create_multiple_counting_lines(x_max - x_min, y_max - y_min, direction, num_lines)
        counting_lines = [
            sv.LineZone(
                start=sv.Point(line.vector.start.x + x_min, line.vector.start.y + y_min),
                end=sv.Point(line.vector.end.x + x_min, line.vector.end.y + y_min),
            )
            for line in roi_local_lines
        ]
    else:
        counting_lines = create_multiple_counting_lines(width, height, direction, num_lines)
        print("Processing entire frame")

    crossing_tracker = LineCrossingTracker(num_lines)

    # Annotators for each line
    if direction == "horizontal":
        in_label, out_label = "Bottom to Top", "Top to Bottom"
    else:
        in_label, out_label = "Left to Right", "Right to Left"

    line_annotators = [
        sv.LineZoneAnnotator(
            thickness=line_thickness,
            text_thickness=line_text_thickness,
            text_scale=line_text_scale,
            custom_in_text=f"Line {i+1}: {in_label}",
            custom_out_text=f"Line {i+1}: {out_label}"
        )
        for i in range(len(counting_lines))
    ]

    # Tracker
    byte_tracker = sv.ByteTrack(
        track_activation_threshold=0.25,
        lost_track_buffer=30,
        minimum_matching_threshold=0.8,
        frame_rate=30,
        minimum_consecutive_frames=3
    )

    # Annotators with adaptive sizing
    box_annotator = sv.BoxAnnotator(thickness=box_thickness)
    label_annotator = sv.LabelAnnotator(
        text_thickness=label_text_thickness,
        text_scale=label_text_scale,
        text_color=sv.Color.BLACK
    )

    def callback(frame: np.ndarray, index: int) -> np.ndarray:
        """
        Process each video frame for vehicle detection and counting.

        Args:
            frame (np.ndarray): Current video frame
            index (int): Frame index in the video sequence

        Returns:
            np.ndarray: Annotated frame with detections, tracking, and counting
            lines, or the untouched frame when it is skipped
        """
        # Only every (frame_skip + 1)-th frame goes through detection
        if index % (frame_skip + 1) != 0:
            return frame
        start_time = time.time()

        # model prediction
        results = model(frame, verbose=False)[0]
        detections = sv.Detections.from_ultralytics(results)
        detections = detections[np.isin(detections.class_id, vehicle_ids)]

        if roi_coords:
            cropped = crop_detections_to_roi(detections, roi_coords)
            # Tolerate a helper that returns nothing instead of a filtered set
            if cropped is not None:
                detections = cropped

        detections = byte_tracker.update_with_detections(detections)
        crossing_tracker.update_vehicle_positions(
            detections, counting_lines, direction, model.model.names
        )
        labels = [
            f"#{model.model.names[class_id].capitalize()} {tracker_id}" # e.g #Car 5
            for class_id, tracker_id in zip(detections.class_id, detections.tracker_id)
        ]

        # Annotate frame
        annotated_frame = frame.copy()
        if roi_coords:
            drawn = draw_roi_on_frame(annotated_frame, roi_coords)
            # Tolerate a helper that draws in place and returns nothing
            if drawn is not None:
                annotated_frame = drawn
        annotated_frame = box_annotator.annotate(scene=annotated_frame, detections=detections)
        annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)

        # Update line counters for visualization (this is separate from tracking)
        for line in counting_lines:
            line.trigger(detections)
        for line, annotator in zip(counting_lines, line_annotators):
            annotated_frame = annotator.annotate(annotated_frame, line_counter=line)

        end_time = time.time()
        frame_times.append((end_time - start_time) * 1000)  # will be used to calculate average frame processing time

        return annotated_frame

    # Process video
    sv.process_video(
        source_path=input_video_path,
        target_path=output_video_path,
        callback=callback
    )

    # CALCULATE STATS
    unique_vehicle_count = crossing_tracker.get_unique_vehicle_count()

    # Calculate totals from tracker (not the line counters)
    all_line_stats = [crossing_tracker.get_line_stats(i) for i in range(num_lines)]
    total_in = sum(stats["in_count"] for stats in all_line_stats)
    total_out = sum(stats["out_count"] for stats in all_line_stats)

    total_processing_time = sum(frame_times)
    avg_frame_time = total_processing_time / len(frame_times) if frame_times else 0

    # Approximate number of video frames covered, counting the skipped ones
    total_frames = len(frame_times) * (frame_skip + 1)
    avg_effective_time_per_frame = total_processing_time / total_frames if total_frames > 0 else 0

    with open(stats_output_path, "w") as f:
        if roi_coords:
            f.write(f"ROI coordinates: ({roi_coords[0]}, {roi_coords[1]}) to ({roi_coords[2]}, {roi_coords[3]})\n")
        f.write(f"Number of counting lines: {num_lines}\n")
        f.write(f"Total unique vehicles that crossed lines: {unique_vehicle_count}\n")
        f.write(f"Total crossings: IN={total_in}, OUT={total_out}\n")
        f.write(f"Average frame processing time: {avg_frame_time:.2f} ms\n")
        f.write(f"Number of frames skipped: {frame_skip}\n")
        f.write(f"Average effective time per video frame: {avg_effective_time_per_frame:.2f} ms\n")

        for i, line_stats in enumerate(all_line_stats):
            f.write(f"\nLine {i+1}:\n")
            f.write(f"  Total: IN={line_stats['in_count']}, OUT={line_stats['out_count']}\n")

            # One optional row per direction, listing the vehicles that crossed
            for heading, key in (("IN", "in_vehicles"), ("OUT", "out_vehicles")):
                vehicles = line_stats[key]
                if vehicles:
                    f.write(f"  Vehicles going {heading}: ")
                    vehicle_list = [f"{class_name} #{tracker_id}" for tracker_id, class_name in vehicles]
                    f.write(", ".join(vehicle_list) + "\n")

    return unique_vehicle_count

In [None]:
# Baseline run: every frame is processed (frame_skip=0), three counting lines, no ROI.
# The value shown for this cell is the number of unique vehicles that crossed a line.
count_vehicles_multiline(
    model=model,
    vehicle_ids=vehicle_ids,
    input_video_path="/content/highway.ts",
    output_video_path="/content/result_highway_skip0_3lines_gpu.mp4",
    stats_output_path="/content/output_highway_skip0_3lines_gpu.txt",
    roi_coords=None,
    frame_skip=0,
    num_lines=3,
)

Processing entire frame


33

In [None]:
# Same video and line layout, but only every third frame goes through detection
# (frame_skip=2); results are written to separate output files.
count_vehicles_multiline(
    model=model,
    vehicle_ids=vehicle_ids,
    input_video_path="/content/highway.ts",
    output_video_path="/content/result_highway_skip2_3lines_gpu.mp4",
    stats_output_path="/content/output_highway_skip2_3lines_gpu.txt",
    roi_coords=None,
    frame_skip=2,
    num_lines=3,
)

Processing entire frame


33