In [1]:
# -*- coding: utf-8 -*-

import json
from pathlib import Path
from typing import Dict, Optional

import pandas as pd

from mmengine.config import Config
from mmengine.runner import Runner
from mmdet.utils import register_all_modules

# Root of the mmdetection checkout; the base model config is resolved relative to it.
MMD_ROOT = Path("/data/ephemeral/home/model/baseline/mmdetection")

# Dataset root and the COCO-style json describing the full test split.
FULL_DATA_ROOT = Path("/data/ephemeral/home/model/dataset")
TEST_JSON_FULL = FULL_DATA_ROOT / "test.json"

# Directory searched for a sample submission csv that serves as the output template.
SAMPLE_SUB_DIR = Path("/data/ephemeral/home/model/sample_submission")

# Per-experiment work dir: checkpoints are looked up here and the bbox json /
# submission csv are written here.
WORK_DIR = Path("/data/ephemeral/home/model/work_dirs_single")
EXP_NAME = "cascade_rcnn_r50_1024_single"
EXP_WORK_DIR = WORK_DIR / EXP_NAME

# Scale written into every resize transform of the test pipeline.
IMAGE_SCALE = (1024, 1024)

# Class names passed to the dataset as metainfo; len(CLASSES) is also written
# into every `num_classes` entry of the model config.
# NOTE(review): the order presumably has to match the category ids used in
# training — confirm against the training config.
CLASSES = (
    "General trash",
    "Paper",
    "Paper pack",
    "Metal",
    "Glass",
    "Plastic",
    "Styrofoam",
    "Plastic bag",
    "Battery",
    "Clothing",
)

print("EXP_WORK_DIR:", EXP_WORK_DIR)


EXP_WORK_DIR: /data/ephemeral/home/model/work_dirs_single/cascade_rcnn_r50_1024_single


In [2]:
def pick_sample_csv(sample_dir: Path) -> Path:
    """Choose the sample submission csv to use as the output template.

    The ``*.csv`` files directly inside ``sample_dir`` are considered in
    sorted order. The first one whose file name contains ``"mmdetection"``
    (case-insensitive) wins; if none matches, the first csv is returned.

    Args:
        sample_dir: Directory holding the sample submission csv files.

    Returns:
        Path of the selected csv file.

    Raises:
        FileNotFoundError: If ``sample_dir`` contains no csv file.
    """
    candidates = sorted(sample_dir.glob("*.csv"))
    if len(candidates) == 0:
        raise FileNotFoundError(f"샘플 제출 csv가 없습니다: {sample_dir}")

    # Lazily scan for an mmdetection-flavoured sample; fall back to the first csv.
    preferred = (path for path in candidates if "mmdetection" in path.name.lower())
    return next(preferred, candidates[0])

# Resolve the template csv once; the final cell uses it to write the submission.
sample_csv = pick_sample_csv(SAMPLE_SUB_DIR)
print("Using sample csv:", sample_csv)


Using sample csv: /data/ephemeral/home/model/sample_submission/faster_rcnn_mmdetection_submission.csv


In [3]:
def find_checkpoint(work_dir: Path) -> Optional[Path]:
    """Locate the checkpoint to load for inference.

    Resolution order:

    1. The path stored in ``work_dir / "last_checkpoint"``, if that file
       exists and the path it names exists.
    2. If the stored path does not exist (e.g. the work dir was moved after
       training), a file with the same name directly inside ``work_dir``.
    3. The ``*.pth`` file in ``work_dir`` that sorts last in *natural* order,
       so ``epoch_12.pth`` ranks after ``epoch_9.pth``. A plain
       lexicographic sort would wrongly prefer ``epoch_9.pth``.

    Args:
        work_dir: Training work directory to search.

    Returns:
        Path to the selected checkpoint, or ``None`` when nothing is found.
    """
    import re  # local import: only needed for the natural-sort key

    pointer_file = work_dir / "last_checkpoint"
    if pointer_file.exists():
        recorded = pointer_file.read_text().strip()
        if recorded:
            candidate = Path(recorded)
            if candidate.exists():
                return candidate
            # The recorded location is stale; try the same file name in work_dir.
            relocated = work_dir / candidate.name
            if relocated.is_file():
                return relocated

    def _natural_key(path: Path):
        # Split the name into digit / non-digit runs and compare digit runs as
        # integers. The tuple layout keeps ints and strings from ever being
        # compared with each other directly.
        return [
            (0, int(token), "") if token.isdecimal() else (1, 0, token)
            for token in re.split(r"(\d+)", path.name)
            if token
        ]

    ckpts = sorted(work_dir.glob("*.pth"), key=_natural_key)
    return ckpts[-1] if ckpts else None

# Resolve the checkpoint for this experiment and stop the notebook early when
# none is available, since every later cell depends on it.
ckpt = find_checkpoint(EXP_WORK_DIR)
print("checkpoint:", ckpt)

if ckpt is None:
    raise FileNotFoundError("checkpoint를 찾지 못했습니다. 학습 work_dir을 확인하세요.")


checkpoint: /data/ephemeral/home/model/work_dirs_single/cascade_rcnn_r50_1024_single/epoch_12.pth


In [4]:
def set_img_scale(pipeline, scale):
    """Overwrite the target scale of every resize transform, in place.

    Walks ``pipeline`` recursively. Nested lists are descended into, and so is
    the ``"transforms"`` entry of any dict. For dicts whose ``"type"`` is
    ``"Resize"``, ``"RandomResize"`` or ``"RandomChoiceResize"``, the keys
    ``"scale"`` and ``"img_scale"`` are set to ``scale`` and ``"scales"`` is
    set to ``[scale]``, but only when the key is already present. Entries that
    are neither a list nor a dict are ignored.

    Args:
        pipeline: Iterable of transform configs (dicts and/or nested lists).
        scale: New scale value to write.
    """
    resize_types = ("Resize", "RandomResize", "RandomChoiceResize")

    for step in pipeline:
        if isinstance(step, list):
            set_img_scale(step, scale)
        elif isinstance(step, dict):
            if step.get("type") in resize_types:
                # Single-scale keys take the value as-is.
                for key in ("scale", "img_scale"):
                    if key in step:
                        step[key] = scale
                # The multi-scale key collapses to a one-element list.
                if "scales" in step:
                    step["scales"] = [scale]

            # Wrapper transforms carry their children under "transforms".
            if "transforms" in step:
                set_img_scale(step["transforms"], scale)

def set_num_classes(model_cfg, num_classes: int):
    """Set every ``num_classes`` entry found in a nested config, in place.

    Dicts and lists are traversed recursively; any dict that already has a
    ``"num_classes"`` key gets it overwritten. Other value types are ignored.

    Args:
        model_cfg: Nested structure of dicts/lists (e.g. a model config).
        num_classes: Value to store under each ``"num_classes"`` key.
    """
    if isinstance(model_cfg, dict):
        if "num_classes" in model_cfg:
            model_cfg["num_classes"] = num_classes
        children = model_cfg.values()
    elif isinstance(model_cfg, list):
        children = model_cfg
    else:
        # Leaf value (str, int, None, tuple, ...): nothing to descend into.
        return

    for child in children:
        set_num_classes(child, num_classes)

def build_test_cfg(ckpt_path: Path) -> Config:
    """Build an inference-only config from the Cascade R-CNN base config.

    Starting from ``configs/cascade_rcnn/cascade-rcnn_r50_fpn_1x_coco.py``
    under ``MMD_ROOT``, the config is adjusted to:

    * point the test dataset at ``FULL_DATA_ROOT`` / ``TEST_JSON_FULL`` with
      ``CLASSES`` as metainfo and an empty image prefix,
    * force ``IMAGE_SCALE`` on the test pipeline's resize transforms,
    * set every ``num_classes`` in the model to ``len(CLASSES)``,
    * use a ``CocoMetric`` evaluator with ``format_only=True`` and an output
      prefix inside ``EXP_WORK_DIR``,
    * load weights from ``ckpt_path``, and
    * set all training / validation components to ``None``.

    Args:
        ckpt_path: Checkpoint file assigned to ``load_from``.

    Returns:
        The modified config.

    Raises:
        FileNotFoundError: If the base config file does not exist.
    """
    register_all_modules(init_default_scope=True)

    base_cfg_path = MMD_ROOT / "configs/cascade_rcnn/cascade-rcnn_r50_fpn_1x_coco.py"
    if not base_cfg_path.exists():
        raise FileNotFoundError(f"base config 없음: {base_cfg_path}")

    data_root = str(FULL_DATA_ROOT)
    test_ann_file = str(TEST_JSON_FULL)

    cfg = Config.fromfile(str(base_cfg_path))
    cfg.default_scope = "mmdet"
    cfg.data_root = data_root

    # Fall back to the validation loader when the base config has no test loader.
    if "test_dataloader" not in cfg:
        cfg.test_dataloader = cfg.val_dataloader

    loader = cfg.test_dataloader
    ds_cfg = loader["dataset"] if "dataset" in loader else loader

    ds_cfg.metainfo = {"classes": CLASSES}
    ds_cfg.data_root = data_root
    ds_cfg.ann_file = test_ann_file
    ds_cfg.data_prefix = {"img": ""}

    # Pin the resize scale of the test pipeline.
    if hasattr(ds_cfg, "pipeline"):
        set_img_scale(ds_cfg.pipeline, IMAGE_SCALE)

    # Pin the number of classes across the model.
    set_num_classes(cfg.model, len(CLASSES))

    # Batch size / worker count for inference.
    cfg.test_dataloader.batch_size = 1
    cfg.test_dataloader.num_workers = 2

    # Only dump the COCO-format bbox results; no metric computation.
    cfg.test_evaluator = {
        "type": "CocoMetric",
        "ann_file": test_ann_file,
        "metric": "bbox",
        "format_only": True,
        "outfile_prefix": str(EXP_WORK_DIR / f"{EXP_NAME}_test"),
    }

    # Weights to load and where the runner writes its outputs.
    cfg.load_from = str(ckpt_path)
    cfg.work_dir = str(EXP_WORK_DIR)

    # Inference only: training parts are always cleared, validation / scheduler
    # parts only when the base config defines them.
    for key in ("train_dataloader", "train_cfg", "optim_wrapper"):
        setattr(cfg, key, None)
    for key in ("param_scheduler", "val_dataloader", "val_cfg", "val_evaluator"):
        if hasattr(cfg, key):
            setattr(cfg, key, None)

    return cfg


In [5]:
def parse_test_images(test_json: Path) -> Dict[int, str]:
    """Map image ids to file names from a COCO-style annotation json.

    Args:
        test_json: Path to a json file with a top-level ``"images"`` list whose
            items carry ``"id"`` and ``"file_name"``.

    Returns:
        Dict mapping each image ``id`` to its ``file_name``.

    Raises:
        KeyError: If ``"images"``, ``"id"`` or ``"file_name"`` is missing.
    """
    # JSON is UTF-8 by specification; pin the encoding so decoding does not
    # depend on the platform / locale default.
    with open(test_json, "r", encoding="utf-8") as f:
        images = json.load(f)["images"]
    return {img["id"]: img["file_name"] for img in images}

def coco_bbox_json_to_prediction_dict(
    bbox_json: Path,
    test_json: Path,
    score_thr: float = 0.05
) -> Dict[str, str]:
    """Convert a COCO-format detection json into per-image prediction strings.

    Each detection's ``[x, y, w, h]`` box is converted to corner form
    ``(x1, y1, x2, y2)``. Detections scoring below ``score_thr`` are dropped,
    and the rest are ordered by descending score within each image. Every
    image listed in ``test_json`` gets an entry; images without detections map
    to an empty string.

    Args:
        bbox_json: Json file holding a list of detections with ``image_id``,
            ``category_id``, ``bbox`` and (optionally) ``score``.
        test_json: COCO-style json whose ``images`` give the id -> file name map.
        score_thr: Minimum score a detection needs to be kept.

    Returns:
        Dict from image file name to a space-separated string of
        ``category score x1 y1 x2 y2`` groups.
    """
    id_to_fname = parse_test_images(test_json)

    with open(bbox_json, "r") as f:
        detections = json.load(f)

    # Group surviving detections by image id.
    grouped = {}
    for det in detections:
        score = float(det.get("score", 0.0))
        if not score < score_thr:
            image_id = int(det["image_id"])
            category = int(det["category_id"])
            left, top, width, height = det["bbox"]
            record = (category, score, left, top, left + width, top + height)
            grouped.setdefault(image_id, []).append(record)

    # Emit one string per test image, highest score first.
    pred_dict = {}
    for image_id, fname in id_to_fname.items():
        ranked = sorted(grouped.get(image_id, []), key=lambda rec: rec[1], reverse=True)
        pred_dict[fname] = " ".join(
            f"{cat} {sc:.6f} {x1:.1f} {y1:.1f} {x2:.1f} {y2:.1f}"
            for cat, sc, x1, y1, x2, y2 in ranked
        )

    return pred_dict

def save_submission_with_sample(
    pred_dict: Dict[str, str],
    sample_csv_path: Path,
    out_csv_path: Path
) -> None:
    """Write a submission csv that mirrors the sample file's layout.

    The sample csv is read, its ``PredictionString`` column is replaced by the
    value ``pred_dict`` holds for each row's ``image_id`` (empty string when
    the id is absent), and the frame is written to ``out_csv_path`` without
    the index. Row and column order of the sample are left as they are.
    Missing parent directories of ``out_csv_path`` are created.

    Args:
        pred_dict: Mapping from ``image_id`` value to prediction string.
        sample_csv_path: Sample submission csv used as the template.
        out_csv_path: Destination csv path.

    Raises:
        ValueError: If the sample lacks an ``image_id`` or
            ``PredictionString`` column.
    """
    sample = pd.read_csv(sample_csv_path)

    required_columns = ("image_id", "PredictionString")
    if any(column not in sample.columns for column in required_columns):
        raise ValueError(f"샘플 컬럼이 예상과 다름: {list(sample.columns)}")

    def lookup(image_id):
        # Images without a prediction get an empty string.
        return pred_dict.get(image_id, "")

    # Replace only the prediction column; everything else stays as in the sample.
    sample["PredictionString"] = sample["image_id"].map(lookup)

    out_csv_path.parent.mkdir(parents=True, exist_ok=True)
    sample.to_csv(out_csv_path, index=False)

    print("Saved submission:", out_csv_path)


In [6]:
# End-to-end single-model inference: build the test config, run the runner's
# test loop, convert the dumped bbox json into prediction strings and write
# the submission csv.
test_cfg = build_test_cfg(ckpt)
runner = Runner.from_cfg(test_cfg)

# Generate the COCO bbox json (the evaluator is configured with
# format_only=True and an outfile_prefix inside EXP_WORK_DIR).
runner.test()

# Locate the generated bbox json: first the expected "<prefix>.bbox.json",
# otherwise the last "*_test.bbox.json" in the work dir by sorted name.
# NOTE(review): the glob fallback could pick up a stale file left by another
# run in the same directory — confirm this is acceptable.
bbox_json = EXP_WORK_DIR / f"{EXP_NAME}_test.bbox.json"
if not bbox_json.exists():
    cand = sorted(EXP_WORK_DIR.glob("*_test.bbox.json"))
    if cand:
        bbox_json = cand[-1]

print("bbox json:", bbox_json)
if not bbox_json.exists():
    raise FileNotFoundError("bbox json 생성 실패")

# Build the file name -> PredictionString dict
pred_dict = coco_bbox_json_to_prediction_dict(
    bbox_json=bbox_json,
    test_json=TEST_JSON_FULL,
    score_thr=0.05
)

# Save the final submission
out_csv = EXP_WORK_DIR / f"{EXP_NAME}_submission_full.csv"
save_submission_with_sample(pred_dict, sample_csv, out_csv)

print("단일 모델 전체 테스트 추론 완료")
print("제출 파일:", out_csv)


12/06 09:52:46 - mmengine - [4m[97mINFO[0m - 
------------------------------------------------------------
System environment:
    sys.platform: linux
    Python: 3.10.13 (main, Sep 11 2023, 13:44:35) [GCC 11.2.0]
    CUDA available: True
    MUSA available: False
    numpy_random_seed: 427661093
    GPU 0: Tesla V100-SXM2-32GB
    CUDA_HOME: None
    GCC: n/a
    PyTorch: 2.1.2+cu121
    PyTorch compiling details: PyTorch built with:
  - GCC 9.3
  - C++ Version: 201703
  - Intel(R) oneAPI Math Kernel Library Version 2022.2-Product Build 20220804 for Intel(R) 64 architecture applications
  - Intel(R) MKL-DNN v3.1.1 (Git Hash 64f6bcbcbab628e96f33a62c3e975f8535a7bde4)
  - OpenMP 201511 (a.k.a. OpenMP 4.5)
  - LAPACK is enabled (usually provided by MKL)
  - NNPACK is enabled
  - CPU capability usage: AVX512
  - CUDA Runtime 12.1
  - NVCC architecture flags: -gencode;arch=compute_50,code=sm_50;-gencode;arch=compute_60,code=sm_60;-gencode;arch=compute_70,code=sm_70;-gencode;arch=compute_

/bin/sh: 1: gcc: not found


12/06 09:52:47 - mmengine - [4m[97mINFO[0m - Config:
auto_scale_lr = dict(base_batch_size=16, enable=False)
backend_args = None
data_root = '/data/ephemeral/home/model/dataset'
dataset_type = 'CocoDataset'
default_hooks = dict(
    checkpoint=dict(interval=1, type='CheckpointHook'),
    logger=dict(interval=50, type='LoggerHook'),
    param_scheduler=dict(type='ParamSchedulerHook'),
    sampler_seed=dict(type='DistSamplerSeedHook'),
    timer=dict(type='IterTimerHook'),
    visualization=dict(type='DetVisualizationHook'))
default_scope = 'mmdet'
env_cfg = dict(
    cudnn_benchmark=False,
    dist_cfg=dict(backend='nccl'),
    mp_cfg=dict(mp_start_method='fork', opencv_num_threads=0))
load_from = '/data/ephemeral/home/model/work_dirs_single/cascade_rcnn_r50_1024_single/epoch_12.pth'
log_level = 'INFO'
log_processor = dict(by_epoch=True, type='LogProcessor', window_size=50)
model = dict(
    backbone=dict(
        depth=50,
        frozen_stages=1,
        init_cfg=dict(checkpoint='to