In [2]:
import torch
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Using device:", device)

Using device: cuda


In [None]:
!pip install catboost
!pip install imbalanced-learn
!pip install optuna
!pip install tabpfn

import pandas as pd
import numpy as np
import re
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
import lightgbm as lgb
from catboost import CatBoostClassifier
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import PolynomialFeatures, OrdinalEncoder, StandardScaler, OneHotEncoder
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression
import optuna
from optuna.samplers import TPESampler
from tabpfn import TabPFNClassifier

In [4]:
# ---------------------------
# Data Loader Function
# ---------------------------
def load_data(train_file, test_file, submission_file):
    train = pd.read_csv(train_file).drop(columns=['ID'])
    test = pd.read_csv(test_file).drop(columns=['ID'])
    X = train.drop('임신 성공 여부', axis=1)
    y = train['임신 성공 여부']
    print("✅ 데이터 로드 완료!")
    print("train.csv의 X shape:", X.shape)
    print("train.csv의 y shape:", y.shape)
    return X, y, test, submission_file


In [5]:
# ---------------------------
# Preprocessing Class
# ---------------------------
class Preprocessor:
    def __init__(self):
        # 나이 매핑 정보
        self.age_mapping = {
            "만18-34세": 1,
            "만35-37세": 2,
            "만38-39세": 3,
            "만40-42세": 4,
            "만43-44세": 5,
            "만45-50세": 6,
            "알 수 없음": 3.5  # 중립값
        }
        # 삭제할 컬럼들
        self.drop_columns_cat = ["배란 유도 유형", "불임 원인-여성요인"]
        self.drop_columns_numeric = ["난자 채취 경과일", "난자 해동 경과일", "난자 혼합 경과일", "임신 시도 또는 마지막 임신 경과 연수", "배아 해동 경과일"]

        # 전처리할 카테고리형 컬럼 리스트 (필요에 따라 수정)
        self.categorical_columns = [
            "시술 시기 코드",
            "시술 유형",
            "특정 시술 유형",
            "배란 자극 여부",
            "단일 배아 이식 여부",
            "착상 전 유전 진단 사용 여부",
            "남성 주 불임 원인",
            "남성 부 불임 원인",
            "여성 주 불임 원인",
            "여성 부 불임 원인",
            "부부 주 불임 원인",
            "부부 부 불임 원인",
            "불명확 불임 원인",
            "불임 원인 - 난관 질환",
            "불임 원인 - 남성 요인",
            "불임 원인 - 배란 장애",
            "불임 원인 - 자궁경부 문제",
            "불임 원인 - 자궁내막증",
            "불임 원인 - 정자 농도",
            "불임 원인 - 정자 운동성",
            "불임 원인 - 정자 형태",
            "배아 생성 주요 이유",
            "총 시술 횟수",
            "클리닉 내 총 시술 횟수",
            "IVF 시술 횟수",
            "DI 시술 횟수",
            "총 임신 횟수",
            "IVF 임신 횟수",
            "DI 임신 횟수",
            "총 출산 횟수",
            "IVF 출산 횟수",
            "DI 출산 횟수",
            "난자 출처",
            "정자 출처",
            "난자 기증자 나이",
            "정자 기증자 나이",
            "동결 배아 사용 여부",
            "신선 배아 사용 여부",
            "기증 배아 사용 여부",
            "대리모 여부"
        ]
        # 수치형 컬럼 리스트 (필요에 따라 수정)
        self.numeric_columns = [
            "총 생성 배아 수",
            "미세주입된 난자 수",
            "미세주입에서 생성된 배아 수",
            "이식된 배아 수",
            "미세주입 배아 이식 수",
            "저장된 배아 수",
            "미세주입 후 저장된 배아 수",
            "해동된 배아 수",
            "해동 난자 수",
            "수집된 신선 난자 수",
            "저장된 신선 난자 수",
            "혼합된 난자 수",
            "파트너 정자와 혼합된 난자 수",
            "기증자 정자와 혼합된 난자 수",
        ]

    def process_age_feature(self, df):
        # 시술 당시 나이 Ordinal Encoding 및 다항식 변환
        df["시술 당시 나이_ordinal"] = df["시술 당시 나이"].astype(str).map(self.age_mapping)
        df["시술 당시 나이_ordinal"] = df["시술 당시 나이_ordinal"].astype(float)

        poly = PolynomialFeatures(degree=3, include_bias=False)
        poly_features = poly.fit_transform(df[["시술 당시 나이_ordinal"]])
        df["시술 당시 나이^2"] = poly_features[:, 1]  # x^2
        df["시술 당시 나이^3"] = poly_features[:, 2]  # x^3

        # 나이 페널티
        df["나이 페널티"] = df["시술 당시 나이_ordinal"] * (-0.1)
        df["나이 페널티_2"] = df["시술 당시 나이_ordinal"] * (-0.2)

        # 원본 나이 컬럼 삭제
        df.drop(columns=["시술 당시 나이"], inplace=True)
        return df

    def process_categorical_features(self, df):
        # 결측값 처리: 필요한 컬럼에 대해 0으로 채움
        df["착상 전 유전 검사 사용 여부"] = df["착상 전 유전 검사 사용 여부"].fillna(0)
        df["착상 전 유전 진단 사용 여부"] = df["착상 전 유전 진단 사용 여부"].fillna(0)
        df["PGD 시술 여부"] = df["PGD 시술 여부"].fillna(0)
        df["PGS 시술 여부"] = df["PGS 시술 여부"].fillna(0)
        # 불필요한 컬럼 삭제
        df = df.drop(columns=self.drop_columns_cat, errors="ignore")
        # 카테고리형 컬럼들을 문자열로 변환
        for col in self.categorical_columns:
            df[col] = df[col].astype(str)
        return df

    def process_numeric_features(self, df):
        # 결측값 처리: 배아 이식 경과일은 평균 3일로 대체
        df["배아 이식 경과일"] = df["배아 이식 경과일"].fillna(3)
        # 불필요한 컬럼 삭제
        df = df.drop(columns=self.drop_columns_numeric, errors="ignore")
        return df

    def encode_categorical(self, train_df, test_df):
        ordinal_encoder = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=np.nan)
        train_df[self.categorical_columns] = ordinal_encoder.fit_transform(train_df[self.categorical_columns])
        test_df[self.categorical_columns] = ordinal_encoder.transform(test_df[self.categorical_columns])
        return train_df, test_df

    def scale_numeric(self, train_df, test_df):
        # 수치형 변수 스케일링
        scaler = StandardScaler()
        train_df[self.numeric_columns] = scaler.fit_transform(train_df[self.numeric_columns])

    def preprocess(self, train_df, test_df, y):
        # 각 단계별 전처리 실행
        train_df = self.process_age_feature(train_df)
        test_df = self.process_age_feature(test_df)

        train_df = self.process_categorical_features(train_df)
        test_df = self.process_categorical_features(test_df)

        train_df = self.process_numeric_features(train_df)
        test_df = self.process_numeric_features(test_df)

        # 수치형 스케일링 및 정규화
        self.scale_numeric(train_df, test_df)

        # 카테고리형 인코딩
        train_df, test_df = self.encode_categorical(train_df, test_df)

        # Feature Weighting
        train_df["시술 당시 나이_weighted"] = train_df["시술 당시 나이_ordinal"] * 1.5
        test_df["시술 당시 나이_weighted"] = test_df["시술 당시 나이_ordinal"] * 1.5

        print("✅ 데이터 전처리 완료!")
        return train_df, test_df, y


In [6]:
# ---------------------------
# Resampler Class
# ---------------------------
class Resampler:
    def __init__(self, method='none'):
        self.method = method.lower()

    def resample(self, X, y):
        if self.method == 'undersample':
            from imblearn.under_sampling import RandomUnderSampler
            sampler = RandomUnderSampler(random_state=42)
            X_res, y_res = sampler.fit_resample(X, y)
            print("✅ Undersampling applied. New shape:", X_res.shape)

        elif self.method == 'oversample':
            from imblearn.over_sampling import RandomOverSampler
            sampler = RandomOverSampler(random_state=42)
            X_res, y_res = sampler.fit_resample(X, y)
            print("✅ Oversampling applied. New shape:", X_res.shape)

        elif self.method == 'smote':
            from imblearn.over_sampling import SMOTE
            sampler = SMOTE(random_state=42)
            X_res, y_res = sampler.fit_resample(X, y)
            print("✅ SMOTE applied. New shape:", X_res.shape)

        else:
            X_res, y_res = X, y
            print("✅ No resampling applied.")

        return X_res, y_res


In [7]:
# ---------------------------
# Model Trainer Class
# ---------------------------
class ModelTrainer:
    def __init__(self, X_train, X_val, y_train, y_val):
        self.X_train = X_train
        self.X_val = X_val
        self.y_train = y_train
        self.y_val = y_val
        self.base_models = None
        self.meta_model = None

    def train_xgb(self):
        xgb_model = XGBClassifier(
            objective="binary:logistic",
            eval_metric="logloss",
            subsample=0.7426464507655118,
            n_estimators=461,
            max_depth=5,
            learning_rate=0.03882069104127232,
            colsample_bytree=0.9097334148049476,
            tree_method="hist",
            device="cuda",
        )
        # numpy array를 사용하여 fit 호출
        xgb_model.fit(self.X_train, self.y_train, eval_set=[(self.X_val, self.y_val)], verbose=True)
        auc = roc_auc_score(self.y_val, xgb_model.predict_proba(self.X_val)[:, 1])
        print(f"✅ XGBoost ROC-AUC Score: {auc:.4f}")
        return xgb_model

    def train_lgbm(self):
        lgbm_model = LGBMClassifier(
            n_estimators=527,
            learning_rate=0.04148924654004078,
            max_depth=5,
            num_leaves=85,
            subsample=0.8339351226312508,
            colsample_bytree=0.6960519772928399,
            min_child_samples=46,
            random_state=42,
        )
        lgbm_model.fit(
            self.X_train, self.y_train,
            eval_set=[(self.X_val, self.y_val)],
            callbacks=[lgb.early_stopping(50, verbose=True)]
        )
        auc = roc_auc_score(self.y_val, lgbm_model.predict_proba(self.X_val)[:, 1])
        print(f"✅ LGBM ROC-AUC Score: {auc:.4f}")
        return lgbm_model

    def train_catboost(self):
        catboost_model = CatBoostClassifier(
            iterations=607,
            learning_rate=0.046970220945236694,
            depth=8,
            l2_leaf_reg=1.3270415457360085,
            border_count=79,
            bagging_temperature=1.4405235392731484,
            random_strength=0.9001167344802684,
            eval_metric="AUC",
            random_seed=42,
            verbose=100,
            task_type="GPU"
        )
        catboost_model.fit(
            self.X_train, self.y_train,
            eval_set=(self.X_val, self.y_val),
            early_stopping_rounds=50
        )
        auc = roc_auc_score(self.y_val, catboost_model.predict_proba(self.X_val)[:, 1])
        print(f"✅ CatBoost ROC-AUC Score: {auc:.4f}")
        return catboost_model

    def train_voting_classifier(self, models):
        voting_clf = VotingClassifier(estimators=models, voting='soft')
        voting_clf.fit(self.X_train, self.y_train)
        auc = roc_auc_score(self.y_val, voting_clf.predict_proba(self.X_val)[:, 1])
        print(f"✅ Voting Classifier ROC-AUC Score: {auc:.4f}")
        return voting_clf


In [8]:
# ---------------------------
# Hyperparameter tuning function
# ---------------------------


In [None]:
# ---------------------------
# Main Pipeline
# ---------------------------
def main():
    # 1. Data loading
    train_file = "train.csv"
    test_file = "test.csv"
    submission_file = "sample_submission.csv"
    X, y, test_df, submission_file = load_data(train_file, test_file, submission_file)

    # 2. Pre-processing
    preprocessor = Preprocessor()
    X_train_encoded, X_test_encoded, y = preprocessor.preprocess(X.copy(), test_df.copy(), y)

    print("Pre-processed X shape:", X_train_encoded.shape)
    print("Pre-processed y shape:", y.shape)
    print("y null 개수:", y.isnull().sum())

    # 2.5. Feature Interaction
    X_train_encoded["나이_배아이식_상호작용"] = X_train_encoded["시술 당시 나이_ordinal"] * X_train_encoded["배아 이식 경과일"]
    X_test_encoded["나이_배아이식_상호작용"] = X_test_encoded["시술 당시 나이_ordinal"] * X_test_encoded["배아 이식 경과일"]

    X_train_encoded["나이_총시술횟수_상호작용"] = X_train_encoded["시술 당시 나이_ordinal"] * X_train_encoded["총 시술 횟수"]
    X_test_encoded["나이_총시술횟수_상호작용"] = X_test_encoded["시술 당시 나이_ordinal"] * X_test_encoded["총 시술 횟수"]

    X_train_encoded["나이_총임신횟수_상호작용"] = X_train_encoded["시술 당시 나이_ordinal"] * X_train_encoded["총 임신 횟수"]
    X_test_encoded["나이_총임신횟수_상호작용"] = X_test_encoded["시술 당시 나이_ordinal"] * X_test_encoded["총 임신 횟수"]

    X_train_encoded["나이_총출산횟수_상호작용"] = X_train_encoded["시술 당시 나이_ordinal"] * X_train_encoded["총 출산 횟수"]
    X_test_encoded["나이_총출산횟수_상호작용"] = X_test_encoded["시술 당시 나이_ordinal"] * X_test_encoded["총 출산 횟수"]

    X_train_encoded["IVF1_상호작용"] = X_train_encoded["IVF 임신 횟수"] * X_train_encoded["IVF 시술 횟수"]
    X_test_encoded["IVF1_상호작용"] = X_test_encoded["IVF 임신 횟수"] * X_test_encoded["IVF 시술 횟수"]

    X_train_encoded["DI1_상호작용"] = X_train_encoded["DI 임신 횟수"] * X_train_encoded["DI 시술 횟수"]
    X_test_encoded["DI1_상호작용"] = X_test_encoded["DI 임신 횟수"] * X_test_encoded["DI 시술 횟수"]

    X_train_encoded["IVF2_상호작용"] = X_train_encoded["IVF 임신 횟수"] * X_train_encoded["IVF 출산 횟수"]
    X_test_encoded["IVF2_상호작용"] = X_test_encoded["IVF 임신 횟수"] * X_test_encoded["IVF 출산 횟수"]

    X_train_encoded["DI2_상호작용"] = X_train_encoded["DI 임신 횟수"] * X_train_encoded["DI 출산 횟수"]
    X_test_encoded["DI2_상호작용"] = X_test_encoded["DI 임신 횟수"] * X_test_encoded["DI 출산 횟수"]

    X_train_encoded["배아생성_이식_상호작용"] = X_train_encoded["총 생성 배아 수"] * X_train_encoded["이식된 배아 수"]
    X_test_encoded["배아생성_이식_상호작용"] = X_test_encoded["총 생성 배아 수"] * X_test_encoded["이식된 배아 수"]

    X_train_encoded["미세주입 성공률"] = X_train_encoded["미세주입에서 생성된 배아 수"] / (X_train_encoded["미세주입된 난자 수"] + 1)
    X_test_encoded["미세주입 성공률"] = X_test_encoded["미세주입에서 생성된 배아 수"] / (X_test_encoded["미세주입된 난자 수"] + 1)

    X_train_encoded["배아 이식 성공률"] = X_train_encoded["이식된 배아 수"] / (X_train_encoded["총 생성 배아 수"] + 1)
    X_test_encoded["배아 이식 성공률"] = X_test_encoded["이식된 배아 수"] / (X_test_encoded["총 생성 배아 수"] + 1)

    # 3. Splitting dataset
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_encoded, y, test_size=0.2, random_state=42, stratify=y
    )

    # 4. Resampling
    # 옵션: ['undersample', 'oversample', 'smote', 'none']
    sampling_method = 'none'
    resampler = Resampler(method=sampling_method)
    X_train_res, y_train_res = resampler.resample(X_train, y_train)
    print("임신 성공 1 개수:", (y_train_res == 1).sum())
    print("임신 실패 0 개수:", (y_train_res == 0).sum())

    # 5. Training
    trainer = ModelTrainer(X_train_res, X_val, y_train_res, y_val)
    xgb_model = trainer.train_xgb()
    lgbm_model = trainer.train_lgbm()
    catboost_model = trainer.train_catboost()

    subset_size = 10000
    if len(X_train_res) > subset_size:
        indices = np.random.choice(len(X_train_res), subset_size, replace=False)
        X_tabpfn = X_train_res.iloc[indices]
        y_tabpfn = y_train_res.iloc[indices]
    else:
        X_tabpfn = X_train_res
        y_tabpfn = y_train_res

    tabpfn_model = TabPFNClassifier(
        N_ensemble_configurations=24,
        max_features=subset_size,
        device=device
    )
    tabpfn_model.fit(X_tabpfn.values.astype(np.float32), y_tabpfn.values)

    # X_val_np = X_val.astype(np.float64).to_numpy()
    # X_val_tabpfn_trans = qt.transform(X_val_np)
    # tabpfn_val_proba = tabpfn_model.predict_proba(X_val_tabpfn_trans)[:, 1]
    # tabpfn_auc = roc_auc_score(y_val, tabpfn_val_proba)
    # print(f"✅ TabPFN Validation ROC-AUC Score: {tabpfn_auc:.4f}")

    # 6. Voting Ensemble
    models = [
        ('xgb', xgb_model),
        ('lgbm', lgbm_model),
        ('catboost', catboost_model),
        ('tabpfn', tabpfn_model)
    ]
    voting_clf = trainer.train_voting_classifier(models)

    # 7. 테스트 데이터 예측 및 제출 파일 생성
    test_proba = voting_clf.predict_proba(X_test_encoded)[:, 1]
    age_factor = (3.5 - X_test_encoded["시술 당시 나이_ordinal"]) * 0.001
    adjusted_pred_proba = test_proba + age_factor
    adjusted_pred_proba = np.clip(adjusted_pred_proba, 0, 1)

    submission = pd.read_csv(submission_file)
    submission['probability'] = adjusted_pred_proba
    output_file = "tabpfn_ensemble_submission.csv"
    submission.to_csv(output_file, index=False)
    print(f"✅ 최종 제출 파일 저장 완료: {output_file}")

    # Colab 환경이면 파일 다운로드 (로컬 환경이면 생략)
    try:
        from google.colab import files
        files.download(output_file)
    except ImportError:
        pass

if __name__ == "__main__":
    main()
