## Retail Analytics - Zone Density Transition

This guide walks through building custom analyzers for retail analytics by extending the `ResultAnalyzerBase` class. Specifically, the analyzers track zone density (the number of people in each zone) and zone transitions, i.e. people moving from one zone to another.

In [None]:
import time
import cv2
import numpy as np
from collections import deque
from degirum_tools.result_analyzer_base import ResultAnalyzerBase
import degirum as dg
import degirum_tools
from collections import defaultdict

## Zone Density Analyzer
We extend the `ResultAnalyzerBase` class to implement custom analyzer logic for tracking zone density.

In [None]:
class ZoneDensityAnalyzer(ResultAnalyzerBase):
    """
    Analyzer for tracking zone density - number of people in each zone.

    Input: Video stream + zone polygon(s)
    Output (per frame), appended to the inference results as a dict with
    label "zone_density_stats" whose "stats" entry holds, for every zone:
    - zone_id
    - person_count
    - avg_person_count
    - timestamp
    """

    # Frame width assumed when the inference result carries no usable image.
    _DEFAULT_IMAGE_WIDTH = 640

    def __init__(self, zones, config=None):
        """
        Args:
            zones: dict of {zone_id: np.array([[x1,y1], [x2,y2], ...])}
                   or a sequence of polygons (auto-numbered as zone_1, zone_2, etc.)
            config: Optional configuration dict. Recognized keys:
                person_confidence: minimum detection score to track a person
                max_disappeared: frames a track may stay unmatched before removal
                tracking_threshold_percent: max matching distance, in percent
                    of the image width
                density_interval: seconds between density samples used for
                    avg_person_count
                fps: frame rate used to convert frame numbers to seconds

        Raises:
            ValueError: if the configured fps is not positive.
        """
        super().__init__()
        if hasattr(zones, "items"):
            self.zones = {
                k: np.array(v, np.int32) for k, v in zones.items()
            }
        else:
            self.zones = {
                f"zone_{i+1}": np.array(zone, np.int32)
                for i, zone in enumerate(zones)
            }
        default_config = {
            "person_confidence": 0.35,
            "max_disappeared": 30,
            "tracking_threshold_percent": 10.0,
            "density_interval": 1.0,
            "fps": 30.0,
        }
        self.config = {**default_config, **(config or {})}
        self.fps = float(self.config["fps"])
        if self.fps <= 0:
            raise ValueError("fps must be a positive number")
        # A single place initializes the mutable tracking state so that
        # construction and reset() can never drift apart.
        self.reset()

    def reset(self):
        """Reset all tracking state"""
        self.tracked_people = {}
        self.next_person_id = 1
        self.zone_counts = {zone_id: [] for zone_id in self.zones}
        self.frame_count = 0
        self.last_density_measurement = 0.0

    def analyze(self, inference_result):
        """
        Main processing function called by DeGirum for each frame.

        Updates the tracks from the person detections, samples density at
        the configured interval, removes person detections lying outside
        every zone from ``inference_result.results`` (in place) and appends
        a "zone_density_stats" entry.

        Args:
            inference_result: inference result whose ``results`` is a list
                of detection dicts.

        Returns:
            The same ``inference_result`` object, modified in place.
        """
        timestamp = self.frame_count / self.fps
        self.frame_count += 1

        results = inference_result.results
        min_score = self.config["person_confidence"]
        person_detections = [
            det for det in results
            if self._is_person(det) and det.get("score", 0.0) >= min_score
        ]

        image_width = self._get_image_width(inference_result)
        self._update_tracking(person_detections, timestamp, image_width)

        if timestamp - self.last_density_measurement >= self.config["density_interval"]:
            self._measure_density(timestamp)
            self.last_density_measurement = timestamp

        stats = self.get_stats()
        stats["timestamp"] = timestamp
        stats["frame_count"] = self.frame_count

        # Remove person detections that are outside zones (modify in-place)
        results[:] = [
            det for det in results
            if not (
                self._is_person(det)
                and self._get_person_zone(self._get_center(det["bbox"])) is None
            )
        ]
        results.append({
            "label": "zone_density_stats",
            "score": 1.0,
            "stats": stats
        })

        # Overlay drawing is left to annotate(): per the PySDK documentation
        # image_overlay is rebuilt on every access, so drawing onto it here
        # would be discarded after paying for a full overlay render.
        return inference_result

    def annotate(self, inference_result, image):
        """Draw overlays on video frame and return the frame"""
        timestamp = self.frame_count / self.fps
        self._draw_annotations(image, timestamp)
        return image

    def _is_person(self, det):
        """Return True for a result dict labeled "person" that has a bbox"""
        return "bbox" in det and str(det.get("label", "")).lower() == "person"

    def _get_image_width(self, inference_result):
        """Return the frame width in pixels, or the default when unavailable"""
        image = getattr(inference_result, "image", None)
        shape = getattr(image, "shape", None)
        if shape is not None and len(shape) >= 2:
            return shape[1]
        # Images without a shape (presumably PIL-style) may expose a width.
        width = getattr(image, "width", None)
        if isinstance(width, int) and width > 0:
            return width
        return self._DEFAULT_IMAGE_WIDTH

    def _match_to_existing_track(self, center, image_width):
        """
        Match detection to existing tracked person.

        Returns the id of the nearest track (Manhattan distance) closer than
        the configured percentage of the image width, or None.
        """
        if not self.tracked_people:
            return None

        threshold_distance = (self.config["tracking_threshold_percent"] / 100.0) * image_width
        best_match = None
        best_distance = float('inf')

        for person_id, track in self.tracked_people.items():
            if track["frames_disappeared"] > self.config["max_disappeared"]:
                continue
            # _update_tracking ages every track before matching, so a value
            # of 0 means the track was already claimed (or created) in this
            # frame; two detections must never share one track.
            if track["frames_disappeared"] == 0:
                continue

            track_center = track["center"]
            distance = abs(center[0] - track_center[0]) + abs(center[1] - track_center[1])

            if distance < threshold_distance and distance < best_distance:
                best_distance = distance
                best_match = person_id

        return best_match

    def _get_person_zone(self, center):
        """Determine which zone (if any) the person is in; first match wins"""
        for zone_id, zone in self.zones.items():
            if self._point_in_polygon(center, zone):
                return zone_id
        return None

    def _update_tracking(self, detections, timestamp, image_width):
        """Update tracking state for all detections"""
        # Age every track first; matched tracks are reset to 0 below.
        for track in self.tracked_people.values():
            track["frames_disappeared"] += 1

        for det in detections:
            bbox = det["bbox"]
            center = self._get_center(bbox)
            zone_id = self._get_person_zone(center)

            # Only track persons inside zones
            if zone_id is None:
                continue

            person_id = self._match_to_existing_track(center, image_width)
            if person_id is None:
                person_id = self.next_person_id
                self.next_person_id += 1

            self.tracked_people[person_id] = {
                "center": center,
                "bbox": bbox,
                "frames_disappeared": 0,
                "current_zone": zone_id,
            }

        max_disappeared = self.config["max_disappeared"]
        stale_ids = [
            pid for pid, track in self.tracked_people.items()
            if track["frames_disappeared"] > max_disappeared
        ]
        for person_id in stale_ids:
            del self.tracked_people[person_id]

    def _count_in_zone(self, zone_id):
        """Count tracks matched in the latest frame that are inside zone_id"""
        return sum(
            1 for track in self.tracked_people.values()
            if track["current_zone"] == zone_id and track["frames_disappeared"] == 0
        )

    def _measure_density(self, timestamp):
        """Record a density sample (current count) for every zone"""
        for zone_id in self.zones:
            self.zone_counts[zone_id].append(self._count_in_zone(zone_id))

    def _get_center(self, bbox):
        """Get center point of bounding box (x1, y1, x2, y2) as ints"""
        x1, y1, x2, y2 = bbox
        return (int((x1 + x2) / 2), int((y1 + y2) / 2))

    def _point_in_polygon(self, point, polygon):
        """Check if point is inside polygon (points on the edge count as inside)"""
        return cv2.pointPolygonTest(polygon, point, False) >= 0

    def _draw_annotations(self, frame, timestamp):
        """Draw zones and density info on frame - only show persons in zones"""
        colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255)]

        for idx, (zone_id, zone) in enumerate(self.zones.items()):
            color = colors[idx % len(colors)]
            cv2.polylines(frame, [zone], True, color, 2)

            # Plain Python ints: some OpenCV builds reject numpy integer
            # scalars as text origin coordinates.
            cx, cy = (int(v) for v in np.mean(zone, axis=0))
            cv2.putText(frame, str(zone_id).upper(),
                        (cx - 40, cy - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            cv2.putText(frame, f"Count: {self._count_in_zone(zone_id)}",
                        (cx - 30, cy + 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Only draw bounding boxes for persons seen in the latest frame
        # that are currently in a zone
        for track in self.tracked_people.values():
            if track["frames_disappeared"] > 0 or track["current_zone"] is None:
                continue

            x1, y1, x2, y2 = map(int, track["bbox"])
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)

    def get_stats(self):
        """
        Get current statistics.

        Returns:
            dict with key "zone_density_data": a list holding, per zone, the
            current person count, the average of all density samples taken
            so far (0.0 before the first sample) and the current time.
        """
        current_time = round(self.frame_count / self.fps, 2)
        zone_data = []

        for zone_id in self.zones:
            counts = self.zone_counts[zone_id]
            avg_person_count = round(sum(counts) / len(counts), 2) if counts else 0.0

            zone_data.append({
                "zone_id": zone_id,
                "person_count": self._count_in_zone(zone_id),
                "avg_person_count": avg_person_count,
                "timestamp": current_time
            })

        return {
            "zone_density_data": zone_data
        }

## Zone Transition Analyzer
We extend the `ResultAnalyzerBase` class to implement custom analyzer logic for tracking zone transitions.

In [None]:
class ZoneTransitionAnalyzer(ResultAnalyzerBase):
    """
    Analyzer for tracking zone transitions - people moving from one zone to another.

    Input: Video stream + multiple zone polygons
    Output (per transition):
    - person_id
    - from_zone
    - to_zone
    - transition_time
    - path_sequence

    A "zone_transition_stats" entry summarizing all transitions is appended
    to the inference results on every frame.
    """

    # Frame width assumed when the inference result carries no usable image.
    _DEFAULT_IMAGE_WIDTH = 640
    # Prefix of the person ids reported in transitions and stats.
    _PERSON_PREFIX = "person-"

    def __init__(self, zones, config=None):
        """
        Args:
            zones: dict of {zone_id: np.array([[x1,y1], [x2,y2], ...])}
                   or a sequence of polygons (auto-numbered as zone_1, zone_2, etc.)
            config: Optional configuration dict. Recognized keys:
                person_confidence: minimum detection score to track a person
                max_disappeared: frames a track may stay unmatched before removal
                tracking_threshold_percent: max matching distance, in percent
                    of the image width
                min_zone_frames: consecutive matched frames inside a zone
                    needed before the zone is confirmed
                fps: frame rate used to convert frame numbers to seconds

        Raises:
            ValueError: if the configured fps is not positive.
        """
        super().__init__()
        if hasattr(zones, "items"):
            self.zones = {
                k: np.array(v, np.int32) for k, v in zones.items()
            }
        else:
            self.zones = {
                f"zone_{i+1}": np.array(zone, np.int32)
                for i, zone in enumerate(zones)
            }

        default_config = {
            "person_confidence": 0.35,
            "max_disappeared": 30,
            "tracking_threshold_percent": 10.0,
            "min_zone_frames": 5,
            "fps": 30.0,
        }
        self.config = {**default_config, **(config or {})}
        self.fps = float(self.config["fps"])
        if self.fps <= 0:
            raise ValueError("fps must be a positive number")
        # A single place initializes the mutable tracking state so that
        # construction and reset() can never drift apart.
        self.reset()

    def reset(self):
        """Reset all tracking state"""
        self.tracked_people = {}
        self.next_person_id = 1
        self.transitions = []
        self.active_transitions = {}
        self.frame_count = 0

    def analyze(self, inference_result):
        """
        Main processing function called by DeGirum for each frame.

        Updates the tracks from the person detections (recording confirmed
        zone transitions), removes person detections lying outside every
        zone from ``inference_result.results`` (in place) and appends a
        "zone_transition_stats" entry.

        Args:
            inference_result: inference result whose ``results`` is a list
                of detection dicts.

        Returns:
            The same ``inference_result`` object, modified in place.
        """
        timestamp = self.frame_count / self.fps
        self.frame_count += 1

        results = inference_result.results
        min_score = self.config["person_confidence"]
        person_detections = [
            det for det in results
            if self._is_person(det) and det.get("score", 0.0) >= min_score
        ]

        image_width = self._get_image_width(inference_result)
        self._update_tracking(person_detections, timestamp, image_width)

        stats = self.get_stats()
        stats["timestamp"] = timestamp
        stats["frame_count"] = self.frame_count

        # Remove person detections that are outside zones (modify in-place)
        results[:] = [
            det for det in results
            if not (
                self._is_person(det)
                and self._get_person_zone(self._get_center(det["bbox"])) is None
            )
        ]
        results.append({
            "label": "zone_transition_stats",
            "score": 1.0,
            "stats": stats
        })

        # Overlay drawing is left to annotate(): per the PySDK documentation
        # image_overlay is rebuilt on every access, so drawing onto it here
        # would be discarded after paying for a full overlay render.
        return inference_result

    def annotate(self, inference_result, image):
        """Draw overlays on video frame and return the frame"""
        timestamp = self.frame_count / self.fps
        self._draw_annotations(image, timestamp)
        return image

    def _is_person(self, det):
        """Return True for a result dict labeled "person" that has a bbox"""
        return "bbox" in det and str(det.get("label", "")).lower() == "person"

    def _get_image_width(self, inference_result):
        """Return the frame width in pixels, or the default when unavailable"""
        image = getattr(inference_result, "image", None)
        shape = getattr(image, "shape", None)
        if shape is not None and len(shape) >= 2:
            return shape[1]
        # Images without a shape (presumably PIL-style) may expose a width.
        width = getattr(image, "width", None)
        if isinstance(width, int) and width > 0:
            return width
        return self._DEFAULT_IMAGE_WIDTH

    def _match_to_existing_track(self, center, image_width):
        """
        Match detection to existing tracked person.

        Returns the id of the nearest track (Manhattan distance) closer than
        the configured percentage of the image width, or None.
        """
        if not self.tracked_people:
            return None

        threshold_distance = (self.config["tracking_threshold_percent"] / 100.0) * image_width
        best_match = None
        best_distance = float('inf')

        for person_id, track in self.tracked_people.items():
            if track["frames_disappeared"] > self.config["max_disappeared"]:
                continue
            # _update_tracking ages every track before matching, so a value
            # of 0 means the track was already claimed (or created) in this
            # frame; two detections must never share one track.
            if track["frames_disappeared"] == 0:
                continue

            track_center = track["center"]
            distance = abs(center[0] - track_center[0]) + abs(center[1] - track_center[1])

            if distance < threshold_distance and distance < best_distance:
                best_distance = distance
                best_match = person_id

        return best_match

    def _get_person_zone(self, center):
        """Determine which zone (if any) the person is in; first match wins"""
        for zone_id, zone in self.zones.items():
            if self._point_in_polygon(center, zone):
                return zone_id
        return None

    def _update_tracking(self, detections, timestamp, image_width):
        """Update tracking state for all detections and detect transitions"""
        # Age every track first; matched tracks are reset to 0 below.
        for track in self.tracked_people.values():
            track["frames_disappeared"] += 1

        min_zone_frames = self.config["min_zone_frames"]

        for det in detections:
            bbox = det["bbox"]
            center = self._get_center(bbox)
            current_zone = self._get_person_zone(center)

            # Only track persons inside zones
            if current_zone is None:
                continue

            person_id = self._match_to_existing_track(center, image_width)

            if person_id is None:
                # A new person: the zone of first sighting is confirmed
                # immediately and starts the path.
                person_id = self.next_person_id
                self.next_person_id += 1
                self.tracked_people[person_id] = {
                    "center": center,
                    "bbox": bbox,
                    "frames_disappeared": 0,
                    "current_zone": current_zone,
                    "confirmed_zone": current_zone,
                    "zone_entry_frame": self.frame_count,
                    "zone_frames": 1,
                    "path_sequence": [current_zone],
                }
                continue

            track = self.tracked_people[person_id]
            track["center"] = center
            track["bbox"] = bbox
            track["frames_disappeared"] = 0

            if current_zone != track["current_zone"]:
                # Entered a different zone: restart the dwell counter; the
                # transition is only recorded once the zone is confirmed.
                track["current_zone"] = current_zone
                track["zone_frames"] = 1
                track["zone_entry_frame"] = self.frame_count
                continue

            track["zone_frames"] += 1
            if (track["zone_frames"] >= min_zone_frames
                    and track["confirmed_zone"] != current_zone):
                if track["confirmed_zone"] is not None:
                    self._record_transition(
                        person_id,
                        track["confirmed_zone"],
                        current_zone,
                        timestamp,
                        track
                    )
                track["confirmed_zone"] = current_zone

        max_disappeared = self.config["max_disappeared"]
        stale_ids = [
            pid for pid, track in self.tracked_people.items()
            if track["frames_disappeared"] > max_disappeared
        ]
        for person_id in stale_ids:
            del self.tracked_people[person_id]
            self.active_transitions.pop(person_id, None)

    def _record_transition(self, person_id, from_zone, to_zone, timestamp, track):
        """Record a confirmed zone transition"""
        path = track["path_sequence"]
        # Append whenever the zone differs from the last path entry, so that
        # revisits (A -> B -> A) stay in the path and the path always ends
        # at the transition's destination.
        if not path or path[-1] != to_zone:
            path.append(to_zone)

        transition = {
            "person_id": f"person-{person_id}",
            "from_zone": from_zone,
            "to_zone": to_zone,
            "transition_time": round(timestamp, 2),
            "path_sequence": path.copy(),
        }
        self.transitions.append(transition)
        print(f"Transition recorded: {transition['person_id']} moved from {from_zone} to {to_zone} - {timestamp:.2f}s")

    def _get_center(self, bbox):
        """Get center point of bounding box (x1, y1, x2, y2) as ints"""
        x1, y1, x2, y2 = bbox
        return (int((x1 + x2) / 2), int((y1 + y2) / 2))

    def _point_in_polygon(self, point, polygon):
        """Check if point is inside polygon (points on the edge count as inside)"""
        return cv2.pointPolygonTest(polygon, point, False) >= 0

    def _draw_annotations(self, frame, timestamp):
        """Draw zones and transition info on frame - only show persons in zones"""
        colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255)]

        for idx, (zone_id, zone) in enumerate(self.zones.items()):
            color = colors[idx % len(colors)]
            cv2.polylines(frame, [zone], True, color, 2)

            # Plain Python ints: some OpenCV builds reject numpy integer
            # scalars as text origin coordinates.
            cx, cy = (int(v) for v in np.mean(zone, axis=0))
            cv2.putText(frame, str(zone_id).upper(),
                        (cx - 40, cy - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        # Only draw bounding boxes for persons seen in the latest frame
        # that are in a confirmed zone
        for person_id, track in self.tracked_people.items():
            if track["frames_disappeared"] > 0 or track["confirmed_zone"] is None:
                continue

            x1, y1, x2, y2 = map(int, track["bbox"])
            color = (0, 255, 255)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            label = f"person-{person_id}"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            if track["confirmed_zone"]:
                zone_text = f"{track['confirmed_zone']} ({track['zone_frames']}f)"
                cv2.putText(frame, zone_text, (x1, y1 - 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)

            if len(track["path_sequence"]) > 1:
                path_text = " -> ".join(track["path_sequence"])
                cv2.putText(frame, path_text, (x1, y2 + 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

        y_offset = 30
        cv2.putText(frame, f"Total Transitions: {len(self.transitions)}", (10, y_offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        y_offset += 25
        cv2.putText(frame, "Recent Transitions:", (10, y_offset),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        # List the five most recent transitions below the header
        for trans in self.transitions[-5:]:
            y_offset += 20
            text = f"{trans['person_id']}: {trans['from_zone']} -> {trans['to_zone']} - {trans['transition_time']}s"
            cv2.putText(frame, text, (10, y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.35, (0, 255, 0), 1)

    def get_stats(self):
        """
        Get current statistics.

        Returns:
            dict with the total number of transitions, a snapshot of all
            transitions, per "from -> to" counts, the paths of persons seen
            in the latest frame and the current time. Lists are copies, so
            stats attached to an earlier frame do not change afterwards.
        """
        current_time = self.frame_count / self.fps

        transition_counts = defaultdict(int)
        for trans in self.transitions:
            key = f"{trans['from_zone']} -> {trans['to_zone']}"
            transition_counts[key] += 1

        active_paths = [
            {
                "person_id": f"person-{person_id}",
                "current_zone": track["confirmed_zone"],
                "zone_frames": track["zone_frames"],
                # Copy: the track keeps extending its own list.
                "path_sequence": list(track["path_sequence"]),
            }
            for person_id, track in self.tracked_people.items()
            if track["frames_disappeared"] == 0 and track["path_sequence"]
        ]

        return {
            "total_transitions": len(self.transitions),
            # Copy: self.transitions keeps growing on later frames.
            "all_transitions": list(self.transitions),
            "transition_counts": dict(transition_counts),
            "active_paths": active_paths,
            "timestamp": round(current_time, 2),
        }

    def get_transitions(self):
        """Get all recorded transitions (as a new list)"""
        return self.transitions.copy()

    def get_person_path(self, person_id):
        """
        Get the path sequence for a specific person.

        Args:
            person_id: the internal integer id, or the "person-<n>" string
                reported in transitions and stats.

        Returns:
            A copy of the person's path, or [] if the person is not
            currently tracked.
        """
        if isinstance(person_id, str) and person_id.startswith(self._PERSON_PREFIX):
            suffix = person_id[len(self._PERSON_PREFIX):]
            if suffix.isdigit():
                person_id = int(suffix)
        track = self.tracked_people.get(person_id)
        if track is None:
            return []
        return track["path_sequence"].copy()

## Attach analyzers and run inference

In [None]:
# Configuration
hw_location = "@local"  # where inference runs
model_zoo_url = "degirum/hailo"  # model zoo that hosts the model
model_name = "yolov8n_relu6_person--640x640_quant_hailort_multidevice_1"
video_source_path = "video.mp4"  # change to your video path

# Zone polygons in pixel coordinates, shared by both analyzers
zone_transition = {
    "waiting_area": np.array([
        [680,  -100],   # top-left
        [850,  -100],   # top-right
        [850,   120],   # bottom-right
        [680,   120]    # bottom-left
    ], np.int32),

    "service_area": np.array([
        [680, 130],
        [850, 130],
        [850, 350],
        [680, 350]
    ], np.int32),

    "exit_area": np.array([
        [700, 370],    # top-left
        [900, 370],    # top-right
        [900, 590],    # bottom-right
        [700, 590]     # bottom-left
    ], np.int32)
}

# Load person detection model using the configuration variables above
# (the original call referenced undefined names)
detection_model = dg.load_model(
    model_name=model_name,
    inference_host_address=hw_location,
    zoo_url=model_zoo_url
)


density_analyzer = ZoneDensityAnalyzer(
    zones=zone_transition,
    config={
        "person_confidence": 0.35,
        "tracking_threshold_percent": 10.0,
        "density_interval": 2.0,  # Measure density every 2 seconds
    }
)

transition_analyzer = ZoneTransitionAnalyzer(
    zones=zone_transition,
    config={
        "person_confidence": 0.35,
        "max_disappeared": 30,
        "tracking_threshold_percent": 10.0,
        "min_zone_frames": 5,  # Must be in zone for 5 frames to confirm
    }
)

# Attach both analyzers to the model
degirum_tools.attach_analyzers(detection_model, [density_analyzer, transition_analyzer])


# Run inference on the video and display every analyzed frame
with degirum_tools.Display("Wait Time Tracker") as display:
    for result in degirum_tools.predict_stream(detection_model, video_source_path):
        display.show(result)

## Print statistics from detection results

In [None]:
# `detection` was never defined. Collect the stats entries that the analyzers
# appended to the results of the last frame processed by the streaming loop.
[det["stats"] for det in result.results if "stats" in det]