<a href="https://colab.research.google.com/github/JiteshSkumar/Phone-Tracker/blob/main/Phone_Tracker.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install & Imports


In [23]:
!pip install ultralytics
!pip install open_clip_torch

import os, cv2, numpy as np, pandas as pd, torch
from ultralytics import YOLO
from google.colab import drive
from datetime import datetime
from pathlib import Path
from collections import deque, Counter
from PIL import Image
import open_clip
from IPython.display import display



Mount Google Drive

In [24]:
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Folders

In [25]:
INPUT_DIR  = "/content/drive/MyDrive/Task Video"
OUTPUT_DIR = "/content/drive/MyDrive/Task Video/op4"
os.makedirs(OUTPUT_DIR, exist_ok=True)
RUN_STAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
OUTPUT_DIR_RUN = os.path.join(OUTPUT_DIR, f"run_{RUN_STAMP}")
os.makedirs(OUTPUT_DIR_RUN, exist_ok=True)
SUMMARY_CSV = os.path.join(OUTPUT_DIR_RUN, "summary.csv")

Config & Flags (Accuracy-focused)

In [26]:
HIGH_ACCURACY_MODE   = True
DETECTOR_WEIGHTS     = "yolov8m.pt" if HIGH_ACCURACY_MODE else "yolov8s.pt"
IMGSZ_LIST           = [960, 1280] if HIGH_ACCURACY_MODE else [960]
USE_HFLIP_TTA        = True if HIGH_ACCURACY_MODE else False
CONF_THRESHOLD       = 0.27
IOU_NMS              = 0.55
MIN_BOX_AREA_PIXELS  = 40 * 40

# Activity (multi-cue)
BRIGHTNESS_ON_BASE   = 38.0
ENTROPY_ON_THRESH    = 3.8
DIFF_ON_BASE         = 7.0
FLOW_ON_BASE         = 0.85
ADAPT_BRIGHT_WEIGHT  = 0.35

# Tracking / smoothing
IOU_MATCH_THRESHOLD  = 0.35
MAX_MISSES           = 20
TRACK_HISTORY        = 12
SMOOTH_WINDOW        = 15
ACTIVE_HYST_ON       = 0.62
ACTIVE_HYST_OFF      = 0.45

W_BRIGHTNESS = 0.30
W_ENTROPY    = 0.25
W_DIFF       = 0.25
W_FLOW       = 0.20

# Logging
LOG_NO_PHONE_EVERY   = 60
LOG_EVERY_FRAMES     = 100

# Shape fallback (for black/off screens)
USE_FALLBACK_SHAPE     = True
FALLBACK_MIN_AREA_FRAC = 0.003
FALLBACK_MAX_AREA_FRAC = 0.25
FALLBACK_ASPECT_PORTRAIT = (1.6, 3.1)
FALLBACK_ASPECT_LAND     = (0.32, 0.70)
FALLBACK_ENTROPY_MAX     = 4.3
FALLBACK_MEAN_MAX        = 105.0
FALLBACK_MIN_EDGE_DENS   = 0.015

# Skip credit-card readers entirely
SKIP_POS = True

# POS hard filter tuning
PHONE_MIN_SIM      = 0.22
PHONE_POS_MARGIN   = 0.07
POS_MIN_KEYS       = 10
POS_KEY_MIN_FRAC   = 0.002
POS_KEY_MAX_FRAC   = 0.040
POS_KEY_AR_TOL     = 0.35
POS_KEY_FILL_MIN   = 0.60
POS_KEY_REGION_Y0  = 0.35
POS_SLOT_AR_MIN    = 6.0
POS_SLOT_MIN_FRAC  = 0.001
POS_SLOT_MAX_FRAC  = 0.020
POS_SLOT_BORDER_FR = 0.08

# CLIP model
CLIP_ARCH     = "ViT-L-14" if HIGH_ACCURACY_MODE else "ViT-B-32"
CLIP_CHECKPT  = "laion2b_s32b_b82k" if HIGH_ACCURACY_MODE else "laion2b_s34b_b79k"

Models: YOLO + CLIP

In [27]:
model = YOLO(DETECTOR_WEIGHTS)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
clip_model, clip_preprocess, clip_tokenizer = open_clip.create_model_and_transforms(
    CLIP_ARCH, pretrained=CLIP_CHECKPT, device=DEVICE
)

PHONE_TEXTS = [
    "a photo of a smartphone",
    "a mobile phone on a table",
    "a handheld cell phone",
    "a touchscreen smartphone",
    "a cell phone on a table",
    "a person using a smartphone",
    "a person using a mobile phone",
    "a person using a handheld cell phone",
    "a person using a touchscreen smartphone",
    "a person using a cell phone on a table",
    "a hand holding a phone, partially blocked",
    "a mobile phone partly hidden by a hand",
    "a phone under a person's hand",
    "a phone visible behind fingers",
    "a phone under low lighting",
    "only part of a smartphone visible",
    "a hand holding a phone, obscured by motion blur"

]
POS_TEXTS = [
    "a point-of-sale terminal",
    "a credit card machine",
    "a card reader keypad",
    "a POS terminal with card slot",
    "a credit card machine on hand",
    "a credit card machine on table",
    "a credit card machine in use",
    "a person using a credit card machine",
    "a person using a POS terminal",
    "a payment terminal at a store counter",
    "a payment terminal with numeric keypad",
    "a POS machine connected to a register",
    "a card swipe machine",
    "a chip-and-pin reader",
    "a debit card terminal",
    "a cash register card reader",
    "a small POS machine with buttons",
    "a wired POS terminal",
    "a point-of-sale keypad device",
    "a merchant payment terminal"
]
with torch.no_grad():
    phone_tokens = open_clip.tokenize(PHONE_TEXTS).to(DEVICE)
    pos_tokens   = open_clip.tokenize(POS_TEXTS).to(DEVICE)
    phone_text_feat = clip_model.encode_text(phone_tokens)
    pos_text_feat   = clip_model.encode_text(pos_tokens)
    phone_text_feat = (phone_text_feat / phone_text_feat.norm(dim=-1, keepdim=True)).mean(dim=0, keepdim=True)
    pos_text_feat   = (pos_text_feat   / pos_text_feat.norm(dim=-1, keepdim=True)).mean(dim=0, keepdim=True)

Helpers (WBF, detection, features)

In [28]:
def to_xyxy_norm(box, W, H):
    x1,y1,x2,y2 = box
    return [x1/W, y1/H, x2/W, y2/H]

def to_xyxy_abs(box, W, H):
    x1,y1,x2,y2 = box
    return [int(x1*W), int(y1*H), int(x2*W), int(y2*H)]

def iou_xyxy(a, b):
    x1 = max(a[0], b[0]); y1 = max(a[1], b[1])
    x2 = min(a[2], b[2]); y2 = min(a[3], b[3])
    inter = max(0, x2-x1) * max(0, y2-y1)
    a1 = (a[2]-a[0])*(a[3]-a[1]); a2 = (b[2]-b[0])*(b[3]-b[1])
    return inter / (a1 + a2 - inter + 1e-6)

def weighted_boxes_fusion(boxes, scores, iou_thr=0.55):
    if not boxes: return [], []
    idxs = np.argsort(scores)[::-1]
    boxes = [boxes[i] for i in idxs]
    scores = [scores[i] for i in idxs]
    fused_boxes, fused_scores, used = [], [], [False]*len(boxes)
    for i in range(len(boxes)):
        if used[i]: continue
        cluster = [i]; used[i] = True
        for j in range(i+1, len(boxes)):
            if used[j]: continue
            if iou_xyxy(boxes[i], boxes[j]) >= iou_thr:
                used[j] = True; cluster.append(j)
        if len(cluster) == 1:
            fused_boxes.append(boxes[i]); fused_scores.append(scores[i])
        else:
            wsum = sum(scores[k] for k in cluster) + 1e-9
            bx = [0,0,0,0]; sc = 0.0
            for k in cluster:
                s = scores[k]; sc += s
                bx[0] += boxes[k][0]*s; bx[1] += boxes[k][1]*s
                bx[2] += boxes[k][2]*s; bx[3] += boxes[k][3]*s
            fused_boxes.append([b/wsum for b in bx]); fused_scores.append(sc/len(cluster))
    return fused_boxes, fused_scores

def yolo_tta_predict(frame_bgr):
    H, W = frame_bgr.shape[:2]
    boxes_norm, scores = [], []
    for s in IMGSZ_LIST:
        r = model.predict(frame_bgr, imgsz=s, conf=CONF_THRESHOLD, iou=IOU_NMS,
                          agnostic_nms=True, verbose=False)[0]
        names = r.names
        for b in r.boxes:
            cls_id = int(b.cls.item()); conf = float(b.conf.item())
            name = names.get(cls_id, str(cls_id)) if isinstance(names, dict) else names[cls_id]
            if name and "phone" in name.lower():
                x1,y1,x2,y2 = map(int, b.xyxy[0].tolist())
                if (x2-x1)*(y2-y1) >= MIN_BOX_AREA_PIXELS:
                    boxes_norm.append(to_xyxy_norm([x1,y1,x2,y2], W, H))
                    scores.append(conf)
        if USE_HFLIP_TTA:
            f = cv2.flip(frame_bgr, 1)
            r2 = model.predict(f, imgsz=s, conf=CONF_THRESHOLD, iou=IOU_NMS,
                               agnostic_nms=True, verbose=False)[0]
            for b in r2.boxes:
                cls_id = int(b.cls.item()); conf = float(b.conf.item())
                name = r2.names.get(cls_id, str(cls_id)) if isinstance(r2.names, dict) else r2.names[cls_id]
                if name and "phone" in name.lower():
                    x1,y1,x2,y2 = map(int, b.xyxy[0].tolist())
                    x1f, x2f = W - x2, W - x1
                    if (x2f-x1f)*(y2-y1) >= MIN_BOX_AREA_PIXELS:
                        boxes_norm.append(to_xyxy_norm([x1f,y1,x2f,y2], W, H))
                        scores.append(conf)
    fused_boxes_norm, fused_scores = weighted_boxes_fusion(boxes_norm, scores, iou_thr=0.55)
    fused_boxes_abs = [to_xyxy_abs(b, W, H) for b in fused_boxes_norm]
    return fused_boxes_abs, fused_scores

def safe_crop(frame, x1, y1, x2, y2):
    h, w = frame.shape[:2]
    x1, y1 = max(0, x1), max(0, y1); x2, y2 = min(w, x2), min(h, y2)
    if x2 <= x1 or y2 <= y1: return None
    return frame[y1:y2, x1:x2]

def shannon_entropy(gray_roi):
    if gray_roi is None or gray_roi.size == 0: return 0.0
    hist = cv2.calcHist([gray_roi], [0], None, [256], [0,256]).ravel()
    p = hist / (np.sum(hist) + 1e-9); p = p[p>0]
    return float(-np.sum(p * np.log2(p)))

def mean_abs_diff(prev_gray, curr_gray, bbox):
    if prev_gray is None or curr_gray is None or bbox is None: return 0.0
    x1,y1,x2,y2 = bbox
    h, w = curr_gray.shape[:2]
    x1, y1 = max(0, x1), max(0, y1); x2, y2 = min(w, x2), min(h, y2)
    if x2<=x1 or y2<=y1: return 0.0
    prev_roi = prev_gray[y1:y2, x1:x2]; curr_roi = curr_gray[y1:y2, x1:x2]
    if prev_roi.size==0 or curr_roi.size==0 or prev_roi.shape!=curr_roi.shape: return 0.0
    diff = cv2.absdiff(prev_roi, curr_roi)
    return float(np.mean(diff))

def dense_flow_mag(prev_gray, curr_gray, bbox):
    if prev_gray is None or curr_gray is None or bbox is None: return 0.0
    x1,y1,x2,y2 = bbox
    h, w = curr_gray.shape[:2]
    x1, y1 = max(0, x1), max(0, y1); x2, y2 = min(w, x2), min(h, y2)
    if x2<=x1 or y2<=y1: return 0.0
    prev_roi = prev_gray[y1:y2, x1:x2]; curr_roi = curr_gray[y1:y2, x1:x2]
    if prev_roi.size==0 or curr_roi.size==0: return 0.0
    flow = cv2.calcOpticalFlowFarneback(prev_roi, curr_roi, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    mag, _ = cv2.cartToPolar(flow[...,0], flow[...,1])
    return float(np.mean(mag))

def classify_phone_vs_pos(roi_bgr):
    if roi_bgr is None or roi_bgr.size == 0:
        return "unknown", 0.0, 0.0
    rgb = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2RGB)
    pil = Image.fromarray(rgb)
    with torch.no_grad():
        img = clip_preprocess(pil).unsqueeze(0).to(DEVICE)
        img_feat = clip_model.encode_image(img)
        img_feat = img_feat / img_feat.norm(dim=-1, keepdim=True)
        phone_sim = (img_feat @ phone_text_feat.T).item()
        pos_sim   = (img_feat @ pos_text_feat.T).item()
    return ("phone" if phone_sim >= pos_sim else "pos"), float(phone_sim), float(pos_sim)

def draw_box(frame, bbox, status, track_id=None, conf=None, obj_type="phone"):
    x1,y1,x2,y2 = bbox
    color = (0,255,0) if (obj_type=="phone" and status=="Active") else (0,0,255) if obj_type=="phone" else (255,0,0)
    label = f"ID {track_id} | {status if obj_type=='phone' else 'POS'}"
    if conf is not None: label += f" ({conf:.2f})"
    if status == "Active" and obj_type=='phone':
      cv2.rectangle(frame, (x1,y1), (x2,y2), color, 2)
      (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
      cv2.rectangle(frame, (x1, y1 - th - 8), (x1 + tw + 4, y1), color, -1)
      cv2.putText(frame, label, (x1 + 2, y1 - 4),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA)

# Improved shape-based fallback
def shape_phone_candidates(gray, frame_area):
    if not USE_FALLBACK_SHAPE: return []
    min_area = max(int(FALLBACK_MIN_AREA_FRAC*frame_area), MIN_BOX_AREA_PIXELS)
    max_area = int(FALLBACK_MAX_AREA_FRAC*frame_area)

    edges = cv2.Canny(gray, 60, 160)
    edges = cv2.dilate(edges, None, iterations=1)
    cnts, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    cands = []
    for c in cnts:
        area = cv2.contourArea(c)
        if area < min_area or area > max_area: continue
        x,y,w,h = cv2.boundingRect(c)
        if w<=0 or h<=0: continue

        rect_area = w*h
        edge_count = np.count_nonzero(edges[y:y+h, x:x+w])
        edge_density = edge_count / (rect_area + 1e-9)
        if edge_density < FALLBACK_MIN_EDGE_DENS:
            continue

        ar = max(w,h)/(min(w,h)+1e-6)
        portrait_ok  = (FALLBACK_ASPECT_PORTRAIT[0] <= ar <= FALLBACK_ASPECT_PORTRAIT[1])
        landscape_ok = (FALLBACK_ASPECT_LAND[0]     <= 1.0/ar <= FALLBACK_ASPECT_LAND[1])
        if not (portrait_ok or landscape_ok): continue

        peri = cv2.arcLength(c, True)
        approx = cv2.approxPolyDP(c, 0.02*peri, True)
        if len(approx) < 4 or not cv2.isContourConvex(approx): continue

        roi = gray[y:y+h, x:x+w]
        m = float(np.mean(roi)); ent = shannon_entropy(roi)
        if m <= FALLBACK_MEAN_MAX and ent <= FALLBACK_ENTROPY_MAX:
            ar_target = 2.0 if h>=w else 0.5
            ar_score = 1.0/(1.0 + abs(ar - max(ar_target,1.0)))
            darkness = np.clip((FALLBACK_MEAN_MAX - m)/max(FALLBACK_MEAN_MAX,1e-6), 0, 1)
            entropy_gain = np.clip((FALLBACK_ENTROPY_MAX - ent)/max(FALLBACK_ENTROPY_MAX,1e-6), 0, 1)
            score = 0.45*darkness + 0.25*entropy_gain + 0.30*ar_score
            cands.append((x, y, x+w, y+h, float(score)))

    if not cands: return []
    boxes = np.array([c[:4] for c in cands], dtype=np.float32)
    scores = np.array([c[4] for c in cands], dtype=np.float32)
    keep = []
    idxs = scores.argsort()[::-1]
    def iou(a,b):
        x1=max(a[0],b[0]); y1=max(a[1],b[1]); x2=min(a[2],b[2]); y2=min(a[3],b[3])
        inter=max(0,x2-x1)*max(0,y2-y1); A=(a[2]-a[0])*(a[3]-a[1]); B=(b[2]-b[0])*(b[3]-b[1])
        return inter/(A+B-inter+1e-6)
    while len(idxs):
        i = idxs[0]; keep.append(i)
        suppress = [0]
        for pos in range(1, len(idxs)):
            j = idxs[pos]
            if iou(boxes[i], boxes[j]) > 0.5:
                suppress.append(pos)
        idxs = np.delete(idxs, suppress)
    return [cands[i] for i in keep]

# POS heuristic (keypad/slot)
def likely_pos_heuristic(gray_roi):
    """
    Heuristic POS detector:
    - Many near-square 'keys' in bottom region (counts small rectangles)
    - Or a long, thin 'slot' near an ROI edge
    Returns True if likely a POS/card reader.
    """
    if gray_roi is None or gray_roi.size == 0:
        return False

    h, w = gray_roi.shape[:2]
    roi_area = float(h * w)

    # Bottom keypad region
    y0 = int(POS_KEY_REGION_Y0 * h)
    bot = gray_roi[y0:h, :]

    thr = cv2.adaptiveThreshold(bot, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                cv2.THRESH_BINARY_INV, 31, 7)
    edges = cv2.Canny(bot, 60, 160)
    fused = cv2.bitwise_or(thr, edges)
    fused = cv2.morphologyEx(fused, cv2.MORPH_CLOSE, np.ones((3,3), np.uint8), iterations=1)

    cnts, _ = cv2.findContours(fused, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    key_like = 0
    for c in cnts:
        area = cv2.contourArea(c)
        if area <= 1:
            continue
        frac = area / roi_area
        if not (POS_KEY_MIN_FRAC <= frac <= POS_KEY_MAX_FRAC):
            continue

        x, y, bw, bh = cv2.boundingRect(c)
        if bw == 0 or bh == 0:
            continue
        ar = bw / float(bh)
        if abs(1.0 - ar) > POS_KEY_AR_TOL:
            continue

        rect_area = bw * bh
        fill = area / (rect_area + 1e-6)
        if fill < POS_KEY_FILL_MIN:
            continue

        key_like += 1
        if key_like >= POS_MIN_KEYS:
            return True

    # Card slot near border
    edges_full = cv2.Canny(gray_roi, 60, 160)
    edges_full = cv2.dilate(edges_full, None, iterations=1)
    cnts2, _ = cv2.findContours(edges_full, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    border_margin = POS_SLOT_BORDER_FR * min(h, w)

    for c in cnts2:
        area = cv2.contourArea(c)
        if area <= 1:
            continue
        frac = area / roi_area
        if not (POS_SLOT_MIN_FRAC <= frac <= POS_SLOT_MAX_FRAC):
            continue
        x, y, bw, bh = cv2.boundingRect(c)
        if bw == 0 or bh == 0:
            continue
        ar_long = max(bw, bh) / float(min(bw, bh) + 1e-6)
        if ar_long < POS_SLOT_AR_MIN:
            continue
        near_border = (x <= border_margin or y <= border_margin or
                       (w - (x + bw)) <= border_margin or
                       (h - (y + bh)) <= border_margin)
        if near_border:
            return True

    return False

Activity scoring + hysteresis

In [29]:
def activity_score(bright, ent, diff_m, flow_m, b_on, e_on, d_on, f_on):
    sb = np.clip((bright/(b_on+1e-6)), 0, 1.5)/1.5
    se = np.clip((ent/(e_on+1e-6)),    0, 1.5)/1.5
    sd = np.clip((diff_m/(d_on+1e-6)), 0, 1.5)/1.5
    sf = np.clip((flow_m/(f_on+1e-6)), 0, 1.5)/1.5
    return float(W_BRIGHTNESS*sb + W_ENTROPY*se + W_DIFF*sd + W_FLOW*sf)

def hysteresis_label(prev_label, smoothed_score, on_thr=ACTIVE_HYST_ON, off_thr=ACTIVE_HYST_OFF):
    if prev_label == "Active":
        return "Active" if smoothed_score >= off_thr else "Idle"
    else:
        return "Active" if smoothed_score >= on_thr else "Idle"

Simple IOU Tracker

In [30]:
class Track:
    _next_id = 1
    def __init__(self, bbox, conf, obj_type="phone"):
        self.id = Track._next_id; Track._next_id += 1
        self.bbox = bbox; self.conf = conf; self.misses = 0
        self.history = deque(maxlen=TRACK_HISTORY)
        self.total_frames = 0; self.active_frames = 0; self.idle_frames = 0
        self.score_window = deque(maxlen=SMOOTH_WINDOW)
        self.last_label = "Idle"
        self.last_features = dict()
        self.type_window = deque(maxlen=SMOOTH_WINDOW)
        self.obj_type = obj_type

    def stable_type(self):
        if not self.type_window: return self.obj_type
        cnt = Counter(self.type_window)
        return "phone" if cnt["phone"] >= cnt["pos"] else "pos"

    def update(self, bbox, conf, score, label, features, obj_type_now=None):
        self.bbox = bbox; self.conf = conf; self.misses = 0
        self.total_frames += 1
        cx = int((bbox[0]+bbox[2])/2); cy = int((bbox[1]+bbox[3])/2)
        self.history.append((cx, cy))
        if score is not None: self.score_window.append(score)
        if obj_type_now is not None:
            self.type_window.append(obj_type_now)
            self.obj_type = self.stable_type()
        self.last_label = label; self.last_features = features
        if self.obj_type == "phone":
            if label == "Active": self.active_frames += 1
            else: self.idle_frames += 1

def match_tracks(tracks, detections, confs, iou_th=IOU_MATCH_THRESHOLD):
    if not tracks or not detections:
        return set(range(len(detections))), tracks[:]
    iou_mat = np.zeros((len(tracks), len(detections)), dtype=np.float32)
    for ti, t in enumerate(tracks):
        for di, d in enumerate(detections):
            iou_mat[ti, di] = iou_xyxy(t.bbox, d)
    flat = [(i, j, iou_mat[i, j]) for i in range(len(tracks)) for j in range(len(detections))]
    used_tracks, used_dets = set(), set()
    for ti, di, v in sorted(flat, key=lambda x: x[2], reverse=True):
        if v < iou_th: break
        if ti in used_tracks or di in used_dets: continue
        tracks[ti].bbox = detections[di]; tracks[ti].conf = confs[di]; tracks[ti].misses = 0
        used_tracks.add(ti); used_dets.add(di)
    unmatched_det_idx = set(range(len(detections))) - used_dets
    unmatched_tracks  = [tracks[ti] for ti in range(len(tracks)) if ti not in used_tracks]
    return unmatched_det_idx, unmatched_tracks

Video Processor

In [31]:
def process_video(input_path, output_path, log_every=LOG_EVERY_FRAMES):
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print(f"[WARN] Could not open: {input_path}")
        return None

    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)); h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out    = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

    prev_gray = None; frame_idx = 0
    total_frames = 0; phone_frames = 0
    any_phone_detected = False
    tracks = []

    while True:
        ret, frame = cap.read()
        if not ret: break
        total_frames += 1; frame_idx += 1

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        global_bright = float(np.mean(gray))
        BRIGHTNESS_ON = max(20.0, BRIGHTNESS_ON_BASE + ADAPT_BRIGHT_WEIGHT*(global_bright - 50))

        # Detector with TTA + WBF
        yolo_boxes, yolo_scores = yolo_tta_predict(frame)

        # Fallback: shape candidates if YOLO empty
        shape_added = False
        if not yolo_boxes and USE_FALLBACK_SHAPE:
            frame_area = w*h
            cands = shape_phone_candidates(gray, frame_area)
            if cands:
                shape_added = True
                for (x1,y1,x2,y2,s) in cands:
                    yolo_boxes.append([int(x1), int(y1), int(x2), int(y2)])
                    yolo_scores.append(float(max(0.2, s)))

        det_bboxes, det_confs = [], []
        det_scores, det_feats, det_type = [], [], []
        phones_this_frame = 0

        if yolo_boxes:
            for (x1,y1,x2,y2), conf in zip(yolo_boxes, yolo_scores):
                if (x2-x1)*(y2-y1) < MIN_BOX_AREA_PIXELS: continue
                roi = safe_crop(frame, x1, y1, x2, y2)
                if roi is None: continue

                # classify phone vs POS via CLIP
                obj_type, sim_phone, sim_pos = classify_phone_vs_pos(roi)
                gray_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)

                # extra POS filters (hard block)
                is_pos_heur = likely_pos_heuristic(gray_roi)
                phone_wins_margin = (sim_phone - sim_pos) >= PHONE_POS_MARGIN
                phone_above_min   = (sim_phone >= PHONE_MIN_SIM)
                treat_as_pos = (obj_type == "pos") or is_pos_heur or (not phone_wins_margin) or (not phone_above_min)

                if SKIP_POS and treat_as_pos:
                    continue

                # Treat as PHONE
                bright = float(np.mean(gray_roi))
                ent    = shannon_entropy(gray_roi)
                diff_m = mean_abs_diff(prev_gray, gray, (x1, y1, x2, y2))
                flow_m = dense_flow_mag(prev_gray, gray, (x1, y1, x2, y2))

                score_now = activity_score(
                    bright, ent, diff_m, flow_m,
                    BRIGHTNESS_ON, ENTROPY_ON_THRESH, DIFF_ON_BASE, FLOW_ON_BASE
                )
                feats = {
                    "bright": bright, "entropy": ent,
                    "diff": diff_m, "flow": flow_m,
                    "bright_thr": BRIGHTNESS_ON,
                    "src": "shape" if shape_added else "yolo",
                    "phone_sim": sim_phone, "pos_sim": sim_pos,
                    "pos_heur": bool(is_pos_heur),
                    "margin_ok": bool(phone_wins_margin),
                    "minsim_ok": bool(phone_above_min),
                }

                det_bboxes.append([x1,y1,x2,y2])
                det_confs.append(conf)
                det_scores.append(score_now)
                det_feats.append(feats)
                det_type.append("phone")
                phones_this_frame += 1

            if phones_this_frame > 0:
                any_phone_detected = True
                phone_frames += 1
            '''elif frame_idx % LOG_NO_PHONE_EVERY == 0:
                print(f"[{os.path.basename(input_path)}] No mobile found (frame {frame_idx}).")
        else:
            if frame_idx % LOG_NO_PHONE_EVERY == 0:
                print(f"[{os.path.basename(input_path)}] No mobile found (frame {frame_idx}).")'''

        # Track association
        unmatched_det_idx, unmatched_tracks = match_tracks(tracks, det_bboxes, det_confs)

        # Age unmatched tracks
        kept = []
        for t in tracks:
            if t in unmatched_tracks:
                t.misses += 1
                if t.misses <= MAX_MISSES: kept.append(t)
            else:
                kept.append(t)
        tracks = kept

        # New tracks
        for di in unmatched_det_idx:
            t = Track(det_bboxes[di], det_confs[di], obj_type=det_type[di]); tracks.append(t)

        # Smoothing + hysteresis
        det_map = {tuple(det_bboxes[i]): (det_scores[i], det_feats[i], det_type[i]) for i in range(len(det_bboxes))}
        for t in tracks:
            key = tuple(t.bbox)
            if key in det_map:
                score_now, feats, obj_type_now = det_map[key]
                if t.obj_type == "phone":
                    scores_for_win = (list(t.score_window) + [score_now]) if score_now is not None else list(t.score_window)
                    smooth_score = float(np.mean(scores_for_win)) if len(scores_for_win) else 0.0
                    new_label = hysteresis_label(t.last_label, smooth_score, ACTIVE_HYST_ON, ACTIVE_HYST_OFF)
                else:
                    smooth_score = None
                    new_label = "POS"
                t.update(t.bbox, t.conf, smooth_score, new_label, feats, obj_type_now=obj_type_now)

        # Draw
        for t in tracks:
            if t.misses <= MAX_MISSES and t.obj_type == "phone":
                draw_box(frame, t.bbox, t.last_label, track_id=t.id, conf=t.conf, obj_type=t.obj_type)
                for k in range(1, len(t.history)):
                    cv2.line(frame, t.history[k-1], t.history[k], (255,255,255), 2)

        out.write(frame)
        prev_gray = gray

        if log_every and frame_idx % log_every == 0:
            print(f"[{os.path.basename(input_path)}] Processed {frame_idx} frames...")

    cap.release(); out.release()

    if not any_phone_detected:
        print(f"[{os.path.basename(input_path)}] No mobile found in entire video.")

    # CSV rows
    track_rows = []
    for t in tracks:
        if t.total_frames > 0 and t.obj_type == "phone":
            track_rows.append({
                "video": os.path.basename(input_path),
                "track_id": t.id,
                "type": t.obj_type,
                "total_frames_for_track": t.total_frames,
                "active_frames_for_track": t.active_frames,
                "idle_frames_for_track": t.idle_frames,
                "active_ratio_for_track": (t.active_frames / max(1, t.total_frames))
            })

    summary_row = {
        "video": os.path.basename(input_path),
        "total_frames": total_frames,
        "phone_frames_with_any_detection": phone_frames,
        "num_tracks": len(track_rows)
    }
    return summary_row, track_rows

Batch over all videos (save outputs WITHOUT input file names)

In [32]:
video_exts = {".mp4", ".mov", ".avi", ".mkv", ".MP4", ".MOV", ".AVI", ".MKV"}
input_files = [str(p) for p in Path(INPUT_DIR).glob("*") if p.suffix in video_exts]
input_files.sort()

if not input_files:
    print(f"No videos found in: {INPUT_DIR}. Add files with: {sorted(video_exts)}")

summary_rows, all_track_rows = [], []
for i, vid in enumerate(input_files, start=1):
    out_name = f"processed_{i:04d}.mp4"            # <-- no original filename used
    out_path = os.path.join(OUTPUT_DIR_RUN, out_name)
    print(f"\n=== Processing video #{i}: {os.path.basename(vid)} -> {out_name} ===")
    res = process_video(vid, out_path)
    if res is not None:
        srow, trows = res
        summary_rows.append(srow); all_track_rows.extend(trows)


=== Processing video #1: 20250715_142638_e37e7821.mp4 -> processed_0001.mp4 ===

=== Processing video #2: 20250715_144536_9170ec05.mp4 -> processed_0002.mp4 ===

=== Processing video #3: 20250715_160539_6c132ed0.mp4 -> processed_0003.mp4 ===
[20250715_160539_6c132ed0.mp4] No mobile found in entire video.

=== Processing video #4: 20250715_173316_8cb090ab.mp4 -> processed_0004.mp4 ===

=== Processing video #5: 20250718_145802_46039155.mp4 -> processed_0005.mp4 ===
[20250718_145802_46039155.mp4] Processed 100 frames...

=== Processing video #6: 20250718_150650_075a44fc.mp4 -> processed_0006.mp4 ===
[20250718_150650_075a44fc.mp4] Processed 100 frames...


Save CSVs

In [33]:
if summary_rows:
    df_sum = pd.DataFrame(summary_rows)
    df_tracks = pd.DataFrame(all_track_rows) if all_track_rows else pd.DataFrame(
        columns=["video","track_id","type","total_frames_for_track","active_frames_for_track","idle_frames_for_track","active_ratio_for_track"]
    )
    sum_path    = SUMMARY_CSV
    tracks_path = os.path.join(OUTPUT_DIR_RUN, "tracks.csv")
    df_sum.to_csv(sum_path, index=False); df_tracks.to_csv(tracks_path, index=False)

    print("\nPer-video summary:"); display(df_sum.head())
    print("\nPer-track summary (first 20 rows):"); display(df_tracks.head(20))
    print(f"\nSaved:\n- {sum_path}\n- {tracks_path}\n- Annotated videos in: {OUTPUT_DIR_RUN}")
else:
    print("No results to summarize.")



Per-video summary:


Unnamed: 0,video,total_frames,phone_frames_with_any_detection,num_tracks
0,20250715_142638_e37e7821.mp4,80,13,1
1,20250715_144536_9170ec05.mp4,70,31,1
2,20250715_160539_6c132ed0.mp4,80,0,0
3,20250715_173316_8cb090ab.mp4,80,27,1
4,20250718_145802_46039155.mp4,180,179,2



Per-track summary (first 20 rows):


Unnamed: 0,video,track_id,type,total_frames_for_track,active_frames_for_track,idle_frames_for_track,active_ratio_for_track
0,20250715_142638_e37e7821.mp4,3,phone,3,3,0,1.0
1,20250715_144536_9170ec05.mp4,4,phone,31,31,0,1.0
2,20250715_173316_8cb090ab.mp4,5,phone,27,0,27,0.0
3,20250718_145802_46039155.mp4,6,phone,173,0,173,0.0
4,20250718_145802_46039155.mp4,8,phone,133,133,0,1.0



Saved:
- /content/drive/MyDrive/Task Video/op4/run_20250903_170408/summary.csv
- /content/drive/MyDrive/Task Video/op4/run_20250903_170408/tracks.csv
- Annotated videos in: /content/drive/MyDrive/Task Video/op4/run_20250903_170408
