In [1]:
!pip install torch==1.12.1
!pip install torchvision==0.13.1
!pip install rich==12.5.1
!pip install opencv-python==4.6.0.66
!pip install tqdm==4.64.1
!pip install git+https://github.com/tryolabs/norfair.git@master

Collecting torch==1.12.1
  Downloading torch-1.12.1-cp310-cp310-manylinux1_x86_64.whl.metadata (22 kB)
Downloading torch-1.12.1-cp310-cp310-manylinux1_x86_64.whl (776.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m776.3/776.3 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch
  Attempting uninstall: torch
    Found existing installation: torch 2.5.1+cu121
    Uninstalling torch-2.5.1+cu121:
      Successfully uninstalled torch-2.5.1+cu121
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
peft 0.14.0 requires torch>=1.13.0, but you have torch 1.12.1 which is incompatible.
torchaudio 2.5.1+cu121 requires torch==2.5.1, but you have torch 1.12.1 which is incompatible.
torchvision 0.20.1+cu121 requires torch==2.5.1, but you have torch 1.12.1 which is incompatible.[0m[31m
[0mSuccessfully installed torch

In [2]:
import numpy as np

import norfair

def draw(
    paths_drawer,
    track_points,
    frame,
    detections,
    tracked_objects,
    coord_transformations,
    fix_paths,
):
    """Overlay detections, tracked objects and (optionally) their paths on ``frame``.

    Args:
        paths_drawer: Object exposing ``draw(frame, tracked_objects)`` and, when
            ``fix_paths`` is true, ``draw(frame, tracked_objects, coord_transformations)``.
            ``None`` disables path drawing entirely.
        track_points: ``"centroid"`` draws points, ``"bbox"`` draws boxes; any
            other value draws no detection/track overlay.
        frame: Image passed to the norfair drawing helpers.
        detections: Detections for the current frame.
        tracked_objects: Objects currently followed by the tracker.
        coord_transformations: Camera-motion transformation forwarded to the
            paths drawer when ``fix_paths`` is true.
        fix_paths: Whether the paths drawer should receive ``coord_transformations``.

    Returns:
        The frame returned by the paths drawer, or ``frame`` itself when no
        paths drawer is supplied.
    """
    if track_points == "centroid":
        norfair.draw_points(frame, detections)
        norfair.draw_tracked_objects(frame, tracked_objects)
    elif track_points == "bbox":
        norfair.draw_boxes(frame, detections)
        norfair.draw_tracked_boxes(frame, tracked_objects)

    # The None check covers both modes: fix_paths=True without a drawer must
    # skip path drawing instead of dereferencing None.
    if paths_drawer is not None:
        if fix_paths:
            frame = paths_drawer.draw(frame, tracked_objects, coord_transformations)
        else:
            frame = paths_drawer.draw(frame, tracked_objects)

    return frame


def center(points):
    """Return the mean of ``points`` along axis 0, wrapped in a one-element list."""
    stacked = np.array(points)
    centroid = np.mean(stacked, axis=0)
    return [centroid]

In [3]:
import os
from typing import List, Optional, Union

import numpy as np
import torch

from norfair import Detection


class YOLO:
    """Thin wrapper around a YOLOv7 model loaded through ``torch.hub``."""

    def __init__(self, model_path: str, device: Optional[str] = None):
        """Load the model, downloading the weights first if they are missing.

        Args:
            model_path: Path of the weights file. When it does not exist, a file
                with the same basename is fetched from the YOLOv7 v0.1 release.
            device: Torch device string (e.g. ``"cpu"``, ``"cuda:0"``). ``None``
                selects ``"cuda:0"`` when CUDA is available, otherwise ``"cpu"``.

        Raises:
            Exception: If a CUDA device is requested but CUDA is unavailable, or
                if the model cannot be loaded (the original error is chained).
        """
        if device is not None and "cuda" in device and not torch.cuda.is_available():
            raise Exception(
                "Selected device='cuda', but cuda is not available to Pytorch."
            )
        # automatically set device if it is None
        elif device is None:
            device = "cuda:0" if torch.cuda.is_available() else "cpu"

        if not os.path.exists(model_path):
            # Local import keeps this block self-contained.
            import subprocess

            weights_url = (
                "https://github.com/WongKinYiu/yolov7/releases/download/v0.1/"
                + os.path.basename(model_path)
            )
            # Argument list instead of a shell string: paths containing spaces
            # or shell metacharacters are passed to wget verbatim.
            subprocess.run(["wget", weights_url, "-O", model_path], check=False)

        # load model
        try:
            self.model = torch.hub.load("WongKinYiu/yolov7", "custom", model_path)
        except Exception as err:
            # Chain the cause so the real failure (network, corrupt file, ...)
            # stays visible in the traceback.
            raise Exception("Failed to load model from {}".format(model_path)) from err

        # Honour the validated/resolved device; without this the argument has
        # no effect on where the model runs.
        self.device = device
        self.model.to(device)

    def __call__(
        self,
        img: Union[str, np.ndarray],
        conf_threshold: float = 0.25,
        iou_threshold: float = 0.45,
        image_size: int = 720,
        classes: Optional[List[int]] = None,
    ) -> torch.tensor:
        """Run the model on ``img`` and return its raw results object.

        Args:
            img: Image (array) or image path handed to the hub model.
            conf_threshold: Value assigned to the model's ``conf`` attribute.
            iou_threshold: Value assigned to the model's ``iou`` attribute.
            image_size: Forwarded to the model as ``size``.
            classes: Class ids to keep, or ``None`` for no class filtering.

        Returns:
            Whatever the hub model returns for ``img``.
        """
        self.model.conf = conf_threshold
        self.model.iou = iou_threshold
        # Assign unconditionally so that a filter from a previous call does not
        # silently persist when this call passes classes=None.
        self.model.classes = classes
        return self.model(img, size=image_size)


def yolo_detections_to_norfair_detections(
    yolo_detections: torch.tensor, track_points: str = "centroid"  # bbox or centroid
) -> List[Detection]:
    """Build norfair ``Detection`` objects from the first image's YOLO results.

    With ``track_points == "centroid"`` the rows of ``yolo_detections.xywh[0]``
    are used and each detection holds the (x, y) value twice. With
    ``track_points == "bbox"`` the rows of ``yolo_detections.xyxy[0]`` are used
    and each detection holds the two (x, y) pairs of the row. Element 4 of a row
    is used as the score of both points. Any other ``track_points`` value yields
    an empty list.
    """

    def _to_detection(first_point, second_point, confidence):
        # Every Detection carries two points that share the same score.
        return Detection(
            points=np.array([first_point, second_point]),
            scores=np.array([confidence, confidence]),
        )

    if track_points == "centroid":
        return [
            _to_detection(
                [row[0].item(), row[1].item()],
                [row[0].item(), row[1].item()],
                row[4].item(),
            )
            for row in yolo_detections.xywh[0]
        ]

    if track_points == "bbox":
        return [
            _to_detection(
                [row[0].item(), row[1].item()],
                [row[2].item(), row[3].item()],
                row[4].item(),
            )
            for row in yolo_detections.xyxy[0]
        ]

    return []

In [None]:
import numpy as np
from typing import List, Tuple, Dict, Set
from norfair.tracker import Detection, TrackedObject
from norfair import AbsolutePaths, Paths, Tracker, Video
from norfair.camera_motion import HomographyTransformationGetter, MotionEstimator
from norfair.distances import create_normalized_mean_euclidean_distance

DISTANCE_THRESHOLD_CENTROID: float = 0.08

# Alerts with timestamps

In [21]:
def is_in_checkout_zone(points: np.ndarray, zone: Tuple[int, int, int, int]) -> bool:
    """Tell whether the box spanned by ``points[0]`` and ``points[1]`` overlaps ``zone``.

    ``zone`` is ``(x_min, y_min, x_max, y_max)``. The two points may be given in
    any corner order. Comparisons are strict, so a box that only touches the
    zone's edge does not count as overlapping.
    """
    zone_left, zone_top, zone_right, zone_bottom = zone
    first, second = points[0], points[1]

    # Normalise the two corners into an axis-aligned extent.
    left = min(first[0], second[0])
    top = min(first[1], second[1])
    right = max(first[0], second[0])
    bottom = max(first[1], second[1])

    overlaps_horizontally = left < zone_right and right > zone_left
    overlaps_vertically = top < zone_bottom and bottom > zone_top
    return overlaps_horizontally and overlaps_vertically

def check_line_crossing(current_pos: np.ndarray,
                       previous_pos: np.ndarray,
                       line_start: Tuple[int, int],
                       line_end: Tuple[int, int]) -> Tuple[bool, str]:
    """
    Decide whether the move from ``previous_pos`` to ``current_pos`` crosses the line.

    Both positions are reduced to their mean point (axis 0) and the segment
    between those two means is tested against the segment
    ``line_start``-``line_end``.

    Returns: (has_crossed, direction). ``direction`` is "entrance" when the
    current mean x is greater than the previous one, "exit" otherwise, and ""
    when there is no crossing or ``previous_pos`` is None.
    """
    not_crossed = (False, "")
    if previous_pos is None:
        return not_crossed

    now = np.mean(current_pos, axis=0)
    before = np.mean(previous_pos, axis=0)

    def _counter_clockwise(p, q, r):
        # Orientation predicate for the ordered triple (p, q, r).
        return (r[1] - p[1]) * (q[0] - p[0]) > (q[1] - p[1]) * (r[0] - p[0])

    start = np.array(line_start)
    end = np.array(line_end)

    # Two segments intersect when each one separates the endpoints of the other.
    movement_endpoints_split = (
        _counter_clockwise(now, start, end) != _counter_clockwise(before, start, end)
    )
    line_endpoints_split = (
        _counter_clockwise(now, before, start) != _counter_clockwise(now, before, end)
    )
    if not (movement_endpoints_split and line_endpoints_split):
        return not_crossed

    # Direction comes from the change in x only: rightwards is an entrance.
    if now[0] > before[0]:
        return True, "entrance"
    return True, "exit"

class ZoneTracker:
    """Keeps per-object zone state across frames and turns changes into alert strings."""

    def __init__(self):
        self.currently_in_checkout: Set[int] = set()   # IDs currently in checkout
        self.has_visited_checkout: Set[int] = set()    # IDs that have visited checkout
        self.alerted_entrance: Set[int] = set()        # IDs that have triggered entrance alert
        self.alerted_exit: Set[int] = set()           # IDs that have triggered exit alert
        self.frame_count = 0                          # Counter for frames processed

    def format_timestamp(self, frame_count: int, fps: float) -> str:
        """Convert frame count to MM:SS.mmm format.

        The elapsed time is rounded to the nearest millisecond and then split
        with integer arithmetic. Splitting the float directly truncates values
        such as 7.76 s to ``07.759`` because of binary floating-point error.
        """
        total_milliseconds = int(round(frame_count * 1000 / fps))
        total_seconds, milliseconds = divmod(total_milliseconds, 1000)
        minutes, seconds = divmod(total_seconds, 60)
        return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}"

    def process_tracked_objects(self,
                              tracked_objects: List["TrackedObject"],
                              checkout_zone: Tuple[int, int, int, int],
                              line_points: Tuple[Tuple[int, int], Tuple[int, int]],
                              track_points: str,
                              fps: float = 30.0) -> List[str]:
        """Process tracked objects and return list of alerts.

        Args:
            tracked_objects: Objects for the current frame; each must expose
                ``id``, ``estimate`` and ``past_detections``.
            checkout_zone: ``(x_min, y_min, x_max, y_max)`` of the checkout area.
            line_points: ``(start, end)`` points of the entrance/exit line.
            track_points: Accepted for interface compatibility; not used here.
            fps: Frame rate used to turn the internal frame counter into a timestamp.

        Returns:
            Alert messages generated for this frame, in detection order.
        """
        alerts = []
        currently_in_checkout = set()

        # Increment frame counter (so the first processed frame is stamped 1 / fps)
        self.frame_count += 1

        # Get current timestamp
        timestamp = self.format_timestamp(self.frame_count, fps)

        for obj in tracked_objects:
            current_points = obj.estimate
            # Without any stored detection the object is compared with itself,
            # which can never register as a line crossing.
            past_points = obj.past_detections[-1].points if obj.past_detections else current_points

            # Check if object is in checkout zone
            in_checkout = is_in_checkout_zone(current_points, checkout_zone)
            if in_checkout:
                currently_in_checkout.add(obj.id)

                # Check for new entries to checkout
                if obj.id not in self.currently_in_checkout:
                    alerts.append(f"[{timestamp}] ALERT: Person {obj.id} entered checkout zone")
                    self.has_visited_checkout.add(obj.id)

            # Check for exits from checkout
            if obj.id in self.currently_in_checkout and not in_checkout:
                alerts.append(f"[{timestamp}] ALERT: Person {obj.id} left checkout zone")

            # Check for line crossing
            crossed, direction = check_line_crossing(current_points, past_points,
                                                  line_points[0], line_points[1])

            if crossed:
                # Each object triggers at most one entrance and one exit alert.
                if direction == "entrance" and obj.id not in self.alerted_entrance:
                    alerts.append(f"[{timestamp}] ALERT: Person {obj.id} entered through entrance line")
                    self.alerted_entrance.add(obj.id)
                elif direction == "exit" and obj.id not in self.alerted_exit:
                    if obj.id in self.has_visited_checkout:
                        alerts.append(f"[{timestamp}] ALERT: Person {obj.id} exited after visiting checkout")
                    else:
                        alerts.append(f"[{timestamp}] ALERT: Person {obj.id} exited WITHOUT visiting checkout")
                    self.alerted_exit.add(obj.id)

        # Update currently in checkout set
        self.currently_in_checkout = currently_in_checkout

        return alerts

def inference(
    input_video: str, model: str, track_points: str, model_threshold: float, classes: List
):
    """Run detection, tracking and zone alerting over a video, printing every alert.

    Args:
        input_video: Path of the video to process.
        model: Path of the YOLO weights file handed to ``YOLO``.
        track_points: ``"centroid"`` or ``"bbox"``; selects how detections are
            converted for the tracker.
        model_threshold: Detection confidence threshold passed to the model.
        classes: Class ids the detector should keep.
    """
    detector = YOLO(model)
    video = Video(input_path=input_video)

    transformations_getter = HomographyTransformationGetter()
    motion_estimator = MotionEstimator(
        max_points=500, min_distance=7, transformations_getter=transformations_getter
    )

    distance_function = create_normalized_mean_euclidean_distance(
        video.input_height, video.input_width
    )
    tracker = Tracker(
        distance_function=distance_function,
        distance_threshold=DISTANCE_THRESHOLD_CENTROID,
    )

    # Define zones
    checkout_zone = (874, 300, 1120, 1080)  # x_min, y_min, x_max, y_max
    entrance_exit_line = ((850, 80), (700, 204))  # (start_x, start_y), (end_x, end_y)

    # Initialize zone tracker
    zone_tracker = ZoneTracker()

    # Get video FPS. NOTE(review): Norfair's Video is expected to expose the
    # frame rate as `output_fps`; `fps` is still probed as a fallback, and 25.0
    # is used when neither yields a usable (non-zero) value.
    fps = getattr(video, "output_fps", None) or getattr(video, "fps", None) or 25.0

    for frame in video:
        yolo_detections = detector(
            frame,
            conf_threshold=model_threshold,
            iou_threshold=0.45,
            image_size=720,
            classes=classes,
        )

        # All-ones mask: every pixel is passed to the motion estimator.
        mask = np.ones(frame.shape[:2], frame.dtype)
        coord_transformations = motion_estimator.update(frame, mask)
        detections = yolo_detections_to_norfair_detections(
            yolo_detections, track_points=track_points
        )
        tracked_objects = tracker.update(
            detections=detections, coord_transformations=coord_transformations
        )

        # Process zones and generate alerts with FPS parameter
        alerts = zone_tracker.process_tracked_objects(
            tracked_objects, checkout_zone, entrance_exit_line, track_points, fps=fps
        )

        # Print all alerts
        for alert in alerts:
            print(alert)

In [22]:
# Run the alert-only pipeline on the client video: yolov7 weights, bounding-box
# tracking, 0.25 confidence threshold, detector restricted to class id 0
# (presumably the "person" class — confirm against the model's class list).
inference(
    "client-vid.mp4",
    "yolov7.pt",
    "bbox",
    0.25,
    [0],
)

Using cache found in /root/.cache/torch/hub/WongKinYiu_yolov7_main


[31m[1mrequirements:[0m numpy<1.24.0,>=1.18.5 not found and is required by YOLOR, attempting auto-update...

[31m[1mrequirements:[0m protobuf<4.21.3 not found and is required by YOLOR, attempting auto-update...

[31m[1mrequirements:[0m 2 packages updated per /root/.cache/torch/hub/WongKinYiu_yolov7_main/requirements.txt
[31m[1mrequirements:[0m ⚠️ [1mRestart runtime or rerun command for updates to take effect[0m





Adding autoShape... 


Output()

[00:00.320] ALERT: Person 1 entered checkout zone
[00:00.480] ALERT: Person 2 entered checkout zone
[00:00.760] ALERT: Person 3 entered checkout zone
[00:03.720] ALERT: Person 4 entered through entrance line
[00:07.160] ALERT: Person 5 entered checkout zone
[00:07.759] ALERT: Person 5 left checkout zone
[00:09.320] ALERT: Person 5 entered checkout zone
[00:09.880] ALERT: Person 5 left checkout zone
[00:10.039] ALERT: Person 5 entered checkout zone
[00:11.679] ALERT: Person 6 entered checkout zone
[00:12.039] ALERT: Person 5 left checkout zone
[00:13.160] ALERT: Person 5 entered checkout zone
[00:13.640] ALERT: Person 5 left checkout zone
[00:13.640] ALERT: Person 7 entered checkout zone
[00:19.440] ALERT: Person 8 entered checkout zone
[00:28.079] ALERT: Person 11 entered checkout zone
[00:30.839] ALERT: Person 9 entered checkout zone
[00:32.159] ALERT: Person 11 left checkout zone
[00:34.880] ALERT: Person 11 entered checkout zone
[00:35.159] ALERT: Person 11 left checkout zone
[00:37

# Visualization

In [None]:
import cv2
import numpy as np
from typing import List, Dict
from norfair.tracker import TrackedObject

def create_visualization(
    tracked_objects: List[TrackedObject],
    width: int = 1920,
    height: int = 1080,
    trajectories: Dict[int, List[tuple]] = None,
    checkout_zone: tuple = (874, 300, 1120, 1080),
    entry_exit_line: tuple = ((850, 80), (700, 204))
):
    """Render one schematic frame: zones, legend, object positions and their trails.

    Args:
        tracked_objects: Objects to plot; each must expose ``id`` and ``estimate``.
        width: Canvas width in pixels.
        height: Canvas height in pixels.
        trajectories: Mapping from object id to the list of centroids seen so
            far. It is extended in place; ``None`` starts a fresh mapping.
        checkout_zone: ``(x_min, y_min, x_max, y_max)`` rectangle to draw.
        entry_exit_line: ``(start_point, end_point)`` of the line to draw.

    Returns:
        ``(frame, trajectories)`` — the drawn image and the updated mapping.
    """
    # Colour tuples in the channel order the original drawing calls used.
    blue = (255, 0, 0)
    yellow = (0, 255, 255)
    green = (0, 255, 0)
    white = (255, 255, 255)
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Black canvas
    canvas = np.zeros((height, width, 3), dtype=np.uint8)

    # Checkout zone: outline first, then blend a filled copy at 20% opacity
    left, top, right, bottom = checkout_zone
    zone_top_left, zone_bottom_right = (left, top), (right, bottom)
    cv2.rectangle(canvas, zone_top_left, zone_bottom_right, blue, 2)
    filled = canvas.copy()
    cv2.rectangle(filled, zone_top_left, zone_bottom_right, blue, -1)
    cv2.addWeighted(filled, 0.2, canvas, 0.8, 0, canvas)

    # Entry/exit line
    line_start, line_end = entry_exit_line
    cv2.line(canvas, line_start, line_end, yellow, 2)

    # Legend, one row every 50 pixels
    legend_rows = (
        ("Blue Box: Checkout Zone", 50, blue),
        ("Yellow Line: Entry/Exit Line", 100, yellow),
        ("Green Dots: Current Positions", 150, green),
        ("White Lines: Trajectories", 200, white),
    )
    for caption, baseline_y, colour in legend_rows:
        cv2.putText(canvas, caption, (50, baseline_y), font, 1, colour, 2)

    if trajectories is None:
        trajectories = {}

    for tracked in tracked_objects:
        object_id = tracked.id
        position = tuple(map(int, np.mean(tracked.estimate, axis=0)))

        # Extend this object's trail with the current centroid
        trail = trajectories.setdefault(object_id, [])
        trail.append(position)

        # Trail: one white segment per consecutive pair of centroids
        for segment_start, segment_end in zip(trail, trail[1:]):
            cv2.line(canvas, segment_start, segment_end, white, 1)

        # Current position and label
        cv2.circle(canvas, position, 5, green, -1)
        label_origin = (position[0] + 10, position[1] - 10)
        cv2.putText(canvas, f"ID: {object_id}", label_origin, font, 0.7, green, 2)

    return canvas, trajectories

def inference_with_visualization(
    input_video: str,
    output_video: str,
    model: str,
    track_points: str,
    model_threshold: float,
    classes: List
):
    """Run the alerting pipeline and write a schematic visualization video.

    Args:
        input_video: Path of the video to process.
        output_video: Path of the mp4 file to write.
        model: Path of the YOLO weights file handed to ``YOLO``.
        track_points: ``"centroid"`` or ``"bbox"``; selects how detections are
            converted for the tracker.
        model_threshold: Detection confidence threshold passed to the model.
        classes: Class ids the detector should keep.

    Returns:
        A human-readable completion message containing ``output_video``.
    """
    detector = YOLO(model)
    video = Video(input_path=input_video)

    # Get video properties
    width = int(video.input_width)
    height = int(video.input_height)

    # Use the source frame rate for both the written file and the alert
    # timestamps so playback speed and timestamps match the input.
    # NOTE(review): Norfair's Video is expected to expose it as `output_fps`;
    # 30.0 (the previous hard-coded value) is kept as the fallback.
    fps = float(
        getattr(video, "output_fps", None) or getattr(video, "fps", None) or 30.0
    )

    # Create video writer for visualization
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

    # try/finally guarantees the writer is released (and the file finalized)
    # even when setup or a frame raises part-way through.
    try:
        # Initialize other components
        transformations_getter = HomographyTransformationGetter()
        motion_estimator = MotionEstimator(
            max_points=500, min_distance=7, transformations_getter=transformations_getter
        )

        distance_function = create_normalized_mean_euclidean_distance(height, width)
        tracker = Tracker(
            distance_function=distance_function,
            distance_threshold=DISTANCE_THRESHOLD_CENTROID,
        )

        # Define zones
        checkout_zone = (874, 300, 1120, 1080)  # x_min, y_min, x_max, y_max
        entrance_exit_line = ((850, 80), (700, 204))  # (start_x, start_y), (end_x, end_y)

        # Initialize zone tracker and trajectories
        zone_tracker = ZoneTracker()
        trajectories = {}

        for frame in video:
            yolo_detections = detector(
                frame,
                conf_threshold=model_threshold,
                iou_threshold=0.45,
                image_size=720,
                classes=classes,
            )

            # All-ones mask: every pixel is passed to the motion estimator.
            mask = np.ones(frame.shape[:2], frame.dtype)
            coord_transformations = motion_estimator.update(frame, mask)
            detections = yolo_detections_to_norfair_detections(
                yolo_detections, track_points=track_points
            )
            tracked_objects = tracker.update(
                detections=detections, coord_transformations=coord_transformations
            )

            # Process zones and generate alerts
            alerts = zone_tracker.process_tracked_objects(
                tracked_objects, checkout_zone, entrance_exit_line, track_points, fps=fps
            )

            # Print alerts
            for alert in alerts:
                print(alert)

            # Create visualization frame
            viz_frame, trajectories = create_visualization(
                tracked_objects,
                width,
                height,
                trajectories,
                checkout_zone,
                entrance_exit_line
            )

            # Write visualization frame
            out.write(viz_frame)
    finally:
        # Release video writer
        out.release()

    return "Visualization completed and saved to " + output_video

In [None]:
# Same pipeline as the alert-only run, additionally writing the schematic
# visualization to visualization.mp4. classes=[0] restricts the detector to
# class id 0 (presumably the "person" class — confirm against the model).
inference_with_visualization(
    input_video="client-vid.mp4",
    output_video="visualization.mp4",
    model="yolov7.pt",
    track_points="bbox",
    model_threshold=0.25,
    classes=[0]
)

Using cache found in /root/.cache/torch/hub/WongKinYiu_yolov7_main


[31m[1mrequirements:[0m numpy<1.24.0,>=1.18.5 not found and is required by YOLOR, attempting auto-update...

[31m[1mrequirements:[0m protobuf<4.21.3 not found and is required by YOLOR, attempting auto-update...

[31m[1mrequirements:[0m 2 packages updated per /root/.cache/torch/hub/WongKinYiu_yolov7_main/requirements.txt
[31m[1mrequirements:[0m ⚠️ [1mRestart runtime or rerun command for updates to take effect[0m





Adding autoShape... 


Output()

ALERT: Person 1 entered checkout zone
ALERT: Person 2 entered checkout zone
ALERT: Person 3 entered checkout zone
ALERT: Person 4 entered through entrance line
ALERT: Person 5 entered checkout zone
ALERT: Person 5 left checkout zone
ALERT: Person 5 entered checkout zone
ALERT: Person 5 left checkout zone
ALERT: Person 5 entered checkout zone
ALERT: Person 6 entered checkout zone
ALERT: Person 5 left checkout zone
ALERT: Person 5 entered checkout zone
ALERT: Person 5 left checkout zone
ALERT: Person 7 entered checkout zone
ALERT: Person 8 entered checkout zone
ALERT: Person 11 entered checkout zone
ALERT: Person 9 entered checkout zone
ALERT: Person 11 left checkout zone
ALERT: Person 11 entered checkout zone
ALERT: Person 11 left checkout zone
ALERT: Person 11 exited after visiting checkout


'Visualization completed and saved to visualization.mp4'