In [1]:
import os
import torch
import numpy as np
import tensorrt as trt
import cv2 as cv
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from boxmot.trackers import ByteTrack, BoostTrack, BotSort, StrongSort, OcSort, DeepOcSort, HybridSort
import time
import logging
import gc
import json
import pandas as pd 
import pathlib
from pathlib import Path

In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

MODEL_PATH = os.path.join(Path.cwd().parent, 'models', 'yolov7-tiny.engine')
REID_PATH = os.path.join(Path.cwd().parent, 'models', 'osnet_x0_25_msmt17.pt')

if not os.path.exists(MODEL_PATH):
    raise FileNotFoundError(f"Model file not found at {MODEL_PATH}")

if not os.path.exists(REID_PATH):
    raise FileNotFoundError(f"ReID model file not found at {REID_PATH}")

FRAMES_DIR = r'../video/frames'
NUM_ROUNDS = 1

In [3]:
def get_logger(name: str, level=logging.INFO):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    logger.propagate = False
    return logger

def cleanup():
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
    gc.collect()

logger = get_logger("Benchmark")

In [4]:
INPUT_NAME = "images"
TRT_LOGGER = trt.Logger(trt.Logger.INFO)
trt.init_libnvinfer_plugins(TRT_LOGGER, "")

class TRTModel:
    def __init__(self, engine_path: str, input_shape: tuple, device: torch.device):
        self.input_shape = input_shape
        self.device = torch.device("cuda:0") if device.type == "cuda" else device
        
        torch.cuda.set_device(self.device)
        self.trt_stream = torch.cuda.Stream(device=self.device)
        
        # Load engine
        with open(engine_path, "rb") as f:
            self.engine = trt.Runtime(TRT_LOGGER).deserialize_cuda_engine(f.read())
        if self.engine is None:
            raise RuntimeError("Failed to deserialize TensorRT engine")
        
        self.context = self.engine.create_execution_context()
        
        # Setup input tensor
        self.input_tensor = torch.empty(input_shape, device=self.device, dtype=torch.float32)
        self.context.set_input_shape(INPUT_NAME, input_shape)
        self.context.set_tensor_address(INPUT_NAME, int(self.input_tensor.data_ptr()))
        
        # Setup output tensors
        self.outputs = {}
        for i in range(self.engine.num_io_tensors):
            name = self.engine.get_tensor_name(i)
            if self.engine.get_tensor_mode(name) == trt.TensorIOMode.OUTPUT:
                shape = tuple(self.context.get_tensor_shape(name))
                dtype = trt.nptype(self.engine.get_tensor_dtype(name))
                tensor = torch.empty(shape, device=self.device, 
                                   dtype=torch.from_numpy(np.empty((), dtype=dtype)).dtype)
                self.outputs[name] = tensor
                self.context.set_tensor_address(name, int(tensor.data_ptr()))
        
        self.warmup()

    def warmup(self, iters: int = 30):
        times = []
        for _ in range(iters):
            self.input_tensor.normal_()
            starter, ender = torch.cuda.Event(enable_timing=True), torch.cuda.Event(enable_timing=True)
            starter.record(stream=self.trt_stream)
            with torch.cuda.stream(self.trt_stream):
                self.context.execute_async_v3(self.trt_stream.cuda_stream)
            ender.record(stream=self.trt_stream)
            self.trt_stream.synchronize()
            times.append(starter.elapsed_time(ender) / 1000.0)
        logger.info(f"Warmup: {sum(times)/len(times):.4f}s avg inference time")

    def infer(self, inp: torch.Tensor) -> tuple:
        assert inp.device == self.device and inp.shape == self.input_shape
        
        starter, ender = torch.cuda.Event(enable_timing=True), torch.cuda.Event(enable_timing=True)
        with torch.cuda.stream(self.trt_stream):
            self.input_tensor.copy_(inp, non_blocking=True)
            starter.record(stream=self.trt_stream)
            self.context.execute_async_v3(self.trt_stream.cuda_stream)
            ender.record(stream=self.trt_stream)
        
        self.trt_stream.synchronize()
        return starter.elapsed_time(ender) / 1000.0, self.outputs


In [5]:
def letterbox(im, new_shape=(640, 640), color=(114, 114, 114)):
    shape = im.shape[:2]
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = (new_shape[1] - new_unpad[0]) / 2, (new_shape[0] - new_unpad[1]) / 2
    
    if shape[::-1] != new_unpad:
        im = cv.resize(im, new_unpad, interpolation=cv.INTER_LINEAR)
    
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv.copyMakeBorder(im, top, bottom, left, right, cv.BORDER_CONSTANT, value=color)
    return im, r, (dw, dh)

def scale_coords_back(boxes, ratio, dwdh):
    """
    Scale bounding boxes from 640x640 letterboxed space back to original frame space
    
    Args:
        boxes: numpy array of shape (N, 4) with [x1, y1, x2, y2] in 640x640 space
        ratio: scaling ratio from letterbox
        dwdh: (dw, dh) padding offsets from letterbox
    
    Returns:
        scaled boxes in original frame coordinates
    """
    dw, dh = dwdh
    
    # Remove padding
    boxes = boxes.copy()
    boxes[:, [0, 2]] -= dw  # x coords
    boxes[:, [1, 3]] -= dh  # y coords
    
    # Scale by ratio
    boxes /= ratio
    
    return boxes

class VideoFrameDataset(Dataset):
    def __init__(self, frames_dir, skip=0):
        """
        Dataset for pre-extracted frames
        Args:
            frames_dir: Path to directory containing frame images
            skip: Skip every nth frame (0 = no skip)
        """
        self.frames_paths = []
        self.skip = skip
        
        logger.info(f"Loading frames from: {frames_dir}")
        
        # Get all image files (sorted by name)
        valid_extensions = ('.jpg', '.jpeg', '.png', '.bmp')
        all_files = sorted([
            os.path.join(frames_dir, f) 
            for f in os.listdir(frames_dir) 
            if f.lower().endswith(valid_extensions)
        ])
        
        # Apply skip filter
        for i, path in enumerate(all_files):
            if skip != 0 and i % skip == 0:
                continue
            self.frames_paths.append(path)
        
        logger.info(f"Loaded {len(self.frames_paths)} frame paths")
    
    def __len__(self):
        return len(self.frames_paths)
    
    def __getitem__(self, idx):
        # Read frame from disk
        img_bgr = cv.imread(self.frames_paths[idx])
        if img_bgr is None:
            raise RuntimeError(f"Failed to read frame: {self.frames_paths[idx]}")
        
        img_rgb = cv.cvtColor(img_bgr, cv.COLOR_BGR2RGB)
        img_lb, ratio, dwdh = letterbox(img_rgb, new_shape=(640, 640))
        img_lb_bgr = cv.cvtColor(img_lb, cv.COLOR_RGB2BGR)
        
        img_chw = img_lb.transpose(2, 0, 1)
        img_chw = np.ascontiguousarray(img_chw, dtype=np.float32)
        tensor = torch.from_numpy(img_chw) / 255.0
        
        # Return original image to keep coordinates consistent with boxes scaled back
        return img_bgr, tensor, ratio, dwdh, self.frames_paths[idx]

def collate_fn(batch):
    imgs, tensors, ratios, dwdhs, frame_paths = zip(*batch)
    tensors = torch.stack(tensors, dim=0)
    return imgs, tensors, ratios, dwdhs, frame_paths

In [6]:
trackers_config = {
    'OcSort': {
        'class': OcSort,
        'params': {
            'reid_weights': Path('osnet_x0_25_msmt17.pt'),
            'device': device,
            'half': False
        }
    }
}

In [7]:
# Initialize model
model = TRTModel(
    engine_path=MODEL_PATH,
    input_shape=(1, 3, 640, 640),
    device=device,
)

# Load dataset from pre-extracted frames
dataset = VideoFrameDataset(
    frames_dir=FRAMES_DIR,
    skip=0,  # adjust if you want to skip frames
)

dataloader = DataLoader(
    dataset,
    batch_size=1,
    shuffle=False,
    num_workers=0,
    pin_memory=True,
    collate_fn=collate_fn,
)

2026-01-17 15:47:06,229 | INFO | Benchmark | Warmup: 0.0071s avg inference time
2026-01-17 15:47:06,229 | INFO | Benchmark | Loading frames from: ../video/frames
2026-01-17 15:47:06,236 | INFO | Benchmark | Loaded 1501 frame paths


In [8]:
NOTEBOOK_DIR = Path.cwd()
DATA_DIR = NOTEBOOK_DIR / "data"
PREDICTED_DIR = DATA_DIR / "predicted"
CONFIG_DIR = DATA_DIR / "config"
PREDICTED_DIR.mkdir(parents=True, exist_ok=True)
CONFIG_DIR.mkdir(parents=True, exist_ok=True)

# Save results directory with timestamp
directory_time = input("Enter a name for the results directory (or press Enter to use timestamp): ")
if directory_time.strip() == "":
    RUNNING_TIME = time.strftime("%Y%m%d-%H%M%S")
else:
    RUNNING_TIME = directory_time.strip()
RUN_RESULTS_DIR = PREDICTED_DIR / RUNNING_TIME
RUN_RESULTS_DIR.mkdir(parents=True, exist_ok=True)

RUN_CONFIG_DIR = CONFIG_DIR / RUNNING_TIME
RUN_CONFIG_DIR.mkdir(parents=True, exist_ok=True)

In [9]:
all_results = []

for tracker_name, config in trackers_config.items():
    for round_idx in range(NUM_ROUNDS):
        logger.info(f"\n{'='*60}")
        logger.info(f"{tracker_name} | Attempt {round_idx + 1} of {NUM_ROUNDS}")
        logger.info(f"{'='*60}")

        tracker = config["class"](**config["params"])
        tracker.reset()

        infer_times = []
        tracking_times = []
        tracking_rows = []

        for frame_idx, (imgs, input_tensor, ratios, dwdhs, frame_paths) in enumerate(
            tqdm(dataloader, desc=f"{tracker_name} (run {round_idx + 1})")
        ):
            input_tensor = input_tensor.to(device)

            # Inference
            infer_time, outputs = model.infer(input_tensor)
            infer_times.append(infer_time)

            # Parse detections
            num = int(outputs["num_dets"][0].item())
            boxes = outputs["det_boxes"][0][:num].cpu().numpy()
            scores = outputs["det_scores"][0][:num].cpu().numpy()
            classes = outputs["det_classes"][0][:num].cpu().numpy()

            # Scale boxes back to original frame coordinates
            boxes_original = scale_coords_back(boxes, ratios[0], dwdhs[0])

            # Clamp boxes to original image bounds and filter invalid boxes
            h, w = imgs[0].shape[:2]
            boxes_original[:, [0, 2]] = np.clip(boxes_original[:, [0, 2]], 0, w - 1)
            boxes_original[:, [1, 3]] = np.clip(boxes_original[:, [1, 3]], 0, h - 1)
            valid = (boxes_original[:, 2] > boxes_original[:, 0] + 1) & (boxes_original[:, 3] > boxes_original[:, 1] + 1)
            boxes_original = boxes_original[valid]
            scores = scores[valid]
            classes = classes[valid]

            dets = np.concatenate([boxes_original, scores[:, None], classes[:, None]], axis=-1)

            # Tracking
            start = time.perf_counter()
            results = tracker.update(dets, imgs[0])
            tracking_time = time.perf_counter() - start
            tracking_times.append(tracking_time)

            id_map = {}
            next_id = 1

            # Store: frame, id, left, top, width, height, confidence
            for track in results:
                x1, y1, x2, y2, track_id = track[:5]
                conf = track[5] if len(track) > 5 else 1.0

                if track_id not in id_map:
                    id_map[track_id] = next_id
                    next_id += 1
                mapped_id = id_map[track_id]

                width = x2 - x1
                height = y2 - y1
                tracking_rows.append(
                    {
                        "frame": frame_idx+1,
                        "id": int(mapped_id),
                        "bb_left": float(x1),
                        "bb_top": float(y1),
                        "bb_width": float(width),
                        "bb_height": float(height),
                        "conf": float(conf),
                        "x": int(-1),
                        "y": int(-1),
                        "z": int(-1),
                    }
                )

        # Paths for this round (ensure directories exist)
        csv_path = RUN_RESULTS_DIR / f"{tracker_name}_round{round_idx + 1}.csv"
        json_path = RUN_CONFIG_DIR / f"{tracker_name}_round{round_idx + 1}.json"

        # Save CSV
        pd.DataFrame(tracking_rows).to_csv(csv_path, index=False)

        # Save JSON summary
        summary = {
            "attempt": round_idx + 1,
            "algorithm": tracker_name,
            "input_path": FRAMES_DIR,
            "inference_time": infer_times,
            "tracking_time": tracking_times,
            "path_to_detections": str(csv_path),
        }
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2)

        all_results.append(summary)
        logger.info(
            f"{tracker_name} (run {round_idx + 1}) "
            f"- frames: {len(infer_times)} "
            f"- avg inf: {np.mean(infer_times):.4f}s "
            f"- avg track: {np.mean(tracking_times):.4f}s "
            f"- csv: {csv_path}"
        )

cleanup()
logger.info("\nâœ“ All benchmarks complete")

2026-01-17 15:47:12,318 | INFO | Benchmark | 
2026-01-17 15:47:12,318 | INFO | Benchmark | OcSort | Attempt 1 of 1
OcSort (run 1):   0%|          | 4/1501 [00:00<01:47, 13.93it/s]


TypeError: only 0-dimensional arrays can be converted to Python scalars

In [None]:
def write_textfile(df, file_name, directory=RUN_RESULTS_DIR):
    try:
        with open(os.path.join(directory, file_name), 'w') as f:
            for row in df.iterrows():
                values = row[1].tolist()
                frame = int(values[0])
                obj_id = int(values[1])
                bb_left = float(values[2])
                bb_top = float(values[3])
                bb_width = float(values[4])
                bb_height = float(values[5])
                conf = int(values[6])
                x = int(values[7])
                y = int(values[8])
                z = int(values[9]) 
                
                line = f"{frame},{obj_id},{bb_left},{bb_top},{bb_width},{bb_height},{conf},{x},{y},{z}\n"
                f.write(line)

        return True
    except Exception as e:
        print(f'Write error: {file_name} because {e}')
        return False

In [None]:
for file in os.listdir(RUN_RESULTS_DIR):
    if file.split('.')[-1] == 'csv':
        df = pd.read_csv(os.path.join(RUN_RESULTS_DIR, file))

    if not os.path.exists(os.path.join(RUN_RESULTS_DIR, file.replace('.csv', '.txt'))):
        write_textfile(df, file.replace('.csv', '.txt'), directory=RUN_RESULTS_DIR)
    else:
        print(f"{file} already convert to txt format")

BoostTrack_round1.csv already convert to txt format
BoostTrack_round1.txt already convert to txt format
