In [None]:
from pathlib import Path
import json
import shutil
from typing import Any, Dict, List, Optional, Set, Tuple
from PIL import Image
from tqdm import tqdm

In [None]:
# Training (train) split paths
TRAIN_ROOT: Path = Path("dataset/traffic_data/training/bbox_tr")
TRAIN_LABEL_DIR: Path = TRAIN_ROOT.joinpath("label_bbox")
TRAIN_IMAGE_DIR: Path = TRAIN_ROOT.joinpath("image_bbox")

# Validation (val) split paths
VAL_ROOT: Path = Path("dataset/traffic_data/validation/bbox_val")
VAL_LABEL_DIR: Path = VAL_ROOT.joinpath("label_bbox")
VAL_IMAGE_DIR: Path = VAL_ROOT.joinpath("image_bbox")

# Output root
OUT_ROOT: Path = Path("yolo_transfer_all")

# Run options
OVERWRITE_LABELS: bool = True
COPY_IMAGES: bool = True
DRY_RUN: bool = False

# Category consolidation rules
# Final class names; a name's position in this list is its class index.
TARGET_CLASS_ORDER: List[str] = ["승용차", "버스", "트럭", "오토바이(자전거)", "분류없음"]

# Source category name -> target class name it is merged into.
MERGE_TO: Dict[str, str] = dict.fromkeys(("소형버스", "대형버스"), "버스")

# Source category names that are discarded entirely.
DROP_SET: Set[str] = {"대형 트레일러", "대형트레일러", "보행자"}

In [None]:
def ensure_dir(path: Path) -> None:
    """Create ``path`` as a directory, together with any missing parents.

    Does not raise when the directory already exists.
    """
    path.mkdir(exist_ok=True, parents=True)

def build_camera_index(images_root: Path) -> Dict[str, List[Path]]:
    """Index the directories located exactly three levels below ``images_root``.

    The tree is walked as ``images_root/<level1>/<level2>/<camera_dir>``;
    non-directory entries at any level are ignored.

    Args:
        images_root: Root directory of the image tree.

    Returns:
        Mapping from a camera directory name to every directory carrying that
        name, in sorted path order. Empty when ``images_root`` is missing or
        is not a directory.
    """
    cam_index: Dict[str, List[Path]] = {}
    # is_dir() also rejects a root that exists but is a regular file, which
    # would otherwise make iterdir() raise NotADirectoryError.
    if not images_root.is_dir():
        return cam_index
    # iterdir() yields entries in arbitrary order; sorting makes the candidate
    # order (and therefore which duplicate wins a lookup) reproducible.
    for site1 in sorted(images_root.iterdir()):
        if not site1.is_dir():
            continue
        for site2 in sorted(site1.iterdir()):
            if not site2.is_dir():
                continue
            for cam_dir in sorted(site2.iterdir()):
                if cam_dir.is_dir():
                    cam_index.setdefault(cam_dir.name, []).append(cam_dir)
    return cam_index

def find_image_path_from_index(cam_index: Dict[str, List[Path]], camera_id: str, filename: str) -> Optional[Path]:
    """Look up ``filename`` inside the directories indexed under ``camera_id``.

    Directories are tried in their stored order; the first path that exists is
    returned. Returns ``None`` when the camera id is not indexed or no
    directory contains the file.
    """
    candidates = (cam_dir / filename for cam_dir in cam_index.get(camera_id, []))
    return next((path for path in candidates if path.exists()), None)

In [None]:
def xyxy_to_yolo(x1: float, y1: float, x2: float, y2: float, w: int, h: int) -> Tuple[float, float, float, float]:
    """Convert a corner-format pixel box into normalized centre/size values.

    Both corners are clamped to ``[0, w]`` x ``[0, h]`` and may be given in
    either order.

    Args:
        x1, y1: First corner, in pixels.
        x2, y2: Opposite corner, in pixels.
        w, h: Image width and height in pixels; must be positive.

    Returns:
        ``(cx, cy, bw, bh)``: box centre and box size, each divided by the
        image width or height respectively.

    Raises:
        ValueError: If ``w`` or ``h`` is not positive.
    """
    if w <= 0 or h <= 0:
        raise ValueError(f"image size must be positive, got w={w}, h={h}")

    x1c = float(min(max(x1, 0.0), float(w)))
    y1c = float(min(max(y1, 0.0), float(h)))
    x2c = float(min(max(x2, 0.0), float(w)))
    y2c = float(min(max(y2, 0.0), float(h)))
    xmin, xmax = min(x1c, x2c), max(x1c, x2c)
    ymin, ymax = min(y1c, y2c), max(y1c, y2c)

    # Centre is the midpoint of the clamped corners. Deriving it from the
    # floored size below would push it past the image edge for a zero-size
    # box sitting on the right/bottom border.
    cx, cy = (xmin + xmax) / 2.0, (ymin + ymax) / 2.0
    # Floor the size (in pixels) so a degenerate box never has exactly zero extent.
    bw, bh = max(xmax - xmin, 1e-6), max(ymax - ymin, 1e-6)
    return cx / w, cy / h, bw / w, bh / h

In [None]:
# The 3-stage split was removed -> always use the single folder all_data
def decide_stage(weather: str, time_space: str) -> str:
    """Return the output folder name; always ``"all_data"``, whatever the arguments are."""
    stage_name = "all_data"
    return stage_name


def process_coco_json_fixed_subset(
    json_path: Path,
    cam_index: Dict[str, List[Path]],
    out_root: Path,
    id_to_idx: Dict[int, int],
    overwrite_labels: bool,
    copy_images: bool,
    dry_run: bool,
    subset_name: str
) -> Dict[str, int]:
    """Convert one annotation JSON file into YOLO label files (and image copies).

    Output goes to ``out_root/all_data/<subset_name>/{images,labels}/<camera_id>/``.
    The camera id is the first ``/``-separated component of an image's
    ``file_name`` and the image file name is the last component.

    Each annotation's ``"bbox"`` value is read as a list of corner-format boxes
    and its ``"category_id"`` value as a list of ids; the two lists are paired
    element by element. Boxes whose category id is not in ``id_to_idx`` are
    skipped.

    Args:
        json_path: Annotation file with ``"images"`` and ``"annotations"`` lists.
        cam_index: Camera directory name -> candidate source directories.
        out_root: Root of the generated dataset.
        id_to_idx: Source category id -> YOLO class index.
        overwrite_labels: Rewrite a label file that already exists.
        copy_images: Copy source images that are not yet in the output tree.
        dry_run: Touch nothing on disk; only compute the counters.
        subset_name: Subset folder name (e.g. ``"train"``).

    Returns:
        Counters: ``"labeled"`` / ``"copied"`` are the label files written and
        images copied (or, in a dry run, the ones that would be), and
        ``"missing_image"`` is the number of images not found via ``cam_index``.
    """
    counts = {"copied": 0, "labeled": 0, "missing_image": 0}

    with open(json_path, "r", encoding="utf-8") as f:
        coco_any: Any = json.load(f)

    images = list(coco_any.get("images", []))
    anns = list(coco_any.get("annotations", []))

    anns_by_image: Dict[int, List[Dict[str, Any]]] = {}
    for ann in anns:
        anns_by_image.setdefault(int(ann["image_id"]), []).append(ann)

    # Single all_data folder; a dry run must not create anything on disk.
    images_root = out_root / "all_data" / subset_name / "images"
    labels_root = out_root / "all_data" / subset_name / "labels"
    if not dry_run:
        ensure_dir(images_root)
        ensure_dir(labels_root)

    for img in images:
        img_id = int(img["id"])
        file_name = str(img["file_name"])

        parts = file_name.split("/")
        camera_id, fname = parts[0], parts[-1]

        # The lookup only returns paths that exist, so None is the sole miss case.
        src_path = find_image_path_from_index(cam_index, camera_id, fname)
        if src_path is None:
            counts["missing_image"] += 1
            continue

        dst_img_dir = images_root / camera_id
        dst_lbl_dir = labels_root / camera_id
        dst_img_path = dst_img_dir / fname
        dst_lbl_path = dst_lbl_dir / (Path(fname).stem + ".txt")

        if not dry_run:
            ensure_dir(dst_img_dir)
            ensure_dir(dst_lbl_dir)

        # Decide up front what will actually happen so the counters report
        # real work instead of every image that was merely visited.
        write_label = overwrite_labels or not dst_lbl_path.exists()
        copy_image = copy_images and not dst_img_path.exists()

        if write_label:
            # Image size is only needed to normalize boxes, so the file is
            # opened only when a label is going to be produced.
            with Image.open(src_path) as im:
                w, h = im.size

            lines: List[str] = []
            for ann_item in anns_by_image.get(img_id, []):
                bboxes = [list(map(float, b)) for b in ann_item.get("bbox", [])]
                cat_ids = [int(c) for c in ann_item.get("category_id", [])]
                for b, c in zip(bboxes, cat_ids):
                    if c not in id_to_idx:
                        continue
                    x1, y1, x2, y2 = b
                    cxn, cyn, wn, hn = xyxy_to_yolo(x1, y1, x2, y2, w, h)
                    cls_idx = id_to_idx[c]
                    lines.append(f"{cls_idx} {cxn:.6f} {cyn:.6f} {wn:.6f} {hn:.6f}")

            if not dry_run:
                with open(dst_lbl_path, "w", encoding="utf-8") as lf:
                    lf.write("\n".join(lines))
            counts["labeled"] += 1

        if copy_image:
            if not dry_run:
                shutil.copy2(src_path, dst_img_path)
            counts["copied"] += 1

    return counts

In [None]:
def build_category_maps(first_json_path: Path) -> Tuple[Dict[int, int], Dict[int, str]]:
    """Derive the class mappings from the ``"categories"`` list of one JSON file.

    A category is kept unless its name is in ``DROP_SET``; its name is first
    redirected through ``MERGE_TO`` and must then appear in
    ``TARGET_CLASS_ORDER``, whose positions define the class indices.

    Returns:
        ``(id_to_idx, id_to_name)``: source category id -> class index, and
        class index -> target class name.
    """
    with open(first_json_path, "r", encoding="utf-8") as fh:
        payload: Any = json.load(fh)

    # Keyed by name, so a repeated category name keeps only its last id.
    name_to_id: Dict[str, int] = {}
    for cat in list(payload.get("categories", [])):
        name_to_id[str(cat["name"])] = int(cat["id"])

    target_index_of: Dict[str, int] = {}
    for position, label in enumerate(TARGET_CLASS_ORDER):
        target_index_of[label] = position

    id_to_idx: Dict[int, int] = {}
    for source_name, source_id in name_to_id.items():
        resolved_name = MERGE_TO.get(source_name, source_name)
        if source_name not in DROP_SET and resolved_name in target_index_of:
            id_to_idx[source_id] = target_index_of[resolved_name]

    id_to_name = {position: label for label, position in target_index_of.items()}
    return id_to_idx, id_to_name

In [None]:
def run_pipeline() -> Dict[str, int]:
    """Convert the train and val subsets and write ``data.yaml``.

    Category mappings are built from the first (sorted) training JSON file and
    reused for every file of both subsets. When ``DRY_RUN`` is set, nothing is
    created or written by this function; only the counters are computed.

    Returns:
        The ``"copied"`` / ``"labeled"`` / ``"missing_image"`` counters summed
        over all processed JSON files.

    Raises:
        FileNotFoundError: If either label directory contains no ``*.json`` file.
    """
    if not DRY_RUN:
        ensure_dir(OUT_ROOT)
    train_cam_index = build_camera_index(TRAIN_IMAGE_DIR)
    val_cam_index = build_camera_index(VAL_IMAGE_DIR)

    train_json_files = sorted(TRAIN_LABEL_DIR.rglob("*.json"))
    val_json_files = sorted(VAL_LABEL_DIR.rglob("*.json"))
    if not train_json_files or not val_json_files:
        raise FileNotFoundError("Train/Val JSON missing.")

    id_to_idx, id_to_name = build_category_maps(train_json_files[0])
    total = {"copied": 0, "labeled": 0, "missing_image": 0}

    # Both subsets go through the same steps; only the inputs differ.
    subsets = (
        ("train", train_json_files, train_cam_index),
        ("val", val_json_files, val_cam_index),
    )
    for subset_name, json_files, cam_index in subsets:
        for jp in tqdm(json_files, desc=f"{subset_name} json files"):
            c = process_coco_json_fixed_subset(
                json_path=jp,
                cam_index=cam_index,
                out_root=OUT_ROOT,
                id_to_idx=id_to_idx,
                overwrite_labels=OVERWRITE_LABELS,
                copy_images=COPY_IMAGES,
                dry_run=DRY_RUN,
                subset_name=subset_name
            )
            for k in total:
                total[k] += c[k]

    # A dry run must leave the output tree untouched, data.yaml included.
    if not DRY_RUN:
        yaml_path = OUT_ROOT / "all_data" / "data.yaml"
        ensure_dir(yaml_path.parent)
        # Class names ordered by class index.
        idx_names = [v for _, v in sorted(id_to_name.items())]
        with open(yaml_path, "w", encoding="utf-8") as yf:
            yf.write(
                f"path: {OUT_ROOT / 'all_data'}\n"
                f"train: train/images\n"
                f"val: val/images\n"
                f"nc: {len(idx_names)}\n"
                f"names: {idx_names}\n"
            )

    return total

In [None]:
# Run the whole conversion, then report the aggregated counters.
summary = run_pipeline()

print("Done")
print(
    "\n".join(
        f"{counter} -> {summary.get(counter, 0)}"
        for counter in ("copied", "labeled", "missing_image")
    )
)