In [None]:
import numpy as np
import pandas as pd
import xgboost as xgb
import matplotlib.pyplot as plt
import time
from datetime import timedelta
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import StratifiedKFold
import torch


# 학습 곡선 시각화화
def plot_learning_curves(train_scores, eval_scores):
    plt.figure(figsize=(10, 6))
    plt.plot(train_scores, label='Train AUC')
    plt.plot(eval_scores, label='Validation AUC')
    plt.title('Learning Curves')
    plt.xlabel('Iteration')
    plt.ylabel('AUC Score')
    plt.legend()
    plt.grid(True)
    plt.savefig('learning_curves.png')
    plt.close()

# 오버피팅 분석
def analyze_overfitting(train_scores, eval_scores, val_scores, predictions):
    print("\n=== 오버피팅 분석 ===")

    # 검증 점수 안정성
    val_std = np.std(val_scores)
    print(f"검증 점수 평균: {np.mean(val_scores):.4f}")
    print(f"검증 점수 표준편차: {val_std:.4f}")
    print(f"검증 표준편차 위험도: {'높음' if val_std > 0.02 else '낮음'}")

    # 학습-검증 점수 차이
    score_gap = np.mean(train_scores) - np.mean(eval_scores)
    print(f"\n학습-검증 점수 차이: {score_gap:.4f}")
    print(f"점수 차이 위험도: {'높음' if score_gap > 0.05 else '낮음'}")

    # 예측 분포 확인
    extreme_preds = np.sum((predictions < 0.1) | (predictions > 0.9)) / len(predictions)
    print(f"극단적 예측 비율: {extreme_preds:.2%}")
    print(f"극단적 예측 위험도: {'높음' if extreme_preds > 0.3 else '낮음'}")

    # 모델 안정성
    score_range = max(val_scores) - min(val_scores)
    print(f"모델간 성능 범위: {score_range:.4f}")
    print(f"모델 안정성 위험도: {'높음' if score_range > 0.03 else '낮음'}")

    # 종합 위험도 점수 (0~1, 높을수록 위험)
    risk_score = (
        (val_std > 0.02) * 0.25 +
        (score_gap > 0.05) * 0.25 +
        (extreme_preds > 0.3) * 0.25 +
        (score_range > 0.03) * 0.25
    )

    print(f"\n종합 오버피팅 위험도: {risk_score:.2f} (0~1)")

    return {
        'val_std': val_std,
        'score_gap': score_gap,
        'extreme_preds': extreme_preds,
        'score_range': score_range,
        'risk_score': risk_score
    }


# XGBoost 최적 파라미터(optuna)
def get_base_params():
    base_params = {
        'max_depth': 8,
        'learning_rate': 0.01066843580150729,
        'min_child_weight': 3,
        'gamma': 2.4678454032083947,
        'subsample': 0.6764633483838101,
        'colsample_bytree': 0.9607914123554494,
        'reg_alpha': 8.354654668642027,
        'reg_lambda': 8.053982964153542,
        'scale_pos_weight': 1.3632074855869183,
        'random_state': 42,
        'objective': 'binary:logistic',
        'eval_metric': 'auc'
    }

    # GPU 가속 설정
    try:
        if torch.cuda.is_available():
            base_params.update({"tree_method": "hist", "device": "cuda:0"})
        else:
            base_params.update({"tree_method": "hist"})
    except:
        base_params.update({"tree_method": "hist"})
        
    return base_params


# 다양한 시드와 파라미터 변화를 활용한 XGBoost 앙상블 모델
def train_seed_stratified_ensemble(X, y, X_test, n_models=15, n_folds=5):
    # 기본 최적 파라미터 
    base_params = get_base_params()

    # 다양성을 위한 파라미터 변화 범위
    param_ranges = {
        'max_depth': [-1, 0, 1],  # 최적값 근처에서 변화
        'min_child_weight': [-1, 0, 1],
        'gamma': [-0.2, 0, 0.2],  # 작은 변화
        'subsample': [-0.03, 0, 0.03],  # 작은 변화
        'colsample_bytree': [-0.02, 0, 0.02],  # 작은 변화
        'reg_alpha': [-0.5, 0, 0.5],  # 소폭 변화
        'reg_lambda': [-0.5, 0, 0.5],  # 소폭 변화
        'scale_pos_weight': [-0.1, 0, 0.1]  # 소폭 변화
    }

    test_predictions = np.zeros((len(X_test), n_models))
    oof_predictions = np.zeros(len(X))  # Out-of-fold 예측값 저장
    val_scores = []
    best_iterations = []
    train_scores_all = []
    eval_scores_all = []

    print(f"학습 시작 시간: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    start_time = time.time()

    # 각 모델별 파라미터와 시드 설정
    for i in range(n_models):
        # 기본 파라미터 복사
        model_params = base_params.copy()

        # 다양성을 위한 무작위 시드 설정
        seed = (i * 111 + 42) % 10000
        model_params['seed'] = seed
        model_params['random_state'] = seed

        # 파라미터 약간씩 변경하여 다양성 확보
        print(f"\n====== 모델 {i+1}/{n_models} (시드: {seed}) ======")
        print("파라미터 변경:")

        for param, ranges in param_ranges.items():
            # 정수형 파라미터
            if param in ['max_depth', 'min_child_weight']:
                change = np.random.choice(ranges)
                if param in model_params:
                    model_params[param] = max(1, model_params[param] + change)
                    print(f"  {param}: {base_params[param]} -> {model_params[param]}")
            # 실수형 파라미터
            else:
                if param in model_params:
                    change = np.random.uniform(ranges[0], ranges[2])
                    # 반올림하여 보기 좋게 표시
                    original = model_params[param]
                    model_params[param] = max(0.001, model_params[param] + change)
                    print(f"  {param}: {original:.6f} -> {model_params[param]:.6f}")

        # StratifiedKFold 정의
        skf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=seed)
        fold_val_scores = []
        fold_predictions = np.zeros((len(X_test), n_folds))

        # 각 폴드별 학습
        for fold, (train_idx, val_idx) in enumerate(skf.split(X, y)):
            print(f"\nFold {fold + 1}/{n_folds}")

            X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
            y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]

            dtrain = xgb.DMatrix(X_train, label=y_train)
            dval = xgb.DMatrix(X_val, label=y_val)
            dtest = xgb.DMatrix(X_test)

            # 학습 과정 저장을 위한 평가 결과 저장소
            evals_result = {}

            model = xgb.train(
                model_params,
                dtrain,
                num_boost_round=5000,
                evals=[(dtrain, 'train'), (dval, 'eval')],
                early_stopping_rounds=50,
                verbose_eval=100,
                evals_result=evals_result
            )

            # 학습 곡선 데이터 저장 (첫 번째 폴드만)
            if fold == 0:
                train_scores = evals_result['train']['auc'][:model.best_iteration]
                eval_scores = evals_result['eval']['auc'][:model.best_iteration]
                train_scores_all.append(train_scores)
                eval_scores_all.append(eval_scores)
                best_iterations.append(model.best_iteration)

            # Out-of-fold 예측
            val_preds = model.predict(dval, iteration_range=(0, model.best_iteration+1))
            oof_predictions[val_idx] = val_preds

            # 검증 점수 계산
            fold_val_score = roc_auc_score(y_val, val_preds)
            fold_val_scores.append(fold_val_score)
            print(f"Fold {fold + 1} 검증 AUC: {fold_val_score:.4f}")

            # 테스트 예측
            fold_predictions[:, fold] = model.predict(
                dtest, iteration_range=(0, model.best_iteration+1)
            )

        # 폴드별 평균 검증 점수
        mean_val_score = np.mean(fold_val_scores)
        val_scores.append(mean_val_score)
        print(f"\n모델 {i+1} 평균 검증 AUC: {mean_val_score:.4f}")

        # 테스트 예측값 (폴드 평균)
        test_predictions[:, i] = np.mean(fold_predictions, axis=1)

    # 학습 시간 계산
    train_time = time.time() - start_time
    print(f"\n총 학습 시간: {timedelta(seconds=int(train_time))}")

    # 모델별 성능 출력
    print("\n=== 개별 모델 성능 ===")
    for i, score in enumerate(val_scores):
        print(f"모델 {i+1} - 평균 검증 AUC: {score:.4f}")

    # 학습 곡선 분석을 위한 처리
    min_length = min(len(scores) for scores in train_scores_all)
    train_scores_aligned = np.array([scores[:min_length] for scores in train_scores_all])
    eval_scores_aligned = np.array([scores[:min_length] for scores in eval_scores_all])
    
    # 평균 학습 곡선 계산
    avg_train_scores = train_scores_aligned.mean(axis=0)
    avg_eval_scores = eval_scores_aligned.mean(axis=0)

    # 앙상블 방식 계산
    # 1. 단순 평균
    simple_avg_predictions = test_predictions.mean(axis=1)

    # 2. 성능 가중치 기반 평균 (성능이 좋은 모델에 더 높은 가중치)
    normalized_scores = np.array(val_scores) / np.sum(val_scores)
    weighted_predictions = np.zeros(len(X_test))
    for i in range(n_models):
        weighted_predictions += normalized_scores[i] * test_predictions[:, i]

    # 3. 상위 N개 모델만 사용하는 앙상블
    top_n = max(3, n_models // 2)  # 상위 절반 또는 최소 3개
    top_indices = np.argsort(val_scores)[-top_n:]
    top_scores = np.array([val_scores[i] for i in top_indices])
    top_weights = top_scores / np.sum(top_scores)

    top_predictions = np.zeros(len(X_test))
    for i, idx in enumerate(top_indices):
        top_predictions += top_weights[i] * test_predictions[:, idx]

    # 예측 불확실성 분석
    pred_std = test_predictions.std(axis=1)
    high_uncertainty = np.percentile(pred_std, 90)
    uncertainty_ratio = (pred_std > high_uncertainty).mean()

    print(f"\n=== 앙상블 방식 비교 ===")
    print(f"단순 평균 예측 분포: {simple_avg_predictions.mean():.4f} ± {simple_avg_predictions.std():.4f}")
    print(f"가중치 평균 예측 분포: {weighted_predictions.mean():.4f} ± {weighted_predictions.std():.4f}")
    print(f"상위 {top_n}개 모델 평균 예측 분포: {top_predictions.mean():.4f} ± {top_predictions.std():.4f}")

    # 오버피팅 분석 및 시각화
    plot_learning_curves(avg_train_scores, avg_eval_scores)
    overfitting_metrics = analyze_overfitting(avg_train_scores, avg_eval_scores,
                                           val_scores, weighted_predictions)

    # 최종 예측값 선택 (오버피팅 위험도에 따라)
    if overfitting_metrics['risk_score'] < 0.25:
        print("\n가중치 기반 예측 사용 (오버피팅 위험 매우 낮음)")
        final_predictions = weighted_predictions
    elif overfitting_metrics['risk_score'] < 0.5:
        print("\n상위 모델 앙상블 사용 (오버피팅 위험 낮음)")
        final_predictions = top_predictions
    else:
        print("\n단순 평균 예측 사용 (오버피팅 위험 높음)")
        final_predictions = simple_avg_predictions

    return final_predictions, pred_std, test_predictions, val_scores, oof_predictions


# 예측 결과 저장
def save_results(predictions, uncertainties, all_preds, test_ids):
    # 기본 예측 결과 저장
    submission = pd.DataFrame({
        'ID': test_ids,
        'probability': predictions
    })
    submission.to_csv('xgb_diverse_ensemble_submission.csv', index=False)
    print("예측 결과가 'xgb_diverse_ensemble_submission.csv'에 저장되었습니다.")

    # 상세 결과 저장 (불확실성 포함)
    detailed_results = pd.DataFrame({
        'ID': test_ids,
        'predicted_probability': predictions,
        'prediction_uncertainty': uncertainties,
        'high_uncertainty': uncertainties > np.percentile(uncertainties, 90)
    })
    detailed_results.to_csv('xgb_diverse_ensemble_detailed.csv', index=False)
    print("상세 예측 결과가 'xgb_diverse_ensemble_detailed.csv'에 저장되었습니다.")

    # 개별 모델 예측 저장
    model_predictions = pd.DataFrame(all_preds,
                                  columns=[f'model_{i+1}' for i in range(all_preds.shape[1])])
    model_predictions.insert(0, 'ID', test_ids)
    model_predictions.to_csv('xgb_diverse_individual_predictions.csv', index=False)
    print("개별 모델 예측이 'xgb_diverse_individual_predictions.csv'에 저장되었습니다.")


def main():
    # 데이터 로드
    print("데이터 로드 중...")
    try:      
        # 특성과 타겟 분리
        X = train_processed.drop(columns=["임신 성공 여부"])
        y = train_processed["임신 성공 여부"]
        
        print(f"학습 데이터 형태: {X.shape}")
        print(f"테스트 데이터 형태: {test_processed.shape}")
        
        # 앙상블 학습 및 예측
        print("\n앙상블 모델 학습 시작...")
        predictions, uncertainties, all_preds, scores, oof_preds = train_seed_stratified_ensemble(
            X, y, test_processed, n_models=20, n_folds=5
        )
        
        # OOF 점수 계산
        oof_score = roc_auc_score(y, oof_preds)
        print(f"\nOut-of-fold AUC: {oof_score:.4f}")
        
        # 결과 저장
        save_results(predictions, uncertainties, all_preds, test['ID'])
        
    except Exception as e:
        print(f"오류 발생: {e}")


if __name__ == "__main__":
    main()