## YOLOv5 Multi-Object Tracking with Hungarian Algorithm ##

1) Module Imports and Dependencies


In [3]:
import cv2
import numpy as np
import copy
import time
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter
from pathlib import Path
from ultralytics import YOLO
import os
import motmetrics as mm
import configparser
import random

2) Configuration and Class Definitions

In [4]:
# Configuration
# Weights file handed to ultralytics.YOLO when the pipeline starts.
MODEL_PATH = "yolov5su.pt"

# COCO class IDs the detector call is restricted to:
# person, bicycle, car, motorcycle, bus, truck, traffic light, fire hydrant
CLASSES_TO_TRACK = [0, 1, 2, 3, 5, 7, 9, 10]

# COCO class ID -> display name used when labelling boxes.
# The names are listed in class-ID order, so enumerate() yields IDs 0..79.
CLASS_NAMES = dict(enumerate((
    # 0-9
    "person", "bicycle", "car", "motorcycle", "airplane",
    "bus", "train", "truck", "boat", "traffic light",
    # 10-19
    "fire hydrant", "stop sign", "parking meter", "bench", "bird",
    "cat", "dog", "horse", "sheep", "cow",
    # 20-29
    "elephant", "bear", "zebra", "giraffe", "backpack",
    "umbrella", "handbag", "tie", "suitcase", "frisbee",
    # 30-39
    "skis", "snowboard", "sports ball", "kite", "baseball bat",
    "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle",
    # 40-49
    "wine glass", "cup", "fork", "knife", "spoon",
    "bowl", "banana", "apple", "sandwich", "orange",
    # 50-59
    "broccoli", "carrot", "hot dog", "pizza", "donut",
    "cake", "chair", "couch", "potted plant", "bed",
    # 60-69
    "dining table", "toilet", "TV", "laptop", "mouse",
    "remote", "keyboard", "cell phone", "microwave", "oven",
    # 70-79
    "toaster", "sink", "refrigerator", "book", "clock",
    "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
)))

3) Tracking Parameters and Thresholds


In [5]:
# Tracking configuration
MIN_CONFIDENCE = 0.5      # passed as conf= to the detector call
NMS_THRESHOLD = 0.45      # Aggressive NMS to reduce overlaps
                          # NOTE(review): not referenced by any function shown in this notebook
MAX_MISSED_FRAMES = 5     # How long to keep lost tracks
                          # NOTE(review): not referenced by any function shown in this notebook
IOU_THRESHOLD = 0.3       # minimum IoU for a track/detection pair to count as a match
LINEAR_THRESHOLD = 0.1    # minimum c_lin score for a pair to keep its IoU score
EXP_THRESHOLD = 0.1       # minimum c_exp score for a pair to keep its IoU score
MIN_HIT_STREAK = 3        # a matched track is drawn once its age reaches this value
MAX_UNMATCHED_AGE = 5     # unmatched tracks are dropped once they reach this many misses
OUTPUT_DIR = "Model3_Detections"
FPS = 30                  # nominal frame rate (frames per second)

# Fixed palette of 1000 colour triples for visualization.
# Drawn from a private, seeded generator so the palette is identical on every
# run and the global `random` state is left untouched.
_COLOR_RNG = random.Random(0)
TRACK_COLORS = {
    track_id: (
        _COLOR_RNG.randint(0, 255),
        _COLOR_RNG.randint(0, 255),
        _COLOR_RNG.randint(0, 255),
    )
    for track_id in range(1000)
}


4) Tracking Utility Functions
## Core utility functions for track processing:
- ID to color mapping for consistent visualization
- Bounding box format conversion and manipulation
- IoU (Intersection over Union) calculation
- Linear and exponential cost functions for track association
- Hungarian algorithm cost matrix generation
These functions form the foundation of the tracking logic


In [6]:
# Utility functions: ID-to-colour mapping, box conversion, IoU and association scores
def id_to_color(idx):
    """Derive a deterministic colour triple from a track ID.

    Each channel is the ID multiplied by a fixed factor and wrapped into
    the 0-255 range, so the same ID always maps to the same colour.

    Returns:
        Tuple in (red, green, blue) order.
        NOTE(review): the result is handed directly to OpenCV drawing calls,
        which presumably read it as BGR - confirm the channel order is intended.
    """
    red, green, blue = ((idx * factor) % 256 for factor in (23, 36, 5))
    return (red, green, blue)

def convert_data(box):
    """Convert an [x, y, w, h] box into integer corners (x1, y1, x2, y2).

    The far corner is computed from the un-truncated sum (x + w, y + h)
    and only then cast to int.
    """
    left, top, width, height = box[0], box[1], box[2], box[3]
    return int(left), int(top), int(left + width), int(top + height)

def box_iou(box1, box2):
    """Intersection-over-union of two boxes given as [x, y, w, h].

    Both boxes are first converted to integer corners with convert_data.
    Widths and heights use the inclusive-pixel convention (+1), and a small
    epsilon in the denominator guards against a zero union.
    """
    ax1, ay1, ax2, ay2 = convert_data(box1)
    bx1, by1, bx2, by2 = convert_data(box2)

    # Overlap extent along each axis, clamped at zero for disjoint boxes.
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1) + 1)
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1) + 1)
    inter_area = overlap_w * overlap_h

    area_a = (ax2 - ax1 + 1) * (ay2 - ay1 + 1)
    area_b = (bx2 - bx1 + 1) * (by2 - by1 + 1)
    union_area = (area_a + area_b) - inter_area

    return inter_area / float(union_area + 1e-6)

def c_lin(XA, YA, WA, HA, XB, YB, WB, HB, frame_width=1920, frame_height=1080):
    """Linear similarity score between two boxes (larger means more alike).

    The score is the product of two ratios: frame diagonal over centre
    distance, and frame area over the distance between the (height, width)
    pairs. A small epsilon keeps both denominators non-zero.

    Args:
        XA, YA: Centre of box A.
        WA, HA: Width and height of box A.
        XB, YB: Centre of box B.
        WB, HB: Width and height of box B.
        frame_width, frame_height: Frame size used for normalisation.
            Defaults to 1920x1080 (MOT17 typical resolution).

    Returns:
        Float score; unbounded above as the boxes coincide.
    """
    Q_dist = np.linalg.norm(np.array([frame_width, frame_height]))
    Q_shp = frame_width * frame_height
    d1 = np.linalg.norm(np.array([XA - XB, YA - YB])) + 1e-6
    d2 = np.linalg.norm(np.array([HA - HB, WA - WB])) + 1e-6
    return (Q_dist / d1) * (Q_shp / d2)

def c_exp(XA, YA, WA, HA, XB, YB, WB, HB):
    """Exponential similarity score between two boxes, in the range [0, 1].

    Combines a position term (centre offset normalised by box A's size) and
    a shape term (relative height/width difference), each passed through a
    decaying exponential.

    Args:
        XA, YA: Centre of box A.
        WA, HA: Width and height of box A.
        XB, YB: Centre of box B.
        WB, HB: Width and height of box B.

    Returns:
        1.0 for identical boxes, decreasing towards 0.0 as they diverge.
    """
    w1, w2 = 0.5, 1.5
    eps = 1e-6
    # Guard the normalisers so a zero-width/height box cannot divide by zero.
    # The quotient is squared, so using the magnitude leaves every
    # non-degenerate result unchanged.
    norm_w = max(abs(WA), eps)
    norm_h = max(abs(HA), eps)
    p1 = ((XA - XB) / norm_w) ** 2 + ((YA - YB) / norm_h) ** 2
    p2 = abs(HA - HB) / (HA + HB + eps) + abs(WA - WB) / (WA + WB + eps)
    return np.exp(-w1 * p1) * np.exp(-w2 * p2)

def hungarian_cost(old_boxes, new_boxes, class_ids_old=None, class_ids_new=None):
    """Build the track-by-detection score matrix used for assignment.

    Entry [i][j] is the IoU of track box i and detection box j when the pair
    clears all three gates (IoU, linear score, exponential score); otherwise
    it is 0. When both class-ID lists are supplied, pairs whose classes
    differ score 0 without any geometry being evaluated.

    Args:
        old_boxes: Track boxes as [x, y, w, h] (top-left corner).
        new_boxes: Detection boxes in the same format.
        class_ids_old: Optional class ID per track box.
        class_ids_new: Optional class ID per detection box.

    Returns:
        List of rows (one per track), each a list with one score per detection.
    """
    check_classes = class_ids_old is not None and class_ids_new is not None

    def _pair_score(track_idx, track_box, det_idx, det_box):
        # Different classes have zero similarity.
        if check_classes and class_ids_old[track_idx] != class_ids_new[det_idx]:
            return 0

        overlap = box_iou(track_box, det_box)

        # Centres and sizes of both boxes feed the two auxiliary scores.
        centre_ax = track_box[0] + track_box[2] * 0.5
        centre_ay = track_box[1] + track_box[3] * 0.5
        size_aw, size_ah = track_box[2], track_box[3]
        centre_bx = det_box[0] + det_box[2] * 0.5
        centre_by = det_box[1] + det_box[3] * 0.5
        size_bw, size_bh = det_box[2], det_box[3]

        lin_score = c_lin(centre_ax, centre_ay, size_aw, size_ah,
                          centre_bx, centre_by, size_bw, size_bh)
        exp_score = c_exp(centre_ax, centre_ay, size_aw, size_ah,
                          centre_bx, centre_by, size_bw, size_bh)

        passes_gates = (
            overlap >= IOU_THRESHOLD
            and lin_score >= LINEAR_THRESHOLD
            and exp_score >= EXP_THRESHOLD
        )
        return overlap if passes_gates else 0

    return [
        [_pair_score(i, track_box, j, det_box) for j, det_box in enumerate(new_boxes)]
        for i, track_box in enumerate(old_boxes)
    ]

5) Track Association Implementation
## Advanced tracking association logic:
- Implementation of Hungarian algorithm for optimal assignment
- Handles track-to-detection matching
- Manages unmatched detections and tracks
- Incorporates multiple cost metrics (IoU, linear, exponential)
Critical for maintaining consistent object identities across frames

In [7]:
def associate(old_boxes, new_boxes, class_ids_old=None, class_ids_new=None):
    """Match existing track boxes to new detection boxes.

    Scores every pair with hungarian_cost, solves the assignment that
    maximises the total score, and rejects assigned pairs whose score is
    below IOU_THRESHOLD.

    Args:
        old_boxes: List of track boxes as [x, y, w, h].
        new_boxes: List of detection boxes in the same format.
        class_ids_old: Optional class ID per track box.
        class_ids_new: Optional class ID per detection box.

    Returns:
        (matches, unmatched_trackers, unmatched_detections) where matches is
        a list of [track_index, detection_index] pairs, unmatched_trackers is
        a list of track indices, and unmatched_detections is a list of the
        unmatched detection *boxes* (the same objects that were passed in).
    """
    # Nothing to pair: every track is unmatched and every detection is new.
    if not old_boxes or not new_boxes:
        return [], list(range(len(old_boxes))), new_boxes

    score_matrix = np.array(hungarian_cost(old_boxes, new_boxes, class_ids_old, class_ids_new))

    # linear_sum_assignment minimises, so negate to maximise the score.
    assigned_rows, assigned_cols = linear_sum_assignment(-score_matrix)

    matches, lost_tracks, fresh_detections = [], [], []
    for track_idx, det_idx in zip(assigned_rows, assigned_cols):
        if score_matrix[track_idx][det_idx] < IOU_THRESHOLD:
            lost_tracks.append(track_idx)
            fresh_detections.append(det_idx)
            continue
        matches.append([track_idx, det_idx])

    # Rows/columns the solver left out (non-square matrix) are unmatched too.
    lost_tracks.extend(t for t in range(len(old_boxes)) if t not in assigned_rows)
    fresh_detections.extend(d for d in range(len(new_boxes)) if d not in assigned_cols)

    return matches, lost_tracks, [new_boxes[i] for i in fresh_detections]

def get_box_from_state(state):
    """Pick the box components out of the interleaved filter state.

    The state stores each box component at an even index (0, 2, 4, 6);
    the result is the four-element list of those entries.
    """
    return [state[position] for position in (0, 2, 4, 6)]

def return_F_with_dt(dt):
    """Build the 8x8 state-transition matrix for a time step of ``dt``.

    Starts from the identity and places ``dt`` immediately to the right of
    the diagonal on rows 0, 2, 4 and 6, coupling each even-indexed state
    entry to the entry that follows it.
    """
    transition = np.eye(8)
    for row in range(0, 8, 2):
        transition[row, row + 1] = dt
    return transition

6) Kalman Filter Track Class
## Sophisticated track management class (Obstacle):
- Implements Kalman filtering for motion prediction
- Maintains track state (position, velocity)
- Handles track age and matching status
- Updates track properties based on new detections
- Provides smooth trajectory estimation
Essential for robust tracking and motion prediction


In [8]:
# Obstacle (Track) class
class Obstacle:
    """A tracked object: its box, bookkeeping counters, and a Kalman filter.

    The filter state has 8 entries laid out as
    [x, x-velocity, y, y-velocity, width, width-velocity, height, height-velocity];
    measurements are the 4 box components [x, y, width, height].
    """

    def __init__(self, idx, box, class_id, time, age=1, unmatched_age=0):
        """Create a track from an initial box.

        Args:
            idx: Track ID.
            box: Four-element list or numpy array, or a single int/float
                (stored as ``[box, 0, 0, 0]``).
            class_id: Class ID associated with the track.
            time: Timestamp of the observation that created the track.
            age: Initial age counter.
            unmatched_age: Initial count of consecutive unmatched frames.

        Raises:
            ValueError: If a list/array box does not have exactly 4 elements.
            TypeError: If ``box`` is neither a list/array nor an int/float.
        """
        if isinstance(box, (list, np.ndarray)):
            if len(box) != 4:
                raise ValueError(f"Invalid box format. Expected 4 elements, got {len(box)}")
            # Copy into a plain list so the track owns a mutable box.
            self.box = list(box)
        elif isinstance(box, (int, float)):
            # A bare number becomes the first component; the rest are zero.
            self.box = [box, 0, 0, 0]
        else:
            raise TypeError(f"Unexpected box type: {type(box)}")

        self.idx, self.class_id = idx, class_id
        self.time = time
        self.age, self.unmatched_age = age, unmatched_age

        # Kalman filter: 8 state entries, 4 measured entries.
        kalman = KalmanFilter(dim_x=8, dim_z=4)

        # Box components go to the even slots; the odd (velocity) slots start at 0.
        x0, y0, w0, h0 = self.box
        kalman.x = np.array([x0, 0, y0, 0, w0, 0, h0, 0])

        kalman.P *= 1000
        kalman.Q[4:, 4:] *= 0.01

        # Measurement matrix: measurement k reads state entry 2*k.
        observation = np.zeros((4, 8), dtype=int)
        for meas_row, state_col in enumerate(range(0, 8, 2)):
            observation[meas_row, state_col] = 1
        kalman.H = observation

        kalman.R[2:, 2:] *= 10
        self.kf = kalman


7) MOT Evaluation Framework 
## Comprehensive evaluation system:
- Parses MOT sequence information and ground truth
- Implements standard MOT metrics (MOTA, MOTP, IDF1)
- Handles different input formats (video/MOT sequence)
- Generates detailed performance reports
Crucial for quantitative assessment of tracking performance

In [9]:
def parse_seqinfo(sequence_path):
    """Read sequence metadata from ``<sequence_path>/seqinfo.ini``.

    Returns:
        Dict with integer ``fps``, ``width``, ``height`` and ``seq_length``.
        When the ini file does not exist, fixed defaults
        (30 fps, 1920x1080, 1000 frames) are returned instead.
    """
    ini_path = os.path.join(sequence_path, 'seqinfo.ini')
    if not os.path.exists(ini_path):
        # Default values
        return {'fps': 30, 'width': 1920, 'height': 1080, 'seq_length': 1000}

    parser = configparser.ConfigParser()
    parser.read(ini_path)
    section = parser['Sequence']

    # Result key -> option name inside the [Sequence] section.
    option_for_key = (
        ('fps', 'frameRate'),
        ('width', 'imWidth'),
        ('height', 'imHeight'),
        ('seq_length', 'seqLength'),
    )
    return {key: int(section[option]) for key, option in option_for_key}

def load_ground_truth(gt_path):
    """Load ground truth from a MOT-format ``gt.txt``.

    Each non-empty line is expected to hold at least nine comma-separated
    fields: frame, id, bb_left, bb_top, bb_width, bb_height, conf, class,
    visibility. Only entries whose MOT class maps to a tracked COCO class
    and whose visibility exceeds 0.5 are kept.

    Args:
        gt_path: Path to the ground-truth file.

    Returns:
        Dict mapping frame number to a list of
        ``{'id', 'box': [x, y, w, h], 'class_id', 'conf'}`` entries.
        Frames with no kept entries have no key.

    Raises:
        FileNotFoundError: If ``gt_path`` does not exist.
    """
    if not os.path.exists(gt_path):
        raise FileNotFoundError(f"Ground truth file not found at {gt_path}")

    # MOT16/17 ground-truth class ID -> COCO class ID.
    # In the MOT label table 2 is "person on vehicle" (not a vehicle),
    # 3 is car, 4 is bicycle and 5 is motorbike.
    mot_to_coco = {
        1: 0,  # pedestrian -> person
        3: 2,  # car -> car
        4: 1,  # bicycle -> bicycle
        5: 3,  # motorbike -> motorcycle
    }

    gt_data = {}
    with open(gt_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines instead of failing on int('').
                continue

            # MOT format: frame, id, bb_left, bb_top, bb_width, bb_height, conf, class, visibility
            parts = line.split(',')
            frame = int(parts[0])
            obj_id = int(parts[1])
            x = float(parts[2])
            y = float(parts[3])
            w = float(parts[4])
            h = float(parts[5])
            # NOTE(review): MOTChallenge documents this column as a
            # consider/ignore flag; entries with flag 0 are still loaded
            # here - confirm whether they should be skipped.
            conf = float(parts[6])
            class_id = int(parts[7])
            visibility = float(parts[8])

            coco_class_id = mot_to_coco.get(class_id)

            # Only consider relevant classes with high visibility
            if coco_class_id is None or not visibility > 0.5:
                continue
            if coco_class_id not in CLASSES_TO_TRACK:
                continue

            gt_data.setdefault(frame, []).append({
                'id': obj_id,
                'box': [x, y, w, h],
                'class_id': coco_class_id,
                'conf': conf
            })
    return gt_data

In [10]:
def evaluate_mot(tracks_by_frame, gt_data):
    """Evaluate tracking performance using MOT metrics.

    Args:
        tracks_by_frame: Dict mapping frame number to a list of hypothesis
            entries, each with at least ``'id'`` and ``'box'`` ([x, y, w, h]).
        gt_data: Dict mapping frame number to a list of ground-truth
            entries with the same two keys.

    Returns:
        ``summary.to_dict()`` of the computed metrics (mota, motp, idf1,
        num_false_positives, num_misses, num_switches), computed under the
        summary row name ``'metrics'``.
    """
    acc = mm.MOTAccumulator(auto_id=True)

    # Walk every frame that has ground truth OR hypotheses, so that
    # hypotheses in frames without ground truth are still fed to the
    # accumulator instead of being skipped.
    all_frames = sorted(set(gt_data) | set(tracks_by_frame))

    for frame_id in all_frames:
        gt_entries = gt_data.get(frame_id, [])
        hyp_entries = tracks_by_frame.get(frame_id, [])

        gt_ids = [entry['id'] for entry in gt_entries]
        hyp_ids = [track['id'] for track in hyp_entries]

        # Distance is 1 - IoU. Pairs with no overlap stay NaN, which
        # py-motmetrics documents as "these two cannot be paired".
        distances = np.full((len(gt_entries), len(hyp_entries)), np.nan)
        for i, gt_entry in enumerate(gt_entries):
            for j, hyp_entry in enumerate(hyp_entries):
                iou = box_iou(gt_entry['box'], hyp_entry['box'])
                if iou > 0:
                    distances[i, j] = 1 - iou

        # Update accumulator
        acc.update(gt_ids, hyp_ids, distances)

    # Compute metrics
    mh = mm.metrics.create()
    summary = mh.compute(
        acc,
        metrics=['mota', 'motp', 'idf1', 'num_false_positives', 'num_misses', 'num_switches'],
        name='metrics',
    )
    return summary.to_dict()


In [11]:
def process_input(input_path, output_path, is_mot_sequence=False, gt_path=None):
    """Run detection and tracking over a video or MOT sequence and write an annotated video.

    NOTE(review): ``process_input`` is defined again in a later notebook
    cell; when the cells run top to bottom that later definition replaces
    this one. Keep a single copy.

    Args:
        input_path: Path to a video file, or to a MOT sequence directory
            containing an ``img1`` folder when ``is_mot_sequence`` is True.
        output_path: Destination of the annotated video (``mp4v`` codec).
        is_mot_sequence: Read frames from ``<input_path>/img1`` instead of
            opening ``input_path`` with ``cv2.VideoCapture``.
        gt_path: Optional MOT ``gt.txt``; only used for MOT sequences.

    Returns:
        The result of ``evaluate_mot`` when ``is_mot_sequence`` and
        ``gt_path`` are both set, otherwise ``None``.

    Raises:
        ValueError: If the image directory is missing or empty, or the video
            cannot be opened.
    """
    model = YOLO(MODEL_PATH)
    stored_obstacles = []
    next_id = 1
    tracks_by_frame = {}  # Store tracks for MOT evaluation
    cap = None
    img_dir = None
    img_files = []

    if is_mot_sequence:
        # MOT sequence: read images from 'img1' directory
        img_dir = os.path.join(input_path, 'img1')
        if not os.path.exists(img_dir):
            raise ValueError(f"Image directory not found: {img_dir}")

        # Parse seqinfo.ini for metadata
        seq_info = parse_seqinfo(input_path)
        fps = seq_info['fps']
        width = seq_info['width']
        height = seq_info['height']

        # Get sorted list of image files
        img_files = sorted(f for f in os.listdir(img_dir) if f.endswith(('.jpg', '.png')))
        if not img_files:
            raise ValueError(f"No images found in {img_dir}")
    else:
        # Video input
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {input_path}")

        # Fall back to the configured FPS when the container reports 0.
        fps = cap.get(cv2.CAP_PROP_FPS) or FPS
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    def _frames():
        """Yield (frame_idx, frame) pairs, 1-based, from the selected source."""
        if is_mot_sequence:
            # Unreadable images are skipped but still consume a frame number.
            for idx, img_file in enumerate(img_files, start=1):
                image = cv2.imread(os.path.join(img_dir, img_file))
                if image is not None:
                    yield idx, image
        else:
            idx = 0
            while cap.isOpened():
                ret, image = cap.read()
                if not ret:
                    break
                idx += 1
                yield idx, image

    # Video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    try:
        for frame_idx, frame in _frames():
            # Process frame
            processed_frame, tracks = process_frame(frame, model, stored_obstacles, next_id, frame_idx)

            # Store tracks for evaluation (MOT sequences only)
            if is_mot_sequence:
                tracks_by_frame[frame_idx] = [
                    {'id': track.idx, 'box': track.box, 'class_id': track.class_id}
                    for track in tracks
                ]

            # next_id never decreases, so it stays above every ID seen so
            # far even after the highest-numbered track has been dropped.
            if tracks:
                next_id = max(next_id, max(track.idx for track in tracks) + 1)

            # Update stored obstacles
            stored_obstacles = copy.deepcopy(tracks)

            # Write frame
            out.write(processed_frame)
    finally:
        # Release the writer and the capture even if a frame fails.
        out.release()
        if cap is not None:
            cap.release()

    # Evaluate MOT metrics if ground truth is provided (for MOT sequences)
    metrics = None
    if is_mot_sequence and gt_path:
        gt_data = load_ground_truth(gt_path)
        metrics = evaluate_mot(tracks_by_frame, gt_data)

    return metrics

In [12]:
def process_input(input_path, output_path, is_mot_sequence=False, gt_path=None):
    """Run detection and tracking over a video or MOT sequence and write an annotated video.

    NOTE(review): an earlier notebook cell also defines ``process_input``;
    this later definition is the one in effect after a top-to-bottom run.
    Consider removing the earlier copy.

    Args:
        input_path: Path to a video file, or to a MOT sequence directory
            containing an ``img1`` folder when ``is_mot_sequence`` is True.
        output_path: Destination of the annotated video (``mp4v`` codec).
        is_mot_sequence: Read frames from ``<input_path>/img1`` instead of
            opening ``input_path`` with ``cv2.VideoCapture``.
        gt_path: Optional MOT ``gt.txt``; only used for MOT sequences.

    Returns:
        The result of ``evaluate_mot`` when ``is_mot_sequence`` and
        ``gt_path`` are both set, otherwise ``None``.

    Raises:
        ValueError: If the image directory is missing or empty, or the video
            cannot be opened.
    """
    model = YOLO(MODEL_PATH)
    stored_obstacles = []
    next_id = 1
    tracks_by_frame = {}  # Store tracks for MOT evaluation
    cap = None
    img_dir = None
    img_files = []

    if is_mot_sequence:
        # MOT sequence: read images from 'img1' directory
        img_dir = os.path.join(input_path, 'img1')
        if not os.path.exists(img_dir):
            raise ValueError(f"Image directory not found: {img_dir}")

        # Parse seqinfo.ini for metadata
        seq_info = parse_seqinfo(input_path)
        fps = seq_info['fps']
        width = seq_info['width']
        height = seq_info['height']

        # Get sorted list of image files
        img_files = sorted(f for f in os.listdir(img_dir) if f.endswith(('.jpg', '.png')))
        if not img_files:
            raise ValueError(f"No images found in {img_dir}")
    else:
        # Video input
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            raise ValueError(f"Could not open video file: {input_path}")

        # Fall back to the configured FPS when the container reports 0.
        fps = cap.get(cv2.CAP_PROP_FPS) or FPS
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    def _frames():
        """Yield (frame_idx, frame) pairs, 1-based, from the selected source."""
        if is_mot_sequence:
            # Unreadable images are skipped but still consume a frame number.
            for idx, img_file in enumerate(img_files, start=1):
                image = cv2.imread(os.path.join(img_dir, img_file))
                if image is not None:
                    yield idx, image
        else:
            idx = 0
            while cap.isOpened():
                ret, image = cap.read()
                if not ret:
                    break
                idx += 1
                yield idx, image

    # Video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    try:
        for frame_idx, frame in _frames():
            # Process frame
            processed_frame, tracks = process_frame(frame, model, stored_obstacles, next_id, frame_idx)

            # Store tracks for evaluation (MOT sequences only)
            if is_mot_sequence:
                tracks_by_frame[frame_idx] = [
                    {'id': track.idx, 'box': track.box, 'class_id': track.class_id}
                    for track in tracks
                ]

            # next_id never decreases, so it stays above every ID seen so
            # far even after the highest-numbered track has been dropped.
            if tracks:
                next_id = max(next_id, max(track.idx for track in tracks) + 1)

            # Update stored obstacles
            stored_obstacles = copy.deepcopy(tracks)

            # Write frame
            out.write(processed_frame)
    finally:
        # Release the writer and the capture even if a frame fails.
        out.release()
        if cap is not None:
            cap.release()

    # Evaluate MOT metrics if ground truth is provided (for MOT sequences)
    metrics = None
    if is_mot_sequence and gt_path:
        gt_data = load_ground_truth(gt_path)
        metrics = evaluate_mot(tracks_by_frame, gt_data)

    return metrics

def process_frame(frame, model, stored_obstacles, next_id, frame_idx):
    """Detect objects in one frame, update the tracks, and draw the result.

    Args:
        frame: Input image; it is copied before anything is drawn on it.
        model: Detector, called as ``model(image, classes=..., conf=...)``.
        stored_obstacles: Tracks carried over from the previous frame.
            These objects are mutated in place.
        next_id: Lowest track ID that may be given to a new track.
        frame_idx: Frame number (not used inside this function).

    Returns:
        (image, new_obstacles): the annotated copy of the frame and the list
        of tracks to carry into the next frame (surviving tracks plus
        tracks created from unmatched detections).
    """
    # Create a deep copy of the input image to avoid modifying the original
    image = copy.deepcopy(frame)

    # Get current time
    current_time = time.time()

    # Perform object detection
    results = model(image, classes=CLASSES_TO_TRACK, conf=MIN_CONFIDENCE)

    # Prepare bounding boxes
    bounding_boxes = []
    class_ids = []
    for result in results:
        boxes = result.boxes.xywh.cpu().numpy()
        classes = result.boxes.cls.cpu().numpy()
        for box, cls in zip(boxes, classes):
            x, y, w, h = box
            # Convert centre-based xywh to [x, y, w, h] with top-left corner
            bounding_boxes.append([x - w/2, y - h/2, w, h])
            class_ids.append(int(cls))

    if not stored_obstacles:
        # No existing tracks: every detection starts a track and is drawn.
        new_obstacles = [
            Obstacle(next_id + offset, box, class_id, current_time)
            for offset, (box, class_id) in enumerate(zip(bounding_boxes, class_ids))
        ]
        selected_obstacles = new_obstacles
    else:
        # Get previous frame's boxes and class IDs
        previous_boxes = [obj.box for obj in stored_obstacles]
        previous_class_ids = [obj.class_id for obj in stored_obstacles]

        # Associate new detections with existing tracks
        matches, unmatched_trackers, unmatched_detections = associate(
            previous_boxes, bounding_boxes, previous_class_ids, class_ids
        )

        # Prepare lists for new obstacles and selected obstacles
        new_obstacles, selected_obstacles = [], []

        # Process matched detections
        for track_idx, det_idx in matches:
            obj = stored_obstacles[track_idx]
            obj.age += 1
            obj.unmatched_age = 0

            # Update Kalman filter with new measurement
            measurement = np.array(bounding_boxes[det_idx])
            obj.kf.update(measurement)

            # Predict next state. The update runs first, so the box stored
            # below is the filter's prediction, not the raw detection.
            dt = current_time - obj.time
            obj.kf.F = return_F_with_dt(dt)
            obj.kf.predict()

            # Update object properties
            obj.time = current_time
            obj.box = get_box_from_state(obj.kf.x)
            obj.class_id = class_ids[det_idx]  # Update class ID

            new_obstacles.append(obj)

            # Draw only tracks that have been matched often enough
            if obj.age >= MIN_HIT_STREAK:
                selected_obstacles.append(obj)

        # Process unmatched trackers
        for index in unmatched_trackers:
            obj = stored_obstacles[index]
            obj.unmatched_age += 1

            # Predict next state
            dt = current_time - obj.time
            obj.kf.F = return_F_with_dt(dt)
            obj.kf.predict()

            # Update object properties
            obj.time = current_time
            obj.box = get_box_from_state(obj.kf.x)

            # Keep track of objects within unmatched age limit.
            # NOTE(review): unmatched tracks are drawn regardless of
            # MIN_HIT_STREAK, unlike matched ones - confirm this is intended.
            if obj.unmatched_age < MAX_UNMATCHED_AGE:
                selected_obstacles.append(obj)
                new_obstacles.append(obj)

        # Process new unmatched detections.
        # Never allocate below next_id, even when the highest surviving
        # track ID is lower than it.
        current_next_id = next_id
        if new_obstacles:
            current_next_id = max(next_id, max(obj.idx for obj in new_obstacles) + 1)

        # associate() returns the unmatched boxes themselves, so recover each
        # one's position by object identity. A value-based list.index() would
        # return the first of several equal boxes and could pick the wrong class.
        position_by_identity = {id(box): pos for pos, box in enumerate(bounding_boxes)}
        for offset, box in enumerate(unmatched_detections):
            orig_idx = position_by_identity.get(id(box))
            if orig_idx is None:
                # Fallback for a box object that is not from bounding_boxes.
                orig_idx = bounding_boxes.index(box)
            class_id = class_ids[orig_idx]
            new_obstacles.append(Obstacle(current_next_id + offset, box, class_id, current_time))

    # Visualize selected obstacles
    for obj in selected_obstacles:
        left, top, right, bottom = convert_data(obj.box)
        color = id_to_color(obj.idx)
        label = f"{CLASS_NAMES.get(obj.class_id, 'unknown')} ID:{obj.idx}"
        cv2.rectangle(image, (left, top), (right, bottom), color, 2)
        cv2.putText(image, label, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    return image, new_obstacles


8) Main Processing Pipeline
## Complete tracking pipeline implementation:
- Supports both video and MOT sequence inputs
- Handles frame-by-frame processing
- Integrates detection, association, and tracking
- Implements visualization
- Provides user interface for input selection
- Generates output videos with tracking results
Forms the complete end-to-end tracking system

In [13]:
def main():
    """Interactive entry point: pick an input, run tracking, print a summary.

    Prompts for the input type (MOT17 sequence or video file) and its path,
    creates the output directory under OUTPUT_DIR, runs process_input, and
    prints the metrics it returns, if any.

    Raises:
        ValueError: If the sequence path does not exist or the video
            extension is not supported.
        FileNotFoundError: If the video file does not exist.
        Exception: Any error raised by the pipeline is printed and re-raised.
    """
    print("\n" + "="*50)
    print("Multi-Object Tracking Pipeline")
    print("="*50)

    # Verify model path
    if not os.path.exists(MODEL_PATH):
        print(f"Warning: Model not found at {MODEL_PATH}. Will attempt to download.")

    # Select input type
    print("\nSelect input type:")
    print("1. MOT17 Sequence")
    print("2. Video file")

    while True:
        choice = input("\nEnter choice (1 or 2): ").strip()
        if choice in ['1', '2']:
            break
        print("Invalid choice. Please enter 1 or 2.")

    # Handle input based on choice
    if choice == '1':
        # MOT17 Sequence
        sequence_path = input("\nEnter MOT17 sequence path (or press Enter for default): ").strip()
        if not sequence_path:
            # The built-in default is a machine-specific path; allow it to
            # be overridden through the environment without editing code.
            sequence_path = os.environ.get(
                "MOT17_SEQUENCE_PATH",
                "/scratch/b22ai025/MOT17/train/MOT17-13-SDP",
            )

        if not os.path.exists(sequence_path):
            raise ValueError(f"Invalid sequence path: {sequence_path}")

        input_path = sequence_path
        output_dir = os.path.join(OUTPUT_DIR, "sequence_output")
        output_path = os.path.join(output_dir, "tracked.mp4")
        gt_path = os.path.join(sequence_path, "gt", "gt.txt")
        is_mot_sequence = True
    else:
        # Video file
        video_path = input("\nEnter video file path (.mp4/.avi/.mov): ").strip()
        # Compare case-insensitively so e.g. "clip.MP4" is accepted.
        if not video_path.lower().endswith(('.mp4', '.avi', '.mov')):
            raise ValueError("Invalid video format. Supported formats: .mp4, .avi, .mov")
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        input_path = video_path
        output_dir = os.path.join(OUTPUT_DIR, "video_output")
        output_path = os.path.join(output_dir, "tracked.mp4")
        gt_path = None  # No ground truth for video input
        is_mot_sequence = False

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)
    print(f"\nOutput will be saved to: {output_path}")

    # Run tracking pipeline
    print("\nStarting tracking pipeline...")
    try:
        metrics = process_input(input_path, output_path, is_mot_sequence, gt_path)
        print("\n" + "="*50)
        print("Tracking Pipeline Summary")
        print("="*50)
        print(f"Input type: {'MOT17 Sequence' if is_mot_sequence else 'Video'}")
        print(f"Output saved to: {output_path}")

        if metrics:
            print("\nTracking Metrics:")
            print("-"*20)
            for metric, value in metrics.items():
                # The metrics dict nests each value under the summary row
                # name; unwrap single-entry dicts so the number is printed.
                if isinstance(value, dict) and len(value) == 1:
                    (value,) = value.values()
                if isinstance(value, dict):
                    print(f"{metric}: {value}")
                else:
                    print(f"{metric}: {value:.3f}")
        else:
            print("\nNote: Evaluation skipped (no ground truth available)")

        print("="*50)
    except Exception as e:
        print(f"\nError during tracking: {str(e)}")
        raise

if __name__ == "__main__":
    main()


Multi-Object Tracking Pipeline

Select input type:
1. MOT17 Sequence
2. Video file

Output will be saved to: Model3_Detections/sequence_output/tracked.mp4

Starting tracking pipeline...

0: 384x640 7 persons, 3 cars, 85.7ms
Speed: 2.7ms preprocess, 85.7ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 8 persons, 3 cars, 60.8ms
Speed: 2.9ms preprocess, 60.8ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 persons, 3 cars, 62.8ms
Speed: 2.3ms preprocess, 62.8ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 persons, 3 cars, 61.3ms
Speed: 2.3ms preprocess, 61.3ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 persons, 3 cars, 55.7ms
Speed: 2.3ms preprocess, 55.7ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 persons, 2 cars, 61.4ms
Speed: 2.2ms preprocess, 61.4ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: