# 🚀 고급 머신러닝/딥러닝 사정율 예측 모델

## 사용 모델:
1. **전통 ML**: Random Forest, XGBoost, LightGBM, CatBoost
2. **앙상블**: Voting, Stacking
3. **딥러닝**: DNN, LSTM (Transformer는 이 노트북에 아직 구현되어 있지 않음)
4. **AutoML**: AutoGluon, H2O (이 노트북에 아직 구현되어 있지 않음)

## 1. 라이브러리 설치 및 임포트

In [None]:
# 필요한 라이브러리 설치
!pip install xgboost lightgbm catboost -q
!pip install tensorflow keras -q
!pip install scikit-learn pandas numpy matplotlib seaborn -q
!pip install optuna -q  # 하이퍼파라미터 최적화
!pip install shap -q  # 모델 해석

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

# 머신러닝 모델
from sklearn.model_selection import train_test_split, cross_val_score, KFold
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error, mean_absolute_percentage_error

# 전통 ML 모델
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, AdaBoostRegressor
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor

# 최신 부스팅 모델
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostRegressor

# 딥러닝
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models, callbacks

# 하이퍼파라미터 최적화
import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING)

# Matplotlib setup for Korean labels: select the font family and keep the
# minus sign rendered as plain ASCII.
plt.rcParams.update({
    'font.family': 'Malgun Gothic',
    'axes.unicode_minus': False,
})

# Environment report: TensorFlow version, visible GPUs, and a completion note.
print(
    f"TensorFlow 버전: {tf.__version__}",
    f"GPU 사용 가능: {tf.config.list_physical_devices('GPU')}",
    "모든 라이브러리 임포트 완료!",
    sep="\n",
)

## 2. 데이터 로드 및 전처리

In [None]:
# Read the source spreadsheet into a DataFrame.
df = pd.read_excel('강원도및경기일부.xlsx')
print(f"데이터 크기: {df.shape}")

# Columns are picked by position: 3, 4 and 5 are the features and 1 is the
# target. NOTE(review): the original author's note names them as the
# 100-step / 10-step / 3-step moving values and the assessment rate —
# confirm against the spreadsheet header.
X_columns = df.columns[[3, 4, 5]]
y_column = df.columns[1]

# Keep only the selected columns and drop every row with a missing value.
data = df[[*X_columns, y_column]].dropna()
X, y = data[list(X_columns)], data[y_column]

print(
    f"\n정제 후 데이터: {X.shape}",
    f"특성: {list(X.columns)}",
    f"타겟: {y_column}",
    sep="\n",
)

In [None]:
# Hold out 20% of the rows as a test set (fixed seed for repeatability).
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

# Available scalers, keyed by a short name.
scalers = dict(
    standard=StandardScaler(),
    robust=RobustScaler(),
    minmax=MinMaxScaler(),
)

# The standard scaler is the one actually applied: it is fitted on the
# training split only and then reused unchanged for the test split.
scaler = scalers['standard']
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"학습 데이터: {X_train.shape}")
print(f"테스트 데이터: {X_test.shape}")

## 3. 전통 머신러닝 모델

In [None]:
# Local imports: only this step needs them (scikit-learn is already used by
# the notebook).
from sklearn.base import clone
from sklearn.pipeline import make_pipeline

# Candidate scikit-learn regressors, keyed by the display name used in reports.
ml_models = {
    'Linear Regression': LinearRegression(),
    'Ridge': Ridge(alpha=1.0),
    'Lasso': Lasso(alpha=0.01),
    'ElasticNet': ElasticNet(alpha=0.01),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
    'AdaBoost': AdaBoostRegressor(n_estimators=100, random_state=42),
    'SVR': SVR(kernel='rbf', gamma='scale'),
    'KNN': KNeighborsRegressor(n_neighbors=5)
}

# Per-model metrics plus the fitted estimator; later steps add more entries.
results = {}

for name, model in ml_models.items():
    # Fit on the scaled training split.
    model.fit(X_train_scaled, y_train)

    # Predict the held-out test split.
    y_pred = model.predict(X_test_scaled)

    # Test-set metrics.
    r2 = r2_score(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)

    # 5-fold cross-validation on the *unscaled* training data with the scaler
    # inside the pipeline: each fold fits its own scaler on that fold's
    # training part, so validation rows never influence the scaling statistics.
    cv_pipeline = make_pipeline(clone(scaler), clone(model))
    cv_scores = cross_val_score(cv_pipeline, X_train, y_train, cv=5, scoring='r2')

    results[name] = {
        'R2': r2,
        'RMSE': rmse,
        'MAE': mae,
        'CV_Mean': cv_scores.mean(),
        'CV_Std': cv_scores.std(),
        'Model': model
    }

    print(f"{name:20s}: R²={r2:.4f}, RMSE={rmse:.4f}, CV={cv_scores.mean():.4f}±{cv_scores.std():.4f}")

## 4. 최신 부스팅 모델 (XGBoost, LightGBM, CatBoost)

In [None]:
# XGBoost regressor: configure and fit on the scaled training split in one
# expression (fit returns the estimator itself).
xgb_model = xgb.XGBRegressor(
    n_estimators=200,
    max_depth=6,
    learning_rate=0.1,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=42,
).fit(X_train_scaled, y_train)

# Score the held-out test split.
xgb_pred = xgb_model.predict(X_test_scaled)
xgb_r2 = r2_score(y_test, xgb_pred)

# Record the metrics and the fitted model in the shared results table.
results['XGBoost'] = dict(
    R2=xgb_r2,
    RMSE=np.sqrt(mean_squared_error(y_test, xgb_pred)),
    MAE=mean_absolute_error(y_test, xgb_pred),
    Model=xgb_model,
)

print(f"XGBoost R² Score: {xgb_r2:.4f}")

In [None]:
# LightGBM regressor.
lgb_model = lgb.LGBMRegressor(
    n_estimators=200,
    max_depth=6,
    learning_rate=0.1,
    num_leaves=31,
    subsample=0.8,
    # LightGBM only applies `subsample` (row bagging) when the bagging
    # frequency is positive; with the default of 0 the 0.8 above is ignored.
    subsample_freq=1,
    colsample_bytree=0.8,
    random_state=42,
    verbose=-1
)

# Fit on the scaled training split and score the held-out test split.
lgb_model.fit(X_train_scaled, y_train)
lgb_pred = lgb_model.predict(X_test_scaled)
lgb_r2 = r2_score(y_test, lgb_pred)

print(f"LightGBM R² Score: {lgb_r2:.4f}")

# Record the metrics and the fitted model in the shared results table.
results['LightGBM'] = {
    'R2': lgb_r2,
    'RMSE': np.sqrt(mean_squared_error(y_test, lgb_pred)),
    'MAE': mean_absolute_error(y_test, lgb_pred),
    'Model': lgb_model
}

In [None]:
# CatBoost regressor with training output silenced.
cat_model = CatBoostRegressor(
    iterations=200, depth=6, learning_rate=0.1, random_state=42, verbose=False
)
cat_model.fit(X_train_scaled, y_train)

# Score the held-out test split.
cat_pred = cat_model.predict(X_test_scaled)
cat_r2 = r2_score(y_test, cat_pred)

# Record the metrics and the fitted model in the shared results table.
results['CatBoost'] = dict(
    R2=cat_r2,
    RMSE=np.sqrt(mean_squared_error(y_test, cat_pred)),
    MAE=mean_absolute_error(y_test, cat_pred),
    Model=cat_model,
)

print(f"CatBoost R² Score: {cat_r2:.4f}")

## 5. 딥러닝 모델 (DNN)

In [None]:
# Deep neural network for regression.
def create_dnn_model(input_dim):
    """Build and compile a fully connected regression network.

    The stack is three Dense -> BatchNormalization -> Dropout groups
    (128/64/32 units with dropout 0.3/0.2/0.1), then a 16-unit ReLU layer
    and a single linear output unit.

    Args:
        input_dim: Number of input features per sample.

    Returns:
        A compiled ``keras.Sequential`` model using Adam (learning rate
        0.001), MSE loss, and MAE as the tracked metric.
    """
    stack = [layers.Input(shape=(input_dim,))]

    # (units, dropout rate) for each regularized hidden group, in order.
    for units, drop_rate in ((128, 0.3), (64, 0.2), (32, 0.1)):
        stack.append(layers.Dense(units, activation='relu'))
        stack.append(layers.BatchNormalization())
        stack.append(layers.Dropout(drop_rate))

    stack.append(layers.Dense(16, activation='relu'))
    stack.append(layers.Dense(1))  # single regression output

    model = keras.Sequential(stack)
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='mse',
        metrics=['mae'],
    )
    return model


# Instantiate the network for the number of feature columns and print its layout.
dnn_model = create_dnn_model(X_train_scaled.shape[1])
dnn_model.summary()

In [None]:
# Training callbacks, both watching the validation loss:
# - halve the learning rate after 10 epochs without improvement (floor 1e-5)
# - stop after 20 epochs without improvement and restore the best weights
reduce_lr = callbacks.ReduceLROnPlateau(
    monitor='val_loss', factor=0.5, patience=10, min_lr=0.00001
)
early_stop = callbacks.EarlyStopping(
    monitor='val_loss', patience=20, restore_best_weights=True
)

# Train silently for up to 200 epochs, validating on 20% of the training data.
history = dnn_model.fit(
    X_train_scaled,
    y_train,
    validation_split=0.2,
    epochs=200,
    batch_size=32,
    callbacks=[early_stop, reduce_lr],
    verbose=0,
)

# Evaluate on the held-out test split; the prediction is flattened to 1-D.
dnn_pred = dnn_model.predict(X_test_scaled, verbose=0).flatten()
dnn_r2 = r2_score(y_test, dnn_pred)

# Record the metrics and the trained network in the shared results table.
results['DNN'] = dict(
    R2=dnn_r2,
    RMSE=np.sqrt(mean_squared_error(y_test, dnn_pred)),
    MAE=mean_absolute_error(y_test, dnn_pred),
    Model=dnn_model,
)

print(f"DNN R² Score: {dnn_r2:.4f}")

In [None]:
# Plot the DNN training curves: loss on the left panel, MAE on the right.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# One row per panel:
# (train key, validation key, train label, validation label, y label, title)
panels = [
    ('loss', 'val_loss', 'Train Loss', 'Val Loss', 'Loss', 'DNN 학습 손실'),
    ('mae', 'val_mae', 'Train MAE', 'Val MAE', 'MAE', 'DNN MAE'),
]

for ax, (train_key, val_key, train_label, val_label, y_label, title) in zip(axes, panels):
    ax.plot(history.history[train_key], label=train_label)
    ax.plot(history.history[val_key], label=val_label)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    ax.set_title(title)
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 6. LSTM 시계열 모델

In [None]:
# Reshape the 2-D feature matrices to the 3-D layout LSTM layers take:
# (samples, timesteps, features), with a single timestep per sample.
X_train_lstm = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
X_test_lstm = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))

# LSTM model. The input shape is declared with an explicit Input layer, the
# same way create_dnn_model does, instead of the deprecated `input_shape=`
# keyword on the first layer.
lstm_model = keras.Sequential([
    layers.Input(shape=(1, X_train_scaled.shape[1])),
    layers.LSTM(64, return_sequences=True),
    layers.Dropout(0.2),
    layers.LSTM(32),
    layers.Dropout(0.2),
    layers.Dense(16, activation='relu'),
    layers.Dense(1)
])

lstm_model.compile(
    optimizer='adam',
    loss='mse',
    metrics=['mae']
)

# Train silently for up to 100 epochs, validating on 20% of the training
# data. `early_stop` is the EarlyStopping callback created in the DNN
# training step.
lstm_history = lstm_model.fit(
    X_train_lstm, y_train,
    validation_split=0.2,
    epochs=100,
    batch_size=32,
    callbacks=[early_stop],
    verbose=0
)

# Evaluate on the held-out test split; the prediction is flattened to 1-D.
lstm_pred = lstm_model.predict(X_test_lstm, verbose=0).flatten()
lstm_r2 = r2_score(y_test, lstm_pred)

print(f"LSTM R² Score: {lstm_r2:.4f}")

# Record the metrics and the trained network in the shared results table.
results['LSTM'] = {
    'R2': lstm_r2,
    'RMSE': np.sqrt(mean_squared_error(y_test, lstm_pred)),
    'MAE': mean_absolute_error(y_test, lstm_pred),
    'Model': lstm_model
}

## 7. 앙상블 모델 (Voting & Stacking)

In [None]:
from sklearn.ensemble import VotingRegressor, StackingRegressor

# Rank every evaluated model by test R², best first, and report the top three.
ranked_models = sorted(results.items(), key=lambda x: x[1]['R2'], reverse=True)
top_models = ranked_models[:3]
print("상위 3개 모델:")
for name, metrics in top_models:
    print(f"  - {name}: R²={metrics['R2']:.4f}")

# Voting ensemble members: the three best models that VotingRegressor can
# wrap. The Keras models are excluded *before* taking the top three, so a
# deep-learning model in the overall top three no longer shrinks the
# ensemble (possibly below two members, which silently skipped it).
# The ensemble's own entry is excluded too, so re-running this step does not
# nest a previous voting ensemble inside the new one.
excluded_from_voting = {'DNN', 'LSTM', 'Voting Ensemble'}
voting_models = [
    (name, metrics['Model'])
    for name, metrics in ranked_models
    if name not in excluded_from_voting
][:3]

if len(voting_models) >= 2:
    voting_reg = VotingRegressor(voting_models)
    voting_reg.fit(X_train_scaled, y_train)
    voting_pred = voting_reg.predict(X_test_scaled)
    voting_r2 = r2_score(y_test, voting_pred)

    print(f"\nVoting Ensemble R² Score: {voting_r2:.4f}")

    # Record the metrics and the fitted ensemble in the shared results table.
    results['Voting Ensemble'] = {
        'R2': voting_r2,
        'RMSE': np.sqrt(mean_squared_error(y_test, voting_pred)),
        'MAE': mean_absolute_error(y_test, voting_pred),
        'Model': voting_reg
    }

In [None]:
# Stacking ensemble: three tree-based base learners whose 5-fold
# cross-validated predictions feed a linear-regression meta-learner.
base_models = [
    ('rf', RandomForestRegressor(n_estimators=100, random_state=42)),
    ('xgb', xgb.XGBRegressor(n_estimators=100, random_state=42)),
    ('lgb', lgb.LGBMRegressor(n_estimators=100, random_state=42, verbose=-1)),
]
stacking_reg = StackingRegressor(
    estimators=base_models, final_estimator=LinearRegression(), cv=5
)
stacking_reg.fit(X_train_scaled, y_train)

# Score the held-out test split.
stacking_pred = stacking_reg.predict(X_test_scaled)
stacking_r2 = r2_score(y_test, stacking_pred)

# Record the metrics and the fitted ensemble in the shared results table.
results['Stacking Ensemble'] = dict(
    R2=stacking_r2,
    RMSE=np.sqrt(mean_squared_error(y_test, stacking_pred)),
    MAE=mean_absolute_error(y_test, stacking_pred),
    Model=stacking_reg,
)

print(f"Stacking Ensemble R² Score: {stacking_r2:.4f}")

## 8. 하이퍼파라미터 최적화 (Optuna)

In [None]:
# Optuna objective for tuning XGBoost.
def objective(trial):
    """Score one sampled XGBoost configuration.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        Mean R² over 5-fold cross-validation on the scaled training split.
    """
    candidate = xgb.XGBRegressor(
        n_estimators=trial.suggest_int('n_estimators', 50, 500),
        max_depth=trial.suggest_int('max_depth', 3, 10),
        learning_rate=trial.suggest_float('learning_rate', 0.01, 0.3),
        subsample=trial.suggest_float('subsample', 0.5, 1.0),
        colsample_bytree=trial.suggest_float('colsample_bytree', 0.5, 1.0),
        reg_alpha=trial.suggest_float('reg_alpha', 0, 10),
        reg_lambda=trial.suggest_float('reg_lambda', 0, 10),
        random_state=42,
    )

    # The cross-validated mean R² is the value the study maximizes.
    fold_scores = cross_val_score(
        candidate, X_train_scaled, y_train, cv=5, scoring='r2'
    )
    return fold_scores.mean()

# Run the Optuna search. The sampler is seeded so the search is repeatable,
# in line with the fixed random_state=42 used by the other steps.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=50, show_progress_bar=True)

# Best hyperparameters found by the study.
best_params = study.best_params
print(f"\n최적 파라미터:")
for param, value in best_params.items():
    print(f"  {param}: {value}")

# Refit on the full training split with the best hyperparameters.
# best_params holds only the sampled values, so random_state is passed here.
optimized_xgb = xgb.XGBRegressor(**best_params, random_state=42)
optimized_xgb.fit(X_train_scaled, y_train)
opt_pred = optimized_xgb.predict(X_test_scaled)
opt_r2 = r2_score(y_test, opt_pred)

print(f"\nOptimized XGBoost R² Score: {opt_r2:.4f}")

# Record the metrics and the tuned model in the shared results table.
results['Optimized XGBoost'] = {
    'R2': opt_r2,
    'RMSE': np.sqrt(mean_squared_error(y_test, opt_pred)),
    'MAE': mean_absolute_error(y_test, opt_pred),
    'Model': optimized_xgb
}

## 9. 모델 비교 및 시각화

In [None]:
# Build the comparison table from the numeric metrics only. The fitted
# estimators stored under 'Model' are left out, and the values are cast to
# float so the columns are numeric rather than object dtype.
metric_columns = ['R2', 'RMSE', 'MAE']
results_df = pd.DataFrame.from_dict(
    {
        name: [entry[column] for column in metric_columns]
        for name, entry in results.items()
    },
    orient='index',
    columns=metric_columns,
).astype(float)
results_df = results_df.sort_values('R2', ascending=False)

print("="*60)
print("모든 모델 성능 비교")
print("="*60)
print(results_df)

# The best model is the first row after sorting by R² in descending order.
best_model_name = results_df.index[0]
best_r2 = results_df.iloc[0]['R2']
print(f"\n🏆 최고 성능 모델: {best_model_name} (R²={best_r2:.4f})")

In [None]:
# Horizontal bar charts comparing every model on R², RMSE and MAE.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# One row per panel: (metric column, x-axis label, title, extra bar styling).
# The R² panel uses matplotlib's default bar colour.
panels = [
    ('R2', 'R² Score', '모델별 R² Score', {}),
    ('RMSE', 'RMSE', '모델별 RMSE (낮을수록 좋음)', {'color': 'orange'}),
    ('MAE', 'MAE', '모델별 MAE (낮을수록 좋음)', {'color': 'green'}),
]

for ax, (column, x_label, title, bar_style) in zip(axes, panels):
    ax.barh(results_df.index, results_df[column], **bar_style)
    ax.set_xlabel(x_label)
    ax.set_title(title)
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Visualize the predictions of the best-ranked model.
best_model = results[best_model_name]['Model']

# Keras models return a 2-D array, so it is flattened; the LSTM additionally
# needs the 3-D test tensor. Every other model predicts from the scaled matrix.
if best_model_name == 'LSTM':
    best_pred = best_model.predict(X_test_lstm, verbose=0).flatten()
elif best_model_name == 'DNN':
    best_pred = best_model.predict(X_test_scaled, verbose=0).flatten()
else:
    best_pred = best_model.predict(X_test_scaled)

fig, axes = plt.subplots(1, 2, figsize=(12, 5))
actual_ax, residual_ax = axes

# Left panel: actual vs predicted, with the y = x reference line in red.
value_range = [y_test.min(), y_test.max()]
actual_ax.scatter(y_test, best_pred, alpha=0.5)
actual_ax.plot(value_range, value_range, 'r--', lw=2)
actual_ax.set_xlabel('실제 사정율')
actual_ax.set_ylabel('예측 사정율')
actual_ax.set_title(f'{best_model_name} - 실제 vs 예측 (R²={best_r2:.4f})')
actual_ax.grid(True, alpha=0.3)

# Right panel: residuals (actual minus predicted) against the prediction.
residuals = y_test - best_pred
residual_ax.scatter(best_pred, residuals, alpha=0.5)
residual_ax.axhline(y=0, color='r', linestyle='--')
residual_ax.set_xlabel('예측 사정율')
residual_ax.set_ylabel('잔차')
residual_ax.set_title(f'{best_model_name} - 잔차 플롯')
residual_ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 10. Feature Importance (트리 기반 모델의 내장 중요도; SHAP 분석은 아직 미구현)

In [None]:
# Feature importance, reported for the tree-based models only.
tree_models = ['Random Forest', 'XGBoost', 'LightGBM', 'CatBoost', 'Gradient Boosting']

for name in tree_models:
    # Skip models that were never evaluated.
    if name not in results:
        continue

    model = results[name]['Model']
    # Skip estimators that expose no importances.
    if not hasattr(model, 'feature_importances_'):
        continue

    importances = model.feature_importances_

    # Bar chart with one bar per feature column.
    plt.figure(figsize=(8, 4))
    plt.bar(X.columns, importances)
    plt.xlabel('특성')
    plt.ylabel('중요도')
    plt.title(f'{name} - Feature Importance')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    # The same numbers as text, followed by a blank line.
    print(f"{name} Feature Importance:")
    for col, imp in zip(X.columns, importances):
        print(f"  {col}: {imp:.4f}")
    print()

## 11. 최종 모델 저장

In [None]:
import joblib
import os

# Persist the best-performing model under ./models (created if missing).
os.makedirs('models', exist_ok=True)

# Everything needed besides the model itself: the fitted scaler, the feature
# names, and the test metrics of the selected model.
model_info = {
    'scaler': scaler,
    'features': list(X.columns),
    'model_type': best_model_name,
    'metrics': {
        'r2': best_r2,
        'rmse': results[best_model_name]['RMSE'],
        'mae': results[best_model_name]['MAE']
    }
}

if best_model_name in ['DNN', 'LSTM']:
    # Keras models are written with their own save() method.
    keras_path = f'models/best_{best_model_name.lower()}_model.h5'
    best_model.save(keras_path)
    print(f"딥러닝 모델 저장: {keras_path}")

    # The network was trained on scaled inputs, so the fitted scaler and the
    # feature list are stored next to it; without them the saved network
    # cannot be fed correctly prepared data later.
    info_path = f'models/best_{best_model_name.lower()}_preprocessing.pkl'
    joblib.dump(model_info, info_path)
    print(f"전처리 정보 저장: {info_path}")
else:
    # scikit-learn style models: one pickle bundling the model with its
    # preprocessing information.
    model_data = {'model': best_model, **model_info}

    joblib.dump(model_data, 'models/best_model.pkl')
    print(f"모델 저장 완료: models/best_model.pkl")

# Save the full comparison table.
results_df.to_csv('model_comparison_results.csv')
print("\n모든 결과 저장: model_comparison_results.csv")

print(f"\n최종 선택 모델: {best_model_name}")
print(f"성능: R²={best_r2:.4f}, RMSE={results[best_model_name]['RMSE']:.4f}")

## 12. 결론 및 추천

In [None]:
# Final report banner.
print("="*70)
print("최종 분석 결과")
print("="*70)

# Five best models by R² (results_df is already sorted in descending order).
print("\n🏆 Top 5 모델:")
for i, (idx, row) in enumerate(results_df.head(5).iterrows(), 1):
    print(f"{i}. {idx:20s}: R²={row['R2']:.4f}, RMSE={row['RMSE']:.4f}")

# One-line remark for each known model that made the top three.
print("\n📊 모델별 특징:")
model_notes = {
    'XGBoost': "  - XGBoost: 균형 잡힌 성능, 빠른 속도",
    'LightGBM': "  - LightGBM: 대용량 데이터에 최적화",
    'CatBoost': "  - CatBoost: 범주형 변수 처리 우수",
    'DNN': "  - DNN: 복잡한 비선형 패턴 학습",
    'Stacking Ensemble': "  - Stacking: 여러 모델의 장점 결합",
}
top_three = results_df.index[:3]
for model_name, note in model_notes.items():
    if model_name in top_three:
        print(note)

# Recommendation tiers based on the best R².
print("\n💡 추천:")
if best_r2 > 0.8:
    print(f"  ✅ {best_model_name} 모델 사용 권장 (우수한 성능)")
elif best_r2 > 0.6:
    print(f"  ⚠️ {best_model_name} 모델 사용 가능 (추가 특성 필요할 수 있음)")
else:
    print("  ❌ 더 많은 데이터나 특성이 필요함")

# R² is the coefficient of determination, not an accuracy percentage (it can
# even be negative), so it is reported under its own name.
print(f"\n📈 결정계수 (R²): {best_r2:.4f}")
print(f"📉 평균 오차: ±{results[best_model_name]['MAE']:.4f}")