In [1]:
import importlib, sys, time, numpy as np
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
import pandas as pd
from tqdm.auto import tqdm

# On a fresh kernel, import sorth; when this cell is re-run in a live kernel,
# reload the already-cached module so the names imported below are rebound
# to the re-executed module.
if 'sorth' not in sys.modules:
    import sorth
else:
    importlib.reload(sys.modules['sorth'])
from sorth import Sort, KalmanBoxTracker, associate_detections_to_trackers

from ultralytics import YOLO

  from .autonotebook import tqdm as notebook_tqdm


### Assign model based on aspect ratio (AR)

In [None]:
def evaluate_model_on_video(video_path, model, dist_th=120, max_age=12, conf_th=0.20, output_base_dir="Output"):
    """
    Run detection + SORT tracking over a single video and save the results.

    For every frame the detector is run, detections with confidence above
    ``conf_th`` are passed to a freshly created ``Sort`` tracker, and an
    annotated copy of the frame is written to
    ``<output_base_dir>/frames/<video stem>/<frame index>.jpg``.
    The centre point of every track box returned by the tracker is collected
    and finally written to ``<output_base_dir>/eval/<video stem>.csv`` with the
    columns ``t`` (frame index), ``hexbug`` (track id), ``x`` and ``y``,
    preceded by an unnamed running row index.

    Args:
        video_path: Path (str or ``pathlib.Path``) of the video to process.
        model: Detector that is called as ``model(frame, verbose=False)`` and
            whose first result exposes ``.boxes.xyxy`` and ``.boxes.conf``.
        dist_th: Passed as ``dist_threshold`` to ``Sort`` and to
            ``associate_detections_to_trackers``.
        max_age: Passed as ``max_age`` to ``Sort``.
        conf_th: Detections with confidence <= this value are discarded.
        output_base_dir: Root directory for the ``frames/`` and ``eval/`` outputs.

    Returns:
        dict with the keys ``video`` (file name), ``csv_path``, ``frames_dir``,
        ``num_records`` (rows written to the CSV) and ``num_frames``.

    Raises:
        ValueError: If the video cannot be opened.
    """
    video_path = Path(video_path)

    # Fail early with a clear message instead of silently processing 0 frames.
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"Could not open video: {video_path}")

    try:
        # Create output directories
        out_root = Path(output_base_dir)
        frames_dir = out_root / "frames" / video_path.stem
        frames_dir.mkdir(parents=True, exist_ok=True)

        csv_dir = out_root / "eval"
        csv_dir.mkdir(parents=True, exist_ok=True)

        # Initialize tracker
        # Resets the class-level counter before building a new tracker;
        # presumably this makes track ids restart for every video -- confirm in sorth.
        KalmanBoxTracker.count = 0
        tracker = Sort(max_age=max_age, min_hits=1, dist_threshold=dist_th)

        # Colors and font
        CLR_DET, CLR_PRED, CLR_TRACK, CLR_LINE = (0,255,0), (255,0,0), (255,0,255), (0,255,255)
        font = cv2.FONT_HERSHEY_SIMPLEX

        frame_idx = 0
        records = []

        print(f"Processing {video_path.name}...")

        # Main processing loop
        while True:
            ok, frame = cap.read()
            if not ok:
                break

            # YOLO detections -> (N, 5) array of x1, y1, x2, y2, confidence
            result = model(frame, verbose=False)[0]
            xyxy = result.boxes.xyxy.cpu().numpy() if result.boxes else np.empty((0,4))
            conf = result.boxes.conf.cpu().numpy() if result.boxes else np.empty((0,))
            keep = conf > conf_th
            dets = xyxy[keep]
            conf = conf[keep]
            dets_in = np.hstack([dets, conf[:,None]]) if dets.size else np.empty((0,5))

            # Save Kalman predictions before update (only used for drawing).
            # NOTE(review): if KalmanBoxTracker.predict() advances the filter
            # state (as it does in the reference SORT implementation) and
            # Sort.update() predicts again internally, every track is stepped
            # twice per frame -- confirm against sorth before relying on
            # max_age / dist_th tuning.
            preds_before = [trk.predict()[0] for trk in tracker.trackers]

            # Run tracker update and log the centre of every returned track box
            tracks = tracker.update(dets_in)
            for (x1, y1, x2, y2, tid) in tracks:
                records.append({
                    "t": frame_idx,
                    "hexbug": int(tid),
                    "x": round((x1 + x2) / 2, 6),
                    "y": round((y1 + y2) / 2, 6),
                })

            # Draw visualization on a copy so the source frame stays untouched
            canvas = frame.copy()

            # Draw detections (green)
            for (x1, y1, x2, y2, _) in dets_in:
                cv2.rectangle(canvas,(int(x1),int(y1)),(int(x2),int(y2)), CLR_DET,2)

            # Draw Kalman predictions (blue)
            for (x1, y1, x2, y2) in preds_before:
                cv2.rectangle(canvas,(int(x1),int(y1)),(int(x2),int(y2)), CLR_PRED,1)

            # Draw tracks (magenta) with their id next to the box centre
            for (x1, y1, x2, y2, tid) in tracks:
                cv2.rectangle(canvas,(int(x1),int(y1)),(int(x2),int(y2)), CLR_TRACK,2)
                cx, cy = int((x1+x2)/2), int((y1+y2)/2)
                cv2.putText(canvas,f"{int(tid)}",(cx+5,cy-5),font,0.6,CLR_TRACK,2)

            # Draw match lines (yellow) between detection centres and the
            # predicted boxes they are associated with
            if preds_before and dets_in.size:
                tmp_trk = np.zeros((len(preds_before),5))
                tmp_trk[:,:4] = np.array(preds_before)
                matches,_,_ = associate_detections_to_trackers(
                    dets_in, tmp_trk, dist_threshold=dist_th)
                for d, t in matches:
                    dx, dy = (dets_in[d,0]+dets_in[d,2])/2, (dets_in[d,1]+dets_in[d,3])/2
                    tx, ty = (preds_before[t][0]+preds_before[t][2])/2, (preds_before[t][1]+preds_before[t][3])/2
                    cv2.line(canvas,(int(dx),int(dy)),(int(tx),int(ty)),CLR_LINE,1,cv2.LINE_AA)

            cv2.putText(canvas,f"frame {frame_idx}",(20,30),font,1,(255,255,255),2)

            # Save frame
            cv2.imwrite(str(frames_dir / f"{frame_idx:05d}.jpg"), canvas)
            frame_idx += 1
    finally:
        # Release the capture even when detection/tracking/drawing raises.
        cap.release()

    # Save CSV in correct format to match ground truth structure.
    # Passing the columns explicitly keeps sort_values() working (and still
    # writes a header-only CSV) when no track was produced for the video.
    csv_path = csv_dir / f"{video_path.stem}.csv"
    df = (
        pd.DataFrame(records, columns=["t", "hexbug", "x", "y"])
        .sort_values(["t", "hexbug"])
        .reset_index(drop=True)  # running 0..n-1 index becomes the first column
    )

    # index=True without index_label writes the index as an unnamed first column
    df.to_csv(csv_path, index=True)

    print(f"{video_path.name}: {len(records)} records, {frame_idx} frames")

    return {
        'video': video_path.name,
        'csv_path': csv_path,
        'frames_dir': frames_dir,
        'num_records': len(records),
        'num_frames': frame_idx,
    }

In [None]:
import cv2
from ultralytics import YOLO

def select_model(video_path, threshold=3, model_m_path="models/detection/best.pt", model_s_path="models/detection/best_perspective.pt"):
    """
    Select and load a YOLO model based on the aspect ratio of a video.

    The first frame of the video is decoded and its ratio
    ``height / width`` is compared against ``threshold`` (note: this is the
    inverse of the conventional width/height aspect ratio).

    Args:
        video_path: Path (str or ``pathlib.Path``) of the video to inspect.
        threshold: Ratios strictly greater than this select "Model M".
        model_m_path: Weights loaded when ``height / width > threshold``.
        model_s_path: Weights loaded otherwise.

    Returns:
        A newly constructed ``YOLO`` instance for the selected weights.

    Raises:
        ValueError: If the video cannot be opened or its first frame cannot
            be read.
    """
    # str() so that pathlib.Path inputs work too, as in evaluate_model_on_video.
    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            raise ValueError("Could not open video")

        # Only the first frame is needed to obtain the frame dimensions
        ret, frame = cap.read()
    finally:
        # Release the capture on every path, including the failure above.
        cap.release()

    if not ret:
        raise ValueError("Could not read frame")

    h, w = frame.shape[:2]
    aspect_ratio = h / w

    print(f"Aspect Ratio: {aspect_ratio:.2f}")

    if aspect_ratio > threshold:
        print("Using Model M (stretched video).")
        return YOLO(model_m_path)

    print("Using Model S (normal video).")
    return YOLO(model_s_path)

In [None]:
# Complete Pipeline: AR Detection + Model Assignment + Evaluation
# For every .mp4 in video_dir: pick a model from the video's aspect ratio,
# run the tracking evaluation with it, and collect one summary dict per video.

video_dir = Path("Validation")  # path to test videos
output_base_dir = "Val"  # output directory

# Parameters for evaluation (forwarded as keyword arguments)
eval_params = dict(
    dist_th=120,
    max_age=12,
    conf_th=0.20,
    output_base_dir=output_base_dir,
)

print(f"Parameters: {eval_params}")
print(f"Output directory: {output_base_dir}")
print("=" * 50)

# Get all video files (sorted for a deterministic processing order)
video_files = sorted(video_dir.glob("*.mp4"))
print(f"Found {len(video_files)} video files to process")

results = []

for i, video_path in enumerate(video_files, start=1):
    print(f"\n[{i}/{len(video_files)}] Processing {video_path.name}")

    try:
        # Step 1: Detect AR and select appropriate model
        selected_model = select_model(str(video_path))

        # Step 2: Run evaluation with the selected model
        result = evaluate_model_on_video(video_path=video_path, model=selected_model, **eval_params)
    except Exception as e:
        # Best effort: record the failure and continue with the next video
        print(f"❌ Error processing {video_path.name}: {e}")
        results.append(dict(
            video=video_path.name,
            error=str(e),
            num_records=0,
            num_frames=0,
        ))
    else:
        results.append(result)