### Libraries

In [None]:
import numpy as np
import pandas as pd
import warnings

from catboost import CatBoostClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import VotingClassifier, AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix

### Load data

In [None]:
from dotenv import load_dotenv
import os 
load_dotenv()

In [None]:
df_train = pd.read_csv(os.getenv('TRAIN_DATA_PATH')).drop(columns=['ID'])
df_test = pd.read_csv(os.getenv('TEST_DATA_PATH')).drop(columns=['ID'])

df_train.head(8).to_csv('train_sample.csv', encoding = 'utf-8-sig', index = False)

In [None]:
# object 타입 컬럼 확인
cat_features = list(df_train.select_dtypes(include=['object']).columns)
df_train[cat_features].isnull().mean() * 100

In [None]:
print(df_train.shape)
df_train.head(2)

In [None]:
print(df_test.shape)
df_test.head(2)

### Preprocessing

#### 컬럼 제거

In [None]:
# 결측 비율 80% 이상 제거
missing_ratio = df_train.isnull().mean()
column_missing = missing_ratio[missing_ratio >= 0.8].index.tolist()

# nunique = 1 제거 
unique_counts = df_train.nunique()
column_nunique = unique_counts[unique_counts == 1].index.tolist()

# 컬럼 제거
dropped_columns = list(set(column_missing + column_nunique + ['배란 유도 유형'])) # 배란 유도 유형 제외

df_train = df_train.drop(columns=dropped_columns)
df_test = df_test.drop(columns=dropped_columns)

print("제거된 컬럼 개수:", len(dropped_columns))
print("제거된 컬럼:", dropped_columns)
print("df_train.shape:", df_train.shape)
print("df_test.shape:", df_test.shape)

#### 29-38. astype(int)

In [None]:
# '횟수'를 포함하는 컬럼 찾기
count_columns = [col for col in df_train.columns if '횟수' in col]

def extract_number(value):
    if isinstance(value, str):
        return int(value[0])  # 맨 앞자리 숫자로 변환
    return value

for col in count_columns:
    df_train[col] = df_train[col].apply(extract_number).astype(int)
    df_test[col] = df_test[col].apply(extract_number).astype(int)
print("변환된 컬럼:", count_columns)

#### 29,31,32. 총 시술 횟수 재정의

In [None]:
# 총 시술 횟수 재정의
def update_total_counts(df):
    df['총 시술 횟수'] = df['IVF 시술 횟수'] + df['DI 시술 횟수']
    df['총 임신 횟수'] = df['IVF 임신 횟수'] + df['DI 임신 횟수']
    df['총 출산 횟수'] = df['IVF 출산 횟수'] + df['DI 출산 횟수']
    return df

df_train = update_total_counts(df_train)
df_test = update_total_counts(df_test)

#### 2. 55-56. 나이

In [None]:
# 중앙값으로 나이 변환
def mapping_age(df, column, age):
    df[column] = df[column].map(age)
    return df

patient_age = {
    '만18-34세': 26,  
    '만35-37세': 36,  
    '만38-39세': 38.5,  
    '만40-42세': 41,  
    '만43-44세': 43.5,  
    '만45-50세': 47.5,  
    '알 수 없음': 999  # value_counts = 329, 999 또는 NaN로 대체
}

egg_age = {
    '만20세 이하': 18,  
    '만21-25세': 23,  
    '만26-30세': 28,  
    '만31-35세': 33,  
    '알 수 없음': 999
}

sperm_age = {
    '만20세 이하': 18,  
    '만21-25세': 23,  
    '만26-30세': 28,  
    '만31-35세': 33,  
    '만36-40세': 38,  
    '만41-45세': 43,  
    '알 수 없음': 999 
}

df_train = mapping_age(df_train, '시술 당시 나이', patient_age)
df_test = mapping_age(df_test, '시술 당시 나이', patient_age)
df_train = mapping_age(df_train, '난자 기증자 나이', egg_age)
df_test = mapping_age(df_test, '난자 기증자 나이', egg_age)
df_train = mapping_age(df_train, '정자 기증자 나이', sperm_age)
df_test = mapping_age(df_test, '정자 기증자 나이', sperm_age)

#### 5. 특정 시술 유형

In [None]:
def ICSI_check(df):
    df['특정 시술 유형 - ICSI'] = np.where(df['시술 유형'] != 'IVF', -1, 
                             np.where(df['특정 시술 유형'].str.contains('ICSI', na=False), 2, 1))

    df = df.drop(columns=['특정 시술 유형'])
    
    return df

df_train = ICSI_check(df_train)
df_test = ICSI_check(df_test)

#### 28. 배아 생성 주요 이유: 기증용, 난자 저장용, 배아 저장용, 연구용, 현재 시술용

In [None]:
def treatment_use(df):

    # df['시술용_여부'] = df['배아 생성 주요 이유'].astype(str).apply(lambda x: 1 if '현재 시술용' in x else 0)
    df.drop('배아 생성 주요 이유',axis=1, inplace=True)
    return df
df_train=treatment_use(df_train)
df_test=treatment_use(df_test)

In [None]:
def drop_cols(df):
    cols = ['남성 주 불임 원인', '남성 부 불임 원인', 
            '여성 주 불임 원인', '여성 부 불임 원인',
            '부부 주 불임 원인', '부부 부 불임 원인']
    df = df.drop(columns=cols, errors='ignore')
    return df

df_train = drop_cols(df_train)
df_test = drop_cols(df_test)

In [None]:
df_train.columns

## 새롭게 추가하는 것들_다은

In [None]:

def advanced_features(df):
    epsilon = 1e-5  # 분모 0 방지를 위한 작은 값

    binary_cols = [
        '배란 자극 여부', '단일 배아 이식 여부', '착상 전 유전 진단 사용 여부',
        '동결 배아 사용 여부', '신선 배아 사용 여부', '기증 배아 사용 여부', '대리모 여부'
    ]
    # 이미 0/1로 변환되어 있다고 가정하고, 총 개수 합산
    df['바이너리_합'] = df[binary_cols].sum(axis=1)
    # 예시로 특정 두 변수 간 상호작용 (배란 자극과 단일 배아 이식)
    df['배란자극x단일이식'] = df['배란 자극 여부'] * df['단일 배아 이식 여부']
    
    # 6. 경과일 차이: 배아 이식 경과일과 난자 혼합 경과일의 차이
    df['경과일_차이'] = df['배아 이식 경과일'] - df['난자 혼합 경과일']
    
    # 10. 불임 원인 관련 피처: 여러 불임 원인 컬럼을 합쳐서 총 원인 수 산출
    infertility_cols = [
        '불명확 불임 원인', '불임 원인 - 난관 질환', '불임 원인 - 남성 요인',
        '불임 원인 - 배란 장애', '불임 원인 - 자궁경부 문제', '불임 원인 - 자궁내막증',
        '불임 원인 - 정자 농도', '불임 원인 - 정자 면역학적 요인',
        '불임 원인 - 정자 운동성', '불임 원인 - 정자 형태'
    ]
    # 만약 해당 컬럼들이 문자열(Y/N)이라면 0/1로 변환
    for col in infertility_cols:
        if df[col].dtype == 'object':
            df[col] = df[col].map({'Y': 1, 'N': 0})
    df['불임원인_수'] = df[infertility_cols].sum(axis=1)
    
    # 11. ICSI와 배란자극 여부 상호작용: ICSI 효과가 배란자극 여부와 어떻게 연관되는지
    df['ICSIx배란자극'] = df['특정 시술 유형 - ICSI'] * df['배란 자극 여부']
    
    # 12. 배아 저장 비율: 생성된 배아 중 얼마나 많은 배아가 저장되었는지
    df['배아저장비율'] = df['저장된 배아 수'] / (df['총 생성 배아 수'] + epsilon)
    
    return df

# 학습/테스트 데이터에 적용
df_train = advanced_features(df_train)
df_test  = advanced_features(df_test)

### Feature engineering

In [None]:
def feature_engineering(df):
    
    # 임신 성공률
    # df['총 임신 성공률'] = df['총 임신 횟수'] / df['총 시술 횟수']
    df['IVF 임신 성공률'] = df['IVF 임신 횟수'] / df['IVF 시술 횟수']
    df['배아 이식 대비 임신 성공률'] = df['총 임신 횟수'] / df['이식된 배아 수']
    # [ADD] 배아 이식 대비 임신 실패률
    df['배아 이식 대비 임신 실패률'] = 1- df['배아 이식 대비 임신 성공률']
    
    # 출산 성공률
    # df['총 출산 성공률'] = df['총 출산 횟수'] / df['총 임신 횟수']
    df['IVF 출산 성공률'] = df['IVF 출산 횟수'] / df['IVF 임신 횟수']
    df['배아 이식 대비 출산 성공률'] = df['총 출산 횟수'] / df['이식된 배아 수']
    # [ADD] 배아 이식 대비 출산 실패률
    df['배아 이식 대비 출산 실패률'] = 1-df['배아 이식 대비 출산 성공률']
    
    # 나이 
    # df['나이 대비 임신 확률'] = df['총 임신 횟수'] / df['시술 당시 나이']
    # df['나이 대비 배아 생성 확률'] = df['총 생성 배아 수'] / df['시술 당시 나이']
    # df['나이 대비 배아 이식 확률'] = df['이식된 배아 수'] / df['시술 당시 나이']
    
    # 시술 효율성
    # df['시술당 평균 배아 생성 수'] = df['총 생성 배아 수'] / df['총 시술 횟수'] 
    # df['시술당 평균 이식 배아 수'] = df['이식된 배아 수'] / df['총 시술 횟수'] 
    # df['총 배아 이식 확률'] = df['이식된 배아 수'] / df['총 생성 배아 수']
    # df['미세주입 배아 생성 확률'] = df['미세주입에서 생성된 배아 수'] / df['미세주입된 난자 수']
    # df['미세주입 배아 이식 확률'] = df['미세주입 배아 이식 수'] / df['미세주입에서 생성된 배아 수']
    
    # 이식 경과일
    # df["배아 이식 경과 카테고리"] = pd.cut(df["배아 이식 경과일"], 
    #                          bins=[0, 2, 4, df["배아 이식 경과일"].max()], 
    #                          labels=[0, 1, 2])
    # df["배아 이식 경과 카테고리"] = df["배아 이식 경과 카테고리"].astype(str)

    # df["배아 저장 대비 이식 기간"] = df["배아 이식 경과일"] / (df["저장된 배아 수"])
    df["이식된 배아 대비 이식 기간"] = df["배아 이식 경과일"] / (df["이식된 배아 수"])

    df['총 난자 수']=df['수집된 신선 난자 수']+df['해동 난자 수']
        
    df['난자 사용률']=df['혼합된 난자 수']/(df['총 난자 수'])
    # [ADD] 난자 사용 실패률
    df['난자 사용 실패률'] = 1 - df['난자 사용률']
    
    df['IVF 난자 수']=df['혼합된 난자 수']-df['미세주입된 난자 수']

    df['총 배아 수']=df['총 생성 배아 수']+df['해동된 배아 수']
    
    df['IVF 배아 수']=df['총 생성 배아 수']-df['미세주입에서 생성된 배아 수']

    df['미세주입 배아 생성 확률'] = df['미세주입에서 생성된 배아 수'] / df['미세주입된 난자 수']
    # [ADD] 미세주입 배아 생성 실패 확률
    df['미세주입 배아 생성 실패 확률'] = 1 - df['미세주입 배아 생성 확률']
    
    df['IVF 배아 생성 확률']= df['IVF 배아 수']/(df['IVF 난자 수'])
    # [ADD] IVF 배아 생성 실패 확률
    df['IVF 배아 생성 실패 확률'] = 1 - df['IVF 배아 생성 확률']
    
    df['총 배아 생성 확률']=df['총 배아 수']/(df['총 난자 수'])
    # [ADD] 총 배아 생성 실패 확률
    df['총 배아 생성 실패 확률'] = 1 - df['총 배아 생성 확률']
    
    df['이식률']=df['이식된 배아 수']/(df['총 배아 수'])
    #[ADD] 이식 실패률
    df['이식실패률'] = 1 - df['이식률']
    
    df['미세주입 배아 이식 확률'] = df['미세주입 배아 이식 수'] / (df['미세주입에서 생성된 배아 수'])
    #[ADD] 미세주입 배아 시식 실패 확률
    df['미세주입 배아 이식 실패률'] = 1 - df['미세주입 배아 이식 확률']
    
    df['동결&IVF 배아 이식 확률'] = (df['이식된 배아 수']-df['미세주입 배아 이식 수'])/(df['총 배아 수']-df['미세주입에서 생성된 배아 수'])
    #[ADD] 동결&IVF 배아 이식 실패 확률
    df['동결&IVF 배아 이식 실패 확률'] = 1 - df['동결&IVF 배아 이식 확률']
    # df['임신횟수*나이']=df['총 임신 횟수'] * df['시술 당시 나이']
    # df['시술횟수*나이']=df['총 시술 횟수'] * df['시술 당시 나이']
    # df['난자사용률*배아수']=df['난자 사용률']*df['총 배아 수']
    # df['이식률*경과일']=df['이식률']*df['배아 이식 경과일']
    # df['이식률/나이']=df['이식률']/df['시술 당시 나이']
    return df

df_train = feature_engineering(df_train)
df_test = feature_engineering(df_test)



In [None]:
def drop_cols(df):
    cols = ['총 난자 수','IVF 난자 수','IVF 배아 수','IVF 배아 생성 확률','동결&IVF 배아 이식 확률']
    
    df = df.drop(columns=cols, errors='ignore')

    return df

df_train = drop_cols(df_train)
df_test = drop_cols(df_test)

In [None]:
def di_null(df):
    df.loc[df['시술 유형'] == 'DI'] = df.loc[df['시술 유형'] == 'DI'].fillna(-1)
    return df

df_train=di_null(df_train)
df_test=di_null(df_test)

In [None]:
# 결측값 대체
def replace_inf_and_nan(df, value):
    df.replace([np.inf, -np.inf], value, inplace=True)  # inf, -inf를 999로 변환
    df.fillna(value, inplace=True)  # NaN도 999로 변환
    return df

# 변환 적용
value = 999
df_train = replace_inf_and_nan(df_train, value)
df_test = replace_inf_and_nan(df_test, value)

### Feature selection

In [None]:
# object 타입 컬럼 확인
cat_features = list(df_train.select_dtypes(include=['object']).columns)
df_train[cat_features].isnull().mean() * 100

In [None]:
from sklearn.preprocessing import OneHotEncoder
def onehot_category_encoding(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    oe = OneHotEncoder(sparse=False)
    # df_train 대신 전달된 df를 사용하고, 결과를 int로 변환
    onehot_encoded = oe.fit_transform(df[[column_name]]).astype(int)
    column_names = oe.get_feature_names_out([column_name])
    df_onehot = pd.DataFrame(onehot_encoded, columns=column_names, index=df.index)
    df = df.join(df_onehot)
    
    # 기존 컬럼 삭제
    df.drop(columns=[column_name], inplace=True)
    return df

In [None]:
# LabelEncoder 적용
for col in cat_features:
    # [BEFORE] Label Encoding
    le = LabelEncoder() 
    df_train[col] = le.fit_transform(df_train[col])  
    df_test[col] = le.transform(df_test[col]) 
    
    # [AFTER] Onehot Encoding
    # df_train = onehot_category_encoding(df_train, col)
    # df_test = onehot_category_encoding(df_test, col)

# [ADD]
모두 0~1 정규형 바꾸기


In [None]:
# 정규화
df_train = (df_train - df_train.mean()) / df_train.std()
df_train

In [None]:
# 결측 비율
missing_ratio = df_train.isnull().mean() * 100
print(missing_ratio)

### Modeling

In [None]:
X = df_train.drop('임신 성공 여부', axis=1)
y = df_train['임신 성공 여부']
X_test = df_test

#### Stratified K-Fold 

In [None]:
# Stratified K-Fold 설정
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=123)

metrics = {model: [] for model in ['CatBoost']}
feature_importances = {model: [] for model in ['CatBoost']}
test_proba = {model: [] for model in ['CatBoost']}

for fold, (train_idx, val_idx) in enumerate(skf.split(X, y), 1):
    print(f"===== Fold {fold} =====")

    X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
    y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

    # 모델 정의
    cat_model = CatBoostClassifier(
        iterations=700, learning_rate=0.03, depth=8, l2_leaf_reg=10,
        subsample=0.8, colsample_bylevel=0.8, random_strength=10,
        loss_function='Logloss', eval_metric='AUC', verbose=100,random_seed=123
    )


    # 모델 학습
    for model in [cat_model]:
        model.fit(X_train, y_train)

    # 평가 함수
    def evaluate_model(model, X_val, y_true):
        y_pred = model.predict(X_val)
        y_pred_proba = model.predict_proba(X_val)[:, 1]

        return {
            'Accuracy': accuracy_score(y_true, y_pred),
            'Precision': precision_score(y_true, y_pred),
            'Recall': recall_score(y_true, y_pred),
            'F1 Score': f1_score(y_true, y_pred),
            'ROC AUC Score': roc_auc_score(y_true, y_pred_proba)
        }

    # 평가 및 변수 중요도 저장
    for model_name, model in zip(metrics.keys(), [cat_model]):
        metrics[model_name].append(evaluate_model(model, X_val, y_val))

    for model_name, model in zip(['CatBoost'], [cat_model]):
        feature_importances[model_name].append(model.feature_importances_)
        
    # 테스트 데이터 예측 확률 저장
    test_proba['CatBoost'].append(cat_model.predict_proba(X_test)[:, 1])

In [None]:
# 평가 지표 평균 출력
print("===== Stratified K-Fold 평균 성능 =====")
for model_name, model_metrics in metrics.items():
    avg_metrics = {metric: np.mean([fold_metric[metric] for fold_metric in model_metrics]) for metric in model_metrics[0]}
    
    print(f"\n== {model_name} Model ==")
    for metric, value in avg_metrics.items():
        print(f"{metric}: {value:.6f}")
        

In [None]:
# 최종 변수 중요도 평균 계산
df_fi_list = []
for model_name, fi_list in feature_importances.items():
    avg_importance = np.mean(fi_list, axis=0)
    df_fi = pd.DataFrame({
        'Feature': X_train.columns,
        model_name: avg_importance 
    })
    df_fi = df_fi.sort_values(by=model_name, ascending=False).reset_index(drop=True)
    df_fi_list.append(df_fi)
    
df_fi_final = pd.concat(df_fi_list, axis=1)
df_fi_final.to_csv('feature_importances.csv', encoding = 'utf-8-sig', index = False)
df_fi_final

In [None]:
df_train.columns.drop(df_fi_final.Feature)

In [None]:
# catboost 기준 변수 중요도 0.1 미만 변수 제거
df_cat = df_fi_final.iloc[:, :2]
df_selected = df_cat[df_cat['CatBoost'] > 0.05]
selected_features = df_selected['Feature'].tolist()

# 중요도가 높은 피처만 선택하여 새로운 데이터 생성
X = X[selected_features]
X_test = X_test[selected_features]

### Re-modeling

In [None]:
# Stratified K-Fold 설정
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=123)

metrics = {model: [] for model in ['CatBoost']}
feature_importances = {model: [] for model in ['CatBoost']}
test_proba = {model: [] for model in ['CatBoost', 'XGBoost']}

for fold, (train_idx, val_idx) in enumerate(skf.split(X, y), 1):
    print(f"===== Fold {fold} =====")

    X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
    y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

    # 모델 정의
    cat_model = CatBoostClassifier(
        iterations=700, learning_rate=0.03, depth=8, l2_leaf_reg=10,
        subsample=0.8, colsample_bylevel=0.8, random_strength=10,
        loss_function='Logloss', eval_metric='AUC', verbose=100, random_seed=123
    )

    # 모델 학습
    for model in [cat_model]:
        model.fit(X_train, y_train)

    # 평가 함수
    def evaluate_model(model, X_val, y_true):
        y_pred = model.predict(X_val)
        y_pred_proba = model.predict_proba(X_val)[:, 1]

        return {
            'Accuracy': accuracy_score(y_true, y_pred),
            'Precision': precision_score(y_true, y_pred),
            'Recall': recall_score(y_true, y_pred),
            'F1 Score': f1_score(y_true, y_pred),
            'ROC AUC Score': roc_auc_score(y_true, y_pred_proba)
        }

    # 평가 및 변수 중요도 저장
    for model_name, model in zip(metrics.keys(), [cat_model]):
        metrics[model_name].append(evaluate_model(model, X_val, y_val))

    for model_name, model in zip(['CatBoost'], [cat_model]):
        feature_importances[model_name].append(model.feature_importances_)
        
    # 테스트 데이터 예측 확률 저장
    test_proba['CatBoost'].append(cat_model.predict_proba(X_test)[:, 1])

In [None]:
# 평가 지표 평균 출력
print("===== Stratified K-Fold 평균 성능 =====")
for model_name, model_metrics in metrics.items():
    avg_metrics = {metric: np.mean([fold_metric[metric] for fold_metric in model_metrics]) for metric in model_metrics[0]}
    
    print(f"\n== {model_name} Model ==")
    for metric, value in avg_metrics.items():
        print(f"{metric}: {value:.6f}")

### Prediction

In [None]:
# Best AUC 기록한 모델의 pred_proba로 선택
pred_proba = np.mean(test_proba['CatBoost'], axis=0)
# pred_proba = np.mean(test_proba['XGBoost'], axis=0)
# pred_proba = np.mean(test_proba['LightGBM'], axis=0)
# pred_proba = np.mean(test_proba['AdaBoost'], axis=0)
# pred_proba = np.mean(test_proba['Ensemble'], axis=0)

### Submission

In [None]:
sample_submission = pd.read_csv(os.getenv('SUBMISSION_DATA_PATH'))
sample_submission.head()

In [None]:
sample_submission['probability'] = pred_proba
# 저장
import datetime 
now = datetime.datetime.now()
save_path = os.path.join(f'./log/submission/{now.strftime("%Y%m%d_%H%M%S")}_eiden.csv')
sample_submission.to_csv(save_path, index=False)
sample_submission.head()

In [None]:
# 확인용
submission = pd.read_csv(save_path)
submission.head()