In [1]:
! pip install ultralytics filterpy nest_asyncio motmetrics

Collecting ultralytics
  Downloading ultralytics-8.3.247-py3-none-any.whl.metadata (37 kB)
Collecting filterpy
  Downloading filterpy-1.4.5.zip (177 kB)
  Preparing metadata (setup.py) ... done
Collecting motmetrics
  Downloading motmetrics-1.4.0-py3-none-any.whl.metadata (20 kB)
Collecting ultralytics-thop>=2.0.18 (from ultralytics)
  Downloading ultralytics_thop-2.0.18-py3-none-any.whl.metadata (14 kB)
Collecting xmltodict>=0.12.0 (from motmetrics)
  Downloading xmltodict-1.0.2-py3-none-any.whl.metadata (15 kB)
Downloading ultralytics-8.3.247-py3-none-any.whl (1.2 MB)

## Class: GMC (Global Motion Compensation)

**Description:**
Compensates for background (camera) motion in video, which is especially useful for UAV/drone footage. It uses *sparse optical flow* to estimate the camera motion and realigns the coordinates of the objects being tracked.

**Method:**
1. **Downscaling:** Reduce the image resolution to speed up processing.
2. **Feature Detection:** Use the *FAST* detector to find keypoints.
3. **Matching:** Use *Lucas-Kanade optical flow* to match keypoints between two consecutive frames.
4. **Transformation:** Use *RANSAC* (inside `estimateAffinePartial2D`) to reject outliers and compute the affine matrix.
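
Because the transform is estimated on a downscaled frame, it has to be converted back to full resolution before it is applied to the tracks. The following is a minimal NumPy sketch of that conversion; the helper name `upscale_affine` and all numeric values are illustrative assumptions, not part of the notebook.

```python
import numpy as np

def upscale_affine(m_small, downscale):
    """Convert a 2x3 affine estimated on a downscaled frame to full resolution.

    Hypothetical helper for illustration: the 2x2 rotation/scale block does not
    depend on resolution, so only the translation column is multiplied.
    """
    m_full = m_small.astype(np.float64).copy()
    m_full[:, 2] *= downscale
    return m_full

# Illustrative transform on the half-resolution frame: small rotation plus shift.
theta = np.deg2rad(2.0)
m_small = np.array([[np.cos(theta), -np.sin(theta), 3.0],
                    [np.sin(theta),  np.cos(theta), -1.5]])
m_full = upscale_affine(m_small, downscale=2)

# The same point expressed in full-resolution and half-resolution coordinates.
p_full = np.array([200.0, 120.0, 1.0])
p_small = np.array([100.0, 60.0, 1.0])

# Warping at full resolution matches warping at half resolution, then scaling up.
warped_full = m_full @ p_full
warped_small_scaled = 2 * (m_small @ p_small)
print(np.allclose(warped_full, warped_small_scaled))
```

This is why the class only rescales the two translation entries of the matrix when `downscale > 1`.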


In [2]:
import gradio as gr
import cv2
import numpy as np
import os
from ultralytics import YOLO
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter
import subprocess
import pandas as pd
import motmetrics as mm


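# NumPy 2.0 removed np.asfarray; restore it for libraries that still call it (presumably motmetrics here).
if not hasattr(np, 'asfarray'):
    np.asfarray = lambda x: np.asarray(x, dtype=np.float64)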

class GMC:
    def __init__(self, downscale=2):
        self.downscale = downscale
        self.detector = cv2.FastFeatureDetector_create(threshold=20)
        self.prev_gray = None
        self.prev_kps = None
    def apply(self, raw_frame, tracks):
        """Estimate camera motion between the previous and current frame and warp every track by it."""
        height, width = raw_frame.shape[:2]
        frame_gray = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
        if self.downscale > 1:
            frame_gray = cv2.resize(frame_gray, (width // self.downscale, height // self.downscale))

        # FAST keypoints of the current frame, as an (N, 2) float32 array
        kps = self.detector.detect(frame_gray, None)
        kps = np.float32([kp.pt for kp in kps])

        # First frame, or no features to track: just remember this frame
        if self.prev_gray is None or self.prev_kps is None or len(self.prev_kps) == 0 or len(kps) == 0:
            self.prev_gray = frame_gray
            self.prev_kps = kps
            return tracks

        # Follow the previous keypoints into the current frame (pyramidal Lucas-Kanade)
        p1, st, err = cv2.calcOpticalFlowPyrLK(self.prev_gray, frame_gray, self.prev_kps, None)
        if p1 is not None:
            status = st.flatten() == 1
            good_old = self.prev_kps[status]
            good_new = p1[status]
        else:
            good_old = np.array([])
            good_new = np.array([])

        # Too few correspondences for a reliable estimate
        if len(good_old) < 10:
            self.prev_gray = frame_gray
            self.prev_kps = kps
            return tracks

        # Partial affine (rotation, uniform scale, translation); RANSAC is the default method
        m, inliers = cv2.estimateAffinePartial2D(good_old, good_new)
        if m is not None:
            # Only the translation column depends on the resolution
            if self.downscale > 1:
                m[0, 2] *= self.downscale
                m[1, 2] *= self.downscale
            for track in tracks:
                track.apply_gmc(m)

        self.prev_gray = frame_gray
        self.prev_kps = kps
        return tracks

Creating new Ultralytics Settings v0.0.6 file ✅
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


## Class: KalmanBoxTracker

**Description:**
Manages the state of each individual object (track) with a Kalman filter. The class initializes the filter, predicts the next position, and updates the position when a new detection arrives. It also adapts the measurement noise to the detection confidence (adaptive noise) and supports global motion compensation (GMC).

**State space:**
Each object is modeled by a 7-dimensional state vector ($dim\_x=7$):
$$\mathbf{x} = [u, v, s, r, \dot{u}, \dot{v}, \dot{s}]^T$$
where:
- $(u, v)$: center coordinates of the bounding box.
- $s$: area of the bounding box ($area = w \times h$).
- $r$: aspect ratio ($ratio = w/h$).
- $\dot{u}, \dot{v}, \dot{s}$: rates of change of the center and the area.

**Main methods:**

### 1. `__init__(self, bbox)`
Initializes the Kalman filter for a new object.
- **Matrix setup:**
  - `F` (transition matrix): state transition matrix modeling constant-velocity motion.
  - `H` (measurement matrix): maps the state to the observation space (only $u, v, s, r$ are measured).
  - `P, R, Q`: the state covariance, measurement-noise covariance, and process-noise covariance matrices.
- **State:** initializes `age=0`, `hits=0`, `state=0` (Tentative, i.e. not yet confirmed).
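
To make the constant-velocity model concrete, here is a small NumPy sketch of one noise-free prediction step $\mathbf{x}' = F\mathbf{x}$. The state values are made up for illustration; only the layout of `F` follows the class.

```python
import numpy as np

# Transition matrix for x = [u, v, s, r, du, dv, ds]: each of u, v, s
# gains its velocity every frame; r has no velocity term.
F = np.eye(7)
F[0, 4] = F[1, 5] = F[2, 6] = 1.0

# Illustrative state: centre (100, 50), area 400, aspect ratio 0.5,
# moving +2 px in u and -1 px in v per frame, area growing by 10 px^2.
x = np.array([100.0, 50.0, 400.0, 0.5, 2.0, -1.0, 10.0])

x_next = F @ x
print(x_next)
```

The aspect ratio and the three velocities are carried over unchanged, while the center and area advance by one frame of motion.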

### 2. `apply_gmc(self, warp_matrix)`
**Key function:** Re-positions the object's center $(u, v)$ according to the camera motion.
- **Input:** the $2 \times 3$ affine transformation matrix from the GMC module.
- **Logic:** multiplies the homogeneous position vector $[u, v, 1]$ by the transformation matrix to move the object to its correct position in the new coordinate frame before the prediction step.

### 3. `update(self, bbox, confidence=None)`
Updates the filter state when a new detection arrives from YOLO.
- **Adaptive measurement noise:**
  If `confidence` is provided, the measurement-noise matrix `R` is adjusted automatically.
  - *Low confidence* $\rightarrow$ larger `R` $\rightarrow$ trust the prediction more.
  - *High confidence* $\rightarrow$ smaller `R` $\rightarrow$ trust the detection more.
- **State transition:** once the track has been matched at least 3 times in total (`hits >= 3`; the count is cumulative, not necessarily consecutive), it moves from `Tentative` (0) to `Confirmed` (1).
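
The effect of the confidence-dependent `R` can be illustrated with a short sketch. `adaptive_R` mirrors the formula used in `update`; `scalar_gain` and the prior variance of 10 are simplifying assumptions (a 1-D filter that observes its state directly), used only to show the direction of the effect.

```python
import numpy as np

def adaptive_R(confidence):
    """Measurement noise as built in update(): base diagonal plus (1 - conf) * 20."""
    r_factor = (1.0 - confidence) * 20.0
    return np.diag([1.0, 1.0, 10.0, 10.0]) + r_factor * np.eye(4)

def scalar_gain(p, r):
    """Kalman gain for a 1-D state observed directly: K = P / (P + R)."""
    return p / (p + r)

for conf in (0.9, 0.3):
    R = adaptive_R(conf)
    # Illustrative prior variance of 10 for the centre coordinate u.
    k = scalar_gain(10.0, R[0, 0])
    print(f"conf={conf}: R_uu={R[0, 0]:.1f}, gain={k:.2f}")
```

A higher gain means the corrected state moves further toward the measurement, so confident detections pull the track more strongly than uncertain ones.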

### 4. `predict(self)`
Predicts the state in the next frame.
- Increments the track's `age`.
- If the track was not updated in the previous frame (`time_since_update > 0`), resets `hit_streak` to 0.
- Returns the predicted bounding box.

### Helper Methods (Static)
- `convert_bbox_to_z(bbox)`: converts `[x1, y1, x2, y2]` to the measurement vector `[u, v, s, r]`.
- `convert_x_to_bbox(x)`: converts the state back to a standard bounding box for display.
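
As a worked example of the two conversions, the sketch below round-trips one box. The function names `bbox_to_z` and `z_to_bbox` are standalone stand-ins for the static methods and work on flat arrays rather than column vectors.

```python
import numpy as np

def bbox_to_z(bbox):
    """[x1, y1, x2, y2] -> [u, v, s, r] (centre, area, aspect ratio)."""
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    return np.array([bbox[0] + w / 2.0, bbox[1] + h / 2.0, w * h, w / float(h)])

def z_to_bbox(z):
    """[u, v, s, r] -> [x1, y1, x2, y2], using w = sqrt(s * r) and h = s / w."""
    w = np.sqrt(z[2] * z[3])
    h = z[2] / w
    return np.array([z[0] - w / 2.0, z[1] - h / 2.0, z[0] + w / 2.0, z[1] + h / 2.0])

box = [10.0, 20.0, 50.0, 100.0]   # 40 wide, 80 tall
z = bbox_to_z(box)                # centre (30, 60), area 3200, ratio 0.5
print(z, z_to_bbox(z))
```

Since $s \cdot r = w^2$, the width is recovered as $\sqrt{s \cdot r}$ and the height as $s / w$.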

---


In [3]:
class KalmanBoxTracker:
    count = 0
    def __init__(self, bbox):
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        self.kf.F = np.array([[1,0,0,0,1,0,0], [0,1,0,0,0,1,0], [0,0,1,0,0,0,1],[0,0,0,1,0,0,0],  [0,0,0,0,1,0,0], [0,0,0,0,0,1,0], [0,0,0,0,0,0,1]])
        self.kf.H = np.array([[1,0,0,0,0,0,0], [0,1,0,0,0,0,0], [0,0,1,0,0,0,0], [0,0,0,1,0,0,0]])
        self.kf.R[2:,2:] *= 10.; self.kf.P[4:,4:] *= 1000.; self.kf.P *= 10.
        self.kf.Q[-1,-1] *= 0.01; self.kf.Q[4:,4:] *= 0.01
        self.kf.x[:4] = self.convert_bbox_to_z(bbox)
        self.time_since_update = 0; self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1; self.history = []; self.hits = 0; self.hit_streak = 0; self.age = 0; self.state = 0
    def apply_gmc(self, warp_matrix):
        pos = np.array([self.kf.x[0, 0], self.kf.x[1, 0], 1.0])
        new_pos = warp_matrix @ pos
        self.kf.x[0, 0] = new_pos[0]; self.kf.x[1, 0] = new_pos[1]
    def update(self, bbox, confidence=None):
        self.time_since_update = 0; self.history = []; self.hits += 1; self.hit_streak += 1
        if confidence is not None:
            r_factor = (1.0 - confidence) * 20.0
            self.kf.R = np.diag([1., 1., 10., 10.]) + np.diag([r_factor, r_factor, r_factor, r_factor])
        self.kf.update(self.convert_bbox_to_z(bbox))
        if self.state == 0 and self.hits >= 3: self.state = 1
    def predict(self):
        if((self.kf.x[6, 0] + self.kf.x[2, 0]) <= 0): self.kf.x[6, 0] *= 0.0
        self.kf.predict(); self.age += 1
        if(self.time_since_update > 0): self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(self.convert_x_to_bbox(self.kf.x))
        return self.history[-1]
    def get_state(self):
        return self.convert_x_to_bbox(self.kf.x)
    @staticmethod
    def convert_bbox_to_z(bbox):
        w = bbox[2] - bbox[0]; h = bbox[3] - bbox[1]; x = bbox[0] + w/2.; y = bbox[1] + h/2.
        s = w * h; r = w / float(h); return np.array([x, y, s, r]).reshape((4, 1))
    @staticmethod
    def convert_x_to_bbox(x, score=None):
        w = np.sqrt(x[2] * x[3]); h = x[2] / w
        if(score is None): return np.array([x[0]-w/2., x[1]-h/2., x[0]+w/2., x[1]+h/2.]).reshape((1,4))
        else: return np.array([x[0]-w/2., x[1]-h/2., x[0]+w/2., x[1]+h/2., score]).reshape((1,5))


## Module: Data Association & Utilities

This module provides the math and data-handling utilities needed to associate detections with tracks (trackers).

### 1. Function `iou_batch(bb_test, bb_gt)`
**Description:**
Computes the IoU (Intersection over Union) between two sets of bounding boxes in a single vectorized operation using NumPy broadcasting, which avoids a Python `for` loop over box pairs.

**Parameters:**
- `bb_test`: NumPy array of shape `(N, 4)` holding the detection boxes.
- `bb_gt`: NumPy array of shape `(M, 4)` holding the reference boxes (here, the tracker predictions).

**Mechanism:**
`expand_dims` reshapes the two inputs so that they broadcast to an `(N, M)` comparison grid, so the intersection and union areas of all possible pairs are computed at once.

**Returns:**
- An `(N, M)` matrix containing the IoU value (from 0.0 to 1.0) of each pair.
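
The broadcasting step can be checked on a tiny example. The sketch below re-implements the same computation under the illustrative name `iou_matrix`, with made-up boxes, so that the `(N, M)` output shape and a few values are easy to verify by hand.

```python
import numpy as np

def iou_matrix(dets, trks):
    """Pairwise IoU between (N, 4) and (M, 4) boxes in [x1, y1, x2, y2] form."""
    d = np.expand_dims(dets, 1)   # (N, 1, 4)
    t = np.expand_dims(trks, 0)   # (1, M, 4)
    w = np.maximum(0.0, np.minimum(d[..., 2], t[..., 2]) - np.maximum(d[..., 0], t[..., 0]))
    h = np.maximum(0.0, np.minimum(d[..., 3], t[..., 3]) - np.maximum(d[..., 1], t[..., 1]))
    inter = w * h
    area_d = (d[..., 2] - d[..., 0]) * (d[..., 3] - d[..., 1])
    area_t = (t[..., 2] - t[..., 0]) * (t[..., 3] - t[..., 1])
    return inter / (area_d + area_t - inter)

dets = np.array([[0.0, 0.0, 10.0, 10.0], [20.0, 20.0, 30.0, 30.0]])
trks = np.array([[5.0, 0.0, 15.0, 10.0], [0.0, 0.0, 10.0, 10.0], [100.0, 100.0, 110.0, 110.0]])
ious = iou_matrix(dets, trks)
print(ious.shape)
print(ious.round(3))
```

The first detection overlaps half of the first tracker box (intersection 50, union 150) and coincides with the second, while the second detection overlaps nothing.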

---

### 2. Function `associate_detections_to_trackers`
**Description:**
Matches new detections to existing tracks based on the IoU matrix.

**Algorithm:**
1. **Compute the IoU matrix:** call `iou_batch` to get the overlap between every detection-tracker pair.
2. **Hungarian algorithm:**
   - If the matrix is simple (each row and column has at most one value above the threshold), match directly.
   - If there are conflicts, use `linear_sum_assignment` (Hungarian algorithm) from `scipy` to find the optimal pairing (maximum total IoU).
3. **Thresholding:**
   - Discard pairs with `IoU < iou_threshold` (default 0.3) and treat them as unmatched.
4. **Output categories:**
   - `matches`: pairs that were matched successfully.
   - `unmatched_detections`: new detections that match no track (a new track is created for each).
   - `unmatched_trackers`: existing tracks with no matching detection (the object may have been lost).

**Parameters:**
- `detections`: list of bounding boxes from YOLO.
- `trackers`: list of bounding boxes predicted by the Kalman filter.
- `iou_threshold`: minimum overlap required to accept a pairing (default: 0.3).
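
The conflict case is easiest to see on a hand-made IoU matrix. The sketch below is a simplified stand-in for the function (it skips the direct-match shortcut) and uses illustrative values in which a greedy choice of the largest IoU would leave one detection without a valid partner.

```python
import numpy as np
from scipy.optimize import linear_sum_assignment

# Illustrative IoU matrix: rows are detections, columns are trackers.
iou = np.array([
    [0.80, 0.60, 0.00],
    [0.70, 0.10, 0.00],
    [0.00, 0.00, 0.05],
])
iou_threshold = 0.3

# linear_sum_assignment minimises cost, so negate IoU to maximise total overlap.
rows, cols = linear_sum_assignment(-iou)

# Keep only assigned pairs whose overlap reaches the threshold.
matches = [(int(r), int(c)) for r, c in zip(rows, cols) if iou[r, c] >= iou_threshold]
matched_dets = {r for r, _ in matches}
matched_trks = {c for _, c in matches}
unmatched_dets = [d for d in range(iou.shape[0]) if d not in matched_dets]
unmatched_trks = [t for t in range(iou.shape[1]) if t not in matched_trks]

print(matches, unmatched_dets, unmatched_trks)
```

Here the optimal assignment pairs detection 0 with tracker 1 and detection 1 with tracker 0 (total IoU 1.30), whereas taking the 0.80 entry first would leave detection 1 with only a 0.10 overlap. The third pair is assigned by the solver but rejected by the threshold.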

---

### 3. Function `load_mot_gt(gt_path)`
**Description:**
Reads and parses a ground-truth (GT) file in the MOTChallenge format (MOT15/16/17/20) for evaluation.

**Supported formats:**
Both CSV files (comma-separated) and whitespace-separated text files are supported. The standard row layout is:
`frame, id, bb_left, bb_top, bb_width, bb_height, conf, x, y, z`

**Processing:**
- Detects the delimiter (comma or whitespace) automatically.
- Converts coordinates from `[x, y, w, h]` (MOT format) to `[x1, y1, x2, y2]` (internal format).
- Groups the rows by `frame` for fast lookup (`dict key = frame_id`).

**Returns:**
- `gt_dict`: dictionary of the form `{frame_id: [[x1, y1, x2, y2, obj_id], ...]}`.
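
The row-level conversion can be sketched without pandas. `parse_mot_line` is a hypothetical helper written for this illustration, and the sample rows are made up; it shows the delimiter handling and the `[x, y, w, h]` to `[x1, y1, x2, y2]` conversion on individual lines.

```python
import re

def parse_mot_line(line):
    """Parse one MOT ground-truth row into (frame, [x1, y1, x2, y2, obj_id]).

    Hypothetical helper: accepts comma- or whitespace-delimited rows and
    returns None for rows that are too short or not numeric.
    """
    fields = [f for f in re.split(r"[,\s]+", line.strip()) if f]
    if len(fields) < 6:
        return None
    try:
        frame, obj_id = int(float(fields[0])), int(float(fields[1]))
        left, top, w, h = (float(v) for v in fields[2:6])
    except ValueError:
        return None
    return frame, [left, top, left + w, top + h, obj_id]

sample = ["1,3,100,50,40,80,1,-1,-1,-1", "2 3 104 51 40 80 1 -1 -1 -1", "frame,id,..."]
gt = {}
for row in sample:
    parsed = parse_mot_line(row)
    if parsed is not None:
        gt.setdefault(parsed[0], []).append(parsed[1])
print(gt)
```

The header-like third row is skipped, and the two data rows end up under their frame indices in the same dictionary layout that `load_mot_gt` returns.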

In [4]:
def iou_batch(bb_test, bb_gt):
    bb_gt = np.expand_dims(bb_gt, 0); bb_test = np.expand_dims(bb_test, 1)
    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0]); yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2]); yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
    w = np.maximum(0., xx2 - xx1); h = np.maximum(0., yy2 - yy1); wh = w * h
    o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1]) + (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)
    return(o)

def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    if(len(trackers)==0): return np.empty((0,2),dtype=int), np.arange(len(detections)), np.empty((0,),dtype=int)
    iou_matrix = iou_batch(detections, trackers)
    if min(iou_matrix.shape) > 0:
        a = (iou_matrix > iou_threshold).astype(np.int32)
        if a.sum(1).max() == 1 and a.sum(0).max() == 1: matched_indices = np.stack(np.where(a), axis=1)
        else: row_ind, col_ind = linear_sum_assignment(-iou_matrix); matched_indices = np.stack((row_ind, col_ind), axis=1)
    else: matched_indices = np.empty((0,2),dtype=int)
    unmatched_detections = [d for d, det in enumerate(detections) if d not in matched_indices[:,0]]
    unmatched_trackers = [t for t, trk in enumerate(trackers) if t not in matched_indices[:,1]]
    matches = []
    for m in matched_indices:
        if(iou_matrix[m[0], m[1]] < iou_threshold): unmatched_detections.append(m[0]); unmatched_trackers.append(m[1])
        else: matches.append(m.reshape(1,2))
    if(len(matches)==0): matches = np.empty((0,2),dtype=int)
    else: matches = np.concatenate(matches, axis=0)
    return matches, np.array(unmatched_detections, dtype=int), np.array(unmatched_trackers, dtype=int)

def load_mot_gt(gt_path):
    if gt_path is None: return {}
    if hasattr(gt_path, 'name'): gt_path = gt_path.name
    gt_dict = {}
    try:
        # A comma-only parse does not raise on whitespace-delimited files (it yields a single column),
        # so accept either delimiter with one regex separator.
        df = pd.read_csv(gt_path, header=None, sep=r'[,\s]+', engine='python')
        for index, row in df.iterrows():
            try:
                if not str(row[0]).replace('.','',1).isdigit(): continue
                frame = int(float(row[0])); obj_id = int(float(row[1]))
                x1, y1, w, h = float(row[2]), float(row[3]), float(row[4]), float(row[5])
                if frame not in gt_dict: gt_dict[frame] = []
                gt_dict[frame].append([x1, y1, x1+w, y1+h, obj_id])
            except ValueError: continue
    except Exception as e: print(f"GT Error: {e}"); return {}
    return gt_dict

In [7]:
MODEL_OPTIONS = {
    "EfficientNetB0": "/content/drive/MyDrive/tracking/efficientnetB0-yolov8.pt",
    "EfficientNetB3": "/content/drive/MyDrive/tracking/efficientnetB0-yolov8.pt",
    "MobileNet": "yolov8s.pt",
    "ConvNext-T": "/content/drive/MyDrive/tracking/convnext_T_best.pt",
    "ConvNext-S": "/content/drive/MyDrive/tracking/convnext_S_best.pt"
}


def detect_objects_frame_1(video_path, model_name):
    """
    Run YOLO on the first frame and crop each detected object.
    Returns: the list of crops (for the Gallery), the list of bboxes (kept in State for later use),
    and an empty list that resets the selected indices.
    """
    if video_path is None: return [], [], []

    cap = cv2.VideoCapture(video_path)
    ret, frame = cap.read()
    cap.release()

    if not ret: return [], [], []

    # Load model
    model_path = MODEL_OPTIONS.get(model_name, "yolov8n.pt")
    try: model = YOLO(model_path)
    except: model = YOLO("yolov8n.pt")

    # Detect
    results = model(frame, verbose=False, iou=0.45, conf=0.1)[0]

    gallery_images = []
    detected_boxes = [] # Actual coordinates [x1, y1, x2, y2]

    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    if results.boxes:
        for i, box in enumerate(results.boxes):
            x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
            crop = frame_rgb[y1:y2, x1:x2]
            gallery_images.append((crop, f"Object {i}"))
            detected_boxes.append([x1, y1, x2, y2])

    # Reset the selected-indices state on every new scan
    return gallery_images, detected_boxes, []

def on_select_object(evt: gr.SelectData, detected_boxes, current_selection_indices):
    """
    Toggle handler that allows selecting multiple objects.
    """
    if current_selection_indices is None:
        current_selection_indices = []

    index = evt.index

    # Toggle logic: remove the index if already selected, otherwise add it
    if index in current_selection_indices:
        current_selection_indices.remove(index)
    else:
        current_selection_indices.append(index)

    # Keep the indices sorted for display
    current_selection_indices.sort()

    # Collect the boxes that correspond to the selected indices
    selected_boxes = [detected_boxes[i] for i in current_selection_indices if i < len(detected_boxes)]

    feedback_str = f"Selected objects: {current_selection_indices}" if current_selection_indices else "No object selected (all will be tracked)"

    return feedback_str, current_selection_indices, selected_boxes

def clear_selection():
    return "Selection cleared. Tracking all objects.", [], []

def calculate_iou_single(box1, box2):
    # box: [x1, y1, x2, y2]
    xx1 = max(box1[0], box2[0]); yy1 = max(box1[1], box2[1])
    xx2 = min(box1[2], box2[2]); yy2 = min(box1[3], box2[3])
    w = max(0, xx2 - xx1); h = max(0, yy2 - yy1)
    inter = w * h
    area1 = (box1[2]-box1[0])*(box1[3]-box1[1])
    area2 = (box2[2]-box2[0])*(box2[3]-box2[1])
    union = area1 + area2 - inter
    return inter/union if union > 0 else 0

In [None]:
def run_tracking_demo(video_path, gt_path, model_selection, conf_threshold, iou_threshold, target_boxes_list, progress=gr.Progress()):
    if video_path is None: return None, "Please upload a video."

    model_path = MODEL_OPTIONS.get(model_selection, model_selection)

    target_track_ids = set()
    is_selective_mode = target_boxes_list is not None and len(target_boxes_list) > 0

    if is_selective_mode:
        print(f"🎯 Number of targets to lock: {len(target_boxes_list)}")

    try: model = YOLO(model_path)
    except: model = YOLO("yolov8n.pt")

    cap = cv2.VideoCapture(video_path)
    width, height = int(cap.get(3)), int(cap.get(4))
    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps < 1: fps = 30.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    output_path = "temp_output.mp4"
    final_output_path = "result_video.mp4"
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    gt_data = load_mot_gt(gt_path); has_gt = len(gt_data) > 0
    acc = mm.MOTAccumulator(auto_id=True)
    trackers = []; gmc = GMC(downscale=2); KalmanBoxTracker.count = 0
    frame_idx = 0

    while True:
        ret, frame = cap.read()
        if not ret: break
        frame_idx += 1
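        # Some containers report a frame count of 0; guard the division and clamp the fraction
        if frame_idx % 10 == 0: progress(min(frame_idx / max(total_frames, 1), 1.0), desc=f"Processing {frame_idx}/{total_frames}")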

        results = model(frame, verbose=False, iou=0.45, conf=0.1)[0]
        dets = []
        if results.boxes:
             for box in results.boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                score = float(box.conf[0].cpu().numpy())
                dets.append([x1, y1, x2, y2, score])
        dets = np.array(dets) if len(dets) > 0 else np.empty((0, 5))

        gmc.apply(frame, trackers)

        trks = np.zeros((len(trackers), 5)); to_del = []
        for t, trk in enumerate(trks):
            pos = trackers[t].predict()[0]
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            if np.any(np.isnan(pos)): to_del.append(t)
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del): trackers.pop(t)

        if len(dets) > 0:
            inds_high = dets[:, 4] >= conf_threshold
            inds_low = (dets[:, 4] > 0.1) & (dets[:, 4] < conf_threshold)
            dets_high = dets[inds_high]; dets_low = dets[inds_low]
        else: dets_high = np.empty((0, 5)); dets_low = np.empty((0, 5))

        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets_high, trks, iou_threshold)

        trks_remain = trks[unmatched_trks]; dets_remain = dets_low
        if len(trks_remain) > 0 and len(dets_remain) > 0:
            matched_l, _, _ = associate_detections_to_trackers(dets_remain, trks_remain, 0.1)
            for m in matched_l: trackers[unmatched_trks[m[1]]].update(dets_remain[m[0]][:4], dets_remain[m[0]][4])
        for m in matched: trackers[m[1]].update(dets_high[m[0]][:4], dets_high[m[0]][4])
        for i in unmatched_dets: trackers.append(KalmanBoxTracker(dets_high[i][:4]))

        i = len(trackers); ret_trackers = []
        for trk in reversed(trackers):
            d = trk.get_state()[0]
            if (trk.time_since_update < 1) and (trk.hit_streak >= 3 or frame_idx <= 3):
                ret_trackers.append(np.concatenate((d,[trk.id])).reshape(1,-1))
            i -= 1
            if(trk.time_since_update > 30): trackers.pop(i)

        if frame_idx == 1 and is_selective_mode and len(ret_trackers) > 0:
            for target_box in target_boxes_list:
                best_iou = 0
                best_id = -1
                for trk_data in ret_trackers:
                    d = trk_data[0]
                    trk_box = [d[0], d[1], d[2], d[3]]
                    iou = calculate_iou_single(target_box, trk_box)
                    if iou > best_iou:
                        best_iou = iou
                        best_id = int(d[4])

                if best_iou > 0.5:
                    target_track_ids.add(best_id)

            print(f"🎯 Locked IDs: {target_track_ids}")

            if len(target_track_ids) == 0:
                print("⚠️ No ID could be matched. Switching to track all.")
                is_selective_mode = False

        if has_gt:
            t_ids = []; t_boxes = []
            for trk_data in ret_trackers:
                d = trk_data[0]
                t_ids.append(int(d[4]))
                t_boxes.append([d[0], d[1], d[2]-d[0], d[3]-d[1]])
            g_ids = []; g_boxes = []
            if frame_idx in gt_data:
                for item in gt_data[frame_idx]:
                    g_ids.append(int(item[4]))
                    g_boxes.append([item[0], item[1], item[2]-item[0], item[3]-item[1]])
            dist = mm.distances.iou_matrix(g_boxes, t_boxes, max_iou=0.5) if (len(g_boxes)>0 and len(t_boxes)>0) else []
            acc.update(g_ids, t_ids, dist)

        # --- Draw ---
        for d in ret_trackers:
            d = d[0]
            x1, y1, x2, y2, tid = int(d[0]), int(d[1]), int(d[2]), int(d[3]), int(d[4])

            should_draw = True
            color = (0, 255, 0)
            thickness = 2

            if is_selective_mode:
                if tid in target_track_ids:
                    color = (0, 0, 255)
                    thickness = 3
                else:
                    should_draw = False

            if should_draw:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness)
                label = f"TARGET {tid}" if (is_selective_mode and tid in target_track_ids) else f"ID {tid}"
                cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        out.write(frame)

    cap.release(); out.release()

    metrics_str = "Metrics:"
    if has_gt:
        mh = mm.metrics.create()
        try:
            summary = mh.compute(acc, metrics=['num_frames', 'mota', 'motp', 'idf1', 'mostly_tracked', 'mostly_lost', 'num_switches'], name='acc')
            metrics_str = mm.io.render_summary(summary, formatters=mh.formatters, namemap={'num_frames': 'Frames', 'mota': 'MOTA', 'motp': 'MOTP', 'idf1': 'IDF1', 'mostly_tracked': 'MT', 'mostly_lost': 'ML', 'num_switches': 'ID Sw'})
        except: metrics_str = "Error calculating metrics"

    if os.path.exists(final_output_path): os.remove(final_output_path)
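    # Re-encode to H.264 for browser playback; fall back to the mp4v file if ffmpeg is missing or fails.
    # A non-zero exit code does not raise, so it is checked explicitly.
    try:
        ret_code = subprocess.call(["ffmpeg", "-y", "-loglevel", "quiet", "-i", output_path, "-c:v", "libx264", final_output_path])
    except OSError: ret_code = 1
    if ret_code != 0 or not os.path.exists(final_output_path): final_output_path = output_path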
    return final_output_path, metrics_str


with gr.Blocks(title="UAV Tracking System") as demo:
    gr.Markdown("# UAV Multi-Target Tracking")
    gr.Markdown("Upload video -> Scan -> Click to select multiple objects (click again to deselect) -> Start.")

    # 1. All boxes detected in frame 1
    detected_boxes_state = gr.State([])
    # 2. Indices of the objects that were clicked (e.g. [0, 3, 5])
    selected_indices_state = gr.State([])
    # 3. Actual coordinates of the selected objects (passed on to tracking)
    selected_boxes_state = gr.State([])

    with gr.Row():
        with gr.Column(scale=1):
            input_video = gr.Video(label="1. Upload Video")
            model_dd = gr.Dropdown(choices=list(MODEL_OPTIONS.keys()), value="EfficientNetB0", label="Model")

            with gr.Row():
                btn_scan = gr.Button("Scan objects", variant="secondary")
                btn_clear = gr.Button("Clear selection", variant="stop")

            gr.Markdown("### 3. Gallery (click to select/deselect):")
            gallery = gr.Gallery(
                label="Object list",
                show_label=True,
                elem_id="gallery",
                columns=4, rows=2, height="auto",
                object_fit="contain",
                allow_preview=False
            )
            selection_info = gr.Textbox(label="Selection status", value="None selected (Track All)", interactive=False)

            with gr.Row():
                conf_slide = gr.Slider(0.1, 0.9, 0.5, label="Conf Threshold")
                iou_slide = gr.Slider(0.1, 0.9, 0.2, label="IoU Threshold")

            input_gt = gr.File(label="Upload GT (.txt)")
            btn_run = gr.Button("START TRACKING", variant="primary")

        with gr.Column(scale=1):
            output_video = gr.Video(label="Tracking result")
            output_metrics = gr.Textbox(label="Metrics Report", lines=10)


    btn_scan.click(
        fn=detect_objects_frame_1,
        inputs=[input_video, model_dd],
        outputs=[gallery, detected_boxes_state, selected_indices_state]
    )

    gallery.select(
        fn=on_select_object,
        inputs=[detected_boxes_state, selected_indices_state],
        outputs=[selection_info, selected_indices_state, selected_boxes_state]
    )

    btn_clear.click(
        fn=clear_selection,
        inputs=[],
        outputs=[selection_info, selected_indices_state, selected_boxes_state]
    )

    btn_run.click(
        fn=run_tracking_demo,
        inputs=[input_video, input_gt, model_dd, conf_slide, iou_slide, selected_boxes_state],
        outputs=[output_video, output_metrics]
    )

if __name__ == "__main__":
    demo.launch(share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://001cb474ec5ef41e5d.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/fastapi/applications.py", line 1139, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.12/dist-packages/starlette/applications.py", line 107, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.12/dist-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/local/lib/python3.12/dist-packages/starlette/middleware/errors.py",

🎯 Number of targets to lock: 1
🎯 Locked IDs: {0}
