# 앙상블 메타모델 (Stacking) - WandB 포함

여러 베이스 모델의 예측 결과(probabilities)를 입력으로 받아 메타모델을 학습하고, 최종 예측을 생성합니다.

## 포함 기능
- 베이스 모델 예측 CSV 로드 및 스택 특성 생성
- Stratified K-Fold OOF 학습/평가 (메타모델: 로지스틱 회귀)
- Test 예측 생성 및 저장
- WandB 로깅 및 아티팩트 업로드


In [1]:
# Imports
import os
import json
import warnings
from datetime import datetime

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score
from scipy.optimize import minimize

import wandb

warnings.filterwarnings("ignore")
print("✅ 라이브러리 임포트 완료")




✅ 라이브러리 임포트 완료


In [2]:
# Config
RANDOM_STATE = 42
TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# 예측 파일 디렉토리
PRED_DIR = "./predictions"

# 자동 탐색: 디렉토리 내 *_val_predictions.csv, *_test_predictions.csv
AUTO_DISCOVER = True

# 열 이름 설정
ID_COL_CANDIDATES = ["ID", "id"]
LABEL_COL = "label"
PROB_COL_PREFIX = "prob_class_"  # prob_class_0..3
NUM_CLASSES = 4

# 출력
OUTPUT_DIR = os.path.join(PRED_DIR, "ensemble_ori")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# PyTorch 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"디바이스: {device}")

# WandB
PROJECT_NAME = "[domain_project]_Ensemble_Models"
RUN_NAME = f"stacking_torch_{TIMESTAMP}"
USE_WANDB = True

print("✅ 설정 완료")
print(f"출력 디렉토리: {OUTPUT_DIR}")


디바이스: cuda
✅ 설정 완료
출력 디렉토리: ./predictions/ensemble_ori


In [3]:
# WandB init
if USE_WANDB:
    run = wandb.init(
        project=PROJECT_NAME,
        name=RUN_NAME,
        config={
            "random_state": RANDOM_STATE,
            "num_classes": NUM_CLASSES,
            "meta_model": "torch.nn.Linear",
            "cv_folds": 5,
            "device": str(device),
        }
    )
    print("✅ WandB 초기화 완료")
else:
    print("⚠️ WandB 비활성화")


[34m[1mwandb[0m: W&B API key is configured. Use [1m`wandb login --relogin`[0m to force relogin


✅ WandB 초기화 완료


In [4]:
# 유틸: 예측 CSV 탐색 및 로드
from glob import glob

def discover_prediction_files(pred_dir: str):
    val_files = sorted(glob(os.path.join(pred_dir, "*_val_predictions.csv")))
    test_files = sorted(glob(os.path.join(pred_dir, "*_test_predictions.csv")))
    return val_files, test_files

val_files, test_files = ([], [])
if AUTO_DISCOVER:
    val_files, test_files = discover_prediction_files(PRED_DIR)

print("🔎 발견된 파일 수:")
print(f"  Val: {len(val_files)}  Test: {len(test_files)}")

#assert len(val_files) > 0 and len(test_files) > 0, "예측 파일을 찾을 수 없습니다. predictions 디렉토리를 확인하세요."



🔎 발견된 파일 수:
  Val: 3  Test: 3


In [5]:
# 스택 특성 생성 함수

def extract_id_column(df: pd.DataFrame):
    for c in ID_COL_CANDIDATES:
        if c in df.columns:
            return df[c]
    return pd.Series(np.arange(len(df)), name="row_idx")


def build_stack_features(file_list: list, is_val: bool):
    """여러 CSV에서 prob_class_* 열을 수집하여 [N, M*NUM_CLASSES] 특성 행렬 생성"""
    probs = []
    ids = None
    labels = None

    for path in file_list:
        print(path)
        df = pd.read_csv(path)
        # ID
        cur_ids = extract_id_column(df)
        if ids is None:
            ids = cur_ids
        # 라벨 (val 기준)
        if is_val and LABEL_COL in df.columns and labels is None:
            labels = df[LABEL_COL].values
        # 확률
        model_probs = df[[f"{PROB_COL_PREFIX}{i}" for i in range(NUM_CLASSES)]].values
        probs.append(model_probs)

    X = np.hstack(probs)  # [N, num_models*NUM_CLASSES]
    if is_val and labels is None:
        raise ValueError("검증 데이터에서는 라벨 컬럼이 필요합니다.")

    return ids.values, X, labels

# 생성
val_ids, X_val, y_val = build_stack_features(val_files, is_val=True)
print(val_ids)
_, X_test, _ = build_stack_features(test_files, is_val=False)
print(f"✅ 스택 특성 생성 완료: X_val={X_val.shape}, X_test={X_test.shape}")


./predictions/TAPT_klue_Roberta-kor-base_final_training_2025-10-30_00-40-22_RANDOM_42_val_predictions.csv
./predictions/TAPT_kykim_bert-kor-base_final_training_RAN_42_val_predictions.csv


./predictions/TAPT_monologg_koelectra-base-v3-discriminator_augX3_best_discriminator_1028_final_training_2025-10-30_05-34-33_RANDOM_42_val_predictions.csv
[ 24475  24420 206143 ... 198563 293967  41866]
./predictions/TAPT_klue_Roberta-kor-base_final_training_2025-10-30_00-40-22_RANDOM_42_final_training_2025-10-30_07-36-17_RANDOM_42_epoch_2_test_predictions.csv
./predictions/TAPT_kykim_bert-kor-base_final_training_RAN_42_final_training_2025-10-30_08-54-54_RANDOM_42_epoch_3_test_predictions.csv
./predictions/TAPT_monologg_koelectra-base-v3-discriminator_augX3_best_discriminator_1028_final_training_2025-10-30_07-21-18_RANDOM_42_epoch_2_test_predictions.csv
✅ 스택 특성 생성 완료: X_val=(6823, 12), X_test=(59928, 12)


In [6]:
# # ===== 간단 앙상블(가중 평균) - 정확도 최대화로 가중치 최적화 =====
# from typing import List, Tuple


# def load_probs_list(file_list: List[str], is_val: bool) -> Tuple[np.ndarray, List[np.ndarray], np.ndarray]:
#     ids = None
#     labels = None
#     probs_list = []
#     for path in file_list:
#         df = pd.read_csv(path)
#         cur_ids = extract_id_column(df)
#         if ids is None:
#             ids = cur_ids.values
#         model_probs = df[[f"{PROB_COL_PREFIX}{i}" for i in range(NUM_CLASSES)]].values
#         probs_list.append(model_probs)
#         if is_val and labels is None and LABEL_COL in df.columns:
#             labels = df[LABEL_COL].values
#     if is_val and labels is None:
#         raise ValueError("검증 데이터에서는 라벨 컬럼이 필요합니다.")
#     return ids, probs_list, labels


# def optimize_weights_accuracy(probs_list: List[np.ndarray], y_true: np.ndarray, n_starts: int = 8):
#     """가중치 합=1, 각 가중치>=0 제약에서 정확도 최대화"""
#     M = len(probs_list)
#     probs = np.stack(probs_list, axis=0)  # [M, N, C]

#     def acc_from_w(w):
#         w = np.clip(w, 0, 1)
#         s = w.sum()
#         if s == 0:
#             w = np.full_like(w, 1.0 / len(w))
#         else:
#             w = w / s
#         ens = np.tensordot(w, probs, axes=(0, 0))  # [N, C]
#         pred = ens.argmax(axis=1)
#         return accuracy_score(y_true, pred)

#     def obj(w):
#         return -acc_from_w(w)

#     bounds = [(0.0, 1.0)] * M
#     cons = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}]

#     inits = [np.full(M, 1.0 / M)]
#     for _ in range(max(0, n_starts - 1)):
#         r = np.random.rand(M)
#         inits.append(r / r.sum())

#     best_w, best_acc = None, -1.0
#     for w0 in inits:
#         res = minimize(obj, w0, method='SLSQP', bounds=bounds, constraints={ 'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0 }, options={'maxiter': 500, 'ftol': 1e-9, 'disp': False})
#         if res.success:
#             w = np.clip(res.x, 0, 1)
#             s = w.sum()
#             if s == 0:
#                 w = np.full_like(w, 1.0 / len(w))
#             else:
#                 w = w / s
#             acc = acc_from_w(w)
#             if acc > best_acc:
#                 best_w, best_acc = w, acc

#     return best_w, best_acc


# # 프로브 로드
# val_ids_simple, val_probs_list, y_val_simple = load_probs_list(val_files, is_val=True)
# _, test_probs_list, _ = load_probs_list(test_files, is_val=False)

# # 최적 가중치 탐색 (정확도 최대화)
# best_w, best_acc = optimize_weights_accuracy(val_probs_list, y_val_simple, n_starts=10)
# print("🔧 Simple Ensemble - Best weights:", best_w)
# print(f"🔧 Simple Ensemble - Val Accuracy: {best_acc:.6f}")

# # 검증/테스트 앙상블 확률 계산
# val_probs_stack = np.stack(val_probs_list, axis=0)  # [M, N, C]
# val_ens = np.tensordot(best_w, val_probs_stack, axes=(0, 0))
# val_pred = val_ens.argmax(axis=1)
# val_acc_simple = accuracy_score(y_val_simple, val_pred)
# print(f"✅ Simple Ensemble (Val) Accuracy: {val_acc_simple:.6f}")

# # 테스트
# test_probs_stack = np.stack(test_probs_list, axis=0)
# test_ens = np.tensordot(best_w, test_probs_stack, axes=(0, 0))
# test_pred_simple = test_ens.argmax(axis=1)

# # 저장
# simple_val_out = pd.DataFrame({
#     "ID": val_ids_simple,
#     "pred": val_pred,
#     "label": y_val_simple,
# })
# for i in range(NUM_CLASSES):
#     simple_val_out[f"prob_{i}"] = val_ens[:, i]

# simple_val_path = os.path.join(OUTPUT_DIR, f"simple_ensemble_val_{TIMESTAMP}.csv")
# simple_val_out.to_csv(simple_val_path, index=False)
# print(f"💾 Simple Ensemble Val 저장: {simple_val_path}")

# # Test 저장
# test_ids_simple = extract_id_column(pd.read_csv(test_files[0])).values
# simple_test_out = pd.DataFrame({
#     "ID": test_ids_simple,
#     "pred": test_pred_simple,
# })
# for i in range(NUM_CLASSES):
#     simple_test_out[f"prob_{i}"] = test_ens[:, i]

# simple_test_path = os.path.join(OUTPUT_DIR, f"simple_ensemble_test_{TIMESTAMP}.csv")
# simple_test_out.to_csv(simple_test_path, index=False)
# print(f"💾 Simple Ensemble Test 저장: {simple_test_path}")

# if USE_WANDB:
#     wandb.log({
#         "simple_ensemble_val_acc": float(val_acc_simple),
#     })



In [None]:
# PyTorch 메타모델 정의
import torch.nn.functional as F
import torch.nn as nn
class MetaModel(nn.Module):
    def __init__(self, input_dim, num_classes, hidden_dim=16):
        super(MetaModel, self).__init__()
        self.linear_1 = nn.Linear(input_dim, num_classes)

        
    def forward(self, x):
        x = self.linear_1(x)
        return x

# 훈련 함수
def train_model(model, train_loader, val_loader, device, epochs=60, lr=0.0001):
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    best_val_acc = 0
    best_model_state = None
    
    for epoch in range(epochs):
        # 훈련
        model.train()
        train_loss = 0
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # 검증
        model.eval()
        val_correct = 0
        val_total = 0
        val_loss = 0
        
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                outputs = model(batch_x)
                loss = criterion(outputs, batch_y)
                val_loss += loss.item()
                
                _, predicted = torch.max(outputs.data, 1)
                val_total += batch_y.size(0)
                val_correct += (predicted == batch_y).sum().item()
        
        val_acc = val_correct / val_total
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_state = model.state_dict().copy()
        
        if epoch % 20 == 0:
            print(f"Epoch {epoch}: Val Acc: {val_acc:.4f}")
    
    # 최고 모델 복원
    model.load_state_dict(best_model_state)
    return model


In [8]:

# # 메타모델 학습: Stratified K-Fold OOF
# skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)

# oof_preds = np.zeros((len(y_val), NUM_CLASSES), dtype=float)
# fold_metrics = []
# models = []

# for fold, (tr_idx, va_idx) in enumerate(skf.split(X_val, y_val), start=1):
#     X_tr, X_va = X_val[tr_idx], X_val[va_idx]
#     y_tr, y_va = y_val[tr_idx], y_val[va_idx]
    
#     # PyTorch 데이터로더 생성
#     train_dataset = TensorDataset(
#         torch.FloatTensor(X_tr), 
#         torch.LongTensor(y_tr)
#     )
#     val_dataset = TensorDataset(
#         torch.FloatTensor(X_va), 
#         torch.LongTensor(y_va)
#     )
    
#     train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
#     val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
    
#     # 모델 생성 및 훈련
#     model = MetaModel(X_val.shape[1], NUM_CLASSES)
#     model = train_model(model, train_loader, val_loader, device, epochs=100, lr=0.01)
    
#     # 검증 예측 및 메트릭
#     model.eval()
#     with torch.no_grad():
#         X_va_tensor = torch.FloatTensor(X_va).to(device)
#         va_logits = model(X_va_tensor)
#         va_proba = torch.softmax(va_logits, dim=1).cpu().numpy()
#         va_pred = va_proba.argmax(axis=1)
    
#     acc = accuracy_score(y_va, va_pred)
#     f1 = f1_score(y_va, va_pred, average="weighted")

#     oof_preds[va_idx] = va_proba
#     fold_metrics.append({"fold": fold, "acc": acc, "f1": f1})
#     models.append(model.state_dict().copy())

#     print(f"Fold {fold} -> Acc: {acc:.4f}, F1: {f1:.4f}")
#     if USE_WANDB:
#         wandb.log({"fold": fold, "fold_acc": acc, "fold_f1": f1})

# # OOF 성능
# oof_pred_labels = oof_preds.argmax(axis=1)
# oof_acc = accuracy_score(y_val, oof_pred_labels)
# oof_f1 = f1_score(y_val, oof_pred_labels, average="weighted")
# print(f"\nOOF Acc: {oof_acc:.4f}, OOF F1: {oof_f1:.4f}")
# if USE_WANDB:
#     wandb.log({"oof_acc": oof_acc, "oof_f1": oof_f1})


In [9]:
# 최종 메타모델 학습 (전체 val 사용) 및 Test 예측
final_model = MetaModel(X_val.shape[1], NUM_CLASSES, hidden_dim=12)

# 전체 val 데이터로 훈련
train_dataset = TensorDataset(
    torch.FloatTensor(X_val), 
    torch.LongTensor(y_val)
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# 더미 검증 데이터로더 (전체 데이터로 훈련)
val_dataset = TensorDataset(
    torch.FloatTensor(X_val[:100]), 
    torch.LongTensor(y_val[:100])
)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

final_model = train_model(final_model, train_loader, val_loader, device, epochs=60, lr=0.0001)




print("✅ 최종 메타모델 학습 완료")
if USE_WANDB:
    wandb.log({"meta_model_trained": 1})


Epoch 0: Val Acc: 0.2900
Epoch 20: Val Acc: 0.8800
Epoch 40: Val Acc: 0.9000
✅ 최종 메타모델 학습 완료


In [10]:


final_model.eval()
with torch.no_grad():
    X_test_tensor = torch.FloatTensor(X_test).to(device)
    test_logits = final_model(X_test_tensor)
    test_proba = torch.softmax(test_logits, dim=1).cpu().numpy()
    test_pred = test_proba.argmax(axis=1)

test_ids = extract_id_column(pd.read_csv(test_files[0])).values

test_out = pd.DataFrame({
    "ID": test_ids,
    "pred": test_pred
})
for i in range(NUM_CLASSES):
    test_out[f"prob_{i}"] = test_proba[:, i]

test_path = os.path.join(OUTPUT_DIR, f"ensemble_test_{TIMESTAMP}.csv")
test_out.to_csv(test_path, index=False)
print(f"💾 Test 저장: {test_path}")

if USE_WANDB:
    wandb.log({"oof_saved": 1, "test_saved": 1})

💾 Test 저장: ./predictions/ensemble_ori/ensemble_test_2025-10-30_09-38-01.csv


In [11]:
# 모델 및 아티팩트 저장
model_path = os.path.join(OUTPUT_DIR, f"ensemble_torch_{TIMESTAMP}.pth")
torch.save(final_model.state_dict(), model_path)
print(f"💾 메타모델 저장: {model_path}")


TIMESTAMP_2 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# 모델 구조도 저장 (로드 시 필요)
model_config = {
    "input_dim": X_val.shape[1],
    "num_classes": NUM_CLASSES,
    "model_class": "MetaModel"
}
config_path = os.path.join(OUTPUT_DIR, f"ensemble_run_{TIMESTAMP_2}.json")
with open(config_path, "w", encoding="utf-8") as f:
    json.dump({
        "timestamp": TIMESTAMP,
        "random_state": RANDOM_STATE,
        "num_classes": NUM_CLASSES,
        "val_files": val_files,
        "test_files": test_files,
        "model_config": model_config
    }, f, indent=2, ensure_ascii=False)
print(f"💾 설정 저장: {config_path}")


💾 메타모델 저장: ./predictions/ensemble_ori/ensemble_torch_2025-10-30_09-38-01.pth
💾 설정 저장: ./predictions/ensemble_ori/ensemble_run_2025-10-30_09-38-28.json


In [12]:
# # ===== WandB Sweep for hidden_dim (MetaModel) =====

# # 주의: 아래 셀은 USE_WANDB=True 상태에서 실행하세요.
# # sweep는 MetaModel의 hidden_dim만 탐색하고, 나머지 하이퍼파라미터는 고정합니다.

# import math

# def run_meta_cv_with_params(hidden_dim: int = 24, epochs: int = 80, lr: float = 0.01, folds: int = 5, batch_size: int = 64):
#     skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=RANDOM_STATE)
#     oof_preds_local = np.zeros((len(y_val), NUM_CLASSES), dtype=float)

#     for tr_idx, va_idx in skf.split(X_val, y_val):
#         X_tr, X_va = X_val[tr_idx], X_val[va_idx]
#         y_tr, y_va = y_val[tr_idx], y_val[va_idx]

#         train_dataset = TensorDataset(torch.FloatTensor(X_tr), torch.LongTensor(y_tr))
#         val_dataset = TensorDataset(torch.FloatTensor(X_va), torch.LongTensor(y_va))
#         train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
#         val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

#         model = MetaModel(X_val.shape[1], NUM_CLASSES, hidden_dim=hidden_dim)
#         model = train_model(model, train_loader, val_loader, device, epochs=epochs, lr=lr)

#         model.eval()
#         with torch.no_grad():
#             X_va_tensor = torch.FloatTensor(X_va).to(device)
#             va_logits = model(X_va_tensor)
#             va_proba = torch.softmax(va_logits, dim=1).cpu().numpy()
#         oof_preds_local[va_idx] = va_proba

#     oof_pred_labels_local = oof_preds_local.argmax(axis=1)
#     oof_acc_local = accuracy_score(y_val, oof_pred_labels_local)
#     return float(oof_acc_local)

# sweep_config = {
#     "name": f"meta_hidden_dim_sweep_{TIMESTAMP}",
#     "method": "grid",
#     "metric": {"name": "oof_acc", "goal": "maximize"},
#     "parameters": {
#         "hidden_dim": {"values": [16, 24, 32]},
#         "epochs": {"values": [40, 60]},
#         "lr": {"values": [1e-4, 5e-5, 1e-5]},
#     },
#     "early_terminate": {"type": "hyperband", "min_iter": 10}
# }

# def sweep_train():
#     with wandb.init(project=PROJECT_NAME, config={"cv_folds": 4, "batch_size": 64}):
#         cfg = wandb.config
#         hidden_dim = int(cfg.get("hidden_dim", 12))
#         epochs = int(cfg.get("epochs", 80))
#         lr = float(cfg.get("lr", 0.01))
#         folds = 4
#         batch_size = 64

#         oof_acc = run_meta_cv_with_params(
#             hidden_dim=hidden_dim, epochs=epochs, lr=lr,
#             folds=folds, batch_size=batch_size
#         )
#         wandb.log({"oof_acc": oof_acc, "hidden_dim": hidden_dim, "epochs": epochs, "lr": lr})
#         print(f"[SWEEP] hidden_dim={hidden_dim}, epochs={epochs}, lr={lr} -> OOF Acc: {oof_acc:.6f}")

# if USE_WANDB:
#     sweep_id = wandb.sweep(sweep_config, project=PROJECT_NAME)
#     wandb.agent(sweep_id, function=sweep_train)  # grid 전체 조합 자동 실행
# else:
#     print("⚠️ USE_WANDB=False: Sweep를 실행하려면 USE_WANDB를 True로 설정하세요.")



In [13]:
# 결과 저장
val_out = pd.DataFrame({
    "ID": val_ids,
    "oof_pred": oof_pred_labels,
    "label": y_val
})
for i in range(NUM_CLASSES):
    val_out[f"oof_prob_{i}"] = oof_preds[:, i]

val_path = os.path.join(OUTPUT_DIR, f"ensemble_oof_{TIMESTAMP}.csv")
val_out.to_csv(val_path, index=False)
print(f"💾 OOF 저장: {val_path}")

# Test 저장



NameError: name 'oof_pred_labels' is not defined