# AI 감정 인식 모델 학습 (병렬 처리 & 정규화)

이 노트북은 **MediaPipe FaceMesh**를 사용하여 얼굴 랜드마크를 추출하고, PyTorch를 사용하여 감정 분류 모델을 학습합니다.

**개선된 점:**
- 2개의 GPU 활용 (DataParallel)
- **병렬 데이터 처리 (Parallel Processing)**: ProcessPoolExecutor를 사용하여 데이터 로딩 속도 대폭 향상
- **좌표 정규화 (Coordinate Normalization)**: 얼굴 위치와 크기에 상관없이 학습되도록 전처리 추가
- 학습 진행 상황(Loss, Accuracy) 실시간 시각화

In [1]:
import zipfile
import cv2
import mediapipe as mp
import numpy as np
import os
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
import matplotlib.pyplot as plt
from concurrent.futures import ProcessPoolExecutor

# 시각화 설정
%matplotlib inline

# GPU 설정 확인
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
if torch.cuda.device_count() > 1:
    print(f"Detected {torch.cuda.device_count()} GPUs! Using DataParallel.")


Using device: cuda
Detected 2 GPUs! Using DataParallel.


In [2]:
# ============================
# 설정 변수
# ============================

# 데이터셋 경로 (사용자 환경에 맞게 수정됨)
BASE_DIR = r"c:\Users\ldy34\Desktop\Face\video"
TRAIN_DIR = os.path.join(BASE_DIR, "Training")

# 학습할 파일 목록
TARGET_FILES = {
    "Neutral": {"filename": "[원천]EMOIMG_중립_TRAIN_01.zip", "label": 0},
    "Anxious": {"filename": "[원천]EMOIMG_불안_TRAIN_01.zip", "label": 1}
}

# 추출할 최대 샘플 수 (테스트용 10,000개)
MAX_SAMPLES = 10000

In [3]:
# ============================
# 병렬 처리를 위한 함수 정의
# ============================

def process_single_image(args):
    """
    이미지 데이터(bytes) 하나를 받아서 랜드마크를 추출하고 정규화하는 함수
    (ProcessPoolExecutor에서 실행됨)
    """
    img_data, label = args
    try:
        img_array = np.frombuffer(img_data, np.uint8)
        image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
        if image is None: return None
        
        # FaceMesh 객체를 프로세스마다 새로 생성해야 충돌이 없음
        with mp.solutions.face_mesh.FaceMesh(
            static_image_mode=True, 
            max_num_faces=1, 
            refine_landmarks=True, 
            min_detection_confidence=0.5
        ) as face_mesh_local:
            
            results = face_mesh_local.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
            
            if results.multi_face_landmarks:
                landmarks_raw = []
                for lm in results.multi_face_landmarks[0].landmark:
                    landmarks_raw.append([lm.x, lm.y, lm.z])
                
                landmarks_raw = np.array(landmarks_raw)
                
                # 1. 중심 이동
                nose_tip = landmarks_raw[1]
                landmarks_centered = landmarks_raw - nose_tip
                
                # 2. 스케일링
                max_dist = np.max(np.linalg.norm(landmarks_centered, axis=1))
                if max_dist > 0:
                    landmarks_normalized = landmarks_centered / max_dist
                else:
                    landmarks_normalized = landmarks_centered
                
                return landmarks_normalized.flatten(), label

    except Exception:
        return None
    return None

def extract_landmarks_parallel(zip_path, label, max_samples=10000):
    if not os.path.exists(zip_path):
        print(f"\n[Error] File not found: {zip_path}")
        return [], []

    print(f"\n[{os.path.basename(zip_path)}] Reading Zip...")
    
    # 1. ZIP 파일 읽기 (메인 스레드)
    image_data_list = []
    with zipfile.ZipFile(zip_path, 'r') as z:
        file_list = [f for f in z.namelist() if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
        use_count = min(len(file_list), max_samples)
        target_files = file_list[:use_count]
        
        # 이미지를 메모리에 로드 (병렬 처리를 위해)
        for img_name in target_files:
            with z.open(img_name) as f:
                image_data_list.append((f.read(), label))
    
    print(f"  - Loaded {len(image_data_list):,} images. Starting CPU Parallel Processing...")

    # 2. 병렬 처리 실행
    data = []
    labels = []
    
    # max_workers는 CPU 코어 수만큼 자동 할당됨
    with ProcessPoolExecutor() as executor:
        # map 함수로 병렬 실행
        results = list(tqdm(executor.map(process_single_image, image_data_list), total=len(image_data_list), desc="  Processing"))
        
        for res in results:
            if res is not None:
                d, l = res
                data.append(d)
                labels.append(l)
                
    print(f"  -> Successfully extracted: {len(data):,} samples")
    return data, labels

In [4]:
# 데이터 로드 실행
if __name__ == '__main__':
    X = []
    y = []

    print("Starting Parallel Data Extraction...")

    for emotion, info in TARGET_FILES.items():
        path = os.path.join(TRAIN_DIR, info['filename'])
        d, l = extract_landmarks_parallel(path, info['label'], max_samples=MAX_SAMPLES)
        X.extend(d)
        y.extend(l)

    X = np.array(X)
    y = np.array(y)

    print(f"\nTotal Dataset Size: {len(X):,} samples")

Starting Parallel Data Extraction...

[[원천]EMOIMG_중립_TRAIN_01.zip] Reading Zip...
  - Loaded 10,000 images. Starting CPU Parallel Processing...


  Processing:   0%|          | 0/10000 [00:00<?, ?it/s]


BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

In [None]:
# 데이터 분할 및 텐서 변환
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# Tensor 변환
X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.long)
X_test_t = torch.tensor(X_test, dtype=torch.float32)
y_test_t = torch.tensor(y_test, dtype=torch.long)

# DataLoader (배치 사이즈 증가)
BATCH_SIZE = 128
train_dataset = TensorDataset(X_train_t, y_train_t)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_dataset = TensorDataset(X_test_t, y_test_t)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Train Set: {len(X_train):,} | Test Set: {len(X_test):,}")

In [None]:
# 모델 정의
class EmotionMLP(nn.Module):
    def __init__(self, input_size):
        super(EmotionMLP, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Dropout(0.3),
            
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.2),
            
            nn.Linear(256, 128),
            nn.ReLU(),
            
            nn.Linear(128, 2) # Neutral, Anxious
        )
    
    def forward(self, x):
        return self.network(x)

model = EmotionMLP(X_train.shape[1]).to(device)

# Multi-GPU 적용
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("Model Initialized.")

In [None]:
# 학습 루프
epochs = 50
history = {'train_loss': [], 'test_loss': [], 'train_acc': [], 'test_acc': []}

print("Target Epochs:", epochs)
for epoch in range(epochs):
    # --- Training ---
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
    
    epoch_train_loss = running_loss / len(train_loader)
    epoch_train_acc = correct / total
    
    # --- Validation (Test) ---
    model.eval()
    test_loss = 0.0
    correct_test = 0
    total_test = 0
    
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_test += y_batch.size(0)
            correct_test += (predicted == y_batch).sum().item()
            
    epoch_test_loss = test_loss / len(test_loader)
    epoch_test_acc = correct_test / total_test
    
    history['train_loss'].append(epoch_train_loss)
    history['test_loss'].append(epoch_test_loss)
    history['train_acc'].append(epoch_train_acc)
    history['test_acc'].append(epoch_test_acc)
    
    if (epoch+1) % 5 == 0:
        print(f"Epoch [{epoch+1}/{epochs}] | "
              f"Train Loss: {epoch_train_loss:.4f} Acc: {epoch_train_acc:.4f} | "
              f"Test Loss: {epoch_test_loss:.4f} Acc: {epoch_test_acc:.4f}")

print("Training Complete.")

In [None]:
# 결과 시각화
plt.figure(figsize=(14, 5))

plt.subplot(1, 2, 1)
plt.plot(history['train_loss'], label='Train Loss')
plt.plot(history['test_loss'], label='Test (Validation) Loss')
plt.title('Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(history['train_acc'], label='Train Accuracy')
plt.plot(history['test_acc'], label='Test (Validation) Accuracy')
plt.title('Accuracy')
plt.legend()
plt.grid(True)

plt.show()

In [None]:
# 모델 저장
model.eval()
with torch.no_grad():
    X_test_device = X_test_t.to(device)
    outputs = model(X_test_device)
    _, predicted = torch.max(outputs, 1)
    y_pred = predicted.cpu().numpy()

print("\nClassification Report:")
print(classification_report(y_test, y_pred, target_names=['Neutral', 'Anxious']))

torch.save(model.state_dict(), 'emotion_model_gpu.pth')
print("Model saved.")