<a href="https://colab.research.google.com/github/Hanbin-git/kaggle/blob/main/Untitled17.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1. Install libraries (must be re-run whenever the runtime restarts)
# timm and transformers are pinned to exact versions; wandb and torchvision
# are upgraded to whatever pip currently resolves.
!pip install -q wandb timm==0.9.12 torchvision --upgrade
!pip install -q transformers==4.41.2


In [None]:
# 2. Mount Google Drive, then copy and unpack the dataset archive locally
import os, zipfile, shutil

from google.colab import drive, files

drive.mount('/content/drive')

SRC_ZIP = "/content/drive/MyDrive/open.zip"
DST_DIR = "/content/open"

# Start from a clean extraction directory on every run.
if os.path.exists(DST_DIR):
    shutil.rmtree(DST_DIR)

# Copy the archive off Drive first, then extract from the local copy.
shutil.copy(SRC_ZIP, "/content/open.zip")
with zipfile.ZipFile("/content/open.zip") as archive:
    archive.extractall(DST_DIR)


In [None]:
# Training configuration; written verbatim to config.yaml, which dataload.py
# parses at import time.
yaml_text = """
SEED: 42
IMG_SIZE: 456
BATCH_SIZE: 48
EPOCHS: 40
LEARNING_RATE: 0.0003
patience: 8
model: "tf_efficientnet_b5"
ema_decay: 0.9997
stochastic_depth: 0.2
train_root: "/content/open/train"
test_root : "/content/open/test"
"""
with open("config.yaml", "w") as config_file:
    config_file.write(yaml_text)
print("✅ config.yaml 저장 완료")


In [None]:
with open("dataload.py", "w") as f:
    f.write('''
import os, yaml, random, math
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, Subset, WeightedRandomSampler
from torchvision.transforms import v2
from sklearn.model_selection import train_test_split
from PIL import Image

# ───────── Load config ─────────
# Parsed once at import time; every function in this module reads CFG.
with open("config.yaml") as config_file:
    CFG = yaml.safe_load(config_file)

# ───────── Seeding ─────────
def seed_everything(seed: int):
    """Seed the Python, NumPy and PyTorch random number generators.

    Also exports ``PYTHONHASHSEED`` and configures cuDNN with the autotuner
    enabled and deterministic mode disabled, so GPU results may still vary
    between runs.

    Args:
        seed: Value handed to every generator.
    """
    os.environ["PYTHONHASHSEED"] = str(seed)
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)
    # Speed over strict reproducibility: benchmark kernels, allow
    # non-deterministic cuDNN algorithms.
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False

# ───────── Dataset ─────────
class CustomImageDataset(Dataset):
    """Image dataset over a flat test folder or a class-per-subfolder train folder.

    Args:
        root_dir: Directory to scan. In test mode its image files are the
            samples; otherwise each sub-directory is one class.
        transform: Optional callable applied to the RGB PIL image.
        is_test: When True, samples carry no label.

    Attributes:
        samples: ``(path,)`` tuples in test mode, ``(path, label)`` otherwise,
            always in sorted (deterministic) order.
        classes: Sorted class-directory names (train mode only).
        class_to_idx: Mapping from class name to integer label (train mode only).
    """

    # Extensions accepted as images (compared case-insensitively).
    IMG_EXTENSIONS = (".jpg", ".jpeg", ".png")

    def __init__(self, root_dir, transform=None, is_test=False):
        self.root_dir, self.transform, self.is_test = root_dir, transform, is_test
        self.samples = []
        if is_test:
            self.samples = [
                (os.path.join(root_dir, name),)
                for name in self._list_images(root_dir)
            ]
            return

        # Only directories are classes: a stray file in root_dir would
        # otherwise be listed as a class and crash the per-class scan.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        self.class_to_idx = {c: i for i, c in enumerate(self.classes)}
        for cls in self.classes:
            cls_dir = os.path.join(root_dir, cls)
            label = self.class_to_idx[cls]
            self.samples.extend(
                (os.path.join(cls_dir, name), label)
                for name in self._list_images(cls_dir)
            )

    @classmethod
    def _list_images(cls, folder):
        """Return the image file names in *folder*, sorted.

        Sorting makes sample indices independent of the filesystem's listing
        order, so index-based splits stay reproducible.
        """
        return sorted(
            name for name in os.listdir(folder)
            if name.lower().endswith(cls.IMG_EXTENSIONS)
        )

    def __len__(self): return len(self.samples)

    def __getitem__(self, idx):
        """Load sample *idx*; returns the image (test) or ``(image, label)``."""
        sample = self.samples[idx]
        # convert() returns a fully loaded copy, so the file handle can be
        # closed right away instead of lingering until garbage collection.
        with Image.open(sample[0]) as raw:
            img = raw.convert("RGB")
        if self.transform:
            img = self.transform(img)
        if self.is_test:
            return img
        return img, sample[1]

# ───────── Transforms ─────────
def get_transforms():
    """Build the augmentation pipeline and the plain evaluation pipeline.

    Both pipelines convert the input to a float32 tensor image first and
    normalize last; the image side length comes from ``CFG["IMG_SIZE"]``.

    Returns:
        tuple: ``(train_tf, val_tf)`` as ``v2.Compose`` objects.
    """
    side = CFG["IMG_SIZE"]
    target_size = (side, side)
    # Presumably the standard ImageNet channel statistics — confirm against
    # the pretrained weights' config.
    mean, std = [0.485,0.456,0.406], [0.229,0.224,0.225]

    def to_float_image():
        # Fresh transform instances for each pipeline.
        return [v2.ToImage(), v2.ToDtype(torch.float32, scale=True)]

    train_steps = to_float_image() + [
        v2.RandomResizedCrop(target_size, scale=(0.8,1.0)),
        v2.RandomHorizontalFlip(),
        v2.ColorJitter(0.3,0.3,0.3,0.1),
        v2.RandAugment(num_ops=2, magnitude=7),
        v2.Normalize(mean, std),
    ]
    val_steps = to_float_image() + [
        v2.Resize(target_size),
        v2.Normalize(mean, std),
    ]
    return v2.Compose(train_steps), v2.Compose(val_steps)

# ───────── Loaders ─────────
def get_loaders():
    """Create the train / validation / test data loaders.

    The training folder is scanned once and split 80/20 (stratified, seeded
    with ``CFG["SEED"]``). The training loader draws samples with
    inverse-class-frequency weights to counter class imbalance.

    Optional config keys: ``NUM_WORKERS`` (default 8; 0 loads in the main
    process) and ``PREFETCH`` (default 4).

    Returns:
        tuple: ``(train_loader, val_loader, test_loader, classes)`` where
        ``classes`` is the class-name list of the training folder.
    """
    import copy  # local import: only needed here

    seed_everything(CFG["SEED"])

    full = CustomImageDataset(CFG["train_root"])
    labels = [y for _, y in full.samples]
    tr_idx, val_idx = train_test_split(range(len(labels)),
                                       test_size=0.2,
                                       stratify=labels,
                                       random_state=CFG["SEED"])

    tr_tf, val_tf = get_transforms()

    # Shallow copies share the sample list of the single scan above, so the
    # split indices are guaranteed to address the same files in both subsets
    # (and the directory tree is not walked three times).
    train_base, val_base = copy.copy(full), copy.copy(full)
    train_base.transform, val_base.transform = tr_tf, val_tf
    train_ds = Subset(train_base, tr_idx)
    val_ds   = Subset(val_base, val_idx)

    # ---- Class-imbalance sampler: weight each sample by 1 / class count
    cls_cnt   = np.bincount([labels[i] for i in tr_idx])
    cls_wt    = 1. / (cls_cnt + 1e-6)
    sample_wt = [cls_wt[labels[i]] for i in tr_idx]
    sampler   = WeightedRandomSampler(sample_wt, num_samples=len(tr_idx), replacement=True)

    # ---- DataLoader parameters
    n_workers    = CFG.get("NUM_WORKERS", 8)
    prefetch     = CFG.get("PREFETCH", 4)
    eval_workers = max(1, n_workers // 2) if n_workers > 0 else 0

    def worker_kwargs(workers):
        # persistent_workers / prefetch_factor are rejected by DataLoader
        # when num_workers == 0, so only pass them for multi-process loading.
        if workers <= 0:
            return {"num_workers": 0}
        return {"num_workers": workers,
                "persistent_workers": True,
                "prefetch_factor": prefetch}

    train_loader = DataLoader(
        train_ds, batch_size=CFG["BATCH_SIZE"], sampler=sampler,
        pin_memory=True, **worker_kwargs(n_workers)
    )
    val_loader = DataLoader(
        val_ds, batch_size=CFG["BATCH_SIZE"], shuffle=False,
        pin_memory=True, **worker_kwargs(eval_workers)
    )
    test_loader = DataLoader(
        CustomImageDataset(CFG["test_root"], val_tf, is_test=True),
        batch_size=CFG["BATCH_SIZE"], shuffle=False,
        pin_memory=True, **worker_kwargs(eval_workers)
    )
    return train_loader, val_loader, test_loader, full.classes
''')


In [None]:
%%writefile train.py
import os, yaml, wandb, torch, timm, gc
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from transformers import get_cosine_schedule_with_warmup
from torch.cuda.amp import autocast, GradScaler
from dataload import get_loaders, CFG

# ──────────────────────── 0. Environment setup ────────────────────────
# Use the GPU when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"🚀 Using GPU : {torch.cuda.get_device_name(0)}")
    torch.backends.cudnn.benchmark = True   # faster kernels via the autotuner
    torch.cuda.empty_cache()                # release cached allocator blocks
    torch.cuda.reset_peak_memory_stats()    # peak stats start from this point
else:
    device = torch.device("cpu")

# ──────────────────────── 1. Model definition ────────────────────────
class BaseModel(nn.Module):
    """Wrapper module around the timm model named by ``CFG["model"]``.

    Args:
        num_classes: Passed to ``timm.create_model`` as ``num_classes``.
    """

    def __init__(self, num_classes: int):
        super().__init__()
        backbone_kwargs = {
            "pretrained": True,
            "num_classes": num_classes,
            # Optional config key; 0.0 when it is absent.
            "drop_path_rate": CFG.get("stochastic_depth", 0.0),
        }
        self.backbone = timm.create_model(CFG["model"], **backbone_kwargs)

    def forward(self, x):
        """Return the backbone's output for *x*."""
        out = self.backbone(x)
        return out

# ──────────────────────── 2. Training ────────────────────────
def main():
    """Fine-tune ``BaseModel`` with W&B logging, mixed precision and early stopping.

    Hyper-parameters come from ``CFG``. Trains for at most ``CFG["EPOCHS"]``
    epochs, saves the weights with the lowest validation loss to
    ``best_model.pth`` and stops after ``CFG["patience"]`` consecutive epochs
    without improvement. The W&B run is finished even when training raises.
    """
    run = wandb.init(project="effb5-color", config=CFG)
    # Mixed precision and gradient scaling are only enabled on CUDA; on CPU
    # both become explicit no-ops.
    use_amp = device.type == "cuda"
    try:
        train_loader, val_loader, _, class_names = get_loaders()
        model = BaseModel(len(class_names)).to(device)

        # Parameter groups: the classification head trains at 10x the base LR.
        # The head is taken from timm's get_classifier() instead of matching
        # "head" in parameter names, because that substring also matches
        # backbone layers such as EfficientNet's "conv_head".
        head_ids = {id(p) for p in model.backbone.get_classifier().parameters()}
        head, body = [], []
        for p in model.parameters():
            (head if id(p) in head_ids else body).append(p)
        optimizer = optim.AdamW([
            {"params": body, "lr": CFG["LEARNING_RATE"]},
            {"params": head, "lr": CFG["LEARNING_RATE"] * 10}
        ], weight_decay=0.05)

        criterion = nn.CrossEntropyLoss()
        scaler    = GradScaler(enabled=use_amp)

        # LR schedule: cosine decay stepped once per batch
        total_steps = len(train_loader) * CFG["EPOCHS"]
        scheduler   = get_cosine_schedule_with_warmup(
            optimizer,
            int(total_steps * 0.1),         # warm-up 10 %
            total_steps
        )

        # Report parameter counts
        total_params = sum(p.numel() for p in model.parameters())
        train_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        print(f"📊 Params: {total_params:,} (trainable {train_params:,})")

        best_loss, bad_epochs = float("inf"), 0

        for epoch in range(CFG["EPOCHS"]):
            # ─── Train ────────────────────────────────────────────
            model.train(); running = 0
            for imgs, labels in tqdm(train_loader, desc=f"[Train {epoch+1}]"):
                imgs, labels = imgs.to(device, non_blocking=True), labels.to(device, non_blocking=True)
                optimizer.zero_grad(set_to_none=True)
                with autocast(enabled=use_amp):
                    loss = criterion(model(imgs), labels)
                scaler.scale(loss).backward()
                scaler.step(optimizer); scaler.update()
                scheduler.step()
                running += loss.item()
            train_loss = running / len(train_loader)

            # ─── Validation ──────────────────────────────────────
            model.eval(); val_loss = 0; correct = total = 0
            with torch.no_grad():
                for imgs, labels in tqdm(val_loader, desc=f"[Val   {epoch+1}]"):
                    imgs, labels = imgs.to(device, non_blocking=True), labels.to(device, non_blocking=True)
                    with autocast(enabled=use_amp):
                        outs  = model(imgs)
                        loss  = criterion(outs, labels)
                    val_loss += loss.item()
                    correct  += (outs.argmax(1) == labels).sum().item()
                    total    += labels.size(0)
            val_loss /= len(val_loader)   # mean of per-batch losses
            val_acc   = 100 * correct / total

            # ─── GPU memory monitoring ────────────────────────────
            mem_alloc = 0                 # logged as 0 when running on CPU
            if device.type == "cuda":
                mem_alloc   = torch.cuda.memory_allocated() / 1024**2
                peak_alloc  = torch.cuda.max_memory_allocated() / 1024**2
                print(f"🧠 GPU Mem  : {mem_alloc:.1f} MB (peak {peak_alloc:.1f} MB)")

            # ─── Logging & early stopping ─────────────────────────
            wandb.log({"epoch": epoch+1,
                       "train_loss": train_loss,
                       "val_loss":   val_loss,
                       "val_acc":    val_acc,
                       "gpu_mem_MB": mem_alloc})

            if val_loss < best_loss:                     # improvement
                best_loss, bad_epochs = val_loss, 0
                torch.save(model.state_dict(), "best_model.pth")
                print(f"📦  Best model saved (val_loss={val_loss:.4f})")
            else:
                bad_epochs += 1
                print(f"⚠️  No improvement {bad_epochs}/{CFG['patience']}")
                if bad_epochs >= CFG["patience"]:
                    print(f"🛑 Early Stopping at epoch {epoch+1}")
                    break
    finally:
        # Collect garbage first so the blocks it frees can be handed back by
        # empty_cache().
        if device.type == "cuda":
            gc.collect(); torch.cuda.empty_cache()
            print("♻️  GPU cache cleared")

        run.finish()

if __name__ == "__main__":
    main()


In [None]:
# 6. 학습 실행  ← 주석 제거!
%cd /content          # 주석 없이 정확히
import os; os.environ["WANDB_MODE"]="offline"   # WandB 로그인 생략
from train import main
main()


In [None]:
# 2) Run inference and create the submission file
# NOTE(review): no visible cell writes inference.py — confirm it exists
# in /content before running this cell.
!python inference.py
!head submission_tta.csv    # preview the result
