In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from pathlib import Path
import os

from core.models.model_factory import create_model
from core.data.dataset import EmotionDataset
from core.training.trainer import train_model

from torch.optim.lr_scheduler import StepLR

import csv, json, shutil, time
import numpy as np
import torch.nn.functional as F

def _resolve_dataset_paths(ds):
    """
    데이터셋 객체에서 원본 이미지 경로 리스트를 최대한 호환되게 추출.
    가능한 필드 우선순위: paths > image_paths > img_paths > samples/imgs ([(path, cls), ...])
    찾지 못하면 None 반환.
    """
    if hasattr(ds, "paths"): return list(getattr(ds, "paths"))
    if hasattr(ds, "image_paths"): return list(getattr(ds, "image_paths"))
    if hasattr(ds, "img_paths"): return list(getattr(ds, "img_paths"))
    if hasattr(ds, "samples"): return [p for p, _ in getattr(ds, "samples")]
    if hasattr(ds, "imgs"): return [p for p, _ in getattr(ds, "imgs")]
    return None

@torch.no_grad()
def evaluate_and_dump(model, val_loader, classes, out_dir: Path, device,
                      save_miscls=True, max_miscls_per_pair=200):
    out_dir.mkdir(parents=True, exist_ok=True)
    csv_path = out_dir / "val_predictions.csv"
    cm_path  = out_dir / "confusion_matrix.npy"
    per_class_path = out_dir / "per_class_acc.json"
    miscls_dir = out_dir / "misclassified"

    # 경로 복사를 위해 검증 데이터셋 원본 경로 추출
    val_paths = _resolve_dataset_paths(val_loader.dataset)
    copy_ok = (val_paths is not None)

    all_preds, all_labels, all_confs = [], [], []
    writer = csv.writer(open(csv_path, "w", newline="", encoding="utf-8"))
    writer.writerow(["index","path","true","pred","conf","correct"])

    model.eval()
    idx_base = 0
    # 클래스별 오분류 저장 상한 관리 (t->p 쌍별)
    pair_counter = {}

    for inputs, labels in val_loader:
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        # EmoNet은 dict 출력, 그 외는 텐서 출력
        out = model(inputs)
        logits = out["expression"] if isinstance(out, dict) and "expression" in out else out
        probs = F.softmax(logits, dim=1)
        confs, preds = probs.max(dim=1)

        B = inputs.size(0)
        for b in range(B):
            i = idx_base + b
            t = int(labels[b].item())
            p = int(preds[b].item())
            c = float(confs[b].item())
            all_labels.append(t); all_preds.append(p); all_confs.append(c)
            path = val_paths[i] if copy_ok and i < len(val_paths) else ""
            writer.writerow([i, path, classes[t], classes[p], f"{c:.4f}", int(t==p)])

            # 오분류 이미지 복사
            if save_miscls and copy_ok and t != p:
                pair = (t, p)
                if pair_counter.get(pair, 0) < max_miscls_per_pair:
                    subdir = miscls_dir / f"{classes[t]}_as_{classes[p]}"
                    subdir.mkdir(parents=True, exist_ok=True)
                    # 파일명: [idx]_true-{t}_pred-{p}_{conf}.원확장자
                    src = Path(path)
                    dst = subdir / f"{i:06d}_true-{classes[t]}_pred-{classes[p]}_{c:.3f}{src.suffix}"
                    try:
                        shutil.copy2(src, dst)
                        pair_counter[pair] = pair_counter.get(pair, 0) + 1
                    except Exception:
                        pass
        idx_base += B

    # 혼동행렬 / 클래스별 정확도 저장
    num_classes = len(classes)
    cm = np.zeros((num_classes, num_classes), dtype=np.int64)
    for t, p in zip(all_labels, all_preds):
        cm[t, p] += 1
    np.save(cm_path, cm)

    per_class = {}
    for k in range(num_classes):
        support = int((np.array(all_labels) == k).sum())
        correct = int(((np.array(all_labels) == k) & (np.array(all_preds) == k)).sum())
        acc = (correct / support) if support > 0 else None
        per_class[classes[k]] = {"acc": acc, "support": support}
    with open(per_class_path, "w", encoding="utf-8") as f:
        json.dump(per_class, f, ensure_ascii=False, indent=2)

    overall = (np.array(all_labels) == np.array(all_preds)).mean()
    print(f"[EVAL] Overall Acc: {overall:.4f} | CM -> {cm_path.name} | per-class -> {per_class_path.name} | CSV -> {csv_path.name}")
    if save_miscls and copy_ok:
        print(f"[EVAL] Misclassified samples saved to: {miscls_dir}")
    elif save_miscls and not copy_ok:
        print("[EVAL] Dataset 경로를 찾지 못해 오분류 복사를 건너뜀(EmotionDataset에 paths/samples 필드가 필요).")


if __name__ == '__main__':
    # CUDA 성능 플래그 최적화
    torch.backends.cudnn.benchmark = True
    # TF32 텐서 코어 사용을 허용하여 Ampere 아키텍처 이상 GPU에서 연산 속도 향상
    torch.backends.cuda.matmul.allow_tf32 = True
    
    # 설정값 정의
    # 장치 설정: 사용 가능한 경우 GPU(cuda)를, 그렇지 않으면 CPU를 사용
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {DEVICE}")
    
    sampling_percent = 30
    DATA_DIR = Path("D:/ex/최종프로젝트/AI_Dev_3rd_Project/YEO/datasets/30_pp_si")
    # 사용하고자 하는 모델 하나만 남기고 다른 MODEL_NAME 앞에 # 붙여서 주석처리
    #MODEL_NAME = 'resnet18'             #철원
    #MODEL_NAME = 'resnet50' 
    #MODEL_NAME = 'mobilenet_v3_small'  #승현님
    #MODEL_NAME = 'shufflenet_v2'       #철원
    #MODEL_NAME = 'efficientnet_v2_s'   #규진님
    #MODEL_NAME = 'squeezenet'          #승희님
    #MODEL_NAME = 'emotionnet'           # 감정 인식 전용 모델
    MODEL_NAME = 'emonet'               # 경량화된 감정 인식 모델

    NUM_CLASSES = 7  # 데이터셋의 클래스 수에 맞게 조정해야 합니다. ['기쁨', '당황', '분노', '불안', '상처', '슬픔', '중립']
    BATCH_SIZE = 64  # 배치 크기를 늘려 GPU 메모리 사용 최적화
    LEARNING_RATE = 0.001
    NUM_EPOCHS = 1
    EARLY_STOPPING_PATIENCE = 10 # 10번 연속 성능 개선이 없으면 조기 종료
    STEPS_PER_EPOCH = None # 빠른 테스트를 위해 에폭당 배치 수를 제한하려면 숫자로 변경 (예: 100)
    train_transform = None
    val_transform = None
    
    if MODEL_NAME == 'emotionnet':
        # 48x48 크기, 흑백(Grayscale), 정규화
        # RandomResizedCrop + TrivialAugmentWide (강력한 데이터 증강 방법)
        train_transform = transforms.Compose([
            #transforms.Resize((48, 48)),
            # 원본 이미지의 80% ~ 100% 사이를 무작위로 잘라 48x48 크기로 만듦
            transforms.RandomResizedCrop(size=48, scale=(0.8, 1.0)),
            # 잘라낸 이미지에 최적의 증강 정책을 자동으로 적용
            transforms.TrivialAugmentWide(),
            # 흑백으로 변환
            transforms.Grayscale(num_output_channels=1),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5], std=[0.5]) # 흑백 이미지 정규화
        ])
        val_transform = transforms.Compose([
            transforms.Resize((48, 48)),
            transforms.Grayscale(num_output_channels=1),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5], std=[0.5]) # 흑백 이미지는 채널이 1개
        ])

    elif MODEL_NAME == 'emonet':
        # 데이터 증강을 포함한 훈련용 Transform 정의
        train_transform = transforms.Compose([
            #transforms.Resize((256, 256)),
            transforms.RandomResizedCrop(size=256, scale=(0.8, 1.0)),
            transforms.TrivialAugmentWide(), 
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        # 증강이 없는 검증/테스트용 Transform 정의
        val_transform = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        
    else:
        # 데이터 증강을 포함한 훈련용 Transform 정의
        train_transform = transforms.Compose([
            #transforms.Resize((224, 224)),
            transforms.RandomResizedCrop(size=224, scale=(0.8, 1.0)),
            transforms.TrivialAugmentWide(), 
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])
        # 증강이 없는 검증/테스트용 Transform 정의
        val_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])
    
    # 훈련용과 검증용 데이터셋을 각각 생성.
    train_dataset = EmotionDataset(data_dir=DATA_DIR / "train", transform=train_transform)
    val_dataset = EmotionDataset(data_dir=DATA_DIR / "val", transform=val_transform)

    # DataLoader I/O 튜닝
    train_loader = DataLoader(
        train_dataset, 
        batch_size=BATCH_SIZE, 
        shuffle=True,
        # CPU 코어를 최대한 활용하여 데이터를 미리 GPU 메모리로 올리는 작업을 병렬 처리
        num_workers=min(8, os.cpu_count()), 
        pin_memory=True, # GPU로의 데이터 전송 속도 향상
        persistent_workers=True, # 워커 프로세스를 계속 유지하여 오버헤드 감소
        prefetch_factor=2, # 각 워커가 미리 로드할 배치 수
        drop_last=True # 마지막 배치가 배치 사이즈보다 작을 경우 버려서 연산 일관성 유지
    )
    val_loader = DataLoader(
        val_dataset, 
        batch_size=BATCH_SIZE, 
        shuffle=False,
        num_workers=min(8, os.cpu_count()),
        pin_memory=True,
        persistent_workers=True,
        prefetch_factor=2
    )

    NUM_CLASSES = len(train_dataset.classes)
    
    print("데이터 준비 완료!")
    print(f"훈련 데이터셋 크기: {len(train_dataset)}")
    print(f"클래스 수: {NUM_CLASSES} -> {train_dataset.classes}")

    # 모델, 손실 함수, 옵티마이저 준비
    model = create_model(model_name=MODEL_NAME, num_classes=NUM_CLASSES)
    # 모델을 지정된 장치로 이동
    model.to(DEVICE)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(
        model.parameters(), 
        weight_decay=1e-4, #과적합 방지를 위한 정규화 기법(Weight Decay), 학습을 방해함으로서 과적합 방지.
        lr=LEARNING_RATE 
        ) 
    START_EPOCH = 0
    
    scheduler = StepLR(optimizer, step_size=7, gamma=0.1)   # 7 에폭마다 학습률을 0.1배로 감소

    CHECKPOINT_PATH = f'./infrastructure/models/weights/checkpoints/{MODEL_NAME}_{sampling_percent}_percent_trained.pth'
    if os.path.exists(CHECKPOINT_PATH):
        print("체크포인트를 불러옵니다...")
        # checkpoint = torch.load(CHECKPOINT_PATH)
        # model.load_state_dict(checkpoint['model_state_dict'])
        # optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        # START_EPOCH = checkpoint['epoch'] + 1 # 다음 에폭부터 시작
        checkpoint = torch.load(CHECKPOINT_PATH)
        model.load_state_dict(checkpoint)   # 키 접근 없이 바로 넣기
    else:
        print("체크포인트가 존재하지 않습니다. 처음부터 훈련을 시작합니다.")
        
    #model = torch.compile(model)   # Windows 환경에서 에러 발생
    #print("모델 컴파일 완료!")
    print(f"'{MODEL_NAME}' 모델, 손실 함수, 옵티마이저 준비 완료!")
    
    # 모델 훈련 시작
    print("\n모델 훈련을 시작합니다...")
    trained_model = train_model(model, 
                                train_loader, 
                                val_loader, 
                                criterion, 
                                optimizer, 
                                scheduler,
                                DEVICE, 
                                num_epochs=NUM_EPOCHS,
                                start_epoch=START_EPOCH,
                                patience=EARLY_STOPPING_PATIENCE,
                                steps_per_epoch=STEPS_PER_EPOCH
                                )

    # 훈련된 모델 저장 (옵션)
    # torch.save(trained_model.state_dict(), f'./infrastructure/models/weights/checkpoints/{MODEL_NAME}_{sampling_percent}_percent_trained.pth')
    print("훈련된 모델 가중치가 저장되었습니다.")

Using device: cuda
데이터 준비 완료!
훈련 데이터셋 크기: 86974
클래스 수: 7 -> ['기쁨', '당황', '분노', '불안', '상처', '슬픔', '중립']
사전 훈련된 EmoNet 가중치를 불러옵니다 (Fine-tuning)...
체크포인트를 불러옵니다...
'emonet' 모델, 손실 함수, 옵티마이저 준비 완료!

모델 훈련을 시작합니다...
Epoch 1/1
----------
  [Batch 20/1358] Train Loss: 0.6620 Acc: 0.7656
  [Batch 1358/1358] Train Loss: 0.7950 Acc: 0.6562
Train Loss: 0.7311 Acc: 0.7459
Val Loss: 0.5993 Acc: 0.7906

  -> Val Loss 개선됨! (0.5993) 모델 저장.
--------------------------------------------------
Training complete in 10m 27s
Saved Epoch: 1
--------------------------------------------------
Saved Train Loss: 0.7311
Saved Train Acc: 0.7459
Saved Val Loss: 0.5993
Saved Val Acc: 0.7906
--------------------------------------------------
Best Train Loss: 0.7311
Best Train Acc: 0.7459
Best Val Loss: 0.5993
Best Val Acc: 0.7906
--------------------------------------------------
훈련된 모델 가중치가 저장되었습니다.


In [2]:
# === 재검증 & 산출물 저장 ===
stamp = time.strftime("%Y%m%d_%H%M%S")
RUN_DIR = Path(f"./runs/{MODEL_NAME}_{stamp}")
RUN_DIR.mkdir(parents=True, exist_ok=True)

# 클래스 이름은 PM의 EmotionDataset가 보유 (예: dataset.classes)
classes = train_dataset.classes  # 또는 val_dataset.classes
evaluate_and_dump(trained_model, val_loader, classes, RUN_DIR, DEVICE,
                save_miscls=True, max_miscls_per_pair=200)

[EVAL] Overall Acc: 0.7906 | CM -> confusion_matrix.npy | per-class -> per_class_acc.json | CSV -> val_predictions.csv
[EVAL] Misclassified samples saved to: runs\emonet_20250820_025411\misclassified


In [None]:
# 데이터 양을 늘려도 성능이 특정 수준에서 다시 정체된다면, 파인튜닝 세분화를 적용하여 모델의 학습 효율을 극대화
# 새로 학습시킬 파라미터와 미세 조정할 파라미터를 분리
new_classifier_params = model.emo_fc_3.parameters()
pretrained_params = [p for name, p in model.named_parameters() if 'emo_fc_3' not in name]

optimizer = optim.Adam([
    {'params': pretrained_params, 'lr': LEARNING_RATE * 0.1}, # 기존 부분은 10분의 1로 미세 조정
    {'params': new_classifier_params, 'lr': LEARNING_RATE}      # 새 부분은 원래 학습률로 학습
], weight_decay=1e-4)