In [None]:
!pip install pandas numpy scikit-learn xgboost tqdm

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.metrics import roc_auc_score, accuracy_score
from tqdm import tqdm

In [None]:
# Load the preprocessed loan data from AWS S3.
# tqdm.pandas() registers tqdm's progress-bar variants of the pandas apply methods.
tqdm.pandas()
url = "https://snu-bigdata-fintech-ai.s3.ap-northeast-2.amazonaws.com/data/interim/preprocessed_data_ver.2.3.(IRR%2BLOG%2BBINARY).csv"
df = pd.read_csv(url)

In [None]:
# Remove every pandas display limit: show all columns and all rows, put no
# cap on the output width, and never truncate the contents of a cell.
for display_option in (
    'display.max_columns',
    'display.max_rows',
    'display.width',
    'display.max_colwidth',
):
    pd.set_option(display_option, None)

In [None]:
# Preview the first rows of the loaded frame.
df.head()

In [None]:
# Columns excluded from the independent variables: the target itself plus the
# loan fields listed here.
drop_cols = [
    'term', 'last_pymnt_d', 'installment', 'funded_amnt',
    'recoveries', 'collection_recovery_fee', 'default', 'issue_d'
]

# Columns this notebook adds to `df` further down. `cash_flow` and `irr` are
# built from realised repayments, so they must never be used as features.
# They are dropped only if present, so this cell gives the same X on a first
# run, on a re-run after the later cells, and if the input file already
# carries an `irr` column.
derived_cols = ['risk_free_rate', 'cash_flow', 'irr']

X = df.drop(columns=drop_cols).drop(columns=derived_cols, errors='ignore')
y = df['default']

In [None]:
!pip install fredapi

In [None]:
from fredapi import Fred

import os

# Connect to the FRED API.
# The key is read from the environment so that no credential is stored in the
# notebook. The key that used to be hardcoded here should be treated as
# exposed and rotated.
fred_api_key = os.environ.get("FRED_API_KEY")
if not fred_api_key:
    raise RuntimeError(
        "FRED_API_KEY is not set; export your FRED API key before running this cell."
    )
fred = Fred(api_key=fred_api_key)

# Download the GS3 time series (3-year US Treasury yield); returned as a pandas.Series
series = fred.get_series('GS3')


print(series.head())

In [None]:
def get_nearest_rate(issue_date, rate_series):
    """Look up the rate observed closest in time to ``issue_date``.

    Parameters
    ----------
    issue_date : str, datetime-like or null
        Loan issue date. Strings (as read from the CSV) are converted to
        ``pd.Timestamp`` explicitly rather than relying on implicit coercion
        inside the index lookup.
    rate_series : pandas.Series
        Rates in percent, indexed by date.

    Returns
    -------
    float
        The nearest rate divided by 100 (percent -> decimal), or ``np.nan``
        when ``issue_date`` is null, cannot be parsed, or no observation can
        be matched.
    """
    if pd.isnull(issue_date):
        return np.nan
    try:
        issue_ts = pd.Timestamp(issue_date)
        idx = rate_series.index.get_indexer([issue_ts], method='nearest')[0]
        if idx < 0:
            # get_indexer reports "no match" as -1; do not let that silently
            # select the last observation through iloc[-1].
            return np.nan
        return rate_series.iloc[idx] / 100  # percent -> decimal
    except Exception as e:
        # Best effort: report the failing date and leave the rate missing.
        print(f"Error: {issue_date} ▶ {e}")
        return np.nan


# Attach to every loan the risk-free rate observed closest to its issue date.
df['risk_free_rate'] = df['issue_d'].map(lambda issued: get_nearest_rate(issued, series))

In [None]:
# Spot-check the looked-up risk-free rate next to the loan dates.
print(df[['issue_d', 'risk_free_rate', 'last_pymnt_d']].head())

In [None]:
from dateutil.relativedelta import relativedelta

def create_cash_flow_from_dates(row):
    """Build the monthly cash-flow list of one loan from its dates and amounts.

    The list starts with the disbursement (``-funded_amnt``) followed by one
    entry per month of ``term``:

    * non-defaulted loan: ``installment`` every month;
    * defaulted loan: ``installment`` for each whole month between ``issue_d``
      and ``last_pymnt_d``, then ``recoveries - collection_recovery_fee`` in
      the following month, then ``0``.

    Returns ``np.nan`` when either date is missing, or when any field cannot
    be read or converted (the error is printed with the row label).

    NOTE(review): for a defaulted loan whose paid months are >= ``term`` the
    recovery month lies outside ``range(1, term + 1)``, so the recovery amount
    never enters the list. Confirm this is intended.
    """
    try:
        n_periods = int(row['term'])
        is_default = int(row['default'])
        issued_on = pd.to_datetime(row['issue_d'])
        last_paid_on = pd.to_datetime(row['last_pymnt_d'])
        payment = float(row['installment'])
        principal = float(row['funded_amnt'])
        recovered = float(row['recoveries'])
        recovery_fee = float(row['collection_recovery_fee'])

        if pd.isnull(issued_on) or pd.isnull(last_paid_on):
            return np.nan

        # Whole months elapsed between issue and last payment
        elapsed = relativedelta(last_paid_on, issued_on)
        paid_months = elapsed.years * 12 + elapsed.months

        def flow_for(month):
            # Cash received in the given month of the schedule
            if is_default != 1 or month <= paid_months:
                return payment
            if month == paid_months + 1:
                return recovered - recovery_fee
            return 0

        # Month 0 is the disbursement; months 1..term follow
        return [-principal] + [flow_for(month) for month in range(1, n_periods + 1)]

    except Exception as e:
        print(f"[오류] index={row.name}, error={e}")
        return np.nan


In [None]:
# Build the cash-flow list of every loan (row-wise apply over the frame).
df['cash_flow'] = df.apply(create_cash_flow_from_dates, axis=1)

In [None]:
!pip install numpy-financial

In [None]:
import numpy_financial as npf

def get_irr(cash_flow):
    """Return the annualised IRR of a list of monthly cash flows.

    The monthly rate from ``npf.irr`` is compounded over 12 periods.
    ``np.nan`` is returned when ``cash_flow`` is not a non-empty list or when
    no IRR is found.
    """
    if isinstance(cash_flow, list) and cash_flow:
        monthly_rate = npf.irr(cash_flow)
        if monthly_rate is not None and not np.isnan(monthly_rate):
            return (1 + monthly_rate) ** 12 - 1  # annualise
    return np.nan

In [None]:
# Annualised IRR per loan; get_irr returns NaN for rows without a cash-flow list.
df['irr'] = df['cash_flow'].apply(get_irr)

In [None]:
# Loans without a computable IRR fall back to their risk-free rate; the
# enriched frame is then written out as the next interim dataset.
# NOTE(review): the output name is spelled `preprocessed_dat_...` while the
# input file is `preprocessed_data_...` - confirm the spelling is intended.
df['irr'] = df['irr'].where(df['irr'].notna(), df['risk_free_rate'])
df.to_csv('../../data/interim/preprocessed_dat_ver.3.0.csv', index=False)

In [None]:
# Lists that collect the results (six independent, empty lists).
# The evaluation-loop cell further down creates these names again before use.
(
    best_thresholds,
    validation_sharpes,
    test_sharpes,
    test_approval_rates,
    test_irr_means,
    test_irr_positive_rates,
) = ([] for _ in range(6))


# Sharpe ratio helper
def calculate_sharpe(returns, risk_free_rates):
    """Return the Sharpe ratio of ``returns`` over ``risk_free_rates``.

    Parameters
    ----------
    returns, risk_free_rates : pandas.Series or numpy.ndarray
        Realised returns and the matching risk-free rates. They are
        subtracted element-wise (pandas aligns Series on their index).

    Returns
    -------
    float
        ``mean(excess) / std(excess, ddof=1)``. ``-np.inf`` is returned when
        the ratio is undefined: zero dispersion, or fewer than two
        observations (sample standard deviation is NaN). Returning ``-inf``
        instead of NaN keeps ``>`` comparisons and ``argmax`` selections from
        ever picking a degenerate portfolio.
    """
    excess = returns - risk_free_rates
    volatility = excess.std(ddof=1)
    if not np.isfinite(volatility) or volatility == 0:
        return -np.inf
    return excess.mean() / volatility

In [None]:
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split, RandomizedSearchCV
import numpy as np

# Hyper-parameter search space
param_dist = {
    'n_estimators': [100, 200, 300],
    'max_depth': [3, 5, 7, 10],
    'learning_rate': [0.01, 0.05, 0.1],
    'subsample': [0.6, 0.8, 1.0],
    'colsample_bytree': [0.6, 0.8, 1.0],
    'gamma': [0, 1, 5],
    'min_child_weight': [1, 3, 5],
    'reg_alpha': [0, 0.1, 1],
    'reg_lambda': [1, 5, 10]
}

# One seed for the tuning subset and for the sampled candidates, so that a
# fresh "Restart & Run All" reproduces the same best_params.
TUNING_SEED = 42

# Run the hyper-parameter search once, on a fixed tuning subset (70% of the data).
# NOTE(review): this subset is drawn from the full X, so rows that later land
# in a test split also influence the chosen hyper-parameters.
X_tune, _, y_tune, _ = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=TUNING_SEED
)

# `use_label_encoder` is omitted, as in the evaluation loop's XGBClassifier call.
model_base = XGBClassifier(eval_metric='logloss')
search = RandomizedSearchCV(
    model_base, param_distributions=param_dist, n_iter=9,
    scoring='roc_auc', cv=2, n_jobs=1, random_state=TUNING_SEED
)
search.fit(X_tune, y_tune)

best_params = search.best_params_
print("✅ Best hyperparameters (from tuning):", best_params)

In [None]:
# Per-run result lists; entry k belongs to the split seeded with random_state=k.
best_models = []
best_thresholds = []
validation_sharpes = []
test_sharpes = []
test_approval_rates = []
test_irr_means = []
test_irr_positive_rates = []

# Candidate approval cut-offs on the predicted default probability.
# The grid is the same for every run, so it is built once.
thresholds = np.arange(0.0, 1.0, 0.05)

# Train and evaluate 100 times with the tuned hyper-parameters
for i in tqdm(range(100)):
    # 60/20/20 train/validation/test split (0.25 of the remaining 80% = 20%)
    X_temp, X_test, y_temp, y_test = train_test_split(
        X, y, test_size=0.2, random_state=i, stratify=y
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=0.25, random_state=i, stratify=y_temp
    )

    # Fit a model with the tuned hyper-parameters
    model = XGBClassifier(**best_params, eval_metric='logloss')
    model.fit(X_train, y_train)

    # Predicted default probability on the validation rows
    y_pred_proba = model.predict_proba(X_val)[:, 1]

    val_indices = X_val.index
    df_val = df.loc[val_indices]
    val_irr = df_val['irr']
    val_risk_free = df_val['risk_free_rate']

    best_sharpe = -np.inf
    best_threshold = None

    for threshold in thresholds:
        # Approve loans whose predicted default probability is at most the threshold
        approved_mask = y_pred_proba <= threshold

        # Approved loans earn their IRR, rejected loans earn the risk-free rate.
        # Series.where avoids copying the whole validation frame per threshold.
        returns = val_irr.where(approved_mask, val_risk_free)
        valid = returns.notnull() & val_risk_free.notnull()

        if valid.sum() < 2:
            continue

        sharpe = calculate_sharpe(returns[valid], val_risk_free[valid])

        if sharpe > best_sharpe:
            best_sharpe = sharpe
            best_threshold = threshold

    best_models.append(model)
    best_thresholds.append(best_threshold)
    validation_sharpes.append(best_sharpe)

    if best_threshold is None:
        # No threshold gave a Sharpe ratio above -inf, so there is nothing to
        # apply to the test set. Record placeholders so all lists stay
        # aligned by run index, rather than failing on `proba <= None`.
        test_sharpes.append(np.nan)
        test_approval_rates.append(np.nan)
        test_irr_means.append(np.nan)
        test_irr_positive_rates.append(np.nan)
        continue

    # Evaluate on the test rows.
    # NOTE(review): the test Sharpe below uses approved loans only, whereas
    # the validation Sharpe above also counts rejected loans at the
    # risk-free rate - confirm the two are meant to differ.
    y_test_proba = model.predict_proba(X_test)[:, 1]
    test_approved_mask = y_test_proba <= best_threshold
    df_test = df.loc[X_test.index]
    test_selected = df_test[test_approved_mask]

    returns_test = test_selected['irr']
    risk_free_test = test_selected['risk_free_rate']
    valid = returns_test.notnull() & risk_free_test.notnull()

    returns_test = returns_test[valid]
    risk_free_test = risk_free_test[valid]

    sharpe_test = calculate_sharpe(returns_test, risk_free_test)
    test_sharpes.append(sharpe_test)
    test_approval_rates.append(len(returns_test) / len(df_test))
    test_irr_means.append(returns_test.mean())
    test_irr_positive_rates.append((returns_test > 0).mean())

# Final report.
# nanargmax ignores runs whose test Sharpe is NaN; plain argmax would select
# the first NaN entry as the "best" run.
# NOTE(review): the best run is chosen by its test Sharpe, so the reported
# test figures are optimistically biased.
best_idx = int(np.nanargmax(test_sharpes))
print("✅ Best model index:", best_idx)
print("✅ Best validation Sharpe ratio:", validation_sharpes[best_idx])
print("✅ Best test Sharpe ratio:", test_sharpes[best_idx])
print("✅ Best approval rate:", test_approval_rates[best_idx])
print("✅ Mean IRR:", test_irr_means[best_idx])
print("✅ Positive IRR ratio:", test_irr_positive_rates[best_idx])
print("✅ Best threshold:", best_thresholds[best_idx])
print("✅ Best model params:", best_models[best_idx].get_params())


In [None]:
# Print the raw per-run results collected by the evaluation loop.
# The run count is taken from the data instead of being written into the text.
n_runs = len(best_thresholds)

print(f"Best thresholds from {n_runs} runs:")
print(best_thresholds)

print(f"\nValidation Sharpe Ratios from {n_runs} runs:")
print(validation_sharpes)

print(f"\nTest Sharpe Ratios from {n_runs} runs:")
print(test_sharpes)

print("\nTest Approval Rates:")
print(test_approval_rates)

print("\nTest IRR Means:")
print(test_irr_means)

print("\nTest IRR Positive Rates:")
print(test_irr_positive_rates)

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

# Best model selected by the evaluation loop
best_model = best_models[best_idx]

# Rebuild the validation split that belongs to the best run.
# The loop seeds both splits of run k with random_state=k, so repeating them
# with best_idx recovers exactly the rows this model was validated on.
# The X_val / y_val / df_val variables left over from the loop belong to the
# LAST run and may contain rows this model was trained on.
split_seed = int(best_idx)
X_temp_best, _, y_temp_best, _ = train_test_split(
    X, y, test_size=0.2, random_state=split_seed, stratify=y
)
_, X_val_best, _, y_val_best = train_test_split(
    X_temp_best, y_temp_best, test_size=0.25, random_state=split_seed, stratify=y_temp_best
)
df_val_best = df.loc[X_val_best.index]

# Predicted default probability on that validation split
y_val_proba = best_model.predict_proba(X_val_best)[:, 1]

# --------------------------
# 1. ROC curve
# --------------------------
fpr, tpr, roc_thresholds = roc_curve(y_val_best, y_val_proba)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(7, 5))
plt.plot(fpr, tpr, label=f"AUC = {roc_auc:.4f}", color='blue')
plt.plot([0, 1], [0, 1], linestyle='--', color='gray')
plt.title("✅ ROC Curve (Validation Set)")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# --------------------------
# 2. Sharpe ratio per threshold
# --------------------------
thresholds = np.arange(0.0, 1.0, 0.05)
sharpe_ratios = []

val_irr = df_val_best['irr']
val_risk_free = df_val_best['risk_free_rate']

for threshold in thresholds:
    # Approve loans whose predicted default probability is at most the threshold
    approved_mask = y_val_proba <= threshold

    # Approved loans earn their IRR, rejected loans earn the risk-free rate
    returns = val_irr.where(approved_mask, val_risk_free)
    valid = returns.notnull() & val_risk_free.notnull()

    if valid.sum() < 2:
        sharpe_ratios.append(np.nan)
    else:
        sharpe = calculate_sharpe(returns[valid], val_risk_free[valid])
        sharpe_ratios.append(sharpe)

plt.figure(figsize=(7, 5))
plt.plot(thresholds, sharpe_ratios, marker='o', color='green')
plt.title("✅ Sharpe Ratio by Threshold (Validation Set)")
plt.xlabel("Threshold")
plt.ylabel("Sharpe Ratio")
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# Scatter of approval rate against test Sharpe ratio, one point per run.
fig, ax = plt.subplots(figsize=(7, 5))
ax.scatter(test_approval_rates, test_sharpes, color='purple')
ax.set_title("🎯 Approval Rate vs Sharpe Ratio")
ax.set_xlabel("Approval Rate")
ax.set_ylabel("Test Sharpe Ratio")
ax.grid(True)
plt.show()