In [1]:
import numpy as np  
import json
import re
from ultralytics import YOLO
import os, cv2, json, glob, torch
from torchvision import transforms, models
import torch.nn.functional as F
from PIL import Image
import shutil
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score
from deep_sort_realtime.deepsort_tracker import DeepSort

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

  from .autonotebook import tqdm as notebook_tqdm


### Extract Frames

In [2]:
#----------------- Frame Extraction ------------------
def extract_frames(video_path, save_dir, item):
    """Write every frame of one video to ``<save_dir>/<item>/frame_NNNNN.jpg``.

    Args:
        video_path: Path of the video handed to ``cv2.VideoCapture``.
        save_dir: Root output directory; one sub-folder is created per item.
        item: Name of the sub-folder (created if missing) that receives the frames.

    Returns:
        The number of frames written; 0 when the video cannot be opened.

    Raises:
        OSError: If ``cv2.imwrite`` reports that a frame could not be written.
    """
    item_dir = os.path.join(save_dir, item)
    os.makedirs(item_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    frame_id = 0
    try:
        if not cap.isOpened():
            print(f"Cannot open video: {video_path}")
            return 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = os.path.join(item_dir, f"frame_{frame_id:05}.jpg")
            # cv2.imwrite signals failure through its return value, not an exception.
            if not cv2.imwrite(frame_path, frame):
                raise OSError(f"Failed to write frame: {frame_path}")
            frame_id += 1
    finally:
        # Release the capture handle even when reading or writing fails.
        cap.release()
    return frame_id

#----------------- Frame Extraction All ------------------
def run_extract_frames(dataset_path, save_path):
    """Extract frames for every sample folder found under ``dataset_path``.

    Each entry of ``dataset_path`` is expected to hold a ``drone_video.mp4``;
    entries without one are reported and skipped. Frames are written by
    ``extract_frames`` into ``<save_path>/<entry name>/``.

    NOTE: a later cell of this notebook defines another function with the same
    name; once that cell has run, it replaces this definition.

    Args:
        dataset_path: Directory containing one sub-folder per sample.
        save_path: Root directory that receives the extracted frames.
    """
    # Sorted so the processing order does not depend on the filesystem.
    for item in sorted(os.listdir(dataset_path)):
        video_path = os.path.join(dataset_path, item, "drone_video.mp4")
        if not os.path.exists(video_path):
            print(f"Cannot find: {video_path}")
            continue
        extract_frames(video_path, save_path, item)

In [3]:
# All training paths are resolved relative to the directory the notebook runs from.
current_dir = os.getcwd()
dataset_path, save_frames_path = (
    os.path.join(current_dir, "dataset", "train", "samples"),  # input: one folder per sample
    os.path.join(current_dir, "dataset", "frames"),            # output: extracted frames
)

In [11]:
# Decode every training video under dataset_path and write its frames below save_frames_path.
run_extract_frames(dataset_path, save_frames_path)

### Train YOLO

In [None]:
# ------------------ Prepare YOLO Dataset ------------------
def _yolo_label_line(bbox, img_w, img_h):
    """Format one pixel-space box as a normalized YOLO label line for class 0."""
    # Clamp to the image so the normalized values always stay within [0, 1].
    x1 = min(max(bbox["x1"], 0), img_w)
    x2 = min(max(bbox["x2"], 0), img_w)
    y1 = min(max(bbox["y1"], 0), img_h)
    y2 = min(max(bbox["y2"], 0), img_h)

    x_center = ((x1 + x2) / 2) / img_w
    y_center = ((y1 + y2) / 2) / img_h
    bw = (x2 - x1) / img_w
    bh = (y2 - y1) / img_h
    return f"0 {x_center} {y_center} {bw} {bh}\n"


def prepare_yolo_dataset(annotation_path, save_path, dataset_frames):
    """Build a YOLO-format train/val dataset from the annotation JSON.

    The JSON is read as a list of videos, each with a ``video_id`` and
    ``annotations``; every annotation carries ``bboxes`` with the keys
    ``frame``, ``x1``, ``y1``, ``x2`` and ``y2`` (pixel coordinates).
    Frames are looked up as ``<dataset_frames>/<video_id>/frame_NNNNN.jpg``.

    For each video the annotated frames are split 80/20 (``random_state=42``).
    Images are copied to ``<save_path>/images/<split>/<video_id>/`` and label
    files are written to ``<save_path>/labels/<split>/<video_id>/``.

    Differences from the previous implementation:
      * all boxes of a frame go into ONE label file (previously each box
        rewrote the file, so only the last box survived);
      * the split works on unique frames, so a frame can no longer end up in
        both train and val;
      * videos with no usable frame are skipped and a single-frame video goes
        entirely to train, instead of ``train_test_split`` raising;
      * boxes are clamped to the image bounds before normalization.

    Args:
        annotation_path: Path of the annotation JSON file.
        save_path: Root directory of the YOLO dataset to create.
        dataset_frames: Root directory of the extracted frames.
    """
    with open(annotation_path) as f:
        annos = json.load(f)

    # video_id -> {frame_path: [bbox, ...]}; dicts keep annotation order.
    frames_by_video = {}
    for video in annos:
        video_id = video["video_id"]
        boxes_by_frame = frames_by_video.setdefault(video_id, {})
        for det in video["annotations"]:
            for bbox in det["bboxes"]:
                frame_num = bbox["frame"]
                frame_path = os.path.join(dataset_frames, video_id, f"frame_{frame_num:05d}.jpg")
                if not os.path.exists(frame_path):
                    print(f"Frame not found: {frame_path}")
                    continue
                boxes_by_frame.setdefault(frame_path, []).append(bbox)

    for split in ["train", "val"]:
        os.makedirs(os.path.join(save_path, f"images/{split}"), exist_ok=True)
        os.makedirs(os.path.join(save_path, f"labels/{split}"), exist_ok=True)

    for video_id, boxes_by_frame in frames_by_video.items():
        frame_paths = list(boxes_by_frame)
        if not frame_paths:
            print(f"No annotated frames available for video: {video_id}")
            continue
        if len(frame_paths) < 2:
            # train_test_split cannot split a single sample.
            train_frames, val_frames = frame_paths, []
        else:
            train_frames, val_frames = train_test_split(frame_paths, test_size=0.2, random_state=42)

        for split, frame_list in zip(["train", "val"], [train_frames, val_frames]):
            img_dir = os.path.join(save_path, f"images/{split}", video_id)
            lbl_dir = os.path.join(save_path, f"labels/{split}", video_id)
            os.makedirs(img_dir, exist_ok=True)
            os.makedirs(lbl_dir, exist_ok=True)

            for frame_path in frame_list:
                img = cv2.imread(frame_path)
                if img is None:
                    continue
                h, w = img.shape[:2]

                frame_name = os.path.basename(frame_path)
                shutil.copy(frame_path, os.path.join(img_dir, frame_name))

                txt_file = os.path.join(lbl_dir, os.path.splitext(frame_name)[0] + ".txt")
                with open(txt_file, "w") as ftxt:
                    ftxt.writelines(
                        _yolo_label_line(bbox, w, h) for bbox in boxes_by_frame[frame_path]
                    )

    print("Prepare YOLO dataset successfully.")


In [None]:
# Inputs and output of the YOLO dataset preparation step, all below <current_dir>/dataset.
annotation_path, dataset_frames, save_path = (
    os.path.join(current_dir, "dataset", "train", "annotations", "annotations.json"),
    os.path.join(current_dir, "dataset", "frames"),
    os.path.join(current_dir, "dataset", "yolo_dataset"),
)

In [None]:
# Convert the annotated frames into the images/labels layout used for YOLO training.
prepare_yolo_dataset(
    annotation_path=annotation_path,
    save_path=save_path,
    dataset_frames=dataset_frames,
)

In [None]:
# Path of the dataset description file (data.yaml) passed to Ultralytics.
data_dir = os.path.join(current_dir, "dataset", "data.yaml")

# Start from the pretrained YOLOv8-small checkpoint.
model = YOLO("yolov8s.pt")

# --------------- Training YOLO ------------------
# Mixed-precision training is requested with `amp`; `half` is an option of
# predict/val/export and does not switch training to FP16.
# `device` is passed explicitly, as the tuned training cell below does.
# NOTE: this run writes to the same project/name as the tuned run below, and
# exist_ok=True lets whichever runs last overwrite the other's outputs.
model.train(
    data=data_dir,
    epochs=10,
    imgsz=640,
    batch=16,
    device=device,
    amp=True,
    project="runs/train",
    name="drone_detect_object",
    exist_ok=True,
)

In [None]:
# Fresh pretrained checkpoint and the dataset description file for the tuned run.
model = YOLO("yolov8s.pt")
data_dir = os.path.join(current_dir, "dataset", "data.yaml")

# --------------- Training YOLO with an optimized configuration ------------------
results = model.train(
    # Dataset and schedule
    data=data_dir,
    epochs=5,
    imgsz=640,
    batch=32,
    patience=5,

    # Optimizer
    optimizer='AdamW',
    lr0=0.01,
    lrf=0.01,
    momentum=0.937,
    weight_decay=0.0005,

    # Augmentation: geometric
    degrees=5.0,
    translate=0.1,
    scale=0.3,
    fliplr=0.5,
    mosaic=1.0,
    mixup=0.0,

    # Augmentation: colour
    hsv_h=0.015,
    hsv_s=0.7,
    hsv_v=0.4,

    # Runtime
    device=device,
    amp=True,

    # Output location (same run folder as the baseline cell; exist_ok allows reuse)
    project="runs/train",
    name="drone_detect_object",
    exist_ok=True,

    # Checkpointing, validation and plots
    save=True,
    save_period=1,
    val=True,
    plots=True,
)


New https://pypi.org/project/ultralytics/8.3.229 available 😃 Update with 'pip install -U ultralytics'
Ultralytics 8.3.228 🚀 Python-3.11.13 torch-2.2.2+cu121 CUDA:0 (NVIDIA GeForce RTX 4060 Laptop GPU, 8188MiB)
[34m[1mengine/trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=16, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, compile=False, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=/home/nguyen/ZaloAI/dataset/data.yaml, degrees=0.0, deterministic=True, device=0, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=15, erasing=0.4, exist_ok=True, fliplr=0.5, flipud=0.0, format=torchscript, fraction=1.0, freeze=None, half=False, hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, imgsz=640, int8=False, iou=0.7, keras=False, kobj=1.0, line_width=None, lr0=0.01, lrf=0.01, mask_ratio=4, max_det=300, mixup=0.0, mode=train, model=yolov8s.pt, momentum=0.937, mosaic=1.0, multi_scale

### Run with YOLO and ResNet50

In [None]:
# ------------------ CNN Embedding Model ------------------
def load_cnn_model():
    """Load an ImageNet-pretrained ResNet-50 as a feature extractor.

    The classification head (``fc``) is replaced by an identity, so the model
    returns the pooled features instead of class logits. The model is moved to
    the notebook-level ``device`` and put in eval mode. It is converted to
    half precision only when ``device`` is a CUDA device; on CPU it stays in
    float32, because FP16 inference there is unsupported or very slow
    depending on the PyTorch build.

    Returns:
        ``(cnn, preprocess)``: the model and the preprocessing transform that
        ships with the selected weights.
    """
    weights = models.ResNet50_Weights.IMAGENET1K_V1
    cnn = models.resnet50(weights=weights)
    cnn.fc = torch.nn.Identity()
    preprocess = weights.transforms()

    cnn = cnn.to(device).eval()
    if str(device).startswith("cuda"):
        cnn = cnn.half()
    return cnn, preprocess

def get_embedding(cnn, preprocess, img):
    """Compute the embedding of one BGR image with ``cnn``.

    Args:
        cnn: Feature extractor (a ``torch.nn.Module``).
        preprocess: Callable turning a PIL image into a tensor without a
            batch dimension (a batch dimension is added here).
        img: Image as a BGR array, as returned by ``cv2.imread``.

    Returns:
        The output of ``cnn`` for a batch containing only this image,
        computed without gradient tracking.
    """
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    pil_img = Image.fromarray(rgb)

    # Feed the model in its own dtype and on its own device, so the input
    # always matches the weights (FP16 on CUDA, FP32 on CPU).
    first_param = next(cnn.parameters(), None)
    if first_param is not None:
        target_device, target_dtype = first_param.device, first_param.dtype
    else:
        # A parameter-free model gives no hint: use the notebook-level device.
        target_device = device
        target_dtype = torch.float16 if str(device).startswith("cuda") else torch.float32

    batch = preprocess(pil_img).unsqueeze(0).to(device=target_device, dtype=target_dtype)

    with torch.no_grad():
        return cnn(batch)



# ------------------ Load Reference Embeddings ------------------
def load_reference_embeddings(ref_dir, cnn, preprocess):
    """Embed every reference image found under ``<ref_dir>/<item>/object_images``.

    Only ``*.jpg`` files are read. Entries of ``ref_dir`` without an
    ``object_images`` folder (including stray files) are skipped; nothing is
    created inside the sample folders.

    NOTE(review): the embeddings of ALL items are concatenated into a single
    tensor, so callers cannot tell which item a reference belongs to.

    Args:
        ref_dir: Directory with one sub-folder per item (created if missing).
        cnn: Feature extractor passed through to ``get_embedding``.
        preprocess: Preprocessing transform passed through to ``get_embedding``.

    Returns:
        A tensor with one row per reference image, or an empty ``(0, 2048)``
        tensor when no image is found (2048 is the feature width of a
        ResNet-50 whose ``fc`` layer is an identity).
    """
    os.makedirs(ref_dir, exist_ok=True)
    print(f"Loading reference from: {ref_dir}")

    all_embs = []

    for item in sorted(os.listdir(ref_dir)):
        item_dir = os.path.join(ref_dir, item, "object_images")
        if not os.path.isdir(item_dir):
            continue

        for p in sorted(glob.glob(os.path.join(item_dir, "*.jpg"))):
            img = cv2.imread(p)
            if img is None:
                continue
            all_embs.append(get_embedding(cnn, preprocess, img))

    if not all_embs:
        print("No reference images found")
        return torch.empty((0, 2048))

    return torch.cat(all_embs, dim=0)


# ------------------ Frame Extractor ------------------
def run_extract_frames(video_root, save_root):
    """Extract the frames of every video found under ``video_root``.

    Two layouts are accepted:
      * ``<video_root>/<video_id>.mp4``
      * ``<video_root>/<video_id>/drone_video.mp4`` (one folder per sample)

    Frames are written to ``<save_root>/<video_id>/frame_NNNNN.jpg``. The
    zero-padded names sort in playback order and match the names that
    ``prepare_yolo_dataset`` looks up.

    NOTE: this definition replaces the function of the same name defined in
    an earlier cell once this cell has run.

    Args:
        video_root: Directory containing the videos (created if missing).
        save_root: Root directory for the extracted frames (created if missing).
    """
    os.makedirs(save_root, exist_ok=True)
    os.makedirs(video_root, exist_ok=True)

    for entry in sorted(os.listdir(video_root)):
        entry_path = os.path.join(video_root, entry)
        nested_video = os.path.join(entry_path, "drone_video.mp4")

        if entry.endswith(".mp4") and os.path.isfile(entry_path):
            video_path, video_id = entry_path, os.path.splitext(entry)[0]
        elif os.path.isfile(nested_video):
            video_path, video_id = nested_video, entry
        else:
            continue

        out_dir = os.path.join(save_root, video_id)
        os.makedirs(out_dir, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        idx = 0
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                cv2.imwrite(os.path.join(out_dir, f"frame_{idx:05d}.jpg"), frame)
                idx += 1
        finally:
            # Release the capture handle even when reading or writing fails.
            cap.release()

        print(f"- Extracted {idx} frames for {video_id}")


# ------------------ YOLO Model  ------------------
def load_yolo_model(weights_path):
    """Load YOLO weights from ``weights_path`` and move the network to ``device``.

    Args:
        weights_path: Path of an existing weights file.

    Returns:
        The loaded ``YOLO`` model.

    Raises:
        FileNotFoundError: If ``weights_path`` is not an existing file. Nothing
            is created on disk in that case.
    """
    if not os.path.isfile(weights_path):
        raise FileNotFoundError(f"YOLO WEIGHTS NOT FOUND: {weights_path}")

    model = YOLO(weights_path)
    model.model.to(device)
    return model


# ------------------ Inference with YOLO + Tracking ------------------
def inference_frames(frames_root, yolo, cnn, preprocess, ref_embs, threshold=0.5):
    """Detect, track and filter objects in every extracted video.

    For each sub-folder of ``frames_root`` (one per video) the frames are
    processed in numeric frame order: YOLO proposes boxes, DeepSort tracks
    them, and every confirmed track is kept only when the embedding of its
    crop has a maximum cosine similarity of at least ``threshold`` against
    ``ref_embs``.

    Differences from the previous implementation:
      * a fresh DeepSort tracker is created per video, so track state no
        longer leaks from one video into the next;
      * frames are ordered by frame number rather than lexicographically
        (``10.jpg`` used to come before ``2.jpg``);
      * files whose name contains no frame number are skipped instead of
        crashing;
      * track boxes are clamped to the image, so negative coordinates no
        longer wrap around when cropping;
      * the reference embeddings are moved to the device once, not per track.

    Args:
        frames_root: Directory with one folder of frames per video (created
            if missing).
        yolo: Detector exposing ``predict(img, verbose=..., half=...)``.
        cnn: Feature extractor passed to ``get_embedding``.
        preprocess: Preprocessing transform passed to ``get_embedding``.
        ref_embs: Reference embeddings, one row per reference image. When
            empty, every confirmed track is accepted with similarity 1.0.
        threshold: Minimum similarity for a track to be reported.

    Returns:
        A list with one ``{"video_id": ..., "detections": [...]}`` dict per
        video. Each detection is ``{"bboxes": [...]}`` for one frame, and each
        box has the keys ``frame``, ``x1``, ``y1``, ``x2`` and ``y2``.
    """
    os.makedirs(frames_root, exist_ok=True)
    results_all = []

    # Loop-invariant: move the references to the target device a single time.
    has_refs = ref_embs.numel() > 0
    ref_on_device = ref_embs.to(device) if has_refs else None

    for video_id in sorted(os.listdir(frames_root)):
        frame_dir = os.path.join(frames_root, video_id)
        if not os.path.isdir(frame_dir):
            continue

        # One tracker per video: tracks must not survive across videos.
        tracker = DeepSort(
            max_age=30,
            n_init=2,
            max_cosine_distance=0.3,
            nn_budget=None
        )

        # Order frames by their number, not by the text of the file name.
        numbered_frames = []
        for name in os.listdir(frame_dir):
            match = re.search(r'\d+', name.split('.')[0])
            if match is None:
                continue
            numbered_frames.append((int(match.group()), name))
        numbered_frames.sort()

        video_json = {"video_id": video_id, "detections": []}

        for frame_number, name in numbered_frames:
            img = cv2.imread(os.path.join(frame_dir, name))
            if img is None:
                continue
            img_h, img_w = img.shape[:2]

            results = yolo.predict(img, verbose=False, half=True)[0]
            detections = []
            for box, conf, cls in zip(
                results.boxes.xyxy.cpu().numpy(),
                results.boxes.conf.cpu().numpy(),
                results.boxes.cls.cpu().numpy(),
            ):
                x1, y1, x2, y2 = map(int, box)
                # DeepSort takes detections as ([left, top, width, height], confidence, class).
                detections.append([[x1, y1, x2 - x1, y2 - y1], float(conf), cls])

            tracks = tracker.update_tracks(detections, frame=img)
            frame_boxes = []

            for track in tracks:
                if not track.is_confirmed():
                    continue

                track_id = track.track_id
                l, t, r, b = track.to_ltrb()
                # Tracker boxes can extend past the image; clamp before slicing.
                x1 = min(max(int(l), 0), img_w)
                y1 = min(max(int(t), 0), img_h)
                x2 = min(max(int(r), 0), img_w)
                y2 = min(max(int(b), 0), img_h)

                crop = img[y1:y2, x1:x2]
                if crop.size == 0:
                    continue

                emb = get_embedding(cnn, preprocess, crop)

                if has_refs:
                    # Compare in the embedding's dtype (a no-op when they already match).
                    similarity = F.cosine_similarity(
                        emb, ref_on_device.to(emb.dtype)
                    ).max().item()
                else:
                    similarity = 1.0

                if similarity >= threshold:
                    frame_boxes.append({
                        "frame": frame_number,
                        "x1": x1, "y1": y1, "x2": x2, "y2": y2
                    })
                    print(f"[{video_id}] Frame {frame_number}, ID={track_id}, Sim={similarity:.3f} → DETECTED")

            if frame_boxes:
                video_json["detections"].append({"bboxes": frame_boxes})

        results_all.append(video_json)
    return results_all

# ------------------ Save Results ------------------
def save_results_json(results, save_path):
    """Save detection results to a JSON file.

    Args:
        results: JSON-serializable results object.
        save_path: Destination file. Its parent directory is created when the
            path has one; a bare file name is written to the current directory.
    """
    # os.makedirs("") raises, so only create the parent when there is one.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(save_path, "w") as f:
        json.dump(results, f, indent=4)

    print(f"Saved JSON to: {save_path}")


In [5]:
# ------------------ Main Pipeline ------------------
def run_pipeline(
    dataset_path,
    save_frames_path,
    save_json_path,
    yolo_weights,
    threshold=0.5
):
    """Run detection on already-extracted frames and write the results as JSON.

    Args:
        dataset_path: Directory scanned for reference images.
        save_frames_path: Directory holding one folder of frames per video.
        save_json_path: Destination of the results JSON file.
        yolo_weights: Path of the YOLO weights file.
        threshold: Minimum similarity passed on to ``inference_frames``.
    """
    # Frame extraction is not part of this function; the original call stays disabled:
    # run_extract_frames(dataset_path, save_frames_path)
    cnn, preprocess = load_cnn_model()
    yolo_model = load_yolo_model(weights_path=yolo_weights)
    ref_embs = load_reference_embeddings(ref_dir=dataset_path, cnn=cnn, preprocess=preprocess)

    save_results_json(
        results=inference_frames(
            frames_root=save_frames_path,
            yolo=yolo_model,
            cnn=cnn,
            preprocess=preprocess,
            ref_embs=ref_embs,
            threshold=threshold,
        ),
        save_path=save_json_path,
    )

In [8]:
# ------------------ Run ------------------
# Public-test locations, all below <current_dir>/public_test.
# NOTE: dataset_path and save_frames_path are re-bound here and no longer point at the training data.
current_dir = os.getcwd()
dataset_path, save_frames_path, save_json_path = (
    os.path.join(current_dir, "public_test", "samples"),
    os.path.join(current_dir, "public_test", "dataset", "frames"),
    os.path.join(current_dir, "public_test", "test_results.json"),
)

# Weights produced by the training cells above.
# To use the stock checkpoint instead, point yolo_weights at os.path.join(current_dir, "yolov8s.pt").
yolo_weights = os.path.join(current_dir, "runs", "train", "drone_detect_object", "weights", "best.pt")

run_pipeline(
    dataset_path,
    save_frames_path,
    save_json_path,
    yolo_weights,
    threshold=0.4,
)

Loading reference from: /home/nguyen/ZaloAI/public_test/samples
[BlackBox_0] Frame 57, ID=3, Sim=0.595 → DETECTED
[BlackBox_0] Frame 58, ID=3, Sim=0.586 → DETECTED
[BlackBox_0] Frame 59, ID=3, Sim=0.585 → DETECTED
[BlackBox_0] Frame 60, ID=3, Sim=0.582 → DETECTED
[BlackBox_0] Frame 61, ID=3, Sim=0.606 → DETECTED
[BlackBox_0] Frame 62, ID=3, Sim=0.583 → DETECTED
[BlackBox_0] Frame 63, ID=3, Sim=0.566 → DETECTED
[BlackBox_0] Frame 64, ID=3, Sim=0.589 → DETECTED
[BlackBox_0] Frame 65, ID=3, Sim=0.610 → DETECTED
[BlackBox_0] Frame 66, ID=3, Sim=0.592 → DETECTED
[BlackBox_0] Frame 67, ID=3, Sim=0.583 → DETECTED
[BlackBox_0] Frame 68, ID=3, Sim=0.570 → DETECTED
[BlackBox_0] Frame 69, ID=3, Sim=0.601 → DETECTED
[BlackBox_0] Frame 70, ID=3, Sim=0.588 → DETECTED
[BlackBox_0] Frame 71, ID=3, Sim=0.594 → DETECTED
[BlackBox_0] Frame 72, ID=3, Sim=0.596 → DETECTED
[BlackBox_0] Frame 73, ID=3, Sim=0.600 → DETECTED
[BlackBox_0] Frame 74, ID=3, Sim=0.602 → DETECTED
