In [None]:
# ------------------------- #
# 🚀 Full Colab Code (YOLOv8 + Tracker + Counts + Preview + Sympy Fix)
# ------------------------- #

# 1) Install dependencies
!pip install -q ultralytics opencv-python-headless
!pip install -q --upgrade sympy==1.13.3 torch torchvision torchaudio

# 2) Imports
import os
import cv2
import math
import numpy as np
from collections import defaultdict
from ultralytics import YOLO
from IPython.display import Video, display
from google.colab import drive
import torch

# ------------------------- #
# 3) Mount Google Drive
# ------------------------- #
print("👉 Mounting Google Drive...")
drive.mount('/content/drive', force_remount=True)

# ⚡ Input & Output paths
INPUT_VIDEO = "/content/drive/MyDrive/Colab Notebooks/Trafficsmall.mp4"
OUTPUT_VIDEO = "/content/drive/MyDrive/Colab Notebooks/output_Trafficsmall.mp4"

# Parameters
CONFIDENCE_THRESHOLD = 0.35
MAX_DISAPPEARED = 30
MAX_DISTANCE_RATE = 0.05
VEHICLE_CLASSES = {"car", "truck", "bus", "motorcycle", "bicycle"}

# ------------------------- #
# 4) Simple Tracker Class
# ------------------------- #
class SimpleTracker:
    def __init__(self, max_disappeared=30, max_distance=50):
        self.next_object_id = 0
        self.objects = dict()
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance
        self.counts = defaultdict(int)

    def _centroid(self, bbox):
        x1, y1, x2, y2 = bbox
        return (int((x1 + x2) / 2), int((y1 + y2) / 2))

    def register(self, bbox, class_name):
        cid = self.next_object_id
        self.next_object_id += 1
        centroid = self._centroid(bbox)
        self.objects[cid] = {
            "bbox": bbox,
            "centroid": centroid,
            "class": class_name,
            "disappeared": 0
        }
        self.counts[class_name] += 1
        return cid

    def deregister(self, object_id):
        if object_id in self.objects:
            del self.objects[object_id]

    def update(self, detections):
        if len(detections) == 0:
            to_deregister = []
            for oid in list(self.objects.keys()):
                self.objects[oid]["disappeared"] += 1
                if self.objects[oid]["disappeared"] > self.max_disappeared:
                    to_deregister.append(oid)
            for oid in to_deregister:
                self.deregister(oid)
            return self.objects

        if len(self.objects) == 0:
            for bbox, cls in detections:
                self.register(bbox, cls)
            return self.objects

        obj_ids = list(self.objects.keys())
        obj_centroids = [self.objects[oid]["centroid"] for oid in obj_ids]
        det_centroids = [(int((b[0]+b[2])/2), int((b[1]+b[3])/2)) for (b,_) in detections]

        pairs = []
        for i, oid in enumerate(obj_ids):
            (ox, oy) = obj_centroids[i]
            for j, (dx, dy) in enumerate(det_centroids):
                dist = math.hypot(ox - dx, oy - dy)
                pairs.append((dist, oid, j))
        pairs.sort(key=lambda x: x[0])

        assigned_objs = set()
        assigned_dets = set()

        for dist, oid, j in pairs:
            if oid in assigned_objs or j in assigned_dets:
                continue
            if dist <= self.max_distance:
                bbox, cls = detections[j]
                self.objects[oid]["bbox"] = bbox
                self.objects[oid]["centroid"] = det_centroids[j]
                self.objects[oid]["class"] = cls
                self.objects[oid]["disappeared"] = 0
                assigned_objs.add(oid)
                assigned_dets.add(j)

        for oid in obj_ids:
            if oid not in assigned_objs:
                self.objects[oid]["disappeared"] += 1
                if self.objects[oid]["disappeared"] > self.max_disappeared:
                    self.deregister(oid)

        for j, (bbox, cls) in enumerate(detections):
            if j not in assigned_dets:
                self.register(bbox, cls)

        return self.objects

# ------------------------- #
# 5) Load YOLO & Video
# ------------------------- #
print("Loading YOLOv8 model...")
model = YOLO("yolov8n.pt")

# ✅ Use GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
model.to(device)

cap = cv2.VideoCapture(INPUT_VIDEO)
if not cap.isOpened():
    raise RuntimeError(f"❌ Cannot open video file {INPUT_VIDEO}. Check path or permissions.")

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, fps, (width, height))

max_dist = max(30, int(max(width, height) * MAX_DISTANCE_RATE))
tracker = SimpleTracker(max_disappeared=MAX_DISAPPEARED, max_distance=max_dist)

print(f"🎥 Video: {INPUT_VIDEO} | size: {width}x{height} | fps: {fps} | max_distance: {tracker.max_distance}")

# ------------------------- #
# 🔍 Quick 5s Preview
# ------------------------- #
preview_path = os.path.join(os.path.dirname(INPUT_VIDEO), "preview.mp4")
preview_writer = cv2.VideoWriter(preview_path, fourcc, fps, (width, height))
frame_count = int(min(fps * 5, cap.get(cv2.CAP_PROP_FRAME_COUNT))) # 5 sec or full

print(f"🔍 Creating 5-second preview ({frame_count} frames)...")
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # rewind

for i in range(frame_count):
    ret, frame = cap.read()
    if not ret:
        break
    preview_writer.write(frame)

preview_writer.release()
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # rewind for full processing
print("✅ Preview saved:", preview_path)
display(Video(preview_path, embed=True))

# ------------------------- #
# 6) Process Frames
# ------------------------- #
frame_idx = 0
while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame_idx += 1

    results = model(frame, conf=CONFIDENCE_THRESHOLD, verbose=False)
    r = results[0]

    detections = []
    if hasattr(r, "boxes") and len(r.boxes) > 0:
        xyxy = r.boxes.xyxy.cpu().numpy()
        cls_ids = r.boxes.cls.cpu().numpy().astype(int)
        confs = r.boxes.conf.cpu().numpy()
        for i, box in enumerate(xyxy):
            x1, y1, x2, y2 = map(int, box)
            cls_id = int(cls_ids[i])
            conf = float(confs[i])
            cls_name = model.names.get(cls_id, str(cls_id))
            if cls_name in VEHICLE_CLASSES and conf >= CONFIDENCE_THRESHOLD:
                detections.append(((x1, y1, x2, y2), cls_name))

    tracked_objects = tracker.update(detections)
    annotated = frame.copy()

    for oid, info in tracked_objects.items():
        x1, y1, x2, y2 = info["bbox"]
        cls_name = info["class"]
        cx, cy = info["centroid"]
        color = (int((oid * 37) % 255), int((oid * 91) % 255), int((oid * 53) % 255))
        cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
        label = f"{cls_name}-{oid}"
        cv2.putText(annotated, label, (x1, y1 - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        cv2.circle(annotated, (cx, cy), 3, color, -1)

    # Show counts
    y = 25
    cv2.rectangle(annotated, (5,5), (220, 5 + 20 * (len(VEHICLE_CLASSES) + 1)), (0,0,0), -1)
    cv2.putText(annotated, "Counts (unique):", (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)
    for cls in sorted(VEHICLE_CLASSES):
        cnt = tracker.counts.get(cls, 0)
        cv2.putText(annotated, f"{cls}: {cnt}", (10, y + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)
        y += 20

    out.write(annotated)

    if frame_idx % 100 == 0:
        print(f"✅ Processed {frame_idx} frames...")

# ------------------------- #
# 7) Cleanup & Results
# ------------------------- #
cap.release()
out.release()
print("🎯 Processing finished.")
print("Final unique counts by class:")
for cls in sorted(VEHICLE_CLASSES):
    print(f" {cls}: {tracker.counts.get(cls, 0)}")
print("✅ Saved annotated video to:", OUTPUT_VIDEO)

# ------------------------- #
# 8) Show Result in Colab
# ------------------------- #
display(Video(OUTPUT_VIDEO, embed=True))


Output hidden; open in https://colab.research.google.com to view.