In [156]:
import time
from pathlib import Path
import cv2 as cv
import numpy as np
import pandas as pd
import torch
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort


In [157]:
# Path of the input video. The capture is opened in the cell that reads the
# frame size and FPS; opening it here as well only created a second handle
# that was overwritten without ever being released.
video_path = "video.mp4"

In [158]:
# Output artefacts: the annotated video file and the per-vehicle CSV log.
output_video, output_csv = "annotated_output.mp4", "vehicle_counts.csv"

In [159]:
# Three (start, end) pixel ranges, presumably lane x-extents for a 1280-wide frame.
# NOTE(review): LANES is reassigned from the actual frame width in the cell
# that defines get_traffic_status, so these literals look like initial defaults.
LANES = [(0, 426), (427, 852), (853, 1280)]

In [160]:
# Load the detector from the `yolov8l.pt` weights file.
model = YOLO("yolov8l.pt")

# Prefer the GPU but fall back to the CPU: torch.cuda.get_device_name(0) and
# model.to("cuda") both raise on a machine without a usable CUDA device.
cuda_available = torch.cuda.is_available()
print(cuda_available)
if cuda_available:
    print(torch.cuda.get_device_name(0))
model.to("cuda" if cuda_available else "cpu")

# Multi-object tracker fed with the detections of every frame.
# NOTE(review): max_age is presumably the number of consecutive missed frames
# a track survives before it is dropped — confirm in the DeepSort docs.
tracker = DeepSort(max_age=15)

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8l.pt to 'yolov8l.pt': 100%|██████████| 83.7M/83.7M [00:26<00:00, 3.26MB/s]


True
NVIDIA GeForce RTX 2050


In [161]:
# Open the input video and read its geometry and frame rate; these values are
# used to size the writer, lay out the lanes and compute timestamps.
cap = cv.VideoCapture(video_path)
if not cap.isOpened():
    # Without this check the processing loop simply exits on its first read
    # and the notebook reports empty results instead of the real problem.
    raise OSError(f"Could not open video: {video_path!r}")

width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv.CAP_PROP_FPS)
if not fps > 0:
    # A missing frame rate would break the `frame_num / fps` timestamp and the
    # writer, so fall back to a default and say so (also catches NaN).
    print("Warning: video reports no valid FPS; assuming 30.0")
    fps = 30.0

In [162]:
# Writer for the annotated video: "mp4v" FourCC, same FPS and frame size as
# the values read from the capture.
fourcc = cv.VideoWriter_fourcc("m", "p", "4", "v")
frame_size = (width, height)
out = cv.VideoWriter(output_video, fourcc, fps, frame_size)


In [163]:
# Detector class ids that are treated as vehicles.
# NOTE(review): presumably the COCO ids for car, motorcycle, bus and truck —
# confirm against model.names.
vehicle_classes = [2, 3, 5, 7]

# Running totals keyed by 1-based lane number.
counts = dict.fromkeys((1, 2, 3), 0)

# (track_id, lane) pairs already counted, and the rows collected for the CSV.
seen_ids, csv_data = set(), []

In [164]:
# Wall-clock start of the run, and the number of frames processed so far.
start_time = time.time()
frame_num = 0


In [165]:
def get_traffic_status(count, medium_min=3, high_above=7):
    """Classify a lane's vehicle count as low, medium or high traffic.

    Args:
        count: Number of vehicles counted in the lane.
        medium_min: Smallest count reported as "Medium Traffic" (default 3).
        high_above: Counts strictly greater than this are reported as
            "High Traffic" (default 7).

    Returns:
        One of "High Traffic", "Medium Traffic" or "Low Traffic".
    """
    if count > high_above:
        return "High Traffic"
    if count >= medium_min:
        return "Medium Traffic"
    return "Low Traffic"
# Split the frame width into three vertical lanes; the last lane runs to the
# full width, so it absorbs any remainder of the integer division.
lane_width = width // 3
lane_edges = [0, lane_width, 2 * lane_width, width]
LANES = list(zip(lane_edges[:-1], lane_edges[1:]))

In [166]:
# Main processing loop: detect vehicles, track them, count them per lane,
# annotate the frame, then write and display it. Runs until the video ends or
# Esc is pressed in the preview window.
while True:
    ret, frame = cap.read()
    if not ret:
        break  # end of stream or read failure

    # Keep the frame at the size the writer and the lane geometry were built
    # for. `out` was opened with (width, height) and `lane_width`, the divider
    # lines and the text position all derive from those values; a hard-coded
    # 1280x720 resize misplaces the lanes and hands the writer frames of a
    # different size whenever the source is not 1280x720.
    if frame.shape[1] != width or frame.shape[0] != height:
        frame = cv.resize(frame, (width, height))
    frame_num += 1
    timestamp = frame_num / fps

    # Run the detector and keep only vehicle classes, converted to the
    # ([left, top, width, height], confidence, class) tuples given to the tracker.
    results = model(frame, conf=0.5, verbose=False)[0]
    detections = []
    for box in results.boxes:
        cls = int(box.cls)
        if cls in vehicle_classes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            conf = float(box.conf)
            detections.append(([x1, y1, x2 - x1, y2 - y1], conf, cls))

    # update tracker
    tracks = tracker.update_tracks(detections, frame=frame)

    # count CURRENT active vehicles per lane
    # NOTE(review): every confirmed track is counted and drawn, including ones
    # that may not have been matched in this frame — confirm whether
    # tr.time_since_update should be checked as well.
    lane_counts = [0, 0, 0]
    for tr in tracks:
        if not tr.is_confirmed():
            continue
        tlbr = tr.to_tlbr()
        x1, y1, x2, y2 = map(int, tlbr)
        cx = (x1 + x2) // 2

        # determine which lane the track is currently in (by box centre x)
        if cx < lane_width:
            li = 0
        elif cx < 2 * lane_width:
            li = 1
        else:
            li = 2
        lane_counts[li] += 1

        # A (track, lane) pair is counted and logged only the first time it is
        # seen, so a track that changes lane is counted once in each lane.
        track_id = tr.track_id
        if (track_id, li + 1) not in seen_ids:
            counts[li + 1] += 1
            seen_ids.add((track_id, li + 1))
            csv_data.append([track_id, li + 1, frame_num, timestamp])

        # draw tracked box, its id and its centre point
        cv.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv.putText(frame, f"ID {track_id}", (x1, y1 - 6),
                   cv.FONT_HERSHEY_SIMPLEX, 0.45, (0, 255, 0), 1)
        cv.circle(frame, (cx, (y1 + y2) // 2), 3, (0, 255, 255), -1)

    # draw lane dividers
    cv.line(frame, (lane_width, 0), (lane_width, height), (255, 0, 0), 2)
    cv.line(frame, (2 * lane_width, 0), (2 * lane_width, height), (255, 0, 0), 2)

    # traffic status per lane, top-right corner
    statuses = [get_traffic_status(c) for c in lane_counts]
    text_x = width - 300    # shift left from right edge; tune 300 if longer text
    for i, (s, c) in enumerate(zip(statuses, lane_counts)):
        cv.putText(frame, f"Lane {i+1}: {s} ({c})", (text_x, 30 + i * 28),
                   cv.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    # cumulative totals per lane, bottom-left corner
    for i in range(1, 4):
        cv.putText(frame, f"Total L{i}: {counts[i]}", (10, height - 20 * i),
                   cv.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    out.write(frame)
    cv.imshow("Vehicle Counting", frame)
    if cv.waitKey(1) & 0xFF == 27:  # 27 == Esc
        break

In [None]:
# Release the capture and the writer, and close the preview window.
cap.release()
out.release()
cv.destroyAllWindows()

# Confidence summary. `results` is only defined once the loop has processed a
# frame, and it holds the detections of the LAST processed frame only, so the
# lookup is guarded and the figure is labelled as a last-frame value rather
# than as a whole-video average.
last_results = globals().get("results")
confidences = (
    [box.conf[0].item() for box in last_results.boxes]
    if last_results is not None
    else []
)
if confidences:
    avg_confidence = sum(confidences) / len(confidences) * 100
    print(f"Average Detection Confidence (last frame): {avg_confidence:.2f}%")

# csv: one row per first sighting of a (track, lane) pair
df = pd.DataFrame(csv_data, columns=["Vehicle ID", "Lane", "Frame", "Timestamp"])
df.to_csv(output_csv, index=False)

print("Processing done.")
print("Counts per lane:", counts)

Average Detection Confidence: 81.20%
Processing done.
Counts per lane: {1: 18, 2: 15, 3: 1}
