In [None]:
# 라이브러리 임포트
import os
from collections import Counter

import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, TensorDataset

from Model import Transformer, PositionalEncoding
from TrainDataset import prepare_classification_dataset, tensor_dataset

In [None]:
# Load the labelled sentiment data; the first CSV column becomes the index.
df = pd.read_csv('datasets/sentiment_train.csv', index_col=0)

# Replace every emotion name in the '감정' column with its integer class id.
# The pairs are applied one at a time, in this order.
for _emotion_name, _class_id in (
    ('불안', 0),
    ('당황', 1),
    ('분노', 2),
    ('슬픔', 3),
    ('중립', 4),
    ('행복', 5),
    ('혐오', 6),
):
    df.loc[df['감정'] == _emotion_name, '감정'] = _class_id

print(f"원본 df 크기: {len(df)}")
print(f"원본 df 감정 분포 (매핑 후): {Counter(df['감정'])}")

# 80/20 split stratified on the label column; the fixed seed keeps the split reproducible.
train_df, val_df = train_test_split(
    df,
    train_size=0.8,
    test_size=0.2,
    stratify=df['감정'],
    random_state=42,
)

print(f"학습 데이터프레임 크기: {len(train_df)}")
print(f"검증 데이터프레임 크기: {len(val_df)}")
print(f"학습 데이터프레임 감정 분포: {Counter(train_df['감정'])}")
print(f"검증 데이터프레임 감정 분포: {Counter(val_df['감정'])}")


# Run the project parser on each split.
# NOTE(review): the return structure of prepare_classification_dataset is not visible here;
# the variable names suggest a dict — confirm in TrainDataset.py.
print("학습 데이터 파싱 중...")
train_datasets_dict = prepare_classification_dataset(train_df)
print("검증 데이터 파싱 중...")
val_datasets_dict = prepare_classification_dataset(val_df)

# Wrap the parsed splits as dataset objects consumable by DataLoader.
train_datasets, val_datasets = (
    tensor_dataset(train_datasets_dict),
    tensor_dataset(val_datasets_dict),
)

print(f"학습 데이터셋 크기: {len(train_datasets)}")
print(f"검증 데이터셋 크기: {len(val_datasets)}")

BATCH_SIZE = 32

# Training batches are shuffled; validation keeps dataset order.
train_loader = DataLoader(train_datasets, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
print(f"학습 DataLoader 배치 수: {len(train_loader)}")

val_loader = DataLoader(val_datasets, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
print(f"검증 DataLoader 배치 수: {len(val_loader)}")

# Global model/data configuration.
# NOTE(review): VOCAB_SIZE, MAX_LEN and PAD_TOKEN_ID presumably have to match the tokenizer
# used inside prepare_classification_dataset — confirm in TrainDataset.py.
VOCAB_SIZE = 32000
MAX_LEN = 128
# Number of target classes. `Series.unique()` returns the array of distinct labels rather
# than a count, so `nunique()` is used to obtain a plain int for the model's output size.
OUTPUT_DIM = int(df['감정'].nunique())
PAD_TOKEN_ID = 0

print("\n설정된 전역 변수:")
print(f"  VOCAB_SIZE: {VOCAB_SIZE}")
print(f"  MAX_LEN: {MAX_LEN}")
print(f"  OUTPUT_DIM: {OUTPUT_DIM}")
print(f"  PAD_TOKEN_ID: {PAD_TOKEN_ID}")

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"\n사용 가능한 장치: {device}")

# File that persists the most recently issued Optuna run version number.
VERSION_FILE_PATH = os.path.join('saves', 'optuna_version.txt')

# Root directory for Optuna run outputs.
OPTUNA_LOG_MODEL_ROOT_DIR = 'optuna_runs'


def get_next_version(version_file_path=None):
    """Return the next run version number and persist it to the version file.

    The stored version is read, incremented by one, written back and returned.
    A missing file, or a file whose content is not an integer, counts as version 0,
    so the first returned version is 1.

    Args:
        version_file_path: Path of the file holding the last issued version.
            Defaults to the module-level ``VERSION_FILE_PATH``.

    Returns:
        int: The newly issued version number.
    """
    if version_file_path is None:
        version_file_path = VERSION_FILE_PATH

    # Create the directory that actually contains the version file, instead of a
    # hard-coded 'saves', so the function keeps working if the path is changed.
    parent_dir = os.path.dirname(version_file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    try:
        with open(version_file_path, 'r') as version_file:
            current_version = int(version_file.read().strip())
    except (FileNotFoundError, ValueError):
        # No previous run recorded, or unreadable content: restart the counter.
        current_version = 0

    next_version = current_version + 1
    with open(version_file_path, 'w') as version_file:
        version_file.write(str(next_version))
    return next_version

# Reserve a fresh version number for this run and derive its output directory from it.
CURRENT_OPTUNA_VERSION = get_next_version()
OPTUNA_VERSION_DIR = os.path.join(OPTUNA_LOG_MODEL_ROOT_DIR, f'v{CURRENT_OPTUNA_VERSION}')
print(f"\nOptuna 최적화 버전: v{CURRENT_OPTUNA_VERSION} (버전 파일: {VERSION_FILE_PATH})")

# Make sure the per-version directory exists before any trial writes into it.
os.makedirs(OPTUNA_VERSION_DIR, exist_ok=True)
print(f"모든 Optuna Trial 로그 및 모델은 '{OPTUNA_VERSION_DIR}' 아래에 저장됩니다.")

In [None]:
def Transformer_Train(epoch, device, train_loader, val_loader, NN, loss_function, optimizer, scheduler_plateau,
                      warmup_epochs, log_dir, save_path, patience, trial=None):
    """Train ``NN`` with LR warm-up, plateau scheduling, early stopping and optional Optuna pruning.

    Each epoch runs one pass over ``train_loader``, evaluates on ``val_loader`` and logs
    loss, accuracy, weighted/macro F1 and the learning rate to TensorBoard.

    Args:
        epoch: Maximum number of epochs to run.
        device: Device that every batch is moved to.
        train_loader: Iterable yielding ``(data, attention_mask, labels)`` training batches.
        val_loader: Iterable yielding ``(data, attention_mask, labels)`` validation batches.
        NN: Model called as ``NN(data, attention_mask)``; class predictions are taken
            with ``argmax`` over dim 1 of its output.
        loss_function: Callable ``loss_function(predictions, labels)`` returning the loss.
        optimizer: Optimizer whose parameter-group learning rates are warmed up.
        scheduler_plateau: Scheduler stepped with the validation weighted F1 once
            warm-up is over.
        warmup_epochs: Number of initial epochs with linearly increasing learning rate
            (``0`` disables warm-up).
        log_dir: Directory for the TensorBoard event files.
        save_path: File that receives ``NN.state_dict()`` every time the validation
            weighted F1 improves. A falsy value disables checkpointing.
        patience: Number of consecutive epochs without improvement before stopping.
        trial: Optional Optuna trial used for reporting and pruning.

    Returns:
        float: Best validation weighted F1 observed during training.

    Raises:
        optuna.exceptions.TrialPruned: If ``trial`` is given and the pruner stops it.
    """
    # Kept as a function-local import, exactly as in the original cell.
    from torch.utils.tensorboard import SummaryWriter

    # `trial` defaults to None, so never dereference it directly in log messages.
    trial_id = trial.number if trial is not None else "N/A"

    best_val_f1_weighted = 0.0
    no_improve_epochs = 0

    # Learning rate of every parameter group as configured before warm-up rescales it.
    base_lrs = [param_group['lr'] for param_group in optimizer.param_groups]

    writer = SummaryWriter(log_dir)
    print(f"  [Trial {trial_id}] TensorBoard 로그 디렉토리: {log_dir}")

    # try/finally guarantees the writer is closed even when the trial is pruned
    # or training fails with an exception.
    try:
        for e in range(1, epoch + 1):
            NN.train()
            total_loss = 0.0

            # Linear warm-up: epoch e runs at e / warmup_epochs of the base learning rate.
            if warmup_epochs > 0 and e <= warmup_epochs:
                warmup_factor = e / warmup_epochs
                for param_group, base_lr in zip(optimizer.param_groups, base_lrs):
                    param_group['lr'] = base_lr * warmup_factor

            for data, attention_mask, labels in train_loader:
                data, attention_mask, labels = data.to(device), attention_mask.to(device), labels.to(device)
                optimizer.zero_grad()
                predictions = NN(data, attention_mask)
                loss = loss_function(predictions, labels)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            # Guard against an empty loader instead of dividing by zero.
            avg_train_loss = total_loss / max(len(train_loader), 1)

            NN.eval()
            val_preds = []
            val_true = []
            with torch.no_grad():
                for data, attention_mask, labels in val_loader:
                    data, attention_mask, labels = data.to(device), attention_mask.to(device), labels.to(device)
                    predictions = NN(data, attention_mask)
                    val_preds.extend(torch.argmax(predictions, dim=1).cpu().tolist())
                    val_true.extend(labels.cpu().tolist())

            val_acc = accuracy_score(val_true, val_preds)
            val_f1_weighted = f1_score(val_true, val_preds, average='weighted', zero_division=0)
            val_f1_macro = f1_score(val_true, val_preds, average='macro', zero_division=0)

            # The plateau scheduler only takes over once warm-up has finished.
            if e > warmup_epochs:
                scheduler_plateau.step(val_f1_weighted)

            if val_f1_weighted > best_val_f1_weighted:
                best_val_f1_weighted = val_f1_weighted
                no_improve_epochs = 0
                # Persist the best weights so far; `save_path` was previously ignored.
                if save_path:
                    torch.save(NN.state_dict(), save_path)
            else:
                no_improve_epochs += 1

            print(f"  [Trial {trial_id}] Epoch {e}\tTrain Loss: {avg_train_loss:.5f}\tVal Acc: {val_acc:.4f}\tVal F1 (Weighted): {val_f1_weighted:.4f}\tVal F1 (Macro): {val_f1_macro:.4f}\tNo Improve Epochs: {no_improve_epochs}")

            writer.add_scalar('Loss/train', avg_train_loss, e)
            writer.add_scalar('Metrics/Val_Accuracy', val_acc, e)
            writer.add_scalar('Metrics/Val_F1_Weighted', val_f1_weighted, e)
            writer.add_scalar('Metrics/Val_F1_Macro', val_f1_macro, e)
            writer.add_scalar('Learning_Rate', optimizer.param_groups[0]['lr'], e)

            if trial is not None:
                trial.report(val_f1_weighted, e)
                if trial.should_prune():
                    print(f"Trial {trial.number} is pruned at epoch {e}.")
                    raise optuna.exceptions.TrialPruned()

            if no_improve_epochs >= patience:
                print(f"조기 종료: {patience} 에포크 동안 성능 개선 없음. Trial {trial_id} 종료.")
                break
    finally:
        writer.close()

    return best_val_f1_weighted

In [None]:
def objective(trial):
    """Optuna objective: build, train and score one Transformer configuration.

    Samples model and optimisation hyperparameters from ``trial``, trains the model with
    ``Transformer_Train`` on the module-level ``train_loader`` / ``val_loader`` and
    returns the best validation weighted F1.

    Args:
        trial: Optuna trial used to sample hyperparameters and to report/prune.

    Returns:
        float: Best validation weighted F1 reached during training.

    Raises:
        optuna.exceptions.TrialPruned: If ``hidden_dim`` is not divisible by ``n_heads``,
            or if the pruner stops the trial during training.
    """
    # 1. Suggest hyperparameters.
    embedding_dim = trial.suggest_categorical('embedding_dim', [128, 256])
    hidden_dim = trial.suggest_categorical('hidden_dim', [128, 256, 512])
    n_layers = trial.suggest_int('n_layers', 2, 4)
    n_heads = trial.suggest_categorical('n_heads', [4, 8, 16])
    dropout_p = trial.suggest_float('dropout_p', 0.1, 0.5, step=0.1)
    # `suggest_loguniform` is deprecated; `suggest_float(..., log=True)` is the replacement
    # and samples the same range on a log scale.
    learning_rate = trial.suggest_float('learning_rate', 5e-6, 5e-5, log=True)
    warmup_epochs = trial.suggest_int('warmup_epochs', 5, 15)
    patience = trial.suggest_int('patience', 10, 20)
    use_class_weights = trial.suggest_categorical('use_class_weights', [True, False])

    # NOTE(review): every hidden_dim choice above is divisible by every n_heads choice, so this
    # guard cannot fire with the current search space. Confirm in Model.py which dimension
    # the attention heads actually split (hidden_dim or embedding_dim).
    if hidden_dim % n_heads != 0:
        raise optuna.exceptions.TrialPruned(f"hidden_dim ({hidden_dim}) is not divisible by n_heads ({n_heads})")

    # 2. Build the model with the sampled hyperparameters.
    NN = Transformer(vocab_size=VOCAB_SIZE, embedding_dim=embedding_dim, hidden_dim=hidden_dim,
                     output_dim=OUTPUT_DIM, n_layers=n_layers, n_heads=n_heads, dropout_p=dropout_p,
                     max_len=MAX_LEN, pad_token_id=PAD_TOKEN_ID).to(device)

    # 3. Optional class weights: total sample count divided by each class's count.
    class_weights = None
    if use_class_weights:
        train_labels = train_df['감정'].values.astype(int)
        # NOTE(review): relies on the project Transformer exposing `output_dim` — confirm in Model.py.
        num_classes = NN.output_dim
        label_counts = np.bincount(train_labels, minlength=num_classes)
        class_counts = torch.tensor(label_counts, dtype=torch.float)
        # Counts are non-negative integers, so clamping at 1 only replaces empty classes
        # and avoids a division by zero.
        class_counts = class_counts.clamp(min=1.0)
        class_weights = (class_counts.sum() / class_counts).to(device)
        print(f"Trial {trial.number}: 계산된 클래스 가중치: {class_weights.tolist()}")
    else:
        print(f"Trial {trial.number}: 클래스 가중치 미사용.")

    # 4. Loss function; `weight=None` is the unweighted default.
    loss_function = nn.CrossEntropyLoss(weight=class_weights)

    # 5. Optimizer.
    optimizer = optim.AdamW(NN.parameters(), lr=learning_rate)

    # 6. Scheduler: reduces the LR when the monitored (maximised) metric stops improving.
    scheduler_plateau = ReduceLROnPlateau(optimizer, mode="max", factor=0.8, patience=5, min_lr=1e-7)

    # 7. Run training; logs and the model file both live under the current version directory.
    log_dir = os.path.join(OPTUNA_VERSION_DIR, f"trial_{trial.number}_params_emb{embedding_dim}_heads{n_heads}_lr{learning_rate:.1e}_cw{use_class_weights}")
    save_path = os.path.join(OPTUNA_VERSION_DIR, f"model_trial_{trial.number}.pt")

    os.makedirs(log_dir, exist_ok=True)
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

    # Upper bound only: early stopping (`patience`) or pruning is expected to end training first.
    max_epochs = 10000

    val_f1_weighted = Transformer_Train(
        epoch=max_epochs,
        device=device,
        train_loader=train_loader,
        val_loader=val_loader,
        NN=NN,
        loss_function=loss_function,
        optimizer=optimizer,
        scheduler_plateau=scheduler_plateau,
        warmup_epochs=warmup_epochs,
        log_dir=log_dir,
        save_path=save_path,
        patience=patience,
        trial=trial
    )

    return val_f1_weighted

In [None]:
# NOTE(review): per the Optuna docs, MedianPruner does not prune until `n_startup_trials`
# trials have finished. With n_startup_trials=5 and n_trials=5 below, the pruner therefore
# looks inactive for this study — confirm whether that is intended.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(),
    pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=30, interval_steps=10)
)

print("Optuna 최적화 시작...")
study.optimize(objective, n_trials=5)

# Fetch the trial list once and reuse it for both reports below.
all_trials = study.trials
completed_trials = [t for t in all_trials if t.state == optuna.trial.TrialState.COMPLETE]

print("\n--- Optuna 최적화 결과 ---")
if completed_trials:
    print(f"최적의 하이퍼파라미터 조합 (Best Trial): {study.best_trial.params}")
    print(f"최적의 검증 Weighted F1 (Best Value): {study.best_trial.value:.4f}")
else:
    # `study.best_trial` raises when no trial has completed, so report that instead.
    print("완료된 Trial이 없어 최적 결과를 출력할 수 없습니다.")

print("\n--- 모든 시도 결과 ---")
for i, finished_trial in enumerate(all_trials):
    # Pruned or failed trials have no value; formatting None with ':.4f' would raise TypeError.
    value_text = f"{finished_trial.value:.4f}" if finished_trial.value is not None else "N/A"
    print(f"Trial {i}: Value={value_text}, Params={finished_trial.params}, State={finished_trial.state}")

> tensorboard --logdir optuna_runs