In [1]:
!pip install opencv-python numpy roboflow pafy youtube_dl





In [None]:
import cv2
import numpy as np
from roboflow import Roboflow

# ====================================
# 1. MODEL SETUP VIA ROBOFLOW
# ====================================
rf = Roboflow(api_key="JEQebQNC9eADoMrYVTfR")  # Replace with your Roboflow API key
workspace = rf.workspace()

# Load your project model – now using "crowd-density-ou3ne" version 1.
project_crowd = workspace.project("crowd-density-ou3ne")  # Ensure this matches your project slug.
model_crowd = project_crowd.version(1).model

# Choose this model for inference.
model = model_crowd

# ====================================
# 2. INFERENCE PARAMETERS & SPEED OPTIMIZATION
# ====================================
# These parameters can be modified later.
confidence_threshold = 1   # For example, 1% confidence (modify as needed)
overlap_threshold = 50     # For example, 50% overlap threshold
label_display = False      # Set to False to not display labels on bounding boxes

# Resize factor for processing frames (e.g., 0.5 means process at half resolution)
resize_factor = 0.5  # Adjust this factor to speed up inference (values between 0 and 1)
# Frame skip: process every 'frame_skip'-th frame (1 = all frames, 2 = every other frame, etc.)
frame_skip = 1       

# ====================================
# 3. SELECT INPUT MODE (Image, YouTube Video, Local Video, or Camera)
# ====================================
print("Select input mode:")
print("1) Local Image")
print("2) YouTube Video")
print("3) Camera")
print("4) Local Video")
mode = input("Enter 1, 2, 3 or 4: ")

input_mode = ""
if mode == "1":
    input_mode = "image"
    image_path = input("Enter the path to your image file: ")
    image = cv2.imread(image_path)
    if image is None:
        print("Error: Image not found.")
        exit(1)
elif mode == "2":
    input_mode = "video"
    import pafy
    video_url = input("Enter the YouTube video URL: ")
    video = pafy.new(video_url)
    best = video.getbest(preftype="mp4")
    cap = cv2.VideoCapture(best.url)
elif mode == "3":
    input_mode = "video"
    cap = cv2.VideoCapture(0)
elif mode == "4":
    input_mode = "video"
    video_path = input("Enter the path to your local video file (.mov supported): ")
    cap = cv2.VideoCapture(video_path)
else:
    print("Invalid mode selected.")
    exit(1)

# ====================================
# 4. SETUP REGIONS (ZONES) COVERING THE FULL IMAGE AREA
# ====================================
# Determine frame dimensions from a static image or from the first video frame.
if input_mode == "image":
    orig_height, orig_width = image.shape[:2]
else:
    ret, first_frame = cap.read()
    if not ret:
        print("Error: Cannot read video stream.")
        exit(1)
    orig_height, orig_width = first_frame.shape[:2]
    # rewind the stream if needed
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

# Apply resizing for processing: all processing will be done on this scaled frame.
proc_width = int(orig_width * resize_factor)
proc_height = int(orig_height * resize_factor)

# Define the full image area (processed resolution) as four rectangular zones (2×2 grid).
zones = {
    "Zone 1": {
        "points": [(0, 0), (proc_width // 2, 0), (proc_width // 2, proc_height // 2), (0, proc_height // 2)],
        "color": (255, 0, 0)  # Blue in BGR
    },
    "Zone 2": {
        "points": [(proc_width // 2, 0), (proc_width, 0), (proc_width, proc_height // 2), (proc_width // 2, proc_height // 2)],
        "color": (0, 255, 0)  # Green
    },
    "Zone 3": {
        "points": [(0, proc_height // 2), (proc_width // 2, proc_height // 2), (proc_width // 2, proc_height), (0, proc_height)],
        "color": (0, 0, 255)  # Red
    },
    "Zone 4": {
        "points": [(proc_width // 2, proc_height // 2), (proc_width, proc_height // 2), (proc_width, proc_height), (proc_width // 2, proc_height)],
        "color": (0, 255, 255)  # Yellow
    }
}

# Convert each zone's point list into a NumPy array (polygon).
for zname, zdata in zones.items():
    pts = np.array(zdata["points"], dtype=np.int32)
    zones[zname]["polygon"] = pts

def point_in_polygon(point, polygon):
    """Return True if the point lies inside or on the boundary of the polygon."""
    return cv2.pointPolygonTest(polygon, point, False) >= 0

# ====================================
# 5. DEFINE A FUNCTION TO PROCESS A FRAME
# ====================================
def process_frame(frame):
    # Resize frame for faster inference.
    proc_frame = cv2.resize(frame, (proc_width, proc_height))
    
    # Run inference using the Roboflow model on the resized frame.
    # (Assumes model.predict() accepts a frame directly.)
    result = model.predict(proc_frame, confidence=confidence_threshold, overlap=overlap_threshold).json()
    detections = result.get("predictions", [])
    
    # Convert center-based predictions to bounding boxes.
    # (Coordinates are relative to proc_frame, so no scaling needed.)
    person_bboxes = []
    for det in detections:
        x_center = det["x"]
        y_center = det["y"]
        box_width = det["width"]
        box_height = det["height"]
        x1 = int(x_center - box_width / 2)
        y1 = int(y_center - box_height / 2)
        x2 = int(x_center + box_width / 2)
        y2 = int(y_center + box_height / 2)
        label = det.get("class", "Person")
        person_bboxes.append([x1, y1, x2, y2, label])
    
    # Assign detections to zones.
    zone_counts = {zname: 0 for zname in zones.keys()}
    annotated_bboxes = []
    for bbox in person_bboxes:
        x1, y1, x2, y2, label = bbox
        rep_pt = (int((x1 + x2) / 2), y2)  # bottom-center of bounding box in proc_frame coordinates
        matched_zone = None
        for zname, zdata in zones.items():
            if point_in_polygon(rep_pt, zdata["polygon"]):
                matched_zone = zname
                zone_counts[zname] += 1
                break
        annotated_bboxes.append({
            "bbox": bbox[:4],
            "zone": matched_zone,
            "label": label
        })
    
    # For visualization, work on the processed (resized) frame.
    output = proc_frame.copy()
    # Draw zones and display zone counts.
    for zname, zdata in zones.items():
        polygon = zdata["polygon"]
        zone_color = zdata["color"]
        cv2.polylines(output, [polygon], isClosed=True, color=zone_color, thickness=3)
        # Use midpoint of top-left and bottom-right of the zone rectangle.
        pt1 = polygon[0]
        pt3 = polygon[2]
        cx = (pt1[0] + pt3[0]) // 2
        cy = (pt1[1] + pt3[1]) // 2
        count_text = f"{zname}: {zone_counts[zname]}"
        text_size, _ = cv2.getTextSize(count_text, cv2.FONT_HERSHEY_SIMPLEX, 1.2, 2)
        cv2.rectangle(output, (cx - 5, cy - text_size[1] - 10),
                      (cx + text_size[0] + 5, cy), zone_color, thickness=-1)
        cv2.putText(output, count_text, (cx, cy - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 0), 2)
    
    # Draw detection bounding boxes.
    for entry in annotated_bboxes:
        x1, y1, x2, y2 = entry["bbox"]
        zone = entry["zone"]
        box_color = zones[zone]["color"] if zone is not None else (128, 128, 128)
        cv2.rectangle(output, (x1, y1), (x2, y2), box_color, 2)
        if label_display:
            text_label = zone if zone is not None else "None"
            cv2.putText(output, text_label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
    
    # Draw the global total count in the UPPER LEFT CORNER.
    total_count = len(person_bboxes)
    global_text = f"Total: {total_count}"
    g_text_size, _ = cv2.getTextSize(global_text, cv2.FONT_HERSHEY_SIMPLEX, 2, 4)
    cv2.rectangle(output, (10, 10), (10 + g_text_size[0] + 20, 10 + g_text_size[1] + 20), (0, 0, 0), thickness=-1)
    cv2.putText(output, global_text, (20, 10 + g_text_size[1] + 5),
                cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 4)
    
    return output

# ====================================
# 6. PROCESS THE SELECTED INPUT
# ====================================
if input_mode == "image":
    # Process a single image.
    output = process_frame(image)
    cv2.imshow("Output", output)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
else:
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Process only every Nth frame based on frame_skip.
        if frame_count % frame_skip == 0:
            output = process_frame(frame)
            cv2.imshow("Output", output)
        else:
            cv2.imshow("Output", frame)
        
        frame_count += 1
        if cv2.waitKey(1) & 0xFF == 27:  # Press ESC to exit.
            break
    cap.release()
    cv2.destroyAllWindows()

loading Roboflow workspace...
loading Roboflow project...
Select input mode:
1) Local Image
2) YouTube Video
3) Camera
4) Local Video
