<a href="https://colab.research.google.com/github/Aditya948351/MargVedhaMain/blob/YOLOv8/MargVedha.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install gradio ultralytics opencv-python-headless

Collecting ultralytics
  Downloading ultralytics-8.3.170-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading n

In [3]:
import gradio as gr
import cv2
import tempfile
import os
from ultralytics import YOLO
import numpy as np

# Load YOLOv8x model
model = YOLO("yolov8x.pt")

# Define relevant classes (COCO dataset IDs) and their desired display order
# Removed 'train' and 'boat' as requested
relevant_classes = {
    "car": 2,
    "bus": 5,
    "truck": 7,
    "motorcycle": 3,
    "bicycle": 1,
    "person": 0
}

# Global dictionary to store tracked objects across frames
# Each entry: {track_id: {'last_centroid': (x,y), 'last_bbox': [x1,y1,x2,y2], 'last_label': str, 'last_conf': float,
#                        'prev_centroid_y': int, 'counted_upcoming': bool, 'counted_outgoing': bool,
#                        'frames_unseen': int, 'direction_label': str}}
tracked_objects = {}
next_track_id = 0 # To assign unique IDs to new tracks

# Thresholds for tracking and counting
MAX_DISTANCE_THRESHOLD = 100 # Max pixel distance for a detection to be considered the same object
MAX_FRAMES_UNSEEN = 10 # How many frames an object can be unseen before being removed

def get_centroid(bbox):
    """Calculates the centroid of a bounding box."""
    x1, y1, x2, y2 = bbox
    return (x1 + x2) / 2, (y1 + y2) / 2

def euclidean_distance(p1, p2):
    """Calculates Euclidean distance between two points."""
    return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)

def detect_vehicles_and_people_with_logs(video_input):
    global tracked_objects, next_track_id # Access global variables

    cap = cv2.VideoCapture(video_input)
    if not cap.isOpened():
        # Return default values for all outputs in case of error
        return None, "", "❌ Error: Could not open video.", *([0] * (len(relevant_classes) * 2))

    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps    = cap.get(cv2.CAP_PROP_FPS)

    # Define counting line in the middle of the frame
    # Objects moving upwards (decreasing Y) crossing this line are "Upcoming"
    # Objects moving downwards (increasing Y) crossing this line are "Outgoing"
    counting_line_y = height * 0.5

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmpfile:
        out_path = tmpfile.name

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out_vid = cv2.VideoWriter(out_path, fourcc, fps, (width, height))

    logs = ""
    frame_index = 0

    # Initialize granular counts for all relevant classes
    upcoming_counts = {label: 0 for label in relevant_classes.keys()}
    outgoing_counts = {label: 0 for label in relevant_classes.keys()}

    # Reset tracking state for each new video processing
    tracked_objects = {}
    next_track_id = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_index += 1

        results = model(frame, verbose=False)[0]
        current_frame_detections = [] # Store {bbox, cls_id, conf, centroid, label} for current frame

        # Process current frame detections
        for box in results.boxes:
            cls_id = int(box.cls.item())
            label = model.names[cls_id] # Get label from model.names
            if label in relevant_classes: # Check if the label is in our defined relevant_classes
                xyxy = box.xyxy[0].cpu().numpy().astype(int)
                conf = box.conf.item()
                centroid_x, centroid_y = get_centroid(xyxy)
                current_frame_detections.append({
                    'bbox': xyxy,
                    'cls_id': cls_id,
                    'conf': conf,
                    'label': label,
                    'centroid': (centroid_x, centroid_y)
                })

        # --- Tracking and Counting Logic ---
        matched_current_detections_indices = set()
        matched_tracked_ids_in_current_frame = set()

        # Try to match current detections with existing tracked objects
        for i, current_det in enumerate(current_frame_detections):
            best_match_id = None
            min_dist = float('inf')

            for track_id, track_info in tracked_objects.items():
                # Only consider objects not yet matched in this frame
                if track_id in matched_tracked_ids_in_current_frame:
                    continue

                dist = euclidean_distance(current_det['centroid'], track_info['last_centroid'])
                if dist < min_dist and dist < MAX_DISTANCE_THRESHOLD:
                    best_match_id = track_id
                    min_dist = dist

            if best_match_id is not None:
                # Update existing tracked object
                tracked_objects[best_match_id]['prev_centroid_y'] = tracked_objects[best_match_id]['last_centroid'][1] # Store previous Y for direction
                tracked_objects[best_match_id]['last_centroid'] = current_det['centroid']
                tracked_objects[best_match_id]['last_bbox'] = current_det['bbox']
                tracked_objects[best_match_id]['last_label'] = current_det['label']
                tracked_objects[best_match_id]['last_conf'] = current_det['conf']
                tracked_objects[best_match_id]['frames_unseen'] = 0

                # Determine direction and count
                curr_y = current_det['centroid'][1]
                prev_y = tracked_objects[best_match_id]['prev_centroid_y']

                object_label = current_det['label']

                if curr_y < prev_y: # Moving upwards (decreasing Y)
                    tracked_objects[best_match_id]['direction_label'] = "UPCOMING"
                    if not tracked_objects[best_match_id]['counted_upcoming'] and curr_y < counting_line_y and prev_y >= counting_line_y:
                        upcoming_counts[object_label] += 1
                        logs += f"Frame {frame_index}: {object_label.capitalize()} {best_match_id} counted as UPCOMING.\n"
                        tracked_objects[best_match_id]['counted_upcoming'] = True
                elif curr_y > prev_y: # Moving downwards (increasing Y)
                    tracked_objects[best_match_id]['direction_label'] = "OUTGOING"
                    if not tracked_objects[best_match_id]['counted_outgoing'] and curr_y > counting_line_y and prev_y <= counting_line_y:
                        outgoing_counts[object_label] += 1
                        logs += f"Frame {frame_index}: {object_label.capitalize()} {best_match_id} counted as OUTGOING.\n"
                        tracked_objects[best_match_id]['counted_outgoing'] = True
                else: # No significant vertical movement
                    tracked_objects[best_match_id]['direction_label'] = "" # Reset if not clear direction

                matched_current_detections_indices.add(i)
                matched_tracked_ids_in_current_frame.add(best_match_id)

        # Add new detections (unmatched current detections)
        for i, current_det in enumerate(current_frame_detections):
            if i not in matched_current_detections_indices:
                tracked_objects[next_track_id] = {
                    'last_centroid': current_det['centroid'],
                    'last_bbox': current_det['bbox'],
                    'last_label': current_det['label'],
                    'last_conf': current_det['conf'],
                    'prev_centroid_y': current_det['centroid'][1], # Initialize prev_y for direction calculation
                    'frames_unseen': 0,
                    'counted_upcoming': False,
                    'counted_outgoing': False,
                    'direction_label': ""
                }
                logs += f"Frame {frame_index}: New {current_det['label']} {next_track_id} detected.\n"
                next_track_id += 1

        # Clean up old tracks (objects not seen in current frame) and draw all tracked objects
        tracks_to_remove = []
        for track_id, track_info in list(tracked_objects.items()): # Iterate over a copy
            if track_id not in matched_tracked_ids_in_current_frame:
                tracked_objects[track_id]['frames_unseen'] += 1
                if tracked_objects[track_id]['frames_unseen'] > MAX_FRAMES_UNSEEN:
                    logs += f"Frame {frame_index}: Object {track_id} removed (unseen for too long).\n"
                    tracks_to_remove.append(track_id)

            # Draw bounding boxes for all currently tracked objects (even if briefly unseen)
            bbox_to_draw = track_info['last_bbox']
            label_to_draw = f"ID:{track_id} {track_info['last_label']}"

            # Add direction label if determined
            if track_info['direction_label']:
                label_to_draw += f" ({track_info['direction_label']})"

            conf_to_draw = track_info['last_conf']

            color = (0, 255, 0) # Green for general detection
            if track_info['counted_upcoming']:
                color = (255, 0, 0) # Red for upcoming
            elif track_info['counted_outgoing']:
                color = (0, 0, 255) # Blue for outgoing

            # If object is unseen for a few frames, draw it faded
            if track_info['frames_unseen'] > 0:
                alpha = max(0.2, 1.0 - (track_info['frames_unseen'] / MAX_FRAMES_UNSEEN)) # Fade out
                color = (int(color[0] * alpha), int(color[1] * alpha), int(color[2] * alpha))

            x1, y1, x2, y2 = bbox_to_draw
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"{label_to_draw} ({conf_to_draw:.2f})", (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

        for track_id in tracks_to_remove:
            del tracked_objects[track_id]

        # Draw the counting line
        cv2.line(frame, (0, int(counting_line_y)), (width, int(counting_line_y)), (255, 255, 0), 2) # Yellow line
        cv2.putText(frame, "Counting Line", (10, int(counting_line_y) - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

        out_vid.write(frame)

    cap.release()
    out_vid.release()

    if not os.path.exists(out_path):
        return None, "", "❌ Error: Processed video not found.", *([0] * (len(relevant_classes) * 2))

    # Prepare the return values in the specified order
    return_values = [out_path, out_path, logs]
    for label in relevant_classes.keys():
        return_values.append(upcoming_counts[label])
        return_values.append(outgoing_counts[label])

    return tuple(return_values)


# Dynamically create outputs for Gradio Interface
gr_outputs = [
    gr.Video(label="Processed Video"),
    gr.File(label="Download Processed Video"),
    gr.Textbox(label="Detection Logs", lines=10, interactive=False)
]

for label in relevant_classes.keys():
    gr_outputs.append(gr.Number(label=f"Upcoming {label.capitalize()} Count", interactive=False))
    gr_outputs.append(gr.Number(label=f"Outgoing {label.capitalize()} Count", interactive=False))


demo = gr.Interface(
    fn=detect_vehicles_and_people_with_logs,
    inputs=gr.Video(label="Upload a video"),
    outputs=gr_outputs,
    title="YOLOv8x Vehicle and People Directional Counting (Detailed)",
    description="Upload a video to detect various vehicles and people, and count 'Upcoming' (moving upwards) and 'Outgoing' (moving downwards) objects. A yellow line indicates the counting threshold. Objects are tracked and counted once as they cross this line."
)

if __name__ == "__main__":
    demo.launch(share=True)


Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://204aac5ffa7a99904f.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
