<a href="https://colab.research.google.com/github/dusdnd1467/deeplearning/blob/master/heart_disease(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd

# Load the raw BRFSS 2023 SAS transport (XPT) file.
df = pd.read_sas("/content/LLCP2023.XPT", format="xport")

# Save it as CSV, then reload from that CSV.
# NOTE(review): the file is written relative to the working directory but read
# back from /content; this only matches when the cwd is /content — confirm.
df.to_csv("2023 BRFSS.csv", index=False)

df = pd.read_csv("/content/2023 BRFSS.csv")

# Select the target (_MICHD) and the predictor variables.
# .copy() gives an independent frame, so the in-place edit further down is not
# an assignment on a slice of `df` (which triggers SettingWithCopyWarning).
df_preporcessing = df[['_MICHD', '_RFHYPE6', 'TOLDHI3', 'CHOLCHK3', '_BMI5', 'SMOKE100', 'CVDSTRK3', 'DIABETE4', '_TOTINDA', 'DRNKANY6', '_RFBING6', '_HLTHPL1', 'MEDCOST1', 'GENHLTH', 'MENTHLTH', 'PHYSHLTH', 'DIFFWALK', '_SEX', '_AGEG5YR', 'EDUCA', 'INCOME3']].copy()

# Snapshot of the selected columns before any recoding/filtering.
df_preporcessing.to_csv("preporcessing.csv", index=False)

# 1) Recode 88 -> 0 in the day-count columns
#    (presumably 88 encodes "none" — confirm against the BRFSS codebook).
df_preporcessing.loc[:, ['MENTHLTH', 'PHYSHLTH']] = (df_preporcessing.loc[:, ['MENTHLTH', 'PHYSHLTH']].replace(88, 0))

# 2) Group columns by the codes that are filtered out of them in step 3.
df_delA = df_preporcessing[['_RFHYPE6', 'TOLDHI3', 'CHOLCHK3', 'SMOKE100', 'CVDSTRK3',
                            'DIABETE4', '_TOTINDA', 'DRNKANY6', '_RFBING6', '_HLTHPL1',
                            'MEDCOST1', 'GENHLTH', 'DIFFWALK', 'EDUCA']]  # codes 7, 9 (77, 99 also checked)
df_delB = df_preporcessing[['MENTHLTH', 'PHYSHLTH', 'INCOME3']]  # codes 77, 99
df_delC = df_preporcessing['_AGEG5YR']  # code 14

# 3) Drop rows holding non-answer codes (refused, blank, etc. per the original note).
df_cleaned = df_preporcessing[~df_preporcessing[df_delA.columns].isin([7, 9, 77, 99]).any(axis=1)]
df_cleaned = df_cleaned[~df_cleaned[df_delB.columns].isin([77, 99]).any(axis=1)]
df_cleaned = df_cleaned[~df_cleaned['_AGEG5YR'].isin([14])]

# 4) Drop every row that still has a missing value in any column.
df_cleaned = df_cleaned.dropna(axis=0, subset=None, inplace=False)

# 5) Save the cleaned dataset as CSV.
df_cleaned.to_csv("heart_disease_BRFSS2023.csv", index=False)

In [None]:
import os, warnings
import numpy as np
import pandas as pd
from collections import Counter

from sklearn.model_selection import StratifiedKFold
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer

from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import RandomOverSampler, SMOTE
from imblearn.combine import SMOTEENN, SMOTETomek
from imblearn.under_sampling import RandomUnderSampler, ClusterCentroids

from tqdm.auto import tqdm

# -------------------- Configuration --------------------
seed = 42
np.random.seed(seed)

target_col   = "_MICHD"
original_csv = "/content/heart_disease_BRFSS2023.csv"

# Resampling strategies applied inside each CV fold.
# "none" means no resampling (the original class distribution is kept).
resampling_strategies = [
    "none",
    "RandomOver", "RandomUnder",
    "SMOTE", "SMOTEENN", "SMOTETomek",
    "ClusterCentroids",
]

# -------------------- 타깃 0/1 표준화 --------------------
def normalize_binary_target(y_raw: pd.Series) -> pd.Series:
    """Convert a raw label column to nullable 0/1 integers.

    Values are first parsed as numbers, with codes 7 and 9 treated as missing:
      * values already within {0, 1} are kept as they are;
      * values within {1, 2} are recoded as 1 -> 1 and 2 -> 0.
    Anything else falls back to a case-insensitive text mapping
    (yes/y/true/1 -> 1, no/n/false/0 -> 0); unmapped entries become missing.

    Args:
        y_raw: Raw target column. It is not modified.

    Returns:
        A Series of dtype "Int64" with the same index as ``y_raw``.
    """
    y = y_raw.copy()
    y_num = pd.to_numeric(y, errors="coerce")
    y_num = y_num.replace({7: np.nan, 9: np.nan})
    uniq = set(pd.unique(y_num.dropna()))

    # An empty set is a subset of everything. Without the emptiness check, a
    # column of pure text labels (all NaN after numeric parsing) would return
    # here as all-missing and never reach the text mapping below.
    if uniq and uniq.issubset({0, 1}):
        return y_num.astype("Int64")
    if uniq and uniq.issubset({1, 2}):
        return y_num.map({1: 1, 2: 0}).astype("Int64")

    # Text labels.
    y_str = y.astype(str).str.strip().str.lower()
    map_dict = {"yes": 1, "y": 1, "true": 1, "1": 1,
                "no": 0, "n": 0, "false": 0, "0": 0}
    y_map = y_str.map(map_dict)
    return pd.to_numeric(y_map, errors="coerce").astype("Int64")

# -------------------- 유틸 --------------------
def detect_columns(X: pd.DataFrame):
    """Split the columns of ``X`` into categorical and numeric groups.

    A column is categorical when its dtype is object/category, or when it has
    at most 10 distinct non-missing values. All remaining columns are numeric.

    Returns:
        (cat_cols, num_cols): ``cat_cols`` sorted by name, ``num_cols`` in the
        column order of ``X``.
    """
    textual = set(X.select_dtypes(include=["object", "category"]).columns)
    few_valued = {
        col for col in X.columns
        if col not in textual and X[col].nunique(dropna=True) <= 10
    }
    cat_cols = sorted(textual | few_valued)
    num_cols = [col for col in X.columns if col not in cat_cols]
    return cat_cols, num_cols

def make_preprocessor(cat_cols, num_cols, scale_numeric=False):
    """Build the ColumnTransformer used ahead of every classifier.

    Categorical columns: most-frequent imputation, then dense one-hot encoding
    that ignores unseen categories. Numeric columns: median imputation,
    followed by standardization only when ``scale_numeric`` is true.
    Columns in neither list are dropped.
    """
    # Newer scikit-learn spells the dense-output flag `sparse_output`; fall
    # back to the older `sparse` keyword when that one is rejected.
    shared_kwargs = {"handle_unknown": "ignore"}
    try:
        encoder = OneHotEncoder(sparse_output=False, **shared_kwargs)
    except TypeError:
        encoder = OneHotEncoder(sparse=False, **shared_kwargs)

    categorical_steps = [
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", encoder),
    ]

    numeric_steps = [("imputer", SimpleImputer(strategy="median"))]
    if scale_numeric:
        numeric_steps.append(("scaler", StandardScaler()))

    return ColumnTransformer(
        [
            ("cat", Pipeline(categorical_steps), cat_cols),
            ("num", Pipeline(numeric_steps), num_cols),
        ],
        remainder="drop",
    )

def make_models(y, use_class_weight=True):
    """Return a dict of freshly constructed classifiers keyed by display name.

    Args:
        y: Binary labels (0/1); used only to derive the negative/positive
            ratio passed to XGBoost as ``scale_pos_weight``.
        use_class_weight: When true, the linear/tree models get
            ``class_weight="balanced"`` and XGBoost gets the neg/pos ratio;
            otherwise no weighting is applied. The MLP is never weighted.

    Returns:
        Dict with LogisticRegression, MLP, RandomForest and DecisionTree, plus
        XGBoost when the ``xgboost`` package can be imported.
    """
    # xgboost is optional: without it the XGBoost entry is simply left out.
    try:
        from xgboost import XGBClassifier as xgb_cls
    except ImportError:
        xgb_cls = None

    label_counts = Counter(y)
    n_pos = label_counts.get(1, 0)
    neg_pos_ratio = label_counts.get(0, 0) / n_pos if n_pos > 0 else 1.0
    class_weight = "balanced" if use_class_weight else None

    models = {}
    models["LogisticRegression"] = LogisticRegression(
        max_iter=1000,
        class_weight=class_weight,
        n_jobs=-1,
        random_state=seed,
    )
    models["MLP"] = MLPClassifier(
        hidden_layer_sizes=(64, 32),
        activation="relu",
        alpha=1e-4,
        learning_rate_init=1e-3,
        max_iter=500,
        early_stopping=True,
        n_iter_no_change=10,
        random_state=seed,
    )
    models["RandomForest"] = RandomForestClassifier(
        n_estimators=300,
        class_weight=class_weight,
        n_jobs=-1,
        random_state=seed,
    )
    models["DecisionTree"] = DecisionTreeClassifier(
        class_weight=class_weight,
        random_state=seed,
    )

    if xgb_cls is not None:
        models["XGBoost"] = xgb_cls(
            n_estimators=500,
            max_depth=6,
            learning_rate=0.05,
            subsample=0.9,
            colsample_bytree=0.9,
            reg_lambda=1.0,
            objective="binary:logistic",
            eval_metric="logloss",
            tree_method="hist",
            n_jobs=-1,
            random_state=seed,
            scale_pos_weight=neg_pos_ratio if use_class_weight else 1.0,
        )

    return models

def get_pos_scores(pipeline, X):
    """Return positive-class scores for ``X`` from a fitted pipeline.

    Tried in order:
      1. ``predict_proba(X)[:, 1]``;
      2. ``decision_function(X)`` (column 1 when the result is 2-D);
      3. ``predict(X)`` as a last resort (hard labels, not scores).

    A failing ``predict_proba`` does not abort scoring: the next option is
    used, and a RuntimeWarning reports the failure instead of hiding it.
    """
    import warnings

    if hasattr(pipeline, "predict_proba"):
        try:
            return pipeline.predict_proba(X)[:, 1]
        except Exception as exc:
            # Deliberate best-effort fallback, but surface the reason.
            warnings.warn(
                f"predict_proba failed ({type(exc).__name__}: {exc}); "
                "falling back to decision_function/predict",
                RuntimeWarning,
                stacklevel=2,
            )
    if hasattr(pipeline, "decision_function"):
        s = pipeline.decision_function(X)
        return s if s.ndim == 1 else s[:, 1]
    return pipeline.predict(X)

# Map a resampling strategy name to an imblearn sampler instance.
def make_resampler(name):
    """Return a new sampler for ``name``, or None for "none".

    Raises:
        ValueError: if ``name`` is not one of the known strategy names.
    """
    if name == "none":
        return None

    # Lazy factories: only the requested sampler is ever constructed.
    factories = {
        "RandomOver": lambda: RandomOverSampler(random_state=seed),
        "RandomUnder": lambda: RandomUnderSampler(random_state=seed),
        "SMOTE": lambda: SMOTE(random_state=seed),
        "SMOTEENN": lambda: SMOTEENN(random_state=seed),
        "SMOTETomek": lambda: SMOTETomek(random_state=seed),
        "ClusterCentroids": lambda: ClusterCentroids(random_state=seed),
    }
    if name not in factories:
        raise ValueError(f"Unknown resampling strategy: {name}")
    return factories[name]()

# -------------------- Data loading --------------------
if not os.path.exists(original_csv):
    raise FileNotFoundError(f"원본 CSV를 찾을 수 없습니다: {original_csv}")

df = pd.read_csv(original_csv)

if target_col not in df.columns:
    raise KeyError(f"타깃 컬럼 '{target_col}' 이(가) 존재하지 않습니다.")

# Normalize the target to 0/1 and keep only rows whose label is known.
y_raw = df[target_col]
y_bin = normalize_binary_target(y_raw)
mask  = y_bin.notna()

y = y_bin.loc[mask].astype(int).reset_index(drop=True)
X = df.drop(columns=[target_col]).loc[mask].reset_index(drop=True)

# Sanity check: every remaining label must be 0 or 1.
if not y.isin([0, 1]).all():
    raise ValueError("타깃 라벨이 0/1 이외의 값을 포함하고 있습니다.")

cat_cols, num_cols = detect_columns(X)
print("[INFO] 범주형 컬럼 수:", len(cat_cols), " / 수치형 컬럼 수:", len(num_cols))

# -------------------- CV setup --------------------
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)
rows = []

print(f"[INFO] 리샘플링 전략: {len(resampling_strategies)}개, "
      f"각 전략마다 모델×{cv.get_n_splits()}-fold 진행")

# The positive-class share depends only on y, so compute it once instead of
# once per resampling strategy.
ratio_pos = (y == 1).mean()

# -------------------- Resampling x model x CV --------------------
for res_name in tqdm(resampling_strategies, desc="Resampling"):
    resampler = make_resampler(res_name)

    # Class weighting is enabled only when no resampling is applied and the
    # positive class makes up less than 30% of the labels.
    use_cw = (res_name == "none") and (ratio_pos < 0.3)
    models = make_models(y, use_class_weight=use_cw)

    for model_name, model in tqdm(models.items(), leave=False,
                                  desc=f"Models({res_name})"):

        # Numeric features are standardized only for the two named models.
        pre = make_preprocessor(
            cat_cols, num_cols,
            scale_numeric=(model_name in ["LogisticRegression", "MLP"])
        )

        # The resampler sits inside the pipeline, so it runs during fit() on
        # each fold's training part only.
        steps = [("preprocess", pre)]
        if resampler is not None:
            steps.append(("resample", resampler))
        steps.append(("clf", model))

        pipe = ImbPipeline(steps)

        accs, pres, recs, f1s, aucs = [], [], [], [], []

        for tr_idx, te_idx in tqdm(cv.split(X, y),
                                   total=cv.get_n_splits(),
                                   leave=False,
                                   desc=f"{model_name} folds"):
            X_tr, X_te = X.iloc[tr_idx], X.iloc[te_idx]
            y_tr, y_te = y.iloc[tr_idx], y.iloc[te_idx]

            pipe.fit(X_tr, y_tr)

            y_pred  = pipe.predict(X_te)
            y_score = get_pos_scores(pipe, X_te)

            accs.append(accuracy_score(y_te, y_pred))
            pres.append(precision_score(y_te, y_pred,
                                        average="macro",
                                        zero_division=0))
            recs.append(recall_score(y_te, y_pred,
                                     average="macro",
                                     zero_division=0))
            f1s.append(f1_score(y_te, y_pred,
                                average="macro",
                                zero_division=0))
            try:
                aucs.append(roc_auc_score(y_te, y_score))
            except ValueError as exc:
                # Record NaN for a fold whose AUC cannot be computed, but
                # report why instead of hiding the failure.
                warnings.warn(
                    f"ROC AUC unavailable for {res_name}/{model_name}: {exc}",
                    RuntimeWarning,
                )
                aucs.append(np.nan)

        # One summary row per (strategy, model): "mean ± sample std" strings
        # for display, plus raw means (underscore-prefixed keys) for sorting.
        rows.append({
            "resampling": res_name,
            "model": model_name,
            "accuracy": f"{np.mean(accs):.4f} ± {np.std(accs, ddof=1):.4f}",
            "precision_macro": f"{np.mean(pres):.4f} ± {np.std(pres, ddof=1):.4f}",
            "recall_macro": f"{np.mean(recs):.4f} ± {np.std(recs, ddof=1):.4f}",
            "f1_macro": f"{np.mean(f1s):.4f} ± {np.std(f1s, ddof=1):.4f}",
            "roc_auc": f"{np.nanmean(aucs):.4f} ± {np.nanstd(aucs, ddof=1):.4f}",
            "_f1_mean": np.mean(f1s),
            "_auc_mean": np.nanmean(aucs)
        })

# -------------------- Summarize & save results --------------------
# Group by strategy; within each strategy the best macro-F1 comes first, with
# mean AUC as the tie-breaker.
res_df = pd.DataFrame(rows)
res_df = res_df.sort_values(
    by=["resampling", "_f1_mean", "_auc_mean"],
    ascending=[True, False, False],
)

# Only the formatted columns are reported; the raw sort keys are left out.
display_cols = [
    "resampling", "model",
    "accuracy", "precision_macro", "recall_macro", "f1_macro", "roc_auc",
]

print("\n[INFO] 교차검증 결과 요약:")
print(res_df[display_cols].to_string(index=False))

out_csv = "./cv_results_resampling_within_folds.csv"
res_df[display_cols].to_csv(out_csv, encoding="utf-8", index=False)
print(f"[INFO] 저장 완료 → {out_csv}")


In [None]:
!pip install imbalanced-learn shap -q

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer


from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier

from xgboost import XGBClassifier

from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.combine import SMOTEENN, SMOTETomek


from imblearn.over_sampling import RandomOverSampler, SMOTE
from imblearn.under_sampling import RandomUnderSampler, ClusterCentroids

import shap
import matplotlib.pyplot as plt

seed = 42
np.random.seed(seed)

# ---------------------------------------------
# 1. Load the data + normalize the target to 0/1
# ---------------------------------------------
target_col   = "_MICHD"
original_csv = "/content/heart_disease_BRFSS2023.csv"

df = pd.read_csv(original_csv)

def normalize_binary_target(y_raw: pd.Series) -> pd.Series:
    """Convert a raw label column to nullable 0/1 integers.

    Values are first parsed as numbers, with codes 7 and 9 treated as missing:
      * values already within {0, 1} are kept as they are;
      * values within {1, 2} are recoded as 1 -> 1 and 2 -> 0.
    Anything else falls back to a case-insensitive text mapping
    (yes/y/true/1 -> 1, no/n/false/0 -> 0); unmapped entries become missing.

    Args:
        y_raw: Raw target column. It is not modified.

    Returns:
        A Series of dtype "Int64" with the same index as ``y_raw``.
    """
    y = y_raw.copy()
    y_num = pd.to_numeric(y, errors="coerce")
    y_num = y_num.replace({7: np.nan, 9: np.nan})
    uniq = set(pd.unique(y_num.dropna()))

    # An empty set is a subset of everything. Without the emptiness check, a
    # column of pure text labels (all NaN after numeric parsing) would return
    # here as all-missing and never reach the text mapping below.
    if uniq and uniq.issubset({0, 1}):
        return y_num.astype("Int64")
    if uniq and uniq.issubset({1, 2}):
        return y_num.map({1: 1, 2: 0}).astype("Int64")

    # Text labels.
    y_str = y.astype(str).str.strip().str.lower()
    map_dict = {"yes": 1, "y": 1, "true": 1, "1": 1,
                "no": 0, "n": 0, "false": 0, "0": 0}
    y_map = y_str.map(map_dict)
    return pd.to_numeric(y_map, errors="coerce").astype("Int64")

# Normalize the target, then keep only the rows whose label is known.
y_raw = df[target_col]
y_bin = normalize_binary_target(y_raw)
mask  = y_bin.notna()

y = y_bin.loc[mask].astype(int).reset_index(drop=True)
X = df.drop(columns=[target_col]).loc[mask].reset_index(drop=True)

# ---------------------------------------------
# 2. 범주형/수치형 컬럼 구분 + 전처리 정의
# ---------------------------------------------
def detect_columns(X: pd.DataFrame):
    """Classify each column of ``X`` as categorical or numeric.

    Object/category columns are categorical, and so is any other column with
    no more than 10 distinct non-missing values. The rest are numeric.

    Returns:
        (cat_cols, num_cols): categorical names sorted alphabetically, numeric
        names in their original order.
    """
    by_dtype = list(X.select_dtypes(include=["object", "category"]).columns)
    by_cardinality = [
        name for name in X.columns
        if name not in by_dtype and X[name].nunique(dropna=True) <= 10
    ]
    cat_cols = sorted(set(by_dtype).union(by_cardinality))
    num_cols = [name for name in X.columns if name not in cat_cols]
    return cat_cols, num_cols

# Split the feature columns into the categorical and numeric groups that the
# preprocessor is built from.
cat_cols, num_cols = detect_columns(X)

def make_preprocessor(cat_cols, num_cols, scale_numeric=True):
    """Create the ColumnTransformer that prepares features for the model.

    Categorical columns are imputed with the most frequent value and one-hot
    encoded (dense output, unknown categories ignored). Numeric columns are
    imputed with the median and, when ``scale_numeric`` is true, standardized.
    Any column not listed is dropped.
    """
    # Handle both scikit-learn spellings of the dense-output flag.
    try:
        one_hot = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
    except TypeError:
        one_hot = OneHotEncoder(handle_unknown="ignore", sparse=False)

    categorical = Pipeline([
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", one_hot),
    ])

    scaling = [("scaler", StandardScaler())] if scale_numeric else []
    numeric = Pipeline([("imputer", SimpleImputer(strategy="median"))] + scaling)

    return ColumnTransformer(
        [("cat", categorical, cat_cols), ("num", numeric, num_cols)],
        remainder="drop",
    )

preprocessor = make_preprocessor(cat_cols=cat_cols, num_cols=num_cols, scale_numeric=True)

# ---------------------------------------------
# 3. Train/test split
# ---------------------------------------------
# 80/20 split, stratified on the target so both parts keep the class ratio.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=seed, stratify=y
)

# ---------------------------------------------
# 4. Resampling & model definition
# ---------------------------------------------

# [Where to swap the resampling method]
#   - Current: SMOTETomek
#   - To disable resampling: remove the "resample" step from the ImbPipeline below.
#   - Alternatives:
#           resampler = RandomUnderSampler(random_state=seed)
#           resampler = RandomOverSampler(random_state=seed)
#           resampler = SMOTE(random_state=seed)
#           resampler = SMOTEENN(random_state=seed)
#           resampler = ClusterCentroids(random_state=seed)
resampler = SMOTETomek(random_state=seed)

# [Where to swap the model]
#   - Current: XGBClassifier
#   - Alternatives:
#           clf = RandomForestClassifier(
#               n_estimators=300,
#               max_depth=None,
#               n_jobs=-1,
#               random_state=seed
#           )
#           clf = LogisticRegression(
#               max_iter=2000,
#               n_jobs=-1,
#               random_state=seed
#           )
#           clf = MLPClassifier(
#               hidden_layer_sizes=(64, 32),
#               max_iter=300,
#               random_state=seed
#           )
#           clf = DecisionTreeClassifier(
#               max_depth=6,
#               random_state=seed
#           )
#

xgb = XGBClassifier(
    # objective / evaluation
    objective="binary:logistic",
    eval_metric="logloss",
    scale_pos_weight=1.0,
    # tree construction
    tree_method="hist",
    n_estimators=500,
    max_depth=6,
    learning_rate=0.05,
    # sampling & regularization
    subsample=0.9,
    colsample_bytree=0.9,
    reg_lambda=1.0,
    # runtime
    n_jobs=-1,
    random_state=seed,
)

clf = xgb  # <- if another model was chosen above, assign it here instead

# Pipeline: preprocessing -> resampling -> classifier
pipe = ImbPipeline(steps=[
    ("preprocess", preprocessor),
    ("resample", resampler),  # remove this entry to turn resampling off
    ("clf", clf),
])

pipe.fit(X_train, y_train)

# ---------------------------------------------
# 5. Prepare inputs for SHAP
# ---------------------------------------------

# Run the test set through the fitted preprocessing step only, so the
# explainer receives the same encoded feature matrix the classifier sees.
X_test_proc = pipe.named_steps["preprocess"].transform(X_test)
feature_names = pipe.named_steps["preprocess"].get_feature_names_out()

# Fitted classifier at the end of the pipeline.
xgb_model = pipe.named_steps["clf"]

# (A stray unmatched ")" that followed this statement was removed; it was a
# SyntaxError that stopped the cell from running.)
explainer = shap.TreeExplainer(xgb_model)

shap_values = explainer.shap_values(X_test_proc, check_additivity=False)

# When a list is returned, entry 1 is used
# (presumably the positive class — confirm for the installed SHAP version).
if isinstance(shap_values, list):
    shap_to_plot = shap_values[1]
else:
    shap_to_plot = shap_values

# 6) DataFrame for summary_plot, trimmed so its column count matches the
#    SHAP value matrix exactly.
n_features = shap_to_plot.shape[1]
X_test_for_plot = pd.DataFrame(
    X_test_proc[:, :n_features],
    columns=feature_names[:n_features]
)

# ---------------------------------------------
# 7. Summary plots (beeswarm + bar)
# ---------------------------------------------
# First pass uses summary_plot's default plot type, second pass the bar chart;
# each one is drawn on its own figure.
for _plot_kwargs in ({}, {"plot_type": "bar"}):
    plt.figure(figsize=(6, 8))
    shap.summary_plot(shap_to_plot, X_test_for_plot, show=False, **_plot_kwargs)
    plt.tight_layout()
    plt.show()
