# **Imports and installations**

In [1]:
# Mount Google Drive at /content/drive; the input video is read from, and the
# output videos are written to, /content/drive/MyDrive further down.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install ultralytics

Installing collected packages: thop, ultralytics
Successfully installed thop-0.1.1.post2209072238 ultralytics-8.1.18


In [3]:
from ultralytics import YOLO, RTDETR
import cv2
import pandas as pd
import numpy as np
import math
from typing import List, Tuple

In [4]:
# Load the RT-DETR detector from the 'rtdetr-l.pt' weights file.
# NOTE: `main` reads this module-level `model`, so this cell must run first.
model = RTDETR('rtdetr-l.pt')

Downloading https://github.com/ultralytics/assets/releases/download/v8.1.0/rtdetr-l.pt to 'rtdetr-l.pt'...


100%|██████████| 63.4M/63.4M [00:00<00:00, 227MB/s]


# **Functions and classes**

In [5]:
class Tracker:
    """
    Centroid-based tracker that assigns persistent integer IDs to bounding boxes.

    On every update a detection inherits the ID of the nearest object from the
    previous update whose center is closer than ``max_distance`` pixels;
    otherwise it receives a fresh ID. Objects that are not matched during an
    update are forgotten.
    """
    def __init__(self, max_distance: float = 35):
        """
        Initialize the Tracker object.

        Parameters:
            max_distance (float): Maximum center-to-center distance (in pixels,
                exclusive) for a detection to be considered the same object
                as one seen in the previous update. Defaults to 35.

        Attributes:
            center_points (dict): Maps object ID -> (cx, cy) center recorded
                during the most recent update.
            id_count (int): Next ID to hand out to a newly seen object.
            max_distance (float): Matching radius described above.
        """
        self.center_points = {}
        self.id_count = 0
        self.max_distance = max_distance

    def update(self, objects_rect: List[Tuple[int, int, int, int]]) -> List[List[int]]:
        """
        Update the tracker with the bounding boxes of the current frame.

        Parameters:
            objects_rect (List[Tuple[int, int, int, int]]): List of object bounding boxes
                (x, y, width, height) for the current frame.

        Returns:
            List[List[int]]: List of object bounding boxes with assigned IDs
                (x, y, width, height, object_id), in the same order as the input.
        """
        objects_bbs_ids = []
        # Centers recorded for this frame only; replacing the old dictionary at
        # the end drops every ID that was not seen in this update.
        new_center_points = {}
        # IDs already handed to a detection in this frame. Each previous object
        # may be matched at most once, so two nearby detections never share an ID.
        claimed_ids = set()

        for rect in objects_rect:
            x, y, w, h = rect
            cx = (x + x + w) // 2
            cy = (y + y + h) // 2

            # Pick the nearest still-unclaimed object from the previous update
            # that lies strictly inside the matching radius.
            best_id = None
            best_dist = self.max_distance
            for object_id, (prev_cx, prev_cy) in self.center_points.items():
                if object_id in claimed_ids:
                    continue
                dist = math.hypot(cx - prev_cx, cy - prev_cy)
                if dist < best_dist:
                    best_id, best_dist = object_id, dist

            # Nothing close enough: this is a new object, so allocate a new ID.
            if best_id is None:
                best_id = self.id_count
                self.id_count += 1

            claimed_ids.add(best_id)
            new_center_points[best_id] = (cx, cy)
            objects_bbs_ids.append([x, y, w, h, best_id])

        self.center_points = new_center_points
        return objects_bbs_ids

In [6]:
def capture_input_video(input_path: str) -> cv2.VideoCapture:
    """
    Open a video file for frame-by-frame reading.

    Parameters:
        input_path (str): Location of the video file to open.

    Returns:
        cv2.VideoCapture: Capture handle created for ``input_path``. No check
            is made here that the file could actually be opened.
    """
    video_capture = cv2.VideoCapture(input_path)
    return video_capture

In [7]:
def create_output_video(input_video: cv2.VideoCapture, output_path: str) -> cv2.VideoWriter:
    """
    Create an output video writer that mirrors the input video's geometry.

    The writer uses the same frame rate and frame size as ``input_video`` and
    encodes with the MPEG-4 ('mp4v') codec.

    Parameters:
        input_video (cv2.VideoCapture): VideoCapture object for the input video.
        output_path (str): Path to save the output video file.

    Returns:
        cv2.VideoWriter: VideoWriter object for the output video.
    """
    fps = input_video.get(cv2.CAP_PROP_FPS)
    frame_width = int(input_video.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(input_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # The MP4 container expects the lower-case 'mp4v' tag; the upper-case
    # 'MP4V' variant makes FFMPEG warn and fall back to 'mp4v' anyway.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    return cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

In [8]:
def process_frame(frame: np.ndarray, model: object, task: str, threshold: float, tracker: Tracker, specific_area: List = None) -> tuple:
    """
    Detect, track and annotate the objects in a single video frame.

    The model is asked for class 0 only (the class this notebook counts as
    people). Every detection is drawn on ``frame`` in place as a green
    rectangle.

    Parameters:
        frame (numpy.ndarray): Input frame to be processed; modified in place.
        model (object): Detector exposing ``predict(frame, classes=..., conf=...)``.
        task (str): Task to be performed ('bounding_box', 'all_area', 'specific_area').
        threshold (float): Minimum confidence passed to the model.
        tracker (Tracker): Tracker whose ``update`` takes (x, y, width, height) boxes.
        specific_area (List, optional): Polygon vertices ``[(x, y), ...]`` used by the
            'specific_area' task.

    Returns:
        tuple: ``(frame, count)``. For task 'specific_area', ``count`` is the number
            of boxes whose center lies inside or on the edge of ``specific_area``
            (0 when no area is given); for every other task it is the total
            number of detections.
    """
    results = model.predict(frame, classes=0, conf=threshold)
    # One row per detection. The first four values are the box corners
    # (x1, y1, x2, y2); any remaining columns (presumably confidence and
    # class id) are not needed here.
    detections = results[0].boxes.data.cpu().tolist()

    # The tracker works with (x, y, width, height), so convert the corner
    # format before handing the boxes over; passing corners directly would
    # make it compute wrong center points.
    rects = []
    for row in detections:
        x1, y1, x2, y2 = (int(value) for value in row[:4])
        rects.append([x1, y1, x2 - x1, y2 - y1])

    count_in_area = task == 'specific_area' and specific_area is not None
    # Build the polygon once per frame instead of once per box.
    polygon = np.array(specific_area, np.int32) if count_in_area else None

    counter = 0
    for x, y, w, h, _object_id in tracker.update(rects):
        x2, y2 = x + w, y + h
        cv2.rectangle(frame, (x, y), (x2, y2), (0, 255, 0), 2)

        if count_in_area:
            cx, cy = (x + x2) // 2, (y + y2) // 2
            # pointPolygonTest returns a value >= 0 for points inside or on the edge.
            if cv2.pointPolygonTest(polygon, (float(cx), float(cy)), False) >= 0:
                counter += 1

    return frame, counter if task == 'specific_area' else len(rects)


In [9]:
def main(input_path: str = '/content/drive/MyDrive/test.mp4', output_name: str = 'bbox_0.55_rtdetrl.mp4', task: str = 'bounding_box', threshold: float = 0.55, output_dir: str = '/content/drive/MyDrive', specific_area: List[Tuple[int, int]] = None) -> None:
    """
    Run detection on every frame of the input video and save the annotated result.

    Uses the module-level ``model`` as the detector. Bounding boxes are always
    drawn; the 'all_area' and 'specific_area' tasks additionally overlay a
    people count on each frame.

    Parameters:
        input_path (str): Path to the input video file.
        output_name (str): Name to save the output video file.
        task (str): Task to be performed ('bounding_box', 'all_area', 'specific_area').
        threshold (float): Threshold value for confidence.
        output_dir (str): Directory the output video is written to.
        specific_area (List[Tuple[int, int]], optional): Polygon vertices used by the
            'specific_area' task. Defaults to the rectangle
            (250, 80) - (380, 160) when omitted.

    Raises:
        ValueError: If the task argument does not match with the allowed values.
        OSError: If the input video or the output writer cannot be opened.
    """
    if task not in ['bounding_box', 'all_area', 'specific_area']:
        raise ValueError("Invalid task. Allowed values: 'bounding_box', 'all_area', 'specific_area'")

    if specific_area is None:
        specific_area = [(250, 80), (380, 80), (380, 160), (250, 160)]

    directory = output_dir.rstrip('/')
    output_video_path = f'{directory}/{output_name}'

    cap = capture_input_video(input_path)
    out = None
    # try/finally guarantees the capture and the writer are released (and the
    # output file finalized) even if processing a frame raises.
    try:
        if not cap.isOpened():
            raise OSError(f'Could not open input video: {input_path}')

        out = create_output_video(cap, output_video_path)
        if not out.isOpened():
            raise OSError(f'Could not open output video for writing: {output_video_path}')

        tracker = Tracker()

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame, counter = process_frame(frame, model, task, threshold, tracker, specific_area)

            if task == 'all_area':
                cv2.putText(frame, f'Number of people: {counter}', (10, 15), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1)
            elif task == 'specific_area':
                cv2.putText(frame, f'Number of people in specific area: {counter}', (10, 15), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1)

            # No window is ever shown, so there is no key press to poll for
            # and no window to destroy afterwards.
            out.write(frame)
    finally:
        cap.release()
        if out is not None:
            out.release()

# **Step1: Bounding Box**

In [None]:
# Draw bounding boxes only; this task adds no count overlay.
main(
    input_path='/content/drive/MyDrive/test.mp4',
    output_name='bbox_0.55_rtdetrl.mp4',
    task='bounding_box',
    threshold=0.55,
)

# **Step2: count people**

In [None]:
# Draw bounding boxes and overlay the number of people detected in the whole frame.
main(
    input_path='/content/drive/MyDrive/test.mp4',
    output_name='out_conf_0.55_all_rtdetrl.mp4',
    task='all_area',
    threshold=0.55,
)

# **Step3: count people in specific area**

In [None]:
# Draw bounding boxes and overlay the number of people whose box center lies
# inside the polygon defined in `main`.
main(
    input_path='/content/drive/MyDrive/test.mp4',
    output_name='specific_area_0.55_rtdetrl.mp4',
    task='specific_area',
    threshold=0.55,
)