In [1]:
from collections import deque
from scipy.optimize import linear_sum_assignment
import cv2
import numpy as np
import onnxruntime as ort
import time

In [2]:
def compute_iou(box, boxes):
    if box is None or len(box) != 4 or len(boxes) == 0:
        return None, None

    box = np.array(box)
    boxes = np.array(boxes)

    x1 = np.maximum(box[0], boxes[:, 0])
    y1 = np.maximum(box[1], boxes[:, 1])
    x2 = np.minimum(box[2], boxes[:, 2])
    y2 = np.minimum(box[3], boxes[:, 3])

    inter_w = np.maximum(0, x2 - x1)
    inter_h = np.maximum(0, y2 - y1)
    inter_area = inter_w * inter_h

    box_area = (box[2] - box[0]) * (box[3] - box[1])
    boxes_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    union_area = box_area + boxes_area - inter_area

    ious = inter_area / np.maximum(union_area, 1e-6)

    max_iou = np.max(ious)
    best_match_idx = np.argmax(ious)

    return max_iou, best_match_idx

# Need to maintain these globals in your module:
#   tracked_objects: {track_id: [x1,y1,x2,y2], …}
#   last_seen:       {track_id: last_frame_count, …}
#   next_id:         integer
#   dwell_time_start:{track_id: frame_count, …}
# And add:
#   centroids:       {track_id: np.array([cx,cy]), …}
#   directions:      {track_id: np.array([dx,dy]), …}  # unit-vectors

def compute_centroid(box):
    x1,y1,x2,y2 = box
    return np.array([(x1+x2)/2, (y1+y2)/2])

def unit_vector(v, eps=1e-6):
    norm = np.linalg.norm(v)
    return v/norm if norm>eps else None

def fun_track_id(boxes, frame_count,
                 iou_threshold=0.2,
                 max_dist=100,       # distance gating radius in px
                 expire_after=120):
    global tracked_objects, next_id, last_seen, dwell_time_start
    global centroids, directions

    # 1. Purge stale tracks (same as before)
    stale = [tid for tid, last in last_seen.items()
             if frame_count - last > expire_after]
    for tid in stale:
        tracked_objects.pop(tid, None)
        last_seen.pop(tid, None)
        centroids.pop(tid, None)
        directions.pop(tid, None)

    prev_ids   = list(tracked_objects.keys())
    prev_boxes = [tracked_objects[tid] for tid in prev_ids]
    curr_boxes = list(boxes)
    curr_cents = [compute_centroid(b) for b in curr_boxes]

    P, C = len(prev_boxes), len(curr_boxes)
    assigned_prev = set()
    assigned_curr = set()
    new_assignments = {}

    if P>0 and C>0:
        # precompute prev centroids & directions
        prev_cents = np.array([centroids[tid] for tid in prev_ids])
        prev_dirs  = [directions.get(tid) for tid in prev_ids]

        iou_mat = np.zeros((P,C), dtype=np.float32)
        dir_mat = np.zeros((P,C), dtype=np.float32)
        cost_mat= np.ones ((P,C), dtype=np.float32)

        for i, p in enumerate(prev_boxes):
            p = np.array(p)
            cb = np.array(curr_boxes)
            # IoU
            x1 = np.maximum(p[0], cb[:,0]); y1 = np.maximum(p[1], cb[:,1])
            x2 = np.minimum(p[2], cb[:,2]); y2 = np.minimum(p[3], cb[:,3])
            inter = np.maximum(0, x2-x1)*np.maximum(0, y2-y1)
            area_p = (p[2]-p[0])*(p[3]-p[1])
            area_c = (cb[:,2]-cb[:,0])*(cb[:,3]-cb[:,1])
            union  = area_p + area_c - inter
            iou_mat[i] = inter/np.maximum(union,1e-6)

            # distance gating + direction
            for j, cc in enumerate(curr_cents):
                # 2.a Distance gating
                dist = np.linalg.norm(cc - prev_cents[i])
                if dist > max_dist:
                    continue  # cost stays at 1 → effectively forbidden

                # 2.b Direction score
                prev_dir = prev_dirs[i]
                delta    = cc - prev_cents[i]
                cand_dir = unit_vector(delta)
                if prev_dir is not None and cand_dir is not None:
                    dir_mat[i,j] = max(0.0, np.dot(prev_dir, cand_dir))
                # otherwise dir_mat[i,j] stays 0

                # 3. combined cost (equal weights)
                score = 0.75 * iou_mat[i,j] + 0.25 * dir_mat[i,j]
                cost_mat[i,j] = 1.0 - score

        # 4. Hungarian on fused cost
        row_ind, col_ind = linear_sum_assignment(cost_mat)

        # 5. accept only good matches
        for r,c in zip(row_ind, col_ind):
            if iou_mat[r,c] >= iou_threshold:
                pid = prev_ids[r]
                assigned_prev.add(pid)
                assigned_curr.add(c)
                new_assignments[pid] = curr_boxes[c]

    # 6. Update matched tracks
    for pid, box in new_assignments.items():
        tracked_objects[pid] = box
        last_seen[pid] = frame_count

        # update centroid & direction
        new_c = compute_centroid(box)
        old_c = centroids[pid]
        centroids[pid] = new_c
        delta = new_c - old_c
        u = unit_vector(delta)
        if u is not None:
            directions[pid] = u

    # 7. Spawn new IDs
    for idx, box in enumerate(curr_boxes):
        if idx in assigned_curr:
            continue
        pid = next_id
        tracked_objects[pid]    = box
        last_seen[pid]          = frame_count
        dwell_time_start[pid]   = frame_count
        centroids[pid]          = compute_centroid(box)
        directions[pid]         = None  # no direction yet
        next_id += 1

    # 8. Return current‐frame tracks
    return {
        tid: tracked_objects[tid]
        for tid, last in last_seen.items()
        if last == frame_count
    }

def point_in_polygon(point, polygon):
    return cv2.pointPolygonTest(np.array(polygon, dtype=np.int32), point, False) >= 0

# Precompute ROI diagonals and matching gate
def roi_diag(roi):
    xs, ys = zip(*roi)
    return np.hypot(max(xs)-min(xs), max(ys)-min(ys))

In [3]:
model_path = "yolo12n.onnx"
providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
session = ort.InferenceSession(model_path,providers=providers)
 
# Get model details
input_name = session.get_inputs()[0].name
output_names = [output.name for output in session.get_outputs()]
input_shape = session.get_inputs()[0].shape
input_width, input_height = input_shape[2], input_shape[3]  # ONNX input size
 
# YOLO-specific preprocessing function
def preprocess_image(img, input_size):
    img_height, img_width = img.shape[:2]
    r = min(input_size[0]/img_width, input_size[1]/img_height)
    new_width, new_height = int(img_width * r), int(img_height * r)
    resized = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
    canvas = np.zeros((input_size[1], input_size[0], 3), dtype=np.uint8)
    canvas[:new_height, :new_width, :] = resized
    canvas = cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    blob = np.transpose(canvas, (2, 0, 1))[np.newaxis, ...]
    return blob, r, (img_width, img_height)
 
# Process YOLOv11 output with NMS
def process_output(outputs, conf_threshold=0.3, nms_threshold=0.5, img_shape=None, ratio=1.0):
    output = outputs[0][0]
    boxes, confidences = [], []
 
    for idx in range(output.shape[1]):
        confidence = output[4, idx]
        if confidence >= conf_threshold:
            x, y, w, h = output[:4, idx]
            x1 = int((x - w / 2) / ratio)
            y1 = int((y - h / 2) / ratio)
            width, height = int(w / ratio), int(h / ratio)
 
            boxes.append([x1, y1, width, height])
            confidences.append(float(confidence))
 
    indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold)
    detections = []
 
    if len(indices) > 0:
        for i in indices.flatten():
            x, y, w, h = boxes[i]
            detections.append({
                'box': [x, y, x + w, y + h],
                'confidence': confidences[i]
            })
 
    return detections



In [4]:
# ─────────────── MAIN LOOP ───────────────
input_video = '/Users/saptarshimallikthakur/Desktop/tracking/Custom tracking/Trackers/BT-OrderCounter-2.mp4'
cap = cv2.VideoCapture(input_video)

output_video = 'BT-OrderCounter-2 dwell time & interactio time.mp4'

fps = int(cap.get(cv2.CAP_PROP_FPS))
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_skip = 2
frame_count = 0

diag = np.hypot(w, h)
max_dist = 0.1 * diag    # e.g. 10% of the diagonal

# Setup video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video, fourcc, fps // 2, (w, h))  # fps//2 because of frame_skip=30

tracked_objects = {}      # track_id → box
last_seen = {}            # track_id → last seen frame
dwell_time_start = {}     # track_id → first seen frame
next_id = 0

# new for direction & distance:
centroids  = {}           # track_id → last centroid (np.array)
directions = {}           # track_id → last unit‐vector (np.array) or None

# Define the ROIs
ROIS = [
    [(600, 530), (1200, 530), (1200, 970), (550, 970)],
    [(550,970), (180,970), (350, 470), (600,470)],
]

diag1 = roi_diag(ROIS[0])
diag2 = roi_diag(ROIS[1])
d_max_match = 0.5 * (diag1 + diag2)

# Analytics storage: total frames & current active start
analytics_interactions = {}  # (id1,id2) → {'total_frames':int,'active_start':int}

inoccupancy = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_count += 1
    if frame_count % frame_skip != 0:
        continue

    blob, ratio, orig_shape = preprocess_image(frame, (input_width, input_height))
    outputs = session.run(output_names, {input_name: blob})
 
    detections = process_output(outputs, conf_threshold=0.35, nms_threshold=0.5, img_shape=orig_shape, ratio=ratio)

    roi_filtered_boxes = []
    roi1_occupied = False


    for det in detections:
        x1, y1, x2, y2 = map(int, det['box'])
        cx, cy = (x1 + x2) // 2, (y1 + y2) // 2  # Centroid

        # Check if inside first ROI
        if point_in_polygon((cx, cy), ROIS[0]):
            roi1_occupied = True

        # Check if centroid falls inside any ROI
        for roi in ROIS:
            if point_in_polygon((cx, cy), roi):
                roi_filtered_boxes.append(det['box'])
                break  # No need to check other ROIs

    tracked = fun_track_id(
        np.array(roi_filtered_boxes),
        frame_count,
        iou_threshold=0.2,
        max_dist=max_dist,
        expire_after=120
    )

    for track_id, box in tracked.items():
        x1, y1, x2, y2 = map(int, box)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Calculate dwell time
        start_frame = dwell_time_start.get(track_id, frame_count)
        dwell_frames = frame_count - start_frame
        dwell_seconds = dwell_frames // fps

        label = f'ID {track_id} | Dwell time ({dwell_seconds}s)'
        cv2.putText(cv2.rectangle(frame, (x1, y2 - 30), (x1 + 10 + len(label)*15, y2), (0, 0, 0), -1), label, (x1, y2 - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) 

    # Draw each ROI
    for roi in ROIS:
        pts = np.array(roi, dtype=np.int32).reshape((-1, 1, 2))
        cv2.polylines(frame, [pts], isClosed=True, color=(0, 255, 0), thickness=2)

    # Draw alert if ROI 1 is empty
    if not roi1_occupied:
        inoccupancy +=1
        if inoccupancy >= 120:
            cv2.putText(frame, "Alert No staff present !", (4*w//5, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
    else:
        inoccupancy = 0


    # 1) Collect centroids in each ROI
    roi1_ids, roi1_cents = [], []
    roi2_ids, roi2_cents = [], []
    for tid, box in tracked.items():
        x1,y1,x2,y2 = map(int, box)
        cx,cy = (x1+x2)//2, (y1+y2)//2
        if point_in_polygon((cx,cy), ROIS[0]):
            roi1_ids.append(tid); roi1_cents.append((cx,cy))
        if point_in_polygon((cx,cy), ROIS[1]):
            roi2_ids.append(tid); roi2_cents.append((cx,cy))

    # 2) Hungarian match if both non-empty
    current_keys = set()
    if roi1_cents and roi2_cents:
        A = np.array(roi1_cents)  # shape (n1,2)
        B = np.array(roi2_cents)  # shape (n2,2)
        cost = np.linalg.norm(A[:,None,:] - B[None,:,:], axis=2)
        cost[cost > d_max_match] = 1e6

        rows, cols = linear_sum_assignment(cost)

        current_keys = set()
        for r, c in zip(rows, cols):
            if cost[r, c] < 1e5:
                # IDs and key
                id1, id2 = roi1_ids[r], roi2_ids[c]
                key = (id1, id2)
                current_keys.add(key)

                # ─── Analytics: start or resume ───
                rec = analytics_interactions.get(key)
                if rec is None:
                    # first time ever seen
                    analytics_interactions[key] = {
                        'total_frames': 0,
                        'active_start': frame_count
                    }
                    rec = analytics_interactions[key]
                elif rec['active_start'] is None:
                    # resuming after a pause
                    rec['active_start'] = frame_count

                # ─── Compute cumulative duration ───
                running = (frame_count - rec['active_start']) if rec['active_start'] is not None else 0
                total_frames = rec['total_frames'] + running
                duration_s = total_frames / fps

                # ─── Draw arrow and labels ───
                p1 = tuple(A[r]); p2 = tuple(B[c])
                # arrow
                cv2.arrowedLine(frame, p1, p2, (255, 0, 0), 2, tipLength=0.2)
                # ID label
                cv2.putText(frame,
                            f"{id1}->{id2}",
                            (p1[0], p1[1] - 10),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            0.6,
                            (255, 0, 0),
                            2)
                # duration at midpoint
                mx, my = (p1[0] + p2[0]) // 2, (p1[1] + p2[1]) // 2
                cv2.putText(cv2.rectangle(frame, (mx, my - 30), (mx + 100, my), (0, 0, 0), -1), f"{duration_s:.1f}s", (mx, my - 5), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 255), 2)


                
    # 3) Update analytics_interactions with pause/resume logic
    for key in current_keys:
        rec = analytics_interactions.get(key)
        if rec is None:
            # first time ever seen
            analytics_interactions[key] = {'total_frames': 0,
                                        'active_start': frame_count}
        elif rec['active_start'] is None:
            # resuming after a break
            rec['active_start'] = frame_count

    # 4) Pause any interactions that ended this frame
    for key, rec in analytics_interactions.items():
        if key not in current_keys and rec['active_start'] is not None:
            rec['total_frames'] += (frame_count - rec['active_start'])
            rec['active_start'] = None

    # write and show frame
    out.write(frame)
    cv2.imshow("Tracked", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
 
cap.release()
out.release()
cv2.destroyAllWindows()

# ─────────────── FINAL DWELL REPORT ───────────────
print("\nFinal Dwell Times:")
for tid, start_frame in dwell_time_start.items():
    dwell_frames = frame_count - start_frame
    dwell_seconds = dwell_frames // fps
    print(f"Track ID {tid}: {dwell_seconds} seconds")

print("\n=== Cumulative Interactions (>5s) ===")
for (id1, id2), rec in analytics_interactions.items():
    # if still active, add up to last frame
    if rec['active_start'] is not None:
        total = rec['total_frames'] + (frame_count - rec['active_start'])
    else:
        total = rec['total_frames']
    duration_s = total / fps
    if duration_s > 5.0:
        print(f"Track {id1}->{id2}: {duration_s:.1f}s")

2025-04-30 21:17:34.288 Python[26092:590680] +[IMKClient subclass]: chose IMKClient_Modern
2025-04-30 21:17:34.288 Python[26092:590680] +[IMKInputSession subclass]: chose IMKInputSession_Modern



Final Dwell Times:
Track ID 0: 158 seconds
Track ID 1: 158 seconds
Track ID 2: 141 seconds
Track ID 3: 98 seconds
Track ID 4: 96 seconds
Track ID 5: 86 seconds
Track ID 6: 81 seconds
Track ID 7: 74 seconds
Track ID 8: 74 seconds
Track ID 9: 56 seconds
Track ID 10: 52 seconds
Track ID 11: 49 seconds
Track ID 12: 48 seconds
Track ID 13: 39 seconds
Track ID 14: 35 seconds
Track ID 15: 34 seconds
Track ID 16: 29 seconds
Track ID 17: 14 seconds
Track ID 18: 10 seconds

=== Cumulative Interactions (>5s) ===
Track 3->1: 33.2s
Track 9->1: 9.4s
