In [None]:
!pip -q install ipynbname
!pip -q install pandas numpy ipynbname scikit-learn torch

In [None]:
import pandas as pd
import numpy as np
import sqlite3
import shutil
import ipynbname
import datetime
import sys
import os
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler, FunctionTransformer, MultiLabelBinarizer
from sklearn.impute import SimpleImputer

from sklearn.neighbors import NearestNeighbors
# import cudf
# from cuml.neighbors import NearestNeighbors
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

orin_train_path = '../data/train.csv'
train_path = '../data/custom_train_7.csv'
test_path = '../data/custom_test_7.csv'
sample_path = '../data/sample_submission.csv'

# 1. 데이터 로드
origin= pd.read_csv(orin_train_path).drop(columns=['ID'])
train = pd.read_csv(train_path).drop(columns=['ID'])
test = pd.read_csv(test_path).drop(columns=['ID'])


In [None]:
def drop_columns(df):
    cols = [
        '불임 원인 - 여성 요인',  # 고유값 1
        '불임 원인 - 정자 면역학적 요인',  # train, test 모두 '1'인 데이터 1개 >> 신뢰할 수 없음
        '난자 해동 경과일',
    ]
    df = df.drop(cols, axis=1)
    return df

def 특정시술유형(train, test):
    def categorize_procedure(proc):
        tokens = [token.strip() for token in proc.split(",") if token.strip() and not token.strip().isdigit()]
        # 우선순위에 따른 범주화
        if tokens.count("Unknown") >= 1:
            return "Unknown"
        if tokens.count("AH") >= 1:
            return "AH"
        if tokens.count("BLASTOCYST") >= 1:
            return "BLASTOCYST"
        if tokens.count("ICSI") >= 2 or tokens.count("IVF") >= 2:
            return "2ICSI_2IVF"
        if tokens.count("IVF") >= 1 and tokens.count("ICSI") >= 1:
            return "IVF_ICSI"
        if tokens == "ICSI":
            return "ICSI"
        if tokens == "IVF":
            return "IVF"
        return ",".join(tokens) if tokens else None

    for df in [train, test]:
        df['특정 시술 유형'] = df['특정 시술 유형'].str.replace(" / ", ",")
        df['특정 시술 유형'] = df['특정 시술 유형'].str.replace(":", ",")
        df['특정 시술 유형'] = df['특정 시술 유형'].str.replace(" ", "")

    counts = train['특정 시술 유형'].value_counts()
    allowed_categories = counts[counts >= 100].index.tolist()

    # allowed_categories에 속하지 않는 값은 "Unknown"으로 대체
    train.loc[~train['특정 시술 유형'].isin(allowed_categories), '특정 시술 유형'] = "Unknown"
    test.loc[~test['특정 시술 유형'].isin(allowed_categories), '특정 시술 유형'] = "Unknown"

    train['특정 시술 유형'] = train['특정 시술 유형'].apply(categorize_procedure)
    test['특정 시술 유형'] = test['특정 시술 유형'].apply(categorize_procedure)

    train['시술유형_통합'] = train['시술 유형'].astype(str) + '_' + train['특정 시술 유형'].astype(str)
    test['시술유형_통합'] = test['시술 유형'].astype(str) + '_' + test['특정 시술 유형'].astype(str)

    drop_cols = ['시술 유형', '특정 시술 유형']
    train = train.drop(drop_cols, axis=1)
    test = test.drop(drop_cols, axis=1)

    return train, test

def 시술횟수(df_train):
    for col in [col for col in df_train.columns if '횟수' in col]:
        df_train[col] = df_train[col].replace({'6회 이상':'6회'})
        df_train[col] = df_train[col].str[0].astype(int)
    df_train['시술_임신'] = df_train['총 임신 횟수'] - df_train['총 시술 횟수']
    df_train = df_train.drop('총 시술 횟수', axis=1)
    return df_train

def 배란유도유형(df_train, df_test):
    mapping = {
        '기록되지 않은 시행': 1,
        '알 수 없음': 0,
        '세트로타이드 (억제제)': 0,
        '생식선 자극 호르몬': 0,
    }
    df_train['배란 유도 유형'] = df_train['배란 유도 유형'].replace(mapping)
    df_test['배란 유도 유형'] = df_test['배란 유도 유형'].replace(mapping)
    return df_train, df_test

def 난자기증자나이(df_train, df_test):
    mapping = {
        '만20세 이하': 20,
        '만21-25세': 25,
        '만26-30세': 30,
        '만31-35세': 35,
        '알 수 없음': 20,  # 만20세 이하와 동일하게 처리
    }
    df_train['난자 기증자 나이'] = df_train['난자 기증자 나이'].replace(mapping)
    df_test['난자 기증자 나이'] = df_test['난자 기증자 나이'].replace(mapping)
    return df_train, df_test

def 배아생성주요이유(df_train, df_test):
    df_train['배아 생성 주요 이유'] = df_train['배아 생성 주요 이유'].fillna('DI')
    df_test['배아 생성 주요 이유'] = df_test['배아 생성 주요 이유'].fillna('DI')

    df_train['배아 생성 이유 리스트'] = df_train['배아 생성 주요 이유'].apply(lambda x: [reason.strip() for reason in x.split(',')])
    df_test['배아 생성 이유 리스트'] = df_test['배아 생성 주요 이유'].apply(lambda x: [reason.strip() for reason in x.split(',')])

    mlb = MultiLabelBinarizer()
    train_one_hot = pd.DataFrame(
        mlb.fit_transform(df_train['배아 생성 이유 리스트']),
        columns=mlb.classes_,
        index=df_train.index
    )
    train_one_hot.columns = ['배아생성이유_' + col for col in train_one_hot.columns]

    test_one_hot = pd.DataFrame(
        mlb.transform(df_test['배아 생성 이유 리스트']),
        columns=mlb.classes_,
        index=df_test.index
    )
    test_one_hot.columns = ['배아생성이유_' + col for col in test_one_hot.columns]

    df_train = pd.concat([df_train, train_one_hot], axis=1)
    df_test = pd.concat([df_test, test_one_hot], axis=1)

    cols_to_drop = [
        '배아 생성 주요 이유',
        '배아 생성 이유 리스트',
        '배아생성이유_연구용',
        '배아생성이유_DI'
    ]
    df_train = df_train.drop(cols_to_drop, axis=1, errors='ignore')
    df_test = df_test.drop(cols_to_drop, axis=1, errors='ignore')

    cols = ['배아생성이유_기증용',
            '배아생성이유_난자 저장용',
            '배아생성이유_배아 저장용',
            '배아생성이유_현재 시술용']

    df_train[cols] = df_train[cols].div(df_train[cols].sum(axis=1).replace(0, np.nan), axis=0).fillna(0)
    df_test[cols] = df_test[cols].div(df_test[cols].sum(axis=1).replace(0, np.nan), axis=0).fillna(0)

    return df_train, df_test

def 단일배아이식여부(df_train, df_val):
    df_train['단일 배아 이식 여부'] = df_train['단일 배아 이식 여부'].fillna(0)
    df_val['단일 배아 이식 여부'] = df_val['단일 배아 이식 여부'].fillna(0)
    return df_train, df_val


def 기증자정자와혼합된난자수(df_train, df_test):
    df_train["기증자 정자와 혼합된 난자 수"] = df_train["기증자 정자와 혼합된 난자 수"].fillna(2)
    df_test["기증자 정자와 혼합된 난자 수"] = df_test["기증자 정자와 혼합된 난자 수"].fillna(2)
    return df_train, df_test

def type_to_category(train, test, cols):
    train[cols] = train[cols].astype('category')
    test[cols] = test[cols].astype('category')
    return train, test

def impute_nan(train, test):
    cols_to_impute = [
        '임신 시도 또는 마지막 임신 경과 연수', # DI, IVF랑 관련 X
    ]
    imputer = SimpleImputer(strategy='mean')
    train[cols_to_impute] = imputer.fit_transform(train[cols_to_impute])
    test[cols_to_impute] = imputer.transform(test[cols_to_impute])

    cols_to_impute = [
        '난자 채취 경과일',
        '난자 혼합 경과일',
        '배아 이식 경과일',
        '배아 해동 경과일',

        '착상 전 유전 검사 사용 여부',
        'PGD 시술 여부',
        'PGS 시술 여부',

        ### DI only
        '착상 전 유전 진단 사용 여부',
        '총 생성 배아 수',
        '미세주입된 난자 수',
        '미세주입에서 생성된 배아 수',
        '이식된 배아 수',
        '미세주입 배아 이식 수',
        '저장된 배아 수',
        '미세주입 후 저장된 배아 수',
        '해동된 배아 수',
        '해동 난자 수',
        '수집된 신선 난자 수',
        '저장된 신선 난자 수',
        '혼합된 난자 수',
        '파트너 정자와 혼합된 난자 수',
        '기증자 정자와 혼합된 난자 수',
        '동결 배아 사용 여부',
        '신선 배아 사용 여부',
        '기증 배아 사용 여부',
        '대리모 여부',
        ### DI
    ]
    train[cols_to_impute] = train[cols_to_impute].fillna(0)
    test[cols_to_impute] = test[cols_to_impute].fillna(0)

    return train, test

def num_feature_scailing(train, test):
    # cols_to_divide = [
    #     '연간 소득',
    #     '최대 신용한도',
    #     '현재 대출 잔액',
    #     '현재 미상환 신용액',
    #     '월 상환 부채액',
    # ]
    # train[cols_to_divide] = train[cols_to_divide] / 100000
    # test[cols_to_divide] = test[cols_to_divide] / 100000

    cols_to_log = [
        '총 생성 배아 수',
    ]
    log_transformer = FunctionTransformer(np.log1p, validate=True)
    train[cols_to_log] = log_transformer.fit_transform(train[cols_to_log])
    test[cols_to_log] = log_transformer.transform(test[cols_to_log])

    # numeric_cols = train.select_dtypes(include=["number"]).columns.tolist()
    # cat_cols = [col for col in train.columns if pd.api.types.is_categorical_dtype(train[col])]
    # cols_to_scale = [
    #     col for col in numeric_cols
    #     if col not in cat_cols and col != '임신 성공 여부'
    # ]
    # scaler = StandardScaler()
    # train[cols_to_scale] = scaler.fit_transform(train[cols_to_scale])
    # test[cols_to_scale] = scaler.transform(test[cols_to_scale])

    return train, test

def all_process(train, val):
    # 기본 전처리 단계
    train, val = drop_columns(train), drop_columns(val)
    train, val = 특정시술유형(train, val)
    train, val = 시술횟수(train), 시술횟수(val)

    cols_to_encoding = [
        "시술 시기 코드",
        "시술 당시 나이",
        "배란 유도 유형",
        # "클리닉 내 총 시술 횟수",
        # "IVF 시술 횟수",
        # "DI 시술 횟수",
        # "총 임신 횟수",
        # "IVF 임신 횟수",
        # "DI 임신 횟수",
        # "총 출산 횟수",
        # "IVF 출산 횟수",
        # "DI 출산 횟수",
        "난자 출처",
        "정자 출처",
        "난자 기증자 나이",
        "정자 기증자 나이",
        '시술유형_통합',
    ]
    train, val = type_to_category(train, val, cols=cols_to_encoding)

    train, val = 단일배아이식여부(train, val)
    train, val = 배란유도유형(train, val)
    train, val = 배아생성주요이유(train, val)

    train, val = impute_nan(train, val)
    train, val = num_feature_scailing(train, val)

    return train, val

train = pd.read_csv(train_path).drop(columns=['ID'])
test = pd.read_csv(test_path).drop(columns=['ID'])

train, test = all_process(train, test)

print(train.shape, test.shape)

In [None]:
#train, _ = all_process(train, train)

cat_cols = [col for col in train.columns if pd.api.types.is_categorical_dtype(train[col])]
num_cols = train.select_dtypes(include=["number"]).columns.tolist()
num_cols = [
    col for col in num_cols
    if col not in cat_cols and col != '임신 성공 여부'
]
print(f'범주형 변수: \n {cat_cols}')
print(f'수치형 변수: \n {num_cols}, {len(num_cols)}')

In [None]:

## 4. TABR 모델 정의 (PyTorch)

# TABR 모델은 **Transformer 인코더**를 기반으로 표 형태 데이터의 특성 간 상호작용을 학습하는 모델입니다. 각 행(row)은 하나의 샘플에 해당하며, 여러 열(column)이 하나의 "피처 시퀀스"로 간주됩니다. 모델 구조를 간략히 설명하면 다음과 같습니다:

# - **입력 임베딩:** 범주형 변수는 정수 인코딩된 값을 임베딩 벡터로 변환하고, 수치형 변수는 1차원 값을 선형 투영하여 임베딩 차원으로 변환합니다. 결과적으로 모든 특징은 동일한 임베딩 차원 `d_model`의 벡터 표현을 갖게 됩니다. 추가로, 각 **특성(column)에 대한 고유한 임베딩**을 더해줘 모델이 각 벡터의 의미(어떤 컬럼인지)를 구분할 수 있도록 합니다. (이는 Transformer에서 위치 임베딩을 추가하는 것과 유사하며, 여기서는 열 위치/ID 임베딩으로 볼 수 있습니다.)
# - **Transformer 인코더:** 임베딩된 특징 벡터 시퀀스를 입력으로 하여 여러 층의 Multi-Head Self-Attention과 피드포워드 층으로 구성된 Transformer 인코더를 통과시킵니다. Self-Attention 메커니즘을 통해 **특성 간의 복합 관계**를 학습합니다. (`n_heads`는 헤드 수, `n_layers`는 인코더 레이어 수)
# - **출력 및 분류:** Transformer 인코더의 출력을 일렬로 평탄화하여 하나의 긴 벡터로 만들고, 최종적으로 **이진 분류를 위한 출력 노드**에 연결합니다. (이 예에서는 `임신 성공 여부` 예측이므로 sigmoid를 거치기 전의 로지스틱 출력 하나를 생성합니다.)

# 이제 PyTorch `nn.Module`로 TABR 모델을 구현하겠습니다.

# **모델 구현 고려사항:**
# - `cat_dims`: 각 범주형 특성별 임베딩 필요 크기 (고유값 개수 + 1).
# - `num_numeric`: 수치형 특성 개수 (추가된 KNN 특성 포함). 각 수치형 특성마다 하나의 작은 `nn.Linear(1 -> d_model)` 층을 사용합니다.
# - 모든 임베딩/투영 결과 벡터의 차원은 `embed_dim = d_model`로 통일합니다.
# - Feature(컬럼) 임베딩: `self.feature_embeds = nn.Embedding(total_features, embed_dim)`를 통해 전체 특징 수 만큼의 임베딩 벡터를 정의합니다. 입력 시퀀스 순서에 따라 해당 벡터를 더합니다.
# - Transformer 인코더: `nn.TransformerEncoder`를 사용하여 지정한 층 수만큼 반복 적용합니다.
# - 최종 분류 층: `nn.Linear(total_features * embed_dim, 1)`로 flatten된 모든 특징의 정보를 받아 이진 분류 출력 산출.

# python

# TABR 모델 정의
class TABRModel(nn.Module):
    def __init__(self, cat_dims, num_numeric, embed_dim=32, n_heads=4, n_layers=2, dropout=0.1):
        """
        cat_dims: 각 범주형 특성의 임베딩 카테고리 개수 리스트 (고유값 + 1)
        num_numeric: 수치형 특성 개수
        """
        super().__init__()
        # 범주형 특성 임베딩 레이어 생성 (각 범주형 컬럼마다 별도 Embedding)
        self.cat_embeds = nn.ModuleList([
            nn.Embedding(num_categories, embed_dim)
            for num_categories in cat_dims
        ])
        # 수치형 특성 투영 레이어 생성 (각 수치형 컬럼마다 별도 Linear)
        self.num_proj = nn.ModuleList([
            \
            nn.Linear(1, embed_dim)
            for _ in range(num_numeric)
        ])

        # 전체 특징 개수 (범주형 + 수치형)
        self.total_features = len(cat_dims) + num_numeric
        # 특징 위치 임베딩 (특정 열(column) 전용 임베딩)
        self.feature_embeds = nn.Embedding(self.total_features, embed_dim)
        # Transformer 인코더 설정
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=n_heads, dim_feedforward=embed_dim*4, dropout=dropout
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)
        # 최종 분류를 위한 출력 층
        self.classifier = nn.Linear(self.total_features * embed_dim, 1)

    def forward(self, x_cat, x_num):
        """
        x_cat: LongTensor [batch_size, n_cat_features] - 범주형 입력 (정수 코드)
        x_num: FloatTensor [batch_size, n_num_features] - 수치형 입력 (실수)
        """
        batch_size = x_cat.size(0)
        # 1. 범주형 임베딩: 각 범주형 컬럼별 임베딩 벡터
        cat_tokens = []  # 각 토큰: [batch, embed_dim]
        for i, embed_layer in enumerate(self.cat_embeds):
            cat_tok = embed_layer(x_cat[:, i])    # i번째 범주형 컬럼 -> 임베딩
            cat_tokens.append(cat_tok)
        # 2. 수치형 투영: 각 수치형 컬럼별 Linear 투영
        num_tokens = []
        for j, linear_layer in enumerate(self.num_proj):
            # x_num의 j번째 컬럼 (batch_size,) -> (batch_size,1)로 shape 변환 후 Linear
            num_val = x_num[:, j].unsqueeze(1)
            num_tok = linear_layer(num_val)
            num_tokens.append(num_tok)
        # 3. 모든 특징 토큰 결합 (범주형 + 수치형 순서 연결)
        tokens = cat_tokens + num_tokens   # 리스트 합치기
        # tokens 리스트 길이는 total_features, 각 원소 shape [batch_size, embed_dim]
        # 리스트를 스택하여 shape: [batch_size, total_features, embed_dim]
        x = torch.stack(tokens, dim=1)
        # 4. 특징 위치 임베딩 추가
        seq_indices = torch.arange(self.total_features, device=x.device)  # [0,1,...,total_features-1]
        seq_emb = self.feature_embeds(seq_indices)  # [total_features, embed_dim]
        # 배치 차원으로 broadcast 위해 unsqueeze 후 더하기
        x = x + seq_emb.unsqueeze(0)  # [1, seq_len, embed_dim] -> broadcasting add
        # 5. Transformer 인코더 적용
        # PyTorch Transformer는 입력 shape: [seq_len, batch, embed_dim]을 기대하므로 전치(transpose)
        x = x.permute(1, 0, 2)  # [seq_len, batch_size, embed_dim]
        x = self.transformer(x)  # 인코더 통과
        x = x.permute(1, 0, 2)   # [batch_size, seq_len, embed_dim]로 복귀
        # 6. 토큰별 출력 평탄화 (flatten)
        x = x.reshape(batch_size, -1)  # [batch_size, seq_len*embed_dim]
        # 7. 최종 이진 분류 출력
        out = self.classifier(x)  # [batch_size, 1] 로지스틱 출력 (시그모이드 전)
        return out

In [None]:
n_splits = 5
seed_lst = [333]

for seed in seed_lst:
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed)
    train = pd.read_csv(train_path).drop(columns=['ID'])
    test = pd.read_csv(test_path).drop(columns=['ID'])

    fold_roc_auc_scores = []

    for fold, (train_idx, val_idx) in enumerate(skf.split(train, train['임신 성공 여부'])):
        print(f"\n----- [{seed}, 폴드 {fold}] -----")

        # 데이터 분할 및 전처리
        train_df = train.iloc[train_idx].reset_index(drop=True)
        val_df = train.iloc[val_idx].reset_index(drop=True)

        train_proc, val_proc = all_process(train_df.copy(), val_df.copy())

        y_train = train_proc['임신 성공 여부'].copy()
        y_val = val_proc['임신 성공 여부'].copy()
        X_train = train_proc.drop('임신 성공 여부', axis=1)
        X_val = val_proc.drop('임신 성공 여부', axis=1)

        # 범주형 정수 인코딩
        cat_dims = []
        for col in cat_cols:
            X_train[col] = X_train[col].astype('category')
            train_categories = X_train[col].cat.categories
            X_val[col] = pd.Categorical(X_val[col], categories=train_categories)
            X_train[col] = X_train[col].cat.codes.astype(int)
            X_val[col] = X_val[col].cat.codes.astype(int)
            cat_dims.append(len(train_categories) + 1)

        # 수치형 표준화
        scaler = StandardScaler()
        X_train[num_cols] = scaler.fit_transform(X_train[num_cols])
        X_val[num_cols] = scaler.transform(X_val[num_cols])

        # KNN 이웃 평균
        m = 96
        knn_model = NearestNeighbors(n_neighbors=m, algorithm='auto')
        knn_model.fit(X_train.values)

        neigh_idx_train = knn_model.kneighbors(X_train.values, return_distance=False)
        neigh_idx_val = knn_model.kneighbors(X_val.values, return_distance=False)

        neigh_idx_train_no_self = []
        for i, neighbors in enumerate(neigh_idx_train):
            neigh_idx = [n for n in neighbors if n != i][:m]
            neigh_idx_train_no_self.append(neigh_idx)
        neigh_idx_train_no_self = np.array(neigh_idx_train_no_self)

        y_train_array = y_train.to_numpy()
        knn_feat_train = y_train_array[neigh_idx_train_no_self].mean(axis=1)
        knn_feat_val = y_train_array[neigh_idx_val].mean(axis=1)
        X_train['knn_target_mean'] = knn_feat_train
        X_val['knn_target_mean'] = knn_feat_val

        # KNN 평균 표준화
        num_cols_with_knn = num_cols + ['knn_target_mean']
        scaler_knn = StandardScaler()
        X_train[['knn_target_mean']] = scaler_knn.fit_transform(X_train[['knn_target_mean']])
        X_val[['knn_target_mean']] = scaler_knn.transform(X_val[['knn_target_mean']])

        # 텐서 변환
        X_train_cat = torch.tensor(X_train[cat_cols].values, dtype=torch.long)
        X_train_num = torch.tensor(X_train[num_cols_with_knn].values, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32)

        X_val_cat = torch.tensor(X_val[cat_cols].values, dtype=torch.long)
        X_val_num = torch.tensor(X_val[num_cols_with_knn].values, dtype=torch.float32)
        y_val_tensor = torch.tensor(y_val.values, dtype=torch.float32)

        # DataLoader
        batch_size = 4096
        train_dataset = TensorDataset(X_train_cat, X_train_num, y_train_tensor)
        val_dataset = TensorDataset(X_val_cat, X_val_num, y_val_tensor)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        # 모델 정의 및 학습
        model = TABRModel(cat_dims=cat_dims, num_numeric=len(num_cols_with_knn), embed_dim=32, n_heads=4, n_layers=2, dropout=0.1)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        model.to(device)

        optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
        criterion = nn.BCEWithLogitsLoss()

        early_stopping_patience = 3
        patience_counter = 0
        best_loss = float('inf')
        best_epoch = -1

        num_epochs = 100
        for epoch in range(1, num_epochs + 1):
            model.train()
            running_loss = 0.0
            for batch in train_loader:
                cat_batch, num_batch, labels = batch
                cat_batch = cat_batch.to(device)
                num_batch = num_batch.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()
                outputs = model(cat_batch, num_batch).squeeze(1)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * labels.size(0)

            avg_loss = running_loss / len(train_dataset)
            print(f"Fold {fold} - Epoch {epoch:02d}: Training Loss = {avg_loss:.4f}")

            # 검증
            model.eval()
            val_loss_total = 0.0
            with torch.no_grad():
                for batch in val_loader:
                    cat_batch, num_batch, labels = batch
                    cat_batch = cat_batch.to(device)
                    num_batch = num_batch.to(device)
                    labels = labels.to(device)
                    outputs = model(cat_batch, num_batch).squeeze(1)
                    loss = criterion(outputs, labels)
                    val_loss_total += loss.item() * labels.size(0)

            avg_val_loss = val_loss_total / len(val_dataset)
            print(f"Fold {fold} - Epoch {epoch:02d}: Validation Loss = {avg_val_loss:.4f}")

            if avg_val_loss < best_loss:
                best_loss = avg_val_loss
                best_epoch = epoch
                torch.save({
                    'model_state_dict': model.state_dict(),
                    'knn_model': knn_model,
                    'scaler_knn': scaler_knn,
                }, f'data seed: 7, best_model_fold{fold}.pth')
                patience_counter = 0
            else:
                patience_counter += 1

            if patience_counter >= early_stopping_patience:
                print(f"Early stopping triggered at epoch {epoch}. No improvement for {early_stopping_patience} consecutive epochs.")
                break

        print(f"Fold {fold}: Best Validation Loss = {best_loss:.4f} at Epoch {best_epoch}")
        fold_roc_auc_scores.append(-best_loss)

    # fold 루프 끝나고 평균 출력
    valid_losses = [-v for v in fold_roc_auc_scores if v != float('inf')]
    if valid_losses:
        mean_val_loss = np.mean(valid_losses)
        print(f"\nAverage Validation Loss across folds: {mean_val_loss:.4f}")
    else:
        print("No valid Validation Loss values were computed across folds.")


In [None]:
import pandas as pd
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import StandardScaler
import numpy as np

test_pred=[]

# 0. 디바이스 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
for fold_num in range(5):
    # 1. 저장된 모델과 KNN 관련 객체 먼저 로드
    checkpoint = torch.load(f'data seed: 7, best_model_fold{fold_num}.pth', map_location=device, weights_only=False)
    knn_model = checkpoint['knn_model']
    scaler_knn = checkpoint['scaler_knn']

    # 2. 데이터 로드 및 전처리
    test = pd.read_csv(test_path).drop(columns=['ID'])
    train = pd.read_csv(train_path).drop(columns=['ID'])
    train_proc, test_proc = all_process(train.copy(), test.copy())  # 전처리
    X_test = test_proc.copy()
    y_train_array = train_proc['임신 성공 여부'].to_numpy()

    # 3. 범주형 정수 인코딩 (train 기준으로)
    cat_dims = []
    for col in cat_cols:
        X_test[col] = X_test[col].astype('category')
        train_cat = train_proc[col].astype('category')
        train_categories = train_cat.cat.categories
        X_test[col] = pd.Categorical(X_test[col], categories=train_categories)
        test_codes = X_test[col].cat.codes.replace(-1, len(train_categories))  # unseen → 마지막 인덱스
        X_test[col] = test_codes.astype(int)
        cat_dims.append(len(train_categories) + 1)

    # 4. 수치형 정규화 (train 기준)
    scaler = StandardScaler()
    X_test[num_cols] = scaler.fit(train_proc[num_cols]).transform(X_test[num_cols])

    # 5. KNN 기반 특성 생성
    neigh_idx_test = knn_model.kneighbors(X_test.values, n_neighbors=96, return_distance=False)
    knn_feat_test = y_train_array[neigh_idx_test].mean(axis=1)
    X_test['knn_target_mean'] = knn_feat_test
    X_test[['knn_target_mean']] = scaler_knn.transform(X_test[['knn_target_mean']])

    # 6. PyTorch 텐서 변환
    X_test_cat = torch.tensor(X_test[cat_cols].values, dtype=torch.long)
    X_test_num = torch.tensor(X_test[num_cols + ['knn_target_mean']].values, dtype=torch.float32)

    # 7. DataLoader 생성
    test_dataset = TensorDataset(X_test_cat, X_test_num)
    test_loader = DataLoader(test_dataset, batch_size=1024, shuffle=False)

    # 8. 모델 정의 및 파라미터 로드
    model = TABRModel(
        cat_dims=cat_dims,
        num_numeric=len(num_cols + ['knn_target_mean']),
        embed_dim=32,
        n_heads=4,
        n_layers=2,
        dropout=0.1
    )
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()

    # 9. 예측 수행
    all_preds = []
    with torch.no_grad():
        for cat_batch, num_batch in test_loader:
            cat_batch = cat_batch.to(device)
            num_batch = num_batch.to(device)
            outputs = model(cat_batch, num_batch).squeeze(1)
            preds = torch.sigmoid(outputs)
            all_preds.extend(preds.cpu().numpy())


    test_pred.append(all_preds)
    print(f"{fold_num} is ended")

last_pred=np.mean(test_pred, axis=0)

#data seed ( 1 / 7 )
train, test = train_test_split(origin, test_size=0.2, random_state=7, stratify=origin['임신 성공 여부'])
y_test=test['임신 성공 여부'].copy()
auc = roc_auc_score(y_test, last_pred)
print(f"fold: {fold_num}, AUC: {auc:.6f}")
