In [11]:
import cv2
import numpy as np
from ultralytics import YOLO
import time

VIDEO_SOURCE = "traffic.mp4"   # or 0 for webcam
OUTPUT_FILE = "output.mp4"     # output video file

# Detections whose confidence is below this value are discarded.
CONF_THRESHOLD = 0.4
# Largest centroid-to-centroid distance at which a detection is still
# matched to an already tracked object (same units as the box coordinates).
DISTANCE_THRESHOLD = 50
# A track is dropped once it has gone unmatched for more than this many
# consecutive tracker updates.
MAX_DISAPPEARED_FRAMES = 10

# These strings are compared against the model's own label names, so they
# must match exactly. The COCO labels shipped with Ultralytics YOLOv8 use
# "motorcycle" (older Darknet label files used "motorbike", which never
# matches and silently drops every motorcycle detection).
VEHICLE_CLASSES = ["car", "motorcycle", "bus", "truck"]


class CentroidTracker:
    """Assign persistent integer IDs to detections via nearest-centroid matching.

    Every tracked object is keyed by its ID in several parallel dictionaries:

    * ``objects``        -- current centroid ``(cX, cY)``
    * ``prev_positions`` -- centroid held before the most recent match
    * ``disappeared``    -- consecutive updates without a matching detection
    * ``counted``        -- flag initialised to ``False``; left for the caller
      to set (the tracker itself never changes it after registration)
    * ``class_ids``      -- class name of the most recently matched detection
    """

    def __init__(self, max_disappeared=MAX_DISAPPEARED_FRAMES,
                 max_distance=DISTANCE_THRESHOLD):
        """Create an empty tracker.

        Args:
            max_disappeared: a track is removed once it has gone unmatched for
                more than this many consecutive ``update`` calls.
            max_distance: largest centroid distance at which a detection may
                be matched to an existing track. Defaults to the module-level
                ``DISTANCE_THRESHOLD``.
        """
        self.next_object_id = 0
        self.objects = {}
        self.disappeared = {}
        self.counted = {}
        self.class_ids = {}
        self.prev_positions = {}
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance

    def register(self, centroid, cls_name):
        """Start tracking a new object at ``centroid`` under the next free ID."""
        object_id = self.next_object_id
        self.objects[object_id] = centroid
        self.disappeared[object_id] = 0
        self.counted[object_id] = False
        self.class_ids[object_id] = cls_name
        # A new track has no history yet, so its previous position is its
        # current one.
        self.prev_positions[object_id] = centroid
        self.next_object_id += 1

    def deregister(self, object_id):
        """Forget ``object_id`` entirely (raises ``KeyError`` if unknown)."""
        del self.objects[object_id]
        del self.disappeared[object_id]
        del self.counted[object_id]
        del self.class_ids[object_id]
        del self.prev_positions[object_id]

    def _mark_missing(self, object_id):
        """Record one more unmatched update; drop the track past the limit."""
        self.disappeared[object_id] += 1
        if self.disappeared[object_id] > self.max_disappeared:
            self.deregister(object_id)

    def update(self, rects, class_names):
        """Match this frame's detections against the existing tracks.

        Args:
            rects: sequence of ``(x1, y1, x2, y2)`` boxes.
            class_names: class name per box, in the same order as ``rects``.

        Returns:
            The tracker's own ``objects`` dict (ID -> centroid). It is the
            live internal mapping, not a copy.
        """
        if len(rects) == 0:
            # Nothing detected: every existing track ages by one update.
            for object_id in list(self.disappeared):
                self._mark_missing(object_id)
            return self.objects

        input_centroids = [
            (int((x1 + x2) / 2.0), int((y1 + y2) / 2.0))
            for (x1, y1, x2, y2) in rects
        ]

        if not self.objects:
            for centroid, cls_name in zip(input_centroids, class_names):
                self.register(centroid, cls_name)
            return self.objects

        object_ids = list(self.objects)
        tracked = np.asarray([self.objects[oid] for oid in object_ids],
                             dtype=np.float64)
        incoming = np.asarray(input_centroids, dtype=np.float64)

        # Pairwise Euclidean distances: D[i, j] is the distance between
        # tracked object i and new detection j.
        deltas = tracked[:, np.newaxis, :] - incoming[np.newaxis, :, :]
        D = np.sqrt((deltas ** 2).sum(axis=2)).astype(np.float32)

        # Visit tracks in order of their closest detection, so the most
        # confident pairings claim their detection first.
        rows = D.min(axis=1).argsort()
        cols = D.argmin(axis=1)[rows]

        used_rows = set()
        used_cols = set()

        for row, col in zip(rows, cols):
            row, col = int(row), int(col)
            if row in used_rows or col in used_cols:
                continue
            if D[row, col] > self.max_distance:
                continue

            object_id = object_ids[row]
            # Remember where the object was before moving it.
            self.prev_positions[object_id] = self.objects[object_id]
            self.objects[object_id] = input_centroids[col]
            self.disappeared[object_id] = 0
            self.class_ids[object_id] = class_names[col]

            used_rows.add(row)
            used_cols.add(col)

        # Tracks that found no detection this frame age by one update.
        for row in sorted(set(range(D.shape[0])) - used_rows):
            self._mark_missing(object_ids[row])

        # Detections that matched no track become new tracks; sorted so that
        # new IDs are handed out in detection order.
        for col in sorted(set(range(D.shape[1])) - used_cols):
            self.register(input_centroids[col], class_names[col])

        return self.objects


def load_model(weights="yolov8n.pt"):
    """Load and return a YOLO model.

    Args:
        weights: value passed straight to the ``YOLO`` constructor. Defaults
            to ``"yolov8n.pt"``, so calling ``load_model()`` with no
            arguments behaves as before.

    Returns:
        The constructed ``YOLO`` instance.
    """
    print("[INFO] Loading YOLO model...")
    return YOLO(weights)


def _collect_vehicle_detections(results, names):
    """Return ``(box, class_name, confidence)`` for each accepted detection.

    A detection is accepted when its confidence is at least
    ``CONF_THRESHOLD`` and its class name is listed in ``VEHICLE_CLASSES``.
    ``box`` is an ``(x1, y1, x2, y2)`` tuple of ints.
    """
    detections = []
    if results.boxes is None:
        return detections

    for box in results.boxes:
        conf = float(box.conf[0])
        if conf < CONF_THRESHOLD:
            continue

        cls_name = names[int(box.cls[0])]
        if cls_name not in VEHICLE_CLASSES:
            continue

        x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].tolist())
        detections.append(((x1, y1, x2, y2), cls_name, conf))
    return detections


def _count_line_crossings(tracker, objects, line_y, vehicle_counts):
    """Count each not-yet-counted object whose centroid moved onto/past ``line_y``.

    Only movement from above the line (smaller y) to on/below it is counted,
    and each object is counted at most once via ``tracker.counted``.
    """
    for object_id, centroid in objects.items():
        if tracker.counted[object_id]:
            continue

        _, prev_cy = tracker.prev_positions.get(object_id, centroid)
        if prev_cy < line_y <= centroid[1]:
            cls_name = tracker.class_ids[object_id]
            if cls_name in vehicle_counts:
                vehicle_counts[cls_name] += 1
            tracker.counted[object_id] = True


def _draw_counts_panel(frame, vehicle_counts):
    """Draw the per-class totals on a black panel in the top-left corner."""
    # Header baseline is at y=30 and row i at y=55+20*i, so the last baseline
    # is 35+20*n; add 10px so the panel fully backs the final row.
    panel_bottom = 45 + 20 * len(vehicle_counts)
    cv2.rectangle(frame, (0, 0), (250, panel_bottom), (0, 0, 0), -1)

    y_offset = 30
    cv2.putText(frame, "Vehicle Counts:", (10, y_offset),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
    y_offset += 25
    for cls_name, count in vehicle_counts.items():
        cv2.putText(frame, f"{cls_name}: {count}", (10, y_offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        y_offset += 20


def main():
    """Detect, track and count vehicles in ``VIDEO_SOURCE``.

    Each frame is run through the YOLO model, accepted detections are fed to
    a ``CentroidTracker``, and an object is counted the first time its
    centroid moves downward across the horizontal mid-line of the frame.
    The annotated frames are shown in a window and written to
    ``OUTPUT_FILE``. Pressing ``q`` in the window stops processing early.
    """
    model = load_model()
    cap = cv2.VideoCapture(VIDEO_SOURCE)

    if not cap.isOpened():
        print("[ERROR] Could not open video.")
        return

    tracker = CentroidTracker()
    vehicle_counts = {cls: 0 for cls in VEHICLE_CLASSES}

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    # Write at the source frame rate so playback speed matches the input.
    # Some sources report 0 or NaN; `not fps > 0` catches both, and we then
    # fall back to the previous fixed rate.
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps > 0:
        fps = 20.0
    out = None

    time.sleep(1.0)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_h, frame_w = frame.shape[:2]
            if out is None:
                # The writer needs the frame size, which is only known once
                # the first frame has been read.
                out = cv2.VideoWriter(OUTPUT_FILE, fourcc, fps,
                                      (frame_w, frame_h))

            results = model(frame, verbose=False)[0]
            detections = _collect_vehicle_detections(results, model.names)

            rects = []
            class_names = []
            for (x1, y1, x2, y2), cls_name, conf in detections:
                rects.append((x1, y1, x2, y2))
                class_names.append(cls_name)

                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
                cv2.putText(frame, f"{cls_name} {conf:.2f}", (x1, y1 - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)

            objects = tracker.update(rects, class_names)

            for object_id, (cx, cy) in objects.items():
                cv2.circle(frame, (cx, cy), 4, (0, 255, 0), -1)
                cv2.putText(frame, f"ID {object_id}", (cx - 10, cy - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

            # The counting line is the horizontal middle of the frame.
            _count_line_crossings(tracker, objects, frame_h // 2,
                                  vehicle_counts)
            _draw_counts_panel(frame, vehicle_counts)

            # save the annotated frame
            out.write(frame)

            cv2.imshow("Vehicle Detection & Counting", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release everything even if a frame fails mid-way; `out` is still
        # None when the source opened but produced no frames.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

    if out is None:
        print("[WARN] No frames were read; no output file was written.")
    else:
        print("✔ DONE — output saved as:", OUTPUT_FILE)


# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()


[INFO] Loading YOLO model...
✔ DONE — output saved as: output.mp4
