In [None]:
from IPython.display import clear_output


## INSTALLS

In [None]:
!pip install ultralytics 
clear_output()

In [None]:
!pip install git+https://github.com/KaiyangZhou/deep-person-reid.git
clear_output()

In [None]:
!pip install deep-sort-realtime
clear_output()

## Imports

Note: most of the comments were genreated using LLM

In [None]:
# --- Core Computer Vision Libraries ---
from ultralytics import YOLO  # YOLO for object detection
from deep_sort_realtime.deepsort_tracker import DeepSort # DeepSORT for initial tracking
import cv2 # OpenCV for video and image processing

# --- Re-identification and Feature Handling ---
import torchreid # Used by DeepSORT to extract appearance features
from torchreid.utils import FeatureExtractor
from scipy.spatial.distance import cosine # To calculate similarity between feature vectors

# --- Utility and System Libraries ---
from tqdm import tqdm # For creating progress bars
import numpy as np # For numerical operations (coordinates, heatmaps)
import torch # PyTorch, underlying framework for YOLO and torchreid
import os # For interacting with the operating system (e.g., file paths)
import glob # For finding files matching a specific pattern
import csv # For writing analysis results to CSV files

## Tracker Class

In [None]:
# MAIN TRACKER USING DEEPSORT
class Tracker:
    def __init__(self):
    
         # Initialize the DeepSort tracker object
         self.object_tracker = DeepSort(
            max_age=150,           # Max number of frames to keep a track alive without new detections.
            n_init=3,              # Min number of consecutive detections to start a new track.
            nms_max_overlap=0.7,   # NMS threshold to avoid overlapping tracks.
            max_cosine_distance=0.4, # Max appearance distance. A larger value allows re-associating less similar looking objects.
            nn_budget=100,         # Size of the appearance feature gallery for each track.
            override_track_class=None,
            embedder="torchreid",  # Use the torchreid library for feature extraction.
            half=True,             
            bgr=False,             
            embedder_model_name='osnet_x1_0', # The specific re-identification model to use.
            
            embedder_wts='osnet_x1_0_market_256x128_amsgrad_ep150_stp60_lr0.0015_b64_fb10_softmax_labelsmooth_flip.pth',
            polygon=False,
            today=None,
         )
         
    def track(self, detections, frame):
        """
        Updates the tracker with new detections for the current frame.
        
        Args:
            detections: A list of detections from the YOLO model.
            frame: The current video frame.
            
        Returns:
            A list of valid, confirmed tracks.
        """
        # Pass the detections and frame to the DeepSORT update method
        tracks = self.object_tracker.update_tracks(detections, frame=frame)

        valid_tracks = []
        # Filter out tracks that are not yet confirmed or have been lost
        for track in tracks:
            if not track.is_confirmed() or track.time_since_update > 0:
                continue
            
            track_info = {
                "id": track.track_id,         # The ID assigned by DeepSORT
                "box": track.to_ltrb(),       # Bounding box in (left, top, right, bottom) format
                "feature": track.features[-1], # The latest appearance feature vector
                "class": track.get_det_class(), # The object's class name (e.g., 'person')
                "conf": track.get_det_conf()      # The detection confidence score
            }
            valid_tracks.append(track_info)
            
        return valid_tracks
    

In [None]:
class PostTracker:
     """
    A robust post-processing tracker to handle ID re-association and prevent ID switching.
    DeepSORT can sometimes recycle IDs, causing a new person to get an old ID.
    This class maintains a permanent 'original_id' for each person, even if their
    DeepSORT ID changes after they are lost and re-detected.
    """
    def __init__(self, similarity_threshold=0.65, max_dist_from_last_seen=150, time_threshold=45,
                 motion_lambda=0.3, max_cost_threshold=0.6, feature_history_size=10, debug=False):
        
        # --- Parameters ---
        self.similarity_threshold = similarity_threshold # Min appearance similarity (1-cosine_distance) to be considered a match.
        self.spatial_threshold = max_dist_from_last_seen # Max pixel distance to search for a lost track.
        self.time_threshold = time_threshold             # Max frames a track can be 'lost' before being discarded.
        self.motion_lambda = motion_lambda               # Weight for spatial distance in the cost function (0.0 to 1.0). (1 - motion_lambda) is the weight for appearance.
        self.max_cost_threshold = max_cost_threshold     # The maximum combined cost for a re-association to be considered valid.
        self.feature_history_size = feature_history_size # Number of recent appearance features to store for each track.
        self.debug = debug                               # Flag to print debugging information.

        # --- State Tracking Dictionaries ---
        self.track_map = {}            # Maps current DeepSORT ID -> permanent Original ID {deepsort_id: original_id}
        self.active_tracks = {}        # Stores data for currently visible tracks {original_id: data}
        self.lost_tracks = {}          # Stores data for tracks that are temporarily lost {original_id: data}
        self.next_original_id = 1      # Counter for assigning new permanent IDs.

    def _get_centroid(self, box):
        # calc the center of the box
        return (int((box[0] + box[2]) / 2), int((box[1] + box[3]) / 2))

    def _get_new_original_id(self):
        # Increments and returns a new unique ID for a person.
        new_id = self.next_original_id
        self.next_original_id += 1
        return new_id

    def update(self, current_frame_tracks, frame_number):
        # Get the set of all DeepSORT IDs visible in the current frame
        current_deepsort_ids = {t['id'] for t in current_frame_tracks}

        # 1. Identify tracks that were active but are now lost.
        lost_original_ids = {oid for oid, data in self.active_tracks.items() if data['deepsort_id'] not in current_deepsort_ids}
        for original_id in lost_original_ids:
            lost_data = self.active_tracks[original_id]
            avg_feature = np.mean(lost_data['features'], axis=0)
            self.lost_tracks[original_id] = {'box': lost_data['box'], 'feature': avg_feature, 'frame_lost': frame_number}
            
            # IMPORTANT: Forget the DeepSORT ID mapping. This prevents a new person who might
            # get this recycled DeepSORT ID from being incorrectly mapped to the old original ID.
            deepsort_id_to_forget = lost_data['deepsort_id']
            if deepsort_id_to_forget in self.track_map:
                del self.track_map[deepsort_id_to_forget]
            
            del self.active_tracks[original_id]
            if self.debug: print(f"-Frame {frame_number}: Moved Original ID {original_id} to LOST list")

        # 2. Process currently visible tracks
        for track in current_frame_tracks:
            deepsort_id = track['id']
            mapped_id = self.track_map.get(deepsort_id)

            # Case A: This DeepSORT ID is already mapped to an active original ID (continuous tracking).
            if mapped_id is not None: 
                self.active_tracks[mapped_id]['box'] = track['box']
                self.active_tracks[mapped_id]['features'].append(track['feature'])
                if len(self.active_tracks[mapped_id]['features']) > self.feature_history_size:
                    self.active_tracks[mapped_id]['features'].pop(0)

            # Case B: This is a "newcomer" - either a truly new person or a re-appearing lost person.
            else: 
                best_match_id = None
                min_cost = float('inf')
                # Try to match this newcomer with tracks in the 'lost' list.
                for lost_id, lost_data in list(self.lost_tracks.items()):
                    if frame_number - lost_data['frame_lost'] > self.time_threshold:
                        del self.lost_tracks[lost_id]
                        continue

                    spatial_dist = np.linalg.norm(np.array(self._get_centroid(track['box'])) - np.array(self._get_centroid(lost_data['box'])))
                    if spatial_dist > self.spatial_threshold: continue

                    similarity = 1 - cosine(track['feature'], lost_data['feature'])
                    if similarity < self.similarity_threshold: continue

                    # --- Weighted Cost Function ---
                    # Combine appearance and motion cues into a single cost. Lower is better.
                    appearance_cost = 1 - similarity
                    appearance_cost = 1 - similarity
                    spatial_cost = spatial_dist / self.spatial_threshold
                    combined_cost = ((1 - self.motion_lambda) * appearance_cost) + (self.motion_lambda * spatial_cost)

                    if self.debug:
                        print(f"-Frame {frame_number}: Checking New DS_ID {deepsort_id} against Lost Original_ID {lost_id}-")
                        print(f"-Cost Components: Appearance={appearance_cost:.3f}, Spatial={spatial_cost:.3f}")
                        print(f"-Combined Cost: {combined_cost:.4f} (lower is better)")

                    # If this is the best match so far, save it.
                    if combined_cost < min_cost:
                        min_cost = combined_cost
                        best_match_id = lost_id
                        
                # If a suitable match was found in the lost list... Re-associate the track.
                if best_match_id is not None and min_cost < self.max_cost_threshold:
                    original_id = best_match_id
                    self.track_map[deepsort_id] = original_id # Create new mapping
                    self.active_tracks[original_id] = {
                        'box': track['box'], 'features': [track['feature']], 'deepsort_id': deepsort_id,
                        'class': track['class'], 'conf': track['conf']
                    }
                    del self.lost_tracks[original_id]
                    if self.debug: print(f"=>RE-ASSOCIATED: New DS_ID {deepsort_id} -> OriginalID {original_id} (Cost: {min_cost:.4f})")
                        
                # If no suitable match was found, this is a truly new person.
                else:
                    original_id = self._get_new_original_id()
                    self.track_map[deepsort_id] = original_id # Create new mapping
                    self.active_tracks[original_id] = {
                        'box': track['box'], 'features': [track['feature']], 'deepsort_id': deepsort_id,
                        'class': track['class'], 'conf': track['conf']
                    }
                    if self.debug: print(f"=>NEW PERSON: New DS_ID {deepsort_id} -> New OriginalID {original_id}")

    def get_active_tracks(self):
        # Returns a clean list of currently active tracks with their permanent original IDs.
        final_tracks = []
        for original_id, data in self.active_tracks.items():
            final_tracks.append({
                "id": original_id, "box": data['box'],
                "class": data.get('class', 'Person'), "conf": data.get('conf', 0)
            })
        return final_tracks

## YOLO Class

In [None]:
# MAIN YOLO DETECTOR CUSTOMIZED FOR DEEPSORT
# A wrapper class for the YOLO object detector, customized for DeepSORT integration.
class YoloDetector:
    def __init__(self, model, confidence, target_classes = None):
        self.model = YOLO(model).to('cuda')
        self.confidence = confidence
        self.target_classes = target_classes

    def detect(self, image):
        # run yolo and get detections
        results = self.model.predict(image, conf = self.confidence, classes = self.target_classes, verbose=False, iou=0.45)
        result = results[0]
        names = result.names
        detections, labels, confidences = self.conver_detections(result, names)
        return detections, labels, confidences
    
    def conver_detections(self, result, names):
        """
        Converts YOLO's output format to the DeepSORT required format.
        DeepSORT requires: (([left, top, width, height]), confidence, class_name)
        """
        
        boxes = result.boxes
        detections = []
        labels = []
        confidences = []
        for box in boxes:

            # Convert to (left, top, width, height) format.
            x1,y1,x2,y2 = box.xyxy[0]
            x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
            w, h = x2 - x1, y2 - y1
            
            class_number = int(box.cls[0])
            conf = float(box.conf[0])
            class_name = names[class_number]
            detections.append((([x1, y1, w, h]),conf, class_name))
            labels.append(class_name)
            confidences.append(conf)
            
        return detections, labels, confidences
            
        

## HeatMap Class

In [None]:

class HeatmapGenerator:
    """
    Generates and overlays a heatmap based on bounding box locations.
    """

    def __init__(self, h, w, decay_factor=0.97, colormap=cv2.COLORMAP_JET):

        self.h = h
        self.w = w
        self.decay_factor = decay_factor # Rate at which heat fades. 1.0 = no decay, 0.9 = fast decay.
        self.colormap = colormap
        self.heatmap = np.zeros((h, w), dtype=np.float32) # A heatmap that decays over time, showing recent activity.           
        self.cumulative_heatmap = np.zeros((h, w), dtype=np.float32) # A heatmap that accumulates all activity over the entire video. 

    def _add_heat(self, box):
        x0, y0, x1, y1 = map(int, box)
    
        # Clamp box to valid heatmap boundaries (this is to ensure that the cordinates inside the frame boundries )
        x0 = max(0, min(x0, self.w - 1))
        x1 = max(0, min(x1, self.w))
        y0 = max(0, min(y0, self.h - 1))
        y1 = max(0, min(y1, self.h))
    
        if x1 <= x0 or y1 <= y0:
            return
    
        radius = int(min(x1 - x0, y1 - y0) * 0.25)
        if radius <= 0:
            return
        radius_squared = radius ** 2
    
        # Meshgrid over the ROI
        # Center the heat spot at the bottom-center of the box (approximates foot position).
        xv, yv = np.meshgrid(np.arange(x0, x1), np.arange(y0, y1))
        center_x = (x0 + x1) // 2
        center_y = max(y0, y1 - 15)  # foot level
    
        dist_squared = (xv - center_x) ** 2 + (yv - center_y) ** 2
    
        # Gaussian-like heat
        heat = np.exp(-dist_squared / (2 * radius_squared))
    
        heat_patch = heat.astype(np.float32)
        
        # Update decaying heatmap
        roi_live = self.heatmap[y0:y1, x0:x1]
        roi_live += heat_patch
        self.heatmap[y0:y1, x0:x1] = roi_live
        
        # Update cumulative heatmap
        roi_cum = self.cumulative_heatmap[y0:y1, x0:x1]
        roi_cum += heat_patch
        self.cumulative_heatmap[y0:y1, x0:x1] = roi_cum

    def update(self, boxes):

        # 1. Apply decay to the entire heatmap
        self.heatmap *= self.decay_factor

        # 2. Add new heat for each bounding box
        for box in boxes:
            self._add_heat(box)

    def generate(self, frame):

        # Normalize the heatmap to the 0-255 range
        normalized_heatmap = cv2.normalize(self.heatmap, None, 0, 255, cv2.NORM_MINMAX)
        colored_heatmap = normalized_heatmap.astype(np.uint8)

        # Apply the colormap
        colored_heatmap = cv2.applyColorMap(colored_heatmap, self.colormap)

        # Blend the heatmap with the original frame
        final_frame = cv2.addWeighted(frame, 0.6, colored_heatmap, 0.4, 0)

        return final_frame

## Counting Class

In [None]:
class CountingSystem:
    """
    Manages counting, dwell time calculation, and visitor behavior analysis (dwelled vs. passed-by)
    for a specific polygonal area.
    """
    def __init__(self, main_area, entrance_areas, label, position_index=0, dwell_time_seconds=3, fps=30, debug=False):
        """
        Initializes the counting system for one area.
        Args:
            main_area: A numpy array of points defining the main counting zone.
            entrance_areas: A list of numpy arrays, each defining an entrance zone.
            label: A string name for this area (e.g., 'Track_1').
            dwell_time_seconds: The minimum time in seconds a person must be in the main_area to be counted.
            fps: The frames-per-second of the video.
        """
        self.main_area = main_area
        self.entrances = entrance_areas
        self.label = label
        self.fps = fps
        self.debug = debug

        # --- State Tracking for Counting & Behavior Analysis ---
        self.counted_ids = set()            # Stores IDs that have been confirmed as "dwelled".
        self.passed_by_ids = set()          # Stores IDs that entered but left before the dwell time threshold.
        self.entered_entrance_ids = set()   # Stores IDs that have been seen in an entrance zone.
        self.main_area_entries = set()      # Stores IDs that have been seen in the main area.
        self.dwelling_candidates = {}       # {track_id: entry_frame} for people currently inside the main area.
        self.person_count = 0               # The final count of people who dwelled.

        # --- State Tracking for Dwell Time Calculation ---
        self.entry_frames = {} # {track_id: frame_number} when a person enters the main area.
        self.dwell_times = {}  # {track_id: total_frames_dwelled} cumulative dwell time.

        # --- Parameters ---
        self.dwell_time_frames = int(dwell_time_seconds * fps) # Convert dwell time from seconds to frames.
        
        # --- Display Settings ---
        self.text_y_position = 70 + (position_index * 60) # Stagger display text for multiple counters.

    def _get_centroid(self, bbox):
        """
        Consistent centroid calculation for both counting and pass-by logic.
        """
        x1, y1, x2, y2 = map(int, bbox)
        middle_y = (y1 + y2) / 2
        fraction = 0.1
        count_y = middle_y + (y2 - middle_y) * fraction
        return (int((x1 + x2) / 2), int(count_y))

    def update(self, tracking_ids, boxes, frame_number, frame=None):
        """
        Updates counter, dwell time tracking, and pass-by analysis.
        """
        current_frame_ids = set(tracking_ids)
        
        # --- Process each visible track ---
        for track_id, bbox in zip(tracking_ids, boxes):
            centroid = self._get_centroid(bbox)

            # Draw debug circle
            if self.debug and frame is not None:
                cv2.circle(frame, centroid, 4, (0, 255, 0), -1)

            is_in_main = cv2.pointPolygonTest(self.main_area, centroid, False) >= 0
            is_in_entrance = any(cv2.pointPolygonTest(entrance, centroid, False) >= 0 for entrance in self.entrances)

            # Simple tracking for pass-by
            if is_in_entrance:
                self.entered_entrance_ids.add(track_id)
            if is_in_main:
                self.main_area_entries.add(track_id)

        # --- Original Counting Logic ---
        for track_id, bbox in zip(tracking_ids, boxes):
            if track_id in self.counted_ids:
                continue

            centroid = self._get_centroid(bbox)
            is_in_main = cv2.pointPolygonTest(self.main_area, centroid, False) >= 0
            is_in_entrance = any(cv2.pointPolygonTest(entrance, centroid, False) >= 0 for entrance in self.entrances)

            # Check if this person is eligible to be a dwelling candidate
            if track_id not in self.dwelling_candidates:
                is_eligible = False
                
                # Early frames - anyone in main area is eligible
                if frame_number <= self.dwell_time_frames and is_in_main:
                    is_eligible = True
                else:
                    # Later frames - must have come through entrance
                    if track_id in self.entered_entrance_ids and is_in_main: 
                        is_eligible = True
                
                if is_eligible: 
                    self.dwelling_candidates[track_id] = frame_number

            # Process dwelling candidates
            if track_id in self.dwelling_candidates:
                if not is_in_main:
                    # Left main area before dwelling time was reached
                    del self.dwelling_candidates[track_id]
                else:
                    # Check if they've dwelled long enough
                    frames_in_main = frame_number - self.dwelling_candidates[track_id]
                    if frames_in_main >= self.dwell_time_frames:
                        self.person_count += 1
                        self.counted_ids.add(track_id)
                        del self.dwelling_candidates[track_id]
        
        # Clean up lost candidates
        lost_candidates = set(self.dwelling_candidates.keys()) - current_frame_ids
        for track_id in lost_candidates:
            del self.dwelling_candidates[track_id]

        # --- Original Dwell Time Calculation ---
        for track_id, bbox in zip(tracking_ids, boxes):
            centroid = self._get_centroid(bbox)
            is_in_main = cv2.pointPolygonTest(self.main_area, centroid, False) >= 0

            if is_in_main:
                if track_id not in self.entry_frames:
                    self.entry_frames[track_id] = frame_number
            else:
                if track_id in self.entry_frames:
                    duration = frame_number - self.entry_frames[track_id]
                    self.dwell_times[track_id] = self.dwell_times.get(track_id, 0) + duration
                    del self.entry_frames[track_id]

        # Process disappeared tracks for dwell time
        lost_ids = set(self.entry_frames.keys()) - current_frame_ids
        for track_id in lost_ids:
            duration = frame_number - self.entry_frames[track_id]
            self.dwell_times[track_id] = self.dwell_times.get(track_id, 0) + duration
            del self.entry_frames[track_id]

    def finalize_dwell_times(self, last_frame_number):
        """
        Call this after the video loop to finalize analysis.
        """
        # Original dwell time finalization
        for track_id, entry_frame in self.entry_frames.items():
            duration = last_frame_number - entry_frame
            self.dwell_times[track_id] = self.dwell_times.get(track_id, 0) + duration
        self.entry_frames.clear()

        # Enhanced classification logic for dwelled vs passed-by
        for track_id in self.main_area_entries:
            if track_id in self.entered_entrance_ids:  # Only people who entered through entrance
                # Check actual dwell time instead of relying only on original counting algorithm
                total_dwell_frames = self.dwell_times.get(track_id, 0)
                
                if total_dwell_frames >= self.dwell_time_frames:
                    # They dwelled long enough - classify as dwelled
                    # (Add to counted_ids for analysis only, don't change person_count)
                    if track_id not in self.counted_ids:
                        self.counted_ids.add(track_id)
                else:
                    # They didn't dwell long enough - classify as passed by
                    self.passed_by_ids.add(track_id)

    def print_analysis(self):
        """
        Print the analysis of dwelled vs passed-by people.
        """
        # Remove any overlap (safety check)
        self.passed_by_ids = self.passed_by_ids - self.counted_ids
        
        dwelled_count = len(self.counted_ids)
        passed_by_count = len(self.passed_by_ids)
        total_visitors = dwelled_count + passed_by_count
        
        print(f"\n=== ANALYSIS FOR {self.label} ===")
        print(f"👥 Total Visitors: {total_visitors}")
        print(f"🏠 Dwelled (stayed ≥{self.dwell_time_frames/self.fps:.1f}s): {dwelled_count} ({dwelled_count/total_visitors*100 if total_visitors > 0 else 0:.1f}%)")
        print(f"🚶 Passed By (left <{self.dwell_time_frames/self.fps:.1f}s): {passed_by_count} ({passed_by_count/total_visitors*100 if total_visitors > 0 else 0:.1f}%)")
        
        if self.debug:
            print(f"Dwelled IDs: {sorted(list(self.counted_ids))}")
            print(f"Passed-by IDs: {sorted(list(self.passed_by_ids))}")
            print(f"Entered entrance IDs: {sorted(list(self.entered_entrance_ids))}")
            print(f"Main area entries: {sorted(list(self.main_area_entries))}")

    def save_to_csv(self, filename):
        """
        Saves dwell times to CSV for people who stayed >= dwell_time_seconds.
        """
        import csv
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['track_id', 'dwell_time_seconds'])

            for track_id, total_frames in self.dwell_times.items():
                if total_frames >= self.dwell_time_frames:
                    dwell_seconds = round(total_frames / self.fps, 2)
                    writer.writerow([track_id, dwell_seconds])

        print(f"✅ Dwell times (>= {self.dwell_time_frames / self.fps} seconds) saved to {filename}")

    def save_analysis_to_csv(self, filename):
        """
        Save the analysis (dwelled vs passed-by) to a CSV file.
        """
        import csv
        # Remove any overlap before saving
        self.passed_by_ids = self.passed_by_ids - self.counted_ids
        
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['area', 'category', 'count', 'percentage'])
            
            dwelled_count = len(self.counted_ids)
            passed_by_count = len(self.passed_by_ids)
            total_visitors = dwelled_count + passed_by_count
            
            if total_visitors > 0:
                dwelled_pct = round(dwelled_count / total_visitors * 100, 1)
                passed_by_pct = round(passed_by_count / total_visitors * 100, 1)
            else:
                dwelled_pct = passed_by_pct = 0
                
            writer.writerow([self.label, 'Dwelled', dwelled_count, dwelled_pct])
            writer.writerow([self.label, 'Passed By', passed_by_count, passed_by_pct])
            writer.writerow([self.label, 'Total', total_visitors, 100.0])

        print(f"📊 Analysis saved to {filename}")

    def draw_overlays(self, frame):
        """
        Draws the counting zones and enhanced count labels on the video frame.
        """
        import cv2
        # Draw main area in red
        cv2.polylines(frame, [self.main_area], isClosed=True, color=(0, 0, 255), thickness=2)
        
        # Draw entrances in blue
        for entrance in self.entrances:
            cv2.polylines(frame, [entrance], isClosed=True, color=(255, 0, 0), thickness=2)
        
        count_text = f"{self.label}: {self.person_count}"
        font_scale = 0.7
        font_thickness = 2
        font = cv2.FONT_HERSHEY_SIMPLEX
        
        # Get text size for background box
        (text_w, text_h), _ = cv2.getTextSize(count_text, font, font_scale, font_thickness)
        
        # Coordinates for background box
        x, y = 50, self.text_y_position
        cv2.rectangle(frame, (x - 5, y - text_h - 5), (x + text_w + 5, y + 5), (0, 0, 0), -1)
        
        # Draw white text on top
        cv2.putText(frame, count_text, (x, y), font, font_scale, (255, 255, 255), font_thickness)

##  Running

In [None]:
# --- Define colors and font scale based on your Desired Drawing Format ---
box_color = (255, 0, 0)  # Blue color for the box (BGR format)
text_color = (255, 255, 255) # White color for the text (BGR format)
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.6
font_thickness = 1
box_thickness = 2

In [None]:
# Read video and grab the first frame (for taking the store layout to define the xzones)
cap = cv2.VideoCapture('PATH TO VIDEO')
ret, frame = cap.read()
cap.release()

# Save it as an image
cv2.imwrite('PATH TO SAVE THE IMAGE', frame)
print("Saved first frame as 'frame_0.jpg'")

In [None]:
####-----S2G40A------#### (NEW data, laptop section camera)

# Area 1
area_1_main_pts = np.array([[1019, 235], [658, 215], [563, 663], [1279, 606]], np.int32)
area_1_entrance_1_pts = np.array([[665, 163], [987, 184], [1014, 231], [661, 210]], np.int32)
area_1_entrance_2_pts = np.array([[561, 666], [1278, 609], [1272, 711], [553, 711]], np.int32)

# Area 2
area_2_main_pts = np.array([[343, 189], [629, 212], [486, 713], [1, 564]], np.int32)
area_2_entrance_1_pts = np.array([[376, 719], [405, 695], [2, 576], [4, 714]], np.int32)
area_2_entrance_2_pts = np.array([[345, 172], [641, 192], [632, 209], [338, 184]], np.int32)

# Area 3
area_3_main_pts = np.array([[145, 194], [247, 217], [54, 344], [2, 278]], np.int32)
area_3_entrance_1_pts = np.array([[1, 279], [49, 345], [4, 384], [-1, 296]], np.int32)
area_3_entrance_2_pts = np.array([[148, 193], [169, 170], [281, 196], [253, 217]], np.int32)



AREA_1_TEXT = 'Track_1' 
AREA_2_TEXT = 'Track_2'
AREA_3_TEXT = 'Track_3'

In [None]:
# Main Loop YOLO + DEEPSORT 

detector = YoloDetector('yolov8l', confidence = 0.55, target_classes = [0])
tracker = Tracker()
# 💡 Instantiate the merger with new parameters that prioritize LOCATION
merger = PostTracker(
    similarity_threshold=0.62,      # Keep appearance threshold
    max_dist_from_last_seen=150,    # Keep spatial search radius
    time_threshold=500,              # Keep time threshold
    motion_lambda=0.33,              #  Give 30% weight to location, 70% to appearance
    max_cost_threshold=0.6,         #  A match is only valid if its total cost is below this value
    feature_history_size=20,
    debug=False                      # Set to True to see the new cost calculations
)


speed_factor = 1 # tried first to process one frame yes then no to make the process faster
DETECT_EVERY = 1


cap = cv2.VideoCapture('PATH TO VIDEO')
w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

output_fps = fps / speed_factor
frame_count = 0

video_writer = cv2.VideoWriter('PATH TO SAVE THE NEW VIDEO',
                               cv2.VideoWriter_fourcc(*'mp4v'), output_fps, (w, h))

new_total_frames = total_frames // speed_factor + (1 if total_frames % speed_factor != 0 else 0)
pbar = tqdm(total=new_total_frames, desc="Processing video frames ")

heatmap_generator = HeatmapGenerator(h, w, decay_factor=0.95, colormap=cv2.COLORMAP_JET)


counter1 = CountingSystem(main_area=area_1_main_pts, entrance_areas=[area_1_entrance_1_pts , area_1_entrance_2_pts ], label = AREA_1_TEXT, position_index=0,dwell_time_seconds=5,fps=fps, debug = True)
counter2 = CountingSystem(main_area=area_2_main_pts , entrance_areas=[area_2_entrance_1_pts , area_2_entrance_2_pts  ], label = AREA_2_TEXT, position_index=1,dwell_time_seconds=3,fps=fps, debug = True)
counter3 = CountingSystem(main_area=area_3_main_pts  , entrance_areas=[area_3_entrance_1_pts  , area_3_entrance_2_pts   ], label = AREA_3_TEXT, position_index=2,dwell_time_seconds=3,fps=fps, debug = True)



while True:
    ret, frame = cap.read()
    
    if not ret:
        break

    frame_count += 1

    # 1. Run detection to get bounding boxes
    detections, _, _ = detector.detect(frame)

    # 2. Get raw tracks from DeepSORT
    raw_tracks = tracker.track(detections, frame)

    # 3. Update the post-tracker to handle ID management and re-association
    merger.update(raw_tracks, frame_count)

    # 4. Get the final, clean list of active tracks
    final_tracks = merger.get_active_tracks()

    # 5. Extract data for other modules (like counting and heatmap)
    tracking_ids = [t['id'] for t in final_tracks]
    boxes = [t['box'] for t in final_tracks]

    # Update the counting system and heatmap generator
    
    counter1.update(tracking_ids, boxes, frame_count, frame)
    counter2.update(tracking_ids, boxes, frame_count, frame)
    counter3.update(tracking_ids, boxes, frame_count, frame)

    heatmap_generator.update(boxes)
    heatmap_frame = heatmap_generator.generate(frame)
    
    # --- DRAWING LOGIC ---
    for track_info in final_tracks:
        box = track_info['box']
        original_id = track_info['id'] # This is the final, persistent ID
        conf = track_info.get('conf', 0)
        label = track_info.get('class', 'Person')
        
        x1, y1, x2, y2 = map(int, box)
        
        display_label = f"{label} {conf:.2f} ID:{original_id}"
        (tw, th), baseline = cv2.getTextSize(display_label, font, font_scale, font_thickness)
        cv2.rectangle(heatmap_frame, (x1, y1), (x2, y2), box_color, box_thickness)
        cv2.rectangle(heatmap_frame, (x1, y1 - th - baseline - 3), (x1 + tw, y1), box_color, -1)
        cv2.putText(heatmap_frame, display_label, (x1, y1 - 5), font, font_scale, text_color, font_thickness, cv2.LINE_AA)

    
    # Draw counting zones
    counter1.draw_overlays(heatmap_frame)
    counter2.draw_overlays(heatmap_frame)
    counter3.draw_overlays(heatmap_frame)


    # Write the final frame and update progress
    video_writer.write(heatmap_frame)
    pbar.update(1)

# Cleanup
cap.release()
video_writer.release()
pbar.close()


# --- NEW: Finalize and Save Dwell Times ---
# Finalize timings for people who were still inside when the video ended
counter1.finalize_dwell_times(frame_count)
counter2.finalize_dwell_times(frame_count)
counter3.finalize_dwell_times(frame_count)


# Save all recorded dwell times to a CSV file in the output directory
output_csv_path_area1 = f'dwell_times_{AREA_1_TEXT}.csv'
output_csv_path_area2 = f'dwell_times_{AREA_2_TEXT}.csv'
output_csv_path_area3 = f'dwell_times_{AREA_3_TEXT}.csv'

counter1.save_to_csv(output_csv_path_area1)
counter2.save_to_csv(output_csv_path_area2)
counter3.save_to_csv(output_csv_path_area3)

# Add this analysis section:
print("\n" + "="*50)
print("🔍 VISITOR BEHAVIOR ANALYSIS")
print("="*50)

# Print analysis for each area
counter1.print_analysis()
counter2.print_analysis()
counter3.print_analysis()

# OPTIONAL: Save analysis results to CSV files
counter1.save_analysis_to_csv(f'analysis_{AREA_1_TEXT}.csv')
counter2.save_analysis_to_csv(f'analysis_{AREA_2_TEXT}.csv')
counter3.save_analysis_to_csv(f'analysis_{AREA_3_TEXT}.csv')

## Static Heatmap Image

In [None]:
# Read the last frame for overlay
# Load first frame as background
cap = cv2.VideoCapture('PATH TO VIDEO')
ret, store_layout_frame = cap.read()


if not ret:
    raise RuntimeError("Failed to read background frame.")

# Normalize cumulative heatmap to 0–255
normalized_heat = cv2.normalize(heatmap_generator.cumulative_heatmap, None, 0, 255, cv2.NORM_MINMAX)
normalized_heat = normalized_heat.astype(np.uint8)

# Apply color map
colored_heat = cv2.applyColorMap(normalized_heat, cv2.COLORMAP_JET)

# Resize heatmap to match background frame
if colored_heat.shape[:2] != store_layout_frame.shape[:2]:
    colored_heat = cv2.resize(colored_heat, (store_layout_frame.shape[1], store_layout_frame.shape[0]))

# Ensure 3-channel background
if len(store_layout_frame.shape) == 2 or store_layout_frame.shape[2] == 1:
    store_layout_frame = cv2.cvtColor(store_layout_frame, cv2.COLOR_GRAY2BGR)

# Blend heatmap onto last frame
final_overlay = cv2.addWeighted(store_layout_frame, 0.5, colored_heat, 0.5, 0)

# ✅ Draw counter overlays (e.g., people counts, area names, etc.)
counter1.draw_overlays(final_overlay)
counter2.draw_overlays(final_overlay)
counter3.draw_overlays(final_overlay)

# Save results
cv2.imwrite('final_heatmap_colored.jpg', colored_heat)
cv2.imwrite('final_heatmap_overlay.jpg', final_overlay)
