In [1]:
!pip install ultralytics opencv-python numpy yt-dlp pytube

Collecting ultralytics
  Downloading ultralytics-8.3.179-py3-none-any.whl.metadata (37 kB)
Collecting yt-dlp
  Downloading yt_dlp-2025.8.11-py3-none-any.whl.metadata (175 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m175.5/175.5 kB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl.metadata (5.0 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.15-py3-none-any.whl.metadata (14 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manyl

In [21]:
import cv2
import numpy as np
import csv
import datetime
from ultralytics import YOLO
from collections import defaultdict
import os

# ---------------- SORT Tracker ---------------- #
class SortTracker:
    """Lightweight SORT tracker (IoU-based, simple)."""
    def __init__(self):
        self.trackers = []
        self.next_id = 1

    def update(self, detections: np.ndarray) -> np.ndarray:
        updated_trackers = []

        for det in detections:
            x1, y1, x2, y2, conf = det
            matched = False

            for trk in self.trackers:
                tx1, ty1, tx2, ty2, tid = trk
                iou = self.calc_iou(det, trk)
                if iou > 0.3:  # IoU threshold
                    updated_trackers.append([x1, y1, x2, y2, tid])
                    matched = True
                    break

            if not matched:
                updated_trackers.append([x1, y1, x2, y2, self.next_id])
                self.next_id += 1

        self.trackers = updated_trackers
        return np.array(updated_trackers)

    @staticmethod
    def calc_iou(bb_test, bb_gt):
        xx1 = max(bb_test[0], bb_gt[0])
        yy1 = max(bb_test[1], bb_gt[1])
        xx2 = min(bb_test[2], bb_gt[2])
        yy2 = min(bb_test[3], bb_gt[3])
        w = max(0., xx2 - xx1)
        h = max(0., yy2 - yy1)
        intersection = w * h
        union = ((bb_test[2]-bb_test[0])*(bb_test[3]-bb_test[1]) +
                 (bb_gt[2]-bb_gt[0])*(bb_gt[3]-bb_gt[1]) - intersection)
        return intersection / union if union > 0 else 0


# ---------------- Traffic Analyzer ---------------- #
class TrafficAnalyzer:
    def __init__(self, video_path: str):
        self.model = YOLO('yolov8n.pt')
        self.tracker = SortTracker()
        self.cap = cv2.VideoCapture(video_path)

        if not self.cap.isOpened():
            raise ValueError(f"Could not open video file {video_path}")

        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Direction: 'horizontal' or 'vertical' for counting logic
        self.counting_lines = {
            "Lane 1": [(58, 194), (234, 302), 'horizontal'],
            "Lane 2": [(262, 194), (368, 274), 'horizontal'],
            "Lane 3": [(430, 225), (570, 332), 'horizontal']
        }

        self.vehicle_counts = defaultdict(int)
        self.vehicle_history = defaultdict(dict)  # Store crossing info
        self.csv_data = []
        self.frame_count = 0

    def process_video(self, output_video_path: str = 'output.mp4'):
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, self.fps,
                              (self.frame_width, self.frame_height))

        try:
            while self.cap.isOpened():
                ret, frame = self.cap.read()
                if not ret:
                    break

                self.frame_count += 1
                timestamp = self.frame_count / self.fps

                # Detect vehicles
                results = self.model(frame, classes=[2, 3, 5, 7], verbose=False)
                detections = self.process_detections(results)

                # Track vehicles
                tracked_objects = self.tracker.update(detections)

                # Update counts
                self.update_counts(tracked_objects, timestamp)

                # Draw results
                self.draw_results(frame, tracked_objects)

                out.write(frame)

        finally:
            self.cap.release()
            out.release()
            cv2.destroyAllWindows()
            self.save_to_csv()
            self.print_summary()

    def process_detections(self, results) -> np.ndarray:
        detections = []
        for result in results:
            for box in result.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                conf = float(box.conf[0])
                if conf > 0.4:
                    detections.append([x1, y1, x2, y2, conf])
        return np.array(detections) if detections else np.empty((0, 5))

    def update_counts(self, tracked_objects: np.ndarray, timestamp: float):
        for obj in tracked_objects:
            x1, y1, x2, y2, obj_id = map(int, obj[:5])
            center = ((x1 + x2) // 2, (y1 + y2) // 2)

            for lane_name, (line_start, line_end, direction) in self.counting_lines.items():
                if obj_id not in self.vehicle_history or lane_name not in self.vehicle_history[obj_id]:
                    if self.crossed_line(center, line_start, line_end, direction):
                        self.vehicle_counts[lane_name] += 1
                        self.vehicle_history[obj_id][lane_name] = True
                        self.csv_data.append([
                            obj_id, lane_name, self.frame_count,
                            str(datetime.timedelta(seconds=timestamp))
                        ])

    def crossed_line(self, point, line_start, line_end, direction):
        x, y = point
        (x1, y1), (x2, y2) = line_start, line_end

        if direction == 'horizontal':
            # Check if vehicle center crossed the horizontal line
            if (y1 - 5 <= y <= y1 + 5) and (min(x1, x2) <= x <= max(x1, x2)):
                return True
        elif direction == 'vertical':
            # Check if vehicle center crossed the vertical line
            if (x1 - 5 <= x <= x1 + 5) and (min(y1, y2) <= y <= max(y1, y2)):
                return True
        return False

    def draw_results(self, frame: np.ndarray, tracked_objects: np.ndarray):
        # Draw counting lines
        for lane_name, (line_start, line_end, _) in self.counting_lines.items():
            cv2.line(frame, line_start, line_end, (0, 255, 0), 2)
            cv2.putText(frame, lane_name, line_start,
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        # Draw tracked vehicles
        for obj in tracked_objects:
            x1, y1, x2, y2, obj_id = map(int, obj[:5])
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
            cv2.putText(frame, f"ID: {obj_id}", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)

        # Display counts
        y_offset = 30
        for i, (lane, count) in enumerate(self.vehicle_counts.items(), start=1):
            cv2.putText(frame, f"{lane}: {count}", (10, y_offset * i),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

    def save_to_csv(self):
        with open('traffic_data.csv', 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(['Vehicle ID', 'Lane', 'Frame', 'Timestamp'])
            writer.writerows(self.csv_data)

    def print_summary(self):
        print("\nTraffic Analysis Summary:")
        for lane in sorted(self.vehicle_counts.keys()):
            print(f"{lane}: {self.vehicle_counts[lane]} vehicles")


# ---------------- Main ---------------- #
def main():
    video_path = "traffic_video.mp4"
    if not os.path.exists(video_path):
        print(f"Video file {video_path} not found!")
        return

    print("Starting traffic analysis...")
    analyzer = TrafficAnalyzer(video_path)
    analyzer.process_video()
    print("Analysis completed successfully!")


if __name__ == "__main__":
    main()

Starting traffic analysis...

Traffic Analysis Summary:
Lane 1: 239 vehicles
Lane 2: 98 vehicles
Lane 3: 36 vehicles
Analysis completed successfully!


In [22]:
from google.colab import drive
drive.mount('/content/drive')

# copy video to Google Drive
!cp output.mp4 /content/drive/MyDrive/
!cp traffic_data.csv /content/drive/MyDrive/


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
