# Zalo AI Challenge 2025 - Track 1 - Inferencing

## 1. Import thư viện cần thiết

In [None]:
# Nếu sử dụng Google Colab, sẽ cần mount Google Drive trước
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import json
import cv2
import random
import yaml
from tqdm import tqdm
import matplotlib.pyplot as plt
import random
import torch
import numpy as np

def seed_everything(seed=42):
    # Python
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

    # NumPy
    np.random.seed(seed)

    # PyTorch CPU
    torch.manual_seed(seed)

    # PyTorch GPU
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    # Đảm bảo deterministic
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Gọi hàm ngay khi khởi tạo notebook
seed_everything(42)

In [None]:
!pip install ultralytics

In [None]:
from ultralytics import YOLO

## 2. Chuẩn bị test data để thực hiện inferencing

- Data public_test: https://drive.google.com/file/d/1WtHDIIXl9PIfegOJBXRVZypWSVq56W8q/view?usp=sharing
- Data private_test: https://drive.google.com/file/d/1GcmJIulmENEK3qsApuQoQ1UcHI3q73YT/view?usp=sharing
- Giải nén file zip để thực hiện bước inference và tạo file kết quả

In [None]:
# Giải nén dữ liệu public_test hoặc private_test từ Google Drive để thực hiện inferencing
# !unzip /content/drive/MyDrive/3_HK1-25_26/DeepLearning/ZaloAIC2025/Track1/public_test.zip
!unzip /content/drive/MyDrive/3_HK1-25_26/DeepLearning/ZaloAIC2025/Track1/private_test_6QTOAB.zip

## 3. Tải các mô hình và thư viện cần thiết khác để thực hiện inferencing

Tải mô hình `DINOv3` từ HuggingFace về thư mục cục bộ để sử dụng trong quá trình trích xuất đặc trưng từ ảnh đầu vào, phục vụ cho bước phân loại tiếp theo.

In [None]:
from transformers import AutoImageProcessor, AutoModel

MODEL_NAME = 'facebook/dinov3-vits16-pretrain-lvd1689m'
LOCAL_SAVE_PATH = './dinov3_local' # Đây là thư mục sẽ chứa mô hình
token = "YOUR_HUGGINGFACE_TOKEN" # Token HuggingFace của tài khoản đã được cấp quyền truy cập mô hình

# Tải về và lưu processor
processor = AutoImageProcessor.from_pretrained(MODEL_NAME, token=token)
processor.save_pretrained(LOCAL_SAVE_PATH)

# Tải về và lưu model
model = AutoModel.from_pretrained(MODEL_NAME, token=token)
model.save_pretrained(LOCAL_SAVE_PATH)

print(f"Đã lưu DINOv3 vào thư mục: {LOCAL_SAVE_PATH}")

Cài đặt các thư viện `rembg` và `onnxruntime`. Trong đó:
- `rembg` được sử dụng cho bước loại bỏ nền ảnh
- `onnxruntime` đóng vai trò là môi trường thực thi cho các mô hình định dạng ONNX.

In [None]:
!pip install rembg
!pip install onnxruntime

## 4. Inference với mô hình YOLOv11

In [None]:
# Thay đổi đường dẫn mô hình và thư mục dữ liệu cho phù hợp
MODEL_PATH = '/content/best.pt'
TEST_DATA_DIR = '/content/private_test/samples' # Có thể dùng public_test hoặc private_test
OUTPUT_FILE = 'predictions_best.json'

CONFIDENCE_THRESHOLD = 0.2

In [None]:
def run_inference():
    """
    Chạy inference YOLO11 trên các video kiểm thử và lưu kết quả
    theo định dạng JSON yêu cầu.
    """

    # Load model đã được huấn luyện
    try:
        model = YOLO(MODEL_PATH)
        print(f"Successfully loaded model from {MODEL_PATH}")
    except Exception as e:
        print(f"Error: Could not load model from {MODEL_PATH}")
        print(e)
        return

    # Danh sách chứa tất cả các dự đoán cuối cùng
    all_predictions = []

    # Tìm tất cả thư mục video trong thư mục test
    try:
        video_folders = sorted([f for f in os.listdir(TEST_DATA_DIR) if os.path.isdir(os.path.join(TEST_DATA_DIR, f))])
    except FileNotFoundError:
        print(f"Error: Test data directory not found at: {TEST_DATA_DIR}")
        return

    if not video_folders:
        print(f"Error: No video folders found in {TEST_DATA_DIR}")
        return

    print(f"Found {len(video_folders)} videos to process...")

    # Sử dụng tqdm để hiển thị tiến trình khi xử lý các video
    for video_folder_name in tqdm(video_folders, desc="Processing videos"):
        video_path = os.path.join(TEST_DATA_DIR, video_folder_name, 'drone_video.mp4')

        if not os.path.exists(video_path):
            print(f"Warning: 'drone_video.mp4' not found in {video_folder_name}, skipping.")
            continue

        # Danh sách tạm thời để chứa tất cả bbox cho video hiện tại
        video_bboxes = []

        try:
            # Thực hiện dự đoán trên video
            # stream=True để tiết kiệm bộ nhớ (xử lý từng khung hình)
            # conf=... để lọc các phát hiện yếu
            # verbose=False để tránh in log cho mỗi khung hình
            results_generator = model.predict(
                video_path,
                stream=True,
                conf=CONFIDENCE_THRESHOLD,
                verbose=False
            )

            # Duyệt qua từng khung hình trong video
            # Lấy chỉ số khung hình cùng với kết quả
            for frame_idx, results in enumerate(results_generator):
                # Lấy bbox dưới dạng [x1, y1, x2, y2]
                # Chuyển về CPU và numpy để xử lý
                xyxy_boxes = results.boxes.xyxy.cpu().numpy()

                if len(xyxy_boxes) == 0:
                    continue # Không phát hiện bbox nào trong khung hình này

                # Duyệt qua từng bbox được phát hiện trong khung hình
                for box in xyxy_boxes:
                    x1, y1, x2, y2 = box

                    # Tạo bbox theo định dạng yêu cầu
                    bbox_data = {
                        "frame": frame_idx,
                        "x1": int(round(x1)),
                        "y1": int(round(y1)),
                        "x2": int(round(x2)),
                        "y2": int(round(y2))
                    }

                    video_bboxes.append(bbox_data)

        except Exception as e:
            print(f"Error while processing video {video_path}: {e}")
            continue

        detections_list = []
        if len(video_bboxes) > 0:
            # Nếu có bbox được phát hiện, đóng gói chúng theo định dạng yêu cầu
            detections_list.append({"bboxes": video_bboxes})
        # Nếu 'video_bboxes' rỗng, 'detections_list' sẽ vẫn là []

        # Tạo JSON cuối cùng cho video này
        final_video_obj = {
            "video_id": video_folder_name,
            "detections": detections_list
        }
        all_predictions.append(final_video_obj)

    # Lưu tất cả dự đoán vào file JSON
    try:
        print(f"\nSaving all {len(all_predictions)} video predictions to {OUTPUT_FILE}...")
        with open(OUTPUT_FILE, 'w') as f:
            json.dump(all_predictions, f, indent=4)
        print("Inference complete.")
    except Exception as e:
        print(f"Error: Could not write output JSON file: {e}")

if __name__ == "__main__":
    run_inference()

Successfully loaded model from /content/best.pt
Found 10 videos to process...


Processing videos: 100%|██████████| 10/10 [15:51<00:00, 95.13s/it]


Saving all 10 video predictions to predictions_best.json...
Inference complete.





## 5. Inference với giải pháp đề xuất:

In [None]:
import time
import csv
import json
import cv2
from tqdm import tqdm
from ultralytics import YOLO
from transformers import AutoModel
from rembg import remove, new_session
from PIL import Image
from torchvision.transforms import v2
import torch.nn.functional as F
from sklearn.cluster import KMeans
from collections import Counter

In [None]:
# --- CÁC HẰNG SỐ CẤU HÌNH ---
MODEL_PATH = '/content/best.pt'
TEST_DATA_DIR = '/content/private_test/samples'
OUTPUT_FILE = 'submission.json'
CONFIDENCE_THRESHOLD = 0.2
DINO_MODEL_NAME = './dinov3_local'
SIM_THRESHOLD = 0.05
COLOR_DISTANCE_THRESHOLD = 87

# --- HẰNG SỐ TRACKING & LOGIC MỚI ---
TRACKING_IOU_THRESHOLD = 0.3
MAX_DRIFT_RATIO = 0.5
HIGH_DRIFT_THRESHOLD = 0.35

MAX_FRAME_GAP_FOR_START = 50
MAX_BLIND_FRAMES = 50
MIN_BOX_SIZE = 10

# =============================================================================
# CÁC HÀM HỖ TRỢ
# =============================================================================

def clip_box(box, img_w, img_h):
    """Kẹp toạ độ vào trong khung ảnh"""
    x1, y1, x2, y2 = box
    x1 = max(0, int(x1))
    y1 = max(0, int(y1))
    x2 = min(img_w, int(x2))
    y2 = min(img_h, int(y2))
    return [x1, y1, x2, y2]

def calculate_iou(box1, box2):
    x1_1, y1_1, x2_1, y2_1 = box1
    x1_2, y1_2, x2_2, y2_2 = box2
    x_left = max(x1_1, x1_2); y_top = max(y1_1, y1_2)
    x_right = min(x2_1, x2_2); y_bottom = min(y2_1, y2_2)
    if x_right < x_left or y_bottom < y_top: return 0.0
    intersection = (x_right - x_left) * (y_bottom - y_top)
    box1_area = (x2_1 - x1_1) * (y2_1 - y1_1)
    box2_area = (x2_2 - x1_2) * (y2_2 - y1_2)
    union = box1_area + box2_area - intersection
    return intersection / (union + 1e-6)

def calculate_center_distance_ratio(box1, box2):
    c1_x = (box1[0] + box1[2]) / 2; c1_y = (box1[1] + box1[3]) / 2
    c2_x = (box2[0] + box2[2]) / 2; c2_y = (box2[1] + box2[3]) / 2
    dist = np.sqrt((c1_x - c2_x)**2 + (c1_y - c2_y)**2)
    avg_width = ((box1[2] - box1[0]) + (box2[2] - box2[0])) / 2
    return dist / (avg_width + 1e-6)

def predict_next_bbox(history_bboxes):
    if len(history_bboxes) < 2: return history_bboxes[-1]
    curr = np.array(history_bboxes[-1])
    prev = np.array(history_bboxes[-2])
    velocity = curr - prev
    pred = curr + velocity
    return pred.tolist()

def get_dominant_color(image, k=3, image_processing_size=(25, 25)):
    if image.shape[:2] != image_processing_size:
        image = cv2.resize(image, image_processing_size, interpolation=cv2.INTER_AREA)
    pixels = image.reshape(-1, 3)
    brightness = np.mean(pixels, axis=1)
    valid_pixels = pixels[(brightness > 20) & (brightness < 235)]
    if len(valid_pixels) == 0: valid_pixels = pixels
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
    kmeans.fit(valid_pixels)
    dominant_colors = kmeans.cluster_centers_.astype(int)
    label_counts = Counter(kmeans.labels_)
    sorted_colors = sorted([{'color': tuple(dominant_colors[l]), 'count': c} for l, c in label_counts.items()], key=lambda x: x['count'], reverse=True)
    return sorted_colors[0]['color'], 0, sorted_colors

def calculate_color_distance(rgb1, rgb2):
    return np.sqrt(sum((c1 - c2) ** 2 for c1, c2 in zip(rgb1, rgb2)))

def check_color_similarity(img1_rgb, img2_rgb, distance_threshold=80):
    c1, _, _ = get_dominant_color(img1_rgb, k=1)
    _, _, colors2 = get_dominant_color(img2_rgb, k=2)
    top2_colors_img2 = [c['color'] for c in colors2]
    distances = [calculate_color_distance(c1, c2) for c2 in top2_colors_img2]
    return {'is_similar': any(d <= distance_threshold for d in distances)}

def verify_bbox_with_color_and_dino(crop_img, ref_images_rgb, ref_mean_embed, transform, device, dinov3_model):
    color_match_count = 0
    for ref_img_rgb in ref_images_rgb:
        res = check_color_similarity(ref_img_rgb, crop_img, COLOR_DISTANCE_THRESHOLD)
        if res['is_similar']: color_match_count += 1
    if color_match_count < (len(ref_images_rgb) + 1) // 2: return False

    crop_pil = Image.fromarray(crop_img)
    crop_tensor = transform(crop_pil).unsqueeze(0).to(device)
    with torch.inference_mode():
        crop_embed = dinov3_model(crop_tensor).pooler_output
    sim = F.cosine_similarity(crop_embed, ref_mean_embed).item()
    return sim > SIM_THRESHOLD

def crop_bbox(frame_img, box):
    x1, y1, x2, y2 = map(int, box)
    if x2 <= x1 or y2 <= y1: return None
    h, w = frame_img.shape[:2]
    x1 = max(0, min(x1, w - 1)); y1 = max(0, min(y1, h - 1))
    x2 = max(0, min(x2, w)); y2 = max(0, min(y2, h))
    crop = frame_img[y1:y2, x1:x2]
    return crop if crop.size > 0 else None

def crop_to_content(pil_image):
    try:
        alpha = pil_image.split()[-1]
        bbox = alpha.getbbox()
        if bbox: return pil_image.crop(bbox)
    except: pass
    return pil_image

def make_transform(resize_size=224):
    return v2.Compose([
        v2.ToImage(), v2.Resize((resize_size, resize_size), antialias=True),
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
    ])


# =============================================================================
# MAIN INFERENCE LOOP
# =============================================================================

def run_inference():
    try:
        model = YOLO(MODEL_PATH)
        device = "cuda" if torch.cuda.is_available() else "cpu"
        dinov3_model = AutoModel.from_pretrained(DINO_MODEL_NAME).to(device).eval()
        rembg_session = new_session("u2netp")
        transform = make_transform(224)
        print(f"Models loaded on {device}")
    except Exception as e:
        print(f"Error loading models: {e}")
        return

    video_folders = sorted([f for f in os.listdir(TEST_DATA_DIR) if os.path.isdir(os.path.join(TEST_DATA_DIR, f))])
    all_predictions = []

    for video_folder_name in tqdm(video_folders, desc="Processing"):
        video_path = os.path.join(TEST_DATA_DIR, video_folder_name, 'drone_video.mp4')
        if not os.path.exists(video_path): continue

        # --- Load Refs ---
        ref_paths = [os.path.join(TEST_DATA_DIR, video_folder_name, f"object_images/img_{k}.jpg") for k in range(1, 4)]
        ref_embeds, ref_images_rgb = [], []
        for path in ref_paths:
            if not os.path.exists(path): continue
            try:
                orig = Image.open(path).convert("RGB")
                no_bg = remove(orig, session=rembg_session)
                cropped = crop_to_content(no_bg)
                bg = Image.new("RGB", cropped.size, (128, 128, 128))
                bg.paste(cropped, mask=cropped.split()[3] if len(cropped.split())==4 else None)
                ref_images_rgb.append(np.array(bg))
                tensor = transform(bg).unsqueeze(0).to(device)
                with torch.inference_mode(): ref_embeds.append(dinov3_model(tensor).pooler_output)
            except: pass

        if not ref_embeds: continue
        ref_mean_embed = torch.mean(torch.stack(ref_embeds), dim=0)

        # --- INIT VARIABLES ---
        video_bboxes = []
        start_sequence = []
        is_tracking = False
        tracking_history = []
        consecutive_blind_frames = 0

        results_generator = model.predict(video_path, stream=True, conf=CONFIDENCE_THRESHOLD, verbose=False)

        for frame_idx, results in enumerate(results_generator):
            frame_img = results.orig_img
            frame_img_rgb = cv2.cvtColor(frame_img, cv2.COLOR_BGR2RGB)
            h_img, w_img = frame_img.shape[:2]
            yolo_boxes = results.boxes.xyxy.cpu().numpy()

            # ============================================================
            # MODE 1: TRACKING
            # ============================================================
            if is_tracking:
                pred_bbox = predict_next_bbox(tracking_history)
                selected_bbox = None

                # A. YOLO MÙ
                if len(yolo_boxes) == 0:
                    selected_bbox = pred_bbox
                    consecutive_blind_frames += 1

                # B. YOLO CÓ DETECT
                else:
                    best_idx, best_iou, min_dist = -1, -1, float('inf')
                    for idx, box in enumerate(yolo_boxes):
                        w_check = box[2] - box[0]; h_check = box[3] - box[1]
                        if w_check <= MIN_BOX_SIZE or h_check <= MIN_BOX_SIZE: continue

                        iou = calculate_iou(pred_bbox, box)
                        dist = calculate_center_distance_ratio(pred_bbox, box)
                        if iou > best_iou: best_iou, best_idx = iou, idx
                        if dist < min_dist: min_dist = dist

                    if best_idx == -1:
                        selected_bbox = pred_bbox
                        consecutive_blind_frames += 1
                    else:
                        if best_iou > TRACKING_IOU_THRESHOLD or min_dist < MAX_DRIFT_RATIO:
                            selected_bbox = yolo_boxes[best_idx].tolist()
                            consecutive_blind_frames = 0
                        else:
                            selected_bbox = pred_bbox
                            consecutive_blind_frames += 1

                if consecutive_blind_frames >= MAX_BLIND_FRAMES:
                    is_tracking = False; start_sequence = []; tracking_history = []; consecutive_blind_frames = 0
                    continue

                # Dùng hàm clip_box để code gọn và chắc chắn
                selected_bbox = clip_box(selected_bbox, w_img, h_img)
                x1, y1, x2, y2 = selected_bbox

                w_b = x2 - x1
                h_b = y2 - y1

                if w_b <= MIN_BOX_SIZE or h_b <= MIN_BOX_SIZE:
                    is_tracking = False
                    start_sequence = []; tracking_history = []; consecutive_blind_frames = 0
                else:
                    tracking_history.append([x1, y1, x2, y2])
                    if len(tracking_history) > 3: tracking_history.pop(0)
                    video_bboxes.append({"frame": frame_idx, "x1": x1, "y1": y1, "x2": x2, "y2": y2})

            # ============================================================
            # MODE 2: SCANNING
            # ============================================================
            else:
                found_valid = False
                best_box = None

                for box in yolo_boxes:
                    w_scan = box[2] - box[0]; h_scan = box[3] - box[1]
                    if w_scan <= MIN_BOX_SIZE or h_scan <= MIN_BOX_SIZE: continue

                    crop = crop_bbox(frame_img_rgb, box)
                    if crop is None: continue

                    if verify_bbox_with_color_and_dino(crop, ref_images_rgb, ref_mean_embed, transform, device, dinov3_model):
                        found_valid = True
                        best_box = box.tolist()
                        break

                if found_valid:
                    if len(start_sequence) > 0:
                        gap = frame_idx - start_sequence[-1]['frame']
                        if gap > MAX_FRAME_GAP_FOR_START: start_sequence = []

                    # Clip luôn cả bbox tìm được ở Scanning để an toàn
                    best_box = clip_box(best_box, w_img, h_img)
                    x1, y1, x2, y2 = best_box

                    start_sequence.append({"frame": frame_idx, "bbox": best_box})
                    video_bboxes.append({"frame": frame_idx, "x1": x1, "y1": y1, "x2": x2, "y2": y2})

                    if len(start_sequence) >= 3:
                        is_tracking = True
                        consecutive_blind_frames = 0
                        tracking_history = [x['bbox'] for x in start_sequence[-3:]]
                        start_sequence = []
                else:
                    pass

        all_predictions.append({"video_id": video_folder_name, "detections": [{"bboxes": video_bboxes}] if video_bboxes else []})

    with open(OUTPUT_FILE, 'w') as f: json.dump(all_predictions, f, indent=4)
    print("Done.")

if __name__ == "__main__":
    run_inference()

Downloading data from 'https://github.com/danielgatis/rembg/releases/download/v0.0.0/u2netp.onnx' to file '/root/.u2net/u2netp.onnx'.
100%|█████████████████████████████████████| 4.57M/4.57M [00:00<00:00, 2.36GB/s]


Models loaded on cuda


Processing: 100%|██████████| 10/10 [20:44<00:00, 124.42s/it]

Done.



