In [None]:
import sys
from typing import Any

from ultralytics import YOLO
from PIL import Image
import cv2 as cv
import numpy as np

## Utility functions

In [None]:
def view(frame, *, scale=0.5, window_name='Frame'):
    """
    Show a frame in an OpenCV window and poll the keyboard once.

    :param frame: image to display; a resized version is shown.
    :param scale: scale factor applied to both axes before displaying.
    :param window_name: title of the OpenCV window to draw into.
    :return: False when Esc, 'q' or 'Q' was pressed (the caller should stop
        displaying), True otherwise.
    """
    # Resize the frame to a comfortable viewing size
    resized = cv.resize(frame, None, fx=scale, fy=scale, interpolation=cv.INTER_LINEAR)
    cv.imshow(window_name, resized)
    # Keep only the low byte of the key code (same masking as test_yolo11), so
    # builds that report extra modifier bits still match. "No key" (-1) becomes
    # 255, which is not a quit key.
    key = cv.waitKey(1) & 0xFF
    return key not in (27, ord('q'), ord('Q'))

In [None]:
# utility crop on BBoxes
def crop_bbox(frame: np.ndarray, bbox: tuple[int, int, int, int]) -> np.ndarray:
    """
    Crops the frame based on the bounding box.
    :param frame: frame to crop, indexed as frame[row (y), column (x)].
    :param bbox: bbox to crop [x1, y1, x2, y2]; any extra trailing values
        (e.g. the area of a detection row) are ignored.
    :return: the cropped region frame[y1:y2, x1:x2] (a NumPy slice of `frame`).

    bbox examples: [1947, 475, 1954, 698] or [1947, 475, 1954, 698, 1561]
    """
    # Take the four corner values and tolerate a trailing area column
    x1, y1, x2, y2, *_ = bbox
    # Clamp at 0: a negative value would be read by NumPy as "count from the
    # end" and silently crop the wrong region. int() also makes NumPy scalars
    # and whole-number floats usable as slice bounds.
    x1, y1, x2, y2 = (max(int(v), 0) for v in (x1, y1, x2, y2))

    # Upper bounds need no clamping: slicing already stops at the frame edge
    return frame[y1:y2, x1:x2]

def crop_bboxes(frame, bboxes: list[tuple[int, int, int, int]]) -> list[np.ndarray]:
    """
    Crops the frame once per bounding box.
    :param frame: frame to crop.
    :param bboxes: bounding boxes, each one passed unchanged to crop_bbox.
    :return: the cropped frames, in the same order as `bboxes`.
    """
    crops = []
    for box in bboxes:
        crops.append(crop_bbox(frame, box))
    return crops

In [None]:

def merge_overlapping_detections(detections: list, overlap_threshold: float = 0.3) -> list:
    """
    Removes overlapping bounding boxes based on Intersection over Union (IoU).

    Despite the name, boxes are not fused together: boxes are visited from the
    largest to the smallest, each visited box is kept, and every remaining box
    whose IoU with it reaches the threshold is discarded.

    Parameters:
        detections: Boxes as rows [x1, y1, x2, y2, ...] (list of rows or 2-D
            array). Only the first four columns are read; any further columns
            (e.g. the area) are carried along untouched.
        overlap_threshold: IoU threshold. A box is dropped when its IoU with an
            already kept, larger box is >= threshold. With a threshold <= 0 no
            IoU can be below it, so only the largest box is returned.

    Returns:
        list: The kept rows as NumPy arrays, ordered from largest to smallest
        area.
    """
    # len() rather than truthiness: truth-testing a multi-row ndarray raises
    # ValueError, so array inputs used to crash here.
    if detections is None or len(detections) == 0:
        return []

    boxes = np.array(detections)
    # Column layout:
    #   [   x1    y1    x2    y2   area ]
    #   [ 1842   647  1926   771  10416 ]
    #   [ 1918   512  1947   575   1827 ]
    #   [ 1855   467  1912   635   9576 ]
    x1, y1 = boxes[:, 0], boxes[:, 1]
    x2, y2 = boxes[:, 2], boxes[:, 3]
    # Areas are recomputed with the inclusive-pixel (+1) convention so they are
    # consistent with the intersection below; the input area column is not used.
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)

    # Indices of the boxes sorted by area, largest first
    order = areas.argsort()[::-1]
    kept_boxes = []

    while len(order) > 0:
        i, rest = order[0], order[1:]
        kept_boxes.append(boxes[i])

        # Intersection rectangle between box i and every remaining box
        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])

        # Disjoint boxes give a negative extent, clamped to 0
        w = np.maximum(0, xx2 - xx1 + 1)
        h = np.maximum(0, yy2 - yy1 + 1)

        inter = w * h
        union = areas[i] + areas[rest] - inter
        iou = inter / union

        # Continue only with the boxes whose IoU is below the threshold
        order = rest[iou < overlap_threshold]

    return kept_boxes

## MOG2 Movement Detection

In [None]:
def detect_moving_objects(frame: np.ndarray, background_subtractor: cv.BackgroundSubtractor,
                          area_threshold: int = 100) -> tuple[Any, Any]:
    """
    Finds the bounding boxes of the foreground regions of a frame.

    The frame is passed to the background subtractor, the external contours of
    the resulting foreground mask are extracted, and the bounding rectangle of
    every contour is kept when its area exceeds the threshold.

    Parameters:
        frame: Current video frame.
        background_subtractor: Background subtractor for motion detection; its
            apply() is called with the frame.
        area_threshold: A bounding box is kept only when width * height is
            strictly greater than this value.

    Returns:
        A tuple (detections, fg_mask): detections is a list of
        [x1, y1, x2, y2, area] rows and fg_mask is the mask returned by the
        background subtractor for this frame.

    Notes:
        - https://medium.com/analytics-vidhya/opencv-findcontours-detailed-guide-692ee19eeb18
    """
    fg_mask = background_subtractor.apply(frame)
    contours, _hierarchy = cv.findContours(fg_mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)

    # One (x, y, width, height) rectangle per contour
    rects = [cv.boundingRect(contour) for contour in contours]

    # Convert to corner format and drop rectangles that are too small
    detections = [
        [left, top, left + width, top + height, width * height]
        for left, top, width, height in rects
        if width * height > area_threshold
    ]

    return detections, fg_mask

In [None]:
# utility function
def mog2_movement_detection(frame: np.ndarray, *, background_subtractor: cv.BackgroundSubtractor, area_threshold: int = 100, overlap_threshold = 0.0, draw=False) -> tuple[Any, list, np.ndarray]:
    """
    Runs motion detection on a frame and filters the overlapping boxes.

    Parameters:
        frame: Current video frame.
        background_subtractor: Background subtractor forwarded to
            detect_moving_objects.
        area_threshold: Minimum area forwarded to detect_moving_objects.
        overlap_threshold: IoU threshold forwarded to
            merge_overlapping_detections. NOTE: that function keeps a box only
            when its IoU is strictly below the threshold, so with the default
            of 0.0 only the largest box is kept.
        draw: When True, the filtered boxes are drawn in green directly on
            `frame` (the input array is modified).

    Returns:
        A tuple (detections, merged_detections, frame): the raw boxes, the
        boxes left after the overlap filter, and the same frame object that was
        passed in.
    """
    raw_boxes = detect_moving_objects(frame, background_subtractor, area_threshold=area_threshold)[0]
    kept_boxes = merge_overlapping_detections(raw_boxes, overlap_threshold)

    # Draw bounding boxes
    if draw:
        green = (0, 255, 0)
        for left, top, right, bottom, _area in kept_boxes:
            cv.rectangle(frame, (left, top), (right, bottom), green, 2)

    return raw_boxes, kept_boxes, frame

## YOLO11 People Detection

In [None]:
import torch


class PeopleDetector(YOLO):
    """
    YOLO model that only reports people.

    Every prediction made through detect() is restricted to class id 0 and to
    a minimum confidence. In the label map printed by the pretrained model
    ({0: 'person', 1: 'bicycle', 2: 'car', ...}) id 0 is 'person'.

    Sample log line of a single, unfiltered prediction:
        0: 320x640 1 person, 1 umbrella, 3 chairs, 1 couch, 3 potted plants, 2 tvs, 21.1ms
    """

    def __init__(self, model_name, threshold=0.25, **kwargs):
        """
        :param model_name: model name or weights path, forwarded to YOLO
        :param threshold: minimum confidence passed as `conf` on every detect() call
        :param kwargs: extra keyword arguments forwarded to YOLO
        """
        super().__init__(model_name, **kwargs)
        # Class ids to keep in the predictions: 0 is 'person'
        self.focus_on_classes = [0]
        self.threshold = threshold

    def detect(self, frame: np.ndarray) -> tuple[np.ndarray, np.ndarray, Any]:
        """
        :param frame: frame in which detect people
        :return: a np.ndarray of the confidence scores, a np.ndarray with one
            `xywh` row per detected box, and the result obj
        """
        # The call returns a list with one Results object per input image
        # (batches are possible); a single frame is passed, so take the first.
        results = self(frame, classes=self.focus_on_classes, conf=self.threshold, device=self.device)[0]
        # Example content of results.boxes:
        #   cls:  tensor([0., 0., 0., 0., 0., 0.], device='cuda:0')
        #   conf: tensor([0.9429, 0.9262, 0.8841, 0.8833, 0.8824, 0.8773], device='cuda:0')
        #   xywh: tensor([[1207.5342,  576.8246,  510.5572,  977.0013],
        #                 [ 447.3520,  351.2901,  267.4479,  537.2728],
        #                 ...])
        # NOTE(review): ultralytics documents xywh as (center x, center y,
        # width, height) - confirm before mixing these with [x1, y1, x2, y2] boxes.
        boxes = results.boxes
        # The tensors may live on the GPU: copy them to the CPU before converting
        probs = boxes.conf.cpu().numpy()
        bboxes = boxes.xywh.cpu().numpy()
        return probs, bboxes, results

    def detect_on_frames(self, frames: list[np.ndarray]) -> list[tuple[np.ndarray, np.ndarray, Any]]:
        """
        :param frames: frames in which detect people, processed one at a time
        :return: one detect() result per frame, in the same order
        """
        return [
            self.detect(frame) for frame in frames
        ]




## PROCESS VIDEO
*Here is where the magic happens*


In [None]:
def process_video(video_path: str, people_detector: PeopleDetector, scale: float = 0.5, overlap_threshold: float = 0.3, area_threshold: int = 100, early_stop=None, starts_from=0):
    """
    Processes the input video: motion detection on every frame, then people
    detection on the crops of the moving regions, showing each crop in which
    at least one person was found.

    Pressing Esc, 'q' or 'Q' in the display window stops the processing.

    Parameters:
        video_path: Path to the video file.
        people_detector: YOLO model to detect people.
        scale: Scaling factor for resizing frames.
        overlap_threshold: Threshold for merging overlapping detections using IoU.
        area_threshold: Minimum area for detected bounding boxes.
        early_stop: stop after n frames have been read (skipped frames count
            too); None means no limit.
        starts_from: number of initial frames that are read but not processed.
    """
    capture = cv.VideoCapture(video_path)
    back_sub = cv.createBackgroundSubtractorMOG2(detectShadows=False)
    iterations = 0
    try:
        while early_stop is None or iterations < early_stop:
            iterations += 1
            ret, frame = capture.read()
            # Check for the end of the video first, so a video shorter than
            # `starts_from` does not keep issuing failed reads while skipping.
            if not ret:
                break
            if iterations <= starts_from:
                print("skipping frame {}".format(iterations))
                continue

            frame = cv.resize(frame, None, fx=scale, fy=scale, interpolation=cv.INTER_LINEAR)

            _, merged_detections, frame = mog2_movement_detection(
                frame, background_subtractor=back_sub, area_threshold=area_threshold,
                overlap_threshold=overlap_threshold, draw=False)

            # crop frames on bboxes (drop the trailing area of each detection)
            cropped_frames = crop_bboxes(frame, bboxes=[md[:-1] for md in merged_detections])

            # detect people on cropped frames
            people_detections = people_detector.detect_on_frames(cropped_frames)

            keep_going = True
            for probs, _, result in people_detections:
                print(probs)
                if len(probs) == 0:
                    continue
                annotated_frame = result.plot()
                # view() returns False when the user pressed a quit key
                if not view(annotated_frame):
                    keep_going = False
                    break
            if not keep_going:
                break
        else:
            # Reached only when the loop condition became false, i.e. on early stop
            print(f"Stopped after {iterations} frames due early stopping condition.")
    finally:
        # Always free the capture and the windows, even when a step above raised
        capture.release()
        cv.destroyAllWindows()

        # On macOS the windows only close after a few extra event-loop ticks
        if sys.platform == 'darwin':
            for _ in range(4):
                cv.waitKey(1)

        cv.destroyAllWindows()



## Main

In [None]:
# Size suffix used to build the weights file name below ("yolo11x.pt")
yolosize = 'x'
yolo11 = PeopleDetector(f"yolo11{yolosize}.pt", verbose=False,)
# Run the model on the CPU
yolo11.to('cpu')

# Quick check on a still image: detect() is given the image path here
probs, bboxes, _ = yolo11.detect("The-Office-HD-Background.jpg")
print('confidence scores', probs)
print('bboxes', bboxes)

In [None]:
# Run the motion + people detection pipeline on the sample video, using the
# `yolo11` detector created in the previous cell.
video_path = 'SamsungGear360.mp4'
process_video(video_path, yolo11, scale=0.5, overlap_threshold=0.0005, area_threshold=700) # , starts_from=3000, early_stop=3500)

# People Detection TEST


Define the base functions for people detection. <br/>
They take as input an array of frames containing the motion detected by MOG2 above.

In [None]:
# imports
from ultralytics import YOLO
from PIL import Image

In [None]:
# Load a model into the notebook-level `model` variable, which test_yolo11
# below reads as a global.
model = PeopleDetector("./yolo11x.pt")  # load an official model, or use local path

In [None]:
def test_yolo11(video_path: str, scale: float = 0.5):
    """
    Runs YOLO inference on every frame of a video and shows the annotated frames.

    Uses the notebook-level `model`, called directly on the full frame, so the
    class and confidence settings of PeopleDetector.detect are not applied here.
    Press 'q' in the window to stop.

    :param video_path: path to the video file.
    :param scale: scale factor applied to the annotated frame before display.
    """
    # Open the video file
    cap = cv.VideoCapture(video_path)

    try:
        # Loop through the video frames
        while cap.isOpened():
            # Read a frame from the video
            success, frame = cap.read()
            if not success:
                # The end of the video is reached (or the read failed)
                break

            # Run YOLO inference on the full-resolution frame
            results = model(frame)

            # Visualize the results on the frame, then shrink it for display
            annotated_frame = results[0].plot()
            annotated_frame = cv.resize(annotated_frame, None, fx=scale, fy=scale, interpolation=cv.INTER_LINEAR)
            cv.imshow("YOLO Inference", annotated_frame)

            # Break the loop if 'q' is pressed
            if cv.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Release the capture and close the window even when inference or
        # display raised
        cap.release()
        cv.destroyAllWindows()

## Main test YOLO11

In [None]:
# Run the YOLO-only test on the sample video, displayed at a quarter of the size
test_yolo11("SamsungGear360.mp4", scale=0.25)