In [3]:
import os
import json
import numpy as np
from tqdm import tqdm
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter
import copy
import heapq
from scipy.optimize import linear_sum_assignment

In [9]:
def murty_top_k(cost_matrix, k):
    num_rows, num_cols = cost_matrix.shape
    base_cost, base_assignment = linear_sum_assignment(cost_matrix)
    base_total_cost = cost_matrix[base_cost, base_assignment].sum()

    results = [(base_total_cost, list(zip(base_cost, base_assignment)))]
    queue = []
    heapq.heappush(queue, (base_total_cost, [], base_cost, base_assignment))

    seen = set()
    seen.add(tuple(base_assignment))

    while len(results) < k and queue:
        total_cost, fixed, base_rows, base_cols = heapq.heappop(queue)

        for i in range(len(base_rows)):
            new_cost_matrix = cost_matrix.copy()
            for f in fixed:
                new_cost_matrix[f[0], :] = np.inf
                new_cost_matrix[:, f[1]] = np.inf
                new_cost_matrix[f[0], f[1]] = cost_matrix[f[0], f[1]]
            new_cost_matrix[base_rows[i], base_cols[i]] = np.inf

            try:
                new_r, new_c = linear_sum_assignment(new_cost_matrix)
                if len(new_r) < num_rows:
                    continue
                new_assignment = list(zip(new_r, new_c))
                key = tuple(new_c)
                if key not in seen:
                    seen.add(key)
                    new_cost = new_cost_matrix[new_r, new_c].sum()
                    heapq.heappush(queue, (new_cost, fixed + [(base_rows[i], base_cols[i])], new_r, new_c))
                    results.append((new_cost, new_assignment))
            except:
                continue

    return [assignment for _, assignment in results[:k]]

def create_kalman_filter(init_bbox):
    kf = KalmanFilter(dim_x=7, dim_z=4)
    kf.F = np.eye(7)
    for i in range(4):
        kf.F[i, i + 3] = 1.0
    kf.H = np.zeros((4, 7))
    kf.H[:4, :4] = np.eye(4)
    kf.R *= 10.0
    kf.P *= 500.0
    kf.Q *= 0.01
    x, y, w, h = init_bbox
    s = w * h
    r = w / float(h + 1e-6)
    kf.x[:4] = np.array([[x], [y], [s], [r]])
    return kf

def predict_bbox(kf):
    kf.predict()
    x, y, s, r = kf.x[:4].flatten()
    s = max(s, 1e-3)
    r = max(r, 1e-3)
    w = np.sqrt(s * r)
    h = s / (w + 1e-6)
    return [x, y, w, h]
def create_kalman_filter(init_bbox):
    kf = KalmanFilter(dim_x=7, dim_z=4)
    kf.F = np.eye(7)
    for i in range(4):
        kf.F[i, i + 3] = 1.0
    kf.H = np.zeros((4, 7))
    kf.H[:4, :4] = np.eye(4)
    kf.R *= 10.0
    kf.P *= 500.0
    kf.Q *= 0.01
    x, y, w, h = init_bbox
    s = w * h
    r = w / float(h + 1e-6)
    kf.x[:4] = np.array([[x], [y], [s], [r]])
    return kf

def predict_bbox(kf):
    kf.predict()
    x, y, s, r = kf.x[:4].flatten()
    s = max(s, 1e-3)
    r = max(r, 1e-3)
    w = np.sqrt(s * r)
    h = s / (w + 1e-6)
    return [x, y, w, h]

MHT Tracker

In [11]:
class RealMHT:
    def __init__(self, iou_threshold=0.3, prune_k=30, max_hypotheses=5):
        self.global_hypotheses = [[]]
        self.track_counter = 1
        self.iou_threshold = iou_threshold
        self.prune_k = prune_k
        self.max_hypotheses = max_hypotheses

    @staticmethod
    def iou(boxA, boxB):
        xA = max(boxA[0], boxB[0])
        yA = max(boxA[1], boxB[1])
        xB = min(boxA[0] + boxA[2], boxB[0] + boxB[2])
        yB = min(boxA[1] + boxA[3], boxB[1] + boxB[3])
        interArea = max(0, xB - xA) * max(0, yB - yA)
        boxAArea = boxA[2] * boxA[3]
        boxBArea = boxB[2] * boxB[3]
        return interArea / float(boxAArea + boxBArea - interArea + 1e-6)

    def match(self, predictions, detections, top_k=3):
        if not predictions or not detections:
            return [[None] * len(predictions)], list(range(len(detections)))

        cost_matrix = np.ones((len(predictions), len(detections)), dtype=np.float32)
        for i, p in enumerate(predictions):
            for j, d in enumerate(detections):
                cost_matrix[i, j] = 1.0 - self.iou(p, d)

        assignment_sets = murty_top_k(cost_matrix, k=top_k)


        all_matches = []
        for assignment in assignment_sets:
            match = [None] * len(predictions)
            for row, col in assignment:
                if row < len(predictions) and col < len(detections):
                    if cost_matrix[row, col] < (1.0 - self.iou_threshold):
                        match[row] = col
            all_matches.append(match)

        first_match = all_matches[0] if all_matches else []
        matched_dets = set([m for m in first_match if m is not None])
        unmatched = list(set(range(len(detections))) - matched_dets)

        return all_matches, unmatched

    def update(self, detections, frame_idx):
        new_global = []
        for hypothesis in self.global_hypotheses:
            predicted = [predict_bbox(h.kf) for h in hypothesis]
            matchings, unmatched_dets = self.match(predicted, detections, top_k=self.max_hypotheses)

            for match in matchings[:self.max_hypotheses]:
                if len(match) != len(hypothesis):
                    continue
                new_hypo = []
                for i, det_idx in enumerate(match):
                    if det_idx is not None:
                        if i >= len(hypothesis):
                            continue
                        parent = hypothesis[i]
                        det = detections[det_idx]
                        node = HypothesisNode(
                            parent, parent.track_id, det, frame_idx,
                            kalman_filter=copy.deepcopy(parent.kf)
                        )
                        parent.children.append(node)
                        new_hypo.append(node)
                    else:
                        if i < len(hypothesis):
                            new_hypo.append(hypothesis[i])
                new_global.append(new_hypo)

            for idx in unmatched_dets:
                temp = hypothesis.copy()
                node = HypothesisNode(None, self.track_counter, detections[idx], frame_idx)
                self.track_counter += 1
                temp.append(node)
                new_global.append(temp)

        new_global.sort(key=lambda hlist: sum(h.score for h in hlist), reverse=True)
        self.global_hypotheses = new_global[:self.prune_k]

    def get_tracks(self):
        tracks_dict = {}
    
        for hypo in self.global_hypotheses:
            for node in hypo:
                path = []
                cur = node
                while cur:
                    path.append((cur.frame_idx, cur.track_id, cur.bbox))
                    cur = cur.parent
                if len(path) >= 1:
                    track_id = path[0][1]
                    if track_id not in tracks_dict or len(path) > len(tracks_dict[track_id]):
                        tracks_dict[track_id] = path[::-1]  # reverse the path
    
        return list(tracks_dict.values())


In [12]:
class HypothesisNode:
    def __init__(self, parent, track_id, bbox, frame_idx, kalman_filter=None):
        self.parent = parent
        self.track_id = track_id
        self.bbox = bbox
        self.frame_idx = frame_idx
        self.children = []
        self.kf = kalman_filter or create_kalman_filter(bbox)
        self.kf.update(np.array(bbox[:4]).reshape((4, 1)))
        self.score = 0.0 if parent is None else parent.score + self.compute_score()

    def compute_score(self):
        return 1.0

Load

In [13]:
def get_file_lists(folder):
    return sorted([os.path.join(folder, f) for f in os.listdir(folder) if f.endswith('.json')])


Main

In [14]:
folder = r'C:\Users\PC1\Desktop\2D\X-band'
json_files = get_file_lists(folder)
tracker = RealMHT(iou_threshold=0.1, prune_k=50, max_hypotheses=3)

for frame_idx, json_file in enumerate(tqdm(json_files, desc="Tracking", ncols=80)):
    with open(json_file, 'r') as f:
        data = json.load(f)

    annotations = data.get('annotations', [])
    detections = [[ann['xmin'], ann['ymin'], ann['width'], ann['height']] for ann in annotations]
    tracker.update(detections, frame_idx)

tracks = tracker.get_tracks()

print("\n=== Final Tracks ===")
for track in tracks:
    print(f"Track ID {track[0][1]}: {len(track)} frames")

Tracking: 100%|█████████████████████████████| 1655/1655 [02:25<00:00, 11.37it/s]


=== Final Tracks ===
Track ID 8204: 2 frames
Track ID 70997: 2 frames
Track ID 658348: 2 frames
Track ID 1047739: 2 frames
Track ID 1664342: 2 frames
Track ID 1913186: 1 frames
Track ID 1913136: 1 frames
Track ID 1913187: 1 frames
Track ID 1913086: 1 frames
Track ID 1913188: 1 frames
Track ID 1913137: 1 frames
Track ID 1913189: 1 frames
Track ID 1913036: 1 frames
Track ID 1913190: 1 frames
Track ID 1913138: 1 frames
Track ID 1913191: 1 frames
Track ID 1913087: 1 frames
Track ID 1913192: 1 frames
Track ID 1913139: 1 frames
Track ID 1913193: 1 frames
Track ID 1912986: 1 frames
Track ID 1913194: 1 frames
Track ID 1913140: 1 frames
Track ID 1913195: 1 frames
Track ID 1913088: 1 frames
Track ID 1913196: 1 frames
Track ID 1913141: 1 frames
Track ID 1913197: 1 frames
Track ID 1913037: 1 frames
Track ID 1913198: 1 frames
Track ID 1913142: 1 frames
Track ID 1913199: 1 frames
Track ID 1913089: 1 frames
Track ID 1913200: 1 frames
Track ID 1913143: 1 frames
Track ID 1913201: 1 frames
Track ID 191


