In [None]:
# Install required packages
!pip install ultralytics opencv-python supervision

# Import libraries
import cv2
import numpy as np
from ultralytics import YOLO
import supervision as sv
import os

# Load the pretrained YOLOv11 model
model = YOLO("yolo11m.pt")  # Medium model for better accuracy

# Vehicle classes to count, keyed by COCO class id
VEHICLE_CLASSES = {
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck",
}
VEHICLE_CLASS_IDS = list(VEHICLE_CLASSES)

# Set up video input and output paths
input_video_path = "/content/test2.mp4"
output_video_path = "/content/output_video.mp4"

# Initialize video capture
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    # Raise SystemExit instead of calling exit(): in a notebook exit() shuts the
    # kernel down, whereas SystemExit just stops the cell (and ends a script
    # with a non-zero status).
    raise SystemExit(
        f"Error: Could not open video at {input_video_path}. Please check the file path and ensure the video exists."
    )

# Get video properties
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes the output
# video play at a different speed than the input.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:  # Also catches NaN, which some containers report
    print("Warning: Could not read FPS from the video; falling back to 30.0.")
    fps = 30.0
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Video properties: Width={frame_width}, Height={frame_height}, FPS={fps}, Total Frames={total_frames}")

# Initialize video writer for output
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
if not out.isOpened():
    cap.release()  # Do not leave the input open when bailing out
    raise SystemExit("Error: Could not initialize VideoWriter. Check codec and output path.")

# Initialize box and label annotators
box_annotator = sv.BoxAnnotator(thickness=2)
label_annotator = sv.LabelAnnotator(text_thickness=2, text_scale=0.5)

# Define the counting lines
LINE_Y_POSITION_YELLOW = int(frame_height * 0.6)  # Yellow line at 60% of frame height
# Blue line 100px below the yellow line, clamped so it stays inside short frames
LINE_Y_POSITION_BLUE = min(LINE_Y_POSITION_YELLOW + 100, frame_height - 1)
yellow_line = [(0, LINE_Y_POSITION_YELLOW), (frame_width, LINE_Y_POSITION_YELLOW)]
blue_line = [(0, LINE_Y_POSITION_BLUE), (frame_width, LINE_Y_POSITION_BLUE)]

# Define vertical division
MID_X = frame_width // 2  # Middle of the frame for left/right division

# Initialize vehicle counters (one entry per class name, starting at zero)
incoming_counts = dict.fromkeys(VEHICLE_CLASSES.values(), 0)  # Left half (yellow -> blue)
outgoing_counts = dict.fromkeys(VEHICLE_CLASSES.values(), 0)  # Right half (blue -> yellow)

# Initialize tracker state
# tracked_vehicles maps vehicle id ->
#   (center_x, center_y, class_name, crossed_yellow, crossed_blue, direction)
tracked_vehicles = {}
vehicle_id_counter = 0  # Next id to hand out to an unmatched detection
counted_vehicle_ids_incoming = set()  # Track IDs counted for incoming
counted_vehicle_ids_outgoing = set()  # Track IDs counted for outgoing

# Overlay text style shared by the two count panels
PANEL_FONT = cv2.FONT_HERSHEY_SIMPLEX
PANEL_FONT_SCALE = 0.8
PANEL_FONT_THICKNESS = 2
PANEL_MARGIN = 10  # Horizontal padding between a panel and the frame edge
PANEL_TOP = 30  # Baseline y of the panel heading
PANEL_LINE_SPACING = 10  # Gap between lines

MATCH_DISTANCE_PX = 100  # Max centre movement between frames to be the same vehicle
PROGRESS_EVERY = 50  # Print a progress line once per this many frames


def _draw_count_panel(image, heading, counts, align_right=False):
    """Draw a heading and one "<class>: <count>" row per entry onto image.

    Each piece of text is drawn in white on a filled black rectangle. The
    panel starts PANEL_MARGIN pixels from the left edge, or, when align_right
    is true, is positioned so that its widest line ends PANEL_MARGIN pixels
    from the right edge of image.

    Args:
        image: Frame to draw on; modified in place.
        heading: Title drawn above the rows.
        counts: Mapping of class name to count; iterated in insertion order.
        align_right: Anchor the panel to the right edge instead of the left.
    """
    rows = [f"{name}: {count}" for name, count in counts.items()]
    row_sizes = [
        cv2.getTextSize(row, PANEL_FONT, PANEL_FONT_SCALE, PANEL_FONT_THICKNESS)
        for row in rows
    ]
    (heading_width, heading_height), heading_baseline = cv2.getTextSize(
        heading, PANEL_FONT, PANEL_FONT_SCALE, PANEL_FONT_THICKNESS
    )
    rows_width = max((size[0] for size, _ in row_sizes), default=0)

    if align_right:
        # Use this panel's own widest line so nothing is clipped at the edge.
        x = image.shape[1] - max(rows_width, heading_width) - PANEL_MARGIN
    else:
        x = PANEL_MARGIN

    # Heading
    y = PANEL_TOP
    cv2.rectangle(
        image,
        (x, y - heading_height - heading_baseline),
        (x + heading_width, y + heading_baseline),
        (0, 0, 0),
        -1,
    )
    cv2.putText(
        image, heading, (x, y), PANEL_FONT, PANEL_FONT_SCALE,
        (255, 255, 255), PANEL_FONT_THICKNESS,
    )
    y += heading_height + heading_baseline + PANEL_LINE_SPACING

    # Rows: every background is as wide as the widest row
    for i, (text, ((_, text_height), baseline)) in enumerate(zip(rows, row_sizes)):
        text_y = y + i * (text_height + baseline + PANEL_LINE_SPACING)
        cv2.rectangle(
            image,
            (x, text_y - text_height - baseline),
            (x + rows_width, text_y + baseline),
            (0, 0, 0),
            -1,
        )
        cv2.putText(
            image, text, (x, text_y), PANEL_FONT, PANEL_FONT_SCALE,
            (255, 255, 255), PANEL_FONT_THICKNESS,
        )


# Process video frame by frame
frame_idx = 0
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print(f"Reached end of video or failed to read frame at index {frame_idx}.")
            break

        # Perform inference; verbose=False silences the per-frame log line that
        # otherwise floods the notebook output
        results = model(frame, conf=0.5, verbose=False)[0]  # Confidence threshold

        # Convert results to supervision format
        detections = sv.Detections.from_ultralytics(results)

        # Filter for vehicle classes
        vehicle_mask = np.isin(detections.class_id, VEHICLE_CLASS_IDS)
        detections = detections[vehicle_mask]

        # Draw counting lines
        cv2.line(frame, yellow_line[0], yellow_line[1], (0, 255, 255), 2)  # Yellow line
        cv2.line(frame, blue_line[0], blue_line[1], (255, 0, 0), 2)  # Blue line

        # Process detections
        labels = []
        # Vehicles seen in this frame: id ->
        #   (center_x, center_y, class_name, crossed_yellow, crossed_blue, direction)
        current_vehicles = {}

        for xyxy, confidence, class_id in zip(detections.xyxy, detections.confidence, detections.class_id):
            class_name = VEHICLE_CLASSES[int(class_id)]
            x1, y1, x2, y2 = map(int, xyxy)

            # Calculate center of bounding box
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2

            # Draw red dot at center
            cv2.circle(frame, (center_x, center_y), 5, (0, 0, 255), -1)

            # Simple tracking: take the closest previous position within the
            # matching threshold
            vehicle_id = None
            min_distance = MATCH_DISTANCE_PX
            for vid, (prev_x, prev_y, *_) in tracked_vehicles.items():
                if vid in current_vehicles:
                    # Already claimed by another detection in this frame; two
                    # boxes must never share an id or they overwrite each
                    # other's crossing state.
                    continue
                distance = np.hypot(center_x - prev_x, center_y - prev_y)
                if distance < min_distance:
                    min_distance = distance
                    vehicle_id = vid

            if vehicle_id is None:
                # New vehicle: direction is fixed by which half it first appears in
                vehicle_id = vehicle_id_counter
                vehicle_id_counter += 1
                crossed_yellow = False
                crossed_blue = False
                label_direction = "incoming" if center_x < MID_X else "outgoing"
            else:
                # Known vehicle: keep its original direction to avoid flipping
                _, prev_y, _, crossed_yellow, crossed_blue, label_direction = tracked_vehicles[vehicle_id]

                if label_direction == "incoming":
                    # Incoming (Left half, Yellow -> Blue): moving down the frame
                    if not crossed_yellow and prev_y <= LINE_Y_POSITION_YELLOW < center_y:
                        crossed_yellow = True
                    # Blue only counts once yellow has been crossed
                    if crossed_yellow and not crossed_blue and prev_y <= LINE_Y_POSITION_BLUE < center_y:
                        crossed_blue = True
                        if vehicle_id not in counted_vehicle_ids_incoming:
                            incoming_counts[class_name] += 1
                            counted_vehicle_ids_incoming.add(vehicle_id)
                else:
                    # Outgoing (Right half, Blue -> Yellow): moving up the frame
                    if not crossed_blue and center_y <= LINE_Y_POSITION_BLUE < prev_y:
                        crossed_blue = True
                    # Yellow only counts once blue has been crossed
                    if crossed_blue and not crossed_yellow and center_y <= LINE_Y_POSITION_YELLOW < prev_y:
                        crossed_yellow = True
                        if vehicle_id not in counted_vehicle_ids_outgoing:
                            outgoing_counts[class_name] += 1
                            counted_vehicle_ids_outgoing.add(vehicle_id)

            current_vehicles[vehicle_id] = (
                center_x, center_y, class_name, crossed_yellow, crossed_blue, label_direction
            )
            labels.append(f"{class_name} {confidence:.2f} ID:{vehicle_id} {label_direction}")

        # Update tracked vehicles (vehicles not detected this frame are dropped)
        tracked_vehicles = current_vehicles

        # Annotate frame with bounding boxes and labels
        annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=detections)
        annotated_frame = label_annotator.annotate(
            scene=annotated_frame, detections=detections, labels=labels
        )

        # Display incoming counts on top left and outgoing counts on top right
        _draw_count_panel(annotated_frame, "Incoming", incoming_counts)
        _draw_count_panel(annotated_frame, "Outgoing", outgoing_counts, align_right=True)

        # Write frame to output video
        out.write(annotated_frame)

        frame_idx += 1
        # Throttled progress: one line per frame overflows the notebook output
        if frame_idx % PROGRESS_EVERY == 0 or frame_idx == total_frames:
            print(f"Processed frame {frame_idx}/{total_frames}")
finally:
    # Always finalize the files, even if a frame raised; releasing an
    # already-released capture or writer again afterwards is harmless.
    cap.release()
    out.release()

# Release resources so the output file is finalized before it is reported/downloaded
cap.release()
out.release()
print("Video processing completed. Output saved to:", output_video_path)

# Print final counts, one "<class>: <count>" line per vehicle class
for title, counts in (
    ("\nIncoming Vehicle Counts (Left Half, Yellow -> Blue):", incoming_counts),
    ("\nOutgoing Vehicle Counts (Right Half, Blue -> Yellow):", outgoing_counts),
):
    print(title)
    for cls, cnt in counts.items():
        print(f"{cls}: {cnt}")

# Check if output video exists before downloading
if os.path.exists(output_video_path):
    try:
        # Only importable inside Google Colab
        from google.colab import files
    except ImportError:
        # Outside Colab there is nothing to trigger a browser download with;
        # report where the file is instead of crashing after all the work is done.
        print(f"Not running in Google Colab; output video is available at {output_video_path}.")
    else:
        files.download(output_video_path)
else:
    print(f"Error: Output video {output_video_path} not found. Check video processing steps.")

Collecting ultralytics
  Downloading ultralytics-8.3.148-py3-none-any.whl.metadata (37 kB)
Collecting supervision
  Downloading supervision-0.25.1-py3-none-any.whl.metadata (14 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)

100%|██████████| 38.8M/38.8M [00:00<00:00, 86.0MB/s]


Streaming output truncated to the last 5000 lines.
Speed: 3.3ms preprocess, 19.6ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 385/1631

0: 384x640 8 cars, 21.6ms
Speed: 4.4ms preprocess, 21.6ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 386/1631

0: 384x640 7 cars, 1 truck, 22.6ms
Speed: 4.5ms preprocess, 22.6ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 387/1631

0: 384x640 8 cars, 19.8ms
Speed: 3.8ms preprocess, 19.8ms inference, 1.6ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 388/1631

0: 384x640 8 cars, 19.7ms
Speed: 4.2ms preprocess, 19.7ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 389/1631

0: 384x640 7 cars, 1 truck, 22.0ms
Speed: 3.9ms preprocess, 22.0ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)
Processed frame 390/1631

0: 384x640 8 cars, 1 truck, 19.7ms
Speed: 3.7ms p

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>