In [1]:
!pip install torch torchvision pillow

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [6]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hemocytometer DETECT + COUNT (모든 라벨 -> 'cell'), NO argparse, FOLDER 입력.
- 입력: Roboflow TensorFlow export를 이미 풀어둔 폴더
        (예) DATASET/
             ├─ train/
             │   ├─ _annotations.csv
             │   └─ *.jpg|png|tif...
             └─ valid/
                 ├─ _annotations.csv
                 └─ *.jpg|png|tif...
- 모델: torchvision SSDLite320 MobileNetV3 (경량) 단일 클래스 학습
- 출력: OUT_DIR/
        ├─ models/best.pt
        ├─ logs.txt
        ├─ viz_val/*.png   (박스 + gt/pred 총계)
        ├─ viz_test/*.png
        ├─ report_val.csv  (image, gt_count, pred_count)
        └─ report_test.csv
"""

# =======================
# CONFIG (edit as needed)
# =======================
# Training / evaluation settings read by main() in this cell.
CONFIG = {
    "IMAGE_SIZE": 640,       # side length every image is resized to by DetectDataset
    "BATCH": 8,              # DataLoader batch size (train, val and test)
    "EPOCHS": 30,            # number of training epochs
    "LR": 3e-4,              # AdamW learning rate
    "WD": 5e-4,              # AdamW weight decay
    "CONF_THRESH": 0.25,     # minimum detection score kept when evaluating / counting
    "IOU_MATCH": 0.50,       # IoU needed for a prediction to match a ground-truth box
    "USE_CPU": False,        # force CPU even when CUDA is available
    "SEED": 1337,            # seed for set_seed() and for the val/test split
    "VAL_TEST_SPLIT": 0.5,   # fraction of the 'valid' split held out as test (0.5 = half val, half test)
}

# =======================
# Imports
# =======================
import os, io, csv, random, math, time
from pathlib import Path
from collections import defaultdict

import numpy as np
from PIL import Image, ImageDraw, ImageFont

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision.transforms.functional import to_tensor
from torchvision.ops import nms, box_iou

from torchvision.models.detection.ssdlite import SSDLiteClassificationHead

# =======================
# Utils
# =======================
def set_seed(s=1337):
    """Seed Python's `random`, NumPy and PyTorch (CPU and every CUDA device) with `s`."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seed_fn in seeders:
        seed_fn(s)

def ensure_dir(p):
    """Create directory `p` together with any missing parents; a no-op if it already exists."""
    directory = Path(p)
    directory.mkdir(exist_ok=True, parents=True)

def log(msg, fp=None):
    """Print `msg` (flushed immediately) and, when `fp` is given, append it as one line to that file."""
    print(msg, flush=True)
    if not fp:
        return
    fp.write(msg + "\n")
    fp.flush()

def load_font():
    """
    Return an 18 pt TrueType font for image overlays.

    Tries Arial first, then the DejaVu Sans path used on many Linux images, and
    finally falls back to PIL's built-in default font.
    """
    candidates = (
        "arial.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    )
    for font_path in candidates:
        try:
            return ImageFont.truetype(font_path, 18)
        except (OSError, ImportError):
            # OSError: font file missing or unreadable; ImportError: Pillow without FreeType.
            # Anything else (e.g. KeyboardInterrupt) is deliberately NOT swallowed.
            continue
    return ImageFont.load_default()

def _csv_value(row, *keys, default=None):
    for k in keys:
        if k in row and row[k] != "":
            return row[k]
    return default


# =======================
# Data Loading (from DIR)
# =======================
def read_split_from_dir(base_dir, split):  # split in {"train","valid"}
    """
    Load one split of an unpacked Roboflow TensorFlow-CSV export.

    Reads ``base_dir/<split>/_annotations.csv`` and groups its rows by image file.

    Args:
        base_dir: dataset root containing the split folders.
        split: name of the split folder, e.g. "train" or "valid".

    Returns:
        list[dict]: one entry per annotated image that was found on disk, with keys
            "path"   absolute image path (str),
            "W", "H" image size in pixels,
            "boxes"  [[x1, y1, x2, y2], ...] clamped to the image, degenerate boxes dropped,
            "labels" [1, ...] (every annotation is mapped to the single class 'cell').

    Raises:
        FileNotFoundError: if the annotations CSV does not exist.
    """
    split_dir = Path(base_dir) / split
    csv_path = split_dir / "_annotations.csv"
    if not csv_path.exists():
        raise FileNotFoundError(f"{csv_path} not found")

    by_file = defaultdict(list)
    # utf-8-sig strips a leading BOM if present; newline="" is what the csv module expects.
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
        for r in csv.DictReader(f):
            fn = _csv_value(r, "filename", "file", "image")
            if not fn:
                continue
            W = int(float(_csv_value(r, "width", default="0")))
            H = int(float(_csv_value(r, "height", default="0")))
            xmin = float(_csv_value(r, "xmin", default="0"))
            ymin = float(_csv_value(r, "ymin", default="0"))
            xmax = float(_csv_value(r, "xmax", default="0"))
            ymax = float(_csv_value(r, "ymax", default="0"))
            by_file[fn].append((W, H, xmin, ymin, xmax, ymax))

    items = []
    for fn, rows_ in by_file.items():
        # Usually the image sits directly at <split>/<filename>.
        img_path = split_dir / fn
        if not img_path.exists():
            rel = Path(fn)
            # Fallback 1: same file name anywhere below the split folder.
            # Fallback 2: same stem with any extension.
            # Sorted so the pick is deterministic when several files match.
            matches = sorted(split_dir.glob(f"**/{rel.name}")) or sorted(split_dir.glob(f"**/{rel.stem}.*"))
            if not matches:
                continue
            img_path = matches[0]

        W, H = int(rows_[0][0]), int(rows_[0][1])
        if W <= 0 or H <= 0:
            # width/height column missing or zero: clamping against W-1 / H-1 would
            # collapse every box to nothing, so take the size from the image itself.
            with Image.open(img_path) as im:
                W, H = im.size

        boxes = []
        for (_, _, x1, y1, x2, y2) in rows_:
            x1 = max(0.0, min(float(x1), W - 1))
            y1 = max(0.0, min(float(y1), H - 1))
            x2 = max(0.0, min(float(x2), W - 1))
            y2 = max(0.0, min(float(y2), H - 1))
            if x2 > x1 and y2 > y1:
                boxes.append([x1, y1, x2, y2])

        items.append({
            "path": str(img_path.resolve()),
            "W": W, "H": H,
            "boxes": boxes,
            "labels": [1] * len(boxes),  # single class 'cell'
        })
    return items

def split_valid_into_val_test(valid_items, test_ratio=0.5, seed=1337):
    """
    Deterministically split `valid_items` into (val, test).

    A seeded shuffle of the positions picks ``int(len * test_ratio)`` items for the
    test part; both returned lists keep the original relative order of the items.
    """
    order = list(range(len(valid_items)))
    random.Random(seed).shuffle(order)
    n_test = int(len(order) * test_ratio)
    held_out = set(order[:n_test])

    test = [item for pos, item in enumerate(valid_items) if pos in held_out]
    val = [item for pos, item in enumerate(valid_items) if pos not in held_out]
    return val, test


# =======================
# Dataset / Collate
# =======================
class DetectDataset(Dataset):
    """
    Detection dataset over item dicts carrying an image "path" and pixel-space "boxes".

    Every image is resized to a square of `img_size` and its boxes are scaled with it.
    With `train=True`, random horizontal / vertical flips are applied with probabilities
    `hflip_p` / `vflip_p`. Each sample is ``(image_tensor, target)`` where target holds
    "boxes" (N x 4 float32), "labels" (N int64, all 1) and "image_id".
    """

    def __init__(self, items, img_size=640, train=True, hflip_p=0.5, vflip_p=0.0):
        self.items = items
        self.size = img_size
        self.train = train
        self.hflip_p = hflip_p
        self.vflip_p = vflip_p

    def __len__(self):
        return len(self.items)

    def __getitem__(self, i):
        record = self.items[i]
        side = self.size

        image = Image.open(record["path"]).convert("RGB")
        orig_w, orig_h = image.size
        image = image.resize((side, side), Image.BILINEAR)

        # Scale the boxes by the same factors as the image.
        sx = side / orig_w
        sy = side / orig_h
        boxes = [[x1*sx, y1*sy, x2*sx, y2*sy] for x1, y1, x2, y2 in record["boxes"]]

        # Light augmentation: mirror the image and its boxes together.
        if self.train:
            if random.random() < self.hflip_p:
                image = image.transpose(Image.FLIP_LEFT_RIGHT)
                boxes = [[side-x2, y1, side-x1, y2] for x1, y1, x2, y2 in boxes]
            if random.random() < self.vflip_p:
                image = image.transpose(Image.FLIP_TOP_BOTTOM)
                boxes = [[x1, side-y2, x2, side-y1] for x1, y1, x2, y2 in boxes]

        n_boxes = len(boxes)
        if n_boxes:
            box_t = torch.tensor(boxes, dtype=torch.float32)
            label_t = torch.ones((n_boxes,), dtype=torch.int64)  # every box is class 1
        else:
            box_t = torch.zeros((0, 4), dtype=torch.float32)
            label_t = torch.zeros((0,), dtype=torch.int64)

        target = {
            "boxes": box_t,
            "labels": label_t,
            "image_id": torch.tensor([i]),
        }
        return to_tensor(image), target  # image tensor is CxHxW in [0, 1]

def collate_fn(batch):
    """Turn a list of (image, target) pairs into ([images...], [targets...]) without stacking."""
    images, annotations = zip(*batch)
    return list(images), list(annotations)


# =======================
# Model
# =======================
def build_model(num_classes=2, probe_size=320):
    """
    Create an SSDLite320-MobileNetV3-Large detector with COCO weights and a new classification head.

    Args:
        num_classes: number of classes including background (2 for the single
            foreground class 'cell').
        probe_size: side length of the all-zero image pushed through the backbone
            when the backbone does not expose ``out_channels``.

    Returns:
        The detection model with ``head.classification_head`` replaced by a freshly
        initialised head for `num_classes`.
    """
    # Pretrained detector: `weights=` enum API first, legacy `pretrained=True` if that fails.
    try:
        from torchvision.models.detection import (
            SSDLite320_MobileNet_V3_Large_Weights as _Weights,
            ssdlite320_mobilenet_v3_large as _make_detector,
        )
        detector = _make_detector(weights=_Weights.COCO_V1)
    except Exception:
        detector = torchvision.models.detection.ssdlite320_mobilenet_v3_large(pretrained=True)

    # 1) Channel count of each feature map (attribute only exists in some versions).
    try:
        feat_channels = detector.backbone.out_channels
        if isinstance(feat_channels, int):
            feat_channels = [feat_channels]
    except Exception:
        detector.eval()
        with torch.no_grad():
            probe = torch.zeros(1, 3, probe_size, probe_size)
            feature_maps = detector.backbone(probe)  # OrderedDict of tensors
            feat_channels = [fmap.shape[1] for fmap in feature_maps.values()]

    # 2) Anchors per location for every level, 3) arguments shared by both head signatures.
    head_kwargs = {
        "in_channels": feat_channels,
        "num_anchors": detector.anchor_generator.num_anchors_per_location(),
        "num_classes": num_classes,
    }
    try:
        new_head = SSDLiteClassificationHead(**head_kwargs)
    except TypeError:
        # This torchvision version requires an explicit norm_layer.
        new_head = SSDLiteClassificationHead(norm_layer=nn.BatchNorm2d, **head_kwargs)

    detector.head.classification_head = new_head
    return detector



# =======================
# Evaluation (박스/카운트)
# =======================
def _greedy_match_tp(pred_boxes, gt_boxes, iou_thresh):
    """Count true positives by greedily pairing predictions with GT boxes, highest IoU first."""
    if len(pred_boxes) == 0 or len(gt_boxes) == 0:
        return 0
    ious = box_iou(pred_boxes, gt_boxes)
    n_gt = ious.shape[1]
    tp = 0
    # Every match consumes one row and one column, so there are at most min(P, G) matches.
    for _ in range(min(ious.shape)):
        p, g = divmod(int(torch.argmax(ious)), n_gt)
        if ious[p, g] < iou_thresh:
            break
        tp += 1
        # Mask the matched prediction and GT box so neither can be paired again.
        ious[p, :] = -1.0
        ious[:, g] = -1.0
    return tp


@torch.no_grad()
def simple_eval(model, loader, device, score_thresh=0.25, iou_thresh=0.5, viz_dir=None, names=None, nms_iou=0.5):
    """
    Evaluate detection and counting quality over `loader`.

    Predictions are filtered by `score_thresh`, de-duplicated with NMS and greedily
    matched to the ground truth at `iou_thresh`. The model is called without targets,
    so it is expected to be in eval mode already (main() calls ``model.eval()`` first).

    Args:
        model: detection model returning dicts with "boxes" and "scores".
        loader: yields ``(list_of_image_tensors, list_of_target_dicts)`` batches.
        device: device the images are moved to.
        score_thresh: minimum score for a prediction to be kept.
        iou_thresh: minimum IoU for a prediction/GT pair to count as a match.
        viz_dir: if set, one annotated PNG per image is written there.
        names: optional per-IMAGE names (in loader order) used in the PNG file names.
        nms_iou: IoU threshold of the NMS step.

    Returns:
        dict with "precision", "recall", "f1" (box level) and "mae" (mean absolute
        error of the per-image count; NaN when the loader is empty).
    """
    font = None
    if viz_dir:
        viz_dir = Path(viz_dir)
        ensure_dir(viz_dir)
        font = load_font()  # only needed when drawing

    tot_tp = tot_fp = tot_fn = 0
    abs_err = []
    img_idx = 0  # running image index over the whole loader (NOT the batch index)

    for imgs, targets in loader:
        imgs = [im.to(device) for im in imgs]
        outs = model(imgs)

        for img_t, pred, gt in zip(imgs, outs, targets):
            boxes = pred["boxes"].detach().cpu()
            scores = pred["scores"].detach().cpu()
            keep = scores >= score_thresh
            boxes, scores = boxes[keep], scores[keep]
            if boxes.numel():
                boxes = boxes[nms(boxes, scores, nms_iou)]

            gt_boxes = gt["boxes"].cpu()
            pred_count = int(len(boxes))
            gt_count = int(len(gt_boxes))

            tp = _greedy_match_tp(boxes, gt_boxes, iou_thresh)
            tot_tp += tp
            tot_fp += pred_count - tp
            tot_fn += gt_count - tp
            abs_err.append(abs(pred_count - gt_count))

            if viz_dir:
                arr = (img_t.cpu().numpy().transpose(1, 2, 0) * 255).astype(np.uint8)
                pil = Image.fromarray(arr)
                draw = ImageDraw.Draw(pil)
                for b in boxes:
                    x1, y1, x2, y2 = [float(x) for x in b]
                    draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 255), width=2)
                txt = f"gt:{gt_count}  pred:{pred_count}"
                draw.rectangle([6, 6, 6 + 220, 6 + 26], fill=(0, 0, 0))
                draw.text((10, 8), txt, fill=(255, 255, 255), font=font)
                # Index names per image; indexing by batch labelled images with the wrong file.
                name = names[img_idx] if names and img_idx < len(names) else f"img{img_idx:06d}"
                pil.save(viz_dir / f"viz_{name}_{img_idx:06d}.png")

            img_idx += 1

    prec = tot_tp / max(1, tot_tp + tot_fp)
    rec = tot_tp / max(1, tot_tp + tot_fn)
    f1 = 2 * prec * rec / max(1e-9, (prec + rec))
    mae = float(np.mean(abs_err)) if len(abs_err) else float("nan")
    return {"precision": prec, "recall": rec, "f1": f1, "mae": mae}


# =======================
# Train / Test
# =======================
def main():
    """
    Train the single-class detector, keep the best checkpoint by validation F1,
    then report final val/test metrics and write per-image count CSVs.

    Paths are hard-coded for Kaggle (no argparse); hyper-parameters come from CONFIG.
    Outputs under the working directory: models/best.pt, logs.txt, viz_val/, viz_test/,
    report_val.csv and report_test.csv.
    """
    # ---- inputs (no argparse) ----
    base_dir = '/kaggle/input/hemocytomer'
    if not base_dir:
        raise SystemExit("Folder path required.")
    out_dir = Path('/kaggle/working/')

    C = CONFIG
    set_seed(C["SEED"])
    device = torch.device("cpu" if C["USE_CPU"] or not torch.cuda.is_available() else "cuda")

    ensure_dir(out_dir)
    ensure_dir(out_dir / "models")

    # The context manager closes the log file even if training raises.
    with open(out_dir / "logs.txt", "w", encoding="utf-8") as logf:
        log(f"Device: {device}", logf)
        log(f"Base dir: {base_dir}", logf)

        # ---- data ----
        train_items = read_split_from_dir(base_dir, "train")
        valid_items = read_split_from_dir(base_dir, "valid")
        val_items, test_items = split_valid_into_val_test(valid_items, test_ratio=C["VAL_TEST_SPLIT"], seed=C["SEED"])
        log(f"Train {len(train_items)}  Val {len(val_items)}  Test {len(test_items)}", logf)

        ds_tr = DetectDataset(train_items, img_size=C["IMAGE_SIZE"], train=True, hflip_p=0.5, vflip_p=0.0)
        ds_va = DetectDataset(val_items, img_size=C["IMAGE_SIZE"], train=False)
        ds_te = DetectDataset(test_items, img_size=C["IMAGE_SIZE"], train=False)

        dl_tr = DataLoader(ds_tr, batch_size=C["BATCH"], shuffle=True, num_workers=2, collate_fn=collate_fn)
        dl_va = DataLoader(ds_va, batch_size=C["BATCH"], shuffle=False, num_workers=2, collate_fn=collate_fn)
        dl_te = DataLoader(ds_te, batch_size=C["BATCH"], shuffle=False, num_workers=2, collate_fn=collate_fn)

        val_names = [Path(it["path"]).name for it in val_items]
        test_names = [Path(it["path"]).name for it in test_items]

        # ---- model ----
        model = build_model(num_classes=2).to(device)
        opt = optim.AdamW([p for p in model.parameters() if p.requires_grad], lr=C["LR"], weight_decay=C["WD"])

        best_f1 = -1.0
        best_file = out_dir / "models" / "best.pt"

        # ---- training ----
        for ep in range(1, C["EPOCHS"] + 1):
            model.train()
            loss_sum = 0.0
            n = 0
            t0 = time.time()
            for imgs, targets in dl_tr:
                imgs = [im.to(device) for im in imgs]
                tgts = [{k: (v.to(device) if torch.is_tensor(v) else v) for k, v in t.items()} for t in targets]
                losses = model(imgs, tgts)  # dict of losses
                loss = sum(v for v in losses.values())
                opt.zero_grad()
                loss.backward()
                opt.step()
                loss_sum += float(loss.item())
                n += 1
            train_loss = loss_sum / max(1, n)
            dt = time.time() - t0

            # ---- validation ----
            model.eval()
            val_metrics = simple_eval(
                model, dl_va, device,
                score_thresh=C["CONF_THRESH"], iou_thresh=C["IOU_MATCH"],
                viz_dir=str(out_dir / "viz_val"),
                names=val_names,
            )
            log(f"[{ep:03d}] loss={train_loss:.4f}  val_F1={val_metrics['f1']:.3f}  val_MAE={val_metrics['mae']:.2f}  ({dt:.1f}s)", logf)

            if val_metrics["f1"] > best_f1:
                best_f1 = val_metrics["f1"]
                torch.save({"epoch": ep, "model": model.state_dict(), "val": val_metrics}, best_file)
                log(f"  saved best -> {best_file}", logf)

        # ---- final evaluation with the best checkpoint ----
        ckpt = torch.load(best_file, map_location=device)
        model.load_state_dict(ckpt["model"])
        model.eval()

        val_metrics = simple_eval(
            model, dl_va, device,
            score_thresh=C["CONF_THRESH"], iou_thresh=C["IOU_MATCH"],
            viz_dir=str(out_dir / "viz_val"),
            names=val_names,
        )
        test_metrics = simple_eval(
            model, dl_te, device,
            score_thresh=C["CONF_THRESH"], iou_thresh=C["IOU_MATCH"],
            viz_dir=str(out_dir / "viz_test"),
            names=test_names,
        )
        log(f"FINAL  Val: P={val_metrics['precision']:.3f} R={val_metrics['recall']:.3f} F1={val_metrics['f1']:.3f} MAE={val_metrics['mae']:.2f}", logf)
        log(f"FINAL Test: P={test_metrics['precision']:.3f} R={test_metrics['recall']:.3f} F1={test_metrics['f1']:.3f} MAE={test_metrics['mae']:.2f}", logf)

        # ---- per-image count CSV ----
        def dump_counts(loader, items, out_csv):
            """Write one (image, gt_count, pred_count) row per image in `items`.

            `loader` must iterate the dataset built from `items` without shuffling,
            so that the k-th image produced overall corresponds to items[k].
            """
            ensure_dir(Path(out_csv).parent)
            with torch.no_grad(), open(out_csv, "w", encoding="utf-8", newline="") as f:
                w = csv.writer(f)
                w.writerow(["image", "gt_count", "pred_count"])
                offset = 0  # index in `items` of the first image of the current batch
                for imgs, _targets in loader:
                    # Predict the WHOLE batch; zipping batches against items used to
                    # emit a single, mis-paired row per batch.
                    preds = model([im.to(device) for im in imgs])
                    for it, pred in zip(items[offset:offset + len(imgs)], preds):
                        boxes = pred["boxes"].detach().cpu()
                        scores = pred["scores"].detach().cpu()
                        keep = scores >= C["CONF_THRESH"]
                        boxes, scores = boxes[keep], scores[keep]
                        if boxes.numel():
                            boxes = boxes[nms(boxes, scores, 0.5)]
                        w.writerow([it["path"], len(it["boxes"]), int(len(boxes))])
                    offset += len(imgs)

        dump_counts(dl_va, val_items, out_dir / "report_val.csv")
        dump_counts(dl_te, test_items, out_dir / "report_test.csv")

        log("Done.", logf)


# Run training + evaluation when this cell/script is executed directly.
if __name__ == "__main__":
    main()


Device: cuda
Base dir: /kaggle/input/hemocytomer
Train 160  Val 20  Test 20
[001] loss=7.4368  val_F1=0.034  val_MAE=201.00  (19.8s)
  saved best -> /kaggle/working/models/best.pt
[002] loss=4.5756  val_F1=0.048  val_MAE=163.95  (18.3s)
  saved best -> /kaggle/working/models/best.pt
[003] loss=3.8666  val_F1=0.063  val_MAE=131.05  (17.8s)
  saved best -> /kaggle/working/models/best.pt
[004] loss=3.5358  val_F1=0.098  val_MAE=83.70  (17.6s)
  saved best -> /kaggle/working/models/best.pt
[005] loss=3.3631  val_F1=0.370  val_MAE=5.00  (18.3s)
  saved best -> /kaggle/working/models/best.pt
[006] loss=3.1754  val_F1=0.282  val_MAE=2.20  (17.5s)
[007] loss=3.0692  val_F1=0.377  val_MAE=3.10  (18.1s)
  saved best -> /kaggle/working/models/best.pt
[008] loss=2.9301  val_F1=0.355  val_MAE=2.40  (18.5s)
[009] loss=2.8000  val_F1=0.346  val_MAE=2.45  (18.0s)
[010] loss=2.7231  val_F1=0.379  val_MAE=2.50  (16.5s)
  saved best -> /kaggle/working/models/best.pt
[011] loss=2.6075  val_F1=0.387  val_M

In [10]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SSDLite-MobileNetV3 (단일 클래스 'cell') 로드 + 임곗값 스윕(Val MAE 최적) + 폴더 추론(+사각형/총계)
- 학습 시 썼던 헤드 교체 로직(버전 호환) 포함
- Roboflow TensorFlow export의 valid/_annotations.csv 를 사용해 conf 스윕 가능
- argparse 없음: 상단 CONFIG or input() 사용
"""

# =======================
# CONFIG (edit if needed)
# =======================
# NOTE: this rebinds the module-level name CONFIG that the training cell also uses.
# It has no "CONF_THRESH" key; main() below falls back to 0.15 when no sweep was run.
CONFIG = {
    "WEIGHTS_PATH": "/kaggle/working/models/best.pt",   # trained weights (checkpoint with a "model" state dict)
    "TUNE_BASE_DIR": "",       # unpacked dataset folder for the conf sweep, e.g. "/kaggle/input/hemocytomer" (needs valid/_annotations.csv); empty = skip
    "PREDICT_FOLDER": "",      # folder with images to run inference on (empty = skip)
    "OUT_DIR": "/kaggle/working/infer_out",  # output folder for inference results
    "IMAGE_SIZE": 640,         # input size; easiest to keep equal to the size used for training
    "NMS_IOU": 0.45,           # IoU threshold for NMS duplicate removal
    "SIZE_MIN": 12*12,         # lower bound on box area (suppresses false positives such as grid intersections); None disables it
    "SIZE_MAX": 80*80,         # upper bound on box area; None disables it
    "CONF_CAND": [0.05, 0.08, 0.10, 0.12, 0.15, 0.18, 0.20, 0.22, 0.25, 0.30],  # confidence thresholds tried by the sweep
    "USE_CPU": False,          # force CPU even when CUDA is available
}

# =======================
# Imports
# =======================
import os, io, csv, random
from pathlib import Path
from collections import defaultdict

import numpy as np
from PIL import Image, ImageDraw, ImageFont

import torch
import torch.nn as nn
from torchvision.transforms.functional import to_tensor
from torchvision.ops import nms, box_iou
import torchvision
from torchvision.models.detection.ssdlite import SSDLiteClassificationHead

# =======================
# Utils
# =======================
def ensure_dir(p):
    """Make sure directory `p` exists, creating intermediate directories when required."""
    folder = Path(p)
    folder.mkdir(
        parents=True,
        exist_ok=True,
    )

def load_font():
    """
    Return an 18 pt TrueType font for image overlays.

    Tries Arial first, then the DejaVu Sans path used on many Linux images, and
    finally falls back to PIL's built-in default font. `ImageFont` is the name
    imported at the top of this cell.
    """
    for font_path in ("arial.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"):
        try:
            return ImageFont.truetype(font_path, 18)
        except (OSError, ImportError):
            # OSError: font file missing or unreadable; ImportError: Pillow without FreeType.
            # A bare `except:` would also hide KeyboardInterrupt / SystemExit.
            continue
    return ImageFont.load_default()

# =======================
# Model (버전 호환)
# =======================
def build_model(num_classes=2, probe_size=320):
    """
    Rebuild the detector architecture used for training so a saved state dict can be loaded.

    Args:
        num_classes: class count including background (2 for the single foreground
            class 'cell').
        probe_size: side of the zero image used to probe the backbone when it has no
            ``out_channels`` attribute.

    Returns:
        SSDLite320-MobileNetV3-Large (COCO weights) whose classification head has been
        replaced by a new head for `num_classes`.
    """
    try:
        from torchvision.models.detection import (
            SSDLite320_MobileNet_V3_Large_Weights as _CocoWeights,
            ssdlite320_mobilenet_v3_large as _ssdlite,
        )
        net = _ssdlite(weights=_CocoWeights.COCO_V1)
    except Exception:
        # Older torchvision: no weights enum, use the legacy flag.
        net = torchvision.models.detection.ssdlite320_mobilenet_v3_large(pretrained=True)

    # Channels of each feature level: read the attribute if present, otherwise probe.
    try:
        channels = net.backbone.out_channels
        if isinstance(channels, int):
            channels = [channels]
    except Exception:
        net.eval()
        with torch.no_grad():
            zeros = torch.zeros(1, 3, probe_size, probe_size)
            outputs = net.backbone(zeros)  # OrderedDict[name -> Tensor(B, C, H, W)]
            channels = [tensor.shape[1] for tensor in outputs.values()]

    anchors_per_loc = net.anchor_generator.num_anchors_per_location()

    # Replace the classification head; some versions require norm_layer.
    common = dict(in_channels=channels, num_anchors=anchors_per_loc, num_classes=num_classes)
    try:
        cls_head = SSDLiteClassificationHead(**common)
    except TypeError:
        cls_head = SSDLiteClassificationHead(norm_layer=nn.BatchNorm2d, **common)
    net.head.classification_head = cls_head
    return net

def load_best_model(weights_path, device=None):
    """
    Build the detector, load the checkpoint at `weights_path` and switch to eval mode.

    Args:
        weights_path: checkpoint file whose "model" entry is a state dict.
        device: target device; when None, CUDA is used if available unless
            CONFIG["USE_CPU"] is set.

    Returns:
        (model, device)
    """
    if device is None:
        cpu_only = CONFIG["USE_CPU"] or not torch.cuda.is_available()
        device = torch.device("cpu" if cpu_only else "cuda")

    net = build_model(num_classes=2).to(device)
    checkpoint = torch.load(weights_path, map_location=device)
    net.load_state_dict(checkpoint["model"])
    net.eval()
    return net, device

# =======================
# Val 데이터 로더 (Roboflow TF export)
# =======================
def _csv_value(row, *keys, default=None):
    for k in keys:
        if k in row and row[k] != "":
            return row[k]
    return default

def read_valid_from_dir(base_dir):
    """
    Read ``base_dir/valid/_annotations.csv`` and return the annotated validation images.

    Args:
        base_dir: root of an unpacked Roboflow TensorFlow-CSV export.

    Returns:
        list[dict]: one entry per image found on disk with keys "path" (str),
        "W", "H" (image size in pixels) and "boxes" ([[x1, y1, x2, y2], ...] clamped
        to the image, degenerate boxes dropped).

    Raises:
        FileNotFoundError: if the annotations CSV does not exist.
    """
    valid_dir = Path(base_dir) / "valid"
    csv_path = valid_dir / "_annotations.csv"
    if not csv_path.exists():
        raise FileNotFoundError(f"{csv_path} not found")

    by_file = defaultdict(list)
    # utf-8-sig strips a leading BOM if present; newline="" is what the csv module expects.
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
        for r in csv.DictReader(f):
            fn = _csv_value(r, "filename", "file", "image")
            if not fn:
                continue
            W = int(float(_csv_value(r, "width", default="0")))
            H = int(float(_csv_value(r, "height", default="0")))
            x1 = float(_csv_value(r, "xmin", default="0"))
            y1 = float(_csv_value(r, "ymin", default="0"))
            x2 = float(_csv_value(r, "xmax", default="0"))
            y2 = float(_csv_value(r, "ymax", default="0"))
            by_file[fn].append((W, H, x1, y1, x2, y2))

    items = []
    for fn, rows_ in by_file.items():
        img_path = valid_dir / fn
        if not img_path.exists():
            rel = Path(fn)
            # Fallbacks: same file name anywhere below valid/, then same stem with any
            # extension. Sorted so the pick is deterministic when several files match.
            matches = sorted(valid_dir.glob(f"**/{rel.name}")) or sorted(valid_dir.glob(f"**/{rel.stem}.*"))
            if not matches:
                continue
            img_path = matches[0]

        W, H = int(rows_[0][0]), int(rows_[0][1])
        if W <= 0 or H <= 0:
            # width/height column missing or zero: clamping against W-1 / H-1 would
            # discard every box, so take the size from the image itself.
            with Image.open(img_path) as im:
                W, H = im.size

        boxes = []
        for (_, _, a, b, c, d) in rows_:
            a = max(0.0, min(float(a), W - 1))
            b = max(0.0, min(float(b), H - 1))
            c = max(0.0, min(float(c), W - 1))
            d = max(0.0, min(float(d), H - 1))
            if c > a and d > b:
                boxes.append([a, b, c, d])
        items.append({"path": str(img_path), "W": W, "H": H, "boxes": boxes})
    return items

# =======================
# Inference helpers
# =======================
@torch.no_grad()
def predict_image(image_path, model, device,
                  image_size=640, conf=0.15, nms_iou=0.45,
                  size_min=12*12, size_max=80*80,
                  draw=False, out_path=None):
    """
    Detect and count cells in one image.

    The image is resized to ``image_size x image_size``; detections are filtered by
    score, de-duplicated with NMS and optionally filtered by box area.

    Args:
        image_path: image file to read.
        model, device: detector (already in eval mode) and the device it lives on.
        image_size: side length the image is resized to before inference.
        conf: minimum score for a detection to be kept.
        nms_iou: IoU threshold of the NMS step.
        size_min, size_max: inclusive bounds on the box area, measured in pixels of
            the resized image; None disables the respective bound.
        draw: draw the kept boxes and the count onto the resized image.
        out_path: where to save the drawn image (only used when `draw` is true).

    Returns:
        dict: {"count": number of kept boxes, "boxes": tensor of kept boxes in
        resized-image coordinates}.
    """
    # The context manager releases the file handle; convert()/resize() return a new
    # in-memory image, so nothing is read from disk after the block.
    with Image.open(image_path) as im:
        im_r = im.convert("RGB").resize((image_size, image_size), Image.BILINEAR)
    x = to_tensor(im_r).to(device)

    out = model([x])[0]
    boxes = out["boxes"].detach().cpu()
    scores = out["scores"].detach().cpu()

    # score filter + NMS
    keep = scores >= conf
    boxes = boxes[keep]; scores = scores[keep]
    if boxes.numel():
        k = nms(boxes, scores, nms_iou)
        boxes = boxes[k]; scores = scores[k]

    # area filter
    if (size_min is not None) or (size_max is not None):
        wh = boxes[:, 2:4] - boxes[:, 0:2]
        area = wh[:, 0] * wh[:, 1]
        keep = torch.ones(len(area), dtype=torch.bool)
        if size_min is not None:
            keep &= (area >= size_min)
        if size_max is not None:
            keep &= (area <= size_max)
        boxes = boxes[keep]; scores = scores[keep]

    count = int(len(boxes))

    if draw:
        drw = ImageDraw.Draw(im_r)
        for b in boxes:
            x1, y1, x2, y2 = [float(z) for z in b]
            drw.rectangle([x1, y1, x2, y2], outline=(255, 0, 255), width=2)
        drw.rectangle([6, 6, 220, 32], fill=(0, 0, 0))
        drw.text((10, 10), f"count: {count}", fill=(255, 255, 255), font=load_font())
        if out_path:
            ensure_dir(Path(out_path).parent)
            im_r.save(out_path)

    return {"count": count, "boxes": boxes}

# =======================
# CONF 스윕 (Val MAE 최소값)
# =======================
@torch.no_grad()
def best_conf_by_mae(model, device, valid_items,
                     image_size=640, nms_iou=0.45, size_min=12*12, size_max=80*80,
                     conf_cand=None):
    """
    Sweep confidence thresholds and rank them by counting error on `valid_items`.

    For every candidate threshold each image is predicted with `predict_image` and the
    mean absolute difference between predicted and ground-truth box count is computed
    (NaN when `valid_items` is empty).

    Returns:
        ((best_conf, best_mae), ranked): the winning pair and the full list of
        (conf, mae) pairs sorted by ascending MAE.
    """
    candidates = conf_cand or [0.05, 0.08, 0.10, 0.12, 0.15, 0.18, 0.20, 0.22, 0.25, 0.30]
    sweep = []
    for conf in candidates:
        errors = [
            abs(
                predict_image(
                    it["path"], model, device,
                    image_size=image_size, conf=conf, nms_iou=nms_iou,
                    size_min=size_min, size_max=size_max,
                    draw=False,
                )["count"]
                - len(it["boxes"])
            )
            for it in valid_items
        ]
        mae = float(np.mean(errors)) if errors else float("nan")
        print(f"[CONF SWEEP] conf={conf:.2f} -> val_MAE={mae:.3f}")
        sweep.append((conf, mae))

    ranked = sorted(sweep, key=lambda pair: pair[1])  # lowest MAE first
    return ranked[0], ranked

# =======================
# 폴더 일괄 추론
# =======================
@torch.no_grad()
def predict_folder(folder, out_dir, model, device,
                   image_size=640, conf=0.15, nms_iou=0.45, size_min=12*12, size_max=80*80):
    """
    Run `predict_image` on every image below `folder` (recursively).

    Writes ``out_dir/predict_counts.csv`` (image, pred_count) and one annotated PNG
    per image under ``out_dir/viz``. Images are processed in sorted path order.
    """
    out_dir = Path(out_dir)
    viz_dir = out_dir / "viz"
    ensure_dir(out_dir)
    ensure_dir(viz_dir)

    exts = (".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp")
    out_root = out_dir.resolve()
    # Sorted for a reproducible CSV. Files under out_dir are skipped so that viz
    # images from an earlier run are not fed back in when out_dir is inside `folder`.
    paths = sorted(
        p for p in Path(folder).rglob("*")
        if p.is_file() and p.suffix.lower() in exts and out_root not in p.resolve().parents
    )

    used_names = set()
    with open(out_dir / "predict_counts.csv", "w", encoding="utf-8", newline="") as f:
        w = csv.writer(f)
        w.writerow(["image", "pred_count"])
        for p in paths:
            # Files sharing a stem (other sub-folder or extension) would overwrite each
            # other's viz image; add a numeric suffix on collision.
            viz_name = f"{p.stem}_viz.png"
            dup = 1
            while viz_name in used_names:
                dup += 1
                viz_name = f"{p.stem}_{dup}_viz.png"
            used_names.add(viz_name)

            r = predict_image(
                str(p), model, device, image_size=image_size, conf=conf, nms_iou=nms_iou,
                size_min=size_min, size_max=size_max,
                draw=True, out_path=viz_dir / viz_name
            )
            w.writerow([str(p), r["count"]])
    print(f"Saved {out_dir/'predict_counts.csv'} and viz images under {out_dir/'viz'}")

# =======================
# Main (driven by CONFIG; nothing is read via input())
# =======================
def main():
    """
    Load the trained detector, optionally sweep the confidence threshold and
    optionally run folder inference. All paths and settings are taken from CONFIG.
    """
    weights_path = CONFIG["WEIGHTS_PATH"]
    tune_dir = CONFIG["TUNE_BASE_DIR"]
    pred_dir = CONFIG["PREDICT_FOLDER"]
    out_dir = CONFIG["OUT_DIR"]

    cpu_only = CONFIG["USE_CPU"] or not torch.cuda.is_available()
    device = torch.device("cpu" if cpu_only else "cuda")
    print(f"Device: {device}")

    # Load the model.
    model, device = load_best_model(weights_path, device=device)

    # 1) Confidence sweep (optional).
    best_conf = None
    if tune_dir:
        valid_items = read_valid_from_dir(tune_dir)
        best_pair, _all = best_conf_by_mae(
            model, device, valid_items,
            image_size=CONFIG["IMAGE_SIZE"], nms_iou=CONFIG["NMS_IOU"],
            size_min=CONFIG["SIZE_MIN"], size_max=CONFIG["SIZE_MAX"],
            conf_cand=CONFIG["CONF_CAND"]
        )
        best_conf, best_mae = best_pair
        print(f"[BEST] conf={best_conf:.2f}  val_MAE={best_mae:.3f}")

    # 2) Folder inference (optional).
    if not pred_dir:
        return

    # Threshold priority: sweep result, then CONFIG["CONF_THRESH"] if defined, then 0.15.
    if best_conf is not None:
        use_conf = float(best_conf)
    elif "CONF_THRESH" in CONFIG:
        use_conf = float(CONFIG["CONF_THRESH"])
    else:
        use_conf = 0.15
    print(f"Predict with conf={use_conf:.2f}")
    predict_folder(
        pred_dir, out_dir, model, device,
        image_size=CONFIG["IMAGE_SIZE"], conf=use_conf, nms_iou=CONFIG["NMS_IOU"],
        size_min=CONFIG["SIZE_MIN"], size_max=CONFIG["SIZE_MAX"]
    )

# Run the sweep / inference pipeline when this cell/script is executed directly.
if __name__ == "__main__":
    main()


Device: cuda


In [None]:
# === 붙여넣기: 학습된 모델로 폴더 예측 ===
def load_best_model(weights_path, num_classes=2):
    """
    Build the detector and load the "model" state dict from `weights_path` onto the CPU.

    Returns the model in eval mode; the caller moves it to the target device.

    NOTE(review): this redefines `load_best_model` from the inference cell, which takes
    ``(weights_path, device=None)`` and returns ``(model, device)``. Whichever cell ran
    last wins, so that cell's main() must be re-run together with its own definition.
    """
    net = build_model(num_classes=num_classes)
    checkpoint = torch.load(weights_path, map_location="cpu")
    net.load_state_dict(checkpoint["model"])
    return net.eval()

@torch.no_grad()
def predict_on_folder(folder, out_dir, weights_path, image_size=640, conf=0.15, nms_iou=0.5, use_cpu=False):
    """
    Count cells in every image below `folder` (recursively) with the trained model.

    Writes ``out_dir/predict_counts.csv`` (image, pred_count) and an annotated PNG per
    image under ``out_dir/viz``. Images are processed in sorted path order.

    Args:
        folder: directory searched recursively for images.
        out_dir: output directory (created if missing).
        weights_path: checkpoint passed to `load_best_model`.
        image_size: side length each image is resized to before inference.
        conf: minimum detection score.
        nms_iou: IoU threshold of the NMS step.
        use_cpu: force CPU even when CUDA is available.
    """
    from torchvision.transforms.functional import to_tensor
    from torchvision.ops import nms
    import csv

    device = torch.device("cpu" if use_cpu or not torch.cuda.is_available() else "cuda")
    viz_dir = os.path.join(out_dir, "viz")
    os.makedirs(viz_dir, exist_ok=True)  # also creates out_dir

    model = load_best_model(weights_path, num_classes=2).to(device)

    exts = (".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp")
    out_root = Path(out_dir).resolve()
    # Sorted for a reproducible CSV. Files under out_dir are skipped so that viz
    # images from an earlier run are not fed back in when out_dir is inside `folder`.
    paths = sorted(
        p for p in Path(folder).rglob("*")
        if p.is_file() and p.suffix.lower() in exts and out_root not in p.resolve().parents
    )

    font = load_font()
    used_names = set()

    # `with` closes the report even if an image fails part-way through.
    with open(os.path.join(out_dir, "predict_counts.csv"), "w", encoding="utf-8", newline="") as rep:
        w = csv.writer(rep)
        w.writerow(["image", "pred_count"])

        for p in paths:
            # Release the file handle right after decoding.
            with Image.open(p) as im:
                im_r = im.convert("RGB").resize((image_size, image_size), Image.BILINEAR)
            out = model([to_tensor(im_r).to(device)])[0]
            boxes = out["boxes"].detach().cpu()
            scores = out["scores"].detach().cpu()
            keep = scores >= conf
            boxes = boxes[keep]; scores = scores[keep]
            if boxes.numel():
                boxes = boxes[nms(boxes, scores, nms_iou)]
            pred_count = int(len(boxes))

            # Visualisation: boxes plus the total count in the top-left corner.
            draw = ImageDraw.Draw(im_r)
            for b in boxes:
                x1, y1, x2, y2 = [float(z) for z in b]
                draw.rectangle([x1, y1, x2, y2], outline=(255, 0, 255), width=2)
            draw.rectangle([6, 6, 220, 32], fill=(0, 0, 0))
            draw.text((10, 10), f"count: {pred_count}", fill=(255, 255, 255), font=font)

            # Files sharing a stem would overwrite each other's viz image; add a suffix.
            viz_name = f"{p.stem}_viz.png"
            dup = 1
            while viz_name in used_names:
                dup += 1
                viz_name = f"{p.stem}_{dup}_viz.png"
            used_names.add(viz_name)
            im_r.save(os.path.join(viz_dir, viz_name))

            w.writerow([str(p), pred_count])

    print(f"Saved: {out_dir}/predict_counts.csv and {out_dir}/viz/*.png")

# 사용 예:
# predict_on_folder("/kaggle/input/my_new_images", "/kaggle/working/infer_out",
#                   "/kaggle/working/models/best.pt", image_size=CONFIG["IMAGE_SIZE"],
#                   conf=CONFIG["CONF_THRESH"], use_cpu=False)
