In [1]:
import os
import numpy as np
import json
from PIL import Image
from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import timm
import torch.nn as nn
import torch.optim as optim

##### 데이터 준비

In [None]:
SAVE_PATH = '/workspace/yonghak/data/image_emotion_dataset.npz'

In [3]:
# 불러오기
data = np.load(SAVE_PATH)
images = data['images']
emotions = data['emotions']
sets = data['sets']

In [4]:
# train/val/test 인덱스 분리
train_idx = sets == 'train'
val_idx = sets == 'val'
test_idx = sets == 'test'

train_images = images[train_idx]
val_images = images[val_idx]
test_images = images[test_idx]
train_emotions = emotions[train_idx]
val_emotions = emotions[val_idx]
test_emotions = emotions[test_idx]

In [5]:
# 라벨 인코딩 (감정만)
emotion_le = LabelEncoder()
train_emotion_idx = emotion_le.fit_transform(train_emotions)
val_emotion_idx = emotion_le.transform(val_emotions)
test_emotion_idx = emotion_le.transform(test_emotions)
NUM_CLASSES = len(emotion_le.classes_)

print('감정 클래스:', emotion_le.classes_)

감정 클래스: ['기쁨' '당황' '분노' '슬픔']


In [6]:
print(f"Train 이미지 shape: {train_images.shape}, 감정 라벨 shape: {train_emotion_idx.shape}")
print(f"Validation 이미지 shape: {val_images.shape}, 감정 라벨 shape: {val_emotion_idx.shape}")
print(f"Test 이미지 shape: {test_images.shape}, 감정 라벨 shape: {test_emotion_idx.shape}")

Train 이미지 shape: (5994, 224, 224, 3), 감정 라벨 shape: (5994,)
Validation 이미지 shape: (1200, 224, 224, 3), 감정 라벨 shape: (1200,)
Test 이미지 shape: (1200, 224, 224, 3), 감정 라벨 shape: (1200,)


In [7]:
# Dataset 클래스 (감정만 사용)
class EmotionDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        image = Image.fromarray(image.astype('uint8'))
        if self.transform:
            image = self.transform(image)
        return image, label

In [8]:
img_size = 224
common_transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

In [9]:
train_dataset = EmotionDataset(train_images, train_emotion_idx, transform=common_transform)
val_dataset = EmotionDataset(val_images, val_emotion_idx, transform=common_transform)
test_dataset = EmotionDataset(test_images, test_emotion_idx, transform=common_transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=0)

##### 모델

In [10]:
# ViT 모델 (감정만 분류)
class ViT_EmotionClassifier(nn.Module):
    def __init__(self, num_classes=NUM_CLASSES, dropout_rate=0.3):
        super(ViT_EmotionClassifier, self).__init__()
        self.vit = timm.create_model('vit_base_patch16_224', pretrained=True)
        in_features = self.vit.head.in_features
        self.vit.head = nn.Identity()  # 기존 head 제거
        self.dropout = nn.Dropout(p=dropout_rate)
        self.fc = nn.Linear(in_features, num_classes)  # 새 head 정의

    def forward(self, x):
        x = self.vit(x)
        x = self.dropout(x)
        x = self.fc(x)
        return x

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ViT_EmotionClassifier(num_classes=NUM_CLASSES, dropout_rate=0.3)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [12]:
class EarlyStopping:
    def __init__(self, patience=3, min_delta=0.001, path='checkpoint.pt'):
        self.patience = patience              # 개선 안되는 최대 epoch 수
        self.min_delta = min_delta            # 최소 개선 폭
        self.counter = 0                      # 개선 안 된 epoch 수
        self.best_score = None                # 최고 성능
        self.early_stop = False               # 중단 여부
        self.path = path                      # 모델 저장 경로

    def __call__(self, val_score, model):
        if self.best_score is None:
            self.best_score = val_score
            self.save_checkpoint(model)
        elif val_score < self.best_score + self.min_delta:
            self.counter += 1
            print(f"EarlyStopping 카운트: {self.counter}/{self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = val_score
            self.save_checkpoint(model)
            self.counter = 0

    def save_checkpoint(self, model):
        torch.save(model.state_dict(), self.path)
        print(f'>>> [Val] 모델 저장됨: {self.path} (Val Acc: {self.best_score:.4f})')

In [13]:
save_path_val = '/workspace/yonghak/vit_base.pth'
early_stopping = EarlyStopping(patience=3, min_delta=0.001, path=save_path_val)

for epoch in range(20):
    
    # ---- 학습 ----
    model.train()
    train_loss, train_correct = 0, 0
    for imgs, labels in train_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * imgs.size(0)
        preds = outputs.argmax(dim=1)
        train_correct += (preds == labels).sum().item()
    train_acc = train_correct / len(train_dataset)
    train_loss_avg = train_loss / len(train_dataset)
    print(f"Epoch {epoch+1} | Train Loss: {train_loss_avg:.4f} | Train Acc: {train_acc:.4f}")

    # ---- 검증 ----
    model.eval()
    val_loss, val_correct = 0, 0
    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * imgs.size(0)
            preds = outputs.argmax(dim=1)
            val_correct += (preds == labels).sum().item()
    val_acc = val_correct / len(val_dataset)
    val_loss_avg = val_loss / len(val_dataset)
    print(f"Validation Loss: {val_loss_avg:.4f} | Validation Acc: {val_acc:.4f}")

    # ---- Early Stopping 호출 ----
    early_stopping(val_acc, model)

    if early_stopping.early_stop:
        print("EarlyStopping: validation 성능이 개선되지 않아 학습을 조기 종료합니다.")
        break

Epoch 1 | Train Loss: 1.4469 | Train Acc: 0.2504
Validation Loss: 1.4272 | Validation Acc: 0.2500
>>> [Val] 모델 저장됨: /workspace/yonghak/vit_base.pth (Val Acc: 0.2500)
Epoch 2 | Train Loss: 1.2479 | Train Acc: 0.4169
Validation Loss: 1.1491 | Validation Acc: 0.5267
>>> [Val] 모델 저장됨: /workspace/yonghak/vit_base.pth (Val Acc: 0.5267)
Epoch 3 | Train Loss: 0.8200 | Train Acc: 0.6705
Validation Loss: 0.9132 | Validation Acc: 0.6108
>>> [Val] 모델 저장됨: /workspace/yonghak/vit_base.pth (Val Acc: 0.6108)
Epoch 4 | Train Loss: 0.4985 | Train Acc: 0.8061
Validation Loss: 0.9654 | Validation Acc: 0.6575
>>> [Val] 모델 저장됨: /workspace/yonghak/vit_base.pth (Val Acc: 0.6575)
Epoch 5 | Train Loss: 0.3079 | Train Acc: 0.8859
Validation Loss: 1.2324 | Validation Acc: 0.6442
EarlyStopping 카운트: 1/3
Epoch 6 | Train Loss: 0.2076 | Train Acc: 0.9236
Validation Loss: 1.6125 | Validation Acc: 0.6500
EarlyStopping 카운트: 2/3
Epoch 7 | Train Loss: 0.1392 | Train Acc: 0.9483
Validation Loss: 1.4238 | Validation Acc: 0.6

In [14]:
# 평가
model.eval()
test_loss, test_correct = 0, 0
with torch.no_grad():
    for imgs, labels in test_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        test_loss += loss.item() * imgs.size(0)
        preds = outputs.argmax(dim=1)
        test_correct += (preds == labels).sum().item()
print(f"Test Loss: {test_loss/len(test_dataset):.4f} "
      f"Test Acc: {test_correct/len(test_dataset):.4f}")

Test Loss: 1.2912 Test Acc: 0.6533


In [15]:
from sklearn.metrics import confusion_matrix, classification_report

all_preds = []
all_labels = []
model.eval()
with torch.no_grad():
    for imgs, labels in test_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        outputs = model(imgs)
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# 혼동행렬
cm = confusion_matrix(all_labels, all_preds)
print(cm)

# 정확도(accuracy)
acc = np.mean(np.array(all_labels) == np.array(all_preds))
print(f"\n 정확도: {acc:.4f} \n")

# precision/recall/f1-score
print(classification_report(all_labels, all_preds, target_names=emotion_le.classes_))

[[221  14  42  23]
 [ 10 199  76  15]
 [ 10  49 194  47]
 [ 25  27  78 170]]

 정확도: 0.6533 

              precision    recall  f1-score   support

          기쁨       0.83      0.74      0.78       300
          당황       0.69      0.66      0.68       300
          분노       0.50      0.65      0.56       300
          슬픔       0.67      0.57      0.61       300

    accuracy                           0.65      1200
   macro avg       0.67      0.65      0.66      1200
weighted avg       0.67      0.65      0.66      1200

