In [None]:
import os
import random
import numpy as np
import pandas as pd
import scipy.io
from PIL import Image
from datetime import datetime
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from sklearn.metrics import f1_score
from sklearn.model_selection import StratifiedShuffleSplit
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
import torchvision.transforms as transforms
from approach.ResEmoteNet import ResEmoteNet

In [None]:
# Seed helper for experiment reproducibility
def set_seed(seed=42):
    """Seed every random number generator used here and pin cuDNN behaviour.

    Seeds Python's ``random``, NumPy's global RNG and PyTorch (CPU and all
    CUDA devices), then switches cuDNN to deterministic mode with
    benchmarking disabled.

    Args:
        seed: Integer seed passed to each generator (default 42).
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Trade cuDNN autotuning for repeatable kernel selection.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

# Hyperparameters for this run. "seed" is the single source of truth for RNG
# seeding (see the set_seed call below).
# NOTE(review): scheduler_step_size, scheduler_gamma, patience, dropout_rate and
# train_val_split_ratio are not referenced by the code visible in this notebook
# section — confirm whether they are still needed.
hyperparams = {
    "num_epochs": 100,
    "batch_size": 128,
    "learning_rate": 1e-4,
    "weight_decay": 1e-5,
    "scheduler_step_size": 10,
    "scheduler_gamma": 0.1,
    "patience": 5,
    "dropout_rate": 0.5,
    "seed": 42,
    "train_val_split_ratio": 0.8
}

# Seed all RNGs from the configured value rather than a duplicated literal,
# so changing hyperparams["seed"] actually changes the run.
set_seed(hyperparams["seed"])

# Class-folder name -> integer label.
class_dict = {'happy': 0, 'surprise': 1, 'sad': 2, 'anger': 3, 'disgust': 4, 'fear': 5, 'neutral': 6}
# Derived from the mapping so the two can never drift apart (evaluates to 7).
num_classes = len(class_dict)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(device)

In [None]:
# Image preprocessing pipelines.
# Both pipelines share the same resize / channel-expansion front end and the
# same tensor conversion + normalisation back end; only training adds a random
# horizontal flip as augmentation.
# NOTE(review): the mean/std values look like the usual ImageNet channel
# statistics — confirm they are the intended normalisation for this data.
_geometry_stages = [
    transforms.Resize((64, 64)),
    transforms.Grayscale(num_output_channels=3),
]
_tensor_stages = [
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
]

train_transform = transforms.Compose(
    _geometry_stages + [transforms.RandomHorizontalFlip()] + _tensor_stages
)

# Validation is deterministic: no flip.
val_transform = transforms.Compose(_geometry_stages + _tensor_stages)

In [None]:
class EmotionDataset(Dataset):
    """Map-style dataset that loads images from disk as single-channel grayscale.

    Args:
        file_paths: Sequence of image file paths.
        labels: Sequence of labels, index-aligned with ``file_paths``.
        transform: Optional callable applied to the loaded PIL image. When it
            is ``None`` the grayscale PIL image is returned unchanged.
    """

    def __init__(self, file_paths, labels, transform=None):
        self.paths = file_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per file path)."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at position ``idx``."""
        img_path = self.paths[idx]
        # convert() yields an independent, fully loaded image, so the file
        # handle can be released right away instead of waiting for the GC.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("L")

        label = self.labels[idx]

        # transform defaults to None; calling it unconditionally would raise
        # "TypeError: 'NoneType' object is not callable".
        if self.transform is not None:
            image = self.transform(image)

        return image, label

In [None]:
# Dataset root directory. Defaults to the original location; set the
# EMOTION_DATA_DIR environment variable to run on another machine without
# editing the notebook.
data_dir = os.environ.get('EMOTION_DATA_DIR', '/root/Project/class/Emotion_detection/dataset')

# Collect image file paths and their labels from per-class sub-folders
def collect_files_and_labels(base_dir, class_dict):
    """Gather image paths and labels from ``base_dir/<class_name>/`` folders.

    Args:
        base_dir: Directory containing one sub-directory per class.
        class_dict: Mapping of class (folder) name to label. Classes are
            visited in the mapping's iteration order.

    Returns:
        ``(file_paths, labels)``: two index-aligned lists. Within each class
        the files are in sorted name order so the result does not depend on
        the filesystem's directory listing order. Class names that do not
        exist as a directory under ``base_dir`` contribute nothing.
    """
    image_exts = ('.png', '.jpg', '.jpeg')
    file_paths = []
    labels = []

    for class_name, label in class_dict.items():
        class_dir = os.path.join(base_dir, class_name)
        if not os.path.isdir(class_dir):  # only process real class folders
            continue
        # os.listdir returns entries in arbitrary order; sort for reproducibility.
        for file in sorted(os.listdir(class_dir)):
            # Compare case-insensitively so e.g. "IMG.JPG" is not dropped.
            if file.lower().endswith(image_exts):
                file_paths.append(os.path.join(class_dir, file))
                labels.append(label)

    return file_paths, labels

# Collect the train and validation file lists
train_dir = os.path.join(data_dir, 'train')
val_dir = os.path.join(data_dir, 'test')  # the 'test' folder is used as the validation set

train_file_paths, train_labels = collect_files_and_labels(train_dir, class_dict)
val_file_paths, val_labels = collect_files_and_labels(val_dir, class_dict)

print(f"Number of training samples: {len(train_file_paths)}")
print(f"Number of validation samples: {len(val_file_paths)}")

# Fail fast with a clear message when a split is empty (wrong path, missing
# class folders, ...) instead of failing later inside training/validation.
if not train_file_paths:
    raise FileNotFoundError(f"No training images found under {train_dir!r}")
if not val_file_paths:
    raise FileNotFoundError(f"No validation images found under {val_dir!r}")

# Each split gets its own transform: augmentation for train, none for validation
train_dataset = EmotionDataset(train_file_paths, train_labels, transform=train_transform)
val_dataset = EmotionDataset(val_file_paths, val_labels, transform=val_transform)

# Data loaders: shuffle only the training split
train_loader = DataLoader(train_dataset, batch_size=hyperparams["batch_size"], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=hyperparams["batch_size"], shuffle=False)


In [None]:
# Sanity check: display the first (image, label) pair of the training set.
train_dataset[0]

In [None]:
# Instantiate the network and move it to the selected device
model = ResEmoteNet().to(device)

In [None]:
# Loss function, optimizer and learning-rate schedule
criterion = nn.CrossEntropyLoss()

optimizer = optim.AdamW(
    model.parameters(),
    lr=hyperparams["learning_rate"],
    weight_decay=hyperparams["weight_decay"],
)

# Cosine annealing spanning the whole run, with a floor of eta_min.
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=hyperparams["num_epochs"],
    eta_min=1e-6,
)

In [None]:
# Training and validation routines (with TensorBoard logging)
def train_one_epoch(model, dataloader, optimizer, criterion, device, epoch):
    """Run one optimisation pass over ``dataloader``.

    Per-step loss is logged to the module-level TensorBoard ``writer`` under
    "Train/Loss"; epoch accuracy and weighted F1 are logged under
    "Train/Accuracy" and "Train/F1-score". ``writer`` must exist before this
    function is called.

    Args:
        model: Module to train; switched to training mode here.
        dataloader: Sized iterable of ``(data, labels)`` tensor batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to.
        epoch: Zero-based epoch index, used to compute the global step.

    Returns:
        ``(epoch_loss, epoch_acc, epoch_f1, step_loss)``: sample-weighted mean
        loss, accuracy in percent, weighted F1 in percent, and the list of
        per-step loss values.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    all_labels = []
    all_preds = []
    step_loss = []
    steps_per_epoch = len(dataloader)

    for step, (data, labels) in enumerate(tqdm(dataloader, desc="Training")):
        data, labels = data.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = model(data)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        # Read the scalar once: every .item() call forces a device sync.
        loss_value = loss.item()

        # Weight by batch size so a smaller final batch does not skew the mean.
        running_loss += loss_value * data.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

        # Log the step loss to TensorBoard at the global step index
        writer.add_scalar("Train/Loss", loss_value, epoch * steps_per_epoch + step)
        step_loss.append(loss_value)

    if total == 0:
        # Without this guard the division below raises an opaque ZeroDivisionError.
        raise ValueError("train_one_epoch: dataloader yielded no samples")

    epoch_loss = running_loss / total
    epoch_acc = 100. * correct / total
    epoch_f1 = 100. * f1_score(all_labels, all_preds, average='weighted')
    writer.add_scalar("Train/Accuracy", epoch_acc, epoch)
    writer.add_scalar("Train/F1-score", epoch_f1, epoch)
    return epoch_loss, epoch_acc, epoch_f1, step_loss

def validate(model, dataloader, criterion, device, epoch):
    """Evaluate ``model`` on ``dataloader`` without updating its weights.

    Per-step loss is logged to the module-level TensorBoard ``writer`` under
    "Val/Loss"; epoch accuracy and weighted F1 are logged under "Val/Accuracy"
    and "Val/F1-score". ``writer`` must exist before this function is called.

    Args:
        model: Module to evaluate; switched to eval mode here.
        dataloader: Sized iterable of ``(data, labels)`` tensor batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to.
        epoch: Zero-based epoch index, used to compute the global step.

    Returns:
        ``(epoch_loss, epoch_acc, epoch_f1, step_loss)``: sample-weighted mean
        loss, accuracy in percent, weighted F1 in percent, and the list of
        per-step loss values.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_labels = []
    all_preds = []
    step_loss = []
    steps_per_epoch = len(dataloader)

    with torch.no_grad():
        for step, (data, labels) in enumerate(tqdm(dataloader, desc="Validating")):
            data, labels = data.to(device), labels.to(device)

            outputs = model(data)
            loss = criterion(outputs, labels)

            # Read the scalar once: every .item() call forces a device sync.
            loss_value = loss.item()

            # Weight by batch size so a smaller final batch does not skew the mean.
            running_loss += loss_value * data.size(0)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

            # Log the step loss to TensorBoard at the global step index
            writer.add_scalar("Val/Loss", loss_value, epoch * steps_per_epoch + step)
            step_loss.append(loss_value)

    if total == 0:
        # Without this guard the division below raises an opaque ZeroDivisionError.
        raise ValueError("validate: dataloader yielded no samples")

    epoch_loss = running_loss / total
    epoch_acc = 100. * correct / total
    epoch_f1 = 100. * f1_score(all_labels, all_preds, average='weighted')
    writer.add_scalar("Val/Accuracy", epoch_acc, epoch)
    writer.add_scalar("Val/F1-score", epoch_f1, epoch)
    return epoch_loss, epoch_acc, epoch_f1, step_loss

# Figure-saving helpers
def _save_line_figure(curves, title, xlabel, ylabel, out_path):
    """Draw each ``(values, label, color)`` curve on one figure and save it to ``out_path``."""
    plt.figure(figsize=(10, 5))
    for values, label, color in curves:
        plt.plot(values, label=label, color=color)
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend()
    plt.savefig(out_path)
    # Close the figure so repeated calls do not accumulate open figures.
    plt.close()


def plot_metrics(train_losses, train_losses_step, val_losses, val_losses_step, train_accuracies, val_accuracies, train_f1_scores, val_f1_scores, epoch):
    """Write the four training-progress figures for the current run.

    Files are written into ``result/classification{current_time}/`` where
    ``current_time`` is a module-level name that must be defined before the
    call. Each call overwrites the previous PNGs. ``epoch`` is accepted but
    not used.

    Args:
        train_losses: Per-epoch training loss.
        train_losses_step: Per-step training loss.
        val_losses: Per-epoch validation loss.
        val_losses_step: Per-step validation loss.
        train_accuracies: Per-epoch training accuracy.
        val_accuracies: Per-epoch validation accuracy.
        train_f1_scores: Per-epoch training F1-score.
        val_f1_scores: Per-epoch validation F1-score.
        epoch: Current epoch index (unused).
    """
    # Training loss per step
    _save_line_figure(
        [(train_losses_step, 'Train Loss(step)', 'blue')],
        'Loss Over Steps', 'Steps', 'Loss',
        f"result/classification{current_time}/train_loss_step.png",
    )

    # Validation loss per step
    _save_line_figure(
        [(val_losses_step, 'Val Loss(step)', 'blue')],
        'Loss Over Steps', 'Steps', 'Loss',
        f"result/classification{current_time}/val_loss_step.png",
    )

    # Train vs. validation loss per epoch
    _save_line_figure(
        [
            (train_losses, 'Train Loss', 'blue'),
            (val_losses, 'Val Loss', 'orange'),
        ],
        'Loss Over Epochs', 'Epochs', 'Loss',
        f"result/classification{current_time}/train_val_loss_epoch.png",
    )

    # Accuracy and F1-score per epoch
    _save_line_figure(
        [
            (train_accuracies, 'Train Accuracy', 'blue'),
            (val_accuracies, 'Val Accuracy', 'orange'),
            (train_f1_scores, 'Train F1-score', 'green'),
            (val_f1_scores, 'Val F1-score', 'red'),
        ],
        'Accuracy and F1-Score Over Epochs', 'Epochs', 'Score',
        f"result/classification{current_time}/train_val_metrics_epoch.png",
    )

In [None]:
# Per-run identifiers and output locations: TensorBoard logs go under runs/,
# checkpoints and figures under result/.
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
writer = SummaryWriter(log_dir=f"runs/classification{current_time}")
os.makedirs(f"result/classification{current_time}", exist_ok=True)

num_epochs = hyperparams["num_epochs"]
best_val_loss = float('inf')

# Per-epoch histories
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []
train_f1_scores = []
val_f1_scores = []
# Per-step histories, concatenated across epochs
train_losses_step = []
val_losses_step = []

# try/finally guarantees the TensorBoard writer is flushed and closed even if
# training raises or the kernel is interrupted part-way through.
try:
    for epoch in range(num_epochs):
        print(f"Epoch [{epoch + 1}/{num_epochs}]")

        # Training phase
        train_loss, train_acc, train_f1, train_loss_step = train_one_epoch(model, train_loader, optimizer, criterion, device, epoch)
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, Train F1-score: {train_f1:.2f}%")

        train_losses.append(train_loss)
        train_accuracies.append(train_acc)
        train_f1_scores.append(train_f1)
        train_losses_step.extend(train_loss_step)

        # Validation phase
        val_loss, val_acc, val_f1, val_loss_step = validate(model, val_loader, criterion, device, epoch)
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, Val F1-score: {val_f1:.2f}%")

        val_losses.append(val_loss)
        val_accuracies.append(val_acc)
        val_f1_scores.append(val_f1)
        val_losses_step.extend(val_loss_step)

        # Advance the learning-rate schedule once per epoch
        scheduler.step()

        # Checkpoint whenever the validation loss improves
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), f"result/classification{current_time}/best_model.pth")
            print("Best model saved with validation loss: {:.4f} \n".format(best_val_loss))

        # Refresh the progress figures after every epoch
        plot_metrics(train_losses, train_losses_step, val_losses, val_losses_step, train_accuracies, val_accuracies, train_f1_scores, val_f1_scores, epoch)

    # Weights after the final epoch (not necessarily the best ones)
    torch.save(model.state_dict(), f"result/classification{current_time}/last_model.pth")

    print("Training completed. Best validation loss: {:.4f}".format(best_val_loss))
finally:
    writer.close()