# Import Tools

In [37]:
import os
import supervision as sv
import cv2
from ultralytics import YOLO
import torch
import numpy as np
import json

HOME = os.getcwd()
HOME

'C:\\Users\\USER\\Downloads\\DeepFashion2\\DeepFashion2'

In [7]:
!pip install -q ultralytics supervision --upgrade supervision
from ultralytics import YOLO
import supervision as sv

In [8]:
DATA_PATH = f"{HOME}/yolov9/deepfashion2-m-11k-1"
IMAGE_PATH = f"{HOME}/yolov9/deepfashion2-m-11k-1/train/images"
WEIGHT_PATH = f"{HOME}/yolov9/weights/yolov9c.pt"
# SINGLE_IMAGE = IMAGE_PATH + "/000001_png.rf.3efc63de2fe2bab8882abc87437bf048.jpg"
SINGLE_IMAGE = "yolov9/mytest/5.jpeg"

# Get the model

In [9]:
model = YOLO(WEIGHT_PATH)
model.info()

YOLOv9c summary: 618 layers, 25590912 parameters, 0 gradients, 104.0 GFLOPs


(618, 25590912, 0, 104.02268160000003)

# Get the data

In [10]:
source_video_path = f"{HOME}/yolov9/myvid/background video _ people _ walking _.mp4"

# ByteTrack

In [51]:
COLORS = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])
ELLIPSE_ANNOTATOR = sv.EllipseAnnotator()
LABEL_ANNOTATOR = sv.LabelAnnotator(text_color=sv.Color.from_hex("#000000"))
tracker = sv.ByteTrack(minimum_matching_threshold=0.5)
device = "cuda" if torch.cuda.is_available() else "cpu"

polygons = load_zones_config(file_path=zone_configuration_path)
video_info = sv.VideoInfo.from_video_path(video_path=source_video_path)

## Timer

In [50]:
class FPSBasedTimer:
    """
    A timer that calculates the duration each object has been detected based on frames
    per second (FPS).

    Attributes:
        fps (int): The frame rate of the video stream, used to calculate time durations.
        frame_id (int): The current frame number in the sequence.
        tracker_id2frame_id (Dict[int, int]): Maps each tracker's ID to the frame number
            at which it was first detected.
    """

    def __init__(self, fps: int = 30) -> None:
        """Initializes the FPSBasedTimer with the specified frames per second rate.

        Args:
            fps (int, optional): The frame rate of the video stream. Defaults to 30.
        """
        self.fps = fps
        self.frame_id = 0
        self.tracker_id2frame_id: Dict[int, int] = {}

    def tick(self, detections: sv.Detections) -> np.ndarray:
        """Processes the current frame, updating time durations for each tracker.

        Args:
            detections: The detections for the current frame, including tracker IDs.

        Returns:
            np.ndarray: Time durations (in seconds) for each detected tracker, since
                their first detection.
        """
        self.frame_id += 1
        times = []

        for tracker_id in detections.tracker_id:
            self.tracker_id2frame_id.setdefault(tracker_id, self.frame_id)

            start_frame_id = self.tracker_id2frame_id[tracker_id]
            time_duration = (self.frame_id - start_frame_id) / self.fps
            times.append(time_duration)

        return np.array(times)


## Draw Pologon Zones

In [29]:
source_path = "yolov9/myvid/background video _ people _ walking _.mp4"
zone_configuration_path =  "scripts/config.json"

In [34]:
!python scripts/draw_zones.py --source_path "{source_path}" --zone_configuration_path "{zone_configuration_path}"

Polygons saved to scripts/config.json


In [40]:
def load_zones_config(file_path: str):
    """
    Load polygon zone configurations from a JSON file.

    This function reads a JSON file which contains polygon coordinates, and
    converts them into a list of NumPy arrays. Each polygon is represented as
    a NumPy array of coordinates.

    Args:
        file_path (str): The path to the JSON configuration file.

    Returns:
        List[np.ndarray]: A list of polygons, each represented as a NumPy array.
    """
    with open(file_path, "r") as file:
        data = json.load(file)
        return [np.array(polygon, np.int32) for polygon in data]

def find_in_list(array: np.ndarray, search_list):
    """Determines if elements of a numpy array are present in a list.

    Args:
        array (np.ndarray): The numpy array of integers to check.
        search_list (List[int]): The list of integers to search within.

    Returns:
        np.ndarray: A numpy array of booleans, where each boolean indicates whether
        the corresponding element in `array` is found in `search_list`.
    """
    if not search_list:
        return np.ones(array.shape, dtype=bool)
    else:
        return np.isin(array, search_list)

## Short Ver

In [54]:
# Zones
polygons = load_zones_config(file_path=zone_configuration_path)
zones = [
    sv.PolygonZone(
        polygon=polygon,
        frame_resolution_wh=resolution_wh,
        triggering_anchors=(sv.Position.CENTER,),
    )
    for polygon in polygons
]

# Timers
timers = [FPSBasedTimer(video_info.fps) for _ in zones]

def callback(frame: np.ndarray, index: int) -> np.ndarray:
    results = model(frame, verbose=False, device=device, conf=0.3)[0]
    
    # Detections
    detections = sv.Detections.from_ultralytics(results)
    detections = detections[detections.class_id == 0]
    detections = detections.with_nms(threshold=0.7)
    detections = detections[find_in_list(detections.class_id, [0])]
    detections = tracker.update_with_detections(detections)

    # Annotation
    annotated_frame = frame.copy()
    for idx, zone in enumerate(zones):
        annotated_frame = sv.draw_polygon(
            scene=annotated_frame, polygon=zone.polygon, color=COLORS.by_idx(idx)
        )
        detections_in_zone = detections[zone.trigger(detections)]
        time_in_zone = timers[idx].tick(detections_in_zone)
        
        labels = [
                f"#{tracker_id} {int(time // 60):02d}:{int(time % 60):02d}"
                for tracker_id, time in zip(detections_in_zone.tracker_id, time_in_zone)
        ]
        
        annotated_frame = ELLIPSE_ANNOTATOR.annotate(
            scene=annotated_frame, detections=detections_in_zone)
        annotated_frame = LABEL_ANNOTATOR.annotate(
            scene=annotated_frame, detections=detections_in_zone, labels=labels)
        
        return annotated_frame

sv.process_video(
    source_path=source_video_path,
    target_path="output_video.mp4",
    callback=callback
)

## Full Ver

In [18]:
# video_info = sv.VideoInfo.from_video_path(video_path=source_video_path)
# frames_generator = sv.get_video_frames_generator(source_video_path)

# frame = next(frames_generator)
# resolution_wh = frame.shape[1], frame.shape[0]

# # For saving the video
# output_video_path = "output_video.mp4"
# fourcc = cv2.VideoWriter_fourcc(*"mp4v")
# fps = video_info.fps
# output_video = cv2.VideoWriter(output_video_path, fourcc, fps, resolution_wh)

# for frame in frames_generator:
#     results = model(frame, verbose=False, device=device, conf=0.3)[0]
#     detections = sv.Detections.from_ultralytics(results)
#     detections = detections.with_nms(threshold=0.7)
#     detections = tracker.update_with_detections(detections)
#     detections = detections[detections.class_id == 0]

#     annotated_frame = frame.copy()

#     annotated_frame = COLOR_ANNOTATOR.annotate(
#         scene=annotated_frame,
#         detections=detections,
#     )

#     labels = [
#         f"#{tracker_id}"
#         for tracker_id in detections.tracker_id
#     ]

#     annotated_frame = LABEL_ANNOTATOR.annotate(
#         scene=annotated_frame,
#         detections=detections,
#         labels=labels,
#     )
#     output_video.write(annotated_frame)

#     cv2.imshow("Processed Video", annotated_frame)
#     if cv2.waitKey(1) & 0xFF == ord("q"):
#         break

# output_video.release()
# cv2.destroyAllWindows()