In [None]:
import cv2
import json
import os
import numpy as np
import subprocess
import torch
from ultralytics import YOLO

In [None]:
def to_h264(input_path: str) -> str:
    """Re-encode a video as H.264 with ffmpeg and return the new file's path.

    The output is written next to the input, with the input's final extension
    replaced by ``.h264.mp4`` (``clip.mp4`` -> ``clip.h264.mp4``). An existing
    output file is overwritten.

    Args:
        input_path: Path to the source video.

    Returns:
        Path of the H.264-encoded file that was written.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    # Swap only the final extension. A plain str.replace('.mp4', ...) would also
    # rewrite '.mp4' inside directory names, and would leave a non-.mp4 path
    # unchanged so that ffmpeg's output collides with its own input.
    root, _ext = os.path.splitext(input_path)
    h264_output_path = root + '.h264.mp4'

    # Run FFMPEG command to convert to H.264
    cmd = [
        'ffmpeg', '-y',  # -y to overwrite output file
        '-loglevel', 'quiet',  # silence FFMPEG output
        '-i', input_path,  # input file
        '-c:v', 'libx264',  # H.264 codec
        '-preset', 'fast',  # encoding preset
        '-crf', '28',  # constant rate factor (lower quality, smaller file)
        '-profile:v', 'baseline',  # baseline profile for better compatibility
        '-level', '3.0',  # H.264 level for broader device support
        '-movflags', '+faststart',  # optimize for streaming/web playback
        '-pix_fmt', 'yuv420p',  # pixel format for maximum compatibility
        '-tune', 'fastdecode',  # optimize for faster decoding
        h264_output_path
    ]

    print(f"Converting {input_path} to H.264 format...")
    # check=True turns a failed conversion into an exception instead of a
    # silently missing output file.
    subprocess.run(cmd, capture_output=True, text=True, check=True)
    print(f"H.264 video saved to {h264_output_path}")
    return h264_output_path


In [None]:
# Load YOLO model
model = YOLO("./runs/detect/train13/weights/best.pt")

# Paths to video and annotation files
video_path = '/otif-dataset/dataset/caldot1/train/video/0.mp4'
annotation_path = '/otif-dataset/dataset/caldot1/train/yolov3-704x480/0.json'
output_video_path = './0_annotated_with_predictions.mp4'

# Load annotation file
with open(annotation_path, 'r') as f:
    annotations = json.load(f)

# Print annotation structure to understand the format.
# Each top-level JSON type gets its own branch: the previous single print()
# mixed a conditional expression into its argument list, so a dict was reported
# as "dict_keys(...) N items" and a non-sized value raised on len().
if isinstance(annotations, dict):
    print("Annotation keys:", list(annotations.keys()))
    print("Sample keys:", list(annotations.keys())[:5])
    if 'annotations' in annotations:
        print("Sample annotation:", annotations['annotations'][0] if annotations['annotations'] else "Empty")
elif isinstance(annotations, list):
    print("Annotations: list with", len(annotations), "items")
    print("Sample item:", annotations[0] if annotations else "Empty")
else:
    print("Unexpected top-level annotation type:", type(annotations).__name__)

In [None]:
# Open video (only to read its properties)
cap = cv2.VideoCapture(video_path)
assert cap.isOpened(), f"Could not open video {video_path}"

# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Release the capture now that the properties are known. The processing cell
# opens its own capture and rebinds `cap`, so leaving this one open would leak
# the underlying file/decoder handle.
cap.release()

print(f"Video properties: {width}x{height}, {fps} FPS, {frame_count} frames")

# Create video writer (frame size must match the frames written later)
fourcc = cv2.VideoWriter.fourcc('m', 'p', '4', 'v')
writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
assert writer.isOpened(), f"Could not create video writer for {output_video_path}"


In [None]:
# Parse annotations into a dictionary keyed by frame index.
# Supported layouts:
#   * COCO-style dict with 'images' and 'annotations'
#   * dict with a 'frames' list
#   * dict whose keys are frame indices
#   * list with one entry (dict or list of boxes) per frame
frame_annotations = {}

if isinstance(annotations, dict):
    if 'annotations' in annotations and 'images' in annotations:
        # COCO format: map image_id to frame_index, then annotations to frames
        image_id_to_frame = {}
        for img in annotations['images']:
            if 'frame_index' in img:
                image_id_to_frame[img['id']] = img['frame_index']
            elif 'file_name' in img:
                # Try to extract the frame number from a name like "..._<n>.<ext>"
                filename = img['file_name']
                try:
                    frame_idx = int(filename.split('_')[-1].split('.')[0])
                    image_id_to_frame[img['id']] = frame_idx
                except (AttributeError, ValueError):
                    # Only "name is not a parsable string" falls back to the
                    # image id; anything else (e.g. KeyboardInterrupt) propagates.
                    image_id_to_frame[img['id']] = img['id']
            else:
                image_id_to_frame[img['id']] = img['id']

        # Group annotations by frame
        for ann in annotations['annotations']:
            image_id = ann['image_id']
            frame_idx = image_id_to_frame.get(image_id, image_id)

            # COCO bbox is [x, y, width, height]; store as [x1, y1, x2, y2]
            x, y, w, h = ann['bbox']
            frame_annotations.setdefault(frame_idx, []).append([x, y, x + w, y + h])
    elif 'frames' in annotations:
        # Custom format with frames key. Entries without an explicit index fall
        # back to their position (as the list format below does) instead of all
        # collapsing onto frame 0 and overwriting each other.
        for idx, frame_data in enumerate(annotations['frames']):
            frame_idx = frame_data.get('frame_idx', frame_data.get('frame_index', idx))
            frame_annotations[frame_idx] = frame_data.get('boxes', frame_data.get('detections', []))
    else:
        # Dictionary with frame indices as keys. JSON object keys are always
        # strings, while frames are looked up by integer index later, so convert
        # the keys when every one of them is numeric; otherwise keep the mapping
        # as loaded.
        try:
            frame_annotations = {int(key): boxes for key, boxes in annotations.items()}
        except (TypeError, ValueError):
            frame_annotations = annotations
elif isinstance(annotations, list):
    # List format: each item is a frame annotation
    for idx, frame_ann in enumerate(annotations):
        if isinstance(frame_ann, dict):
            frame_idx = frame_ann.get('frame_idx', frame_ann.get('frame_index', idx))
            frame_annotations[frame_idx] = frame_ann.get('boxes', frame_ann.get('detections', frame_ann.get('annotations', [])))
        elif isinstance(frame_ann, list):
            # Direct list of boxes
            frame_annotations[idx] = frame_ann

print(f"Loaded annotations for {len(frame_annotations)} frames")
print(f"Frame indices range: {min(frame_annotations.keys()) if frame_annotations else 'N/A'} to {max(frame_annotations.keys()) if frame_annotations else 'N/A'}")
if frame_annotations:
    sample_frame = list(frame_annotations.keys())[0]
    print(f"Sample frame {sample_frame} has {len(frame_annotations[sample_frame])} annotations")
    if frame_annotations[sample_frame]:
        print(f"Sample annotation format: {frame_annotations[sample_frame][0]}")


In [None]:
# Helper function to draw bounding boxes
def draw_box(frame, box, color, thickness=2):
    """Draw one bounding box on ``frame`` in place.

    Accepted ``box`` layouts:
      * dict with ``left/top/right/bottom`` or ``x1/y1/x2/y2`` corner keys;
      * dict with ``x/y/width/height`` keys;
      * 4-element list/tuple, read as ``[x1, y1, x2, y2]`` unless the third
        value is smaller than the first or the fourth smaller than the second,
        in which case it is read as ``[x, y, w, h]``;
      * longer list/tuple, whose last four values are taken as
        ``x1, y1, x2, y2``.
    Any other value is ignored and nothing is drawn.

    Args:
        frame: OpenCV image; ``frame.shape[0]`` is its height and
            ``frame.shape[1]`` its width.
        box: Bounding box in one of the layouts above.
        color: BGR color tuple (e.g., (0, 255, 0) for green).
        thickness: Line thickness passed to ``cv2.rectangle``.
    """
    if isinstance(box, dict):
        if all(key in box for key in ('left', 'top', 'right', 'bottom')):
            x1, y1, x2, y2 = box['left'], box['top'], box['right'], box['bottom']
        elif all(key in box for key in ('x1', 'y1', 'x2', 'y2')):
            x1, y1, x2, y2 = box['x1'], box['y1'], box['x2'], box['y2']
        elif all(key in box for key in ('x', 'y', 'width', 'height')):
            # Top-left corner plus size
            x1, y1 = box['x'], box['y']
            x2, y2 = x1 + box['width'], y1 + box['height']
        else:
            return
    elif isinstance(box, (list, tuple)) and len(box) >= 4:
        if len(box) == 4:
            x1, y1, third, fourth = box
            if third < x1 or fourth < y1:
                # A "far corner" left of / above the near corner cannot be a
                # corner at all, so treat the pair as width and height.
                x2, y2 = x1 + third, y1 + fourth
            else:
                x2, y2 = third, fourth
        else:
            # e.g. [track_id, x1, y1, x2, y2]: coordinates are the last four
            x1, y1, x2, y2 = box[-4:]
    else:
        return

    # Clamp to the last valid pixel index (width - 1 / height - 1). Clamping to
    # width / height would put an edge-touching side one pixel outside the image.
    max_x = frame.shape[1] - 1
    max_y = frame.shape[0] - 1
    x1 = max(0, min(int(x1), max_x))
    y1 = max(0, min(int(y1), max_y))
    x2 = max(0, min(int(x2), max_x))
    y2 = max(0, min(int(y2), max_y))

    cv2.rectangle(frame, (x1, y1), (x2, y2), color, thickness)


In [None]:
# Open a fresh capture for processing and fail fast if it cannot be read
cap = cv2.VideoCapture(video_path)
assert cap.isOpened(), f"Could not open video {video_path}"

# Process each frame and draw bounding boxes
# Green for ground truth annotations, Red for model predictions
GROUND_TRUTH_COLOR = (0, 255, 0)  # Green in BGR
PREDICTION_COLOR = (0, 0, 255)    # Red in BGR

frame_idx = 0
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Run YOLO on the untouched frame BEFORE anything is drawn on it.
        # Drawing the ground truth first would feed the green rectangles to the
        # detector and bias its predictions.
        results = model(frame, verbose=False)

        # Draw ground truth bounding boxes in green
        gt_boxes = frame_annotations.get(frame_idx, [])
        for box in gt_boxes:
            draw_box(frame, box, GROUND_TRUTH_COLOR, thickness=2)

        # Draw model predictions in red
        for result in results:
            for box in result.boxes:
                # Bounding box coordinates (xyxy format) and confidence score
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                conf = box.conf[0].cpu().numpy()
                draw_box(frame, [x1, y1, x2, y2], PREDICTION_COLOR, thickness=2)
                # Keep the label baseline inside the image for boxes that touch
                # the top edge (a negative y would draw the text off-frame).
                label = f"{conf:.2f}"
                label_y = max(int(y1) - 10, 12)
                cv2.putText(frame, label, (int(x1), label_y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, PREDICTION_COLOR, 2)

        # Write frame to output video
        writer.write(frame)

        frame_idx += 1

        if frame_idx % 100 == 0:
            print(f"Processed {frame_idx}/{frame_count} frames")
finally:
    # Always release, so a failure mid-video still closes the capture and
    # finalizes the output file.
    cap.release()
    writer.release()

print(f"Video saved to {output_video_path}")
to_h264(output_video_path)
print("H264 Video saved")


In [None]:
# Detectors compared against YOLOv11: YOLOv3 (through polyis) and Faster R-CNN
from polyis.models import detector as polyis_detector
from polyis.models.detector import detect as polyis_detect
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torch

# Use the first CUDA device when one is available, otherwise the CPU
if torch.cuda.is_available():
    DEVICE = torch.device('cuda:0')
else:
    DEVICE = torch.device('cpu')
print(f"Using device: {DEVICE}")

# --- YOLOv3: built by the polyis detector factory for dataset 'caldot1' ---
print("Loading YOLOv3 detector (dataset 'caldot1')...")
yolov3_detector = polyis_detector.get_detector(
    dataset_name='caldot1',
    gpu_id=0,
    batch_size=1,
)
print("YOLOv3 detector loaded.")

# --- Faster R-CNN: architecture built locally, weights read from disk ---
fasterrcnn_weights_path = '/polyis-data/fasterrcnn_output/fasterrcnn_epoch_50.pth'
print(f"Loading Faster R-CNN weights from: {fasterrcnn_weights_path}")

# weights=None / weights_backbone=None: no pretrained weights are requested
faster_rcnn_model = fasterrcnn_resnet50_fpn(weights=None, weights_backbone=None)
# Swap the box predictor for a 2-output head (1 object class + background)
in_features = faster_rcnn_model.roi_heads.box_predictor.cls_score.in_features  # type: ignore
faster_rcnn_model.roi_heads.box_predictor = FastRCNNPredictor(in_features, 2)

# The file may hold the state dict directly or under 'model_state_dict'
checkpoint = torch.load(fasterrcnn_weights_path, map_location=DEVICE)
state_dict = checkpoint.get('model_state_dict', checkpoint)
faster_rcnn_model.load_state_dict(state_dict)

# Move to the chosen device and switch to inference mode
faster_rcnn_model.to(DEVICE)
faster_rcnn_model.eval()
print("Faster R-CNN model loaded and set to eval mode.")

In [None]:
# Create three separate annotated videos: one for each detector with ground truth
# Video 1: Ground truth + YOLOv11
# Video 2: Ground truth + YOLOv3
# Video 3: Ground truth + Faster R-CNN

output_yolov11_path = './0_annotated_yolov11.mp4'
output_yolov3_path = './0_annotated_yolov3.mp4'
output_frcnn_path = './0_annotated_frcnn.mp4'

# Get video properties
cap_props = cv2.VideoCapture(video_path)
assert cap_props.isOpened(), f"Could not open video {video_path}"
fps_all = cap_props.get(cv2.CAP_PROP_FPS)
width_all = int(cap_props.get(cv2.CAP_PROP_FRAME_WIDTH))
height_all = int(cap_props.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_count_all = int(cap_props.get(cv2.CAP_PROP_FRAME_COUNT))
cap_props.release()

print(f"Video properties: {width_all}x{height_all}, {fps_all} FPS, {frame_count_all} frames")

# Colors for different sources (BGR)
COLOR_GT = (0, 255, 0)        # Green
COLOR_YOLOV11 = (0, 0, 255)   # Red
COLOR_YOLOV3 = (255, 0, 0)    # Blue
COLOR_FRCNN = (0, 255, 255)   # Yellow

# Detections scoring below this confidence are not drawn
CONF_THRESHOLD = 0.25


def _draw_detection(image, xyxy, text, color):
    """Draw one predicted box and its text label on ``image`` in place."""
    x1, y1, x2, y2 = xyxy
    draw_box(image, [x1, y1, x2, y2], color, thickness=2)
    # Keep the label baseline inside the image for boxes touching the top edge
    label_y = max(int(y1) - 10, 12)
    cv2.putText(image, text, (int(x1), label_y),
                cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1)


# Create video writers and the capture used for processing
fourcc_all = cv2.VideoWriter.fourcc('m', 'p', '4', 'v')
writer_yolov11 = cv2.VideoWriter(output_yolov11_path, fourcc_all, fps_all, (width_all, height_all))
writer_yolov3 = cv2.VideoWriter(output_yolov3_path, fourcc_all, fps_all, (width_all, height_all))
writer_frcnn = cv2.VideoWriter(output_frcnn_path, fourcc_all, fps_all, (width_all, height_all))
cap_all = cv2.VideoCapture(video_path)
frame_idx_all = 0

try:
    assert writer_yolov11.isOpened(), f"Could not create video writer for {output_yolov11_path}"
    assert writer_yolov3.isOpened(), f"Could not create video writer for {output_yolov3_path}"
    assert writer_frcnn.isOpened(), f"Could not create video writer for {output_frcnn_path}"
    assert cap_all.isOpened(), f"Could not open video {video_path}"

    print("Creating three separate annotated videos...")
    print(f"  1. YOLOv11: {output_yolov11_path}")
    print(f"  2. YOLOv3: {output_yolov3_path}")
    print(f"  3. Faster R-CNN: {output_frcnn_path}")

    while cap_all.isOpened():
        ret, frame = cap_all.read()
        if not ret:
            break

        # One drawing canvas per output video; `frame` itself stays clean so
        # every detector sees the original pixels.
        frame_yolov11 = frame.copy()
        frame_yolov3 = frame.copy()
        frame_frcnn = frame.copy()

        # Ground truth boxes (same for all)
        gt_boxes = frame_annotations.get(frame_idx_all, [])
        for box in gt_boxes:
            for canvas in (frame_yolov11, frame_yolov3, frame_frcnn):
                draw_box(canvas, box, COLOR_GT, thickness=2)

        # YOLOv11 predictions
        results_v11 = model(frame, verbose=False)
        for result in results_v11:
            for box in result.boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                conf = float(box.conf[0].cpu().numpy())
                if conf < CONF_THRESHOLD:
                    continue
                _draw_detection(frame_yolov11, (x1, y1, x2, y2), f"v11:{conf:.2f}", COLOR_YOLOV11)

        # YOLOv3 predictions; each prediction is unpacked as (x1, y1, x2, y2, conf)
        yolo3_preds = polyis_detect(frame, yolov3_detector, threshold=CONF_THRESHOLD)
        if yolo3_preds is not None:
            for pred in yolo3_preds:
                x1, y1, x2, y2, conf = pred
                if conf < CONF_THRESHOLD:
                    continue
                _draw_detection(frame_yolov3, (x1, y1, x2, y2), f"v3:{conf:.2f}", COLOR_YOLOV3)

        # Faster R-CNN predictions: model input is an RGB float tensor in
        # [0, 1], channels first
        with torch.no_grad():
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            tensor = torch.from_numpy(rgb).permute(2, 0, 1).float().to(DEVICE) / 255.0
            outputs = faster_rcnn_model([tensor])[0]
        frcnn_boxes = outputs['boxes'].detach().cpu().numpy()
        frcnn_scores = outputs['scores'].detach().cpu().numpy()
        for (x1, y1, x2, y2), conf in zip(frcnn_boxes, frcnn_scores):
            if float(conf) < CONF_THRESHOLD:
                continue
            _draw_detection(frame_frcnn, (x1, y1, x2, y2), f"fr:{conf:.2f}", COLOR_FRCNN)

        # Write frames to respective videos
        writer_yolov11.write(frame_yolov11)
        writer_yolov3.write(frame_yolov3)
        writer_frcnn.write(frame_frcnn)

        frame_idx_all += 1
        if frame_idx_all % 100 == 0:
            print(f"Processed {frame_idx_all}/{frame_count_all} frames")
finally:
    # Always release, so an error in any detector still closes the capture and
    # finalizes the three output files.
    cap_all.release()
    writer_yolov11.release()
    writer_yolov3.release()
    writer_frcnn.release()

print("\nAll videos saved successfully!")
print("Converting to H.264 format...")
to_h264(output_yolov11_path)
to_h264(output_yolov3_path)
to_h264(output_frcnn_path)
print("H.264 videos saved!")