In [None]:
# Install the notebook's dependencies: NumPy pinned to 1.23.5 plus ultralytics,
# bypassing pip's download cache.
# NOTE(review): the reason for the NumPy pin is not visible in this notebook —
# presumably a compatibility constraint; confirm before changing it.
%pip install "numpy==1.23.5" ultralytics --no-cache-dir
import numpy as np

In [None]:
!pip install -U ultralytics

In [None]:
# core
import os
import json
import random
from typing import Iterable, Tuple, Dict, Any

# numeric & plotting
import matplotlib.pyplot as plt
from tqdm import tqdm

# vision
import cv2 as cv
import torch

# ultralytics YOLO
from ultralytics import YOLO

# Imports

# Main

## Methods

In [None]:
def apply_clahe_bgr(frame_bgr: np.ndarray) -> np.ndarray:
    """Boost local contrast of a BGR image by running CLAHE on its HSV value channel.

    Hue and saturation pass through untouched; only the V plane is equalized,
    using a clip limit of 2.0 and an 8x8 tile grid.

    Args:
        frame_bgr: Image in OpenCV BGR channel order.

    Returns:
        The image converted back to BGR after equalizing its value channel.
    """
    hue, sat, val = cv.split(cv.cvtColor(frame_bgr, cv.COLOR_BGR2HSV))
    equalizer = cv.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_hsv = cv.merge([hue, sat, equalizer.apply(val)])
    return cv.cvtColor(enhanced_hsv, cv.COLOR_HSV2BGR)

def safe_clip_box(x1, y1, x2, y2, W, H):
    """Round a box to integer pixels and clamp it inside a ``W`` x ``H`` image.

    Each coordinate is rounded to the nearest integer, then clamped to
    ``[0, W - 1]`` (x) or ``[0, H - 1]`` (y).

    Args:
        x1, y1: Corner of the box with the smaller coordinates.
        x2, y2: Corner of the box with the larger coordinates.
        W: Image width in pixels.
        H: Image height in pixels.

    Returns:
        ``(xi1, yi1, xi2, yi2)`` as ints, or ``None`` when the box cannot be
        used: a coordinate is NaN/infinite, or the clamped box is empty or
        inverted (``xi2 <= xi1`` or ``yi2 <= yi1``).
    """
    try:
        rx1, ry1, rx2, ry2 = (int(round(v)) for v in (x1, y1, x2, y2))
    except (ValueError, OverflowError):
        # NaN raises ValueError and +/-inf raises OverflowError on conversion
        # to int; such a box is unusable, so reject it instead of crashing.
        return None

    xi1 = max(0, min(rx1, W - 1))
    yi1 = max(0, min(ry1, H - 1))
    xi2 = max(0, min(rx2, W - 1))
    yi2 = max(0, min(ry2, H - 1))

    # Reject (rather than reorder) boxes that are empty or inverted after clamping.
    if xi2 <= xi1 or yi2 <= yi1:
        return None
    return xi1, yi1, xi2, yi2

def l2_normalize(vec: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Scale ``vec`` to unit Euclidean length.

    Args:
        vec: Array to normalize.
        eps: Norm threshold below which the input is treated as zero.

    Returns:
        ``vec`` divided by its norm, or ``vec`` itself (same object, unscaled)
        when the norm is below ``eps``.
    """
    magnitude = np.linalg.norm(vec)
    return vec if magnitude < eps else vec / magnitude

def cosine_similarity(a: np.ndarray, b: np.ndarray, eps: float = 1e-12) -> float:
    """Cosine of the angle between two vectors.

    Intended for 1-D float32 vectors.

    Args:
        a: First vector.
        b: Second vector.
        eps: Norm threshold below which a vector is treated as zero.

    Returns:
        ``dot(a, b) / (|a| * |b|)`` as a Python float, or ``0.0`` when either
        norm is below ``eps``.
    """
    norm_a, norm_b = np.linalg.norm(a), np.linalg.norm(b)
    degenerate = norm_a < eps or norm_b < eps
    return 0.0 if degenerate else float(np.dot(a, b) / (norm_a * norm_b))

def image_to_vector(img_bgr: np.ndarray) -> np.ndarray:
    """Flatten an image into a 1-D float32 vector with values divided by 255.

    Args:
        img_bgr: Image array; for 8-bit input the result lies in [0, 1].

    Returns:
        1-D float32 array holding every element of ``img_bgr`` in C order.
    """
    return (img_bgr.astype(np.float32) / 255.0).ravel()

In [None]:
# Default YOLO weights file; run_inference loads it with YOLO(MODEL_PATH).
MODEL_PATH = '/kaggle/input/yolov11-weights-zalo-ai-challenge-2025/other/default/1/best.pt'

# Default test-set root: run_inference treats every sub-folder as one video and
# looks for 'drone_video.mp4' inside it.
TEST_DATA_DIR = '/kaggle/input/zalo-ai-challenge-2025-track-1-dataset/public_test/samples'

# Default path of the JSON file run_inference writes its predictions to.
OUTPUT_FILE = 'predictions.json'

# Default folder of reference images; detection crops are compared against
# these with cosine similarity when ranking boxes.
REF_IMG_DIR = '/kaggle/input/ref-images-zalo-ai-challenge-2025/Ref'

# Minimum detection confidence, passed as `conf` to the model's predict call.
CONFIDENCE_THRESHOLD = 0.25

## Inference

In [None]:
# Upper bound on distinct crop sizes kept in the reference-vector cache; every
# entry stores one float32 vector per reference image, so an unbounded cache
# can exhaust memory on long videos.
_MAX_CACHED_CROP_SIZES = 512


def _load_reference_images(ref_dir):
    """Return ``(filename, BGR image)`` pairs for every readable image in ``ref_dir``."""
    try:
        names = sorted(os.listdir(ref_dir))
    except OSError as e:
        # A missing/unreadable folder degrades to "no reference images"
        # instead of aborting the whole run.
        print(f"Warning: cannot list reference image directory {ref_dir}: {e}")
        return []

    loaded = []
    for name in names:
        img = cv.imread(os.path.join(ref_dir, name))
        if img is None:
            print(f"Warning: cannot read reference image: {name}")
            continue
        loaded.append((name, img))
    return loaded


def _iter_frames(cap):
    """Yield frames from an opened ``cv.VideoCapture`` until a read fails."""
    while True:
        ok, frame = cap.read()
        if not ok:
            return
        yield frame


def _pick_best_box(frame, xyxy, confs, ref_images, ref_cache):
    """Return the row index in ``xyxy`` of the best usable box, or ``None`` if there is none.

    A box is usable when ``safe_clip_box`` accepts it and its crop is non-empty.
    Its score is the detection confidence, multiplied by the highest cosine
    similarity between the crop and any reference image when references exist.
    """
    H, W = frame.shape[:2]
    best_index, best_score = None, None

    for i in range(xyxy.shape[0]):
        x1, y1, x2, y2 = xyxy[i]
        clipped = safe_clip_box(x1, y1, x2, y2, W, H)
        if clipped is None:
            continue
        xi1, yi1, xi2, yi2 = clipped

        crop = frame[yi1:yi2, xi1:xi2]
        if crop.size == 0:
            continue

        score = float(confs[i])
        if ref_images:
            ch, cw = crop.shape[:2]
            key = (cw, ch)
            if key not in ref_cache:
                if len(ref_cache) >= _MAX_CACHED_CROP_SIZES:
                    # Recomputing later is cheap compared with unbounded growth.
                    ref_cache.clear()
                ref_cache[key] = [
                    l2_normalize(
                        image_to_vector(
                            cv.resize(ref_img, (cw, ch), interpolation=cv.INTER_LINEAR)
                        )
                    )
                    for _, ref_img in ref_images
                ]
            crop_vec = l2_normalize(image_to_vector(crop))
            best_sim = max(
                cosine_similarity(crop_vec, ref_vec) for ref_vec in ref_cache[key]
            )
            score = best_sim * score
        # Without reference images the score is the plain confidence, so the
        # most confident box wins.

        # Strict '>' keeps the first box among ties. The index refers to the
        # original xyxy row, so skipped boxes cannot shift the selection.
        if best_score is None or score > best_score:
            best_index, best_score = i, score

    return best_index


def run_inference(
    TTA: bool = True,
    REF_IMG_DIR: str = REF_IMG_DIR,
    TEST_DATA_DIR: str = TEST_DATA_DIR,
    MODEL_PATH: str = MODEL_PATH,
    OUTPUT_FILE: str = OUTPUT_FILE,
    CLAHE: bool = True,
):
    """Detect one box per frame in every test video and write the results as JSON.

    For each sub-folder of ``TEST_DATA_DIR`` containing ``drone_video.mp4``, every
    frame is run through the YOLO model. Among the detections of a frame, the box
    with the highest ``confidence * best cosine similarity to a reference image``
    is kept (plain confidence when no reference image could be loaded). Frames
    without a usable detection produce no entry.

    The detection threshold is read from the module-level ``CONFIDENCE_THRESHOLD``.

    Args:
        TTA: Passed as ``augment`` to the model's ``predict`` call.
        REF_IMG_DIR: Folder of reference images used for similarity scoring.
        TEST_DATA_DIR: Folder whose sub-folders each hold one ``drone_video.mp4``.
        MODEL_PATH: Path of the YOLO weights to load.
        OUTPUT_FILE: Path of the JSON file to write.
        CLAHE: When true, ``apply_clahe_bgr`` is applied to each frame before detection.

    Returns:
        None. On success ``OUTPUT_FILE`` holds a list of
        ``{"video_id": ..., "detections": [{"bboxes": [...]}]}`` objects, where each
        bbox is ``{"frame", "x1", "y1", "x2", "y2"}``. Load/listing failures are
        printed and end the run early; a video whose processing raises is
        reported and left out of the output.
    """
    try:
        YOLO11_n = YOLO(MODEL_PATH)
        print(f"Successfully loaded model from {MODEL_PATH}")
    except Exception as e:
        print(f"Error: Could not load model from {MODEL_PATH}")
        print(e)
        return

    try:
        video_folders = sorted(
            f for f in os.listdir(TEST_DATA_DIR)
            if os.path.isdir(os.path.join(TEST_DATA_DIR, f))
        )
    except FileNotFoundError:
        print(f"Error: Test data directory not found at: {TEST_DATA_DIR}")
        return
    if not video_folders:
        print(f"Error: No video folders found in {TEST_DATA_DIR}")
        return
    print(f"Found {len(video_folders)} videos to process...")

    ref_images = _load_reference_images(REF_IMG_DIR)
    if not ref_images:
        print("Warning: no valid reference images loaded; cosine similarity will be skipped.")
    # Maps (crop width, crop height) -> normalized reference vectors at that size.
    ref_cache = {}

    all_predictions = []

    for video_folder_name in video_folders:
        video_path = os.path.join(TEST_DATA_DIR, video_folder_name, "drone_video.mp4")
        if not os.path.exists(video_path):
            print(f"Warning: 'drone_video.mp4' not found in {video_folder_name}, skipping.")
            continue

        video_bboxes = []
        cap = None

        try:
            cap = cv.VideoCapture(video_path)
            if not cap.isOpened():
                raise RuntimeError(f"Cannot open video: {video_path}...")

            # enumerate() is the single source of the 0-based frame index.
            for idx, frame in enumerate(_iter_frames(cap)):
                if CLAHE:
                    frame = apply_clahe_bgr(frame)

                if idx == 0:
                    print(f'Frame Type: {type(frame)}')
                    print(f"Frame Shape: {frame.shape}")

                if frame.dtype != np.uint8:
                    frame = np.clip(frame, 0, 255).astype(np.uint8)

                results_list = YOLO11_n.predict(
                    frame,
                    imgsz=640,
                    conf=CONFIDENCE_THRESHOLD,
                    verbose=False,
                    augment=TTA,
                )
                if not results_list:
                    continue
                results = results_list[0]

                if results.boxes is None or len(results.boxes) == 0:
                    continue

                xyxy = results.boxes.xyxy.detach().cpu().numpy()
                confs = results.boxes.conf.detach().cpu().numpy()

                best = _pick_best_box(frame, xyxy, confs, ref_images, ref_cache)
                if best is None:
                    # Every detection was rejected by clipping; nothing to report.
                    continue

                x1best, y1best, x2best, y2best = xyxy[best]
                video_bboxes.append({
                    "frame": int(idx),
                    "x1": int(x1best),
                    "y1": int(y1best),
                    "x2": int(x2best),
                    "y2": int(y2best),
                })

        except Exception as e:
            print(f"Error while processing video {video_path}: {e}")
            continue
        finally:
            # Release the capture on success and on failure alike.
            if cap is not None:
                cap.release()

        detections_list = []
        if video_bboxes:
            detections_list.append({"bboxes": video_bboxes})
        all_predictions.append({
            "video_id": video_folder_name,
            "detections": detections_list,
        })

    try:
        print(f"\nSaving all {len(all_predictions)} video predictions to {OUTPUT_FILE}...")
        with open(OUTPUT_FILE, "w") as f:
            json.dump(all_predictions, f, indent=4)
        print("Inference complete.")
    except Exception as e:
        print(f"Error: Could not write output JSON file: {e}")

# Run the pipeline with the default configuration; on success the predictions
# are written to OUTPUT_FILE.
run_inference()