## 데이터 로드

In [1]:
from pathlib import Path
import numpy as np
import pandas as pd

# ====== 데이터 로드 ======

# 학습(train) 데이터 파일 후보 경로들
# - 여러 파일명/크기의 데이터셋 중 "실제로 존재하는 파일"을 자동으로 선택하려는 목적
train_candidates = [
    Path("../data/29757_train_merged.csv"),
    Path("../data/1000_train_merged.csv"),
    Path("../data/10000_train_merged.csv"),
]

# 테스트(test) 데이터 파일 후보 경로들
test_candidates = [
    Path("../data/29757_test_merged.csv"),
    Path("../data/1000_test_merged.csv"),
    Path("../data/10000_test_merged.csv"),
]

def resolve_path(candidates: list[Path]) -> Path:
    """주어진 후보 경로 목록에서, 실제로 존재하는 첫 번째 파일 경로를 반환"""
    for p in candidates:
        # 파일이 존재하면 그 경로를 즉시 반환
        if p.exists():
            return p

    # 후보 중 어떤 파일도 없으면 이후 진행이 불가능하므로 에러로 중단
    raise FileNotFoundError(f"No dataset found in: {candidates}")

# train_candidates 중 존재하는 파일을 찾아서 CSV를 DataFrame으로 읽어옴
train_df = pd.read_csv(resolve_path(train_candidates))

# test_candidates 중 존재하는 파일을 찾아서 CSV를 DataFrame으로 읽어옴
test_df = pd.read_csv(resolve_path(test_candidates))

FileNotFoundError: No dataset found in: [PosixPath('../data/29757_train_merged.csv'), PosixPath('../data/1000_train_merged.csv'), PosixPath('../data/10000_train_merged.csv')]

### raw데이터 클래스 비율 확인

In [None]:
import matplotlib.pyplot as plt

# 정답(라벨) 컬럼 이름
target_col = "died_in_icu"

# 비율을 비교할 데이터셋들을 이름과 함께 묶어둠
datasets = {
    "train_proc": train_df,
    "test_proc": test_df,
}

# 그래프 2개(가로 1행 2열)를 한 번에 생성
# - sharey=True: 두 그래프의 y축(비율 스케일)을 동일하게 맞춰서 비교가 쉬움
fig, axes = plt.subplots(1, 2, figsize=(10, 6), sharey=True)

# train, test를 하나씩 꺼내면서 각 축(ax)에 막대 그래프를 그림
for ax, (name, df) in zip(axes, datasets.items()):

    # 라벨 값(예: 0, 1)의 개수를 셈
    # - sort_index(): 0,1 순서로 정렬(보기 좋게)
    counts = df[target_col].value_counts().sort_index()

    # 전체 개수로 나눠서 각 클래스의 비율로 변환
    ratios = counts / counts.sum()

    # 클래스(0/1)별 비율을 막대 그래프로 표시
    # - x축: 클래스 값(문자열로 변환)
    # - y축: 비율
    ax.bar(ratios.index.astype(str), ratios.values, color=["tab:blue", "tab:red"])

    # x축 라벨(무슨 컬럼 기준인지)
    ax.set_xlabel(target_col)

    # 그래프 제목(어느 데이터셋의 비율인지)
    ax.set_title(f"Class Ratio in {name}")

    # 각 막대 위에 비율(%) 텍스트로 표시
    for i, v in enumerate(ratios.values):
        ax.text(i, v + 0.01, f"{v:.2%}", ha="center")

# 왼쪽 그래프에만 y축 라벨 표시(공유 y축이라 하나만 있어도 됨)
axes[0].set_ylabel("Ratio")

# 요소들이 겹치지 않게 레이아웃 자동 조정
plt.tight_layout()

# 그래프 출력
plt.show()

## 사용 피처 정의

In [None]:
# ====== 기본 설정 ======

# 예측(분류)해야 하는 정답 라벨 컬럼 이름
target_col = "died_in_icu"

# 환자(또는 입원 단위)를 구분하는 ID 컬럼 후보들
possible_group_cols = ["patientunitstayid", "patient_id"]

# 시간(관측 시점)을 나타내는 컬럼 후보들
possible_time_cols = ["observationoffset"]

# train_df에 실제로 존재하는 ID 컬럼을 후보 목록에서 찾아서 하나 선택
# - next(..., None): 조건을 만족하는 첫 번째 컬럼을 가져오고, 없으면 None
group_col = next((c for c in possible_group_cols if c in train_df.columns), None)

# train_df에 실제로 존재하는 시간 컬럼을 후보 목록에서 찾아서 하나 선택
time_col = next((c for c in possible_time_cols if c in train_df.columns), None)

# ID 컬럼이나 시간 컬럼을 못 찾으면, 이후 과정이 성립하지 않으니 즉시 에러로 중단
if group_col is None or time_col is None:
    raise ValueError("patient id or time column not found")

# train_df에서 숫자형(number) 타입인 컬럼들만 추려냄 (모델 입력 후보 피처 풀)
numeric_cols = train_df.select_dtypes(include=["number"]).columns

# 피처로 쓰면 안 되는(또는 쓰지 않을) 컬럼들을 제외 목록으로 정의
# - target_col: 정답 라벨(학습에 누설되면 안 됨)
# - patient_id / patientunitstayid: 식별자(ID)는 보통 피처로 쓰지 않음
# - observationoffset: 시간 컬럼은 여기선 기본 피처에서 제외(별도 처리할 수도 있음)
exclude = {target_col, "patient_id", "patientunitstayid", "observationoffset"}

# 숫자형 컬럼 중에서 exclude에 포함된 것들을 빼고 최종 "기본 피처 컬럼" 목록 생성
base_cols = [c for c in numeric_cols if c not in exclude]

## 전처리 함수 설정

In [None]:
def add_missing_and_time_features(df: pd.DataFrame, base_cols: list[str]) -> pd.DataFrame:
    """
    결측치 여부(missing)와 '마지막으로 값이 관측된 이후 경과 시간' 같은 파생 피처를 만들기 위한 함수.
    """
    # 같은 환자(group) 안에서 시간 순서대로 정렬(시계열 전처리의 기본)
    df = df.sort_values([group_col, time_col]).copy()

    for col in base_cols:
        # 해당 컬럼이 결측치인지(NA인지) 0/1로 표시하는 값
        miss = df[col].isna().astype(int)

        # 해당 컬럼이 "관측된 시점"의 시간값만 남기고, 환자별로 최근 관측 시점을 앞으로 채움(ffill)
        last_time = df[time_col].where(df[col].notna()).groupby(df[group_col]).ffill()

        # 현재 시점 - 마지막 관측 시점 = 마지막으로 값이 있었던 이후 경과 시간
        tsince = df[time_col] - last_time

        # 마지막 관측 시점이 아예 없었던 경우(처음부터 결측)는 time_col 값으로 대체
        tsince = tsince.fillna(df[time_col])

        # 실제로 파생 피처를 df에 추가하는 부분(현재는 사용 안 해서 주석 처리됨)
        # df[f"{col}_miss"] = miss
        # df[f"{col}_tsince"] = tsince

    return df


def impute_base(df: pd.DataFrame, base_cols: list[str], medians: pd.Series) -> pd.DataFrame:
    """
    기본 피처(base_cols)의 결측치를 채우는(impute) 함수.

    조건(규칙):
    - 조건 1) 같은 환자(group) 내에서 이전에 관측된 값이 있으면 → 그 값을 사용(이전값 carry-forward)
    - 조건 2) 이전 관측값이 전혀 없으면(첫 관측부터 결측) → 전체 중앙값(medians) 사용
    """
    df = df.sort_values([group_col, time_col]).copy()

    # 환자별로 "이전까지 관측된 가장 최근 값"을 계산 (결측인 구간은 이전값이 채워짐)
    prev_value = df.groupby(group_col, sort=False)[base_cols].ffill()

    # 조건 1: 결측이고, 이전 관측값(prev_value)이 존재하면 그 값으로 채움
    df[base_cols] = df[base_cols].where(df[base_cols].notna(), prev_value)

    # 조건 2: 그래도 결측(=이전 관측값 자체가 없던 케이스)은 전체 중앙값으로 채움
    df[base_cols] = df[base_cols].fillna(medians)

    return df


def sample_k_per_stay(df, group_col, time_col, k=10):
    """
    환자(group)마다 최대 k개의 시점(row)만 뽑아서 데이터 양을 줄이는 샘플링 함수.

    - group 내 row 수가 k 이하이면 전부 사용
    - k보다 많으면, 시간축 전체에 고르게 퍼지도록 linspace로 k개 인덱스를 선택
    """
    df = df.sort_values([group_col, time_col]).copy()

    def pick_rows(g):
        n = len(g)
        if n <= k:
            return g

        # 0 ~ n-1 구간에서 k개를 균등 간격으로 선택(처음/중간/끝 포함)
        idx = np.linspace(0, n - 1, k, dtype=int)
        return g.iloc[idx]

    # 환자별로 pick_rows를 적용해서 결과를 합침
    return df.groupby(group_col, group_keys=False).apply(pick_rows)


def balance_ratio(df, target_col="died_in_icu", ratio_pos=0.08, random_state=42):
    """
    타겟 클래스 비율을 특정 비율(ratio_pos)로 맞추기 위해
    다수 클래스를 줄이거나(언더샘플링), 경우에 따라 소수 클래스를 줄이는 샘플링 함수.

    - pos: target==1인 행들
    - neg: target==0인 행들
    """
    pos = df[df[target_col] == 1]
    neg = df[df[target_col] == 0]

    # 목표 비율 ratio_pos를 만들기 위해 필요한 neg 개수 계산
    desired_neg = int(len(pos) * (1 - ratio_pos) / ratio_pos)

    if desired_neg >= len(neg):
        # neg가 너무 적어서 목표 비율을 못 맞추면, pos를 줄여서 비율을 맞춤
        desired_pos = int(len(neg) * ratio_pos / (1 - ratio_pos))
        pos = pos.sample(n=desired_pos, random_state=random_state)
    else:
        # neg가 충분히 많으면, neg를 줄여서 비율을 맞춤
        neg = neg.sample(n=desired_neg, random_state=random_state)

    # pos/neg 합친 뒤 섞고(shuffle) 인덱스 정리
    return (
        pd.concat([pos, neg])
        .sample(frac=1, random_state=random_state)
        .reset_index(drop=True)
    )


def sample_total_rows_by_group(df, group_col, n_total=1000, random_state=42):
    """
    전체 행을 n_total개만 뽑되, 특정 그룹에만 몰리지 않도록
    그룹을 번갈아가며 1개씩 뽑는 방식으로 샘플링하는 함수.

    목표:
    - 한 환자(group)에서만 잔뜩 뽑히는 편향을 줄임
    - 여러 환자에서 골고루 행을 가져오도록 함
    """
    rng = np.random.default_rng(random_state)

    # 그룹별로 해당 그룹이 가진 행 인덱스 목록을 저장
    groups = {gid: g.index.to_numpy() for gid, g in df.groupby(group_col)}

    # 그룹 순서를 섞고, 각 그룹 내부 인덱스 순서도 섞음
    group_ids = list(groups.keys())
    rng.shuffle(group_ids)
    for gid in group_ids:
        rng.shuffle(groups[gid])

    picks = []

    # 라운드 로빈 방식: 각 그룹에서 1개씩 번갈아 뽑기
    while len(picks) < n_total and group_ids:
        next_group_ids = []

        for gid in group_ids:
            if len(picks) >= n_total:
                break

            idxs = groups[gid]
            if len(idxs) == 0:
                continue

            # 이 그룹에서 하나 뽑고, 남은 인덱스는 다음 라운드로 넘김
            picks.append(idxs[0])
            groups[gid] = idxs[1:]

            if len(groups[gid]) > 0:
                next_group_ids.append(gid)

        # 아직 남은 인덱스가 있는 그룹만 다음 라운드에서 계속 참여
        group_ids = next_group_ids

    # 뽑힌 인덱스의 행만 모은 뒤 섞고 인덱스 정리
    result = df.loc[picks].sample(frac=1, random_state=random_state).reset_index(drop=True)
    return result

## 최종 데이터셋 구성

In [None]:
# ====== 사용 피처 ======
# 현재는 기본 피처만 사용 (파생변수 생성 코드는 아래에서 선택적으로 켤 수 있음)
feature_cols = base_cols


# ====== 전처리 파이프라인 (필요한 단계만 골라 켜기) ======

# 1) 샘플링(둘 중 하나 선택)
# - 옵션 A: group 기준으로 전체 n_total개를 "그룹이 골고루 섞이게" 샘플링
# train_proc = sample_total_rows_by_group(train_df, group_col, n_total=1000, random_state=42)
# test_proc  = sample_total_rows_by_group(test_df,  group_col, n_total=1000, random_state=42)

# - 옵션 B: stay/group별로 시계열에서 k개만 고르게 샘플링
train_proc = sample_k_per_stay(train_df, group_col, time_col, k=10)
test_proc  = sample_k_per_stay(test_df,  group_col, time_col, k=10)


# 2) 파생변수 생성(선택)
# train_proc = add_missing_and_time_features(train_proc, base_cols)
# test_proc  = add_missing_and_time_features(test_proc,  base_cols)


# 3) 결측치 처리(공통)
# - train에서 중앙값을 구하고, train/test 모두 동일한 기준(medians)으로 채움
train_medians = train_proc[base_cols].median()

train_proc = impute_base(train_proc, base_cols, train_medians)
test_proc  = impute_base(test_proc,  base_cols, train_medians)


# 4) 클래스 비율 맞추기(보통 train만 적용)
train_proc = balance_ratio(train_proc, target_col=target_col, ratio_pos=0.08)
# test_proc = balance_ratio(test_proc, target_col=target_col, ratio_pos=0.08)


# 5) 최종 학습/평가용 데이터 구성
X_train = train_proc[feature_cols]
y_train = train_proc[target_col]

X_test  = test_proc[feature_cols]
y_test  = test_proc[target_col]

## 불균형 전처리 비교

In [None]:
import matplotlib.pyplot as plt

target_col = "died_in_icu"

datasets = {
    "train_proc": train_proc,
    "test_proc": test_proc,
}

fig, axes = plt.subplots(1, 2, figsize=(10, 6), sharey=True)

for ax, (name, df) in zip(axes, datasets.items()):
    counts = df[target_col].value_counts().sort_index()
    ratios = counts / counts.sum()

    ax.bar(ratios.index.astype(str), ratios.values, color=["tab:blue", "tab:red"])
    ax.set_xlabel(target_col)
    ax.set_title(f"Class Ratio in {name}")

    for i, v in enumerate(ratios.values):
        ax.text(i, v + 0.01, f"{v:.2%}", ha="center")

axes[0].set_ylabel("Ratio")
plt.tight_layout()
plt.show()


## 모델 학습

In [None]:
import numpy as np
import pandas as pd

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

# ====== 모델 후보 정의 ======
# (옵션) 클래스 불균형 정도를 반영하기 위한 값. 일부 모델(XGBoost/LightGBM 등)에서 사용 가능
scale_pos_weight = 92 / 8

models = {
    # Logistic Regression: 표준화(StandardScaler) 후 학습
    "LR": Pipeline([
        # ("imputer", SimpleImputer(strategy="median")),  # 결측치 처리가 필요하면 사용
        ("scaler", StandardScaler()),
        ("clf", LogisticRegression(max_iter=1000)),
    ]),

    # Random Forest: 트리 기반이라 스케일링 없이도 보통 동작
    "RF": RandomForestClassifier(
        n_estimators=400,
        random_state=42,
        n_jobs=-1,
        class_weight="balanced",  # 클래스 불균형 보정
    ),
}

# XGBoost: 설치되어 있으면 모델 후보에 추가
try:
    import xgboost as xgb
    models["XGBoost"] = xgb.XGBClassifier(
        n_estimators=400,
        max_depth=5,
        learning_rate=0.05,
        subsample=0.9,
        colsample_bytree=0.9,
        eval_metric="logloss",
        random_state=42,
        n_jobs=-1,
        # scale_pos_weight=scale_pos_weight,  # 불균형 보정이 필요하면 사용
    )
except Exception:
    pass

# LightGBM: 설치되어 있으면 모델 후보에 추가
try:
    import lightgbm as lgb
    models["LightGBM"] = lgb.LGBMClassifier(
        n_estimators=400,
        learning_rate=0.05,
        num_leaves=31,
        random_state=42,
        # scale_pos_weight=scale_pos_weight,  # 불균형 보정이 필요하면 사용
    )
except Exception:
    pass


# ====== row 단위 점수 -> stay(group) 단위 점수로 집계 ======
def aggregate_stay_scores(
    meta_df: pd.DataFrame,
    scores: np.ndarray,
    group_col: str,
    target_col: str,
    score_agg: str = "mean",
    label_agg: str = "max",
):
    """
    기능:
    - row 단위 예측 점수(scores)를 stay(group) 단위로 모아서 하나의 점수로 만든다.
    - 동시에 row 단위 라벨(target_col)도 stay 단위 라벨로 합친다.

    입력:
    - meta_df: scores와 동일한 row 순서를 가진 데이터프레임(최소 group_col, target_col 포함)
    - scores : 각 row의 예측 점수(확률 등), shape = [n_rows]
    - score_agg: 같은 stay 안의 점수를 어떻게 하나로 합칠지 ("mean", "max" 등)
    - label_agg: 같은 stay 안의 라벨을 어떻게 하나로 합칠지 ("max"면 한 번이라도 1이면 1)

    출력:
    - y_stay   : stay 단위 정답 라벨 배열
    - s_stay   : stay 단위 예측 점수 배열
    - stay_ids : stay id 배열(어떤 stay 순서인지 추적용)
    """
    tmp = meta_df[[group_col, target_col]].copy()
    tmp["score"] = scores

    # stay 단위 예측 점수 집계
    if score_agg == "mean":
        s_stay = tmp.groupby(group_col)["score"].mean()
    elif score_agg == "max":
        s_stay = tmp.groupby(group_col)["score"].max()
    elif score_agg == "last":
        raise ValueError("score_agg='last'는 time_col 기반 정렬 로직이 추가로 필요합니다.")
    else:
        raise ValueError("score_agg must be one of {'mean','max'} (or implement more).")

    # stay 단위 정답 라벨 집계
    if label_agg == "max":
        y_stay = tmp.groupby(group_col)[target_col].max()
    elif label_agg == "mean":
        y_stay = tmp.groupby(group_col)[target_col].mean()
    else:
        raise ValueError("label_agg must be one of {'max','mean'}")

    return y_stay.to_numpy(), s_stay.to_numpy(), y_stay.index.to_numpy()


# ====== 모델 학습 + 예측 + stay 단위로 변환 ======
pred_scores_stay = {}   # 모델별 stay 점수 저장
y_test_stay_ref = None  # stay 단위 정답 라벨(모델 공통이므로 1번만 저장)

for name, model in models.items():
    # 1) 모델 학습
    model.fit(X_train, y_train)

    # 2) row 단위 예측 점수 생성
    # - predict_proba가 있으면 양성(1) 확률 사용
    # - 없으면 decision_function 값을 시그모이드로 변환해서 점수처럼 사용
    if hasattr(model, "predict_proba"):
        scores_row = model.predict_proba(X_test)[:, 1]
    else:
        raw = model.decision_function(X_test)
        scores_row = 1 / (1 + np.exp(-raw))

    # 3) stay 단위 집계에 필요한 메타 데이터(같은 row 순서여야 함)
    meta_test = test_proc  # scores_row의 row 순서와 반드시 일치해야 함

    # 4) row -> stay로 점수/라벨 집계
    y_stay, s_stay, stay_ids = aggregate_stay_scores(
        meta_df=meta_test,
        scores=scores_row,
        group_col=group_col,
        target_col=target_col,
        score_agg="mean",  # 또는 "max"
        label_agg="max",
    )

    # 5) 저장
    pred_scores_stay[name] = s_stay
    if y_test_stay_ref is None:
        y_test_stay_ref = y_stay

print("models used:", list(pred_scores_stay.keys()))
print("stay-level test n:", len(y_test_stay_ref))

## 성능평가

### ROC + Calibration + Decision Curve

In [None]:
# ====== 성능평가(1): ROC + Calibration + Decision Curve ======
# 목적:
# - ROC: 모델이 양성(1)을 음성(0)보다 "점수 높게" 잘 구분하는지(순위/구분 능력) 비교
# - Calibration: 모델 점수(확률)가 "실제 확률"처럼 믿을만한지(신뢰도) 확인
# - Decision Curve: 임계값(threshold)에 따른 의사결정 관점의 순이득(net benefit) 확인
#
# 추가(ROC로 과적합을 '간접적으로' 보는 방법):
# - ROC 자체는 과적합을 직접 판단하는 도구가 아니다.
# - 하지만 같은 방식으로 Train ROC/AUC와 Test ROC/AUC를 함께 그렸을 때,
#   Train은 높은데 Test가 크게 떨어지면 "과적합 가능성" 신호로 볼 수 있다.
# - 단, 그룹/환자 단위 데이터에서는 split이 group 기준이 아니면 누수 때문에 ROC가 과대평가될 수 있어
#   (과적합이 아니라 데이터 누수일 수도 있음: 해석 주의)

import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.calibration import calibration_curve

fig, axes = plt.subplots(1, 3, figsize=(18, 5))


# ---------------------------------------------------------------------
# 1) ROC Curve (Receiver Operating Characteristic)
# ---------------------------------------------------------------------
# 무엇을 알 수 있나?
# - 점수 기반으로 양성과 음성을 얼마나 잘 구분하는지(순위 능력)를 본다.
# - AUC가 클수록 전반적인 구분 능력이 좋다.
#
# 해석 포인트:
# - 곡선이 좌상단에 가까울수록 좋다(낮은 FPR에서 높은 TPR).
# - 대각선(랜덤 기준)보다 위에 있어야 의미가 있다.
#
# 과적합을 ROC로 간접 확인하려면:
# - Train ROC/AUC와 Test ROC/AUC를 같은 방식으로 비교한다.
# - Train AUC >> Test AUC이면 과적합 신호.
#   (단, split이 환자/그룹 단위로 제대로 되었는지 먼저 확인해야 함)
#
# 주의:
# - ROC는 "임계값을 어디에 둘지"를 직접 결정해주지 않는다.
# - 그리고 score/label의 단위(row-level vs stay-level)가 섞이면 해석이 무너진다.

ax = axes[0]
for name, scores in pred_scores_stay.items():
    # fpr: False Positive Rate = 1 - specificity
    # tpr: True Positive Rate  = sensitivity
    fpr, tpr, _ = roc_curve(y_test, scores)
    auc = roc_auc_score(y_test, scores)
    ax.plot(fpr, tpr, label=f"{name} (AUC={auc:.3f})")

# 랜덤 기준선(무작위 모델이면 ROC는 대각선에 가까움)
ax.plot([0, 1], [0, 1], "k--", alpha=0.5)

ax.set_xlabel("1 - Specificity (FPR)")
ax.set_ylabel("Sensitivity (TPR)")
ax.set_title("ROC Curves")
ax.legend(loc="lower right")


# ---------------------------------------------------------------------
# 2) Calibration Curve
# ---------------------------------------------------------------------
# 무엇을 알 수 있나?
# - 모델이 출력한 확률이 실제 확률로서 얼마나 믿을만한지(신뢰도)를 본다.
#   예: 예측확률 0.8인 샘플들이 실제로도 ~80% 양성인가?
#
# 해석 포인트:
# - 대각선(Ideal)에 가까울수록 좋다.
# - 곡선이 대각선보다 위: 실제 양성 비율이 예측보다 높음(보수적으로 낮게 예측하는 경향)
# - 곡선이 대각선보다 아래: 예측이 실제보다 높음(과신/over-confident 경향)
#
# 주의:
# - 구간 나누기(n_bins, strategy)에 따라 곡선 모양이 변할 수 있다.
# - 위험도 기반 의사결정(알림/치료/검사)에서는 ROC보다 Calibration이 더 중요할 때도 많다.

ax = axes[1]
for name, scores in pred_scores_stay.items():
    # 예측확률을 10구간으로 나눠서
    # 각 구간의 평균 예측확률(prob_pred)과 실제 양성비율(prob_true)을 비교
    prob_true, prob_pred = calibration_curve(
        y_test, scores, n_bins=10, strategy="uniform"
    )
    ax.plot(prob_pred, prob_true, marker="o", label=name)

# 이상적 기준선: 예측확률 == 실제양성비율
ax.plot([0, 1], [0, 1], "k--", alpha=0.5, label="Ideal")

ax.set_xlabel("Mean predicted probability")
ax.set_ylabel("Fraction of positives")
ax.set_title("Calibration Curves")
ax.legend(loc="lower right")


# ---------------------------------------------------------------------
# 3) Decision Curve Analysis (DCA)
# ---------------------------------------------------------------------
# 무엇을 알 수 있나?
# - 임계값 t에서 "t 이상이면 개입한다"라는 정책을 썼을 때,
#   진짜 양성 개입(TP)과 불필요 개입(FP)의 trade-off를 반영한 순이득(net benefit)을 본다.
#
# 해석 포인트:
# - y축(net benefit)이 높을수록 좋다.
# - 모델 곡선이 "Treat all"(전부 개입) / "Treat none"(아무도 개입 안 함)보다 위에 있으면
#   그 임계값 구간에서는 모델을 쓰는 게 이득이다.
#
# 주의(중요):
# - DCA는 score가 확률로 해석 가능할수록 의미가 좋다(캘리브레이션과 연결).
# - 그리고 row-level / stay-level을 섞으면 의미가 없어진다.
#   (y_true와 scores는 반드시 같은 단위로 맞춰야 함)

ax = axes[2]
thresholds = np.linspace(0.01, 0.99, 99)

def decision_curve(y_true, scores):
    """
    기능:
    - 임계값 t마다:
      t 이상이면 양성으로 보고 개입(preds)한다고 가정한 뒤,
      TP/FP를 세어 net benefit을 계산한다.
    """
    n = len(y_true)
    net_benefit = []

    for t in thresholds:
        preds = scores >= t
        tp = ((preds == 1) & (y_true == 1)).sum()
        fp = ((preds == 1) & (y_true == 0)).sum()

        # Net Benefit = TP/N - FP/N * (t/(1-t))
        nb = (tp / n) - (fp / n) * (t / (1 - t))
        net_benefit.append(nb)

    return np.array(net_benefit)

for name, scores in pred_scores_stay.items():
    # y_test_stay_ref를 쓰고 있으니, scores도 stay-level이어야 해석이 맞다
    nb = decision_curve(np.array(y_test_stay_ref), scores)
    ax.plot(thresholds, nb, label=name)

# 비교 기준선 1) Treat all: 모두 개입했을 때
prevalence = np.mean(y_test_stay_ref)
nb_all = prevalence - (1 - prevalence) * (thresholds / (1 - thresholds))
ax.plot(thresholds, nb_all, "k--", alpha=0.4, label="Treat all")

# 비교 기준선 2) Treat none: 아무도 개입 안 하면 net benefit=0
ax.plot(thresholds, np.zeros_like(thresholds), "k:", alpha=0.6, label="Treat none")

ax.set_xlabel("Threshold probability")
ax.set_ylabel("Net benefit")
ax.set_title("Decision Curve Analysis")
ax.legend(loc="upper right")


plt.tight_layout()
plt.show()

### Confusion Matrix

In [None]:
# ====== 성능평가(2): Confusion Matrix (환자/stay 단위) ======
# 목적:
# - 임계값(threshold)을 0.5로 고정했을 때, 모델이 어떤 종류의 실수를 하는지 한눈에 보기
#
# Confusion Matrix로 알 수 있는 것:
# - TP(진짜 양성을 맞춤): 실제 1을 1로 예측
# - TN(진짜 음성을 맞춤): 실제 0을 0으로 예측
# - FP(거짓 양성): 실제 0인데 1로 예측 → 불필요 개입/경고가 늘어날 수 있음
# - FN(거짓 음성): 실제 1인데 0으로 예측 → 놓치면 위험한 케이스(의료에서는 특히 중요)
#
# 해석 포인트:
# - 이 매트릭스는 "임계값 0.5에서의 성능"만 보여준다.
# - ROC/AUC처럼 임계값 전체를 보지 않고, 특정 운영 기준에서의 trade-off를 보여준다.
#
# 주의:
# - 현재 코드는 "환자/stay 단위"로 평가:
#   y_test_stay_ref(환자 단위 라벨)과 pred_scores_stay(환자 단위 점수)가 같은 단위여야 의미가 있다.
# - 임계값 0.5는 임의의 기준일 수 있으니, 실제 운영 목적에 따라 다른 threshold가 더 적절할 수 있다.
#   (예: FN을 줄이고 싶으면 threshold를 낮추는 방향)

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import math
import numpy as np
import matplotlib.pyplot as plt  # 이 블록 단독 실행을 위해 명시

# 모델 개수에 맞춰 서브플롯 격자 크기 결정
n_models = len(pred_scores_stay)
n_cols = 3
n_rows = math.ceil(n_models / n_cols)

fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 4 * n_rows))

# axes를 1차원 배열 형태로 맞춰서 반복문에서 다루기 쉽게 만듦
axes = np.array(axes).reshape(-1)

for ax, (name, scores) in zip(axes, pred_scores_stay.items()):
    # 임계값 0.5 기준으로 환자 단위 예측(0/1) 생성
    y_pred = (scores >= 0.5).astype(int)

    # 환자 단위 정답 vs 예측으로 Confusion Matrix 계산
    cm = confusion_matrix(y_test_stay_ref, y_pred)

    # Confusion Matrix를 시각화하는 디스플레이 객체 생성 후 서브플롯(ax)에 그림
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot(ax=ax, cmap="Blues", colorbar=False)

    # 각 서브플롯 제목에 모델 이름 표시
    ax.set_title(f"{name}")

# 남는 서브플롯 칸은 보기 좋게 비활성화
for ax in axes[n_models:]:
    ax.axis("off")

plt.tight_layout()
plt.show()

### 성능지표 분석

In [None]:
from __future__ import annotations

import numpy as np
import pandas as pd

from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score
)

def evaluate_row_level(
    pred_scores: dict[str, np.ndarray | pd.Series],
    y_true: np.ndarray | pd.Series,
    threshold: float = 0.5,
) -> pd.DataFrame:
    """
    기능:
    - 여러 모델의 예측 점수(pred_scores)를 받아서,
      같은 정답(y_true)에 대해 주요 성능지표를 한 번에 계산해 표(DataFrame)로 반환한다.

    이 함수로 알 수 있는 것(지표 해석):
    - ACC(Accuracy): 전체 중 맞춘 비율 (불균형 데이터에서는 착시가 날 수 있음)
    - PREC(Precision): '양성으로 예측한 것' 중 실제 양성 비율 (FP가 많으면 낮아짐)
    - REC(Recall/Sensitivity): '실제 양성'을 얼마나 놓치지 않고 잡았는지 (FN이 많으면 낮아짐)
    - F1: Precision과 Recall의 균형(조화평균)
    - AUC(ROC-AUC): 임계값 전체에서의 구분 능력(순위/분리 능력)
    - PR-AUC(AP): 불균형 데이터에서 양성 탐지 성능을 더 민감하게 보는 지표
      (양성이 드문 상황에서는 ROC-AUC보다 PR-AUC를 더 신뢰하는 경우가 많음)

    threshold로 알 수 있는 것:
    - threshold(기본 0.5)를 기준으로 0/1로 잘라서(분류) 계산되는 지표:
      ACC, PREC, REC, F1
    - threshold와 무관하게 점수 자체로 계산되는 지표:
      AUC, PR-AUC

    주의:
    - 함수 이름은 evaluate_row_level이지만,
      y_true와 pred_scores가 어떤 단위(row/stay)든 "둘이 같은 단위"로만 맞으면 동작한다.
      (지금 호출은 stay 단위 점수/라벨이라 사실상 stay-level 평가임)
    """
    # y_true를 numpy int 배열로 통일
    y_true = np.asarray(y_true).astype(int)

    results = []
    for name, scores in pred_scores.items():
        # 모델별 점수를 numpy float 배열로 통일
        y_score = np.asarray(scores).astype(float)

        # threshold 기준으로 양성(1)/음성(0) 예측 생성
        y_pred = (y_score >= threshold).astype(int)

        # ----- 임계값 기반 지표(0/1 예측으로 계산) -----
        acc = accuracy_score(y_true, y_pred)
        prec = precision_score(y_true, y_pred, zero_division=0)
        rec = recall_score(y_true, y_pred, zero_division=0)
        f1 = f1_score(y_true, y_pred, zero_division=0)

        # ----- 점수 기반 지표(확률/점수 그대로 사용) -----
        # y_true가 한 클래스만 존재하면(AUC 계산 불가) NaN 처리
        if np.unique(y_true).size < 2:
            auc = np.nan
            ap = np.nan
        else:
            auc = roc_auc_score(y_true, y_score)
            ap = average_precision_score(y_true, y_score)

        # 모델별 결과를 한 줄(딕셔너리)로 저장
        results.append({
            "model": name,                 # 모델 이름
            "N_rows": len(y_true),         # 평가에 사용된 샘플 수(여기서는 stay 수일 수도 있음)
            "pos_rate": float(y_true.mean()),  # 양성 비율(불균형 정도 확인용)
            "threshold": threshold,        # 사용한 임계값
            "ACC": acc,
            "PREC": prec,
            "REC": rec,
            "F1": f1,
            "AUC": auc,
            "PR-AUC": ap,
        })

    # 결과를 DataFrame으로 만들고,
    # 불균형 상황에서 유용한 PR-AUC를 우선으로, 그 다음 AUC로 정렬
    out = (
        pd.DataFrame(results)
        .set_index("model")
        .sort_values(["PR-AUC", "AUC"], ascending=False)
    )
    return out


# ====== 사용 예시: 모델별 성능표 출력 ======
metrics_df = evaluate_row_level(
    pred_scores=pred_scores_stay,  # 모델별 예측 점수(dict)
    y_true=y_test_stay_ref,        # 정답 라벨
    threshold=0.5                  # 점수를 0/1로 자르는 기준
)

# 보기 좋게 소수점 4자리로 출력
print(metrics_df.to_string(float_format=lambda x: f"{x:.4f}"))

### SHAP 분석

In [None]:
# ====== 성능평가(3): SHAP 분석 + 중요 피처 Top-N ======
# 목적:
# - 모델이 예측을 할 때, 어떤 피처가 결과에 얼마나 영향을 줬는지(설명 가능성) 확인
# - 모델별로 "중요 피처"를 비교해서, 해석/검증(도메인 타당성, 누수 의심 등)에 사용
#
# 해석:
# - 모델별로 상위 피처가 비슷하면: 데이터가 일관되게 신호를 주고 있을 가능성
# - 특정 모델만 이상한 피처(예: ID/시간/누수 가능 컬럼)가 상위면: 그 모델이 특정 구조를 악용했을 가능성 존재
# SHAP으로 알 수 있는 것:
# 1) Feature Importance(전역): 전체적으로 어떤 피처가 많이 영향을 주는지
#    - mean(|SHAP|)가 크면, 그 피처가 예측값을 자주/크게 움직였다는 뜻
# 2) Feature Effect(방향성/분포): 피처 값이 커질수록 위험도가 올라가나/내려가나
#    - summary_plot(dot)은 값의 크기(색)와 SHAP 값(좌/우)로 방향성을 보여줌
#
# 주의(중요):
# - SHAP은 "인과"를 말해주지 않는다. (중요하다고 해서 원인이라는 뜻 아님)
# - 강하게 상관된 피처들이 있으면 중요도가 분산되거나 왜곡될 수 있다.
# - KernelExplainer는 매우 느리고 근사(샘플링) 기반이라 결과가 흔들릴 수 있다.
# - 설명은 'row-level 입력(X_shap)' 기준이다. (stay-level 집계 점수 설명이 아님)

import shap
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# SHAP 계산에 쓸 샘플 데이터(너무 크면 느려서 일부만 샘플링)
X_shap = X_test.sample(n=min(1000, len(X_test)), random_state=42)

# 상위 N개 피처만 출력/시각화
top_n = 30

for name, model in models.items():
    try:
        # ------------------------------------------------------------
        # 1) 모델 유형에 따라 SHAP Explainer 선택
        # ------------------------------------------------------------
        # - 트리 계열 모델: TreeExplainer (빠르고 안정적)
        # - 그 외 모델: KernelExplainer (모델-불문, 대신 느리고 근사)
        if name in ["RF", "XGBoost", "LightGBM"]:
            explainer = shap.TreeExplainer(model)
            shap_values = explainer.shap_values(X_shap)
        else:
            # KernelExplainer는 "배경 데이터(background)"를 기준으로 기여도를 계산하므로
            # 전체에서 일부(예: 100개)를 뽑아 배경으로 사용
            bg = X_shap.sample(100, random_state=42)

            # model.predict_proba를 넘겨서 "확률 기준"으로 설명하도록 함
            explainer = shap.KernelExplainer(model.predict_proba, bg)

            # nsamples=100: 근사 계산에 쓸 샘플 수(늘리면 더 정확하지만 느림)
            shap_values = explainer.shap_values(X_shap, nsamples=100)

        # ------------------------------------------------------------
        # 2) 이진분류에서 shap_values 형태 정리
        # ------------------------------------------------------------
        # 어떤 explainer는 클래스별로 리스트 [class0, class1] 형태를 반환한다.
        # 보통 양성(1) 클래스에 대한 설명을 보기 위해 shap_values[1]을 사용
        if isinstance(shap_values, list) and len(shap_values) > 1:
            sv = shap_values[1]   # 양성 클래스(1) 기준 SHAP 값
        else:
            sv = shap_values      # 이미 (n_samples, n_features) 형태인 경우

        # ------------------------------------------------------------
        # 3) 전역 중요도 계산: mean(|SHAP|)로 Top-N 피처 뽑기
        # ------------------------------------------------------------
        mean_abs = np.abs(sv).mean(axis=0)
        imp = pd.Series(mean_abs, index=X_shap.columns).sort_values(ascending=False)

        # 텍스트로 Top-N 출력
        print(f"\n{name} Top {top_n} features:")
        print(imp.head(top_n))

        # 간단한 가로 막대그래프(Top-N)
        plt.figure(figsize=(6, 4))
        imp.head(top_n).sort_values().plot(kind="barh")
        plt.title(f"{name} SHAP Feature Importance (Top {top_n})")
        plt.tight_layout()
        plt.show()

        # ------------------------------------------------------------
        # 4) SHAP 기본 시각화
        # ------------------------------------------------------------
        # (A) bar: 전역 중요도(가장 영향 큰 피처 순)
        shap.summary_plot(sv, X_shap, show=True, plot_type="bar")

        # (B) dot: 각 샘플에서 SHAP 분포 + 피처 값에 따른 방향성
        # - x축: SHAP 값 (예측을 양성 방향으로 밀면 +, 음성 방향이면 -)
        # - 색: 피처 값 크기(높음/낮음)
        shap.summary_plot(sv, X_shap, show=True)

    except Exception as e:
        # 특정 모델에서 SHAP이 지원되지 않거나(또는 너무 느리거나) 오류가 날 수 있음
        print(f"SHAP failed for {name}: {e}")