### 1. 라이브러리 import

In [1]:
import os, random, time
from pathlib import Path

import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights
from torchvision import transforms

  from .autonotebook import tqdm as notebook_tqdm


### 2. 하이퍼 파라미터

In [3]:
# ────────────────── 1. 디바이스 자동 선택 ──────────────────
if torch.cuda.is_available():
    device = torch.device("cuda")
    PIN_MEM  = True          # CUDA일 때만 의미 있음
    NUM_WORKERS = os.cpu_count()  # 병렬 로딩
else:
    device = torch.device("cpu")
    PIN_MEM  = False
    NUM_WORKERS = 0

print("사용 디바이스 ▶", device)
print("num_workers  ▶", NUM_WORKERS)
print("pin_memory   ▶", PIN_MEM)

사용 디바이스 ▶ cpu
num_workers  ▶ 0
pin_memory   ▶ False


In [4]:
# --- 데이터 디렉터리 ---
train_root = Path("./train")  # Train 디렉터리를 가리키는 Path 객체
test_root  = Path("./test")   # Test 디렉터리를 가리키는 Path 객체
csv_path   = Path("./train_data.csv")  # 첫 열: 이미지 파일명

# --- 학습 파라미터 ---
BATCH_SIZE  = 32
EPOCHS      = 20
LR          = 3e-4
VAL_RATIO   = 0.2
SEED        = 42

### 3. SEED 설정

In [6]:
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark     = False

set_seed(SEED)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

### 4. 데이터셋 클래스
- Dataset: 인덱스로 개별 샘플을 읽어는 방법 정의

In [8]:
class DeskDataset(Dataset):
    """
    이미지 파일 목록을 받아 인덱스로 접근할 때마다 -> (이미지, 라벨) or (이미지, 파일명) 반환
    
    files: 이미지 파일 경로의 리스트
    test: 테스트 모드 여부. True면 라벨을 반환하지 않고 (이미지, 파일명)만 제공
    transform: 이미지 전처리 파이프라인
    """
    def __init__(self, files, test=False, transform=None):
        self.files = files
        self.test = test
        self.transform = transform or transforms.Compose([])

        if not test:
            # 파일명에 unclean 포함되면 1, 그렇지 않으면 0
            self.lbl_fn = lambda p: 1 if "unclean" in p.name.lower() else 0

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img_path = self.files[idx] # 인덱스로 이미지 파일 경로를 얻는다.
        img = Image.open(img_path).convert("RGB") # 이미지 열고 채널 수를 3개(RGB)로 맞춘다.
        img = self.transform(img) # 전처리(Resize, Normalize 등) 적용
        if self.test: # 테스트 모드면 라벨에 없으므로 (이미지 텐서, 파일명) 반환하고 종료
            return img, img_path.name
        label = self.lbl_fn(img_path) # 훈련.검증 모드인 경우, 파일명을 이용해 라벨(0/1) 계산
        return img, torch.tensor(label, dtype=torch.long) # 이미지 텐서, 라벨 텐서 반환

### 5. 전처리 & DataLoader
- DataLoader: Dataset을 배치 단위로 묶어 GPU에 전달

In [10]:
# ── 사전 학습 weight 및 메타 정보 로드 ─────────────
weights = EfficientNet_V2_S_Weights.DEFAULT # torchvision에 포함된 공식 사전 학습 가중치 선택
preprocess = weights.transforms() 
IMG_SIZE = preprocess.crop_size[0]         # (384,) 형태 튜플 → 384
MEAN, STD = preprocess.mean, preprocess.std

# ── 파일 수집 ─────────────────────────────────────
exts = {".jpg", ".jpeg", ".png", ".bmp"} # exts: 허용할 이미지 확장자 집합(set)
# rglob(*): train_root 하위 모든 경로를 검색
train_files = sorted([p for p in train_root.rglob("*") if p.suffix.lower() in exts]) # train_root: Path(./train)로 지정된 디렉토리
test_files  = sorted([p for p in test_root .rglob("*") if p.suffix.lower() in exts])
# p.suffix: 파일의 확장자(ex .JPG)
# sorted(): 필터링된 Path 객체들을 이름순으로 정렬 -> 이렇게 하면 인덱스가 고정돼 재현성 확보

# ── Stratified Train/Val Split ───────────────────
# train_files에 들어 있는 학습용 이미지 목록을 라벨 정보와 함께 훈련 / 검증 세트로 나누는 작업
y_all = [1 if "unclean" in p.name.lower() else 0 for p in train_files]
train_f, val_f = train_test_split(
    train_files, # 분할할 원본 리스트
    test_size=VAL_RATIO, 
    random_state=SEED,
    stratify=y_all # 라벨 분포 유지
)

# ── Transform ────────────────────────────────────
train_tf = transforms.Compose([
    transforms.RandomResizedCrop(IMG_SIZE, scale=(0.8,1.0), ratio=(0.95,1.05)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(0.15,0.15,0.15,0.05),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
])
val_tf = transforms.Compose([
    transforms.Resize(int(IMG_SIZE*1.15)),   # 384*1.15 ≈ 442
    transforms.CenterCrop(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
])

# ── Dataset & DataLoader ─────────────────────────
# Dataset
train_ds = DeskDataset(train_f, transform=train_tf)
val_ds   = DeskDataset(val_f,   transform=val_tf)
test_ds  = DeskDataset(test_files, test=True, transform=val_tf)

# DataLoader
# 내부에서 Dataset.__getitem__을 동시에 여러 번 호출해 미리 배치 단위 텐서로 묶어 주기 때문에, 학습 루프에서는 모델 연산에만 집중
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                      num_workers=NUM_WORKERS, pin_memory=True) # pin_memory: CPU 메모리를 잠금(pin)하여 GPU로 복사 속도 향상(CUDA 사용 시 효과)
val_dl   = DataLoader(val_ds,   batch_size=BATCH_SIZE, shuffle=False,
                      num_workers=NUM_WORKERS, pin_memory=True)
test_dl  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False,
                      num_workers=NUM_WORKERS, pin_memory=True)


### 6. 모델 준비

In [12]:
model = efficientnet_v2_s(weights=weights)
in_features = model.classifier[1].in_features # model.classifier는 Dropout -> Linear(1280, 1000) 형테
# model.classifier[1]을 통해 입력 차원 1280을 읽어옴
model.classifier = nn.Sequential( # 기존 1000 클래스 Linear를 2-클래스 Linear로 교체
    nn.Dropout(0.4),
    nn.Linear(in_features, 2)    # 2-class
)
model = model.to(device)

criterion = nn.CrossEntropyLoss() 
optimizer = torch.optim.AdamW(model.parameters(), # 학습할 파라미터 전체
                              lr=LR, 
                              weight_decay=1e-4 # L2 regularization
                             )
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

# (1) Early-Stopping 변수 선언
best_acc          = 0.0   # 최고 검증 정확도
PATIENCE          = 4     # 허용할 연속 미개선 epoch 수
epochs_no_improve = 0     # 연속으로 개선되지 않은 횟수

### 7. 학습 루프 & best_model.h5 저장

In [21]:
def train_epoch(model, loader):
    model.train() # 훈련 모드
    total_loss, total_correct = 0, 0 # 한 에포크 동안 평균 손실과 정확도를 계산하기 위해 합계를 저장할 변수를 0으로 초기화
    for imgs, labels in tqdm(loader, leave=False): # loader가 배치 단위로 (이미지, 라벨) 튜플을 공급 / tqdm: 진행률 바 표시
        imgs, labels = imgs.to(device), labels.to(device)
        optimizer.zero_grad(set_to_none=True) # 지난 배치에소 계산된 기울기를 0으로 리셋
        logits = model(imgs) # 모델에 이미지 배치를 넣어 로짓(logits)을 얻는다.
        loss   = criterion(logits, labels)
        loss.backward()
        optimizer.step() # 기울기에 따라 가중치를 한 스텝 갱신
        total_loss += loss.item() * len(labels)
        total_correct += (logits.argmax(1) == labels).sum().item() # 로짓의 최댓값 인덱스(argmax[1])가 정답과 일치하는 개수를 더한다.
    return total_loss / len(loader.dataset), total_correct / len(loader.dataset)

@torch.no_grad() # 자동 미분 끄기. 평가 시 기울기 계산, 저장을 하지 않아 메모리와 연산 절약
def eval_epoch(model, loader):
    model.eval() # 평가 모드. 드롭아웃 끄고, 배치 정규화는 이동평균, 분산을 사용
    total_loss, total_correct = 0, 0
    for imgs, labels in loader:
        imgs, labels = imgs.to(device), labels.to(device)
        logits = model(imgs)
        loss   = criterion(logits, labels)
        total_loss += loss.item() * len(labels)
        total_correct += (logits.argmax(1) == labels).sum().item()
    return total_loss / len(loader.dataset), total_correct / len(loader.dataset)

In [23]:
for ep in range(1, EPOCHS + 1):
    tr_loss, tr_acc = train_epoch(model, train_dl)
    val_loss, val_acc = eval_epoch(model, val_dl)
    scheduler.step()

    print(f"[{ep:02}/{EPOCHS}] "
          f"Train {tr_acc*100:6.2f}% | Valid {val_acc*100:6.2f}%")

    if val_acc > best_acc:
        best_acc = val_acc
        epochs_no_improve = 0
        torch.save(model.state_dict(), "best_model.pth")
        torch.save(model, "best_model.h5")
        print(f"  ↳ 새 최고 정확도: {best_acc*100:.2f}% (모델 저장)")
    else:
        epochs_no_improve += 1
        print(f"  ↳ 개선 없음 ({epochs_no_improve}/{PATIENCE})")

    if epochs_no_improve >= PATIENCE:
        print(f"\n{PATIENCE} epoch 연속 개선이 없어 학습을 조기 종료합니다.")
        break

                                                                                

[01/20] Train  78.10% | Valid  88.57%
  ↳ 새 최고 정확도: 88.57% (모델 저장)


                                                                                

[02/20] Train  98.54% | Valid  94.29%
  ↳ 새 최고 정확도: 94.29% (모델 저장)


                                                                                

[03/20] Train 100.00% | Valid  97.14%
  ↳ 새 최고 정확도: 97.14% (모델 저장)


                                                                                

[04/20] Train 100.00% | Valid 100.00%
  ↳ 새 최고 정확도: 100.00% (모델 저장)


                                                                                

[05/20] Train 100.00% | Valid 100.00%
  ↳ 개선 없음 (1/4)


                                                                                

[06/20] Train 100.00% | Valid 100.00%
  ↳ 개선 없음 (2/4)


                                                                                

[07/20] Train 100.00% | Valid 100.00%
  ↳ 개선 없음 (3/4)


                                                                                

[08/20] Train 100.00% | Valid 100.00%
  ↳ 개선 없음 (4/4)

4 epoch 연속 개선이 없어 학습을 조기 종료합니다.


### 8. train_data.csv -> 예측 컬럼 추가

In [25]:
# ── 1. 최고 성능 모델 불러오기 ───────────────────
best_model = efficientnet_v2_s(weights=None)         # 구조만 생성
in_features = best_model.classifier[1].in_features
best_model.classifier = nn.Sequential(
    nn.Dropout(0.4),
    nn.Linear(in_features, 2)
)
best_model.load_state_dict(
    torch.load("best_model.pth", map_location=device)  # 저장한 state_dict 로드
)
best_model = best_model.to(device).eval()

# ── 2. test_dl에 대해 예측 ───────────────────────
preds, fnames = [], []
with torch.no_grad(): # 기울기 저장 X
    for imgs, names in test_dl:        # DataLoader가 배치 단위 (이미지 텐서, 파일명 리스트) 제공
        imgs = imgs.to(device)
        logits = best_model(imgs)
        preds.extend(logits.argmax(1).cpu().tolist())  # 0/1 예측값
        fnames.extend(names)                           # 이미지 파일명

# ── 3. 예측 결과 DataFrame 생성 ───────────────────
pred_df = pd.DataFrame({"fname": fnames, "pred": preds})

# ── 4. 기존 CSV 읽어와서 예측 열 추가 ─────────────
df = pd.read_csv(csv_path)            # header가 이미 있다고 가정
fname_col = df.columns[0]             # 첫 번째 열(파일명 열) 이름
df["pred"] = df[fname_col].map(       # 파일명 기준으로 매핑
    dict(zip(pred_df.fname, pred_df.pred))
)

# ── 5. 저장 ──────────────────────────────────────
out_path = csv_path.with_name("train_data_with_pred.csv")
df.to_csv(out_path, index=False)
print(f"예측 결과를 '{out_path}'에 저장했습니다.")


예측 결과를 'train_data_with_pred.csv'에 저장했습니다.
