In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.impute import KNNImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import StratifiedKFold, cross_val_score
from imblearn.over_sampling import SMOTE
from sklearn.metrics import roc_curve, auc
from sklearn.feature_selection import SelectFromModel
import warnings
import optuna

warnings.filterwarnings('ignore')

# ------------------ 단계 1: 데이터 로드 및 전처리 ------------------
train_df = pd.read_csv("train.csv")
test_df = pd.read_csv("test.csv")
test_uid = test_df[["UID"]]

train_df.drop(columns=["UID"], inplace=True)
test_df.drop(columns=["UID"], inplace=True)

X = train_df.drop(columns=["채무 불이행 여부"])
y = train_df["채무 불이행 여부"]

# 현재 직장 근속 연수 변환
def convert_years(val):
    if val == '1년 미만':
        return 0
    elif val == '10년 이상':
        return 10
    else:
        return int(val.replace('년', ''))

X['현재 직장 근속 연수'] = X['현재 직장 근속 연수'].apply(convert_years)
test_df['현재 직장 근속 연수'] = test_df['현재 직장 근속 연수'].apply(convert_years)

# 로그 변환
log_columns = ["현재 미상환 신용액", "월 상환 부채액", "현재 대출 잔액", "연간 소득"]
for col in log_columns:
    X[col] = np.log1p(X[col])
    test_df[col] = np.log1p(test_df[col])

# 연체 없음 변수
X["연체 없음"] = (X["마지막 연체 이후 경과 개월 수"] == 0).astype(int)
test_df["연체 없음"] = (test_df["마지막 연체 이후 경과 개월 수"] == 0).astype(int)

# ------------------ 단계 2: 이상치 처리 (Winsorizing) ------------------
def winsorize(data, upper_percentile=0.99, lower_percentile=0.01):
    upper_bound = data.quantile(upper_percentile)
    lower_bound = data.quantile(lower_percentile)
    return np.clip(data, lower_bound, upper_bound)

X['연간 소득'] = winsorize(X['연간 소득'])
X['현재 대출 잔액'] = winsorize(X['현재 대출 잔액'])
X['현재 미상환 신용액'] = winsorize(X['현재 미상환 신용액'])
X['월 상환 부채액'] = winsorize(X['월 상환 부채액'])
X['신용 거래 연수'] = winsorize(X['신용 거래 연수'])

test_df['연간 소득'] = winsorize(test_df['연간 소득'])
test_df['현재 대출 잔액'] = winsorize(test_df['현재 대출 잔액'])
test_df['현재 미상환 신용액'] = winsorize(test_df['현재 미상환 신용액'])
test_df['월 상환 부채액'] = winsorize(test_df['월 상환 부채액'])
test_df['신용 거래 연수'] = winsorize(test_df['신용 거래 연수'])

# ------------------ 단계 3: 변수 변환 ------------------

# 신용 점수 범주형 변환
def categorize_credit_score(score):
    if score < 600:
        return '저신용'
    elif score < 700:
        return '중신용'
    else:
        return '고신용'

X['신용 점수 범주'] = X['신용 점수'].apply(categorize_credit_score)
test_df['신용 점수 범주'] = test_df['신용 점수'].apply(categorize_credit_score)

# 체납, 신용 문제, 파산 이진 변수 생성
X['체납 경험'] = (X['체납 세금 압류 횟수'] > 0).astype(int)
X['신용 문제 경험'] = (X['신용 문제 발생 횟수'] > 0).astype(int)
X['개인 파산 경험'] = (X['개인 파산 횟수'] > 0).astype(int)

test_df['체납 경험'] = (test_df['체납 세금 압류 횟수'] > 0).astype(int)
test_df['신용 문제 경험'] = (test_df['신용 문제 발생 횟수'] > 0).astype(int)
test_df['개인 파산 경험'] = (test_df['개인 파산 횟수'] > 0).astype(int)

# 기존 수치형 변수 제거
X.drop(columns=['신용 점수', '체납 세금 압류 횟수', '신용 문제 발생 횟수', '개인 파산 횟수'], inplace=True)
test_df.drop(columns=['신용 점수', '체납 세금 압류 횟수', '신용 문제 발생 횟수', '개인 파산 횟수'], inplace=True)

# ------------------ 단계 4: 파생 변수 생성 ------------------

X['신용 문제 통합'] = X['체납 경험'] + X['신용 문제 경험'] + X['개인 파산 경험']
X['소득 대비 부채 비율'] = X['월 상환 부채액'] / X['연간 소득']
X['대출 잔액 대비 상환액 비율'] = X['월 상환 부채액'] / X['현재 대출 잔액']

test_df['신용 문제 통합'] = test_df['체납 경험'] + test_df['신용 문제 경험'] + test_df['개인 파산 경험']
test_df['소득 대비 부채 비율'] = test_df['월 상환 부채액'] / test_df['연간 소득']
test_df['대출 잔액 대비 상환액 비율'] = test_df['월 상환 부채액'] / test_df['현재 대출 잔액']

# ------------------ 단계 5: 범주형 변수 처리 및 원-핫 인코딩 ------------------

# 주거 형태 통합
X['주거 형태'].replace({'주택 담보 대출 (비거주 중)': '기타'}, inplace=True)
test_df['주거 형태'].replace({'주택 담보 대출 (비거주 중)': '기타'}, inplace=True)

# 대출 목적 통합
low_frequency_loan_purposes = ['기타', '소규모 사업 자금', '휴가 비용', '주택 구매', '이사 비용']
X['대출 목적'].replace(low_frequency_loan_purposes, '기타', inplace=True)
test_df['대출 목적'].replace(low_frequency_loan_purposes, '기타', inplace=True)

# 원-핫 인코딩
X = pd.get_dummies(X, columns=['주거 형태', '대출 목적', '대출 상환 기간', '신용 점수 범주'], drop_first=True)
test_df = pd.get_dummies(test_df, columns=['주거 형태', '대출 목적', '대출 상환 기간', '신용 점수 범주'], drop_first=True)

# ------------------ 단계 6: 결측치 처리(KNNImputer) ------------------
imputer = KNNImputer(n_neighbors=5)
X_imputed = imputer.fit_transform(X)
test_imputed = imputer.transform(test_df)

# ------------------ 단계 7: PolynomialFeatures (제거) ------------------
# poly = PolynomialFeatures(degree=1, interaction_only=True, include_bias=False)
# X_poly = poly.fit_transform(X_imputed)
# test_poly = poly.transform(test_imputed)

# ------------------ 단계 8: SMOTE (sampling_strategy 변경) ------------------
smote = SMOTE(random_state=42, sampling_strategy=0.8)  # sampling_strategy 변경
X_resampled, y_resampled = smote.fit_resample(X_imputed, y) # X_imputed

# ------------------ 단계 9: Feature Selection (SelectFromModel 사용) ------------------

# SelectFromModel: Logistic Regression의 feature importance 기반
#   threshold: feature importance가 이 값 이상인 feature만 선택
#   'mean', 'median', 0.5 * 'mean' 등 사용 가능
logistic_selector = LogisticRegression(penalty='l1', solver='liblinear', random_state=42, class_weight='balanced')
selector = SelectFromModel(logistic_selector, threshold='median') # threshold 설정
X_selected = selector.fit_transform(X_resampled, y_resampled)
test_selected = selector.transform(test_imputed) # test_imputed

# ------------------ 단계 10: 모델 구성 및 튜닝 (Optuna + Logistic Regression) ------------------

def objective(trial):
    penalty = trial.suggest_categorical('penalty', ['l1', 'l2', 'elasticnet'])

    if penalty == 'l1':
        solver = 'liblinear'
        C = trial.suggest_float('C', 1e-4, 1e2, log=True)
        l1_ratio = None
    elif penalty == 'l2':
        solver = trial.suggest_categorical('solver', ['lbfgs', 'newton-cg', 'sag', 'liblinear'])
        C = trial.suggest_float('C', 1e-4, 1e2, log=True)
        l1_ratio = None
    else:
        solver = 'saga'
        C = trial.suggest_float('C', 1e-4, 1e2, log=True)
        l1_ratio = trial.suggest_float('l1_ratio', 0, 1)

    params = {
        'C': C,
        'penalty': penalty,
        'solver': solver,
        'class_weight': 'balanced',
        'random_state': 42,
        'l1_ratio': l1_ratio
    }

    model = LogisticRegression(**params)
    cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)  # n_splits 조정
    scores = cross_val_score(model, X_selected, y_resampled, cv=cv, scoring='roc_auc', n_jobs=1)
    return scores.mean()

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=20)  # n_trials 조정

print("Best Parameters:", study.best_params)
print("Best AUC:", study.best_value)

best_params = study.best_params.copy()
if best_params['penalty'] == 'l1':
    best_params['solver'] = 'liblinear'
elif best_params['penalty'] == 'elasticnet':
    best_params['solver'] = 'saga'

logreg_pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(**best_params, random_state=42))
])
logreg_pipeline.fit(X_selected, y_resampled)

y_pred_proba = logreg_pipeline.predict_proba(X_selected)[:, 1]
fpr, tpr, thresholds = roc_curve(y_resampled, y_pred_proba)
roc_auc = auc(fpr, tpr)
optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]
print("Optimal Threshold:", optimal_threshold)

# ------------------ 단계 11: 테스트 데이터 예측 및 제출 파일 생성 ------------------
test_pred_proba = logreg_pipeline.predict_proba(test_selected)[:, 1]
test_pred = (test_pred_proba >= optimal_threshold).astype(int)

submission = pd.DataFrame({
    'UID': test_uid.values.ravel(),
    '채무 불이행 확률': test_pred_proba
})
submission.to_csv('submission_optuna_logreg_selectfrommodel.csv', index=False)  # 파일 이름 변경
print("✅ 'submission_optuna_logreg_selectfrommodel.csv' 파일 저장 완료!")

  from .autonotebook import tqdm as notebook_tqdm
  File "c:\Users\dakhu\anaconda3\envs\DL\lib\site-packages\joblib\externals\loky\backend\context.py", line 257, in _count_physical_cores
    cpu_info = subprocess.run(
  File "c:\Users\dakhu\anaconda3\envs\DL\lib\subprocess.py", line 503, in run
    with Popen(*popenargs, **kwargs) as process:
  File "c:\Users\dakhu\anaconda3\envs\DL\lib\subprocess.py", line 971, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "c:\Users\dakhu\anaconda3\envs\DL\lib\subprocess.py", line 1456, in _execute_child
    hp, ht, pid, tid = _winapi.CreateProcess(executable, args,
[I 2025-03-14 17:35:12,258] A new study created in memory with name: no-name-b4da07f0-5ca6-474f-837e-749eccbd001a
[I 2025-03-14 17:35:15,321] Trial 0 finished with value: 0.6888716481400768 and parameters: {'penalty': 'l2', 'solver': 'newton-cg', 'C': 0.004401375569159449}. Best is trial 0 with value: 0.6888716481400768.
[I 2025-03-14 17:35:15,362] Tri

Best Parameters: {'penalty': 'l1', 'C': 3.868016198396439}
Best AUC: 0.6961075413234606
Optimal Threshold: 0.46690475182977875
✅ 'submission_optuna_logreg_selectfrommodel.csv' 파일 저장 완료!
