In [None]:
## YOLOv12 fine-tuning with MOT17 + Competition dataset

In [None]:
# Marker line so the run log shows where execution of the notebook begins
print("Starting notebook")

In [None]:
# Install required packages
!pip install -q ultralytics
!pip install -q atomicwrites

In [None]:
# Change working directory
%cd /kaggle/working

# Clone the ByteTrack repository and install it
!git clone https://github.com/ifzhang/ByteTrack.git
%cd /kaggle/working/ByteTrack
!sed -i 's/onnx==1.8.1/onnx==1.9.0/g' requirements.txt

!pip install -q -r requirements.txt
!python setup.py -q develop
!pip install -q cython_bbox onemetric loguru lap thop
# Clean up notebook output
from IPython.display import clear_output
clear_output()

In [None]:
# Sanity check: list the top level of the ByteTrack checkout to confirm it is present
!ls /kaggle/working/ByteTrack

In [None]:
# Patch ByteTrack's visualize.py on disk: every `.astype(float32)` becomes
# `.astype(np.float32)`. sed edits the file in place, so the change persists
# in the checkout.
!sed -i "s/\.astype(float32)/.astype(np.float32)/g" /kaggle/working/ByteTrack/yolox/utils/visualize.py

In [None]:
# In-place rewrite over every *.py file under the ByteTrack checkout: wherever
# `p.float32`, `p.float64` or `p.int32` is preceded by a character other than `n`,
# the `p.` is replaced by `np.` (the preceding character is kept).
# NOTE(review): any other identifier ending in `p` (e.g. `xp.float32`) would be
# rewritten to `xnp.float32` as well — confirm no such name exists in the repo.
!find /kaggle/working/ByteTrack -type f -name "*.py" -exec sed -i -E "s/([^n])p\.(float32|float64|int32)/\1np.\2/g" {} \;

In [None]:
# Verify the patch: print the lines of visualize.py that contain `astype(np.float32)`
!grep "astype(np.float32)" /kaggle/working/ByteTrack/yolox/utils/visualize.py

In [None]:
import sys
import numpy as np
# The np.float alias was deprecated in NumPy 1.20 and is absent from newer
# releases; restore it for code that still references it.
if not hasattr(np, 'float'):
    np.float = float

# Put the checkout first on sys.path without piling up duplicates on re-runs.
_bytetrack_root = '/kaggle/working/ByteTrack'
if _bytetrack_root in sys.path:
    sys.path.remove(_bytetrack_root)
sys.path.insert(0, _bytetrack_root)

# Forget yolox AND its already-imported submodules so the import below really
# re-reads the patched sources; dropping only the top-level package entry would
# leave stale yolox.* submodules cached in sys.modules.
for _cached_name in [name for name in sys.modules
                     if name == 'yolox' or name.startswith('yolox.')]:
    del sys.modules[_cached_name]
from yolox.tracker.byte_tracker import BYTETracker, STrack
print("✅ Modules imported successfully!")

In [None]:
# Report the version string of the yolox package that was actually imported
import yolox
print("yolox.__version__:", yolox.__version__)

In [None]:
import os, sys, cv2, pandas as pd, torch, locale, shutil, glob, json
from tqdm import tqdm
from collections import defaultdict
from ultralytics import YOLO
from dataclasses import dataclass
from atomicwrites import atomic_write
from pathlib import Path
locale.getpreferredencoding = lambda: "UTF-8"
from onemetric.cv.utils.iou import box_iou_batch
from typing import List, Dict, Tuple, Optional, Union
#TPU
# import torch_xla
# import torch_xla.core.xla_model as xm


In [None]:
# ------------------------------------------------------------------
# Output locations shared by dataset preparation, training and inference
# ------------------------------------------------------------------
OUTPUT_DIR = "/kaggle/working/mot_output"
YOLO_TRAIN_DATA_DIR = f"{OUTPUT_DIR}/yolo_data"
SUBMISSION_FILE = f"{OUTPUT_DIR}/submission.csv"

# Create the output root, the YOLO data root, and the images/labels x train/val tree.
for _needed_dir in (
    OUTPUT_DIR,
    YOLO_TRAIN_DATA_DIR,
    *(
        f"{YOLO_TRAIN_DATA_DIR}/{_kind}/{_split}"
        for _kind in ("images", "labels")
        for _split in ("train", "val")
    ),
):
    os.makedirs(_needed_dir, exist_ok=True)

In [None]:
# ------------------------------------------------------------------
# Dataset roots and sequence selection
# ------------------------------------------------------------------
# Root of the competition tracking data
COMP_DATASET_ROOT = "/kaggle/input/surveillance-for-retail-stores/tracking"
# Root of the MOT17 training sequences
MOT17_ROOT = "/kaggle/input/mot-17/MOT17/train"

# Competition sequences listed here all go into the training split.
COMP_SEQUENCES = ["02", "03", "05"]

# MOT17 is split by sequence: some feed training, the others feed validation.
MOT17_TRAIN_SEQUENCES = [f"MOT17-{_num}-SDP" for _num in ("02", "04", "05", "09", "10")]
MOT17_VAL_SEQUENCES = [f"MOT17-{_num}-SDP" for _num in ("11", "13")]

In [None]:
# Cell 5: Dataset Preparation Functions (Full Updated Version)

def safe_bbox_conversion(x: float, y: float, w: float, h: float, 
                        img_w: int, img_h: int) -> Tuple[float, float, float, float]:
    """Convert a MOT pixel box (top-left x, y, width, height) to normalised YOLO form.

    The box corners are clipped to the image before the centre and size are
    computed, so a box that overhangs the frame is shrunk to its visible part
    instead of keeping its full size around a clamped centre. Boxes that lie
    fully inside the image produce the same result as a plain conversion.

    Args:
        x, y: top-left corner in pixels (may be negative for overhanging boxes).
        w, h: box width and height in pixels.
        img_w, img_h: image width and height in pixels.

    Returns:
        (x_center, y_center, width, height), each normalised by the image size
        and rounded to 6 decimals. Width and height are at least 0.001.

    Raises:
        ValueError: if img_w or img_h is not positive.
    """
    if img_w <= 0 or img_h <= 0:
        raise ValueError(f"Image size must be positive, got {img_w}x{img_h}")

    # Clip both corners into the image rectangle.
    x1 = min(max(x, 0.0), img_w)
    y1 = min(max(y, 0.0), img_h)
    x2 = min(max(x + w, 0.0), img_w)
    y2 = min(max(y + h, 0.0), img_h)

    # Floor the size so degenerate boxes never reach zero width/height.
    w_norm = max(0.001, (x2 - x1) / img_w)
    h_norm = max(0.001, (y2 - y1) / img_h)

    # Keep the centre far enough from the border that the (possibly floored)
    # box still fits inside the unit square.
    x_center = min(max((x1 + x2) / 2 / img_w, w_norm / 2), 1.0 - w_norm / 2)
    y_center = min(max((y1 + y2) / 2 / img_h, h_norm / 2), 1.0 - h_norm / 2)

    return (round(x_center, 6), round(y_center, 6),
            round(w_norm, 6), round(h_norm, 6))

def validate_mot_annotation(ann: dict, gt_file: Path, frame_id: int) -> bool:
    """Check one annotation dict and report every problem found.

    Three independent checks are run in order (position, size, confidence);
    each failing check prints a warning naming the file and frame.

    Args:
        ann: annotation with keys 'x', 'y', 'width', 'height' and 'conf'.
        gt_file: ground-truth file the annotation came from (used in messages).
        frame_id: frame number of the annotation (used in messages).

    Returns:
        True when no check failed, False otherwise.
    """
    # Negative top-left coordinates
    bad_position = ann['x'] < 0 or ann['y'] < 0
    if bad_position:
        print(f"⚠️ Invalid X/Y in {gt_file} frame {frame_id}: {ann}")

    # Empty or negative extent
    bad_size = ann['width'] <= 0 or ann['height'] <= 0
    if bad_size:
        print(f"⚠️ Invalid W/H in {gt_file} frame {frame_id}: {ann}")

    # Confidence outside [0, 1]
    bad_conf = ann['conf'] < 0 or ann['conf'] > 1
    if bad_conf:
        print(f"⚠️ Invalid confidence in {gt_file} frame {frame_id}: {ann}")

    return not (bad_position or bad_size or bad_conf)

def read_mot_gt(gt_file: Path, dataset_type="competition") -> list:
    """Parse a MOT-style ground-truth file into a list of annotation dicts.

    Every usable line carries at least nine comma-separated fields:
    frame, track id, x, y, width, height, conf, class id, visibility.
    Shorter lines (including blank ones) are skipped; lines whose fields are
    not numeric are reported and skipped.

    Filtering per dataset_type:
      * "competition": keep rows with class_id == 1 and visibility >= 0.06.
      * "mot17": keep rows with track_id > 0 and visibility > 0.1 that are
        also flagged for consideration (7th field non-zero) and belong to the
        pedestrian class (class_id == 1). In the MOT16/17 ground-truth format
        the 7th field is a "consider this entry" flag and the 8th field is the
        object type, so without this filter non-pedestrian entries would be
        exported as pedestrians.

    Args:
        gt_file: path of the gt.txt file.
        dataset_type: "competition" (default) or "mot17".

    Returns:
        List of dicts with keys 'frame_id', 'track_id', 'x', 'y', 'width',
        'height' and 'conf'. If the file cannot be read, the error is printed
        and whatever was parsed so far (possibly nothing) is returned.

    Raises:
        ValueError: if dataset_type is not one of the supported values.
    """
    if dataset_type not in ("competition", "mot17"):
        raise ValueError(
            f"Unknown dataset_type {dataset_type!r}; expected 'competition' or 'mot17'"
        )

    annotations = []
    try:
        with gt_file.open('r') as f:
            for line_num, line in enumerate(f, 1):
                parts = line.strip().split(',')
                if len(parts) < 9:
                    continue
                try:
                    # Ids may be written as floats ("1.0"), hence int(float(...)).
                    frame_id = int(float(parts[0]))
                    track_id = int(float(parts[1]))
                    x = float(parts[2])
                    y = float(parts[3])
                    width = float(parts[4])
                    height = float(parts[5])
                    conf = float(parts[6])
                    class_id = int(float(parts[7]))
                    visibility = float(parts[8])
                except (ValueError, OverflowError) as e:
                    print(f"🚨 Error in {gt_file} line {line_num}: {str(e)}")
                    continue

                if dataset_type == "competition":
                    keep = class_id == 1 and visibility >= 0.06
                else:  # "mot17"
                    keep = (track_id > 0 and visibility > 0.1
                            and conf != 0 and class_id == 1)

                if keep:
                    annotations.append({
                        'frame_id': frame_id,
                        'track_id': track_id,
                        'x': x,
                        'y': y,
                        'width': width,
                        'height': height,
                        'conf': conf
                    })
    except (OSError, UnicodeError) as e:
        # Best effort: an unreadable file is reported, not fatal.
        print(f"🔥 Failed to process {gt_file}: {str(e)}")
    return annotations


def prepare_competition_dataset():
    """Export every annotated competition frame into the YOLO training split.

    Works in two passes: first all ground-truth files listed in COMP_SEQUENCES
    are read and grouped per (sequence, frame); then each frame's image is
    copied to images/train and its boxes are written as class-0 YOLO labels to
    labels/train.

    Raises:
        FileNotFoundError: if a sequence has no gt/gt.txt file.
    """
    print("🚀 Preparing Competition Dataset...")
    train_root = Path(COMP_DATASET_ROOT) / "train"
    images_out = Path(YOLO_TRAIN_DATA_DIR) / "images/train"
    labels_out = Path(YOLO_TRAIN_DATA_DIR) / "labels/train"

    # Pass 1: gather annotations, keyed by (sequence, frame number).
    grouped = defaultdict(list)
    for seq in COMP_SEQUENCES:
        gt_file = train_root / seq / "gt" / "gt.txt"
        if not gt_file.exists():
            raise FileNotFoundError(f"🚨 Missing GT file: {gt_file}")
        for ann in read_mot_gt(gt_file, dataset_type="competition"):
            grouped[(seq, ann['frame_id'])].append(ann)

    # Pass 2: copy each frame and write its label file.
    for (seq, frame_id), anns in tqdm(grouped.items(), desc="Processing Competition Frames"):
        source_image = train_root / seq / "img1" / f"{frame_id:06d}.jpg"
        if not source_image.exists():
            print(f"Missing image: {source_image}")
            continue
        frame = cv2.imread(str(source_image))
        if frame is None:
            print(f"Invalid image: {source_image}")
            continue
        # The decoded frame is only needed for its size; the copy below uses the raw bytes.
        img_h, img_w = frame.shape[:2]

        dest_img = images_out / f"comp_{seq}_{frame_id:06d}.jpg"
        dest_label = labels_out / f"comp_{seq}_{frame_id:06d}.txt"
        with atomic_write(dest_img, mode='wb', overwrite=True) as out:
            out.write(source_image.read_bytes())

        label_lines = []
        for ann in anns:
            try:
                cx, cy, bw, bh = safe_bbox_conversion(ann['x'], ann['y'], ann['width'], ann['height'], img_w, img_h)
                label_lines.append(f"0 {cx} {cy} {bw} {bh}")
            except Exception as e:
                print(f"⚠️ Invalid bbox in comp {seq}-{frame_id}: {str(e)}")

        if not label_lines:
            # No usable box: drop the image again so it is not left without a label.
            os.remove(dest_img)
            continue
        with atomic_write(dest_label, mode='w', overwrite=True) as out:
            out.write("\n".join(label_lines))
    print("✅ Competition dataset prepared.")

In [None]:
def _export_mot17_split(sequences, split):
    """Write images and YOLO labels of the given MOT17 sequences into one split.

    Args:
        sequences: MOT17 sequence folder names under MOT17_ROOT.
        split: "train" or "val"; selects the images/<split> and labels/<split>
            output folders and is echoed in progress and warning messages.
    """
    mot17_base = Path(MOT17_ROOT)
    images_dir = Path(YOLO_TRAIN_DATA_DIR) / "images" / split
    labels_dir = Path(YOLO_TRAIN_DATA_DIR) / "labels" / split

    for seq in sequences:
        seq_path = mot17_base / seq
        gt_file = seq_path / "gt" / "gt.txt"
        if not gt_file.exists():
            # A missing sequence is reported and skipped, not fatal.
            print(f"🚨 Missing GT file: {gt_file}")
            continue

        # Group this sequence's annotations by frame number.
        frames_to_annotations = defaultdict(list)
        for ann in read_mot_gt(gt_file, dataset_type="mot17"):
            frames_to_annotations[ann['frame_id']].append(ann)

        for frame_id, anns in tqdm(frames_to_annotations.items(),
                                   desc=f"Processing MOT17 {split.capitalize()} {seq}"):
            img_path = seq_path / "img1" / f"{frame_id:06d}.jpg"
            if not img_path.exists():
                print(f"Missing image: {img_path}")
                continue
            img = cv2.imread(str(img_path))
            if img is None:
                print(f"Invalid image: {img_path}")
                continue
            # Decoded only to learn the frame size; the copy uses the raw bytes.
            height, width = img.shape[:2]

            dest_img = images_dir / f"mot17_{seq}_{frame_id:06d}.jpg"
            dest_label = labels_dir / f"mot17_{seq}_{frame_id:06d}.txt"
            with atomic_write(dest_img, mode='wb', overwrite=True) as f:
                f.write(img_path.read_bytes())

            label_content = []
            for ann in anns:
                try:
                    xc, yc, w_norm, h_norm = safe_bbox_conversion(ann['x'], ann['y'], ann['width'], ann['height'], width, height)
                    label_content.append(f"0 {xc} {yc} {w_norm} {h_norm}")
                except Exception as e:
                    print(f"⚠️ Invalid bbox in MOT17 {split} {seq}-{frame_id}: {str(e)}")

            if label_content:
                with atomic_write(dest_label, mode='w', overwrite=True) as f:
                    f.write("\n".join(label_content))
            else:
                # No usable box: remove the image, and any label left over from
                # an earlier run, so images and labels stay paired.
                os.remove(dest_img)
                if dest_label.exists():
                    dest_label.unlink()


def prepare_mot17_dataset():
    """Export the MOT17 training and validation sequences in YOLO format.

    Sequences in MOT17_TRAIN_SEQUENCES go to the train split and those in
    MOT17_VAL_SEQUENCES to the val split; both use the same export routine.
    """
    print("🚀 Preparing MOT17 Dataset...")
    _export_mot17_split(MOT17_TRAIN_SEQUENCES, "train")
    _export_mot17_split(MOT17_VAL_SEQUENCES, "val")
    print("✅ MOT17 dataset prepared.")

In [None]:
def create_dataset_yaml():
    """Write dataset.yaml into YOLO_TRAIN_DATA_DIR.

    The file points at the images/train and images/val folders and declares a
    single class named 'pedestrian'.
    """
    data_root = YOLO_TRAIN_DATA_DIR
    yaml_lines = [
        f"train: {data_root}/images/train",
        f"val: {data_root}/images/val",
        "nc: 1",
        "names: ['pedestrian']",
    ]
    target = Path(data_root) / "dataset.yaml"
    # One newline-terminated entry per line.
    with atomic_write(target, overwrite=True) as handle:
        handle.write("\n".join(yaml_lines) + "\n")
    print("✅ dataset.yaml created.")

In [None]:
# Build the YOLO dataset on disk: competition frames first, then the MOT17
# train/val frames, and finally the dataset.yaml that points at both splits.
prepare_competition_dataset()
prepare_mot17_dataset()
create_dataset_yaml()

In [None]:
# Seed every random number generator this notebook draws from.
SEED = 42
np.random.seed(SEED)              # np.random.choice picks the frames to visualise later on
torch.manual_seed(SEED)           # PyTorch default generator
torch.cuda.manual_seed_all(SEED)  # every visible GPU; ignored when CUDA is unavailable

In [None]:
# Cell 7: YOLO training function (fine-tunes the 'yolov8m.pt' checkpoint)
def train_yolov11():
    """Fine-tune a pretrained Ultralytics YOLO detector on the prepared dataset.

    Despite the function name and the notebook title, the checkpoint loaded
    here is 'yolov8m.pt'. The name is kept because other cells call it.

    Returns:
        The YOLO model object on which training was run.
    """
    print("Training YOLOv11 model...")

    # Load the pretrained medium-size YOLOv8 checkpoint
    model = YOLO('yolov8m.pt')

    # Use the GPU when one is available
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    #TPU
    # device = xm.xla_device()

    # Start training
    model.train(
        data=f"{YOLO_TRAIN_DATA_DIR}/dataset.yaml",
        epochs=2,                # Very short run; raise this for a real fine-tune
        patience=4,              # NOTE(review): larger than epochs, so early stopping presumably never triggers
        imgsz=1184,              # Large input size to help with small pedestrians
        batch=10,                # Chosen to fit GPU memory
        optimizer='SGD',
        lr0=0.005,               # Initial learning rate
        lrf=0.015,               # Final LR factor
        momentum=0.937,
        weight_decay=0.0005,
        cos_lr=True,             # Cosine learning-rate schedule
        warmup_epochs=2,         # NOTE(review): equals epochs, so the whole run is presumably warm-up — confirm
        # Augmentations
        augment=True,
        fliplr=0.5,              # Horizontal flip is safe for pedestrians
        hsv_h=0.015,             # Minimal hue shift (preserve clothing colors)
        hsv_s=0.4,               # Reduced saturation jitter
        hsv_v=0.2,               # Small brightness change
        degrees=0.0,             # No rotation (pedestrians are upright)
        translate=0.05,          # Small translation
        scale=0.2,               # Limited scaling
        erasing=0.1,             # Low random-erasing probability
        mixup=0.0,               # Disabled
        copy_paste=0.0,          # Disabled
        auto_augment='randaugment',  # NOTE(review): this SELECTS RandAugment rather than disabling it;
                                     # presumably only used for classification training — confirm
        # Other settings
        overlap_mask=False,      # NOTE(review): a segmentation-mask option; presumably no effect on box detection — confirm
        single_cls=True,         # Treat everything as one class
        pretrained=True,         # Start from the pretrained checkpoint weights
        device=device,
        half=True,               # NOTE(review): confirm this affects training; Ultralytics also has an `amp` setting
    )

    print("Training completed!")
    return model

In [None]:
# Cell 8: Run Training
# Runs the fine-tuning and keeps the returned model in `trained_model`,
# which create_tracking_video() reads as a global later on.
trained_model = train_yolov11()

In [None]:
# Cell 4: ByteTrack Helper
# ByteTrack configuration
@dataclass(frozen=True)
class BYTETrackerArgs:
    """Immutable settings object handed to ``BYTETracker`` when a tracker is built.

    The meaning of each field is defined by ByteTrack's tracker, not by this
    notebook; the notes on the fields below are assumptions to confirm against
    the ByteTrack source. The inference and video functions in this notebook
    override track_thresh, track_buffer and match_thresh when they construct it.
    """
    # presumably the detection-score threshold used by the tracker — TODO confirm
    track_thresh: float = 0.0
    # presumably how many frames a lost track is kept alive — TODO confirm
    track_buffer: int = 30
    # presumably the matching threshold for track/detection association — TODO confirm
    match_thresh: float = 0.7
    # not read anywhere in this notebook; presumably a box shape filter — TODO confirm
    aspect_ratio_thresh: float = 3.0
    # not read anywhere in this notebook; presumably a minimum box area filter — TODO confirm
    min_box_area: float = 1.0
    # presumably switches ByteTrack to its MOT20 behaviour — TODO confirm
    mot20: bool = True

# Helper functions for tracking
def detections2boxes(xyxy, confidence) -> np.ndarray:
    """Append each detection's confidence as one extra trailing column to its box row.

    Args:
        xyxy: array of box rows.
        confidence: array of scores, one per box row.

    Returns:
        The horizontally stacked array (box columns followed by the score column).
    """
    # Give the scores a trailing axis so they can be stacked beside the boxes.
    score_column = confidence[:, None]
    stacked = np.hstack((xyxy, score_column))
    return stacked

def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
    """Collect the ``tlbr`` attribute of every track into a single float array.

    Args:
        tracks: track objects exposing a ``tlbr`` attribute.

    Returns:
        Float array with one entry per track, in the order given.
    """
    boxes = []
    for trk in tracks:
        boxes.append(trk.tlbr)
    return np.array(boxes, dtype=float)

def match_detections_with_tracks(xyxy, tracks: List[STrack]) -> List[Optional[int]]:
    """Assign a track id to each detection box by IoU overlap.

    Every track votes for the detection it overlaps most. When several tracks
    vote for the same detection, the track with the highest IoU wins, so the
    result no longer depends on the order of `tracks`.

    Args:
        xyxy: detection boxes, one row per detection.
        tracks: active tracks; each provides `tlbr` and `track_id`.

    Returns:
        A list with one entry per detection: the matched track id, or None
        when no track overlaps that detection.
    """
    if len(xyxy) == 0 or len(tracks) == 0:
        return [None] * len(xyxy)

    tracks_boxes = tracks2boxes(tracks=tracks)
    # iou[t, d] is the overlap between track t and detection d.
    iou = box_iou_batch(tracks_boxes, xyxy)
    track2detection = np.argmax(iou, axis=1)

    tracker_ids = [None] * len(xyxy)
    # Best overlap claimed so far per detection; 0 means "unclaimed", which also
    # rejects tracks that overlap nothing.
    best_overlap = [0.0] * len(xyxy)

    for tracker_index, detection_index in enumerate(track2detection):
        overlap = iou[tracker_index, detection_index]
        if overlap > best_overlap[detection_index]:
            best_overlap[detection_index] = overlap
            tracker_ids[detection_index] = tracks[tracker_index].track_id

    return tracker_ids

In [None]:
# Guard for the inference cells below: the fine-tuned weights must already be on disk.
# NOTE(review): the run folder "train2" is hard-coded. Presumably the training
# library numbers its run folders per run, so on a fresh kernel the weights may
# land in a different folder and this assert would fail — confirm.
assert Path("/kaggle/working/ByteTrack/runs/detect/train2/weights/best.pt").exists()


In [None]:
def _find_best_weights(preferred_dir: Path) -> Path:
    """Return a best*.pt checkpoint, preferring preferred_dir, else the newest sibling run."""
    in_preferred = sorted(preferred_dir.glob("best*.pt"))
    if in_preferred:
        return in_preferred[0]

    # The run folder index (train, train2, ...) depends on how many trainings
    # were started, so fall back to the most recently written checkpoint of any run.
    runs_root = preferred_dir.parent.parent
    if runs_root.is_dir():
        other_runs = sorted(runs_root.glob("train*/weights/best*.pt"),
                            key=lambda p: p.stat().st_mtime)
        if other_runs:
            return other_runs[-1]

    raise FileNotFoundError(f"❌ No trained weights found in {preferred_dir}")


def _read_seqinfo(seqinfo_path: Path) -> Dict[str, str]:
    """Read the key=value lines of a seqinfo.ini file into a dict of stripped strings."""
    seq_info = {}
    with seqinfo_path.open() as f:
        for line in f:
            # Split on the first '=' only, so values may themselves contain '='.
            key, separator, value = line.partition('=')
            if separator:
                seq_info[key.strip()] = value.strip()
    return seq_info


def run_tracking_inference():
    """Detect and track people in competition test sequence 01 and save a submission CSV.

    Loads the fine-tuned weights, runs the detector on every frame listed in
    seqinfo.ini, feeds the boxes to ByteTrack and writes one row per frame
    (columns ID, Frame, Objects, Objective) to SUBMISSION_FILE. Frames that
    are missing, have no detections, or fail to process get an empty object
    list so the frame numbering stays complete.

    Returns:
        The submission as a pandas DataFrame.

    Raises:
        FileNotFoundError: if no trained weights or no test image folder is found.
        ValueError: if seqinfo.ini lacks frameRate/seqLength or a required
            submission column is missing.
    """
    print("Running tracking inference on test sequence...")
    weights_dir = Path("/kaggle/working/ByteTrack/runs/detect/train2/weights")
    best_model_path = _find_best_weights(weights_dir)
    model = YOLO(str(best_model_path))
    print(f"✅ Loaded best model from: {best_model_path}")

    # Using the competition dataset test sequence
    test_img_dir = Path(f"{COMP_DATASET_ROOT}/test/01/img1")
    test_seq_info = Path(f"{COMP_DATASET_ROOT}/test/01/seqinfo.ini")
    if not test_img_dir.exists():
        raise FileNotFoundError(f"Test image directory {test_img_dir} not found")

    seq_info = _read_seqinfo(test_seq_info)
    try:
        frame_rate = int(seq_info['frameRate'])
        num_frames = int(seq_info['seqLength'])
    except KeyError as e:
        raise ValueError(f"Missing key in seqinfo.ini: {e}")

    print(f"📊 Test sequence: {num_frames} frames @ {frame_rate}FPS")

    tracker_args = BYTETrackerArgs(
        track_thresh=0.35,
        track_buffer=45,
        match_thresh=0.7
    )
    byte_tracker = BYTETracker(tracker_args)
    submission_data = []

    for frame_idx in tqdm(range(1, num_frames + 1), desc="Tracking"):
        img_path = test_img_dir / f"{frame_idx:06d}.jpg"
        # Every frame gets a row; "Objects" stays empty unless tracking succeeds.
        frame_data = {
            "ID": frame_idx - 1,
            "Frame": frame_idx,
            "Objects": [],
            "Objective": "tracking"
        }
        if not img_path.exists():
            submission_data.append(frame_data)
            continue

        try:
            results = model.predict(str(img_path), conf=0.3, iou=0.7, verbose=False)[0]
            if len(results.boxes) == 0:
                # NOTE: the tracker is not updated for frames without detections.
                submission_data.append(frame_data)
                continue

            xyxy = results.boxes.xyxy.cpu().numpy().astype(np.float64)
            img = cv2.imread(str(img_path))
            if img is None:
                raise ValueError(f"Failed to read image {img_path}")
            # Deliberately hand every detection to the tracker with score 1.
            detections = np.hstack([xyxy, np.ones((len(xyxy), 1))])  # Force confidence = 1
            tracks = byte_tracker.update(
                output_results=detections,
                img_info=img.shape[:2],
                img_size=img.shape[:2]
            )
            track_ids = match_detections_with_tracks(xyxy, tracks)
            objects = []
            for box, track_id in zip(xyxy, track_ids):
                if track_id is None:
                    continue
                # Output boxes are top-left x/y plus width/height.
                objects.append({
                    "tracked_id": int(track_id),
                    "x": float(box[0]),
                    "y": float(box[1]),
                    "w": float(box[2] - box[0]),
                    "h": float(box[3] - box[1]),
                    "confidence": 1.0
                })
            frame_data["Objects"] = objects
            submission_data.append(frame_data)
        except Exception as e:
            # Best effort per frame: report the problem and keep the empty row.
            print(f"⚠️ Error processing frame {frame_idx}: {str(e)}")
            submission_data.append(frame_data)

    submission_df = pd.DataFrame(submission_data)
    required_columns = {"ID", "Frame", "Objects", "Objective"}
    if not required_columns.issubset(submission_df.columns):
        missing = required_columns - set(submission_df.columns)
        raise ValueError(f"Missing submission columns: {missing}")

    submission_df.to_csv(SUBMISSION_FILE, index=False)
    print(f"✅ Submission saved to {SUBMISSION_FILE}")
    return submission_df

In [None]:
# Cell 10: Run Inference and Generate Submission
# Runs tracking on the test sequence, writes the submission CSV and keeps the
# resulting DataFrame in `submission`.
submission = run_tracking_inference()

In [None]:
def visualize_tracking_results(num_frames=5):
    """Plot a random sample of tracked frames from the saved submission file.

    Reads SUBMISSION_FILE, picks up to `num_frames` frames that contain at
    least one tracked object, and draws each object's box and id on the frame
    image. Sampled frames are shown in chronological order so the centre trail
    drawn per track id runs forward in time. The figure is saved to
    OUTPUT_DIR/tracking_visualization.png and displayed.

    Args:
        num_frames: maximum number of frames to draw.
    """
    import ast
    import json
    import matplotlib.pyplot as plt
    from matplotlib.patches import Rectangle

    def _parse_objects(cell):
        """Turn one "Objects" CSV cell back into a list of dicts without executing it."""
        try:
            return json.loads(cell)
        except (TypeError, ValueError):
            # The CSV presumably holds the Python repr of the list (single
            # quotes), which is not valid JSON; literal_eval parses it
            # without running arbitrary code the way eval() would.
            return ast.literal_eval(cell)

    # Load submission data (assumes the submission file was saved at SUBMISSION_FILE)
    submission_df = pd.read_csv(SUBMISSION_FILE)

    TEST_SEQUENCE = "01"
    test_img_dir = f"{COMP_DATASET_ROOT}/test/{TEST_SEQUENCE}/img1"

    # Parse every "Objects" cell once and reuse the result below.
    parsed_objects = submission_df['Objects'].apply(_parse_objects)
    has_objects = parsed_objects.apply(len) > 0
    frames_with_objects = submission_df.loc[has_objects, 'Frame'].values
    if len(frames_with_objects) == 0:
        print("No frames with tracked objects found.")
        return

    # Random sample, then sorted so trails are built in frame order.
    frames_to_show = np.sort(np.random.choice(
        frames_with_objects, min(num_frames, len(frames_with_objects)), replace=False))

    # 50 distinct colours, indexed by track id modulo 50.
    palette = plt.cm.hsv(np.linspace(0, 1, 50))

    plt.figure(figsize=(15, 5 * len(frames_to_show)))
    track_history = {}  # track id -> list of box centres seen so far

    for i, frame_num in enumerate(frames_to_show):
        img_path = f"{test_img_dir}/{int(frame_num):06d}.jpg"
        if not os.path.exists(img_path):
            continue
        img = cv2.imread(img_path)
        if img is None:
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # First submission row for this frame
        objects = parsed_objects[submission_df['Frame'] == frame_num].iloc[0]

        plt.subplot(len(frames_to_show), 1, i+1)
        plt.imshow(img)
        ax = plt.gca()

        for obj in objects:
            # Each object is a dict with keys "tracked_id", "x", "y", "w", "h", "confidence"
            track_id = obj.get("tracked_id")
            x = obj.get("x")
            y = obj.get("y")
            w = obj.get("w")
            h = obj.get("h")

            color = palette[int(track_id) % 50]

            # Draw rectangle around the object
            rect = Rectangle((x, y), w, h, edgecolor=color, facecolor='none', lw=2)
            ax.add_patch(rect)
            plt.text(x, y - 10, f"ID: {track_id}", color='white', fontsize=12,
                     bbox=dict(facecolor='red', alpha=0.7))

            # Remember the box centre and draw the trail once it has two points
            center = (x + w/2, y + h/2)
            track_history.setdefault(track_id, []).append(center)
            if len(track_history[track_id]) > 1:
                trail = np.array(track_history[track_id])
                plt.plot(trail[:, 0], trail[:, 1], color=color, alpha=0.5)

        plt.title(f"Frame {int(frame_num)}")
        plt.axis('off')

    plt.tight_layout()
    vis_path = f"{OUTPUT_DIR}/tracking_visualization.png"
    plt.savefig(vis_path)
    plt.show()
    print(f"Visualization saved to {vis_path}")


In [None]:
# Render a sample of tracked frames from the saved submission file
visualize_tracking_results()

In [None]:
# Cell 12: Run Complete Pipeline
def run_complete_pipeline():
    """Run the complete pipeline from dataset preparation to submission.

    Calls the functions defined in this notebook in order: dataset export
    (competition frames, MOT17 frames, dataset.yaml), training, and tracking
    inference. The inference step loads its weights from disk rather than
    using the model object returned by training.
    """
    # 1. Prepare dataset
    prepare_competition_dataset()
    prepare_mot17_dataset()
    create_dataset_yaml()

    # 2. Train model
    train_yolov11()

    # 3. Run inference and generate submission
    run_tracking_inference()

    print("Complete pipeline executed successfully!")


In [None]:
# Uncomment to run the complete pipeline at once
# run_complete_pipeline()

In [None]:
def create_tracking_video(output_video_path="/kaggle/working/tracking_output.mp4", fps=30, model=None):
    """
    Process all images in COMP_DATASET_ROOT/test/01/img1 using YOLO and ByteTrack,
    draw bounding boxes and track IDs, and write the processed frames to a video.

    Args:
        output_video_path (str): Path to save the output video.
        fps (int): Frames per second for the output video.
        model: Detector exposing ``predict``; when None (default) the global
            ``trained_model`` created by the training cell is used.

    Raises:
        FileNotFoundError: if the test image folder contains no .jpg files.
        ValueError: if the first image cannot be read.
        IOError: if the video file cannot be opened for writing.
    """
    # Resolved at call time so the default keeps using the notebook's trained model.
    detector = trained_model if model is None else model

    # Define test image directory
    test_img_dir = Path(f"{COMP_DATASET_ROOT}/test/01/img1")

    # Get sorted list of image paths
    img_paths = sorted(test_img_dir.glob("*.jpg"))
    if not img_paths:
        raise FileNotFoundError(f"No images found in {test_img_dir}")

    # Read the first image to get frame dimensions
    first_img = cv2.imread(str(img_paths[0]))
    if first_img is None:
        raise ValueError("Could not read the first image.")
    height, width = first_img.shape[:2]

    # Initialize VideoWriter (using MP4V codec)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
    if not video_writer.isOpened():
        # Fail loudly instead of silently writing nothing.
        video_writer.release()
        raise IOError(f"Could not open video writer for {output_video_path}")
    print(f"Writing video to {output_video_path} at {fps} FPS, resolution: {width}x{height}")

    # Initialize ByteTrack tracker (adjust parameters as needed)
    tracker_args = BYTETrackerArgs(
        track_thresh=0.35,
        track_buffer=45,
        match_thresh=0.7,
        aspect_ratio_thresh=3.0,
        min_box_area=1.0,
        mot20=True
    )
    byte_tracker = BYTETracker(tracker_args)

    try:
        # Process each frame
        for img_path in tqdm(img_paths, desc="Creating video with tracking"):
            img = cv2.imread(str(img_path))
            if img is None:
                continue

            # Run YOLO detection on the image
            results = detector.predict(str(img_path), conf=0.3, iou=0.7, verbose=False)[0]
            if len(results.boxes) > 0:
                boxes = results.boxes.xyxy.cpu().numpy()
                # Force confidence to 1 for tracker input; alternatively, you can use actual scores.
                detections = np.hstack([boxes, np.ones((len(boxes), 1))])
                # Update ByteTrack tracker using image info
                tracks = byte_tracker.update(
                    output_results=detections,
                    img_info=img.shape[:2],
                    img_size=img.shape[:2]
                )

                # Draw each tracked bounding box on the image
                for trk in tracks:
                    x1, y1, x2, y2 = trk.tlbr
                    cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                    cv2.putText(img, f"ID: {trk.track_id}", (int(x1), int(y1)-10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            # Write the processed frame to the video
            video_writer.write(img)
    finally:
        # Always finalise the file, even when a frame fails part-way through.
        video_writer.release()

    print(f"✅ Video saved to {output_video_path}")

In [None]:
# Render the tracked test sequence to a video using the default output path and frame rate
create_tracking_video()