<a href="https://colab.research.google.com/github/THEOPHILUSg48/Mini_Project_2025/blob/main/Demo_count.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import cv2
import numpy as np
import time
import os
# NOTE: Make sure to install the libraries first:
# pip install opencv-python numpy ultralytics

from ultralytics import YOLO
# Multi-object tracking is built into Ultralytics: model.track() accepts a 'tracker'
# argument (e.g., tracker='bytetrack.yaml' selects ByteTrack, which is a different
# algorithm from DeepSORT), so no separate tracker import is needed.

# --- 1. Configuration ---
# Path of the input video. It is checked with os.path.exists() and then opened
# with cv2.VideoCapture(). The '/content/' prefix suggests a Linux/Colab
# environment; the file name must match the file on disk EXACTLY (including
# spaces and punctuation).
VIDEO_SOURCE = r'/content/4K Road traffic video for object detection and tracking - free download now!.mp4'
# Labels that are counted. They are compared against the class names reported
# by the YOLO model, so they must match those labels (COCO dataset names).
CLASSES_TO_COUNT = ['car', 'truck', 'bus', 'motorcycle']
# Virtual counting line (trip wire): the y pixel coordinate of a horizontal
# line drawn across the full frame width. This is a fixed pixel value and is
# not scaled to the video resolution.
COUNTING_LINE_Y = 300
# Thickness, in pixels, used when drawing the counting line.
LINE_THICKNESS = 5
# Per-class drawing colours as OpenCV BGR tuples. Classes without an entry
# are drawn in white by the processing loop.
CLASS_COLORS = {
    'car': (255, 0, 0),        # Blue (B=255, G=0, R=0)
    'truck': (0, 255, 255),    # Yellow (B=0, G=255, R=255)
    'motorcycle': (0, 255, 0), # Green (B=0, G=255, R=0)
    'bus': (255, 100, 255)     # Pink/magenta (B=255, G=100, R=255)
}

# --- 2. Tracking State Management (TrafficCounter Class) ---

class TrafficCounter:
    """Count tracked vehicles that cross a horizontal trip wire moving downwards.

    A track is counted at most once, and only when its centroid is observed
    above the line (``y < counting_line_y``) on one update and on or below it
    (``y >= counting_line_y``) on a later update. A track whose first sighting
    is already on or below the line has not been seen crossing it and is
    therefore not counted until it actually makes the transition.

    Attributes:
        counting_line_y: y coordinate of the trip wire, in the same units as
            the centroids passed to :meth:`update`.
        vehicle_state: per-track bookkeeping,
            ``{track_id: {'class': str, 'crossed': bool, 'last_y': number}}``.
        counts: running totals, ``{class_name: int}``, one key per entry of
            the ``classes`` argument.
    """

    def __init__(self, counting_line_y, classes):
        """Create a counter for the given line position and class names.

        Args:
            counting_line_y: y coordinate of the trip wire.
            classes: iterable of class names to keep totals for.
        """
        self.counting_line_y = counting_line_y
        # Stores the state of each tracked object:
        # {id: {'class': str, 'crossed': bool, 'last_y': number}}
        self.vehicle_state = {}
        # Stores the final counts: {'car': 0, 'truck': 0, ...}
        self.counts = {cls: 0 for cls in classes}

    def update(self, track_id, object_class, centroid_y):
        """Record the latest centroid of a track and count a downward crossing.

        Args:
            track_id: identifier of the tracked object.
            object_class: class name reported for the object on this update.
            centroid_y: current y coordinate of the object's centroid.

        Returns:
            None. ``self.counts`` and ``self.vehicle_state`` are updated in
            place, and a line is printed whenever a vehicle is counted.
        """
        state = self.vehicle_state.get(track_id)
        if state is None:
            # First sighting: there is no previous position to compare with,
            # so this update can only establish the reference point.
            self.vehicle_state[track_id] = {
                'class': object_class,
                'crossed': False,
                'last_y': centroid_y,
            }
            return

        previous_y = state['last_y']
        state['last_y'] = centroid_y

        # Each track is counted once, and only for classes being tallied.
        if state['crossed'] or object_class not in self.counts:
            return

        # Downward crossing: above the line before, on or below it now.
        if previous_y < self.counting_line_y <= centroid_y:
            self.counts[object_class] += 1
            state['crossed'] = True
            print(f"Counted: {object_class} (ID: {track_id}). Total {object_class}: {self.counts[object_class]}")

# --- 3. Main Processing Loop ---

def _resolve_frame_display():
    """Return a callable that displays one BGR frame in the current environment."""
    try:
        # Colab cannot open native GUI windows; it ships an inline replacement.
        from google.colab.patches import cv2_imshow
    except ImportError:
        # Not running in Colab: fall back to a regular OpenCV window.
        return lambda frame: cv2.imshow('Vehicle Counter', frame)
    return cv2_imshow


def run_vehicle_counter():
    """Detect, track and count vehicles in ``VIDEO_SOURCE``.

    Loads the YOLOv8 nano weights, reads the video frame by frame, tracks
    objects with ``model.track`` (ByteTrack configuration) and feeds the
    centroid of every object whose class is in ``CLASSES_TO_COUNT`` to a
    ``TrafficCounter``. Each frame is annotated (trip wire, boxes, IDs,
    running totals) and displayed; pressing 'q' in an OpenCV window stops
    processing early. A summary is printed when processing ends.

    Returns:
        None. Problems loading the model or opening the video are reported
        on stdout and end the function early instead of raising.
    """

    print("--- Initializing YOLOv8 Model and Video Stream ---")

    # 3a. Model Loading
    try:
        # Load the pre-trained nano version of YOLOv8 for speed
        model = YOLO('yolov8n.pt')
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        print("Please ensure you have run 'pip install ultralytics' and have network access to download weights.")
        return

    # --- CHECK 1: Use os.path.exists for better debugging ---
    if not os.path.exists(VIDEO_SOURCE):
        print("\nFATAL ERROR: The video file was not found at the specified location.")
        print(f"Checked Path: '{VIDEO_SOURCE}'")
        print("-" * 50)
        print("TROUBLESHOOTING TIP (If using Colab/Cloud Notebook):")
        print("1. **Verify File Name:** Run `!ls /content/` in a separate cell to check the exact file name.")
        print("2. **Rename File:** We highly recommend renaming your video file to something simple, like `traffic.mp4`, to avoid issues with spaces and special characters.")
        print("-" * 50)
        return

    # Pick the display function before opening the capture so that nothing
    # needs cleaning up if this step fails.
    show_frame = _resolve_frame_display()

    cap = cv2.VideoCapture(VIDEO_SOURCE)
    if not cap.isOpened():
        # This error typically means permissions or an unsupported codec.
        print(f"\nError: Could not open video file at '{VIDEO_SOURCE}'")
        print("Possible Issue: Video file exists but OpenCV cannot read it (e.g., codec not supported or permissions issue).")
        cap.release()
        return

    # Initialize the custom counter logic
    traffic_counter = TrafficCounter(COUNTING_LINE_Y, CLASSES_TO_COUNT)

    frame_count = 0
    start_time = time.time()

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a read failure): stop processing.
                break

            frame_width = frame.shape[1]

            # --- 3b. Object Detection & Tracking (YOLOv8 with ByteTrack) ---
            # persist=True keeps track identities across successive calls.
            results = model.track(frame, persist=True, tracker='bytetrack.yaml', verbose=False)

            # --- 3c. Parsing and Counting Logic ---

            # Draw the virtual trip wire
            cv2.line(frame, (0, COUNTING_LINE_Y), (frame_width, COUNTING_LINE_Y), (0, 0, 255), LINE_THICKNESS)
            cv2.putText(frame, 'COUNTING LINE', (frame_width - 300, COUNTING_LINE_Y - 15),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            # boxes.id is None when the tracker assigned no IDs on this frame.
            if results and results[0].boxes.id is not None:
                detections = results[0].boxes
                boxes = detections.xyxy.cpu().numpy()
                track_ids = detections.id.cpu().numpy()
                classes = detections.cls.cpu().numpy()
                class_names = results[0].names

                for box, track_id, cls_index in zip(boxes, track_ids, classes):
                    obj_class = class_names[int(cls_index)]

                    # Skip anything that is not a class we are counting.
                    if obj_class not in traffic_counter.counts:
                        continue

                    track_id = int(track_id)
                    x1, y1, x2, y2 = map(int, box)

                    # Calculate centroid (Center Point Tracking)
                    center_x = int((x1 + x2) / 2)
                    center_y = int((y1 + y2) / 2)

                    # Update Counter Logic (Core Trip Wire Check)
                    traffic_counter.update(track_id, obj_class, center_y)

                    # Draw Bounding Box and ID (Visualization)
                    color = CLASS_COLORS.get(obj_class, (255, 255, 255))
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(frame, f"ID:{track_id} {obj_class}", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                    cv2.circle(frame, (center_x, center_y), 4, (0, 0, 255), -1)

            # Display the counts on the frame, one line of text per class
            y_offset = 30
            for cls, count in traffic_counter.counts.items():
                text = f"{cls.capitalize()} Count: {count}"
                cv2.putText(frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA)
                y_offset += 30

            # Show the processed frame
            show_frame(frame)

            # Exit if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

            frame_count += 1

        duration = time.time() - start_time
    finally:
        # Always free the capture and any windows, even if a frame failed.
        cap.release()
        cv2.destroyAllWindows()

    # --- 3e. Data Logging (Blueprint Phase 5) ---
    fps = frame_count / duration if duration > 0 else 0

    print("\n--- Analysis Complete ---")
    print(f"Total Frames Processed: {frame_count}")
    print(f"Total Duration (s): {duration:.2f}")
    print(f"Average FPS: {fps:.2f}")
    print("Final Counts:", traffic_counter.counts)

# Start the counter when this code runs as the main module (a direct script
# run; presumably also a notebook cell, where __name__ is '__main__' — verify),
# but not when the file is imported from elsewhere.
if __name__ == '__main__':
    run_vehicle_counter()