<a href="https://colab.research.google.com/github/audalsgh/20250812/blob/main/0812_Roboflow_Segformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### 첫 번째 코드 셀은 패키지 설치와 Semantic Segmentation Masks ZIP 파일 업로드를 수행하므로, 실행 중 오류가 나더라도 일단 그대로 남겨 두었다.

In [None]:
# 0) 설치
!pip -q install transformers accelerate evaluate opencv-python-headless pillow

# 1) Upload the ZIP (Colab opens a manual file-picker dialog)
from google.colab import files
up = files.upload()  # choose the Roboflow ZIP you just downloaded
if not up:
    # An empty mapping means nothing was uploaded; fail with a clear message
    # instead of the IndexError that indexing an empty key list would raise.
    raise RuntimeError("No file was uploaded. Re-run this cell and select the Roboflow ZIP.")
ZIP_PATH = "/content/" + next(iter(up))  # name of the first uploaded file

# 2) Extract into a clean directory
import os, zipfile, glob, shutil, re
EXTRACT_DIR = "/content/ds_rf"
if not zipfile.is_zipfile(ZIP_PATH):
    # Check before wiping EXTRACT_DIR so a wrong upload does not destroy a
    # previous, valid extraction.
    raise RuntimeError(f"{ZIP_PATH} is not a valid ZIP archive.")
if os.path.isdir(EXTRACT_DIR):
    shutil.rmtree(EXTRACT_DIR)  # drop leftovers from an earlier run
os.makedirs(EXTRACT_DIR, exist_ok=True)
with zipfile.ZipFile(ZIP_PATH, "r") as z:
    z.extractall(EXTRACT_DIR)
print("unzipped to", EXTRACT_DIR, "->", os.listdir(EXTRACT_DIR))

# 3) Discover the dataset layout (train/valid/test)
def find_split_dir(root, names=("train","valid","val","test")):
    """Map split names to the sub-directories of ``root`` that exist.

    Each entry of ``names`` is looked up as a direct child directory of
    ``root``. Both "valid" and "val" are stored under the key "valid";
    every other name is stored under itself. Names that are not existing
    directories are left out, so the result may be empty.
    """
    found = {}
    for name in names:
        candidate = os.path.join(root, name)
        if not os.path.isdir(candidate):
            continue
        key = "valid" if name in ("valid", "val") else name
        found[key] = candidate
    return found
# Resolve the split directories once; stop early when the archive has none.
splits = find_split_dir(EXTRACT_DIR)
if len(splits) == 0:
    raise RuntimeError("train/valid/test 폴더를 찾지 못함. ZIP 내용 확인")

# 4) Training configuration
COLLAPSE_TO_BINARY = True  # True: merge every non-zero pixel into 'lane' (1)

# Binary setup uses background + lane; otherwise background + the lane family.
CLASS_NAMES = (
    ["background", "lane"]
    if COLLAPSE_TO_BINARY
    else ["background", "lane", "lane-dot", "lane-mid", "lane_crosswalk"]
)

# Index <-> name lookup tables derived from CLASS_NAMES.
id2label = dict(enumerate(CLASS_NAMES))
label2id = {name: idx for idx, name in id2label.items()}
NUM_LABELS = len(id2label)

# 5) 데이터셋 클래스 (Roboflow 'Semantic Segmentation Masks' 구조 자동 대응)
from PIL import Image
import numpy as np
from torch.utils.data import Dataset
from transformers import SegformerImageProcessor

def normalize_stem(s):
    """Return the file stem of ``s`` with a trailing ``_mask``/``-mask`` removed.

    The directory part and the last extension are dropped first, so an image
    and its ``*_mask`` file reduce to the same key.
    """
    stem, _ext = os.path.splitext(os.path.basename(s))
    return re.sub(r'(_mask|-mask)$', '', stem)

def index_mask_array(mask_img, collapse_to_binary=True):
    """Turn a mask image into a ``uint8`` array of class indices.

    Args:
        mask_img: PIL-style image exposing ``mode`` and ``convert``.
        collapse_to_binary: when true, every non-zero value becomes 1
            (output contains only 0/1); otherwise the values are returned
            unchanged (background is expected to be 0).

    Returns:
        ``numpy.ndarray`` of dtype ``uint8`` with the image's 2-D shape.
    """
    # Palette ("P") images already store the class index per pixel. Converting
    # them to "L" would replace each index by the luminance of its palette
    # colour, which corrupts multi-class labels, so read the indices directly.
    # Every other mode keeps the original grayscale conversion.
    source = mask_img if mask_img.mode == "P" else mask_img.convert("L")
    m = np.array(source, dtype=np.uint8)
    if collapse_to_binary:
        m = (m > 0).astype(np.uint8)  # 0/1
    return m

class RFSegFolder(Dataset):
    """(image, mask) pairs discovered under a single split directory.

    Images are read from ``<split_dir>/images``. Masks are ``*.png`` files
    taken from whichever of the known mask sub-directories exist, or from
    ``split_dir`` itself when none of them does. An image and a mask are
    paired when ``normalize_stem`` yields the same key for both.
    """

    def __init__(self, split_dir, processor):
        """Index all pairs; raise ``RuntimeError`` when none is found."""
        self.img_dir = os.path.join(split_dir, "images")

        # Keep only the candidate mask folders that are present on disk.
        known_mask_folders = ["masks","labels","annotations","masks_png","labels_png"]
        present = []
        for folder in known_mask_folders:
            folder_path = os.path.join(split_dir, folder)
            if os.path.isdir(folder_path):
                present.append(folder_path)
        # Some exports may keep masks directly in the split folder (rare).
        self.mask_dirs = present if present else [split_dir]
        self.processor = processor

        # Stem -> mask path; a later directory overrides an earlier one.
        mask_map = {
            normalize_stem(mask_path): mask_path
            for mask_dir in self.mask_dirs
            for mask_path in glob.glob(os.path.join(mask_dir, "*.png"))
        }

        # Walk the images in sorted order and keep those with a known mask.
        self.items = []
        for image_path in sorted(glob.glob(os.path.join(self.img_dir, "*.*"))):
            mask_path = mask_map.get(normalize_stem(image_path))
            if not mask_path or not os.path.exists(mask_path):
                continue
            self.items.append((image_path, mask_path))

        if len(self.items) == 0:
            raise RuntimeError(f"No (image,mask) pairs in {split_dir}. 마스크 폴더명이 'masks/labels/annotations' 중 하나인지 확인")

    def __len__(self):
        """Number of indexed (image, mask) pairs."""
        return len(self.items)

    def __getitem__(self, idx):
        """Load pair ``idx`` and return the processor output without its leading dim."""
        image_path, mask_path = self.items[idx]
        rgb = Image.open(image_path).convert("RGB")
        labels = index_mask_array(Image.open(mask_path), COLLAPSE_TO_BINARY)
        encoded = self.processor(images=rgb, segmentation_maps=labels, return_tensors="pt")
        sample = {}
        for key, tensor in encoded.items():
            sample[key] = tensor.squeeze(0)  # drop dimension 0 of every entry
        return sample

# 6) 프로세서/모델
from transformers import SegformerForSemanticSegmentation
import torch, evaluate

# Pretrained checkpoint that both the image processor and the model start from.
CKPT = "nvidia/segformer-b0-finetuned-ade-512-512"

# NOTE(review): newer transformers releases appear to call this option
# `do_reduce_labels` — confirm `reduce_labels` is still honoured by the
# installed version.
processor = SegformerImageProcessor.from_pretrained(CKPT, reduce_labels=False)

# Label configuration for the lane task. ignore_mismatched_sizes presumably
# lets the checkpoint load although its head was trained for a different
# number of classes — TODO confirm.
_model_kwargs = dict(
    num_labels=NUM_LABELS,
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True,
)
model = SegformerForSemanticSegmentation.from_pretrained(CKPT, **_model_kwargs)

# 7) Build the datasets
train_dir = splits.get("train")
if train_dir is None:
    # Without this guard a missing train split reaches os.path.join(None, ...)
    # inside RFSegFolder and fails with an unrelated TypeError.
    raise RuntimeError(f"No 'train' folder found; available splits: {sorted(splits)}")
# find_split_dir stores both "valid" and "val" under the key "valid", so one
# lookup is enough. With no validation split, reuse train (demo only).
valid_dir = splits.get("valid") or train_dir
train_ds = RFSegFolder(train_dir, processor)
val_ds   = RFSegFolder(valid_dir, processor)

# 8) Training
from transformers import TrainingArguments, Trainer
metric = evaluate.load("mean_iou")

def compute_metrics(eval_pred):
    """Compute mean-IoU metrics for one evaluation pass.

    Args:
        eval_pred: ``(logits, labels)`` as numpy arrays; ``logits`` may also
            arrive as a tuple whose first element holds the logits.

    Returns:
        dict of metric name -> plain Python value (lists / floats), so the
        result can be JSON-serialised when it is logged.
    """
    logits, labels = eval_pred
    if isinstance(logits, tuple):
        logits = logits[0]

    # SegFormer logits are at a lower resolution than the labels (height/4 x
    # width/4 per the model documentation). Resize them to the label size
    # before argmax so predictions and references have matching shapes.
    upsampled = torch.nn.functional.interpolate(
        torch.from_numpy(logits).float(),  # interpolate in float32 whatever dtype arrives
        size=labels.shape[-2:],
        mode="bilinear",
        align_corners=False,
    )
    preds = upsampled.argmax(dim=1).numpy()

    scores = metric.compute(
        predictions=preds, references=labels,
        num_labels=NUM_LABELS, ignore_index=255, reduce_labels=False
    )
    # numpy arrays/scalars are not JSON-serialisable; convert to Python types.
    return {
        name: (value.tolist() if isinstance(value, (np.ndarray, np.generic)) else value)
        for name, value in scores.items()
    }

args = TrainingArguments(
    output_dir="segformer-lane",
    learning_rate=5e-5,
    num_train_epochs=20,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    # Current transformers releases name this argument `eval_strategy`
    # (formerly `evaluation_strategy`); the later training cell in this
    # notebook uses the same spelling.
    eval_strategy="epoch",
    save_strategy="epoch",  # must match eval_strategy for load_best_model_at_end
    fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
    logging_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model="mean_iou",
    greater_is_better=True,
    report_to="none",  # no external experiment trackers, as in the later cell
)

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    compute_metrics=compute_metrics,
)
trainer.train()

# 9) Save the model and its processor side by side
trainer.save_model("segformer-lane/best")
processor.save_pretrained("segformer-lane/best")
print("✅ Saved to segformer-lane/best")

In [2]:
# === 패치: RFSegFolder를 더 관대한 버전으로 재정의 ===
import os, glob, re
import numpy as np
from PIL import Image
from torch.utils.data import Dataset

# Trailing markers stripped from file stems so image and mask names can match.
_SUFFIX_RE = re.compile(r'(_|-)(mask|masks|label|labels|seg|segment|segmentation)$', re.I)

def _stem_no_suffix(path):
    """Return the stem of ``path`` without one trailing mask/label suffix.

    The directory and the last extension are dropped, then a single
    case-insensitive ``_mask`` / ``-labels`` / ... ending is removed.
    """
    stem, _ext = os.path.splitext(os.path.basename(path))
    return _SUFFIX_RE.sub('', stem)

def _is_img(name):
    """Tell whether ``name`` ends with a known image extension (case-insensitive)."""
    lowered = name.lower()
    known_exts = (".jpg", ".jpeg", ".png", ".bmp", ".tif", ".tiff")
    return any(lowered.endswith(ext) for ext in known_exts)

class RFSegFolder(Dataset):
    """Permissive (image, mask) pair dataset for one split directory.

    Images are taken from ``<split_dir>/images`` when that folder contains
    image files, otherwise from ``split_dir`` itself. Masks are ``*.png``
    files from the known mask sub-folders; when none exists, every folder
    under ``split_dir`` (except the image folder) that contains a PNG is
    used, and as a last resort ``split_dir`` itself. Images and masks are
    paired when ``_stem_no_suffix`` yields the same key for both.

    Raises:
        RuntimeError: when no image folder or no (image, mask) pair is found.
    """

    def __init__(self, split_dir, processor):
        # 1) Image folder: prefer 'images/', otherwise look in the split root.
        self.img_dir = None
        for cand in (os.path.join(split_dir, "images"), split_dir):
            if os.path.isdir(cand) and any(_is_img(f) for f in os.listdir(cand)):
                self.img_dir = cand
                break
        if self.img_dir is None:
            raise RuntimeError(f"No images found in {split_dir}")

        # 2) Mask folders: the well-known names first.
        mask_cands = ["masks","labels","annotations","masks_png","labels_png","mask","Labels","Masks"]
        self.mask_dirs = [os.path.join(split_dir, c) for c in mask_cands if os.path.isdir(os.path.join(split_dir, c))]
        if not self.mask_dirs:
            # Fallback: any folder under the split that holds a PNG, skipping
            # the image folder.
            img_dir_abs = os.path.abspath(self.img_dir)
            for root, _dirs, filenames in os.walk(split_dir):
                if os.path.abspath(root) == img_dir_abs:
                    continue
                if any(f.lower().endswith(".png") for f in filenames):
                    self.mask_dirs.append(root)
            if not self.mask_dirs:
                # Nothing at all: include the split root (very rare case).
                self.mask_dirs = [split_dir]

        self.processor = processor

        # 3) Stem -> mask path; a later folder overrides an earlier one.
        mask_map = {}
        for md in self.mask_dirs:
            for p in glob.glob(os.path.join(md, "*.png")):
                mask_map[_stem_no_suffix(p)] = p

        # 4) Build the (image, mask) pairs.
        self.items = []
        for ip in sorted(glob.glob(os.path.join(self.img_dir, "*.*"))):
            if not _is_img(ip):
                continue
            mp = mask_map.get(_stem_no_suffix(ip))
            if not mp or not os.path.exists(mp):
                continue
            if os.path.abspath(mp) == os.path.abspath(ip):
                # When the image folder is also scanned for masks, a PNG would
                # otherwise be paired with itself and used as its own label.
                continue
            self.items.append((ip, mp))

        if not self.items:
            # Debug aid: show which folders were scanned and how many PNGs they hold.
            print("[DEBUG] img_dir:", self.img_dir)
            print("[DEBUG] mask_dirs:", self.mask_dirs[:3], "…", f"({sum(len(glob.glob(os.path.join(d,'*.png'))) for d in self.mask_dirs)} masks png)")
            raise RuntimeError(f"No (image,mask) pairs in {split_dir}. "
                               f"이미지/마스크 파일명이 서로 매칭되는지(예: abc.jpg ↔ abc_mask.png) 확인해주세요.")

    def __len__(self):
        """Number of indexed (image, mask) pairs."""
        return len(self.items)

    def __getitem__(self, idx):
        """Load pair ``idx`` and return the processor output with dim 0 squeezed."""
        ip, mp = self.items[idx]
        image = Image.open(ip).convert("RGB")
        # Binary setup: 0 stays background, every non-zero value becomes lane (1).
        # Change this step to keep several classes.
        m = np.array(Image.open(mp).convert("L"), dtype=np.uint8)
        m = (m > 0).astype(np.uint8)
        # Use the processor given to this instance rather than a module-level
        # global, so the dataset works with whatever processor it was built with.
        enc = self.processor(images=image, segmentation_maps=m, return_tensors="pt")
        return {k: v.squeeze(0) for k, v in enc.items()}

In [7]:
from transformers import TrainingArguments, Trainer
import numpy as np, evaluate, torch

# "mean_iou" metric object from `evaluate`; compute_metrics below calls
# metric.compute(...) on it.
metric = evaluate.load("mean_iou")

def _to_py(o):
    """Convert numpy containers/scalars to built-in Python values.

    ``ndarray`` becomes a (nested) list, numpy float/int scalars become
    ``float``/``int``; anything else is returned unchanged.
    """
    numpy_scalars = (np.floating, np.integer)
    if isinstance(o, numpy_scalars):
        return o.item()
    if isinstance(o, np.ndarray):
        return o.tolist()
    return o

def compute_metrics(eval_pred):
    """Compute mean-IoU metrics and return them as JSON-friendly values.

    ``eval_pred`` unpacks into ``(logits, labels)``; the original author's
    note gives their shapes as (N, C, h, w) and (N, H, W). When the logits
    arrive as a tuple, its first element is used.
    """
    raw_logits, labels = eval_pred
    logits_np = raw_logits[0] if isinstance(raw_logits, tuple) else raw_logits

    logits_t = torch.from_numpy(logits_np)
    labels_t = torch.from_numpy(labels)

    # Resize the logits to the label size so the shapes agree before argmax.
    target_size = labels_t.shape[-2:]
    resized = torch.nn.functional.interpolate(
        logits_t, size=target_size, mode="bilinear", align_corners=False
    )
    preds = resized.argmax(dim=1).cpu().numpy()

    scores = metric.compute(
        predictions=preds,
        references=labels,
        num_labels=getattr(model.config, "num_labels", 2),
        ignore_index=255,
        reduce_labels=False,
    )

    # Convert numpy values so the result can be JSON-serialised.
    serialisable = {}
    for name, value in scores.items():
        serialisable[name] = _to_py(value)
    return serialisable

# Training hyper-parameters collected in one place, then handed to TrainingArguments.
_training_config = dict(
    output_dir="segformer-lane",
    learning_rate=5e-5,
    num_train_epochs=20,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    eval_strategy="epoch",
    save_strategy="epoch",
    fp16=torch.cuda.is_available(),  # mixed precision only when CUDA is available
    logging_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model="mean_iou",
    greater_is_better=True,
    report_to="none",
)
args = TrainingArguments(**_training_config)

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    compute_metrics=compute_metrics,
)

# To continue from a checkpoint: trainer.train(resume_from_checkpoint=True)
trainer.train()

Epoch,Training Loss,Validation Loss,Mean Iou,Mean Accuracy,Overall Accuracy,Per Category Iou,Per Category Accuracy
1,0.0418,0.051805,0.829753,0.891799,0.978775,"[0.977764526246168, 0.6817407377445032]","[0.9900312565575472, 0.79356707346073]"
2,0.0411,0.051026,0.824263,0.876652,0.978599,"[0.9776227191937592, 0.670902560475457]","[0.9917922488130659, 0.7615108769059554]"
3,0.0435,0.049662,0.823929,0.867503,0.979051,"[0.9781211282789524, 0.6697368534093936]","[0.9934862420854199, 0.7415196278928194]"
4,0.0367,0.046568,0.840051,0.898403,0.980284,"[0.9793298821908697, 0.7007721688383226]","[0.9908807389195879, 0.8059249888234679]"
5,0.0387,0.048303,0.832564,0.87972,0.979911,"[0.9789879666681581, 0.686140205392096]","[0.992876718980598, 0.7665626694314712]"
6,0.0331,0.047084,0.840808,0.89748,0.980461,"[0.9795181158565466, 0.702097482588834]","[0.991200067981712, 0.8037600707688503]"
7,0.0308,0.046345,0.843238,0.896089,0.980948,"[0.980032978899547, 0.7064439120770976]","[0.991930235502293, 0.800248261692555]"
8,0.0324,0.045964,0.84446,0.902385,0.980864,"[0.9799282932035621, 0.7089910081630961]","[0.9910201707982706, 0.8137504637071844]"
9,0.0299,0.045453,0.845824,0.903432,0.981052,"[0.9801225168152489, 0.7115263039997345]","[0.9910964768015298, 0.8157669954628035]"
10,0.0315,0.046826,0.841864,0.887938,0.981101,"[0.9802141813866447, 0.7035146665298759]","[0.9931578950410925, 0.7827187033320334]"




TrainOutput(global_step=7120, training_loss=0.030945719163236993, metrics={'train_runtime': 1370.9482, 'train_samples_per_second': 20.745, 'train_steps_per_second': 5.193, 'total_flos': 4.9849505503248384e+17, 'train_loss': 0.030945719163236993, 'epoch': 20.0})