###1. 기본 설정

In [35]:
#기본 환경 설정 : pandas/numpy(데이터), scikit-learn(전처리·평가·모델), xgboost/lightgbm/catboost(GBDT 계열), matplotlib(시각화).
#추가 : imbalanced-learn : 클래스불균형 데이터 처리 기법, unlzw3 : data.Z 읽어서 해
!pip install xgboost lightgbm catboost scikit-learn pandas numpy matplotlib imbalanced-learn unlzw3



In [36]:
#공동 import
import os, warnings, json  # 파일 경로 관리, 경고 무시, 설정 저장/불러오기
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd  # 수치계산, 데이터프레임 처리 (데이터 로딩 & EDA 기본)

from pathlib import Path  # 파일/디렉토리 경로 객체형으로 다루기
from dataclasses import dataclass  # 데이터 구조 정의 시 유용 (클래스 필드 자동 생성)
from typing import Dict, Tuple  # 함수 타입 힌트 작성용 (Dict, Tuple 자료형 표시)
from sklearn.preprocessing import LabelEncoder

from sklearn.model_selection import StratifiedKFold  # 층화 교차검증 -> 클래스 비율 유지하며 K-fold split
from sklearn.model_selection import RandomizedSearchCV
from sklearn.compose import ColumnTransformer  # 수치형/범주형 컬럼별로 다른 전처리 적용
from sklearn.preprocessing import OneHotEncoder, StandardScaler  # 범주형 변수 원-핫 인코딩 / 수치형 변수 정규화
from sklearn.pipeline import Pipeline  # 전처리 + 모델 묶어 재현성 있게 실현
from sklearn.metrics import (roc_auc_score, average_precision_score,
                             f1_score, recall_score, precision_score,
                             balanced_accuracy_score, confusion_matrix,
                             roc_curve, precision_recall_curve, make_scorer)
# 성능 평가지표 ROC커브 밑 면적(클래스 불균형에 강함)
# PR 커브 기반 precision (역시 불균형 데이터 평가에 적합)
# f1, recall, precision score : 분류 평가 지표
# balanced accuracy score : 클래스 불균형 고려 정확도
# confusion matrix : 예측 결과 매트릭스
# roc_curve, precision_recall_curve : 시각화용 곡선 좌표 생성 함수
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import label_binarize
#기존 binary class 전용 함수 (ROC-AUC, Average precision)을 멀티클래스에 쓰려면 One vs Rest(ovr) 방식으로 라벨 이진화해야 함
#즉, 멀티클래스 평가지표 계산용
import matplotlib.pyplot as plt  # 2D 그래프 시각화
import seaborn as sns  # 고급 시각화 지원 (heatmap, 분포 시각화 등)

from xgboost import XGBClassifier  # XGBoost 모델
from lightgbm import LGBMClassifier  # LightGBM (빠른 boosting 모델)
from catboost import CatBoostClassifier  # CatBoost : 범주형 변수를 자동으로 처리하는 데 강점

from imblearn.over_sampling import SMOTE
# 소수 클래스를 단순 복제하지 않고, 기존 소수 샘플을 기반으로 새로운 합성 샘플을 생성해서 오버샘플링
# 특징 공간(feature space)에서 KNN을 이용해 인공 데이터를 만들어 학습 데이터의 클래스 균형을 맞춤
from imblearn.pipeline import Pipeline as ImbPipeline
from scipy import sparse #전처리단계에서 OneHotEncoder를 쓰면 보통 희소행렬 (sparse matrix) 형태로 반환되는데 이를 dense array(numpy array)로 변환 위해
from unlzw3 import unlzw #압축 풀기

from sklearn.impute import SimpleImputer

from sklearn.metrics import (
    f1_score, balanced_accuracy_score, confusion_matrix,
    roc_auc_score, average_precision_score, make_scorer
)
from scipy.stats import randint, uniform, loguniform  # 튜닝용 분포

In [37]:
from google.colab import drive
drive.mount('/content/drive') #드라이브에서 마운트

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [38]:
import sys

RANDOM_STATE = 42 #랜덤시드 고정 값
N_SPLITS = 5 #교차검증 시 데이터셋 몇 조각으로 나눌

# 프로젝트 폴더 지정 (구글 드라이브 안의 프로젝트 루트 경로를 고정)
# 이후 모든 데이터, 소스, 결과 파일 경로를 이 경로 기준으로 관리
project_dir = "/content/drive/MyDrive/imbalanced_loss_project"

# 하위 폴더 정의 (데이터/소스코드/결과 저장용 디렉토리)
# → 프로젝트 구조를 체계적으로 관리하기 위함
data_dir    = os.path.join(project_dir, "data")     # 데이터셋 저장 위치
src_dir     = os.path.join(project_dir, "src")      # 사용자 정의 모듈/소스코드 저장 위치
results_dir = os.path.join(project_dir, "results")  # 실험 결과(모델 성능, 그래프 등) 저장 위치
os.makedirs(results_dir, exist_ok=True) #결과 저장용 폴더 자동 생성

# src 폴더를 Python 모듈 검색 경로에 추가
# src 안의 custom_losses.py, evaluation.py 같은 사용자 정의 모듈을 import 가능하게 함
if src_dir not in sys.path:
    sys.path.append(src_dir)

#확인
print("프로젝트 폴더:", project_dir)
print("데이터 폴더:", data_dir)
print("결과 폴더:", results_dir)

프로젝트 폴더: /content/drive/MyDrive/imbalanced_loss_project
데이터 폴더: /content/drive/MyDrive/imbalanced_loss_project/data
결과 폴더: /content/drive/MyDrive/imbalanced_loss_project/results


In [39]:
#데이터프레임 df에서 숫자형 컬럼과 범주형 컬럼 분리
def split_features(df: pd.DataFrame):
    num = df.select_dtypes(include=[np.number]).columns.tolist() #수치형 변수
    cat = [c for c in df.columns if c not in num] #수치형 외 컬럼 목록
    return num, cat

#결측치 비율이 60% 이상인 컬럼은 삭제
def make_preprocessor(X: pd.DataFrame, high_na_drop=0.60):
    keep = X.columns[X.isna().mean() <= high_na_drop]
    X2 = X[keep].copy()
    num_cols, cat_cols = split_features(X2)
    num_pipe = Pipeline([ #숫자형 컬럼(결측치 median으로 채우고 N(0,1)로 표준화)
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler())
    ])
    cat_pipe = Pipeline([ #범주형 컬럼(결측치 최빈값으로 채우고 onehotencoder)
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore"))
    ])
    pre = ColumnTransformer([ #숫자,번주형 파이프라인 묶어서 한 번에 전처리하기
        ("num", num_pipe, num_cols),
        ("cat", cat_pipe, cat_cols)
    ], remainder="drop")
    return pre, X2

#클래스별 샘플 개수 입력 받아 클래스 가중치 계산
def adaptive_class_weights_from_counts(class_counts: np.ndarray):
    n = class_counts.astype(float)
    w = 1.0 / np.log(1.0 + n) #1/log(1+n) 사용해 극단적으로 커지는 것 방지
    w[~np.isfinite(w)] = 0.0
    return w

def make_sample_weights(y: pd.Series | np.ndarray): #실제 라벨 벡터 받아서 클래스별 개수(counts) 구하고 클래스별 가중치 구함
    y = np.asarray(y)
    classes, counts = np.unique(y, return_counts=True)
    w = adaptive_class_weights_from_counts(counts)
    class_to_w = {c: w_i for c, w_i in zip(classes, w)}
    sw = np.vectorize(class_to_w.get)(y).astype(float)
    return sw, class_to_w

def _to_dense(Xm): #sparse면 dense
    return Xm.toarray() if sparse.issparse(Xm) else Xm

In [40]:
#데이터 로더
def _read_pageblocks(data_dir: str):
    # 우선 page-blocks.data가 있으면 그대로 읽기 아니면 압축버전 읽기
    path_plain = os.path.join(data_dir, "page-blocks.data")
    path_Z     = os.path.join(data_dir, "page-blocks.data.Z")
    if os.path.exists(path_plain):
        df = pd.read_csv(path_plain, header=None, delim_whitespace=True)
    elif os.path.exists(path_Z):
        # .Z를 메모리에서 해제하여 파싱 (unlzw3 라이브러리로 해제한 뒤 pandas로 읽음)
        with open(path_Z, "rb") as f:
            raw = f.read()
        decompressed = unlzw(raw)
        from io import BytesIO
        df = pd.read_csv(BytesIO(decompressed), header=None, delim_whitespace=True)
    else:
        raise FileNotFoundError("page-blocks.data(.Z) 파일을 찾을 수 없습니다.")
    return df

def _read_covtype(data_dir: str):
    # covtype.data.gz
    path = os.path.join(data_dir, "covtype.data.gz")
    df = pd.read_csv(path, header=None)
    return df

def _read_glass(data_dir: str):
    # glass.data
    path = os.path.join(data_dir, "glass.data")
    if not os.path.exists(path):
        path = os.path.join(data_dir, "glass.data.txt")  # 혹시 이름이 다르면 대비
    df = pd.read_csv(path, header=None, sep=r"[,\s]+", engine="python") #sep 써서 공백, 쉼표 둘 다 구분자로 사용
    return df

def _read_faults(data_dir: str):
    # Faults.NNA
    path = os.path.join(data_dir, "Faults.NNA")
    df = pd.read_csv(path, header=None, delim_whitespace=True)
    return df

def _read_yeast(data_dir: str):
    # yeast.data
    path = os.path.join(data_dir, "yeast.data")
    df = pd.read_csv(path, header=None, delim_whitespace=True)
    return df

def _standardize_feature_names(X: pd.DataFrame):
    #컬럼명이 정수일 때 ColumnTransformer가 위치 인덱스로 오해하는 문제 방지
    X = X.copy()
    X.columns = [f"f{i}" for i in range(X.shape[1])]
    return X

def load_dataset(dataset_name: str, base_data_path: str, random_state=42):
    ds = dataset_name.lower()
    if ds == "covertype":
        df = _read_covtype(base_data_path)
        X = df.iloc[:, :-1].copy()
        y = df.iloc[:, -1].astype(int).values  # 1..7
        y = y - 1  # 0..6 로 정규화 (gpt 권장)
        X = _standardize_feature_names(X)

    elif ds == "glass":
        df = _read_glass(base_data_path)
        # 일반적으로 첫 컬럼이 ID, 마지막이 라벨
        X = df.iloc[:, 1:-1].copy()
        y = df.iloc[:, -1].astype(int).values
        # 라벨이 1,2,3,5,6,7 등 불연속일 수 있으므로 0..K-1로 인코딩
        le = LabelEncoder()
        y = le.fit_transform(y)
        X = _standardize_feature_names(X)

    elif ds == "page_blocks" or ds == "page-blocks":
        df = _read_pageblocks(base_data_path)
        # 마지막 컬럼이 라벨(1..5)
        X = df.iloc[:, :-1].copy()
        y = df.iloc[:, -1].astype(int).values
        y = y - 1
        X = _standardize_feature_names(X)

    elif ds in ("steel_plates_faults", "steel plates faults", "steel"):
        df = _read_faults(base_data_path)
        # 문헌상 마지막 7개 컬럼이 결함(one-hot) -> 단일 라벨로 변환
        # (총 컬럼 수가 다를 수 있으니 뒤에서 7개를 라벨로 가정)
        K = 7
        X = df.iloc[:, :-K].copy()
        Y_onehot = df.iloc[:, -K:].values
        y = Y_onehot.argmax(axis=1)
        X = _standardize_feature_names(X)

    elif ds == "yeast":
        df = _read_yeast(base_data_path)
        # 0:name, 1..8: features(8개), 9: class(str)
        X = df.iloc[:, 1:-1].astype(float).copy()
        y_raw = df.iloc[:, -1].astype(str).values
        le = LabelEncoder()
        y = le.fit_transform(y_raw)  # 0..K-1
        X = _standardize_feature_names(X)

    else:
        raise ValueError(f"지원되지 않는 멀티클래스 데이터셋: {dataset_name}")

    return train_test_split(X, y, test_size=0.2, random_state=random_state, stratify=y)

In [41]:
# 모델 플랜(데이터 특성에 맞춰 대충 좋은 기본값)
MODEL_PLAN = {
    "covertype":       "lightgbm",   # 큰 표본 + 수치형 위주 -> LGBM 쾌적
    "glass":           "xgboost",    # 소표본, 경계 복잡 -> XGB 시도
    "page_blocks":     "lightgbm",   # 표본 큼 -> LGBM
    "steel_plates_faults": "xgboost",
    "yeast":           "catboost",   # 클래스 불균형 + 비선형성 -> CatBoost도 괜찮음
}

LOSS_STRATEGIES = ["baseline", "adaptive_ce", "weighted", "smote"]

In [42]:
#멀티클래스에서 클래스별 recall 계산해 기하평균으로 묶기
def gmean_recall_multiclass(y_true, y_pred, labels=None):
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    # 클래스별 recall = TP_i / (TP_i + FN_i)
    recalls = []
    for i in range(cm.shape[0]):
        tp = cm[i, i]
        fn = cm[i, :].sum() - tp
        rec = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        recalls.append(rec)
    # 기하평균 -> 어느 한 클래스의 recall이 0에 가까우면 전체 점수가 크게 깎임 (소수 클래스 민감)
    # 단 한 클래스라도 분모가 0이면 (샘플 없음) 0 처리 되어 전체가 0이 될 수 있
    recalls = np.array(recalls, dtype=float)
    # 0이 있으면 전체가 0이 되므로, 아주 작은 epsilon 더하는 방법도 있으나 그대로 둠
    return float(np.prod(recalls) ** (1.0 / max(len(recalls), 1)))

def evaluate_multiclass_model(
    y_true, proba, model_name: str, loss_name: str, dataset_name: str
):

    # 멀티클래스 성능 종합 평가.
    # 입력: y_true(정수 레이블), proba((n_samples, K) 확률)
    # 출력: metrics(dict)

    # 예측 라벨
    y_pred = proba.argmax(axis=1)
    classes = np.unique(y_true)

    # 기본 지표
    metrics = {
        "dataset": dataset_name,
        "model": model_name,
        "loss": loss_name,
        "f1_macro": f1_score(y_true, y_pred, average="macro"),
        "f1_weighted": f1_score(y_true, y_pred, average="weighted"),
        "bal_acc": balanced_accuracy_score(y_true, y_pred),
        "gmean": gmean_recall_multiclass(y_true, y_pred, labels=classes),
    }

    # 확률 기반 지표 (OvR, macro) — 실패 시 NaN
    # macro : 멀티클래스 → One-vs-Rest 변환 → 클래스별 accuracy → 매크로 평균
    # 각 클래스에 동등한 가중치를 주기 때문에, 소수 클래스 성능도 잘 반영된다.
    try:
        metrics["roc_auc_macro_ovr"] = roc_auc_score(
            y_true, proba, multi_class="ovr", average="macro"
        )
    except Exception:
        metrics["roc_auc_macro_ovr"] = np.nan

    try:
        Y_bin = label_binarize(y_true, classes=classes)  # (n, K) 이진 행렬
        metrics["avg_precision_macro_ovr"] = average_precision_score(
            Y_bin, proba, average="macro"
        )
    except Exception:
        metrics["avg_precision_macro_ovr"] = np.nan

    # 로그 출력
    print(f"[{dataset_name} | {model_name}-{loss_name}] {metrics}")
    return metrics

In [43]:
#튜닝
f1_macro_scorer = make_scorer(f1_score, average="macro")
cv5 = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)

def get_model_and_space(model_tag, K):
    if model_tag == "xgboost":
        model = XGBClassifier(objective="multi:softprob", num_class=K, tree_method="hist",
                              eval_metric="mlogloss", random_state=RANDOM_STATE)
        space = {
            "n_estimators": randint(300, 1201),
            "learning_rate": loguniform(1e-2, 3e-1),
            "max_depth": randint(3, 11),
            "min_child_weight": randint(1, 11),
            "subsample": uniform(0.6, 0.4),
            "colsample_bytree": uniform(0.6, 0.4),
            "reg_lambda": loguniform(1e-3, 10),
        }
    elif model_tag == "lightgbm":
        model = LGBMClassifier(objective="multiclass", num_class=K, random_state=RANDOM_STATE)
        space = {
            "n_estimators": randint(300, 1201),
            "learning_rate": loguniform(1e-2, 2e-1),
            "num_leaves": randint(31, 257),
            "max_depth": randint(4, 13),
            "min_child_samples": randint(10, 201),
            "subsample": uniform(0.6, 0.4),
            "colsample_bytree": uniform(0.6, 0.4),
            "reg_lambda": loguniform(1e-3, 10),
        }
    elif model_tag == "catboost":
        model = CatBoostClassifier(loss_function="MultiClass", random_state=RANDOM_STATE, verbose=False)
        space = {
            "iterations": randint(300, 1201),
            "learning_rate": loguniform(1e-2, 2e-1),
            "depth": randint(4, 11),
            "l2_leaf_reg": loguniform(1e-2, 10),
            "bagging_temperature": uniform(0.0, 1.0),
        }
    else:
        raise ValueError("지원되지 않는 모델 태그")
    return model, space

def DO_TUNE(model, param_dist, X_train, y_train, cv_splits=3, n_iter=20, random_state=42):

    #주어진 모델과 파라미터 분포에 대해 RandomizedSearchCV를 실행해 최적 모델을 리턴

    scorer = make_scorer(f1_score, average="macro")
    cv = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=random_state)

    search = RandomizedSearchCV(
        estimator=model,
        param_distributions=param_dist,
        n_iter=n_iter,
        scoring=scorer,
        cv=cv,
        verbose=1,
        n_jobs=-1,
        random_state=random_state
    )
    search.fit(X_train, y_train)

    print("Best Params:", search.best_params_)
    print("Best Score :", search.best_score_)

    return search.best_estimator_

def tune_once(model_tag, K, X, y, strategy="baseline", n_iter=25):
    base_model, space = get_model_and_space(model_tag, K)
    if strategy == "smote":
        pipe = ImbPipeline([
            ("smote", SMOTE(random_state=RANDOM_STATE, sampling_strategy="not majority", k_neighbors=5)),
            ("clf", base_model),
        ])
        param_dist = {f"clf__{k}": v for k, v in space.items()}
        search = RandomizedSearchCV(pipe, param_distributions=param_dist, n_iter=n_iter,
                                    scoring=f1_macro_scorer, cv=cv5, n_jobs=-1, verbose=1,
                                    random_state=RANDOM_STATE)
    else:
        search = RandomizedSearchCV(base_model, param_distributions=space, n_iter=n_iter,
                                    scoring=f1_macro_scorer, cv=cv5, n_jobs=-1, verbose=1,
                                    random_state=RANDOM_STATE)
    search.fit(X, y)
    return search.best_estimator_, search.best_params_, search.best_score_

### 2. 실행

In [None]:
# 실행할 데이터셋
TARGET_DATASETS = [
    "covertype",
    "glass",
    "page_blocks",
    "steel_plates_faults",
    "yeast",
]

all_results = []

for dataset_name in TARGET_DATASETS:
    print(f"\n{'='*25} {dataset_name.upper()} {'='*25}")
    X_train, X_test, y_train, y_test = load_dataset(dataset_name, data_dir, random_state=RANDOM_STATE)
    print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}")
    uniq, cnts = np.unique(y_train, return_counts=True)
    print("Class dist (train):", {int(k): float((v/len(y_train))) for k, v in zip(uniq, cnts)})

    # 전처리
    pre, _ = make_preprocessor(X_train, high_na_drop=0.60)
    Xtr_t = _to_dense(pre.fit_transform(X_train))
    Xte_t = _to_dense(pre.transform(X_test))

    K = len(np.unique(y_train))
    model_tag = MODEL_PLAN[dataset_name]

    for strategy in LOSS_STRATEGIES:
        print(f"\n Training | model={model_tag}, strategy={strategy}")

        X_fit, y_fit = Xtr_t, y_train.copy()
        fit_kwargs = {}

        if strategy == "smote":
            # 공백! "not majority" (언더스코어 X)
            sm = SMOTE(random_state=RANDOM_STATE, sampling_strategy="not majority", k_neighbors=5)
            X_fit, y_fit = sm.fit_resample(Xtr_t, y_train)

        elif strategy == "adaptive_ce":
            sw, class_to_w = make_sample_weights(y_fit)
            fit_kwargs["sample_weight"] = sw
            print("adaptive weights:", class_to_w)

        elif strategy == "weighted":
            classes_, counts_ = np.unique(y_fit, return_counts=True)
            w_inv = 1.0 / counts_
            class_to_w = {c: w for c, w in zip(classes_, w_inv)}
            sw = np.vectorize(class_to_w.get)(y_fit)
            fit_kwargs["sample_weight"] = sw
            print("weighted(inv-freq) weights:", class_to_w)

        # 모델
        if model_tag == "xgboost":
            model = XGBClassifier(objective="multi:softprob", num_class=K, n_estimators=600,
                                  learning_rate=0.05, max_depth=6, subsample=0.8, colsample_bytree=0.8,
                                  tree_method="hist", eval_metric="mlogloss", random_state=RANDOM_STATE)
        elif model_tag == "lightgbm":
            model = LGBMClassifier(objective="multiclass", num_class=K, n_estimators=500,
                                   learning_rate=0.05, num_leaves=64, subsample=0.8, colsample_bytree=0.8,
                                   random_state=RANDOM_STATE)
        elif model_tag == "catboost":
            model = CatBoostClassifier(loss_function="MultiClass", iterations=700, depth=6,
                                       learning_rate=0.05, random_state=RANDOM_STATE, verbose=False)
        else:
            raise ValueError(f"지원되지 않는 모델: {model_tag}")

        # 튜닝(옵션)
        if DO_TUNE:
            tuned_model, best_params, cv_score = tune_once(model_tag, K, X_fit, y_fit, strategy=strategy, n_iter=25)
            print("Best params:", best_params, "| CV f1_macro:", f"{cv_score:.4f}")
            model = tuned_model

        # 학습
        model.fit(X_fit, y_fit, **fit_kwargs)

        # 예측 확률 (n,K)
        proba = model.predict_proba(Xte_t)
        if isinstance(proba, list):  # CatBoost 방어
            proba = np.asarray(proba)
            if proba.ndim == 3:  # (K, n, 2) 등 예외형태 방어
                proba = np.transpose(proba, (1, 0, 2))[:, :, 1]
            elif proba.ndim == 2:
                pass

        # 평가
        metrics = evaluate_multiclass_model(
            y_true=y_test,
            proba=proba,
            model_name=f"{model_tag}-{strategy}",
            loss_name=("adaptive_ce" if strategy == "adaptive_ce" else strategy),
            dataset_name=dataset_name
        )
        all_results.append(metrics)

# 요약 저장
results_df = pd.DataFrame(all_results)
results_df.sort_values(["dataset", "f1_macro"], ascending=[True, False], inplace=True)
results_csv = os.path.join(results_dir, "multiclass_summary.csv")
results_df.to_csv(results_csv, index=False)
print("\nSaved:", results_csv)
display(results_df.head(20))


Train shape: (464809, 54), Test shape: (116203, 54)
Class dist (train): {0: 0.3646056767403385, 1: 0.48759813170571137, 2: 0.06153710448808005, 3: 0.004728824097640106, 4: 0.01633789362942628, 5: 0.029891848049413847, 6: 0.03530052128938983}
