## Prerequisites

In [1]:
import cv2
import numpy as np
import onnxruntime as ort

# --- Constants ---
# File locations: the ONNX model, the input video, and the input still image.
ONNX_MODEL_PATH = 'object_detector_model_tf/person_detector_model.onnx'
VIDEO_FILE_PATH = 'archive/ssvid.net--HUGE-Speech-Practice-Audience-with-Applause-5-minute-presentation_1080p.mp4'
IMAGE_FILE_PATH = 'output_image.jpg'

# Model input resolution, used as (height, width) by the code below.
IMAGE_SIZE = (226, 226)
# Number of grid cells along each image axis (GRID_SIZE x GRID_SIZE cells).
GRID_SIZE = 7

# A grid cell must reach this confidence to produce a bounding box.
CONFIDENCE_THRESHOLD = 0.5
# IoU threshold handed to Non-Max Suppression.
NMS_IOU_THRESHOLD = 0.4
# The crowd warning is drawn when the detection count is strictly greater than this.
CROWD_THRESHOLD = 4

## Detection mechanism

In [2]:
def decode_predictions(predictions, confidence_threshold, nms_iou_threshold):
    """
    Decodes the model's raw output tensor into a list of final bounding boxes.
    Applies confidence thresholding and Non-Max Suppression.

    Args:
        predictions: Indexed as ``predictions[row, col, :]`` for every
            ``row, col`` in ``range(GRID_SIZE)``. Channels 0-3 are read as
            ``(x_rel, y_rel, w_rel, h_rel)`` and channel 4 as the confidence.
            NOTE(review): presumably a (GRID_SIZE, GRID_SIZE, >=5) numpy
            array straight from the model - confirm against the exporter.
        confidence_threshold: Minimum confidence a grid cell needs to
            contribute a candidate box; also forwarded to NMS as its score
            threshold.
        nms_iou_threshold: IoU threshold forwarded to NMS.

    Returns:
        A ``(boxes, confidences)`` tuple of equal-length lists. Each box is
        ``[y1, x1, y2, x2]`` in pixels of the ``IMAGE_SIZE`` frame. Both lists
        are empty when no cell reaches the confidence threshold.
    """
    corner_boxes = []  # [y1, x1, y2, x2] - the format returned to callers
    nms_rects = []     # [x, y, w, h]     - the format cv2.dnn.NMSBoxes expects
    confidences = []

    cell_w = IMAGE_SIZE[1] / GRID_SIZE
    cell_h = IMAGE_SIZE[0] / GRID_SIZE

    # Iterate over each grid cell
    for y in range(GRID_SIZE):
        for x in range(GRID_SIZE):
            cell_preds = predictions[y, x, :]
            confidence = float(cell_preds[4])

            if confidence >= confidence_threshold:
                # Plain Python floats keep the OpenCV binding happy regardless
                # of the dtype of the incoming tensor.
                x_rel, y_rel, w_rel, h_rel = (float(v) for v in cell_preds[:4])

                # Centre is relative to the cell, size is relative to the image.
                abs_x_center = (x * cell_w) + (x_rel * cell_w)
                abs_y_center = (y * cell_h) + (y_rel * cell_h)
                abs_w = w_rel * IMAGE_SIZE[1]
                abs_h = h_rel * IMAGE_SIZE[0]

                # Convert centre coordinates to corner coordinates
                x1 = abs_x_center - (abs_w / 2)
                y1 = abs_y_center - (abs_h / 2)
                x2 = x1 + abs_w
                y2 = y1 + abs_h

                corner_boxes.append([y1, x1, y2, x2])
                # cv2.dnn.NMSBoxes takes rectangles as (x, y, width, height),
                # NOT the TensorFlow-style [y1, x1, y2, x2]; feeding it corner
                # boxes makes it compute IoU on the wrong geometry.
                nms_rects.append([x1, y1, abs_w, abs_h])
                confidences.append(confidence)

    if not corner_boxes:
        return [], []

    # Apply Non-Max Suppression
    indices = cv2.dnn.NMSBoxes(nms_rects, confidences, confidence_threshold, nms_iou_threshold)

    # Depending on the OpenCV version the result is a flat array, an (N, 1)
    # array, or an empty tuple; normalise to a flat integer index array.
    kept = np.array(indices, dtype=int).reshape(-1)

    final_boxes = [corner_boxes[i] for i in kept]
    final_confidences = [confidences[i] for i in kept]

    return final_boxes, final_confidences


def draw_boxes_with_warning(image, boxes, confidences, crowd_threshold):
    """
    Annotates ``image`` in place with one labelled rectangle per detection and,
    when there are more detections than ``crowd_threshold``, a crowd banner.

    Args:
        image: Image to draw on; it is modified in place and also returned.
        boxes: Iterable of ``[y1, x1, y2, x2]`` boxes (values are truncated
            to ``int`` before drawing).
        confidences: Scores paired positionally with ``boxes``.
        crowd_threshold: The banner is drawn only when ``len(boxes)`` is
            strictly greater than this value.

    Returns:
        The same ``image`` object that was passed in.
    """
    # Colour tuples as passed to OpenCV.
    green = (0, 255, 0)
    black = (0, 0, 0)
    red = (0, 0, 255)
    white = (255, 255, 255)

    for detection, score in zip(boxes, confidences):
        top, left, bottom, right = map(int, detection)

        # Outline of the detection
        cv2.rectangle(image, (left, top), (right, bottom), green, 2)

        # Caption on a filled strip sitting directly above the box
        caption = f"Person: {score:.2f}"
        (text_w, text_h), _ = cv2.getTextSize(caption, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        strip_top_left = (left, top - text_h - 10)
        strip_bottom_right = (left + text_w, top)
        cv2.rectangle(image, strip_top_left, strip_bottom_right, green, -1)
        cv2.putText(image, caption, (left, top - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, black, 2)

    # Nothing more to draw unless the detection count exceeds the threshold.
    if not len(boxes) > crowd_threshold:
        return image

    banner = "CROWD DETECTED!"
    (banner_w, banner_h), _ = cv2.getTextSize(banner, cv2.FONT_HERSHEY_SIMPLEX, 1, 3)
    cv2.rectangle(image, (10, 10), (10 + banner_w + 10, 10 + banner_h + 10), red, -1)
    cv2.putText(image, banner, (15, 15 + banner_h), cv2.FONT_HERSHEY_SIMPLEX, 1, white, 3)

    return image

## Video inference

In [3]:
def crowd_monitoring_video(onnx_model_path, video_path, output_path='output_video_crowd.mp4', display=True):
    """
    Runs crowd monitoring on a video file using the ONNX model.

    Every frame is resized to ``IMAGE_SIZE``, scaled to [0, 1], run through the
    model, decoded with ``decode_predictions``, annotated with
    ``draw_boxes_with_warning`` and appended to the output video.

    Args:
        onnx_model_path: Path of the ONNX model to load.
        video_path: Path of the input video.
        output_path: Path of the annotated video to write (``mp4v`` codec).
        display: When True (the default) each annotated frame is also shown
            in an OpenCV window and pressing ``q`` stops processing early.
            Pass False in environments without GUI support.

    Returns:
        None. Errors opening the input or output are reported with ``print``.
    """
    print(f"--- Running crowd monitoring on video: {video_path} ---")
    session = ort.InferenceSession(onnx_model_path)
    input_name = session.get_inputs()[0].name

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video file.")
        cap.release()
        return

    out = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Keep the fractional frame rate (e.g. 29.97) instead of truncating it,
        # and fall back to a sane value when the container reports 0 or NaN.
        fallback_fps = 30.0
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = fallback_fps

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open output video '{output_path}' for writing.")
            return

        # Factors mapping model-space pixels back to the original frame size.
        scale_y = height / IMAGE_SIZE[0]
        scale_x = width / IMAGE_SIZE[1]

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # NOTE(review): frames are fed to the model in OpenCV's BGR channel
            # order - confirm this matches how the model was trained.
            frame_resized = cv2.resize(frame, (IMAGE_SIZE[1], IMAGE_SIZE[0]))
            frame_input = frame_resized / 255.0
            frame_input = np.expand_dims(frame_input, axis=0).astype(np.float32)

            # First output tensor, first (only) item of the batch.
            preds = session.run(None, {input_name: frame_input})[0][0]

            boxes, confidences = decode_predictions(preds, CONFIDENCE_THRESHOLD, NMS_IOU_THRESHOLD)

            final_boxes_scaled = [
                [y1 * scale_y, x1 * scale_x, y2 * scale_y, x2 * scale_x]
                for y1, x1, y2, x2 in boxes
            ]

            output_frame = draw_boxes_with_warning(frame.copy(), final_boxes_scaled, confidences, CROWD_THRESHOLD)
            out.write(output_frame)

            # Display the resulting frame (optional, can be slow)
            if display:
                cv2.imshow('Crowd Monitoring - Press Q to Quit', output_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        print(f"--- Video processing complete. Output video saved to '{output_path}' ---")
    finally:
        # Release handles even if inference or drawing raised mid-stream,
        # otherwise the output file can be left locked or truncated.
        cap.release()
        if out is not None:
            out.release()
        if display:
            cv2.destroyAllWindows()

## Image inference

In [4]:
def crowd_monitoring_image(onnx_model_path, image_path, output_path='output_image_crowd.jpg', display=False):
    """
    Runs crowd monitoring on a single image using the ONNX model.

    The image is resized to ``IMAGE_SIZE``, scaled to [0, 1], run through the
    model, decoded with ``decode_predictions``, annotated with
    ``draw_boxes_with_warning`` and written to ``output_path``.

    Args:
        onnx_model_path: Path of the ONNX model to load.
        image_path: Path of the input image.
        output_path: Path of the annotated image to write.
        display: When True, also show the annotated image in an OpenCV window
            and block until a key is pressed. Defaults to False.

    Returns:
        None. Read and write failures are reported with ``print``.
    """
    print(f"--- Running crowd monitoring on image: {image_path} ---")
    session = ort.InferenceSession(onnx_model_path)
    input_name = session.get_inputs()[0].name

    image = cv2.imread(image_path)
    if image is None:
        print("Error: Could not read image.")
        return

    original_h, original_w, _ = image.shape

    # Preprocess image
    # NOTE(review): the image is fed to the model in OpenCV's BGR channel
    # order - confirm this matches how the model was trained.
    image_resized = cv2.resize(image, (IMAGE_SIZE[1], IMAGE_SIZE[0]))
    image_input = image_resized / 255.0
    image_input = np.expand_dims(image_input, axis=0).astype(np.float32)

    # Get predictions: first output tensor, first (only) item of the batch.
    preds = session.run(None, {input_name: image_input})[0][0]

    # Decode predictions and draw boxes
    boxes, confidences = decode_predictions(preds, CONFIDENCE_THRESHOLD, NMS_IOU_THRESHOLD)

    # Scale boxes back to original image size
    scale_y = original_h / IMAGE_SIZE[0]
    scale_x = original_w / IMAGE_SIZE[1]
    final_boxes_scaled = [
        [y1 * scale_y, x1 * scale_x, y2 * scale_y, x2 * scale_x]
        for y1, x1, y2, x2 in boxes
    ]

    output_image = draw_boxes_with_warning(image.copy(), final_boxes_scaled, confidences, CROWD_THRESHOLD)

    # Save the output. cv2.imwrite signals failure through its return value,
    # so only claim success when the file was actually written.
    if cv2.imwrite(output_path, output_image):
        print(f"Found {len(final_boxes_scaled)} person(s). Output saved to '{output_path}'")
    else:
        print(f"Error: Could not write output image to '{output_path}'.")

    # Display the output image (optional)
    if display:
        cv2.imshow('Crowd Detections', output_image)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

## Running inference

In [7]:
# --- Video pipeline (active) ---
crowd_monitoring_video(
    onnx_model_path=ONNX_MODEL_PATH,
    video_path=VIDEO_FILE_PATH,
)

# --- Single-image pipeline (disabled; uncomment the call below to run it) ---
#crowd_monitoring_image(ONNX_MODEL_PATH, IMAGE_FILE_PATH)

--- Running crowd monitoring on video: archive/ssvid.net--HUGE-Speech-Practice-Audience-with-Applause-5-minute-presentation_1080p.mp4 ---
--- Video processing complete. Output video saved to 'output_video_crowd.mp4' ---
