In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, precision_recall_curve
import xgboost as xgb
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

# Configure the Chinese fonts used for matplotlib text and draw minus signs
# as plain ASCII hyphens instead of the Unicode minus glyph.
plt.rcParams.update({
    'font.sans-serif': ['Microsoft YaHei', 'SimHei'],
    'axes.unicode_minus': False,
})

class XGBoostHawkeyeModel:
    """Staged XGBoost tuning demo on simulated financial-risk ("Hawkeye") data.

    The workflow is: generate synthetic customer data, split and standardise
    it, fit a baseline ``XGBClassifier``, then tune it in two grid-search
    stages. Train/test ROC AUC is recorded after every stage so the stages can
    be compared afterwards.
    """

    def __init__(self):
        # Final tuned estimator; set by tune_structure_parameters().
        self.model = None
        # Scaler fitted on the training split inside prepare_data().
        self.scaler = StandardScaler()
        # Feature column names captured in prepare_data().
        self.feature_names = None
        # One dict per tuning stage: stage, train_auc, test_auc, parameters.
        self.performance_history = []

    def generate_sample_data(self, n_samples=10000, random_state=42):
        """Generate a simulated customer data set with a binary risk label.

        Args:
            n_samples: Number of rows to generate; must be at least 1.
            random_state: Seed for the private random generator. The default
                (42) reproduces the values obtained from
                ``np.random.seed(42)`` followed by the same ``np.random.*``
                calls, but without touching NumPy's global random state.

        Returns:
            ``pandas.DataFrame`` with 17 feature columns plus the
            ``is_high_risk`` label (1 for rows whose risk score lies above
            the 95th percentile, otherwise 0).

        Raises:
            ValueError: If ``n_samples`` is smaller than 1.
        """
        if n_samples < 1:
            raise ValueError("n_samples must be at least 1")

        print("正在生成模擬數據...")

        # A private legacy RandomState keeps the exact draw sequence of the
        # global seeded generator while leaving np.random's state untouched.
        rng = np.random.RandomState(random_state)

        # Customer basics.
        # NOTE(review): lognormal(10.5, 0.8) has a median of about e**10.5
        # (~36,000), far below the 200,000 lower clip applied when the
        # DataFrame is built, while risk_score below uses the unclipped
        # values -- confirm this is intended.
        age = rng.normal(40, 12, n_samples)
        income = rng.lognormal(10.5, 0.8, n_samples)  # annual income
        employment_years = rng.exponential(8, n_samples)

        # Credit history.
        credit_score = rng.normal(650, 80, n_samples)
        credit_history_length = rng.exponential(5, n_samples)
        num_credit_cards = rng.poisson(3, n_samples)

        # Financial behaviour.
        monthly_spending = income * rng.uniform(0.3, 0.8, n_samples) / 12
        savings_ratio = rng.beta(2, 5, n_samples)
        debt_to_income = rng.beta(2, 8, n_samples)

        # Banking relationship.
        num_bank_products = rng.poisson(2, n_samples)
        avg_account_balance = income * rng.uniform(0.1, 2, n_samples)
        transaction_frequency = rng.poisson(50, n_samples)

        # Industry / region risk factors.
        # industry_risk: 0 = low, 1 = medium, 2 = high
        industry_risk = rng.choice([0, 1, 2], n_samples, p=[0.6, 0.3, 0.1])
        # region_risk: 0 = low-risk region, 1 = high-risk region
        region_risk = rng.choice([0, 1], n_samples, p=[0.7, 0.3])

        # Recent anomaly indicators.
        recent_large_transactions = rng.poisson(1, n_samples)
        unusual_spending_pattern = rng.binomial(1, 0.1, n_samples)
        late_payments_6m = rng.poisson(0.5, n_samples)

        # Target (fraud / default risk): a linear score over several factors
        # plus Gaussian noise.
        risk_score = (
            -0.02 * (age - 40) +  # age
            -0.0001 * (income - 500000) +  # income
            -0.01 * (credit_score - 650) +  # credit score
            0.5 * debt_to_income +  # debt-to-income ratio
            0.3 * industry_risk +  # industry risk
            0.2 * region_risk +  # region risk
            0.4 * unusual_spending_pattern +  # unusual spending
            0.1 * late_payments_6m +  # late payments
            rng.normal(0, 0.3, n_samples)  # random noise
        )

        # Binarise: rows strictly above the 95th percentile are high risk.
        risk_threshold = np.percentile(risk_score, 95)
        is_high_risk = (risk_score > risk_threshold).astype(int)

        data = pd.DataFrame({
            'age': np.clip(age, 18, 80),
            'income': np.clip(income, 200000, 5000000),
            'employment_years': np.clip(employment_years, 0, 40),
            'credit_score': np.clip(credit_score, 300, 850),
            'credit_history_length': np.clip(credit_history_length, 0, 30),
            'num_credit_cards': np.clip(num_credit_cards, 0, 10),
            'monthly_spending': monthly_spending,
            'savings_ratio': np.clip(savings_ratio, 0, 1),
            'debt_to_income': np.clip(debt_to_income, 0, 2),
            'num_bank_products': np.clip(num_bank_products, 0, 8),
            'avg_account_balance': avg_account_balance,
            'transaction_frequency': transaction_frequency,
            'industry_risk': industry_risk,
            'region_risk': region_risk,
            'recent_large_transactions': recent_large_transactions,
            'unusual_spending_pattern': unusual_spending_pattern,
            'late_payments_6m': late_payments_6m,
            'is_high_risk': is_high_risk,
        })

        high_risk_count = int(is_high_risk.sum())
        print("數據生成完成！")
        print(f"總樣本數: {len(data)}")
        print(f"高風險樣本數: {high_risk_count} ({high_risk_count/len(data)*100:.2f}%)")

        return data

    def prepare_data(self, data):
        """Split ``data`` into train/test sets and standardise numeric columns.

        The split is 80/20, stratified on ``is_high_risk`` with a fixed seed.
        The scaler is fitted on the training split only and then applied to
        the test split. Feature names are stored on ``self.feature_names``.

        Args:
            data: DataFrame containing the features and an ``is_high_risk``
                column.

        Returns:
            Tuple ``(X_train_scaled, X_test_scaled, y_train, y_test)``.
        """
        print("正在準備數據...")

        # Separate the features from the target.
        X = data.drop('is_high_risk', axis=1)
        y = data['is_high_risk']

        self.feature_names = X.columns.tolist()

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )

        # Standardise every numeric column; work on copies so the raw splits
        # are not modified.
        numerical_features = X.select_dtypes(include=[np.number]).columns
        X_train_scaled = X_train.copy()
        X_test_scaled = X_test.copy()

        X_train_scaled[numerical_features] = self.scaler.fit_transform(X_train[numerical_features])
        X_test_scaled[numerical_features] = self.scaler.transform(X_test[numerical_features])

        print(f"訓練集大小: {X_train_scaled.shape}")
        print(f"測試集大小: {X_test_scaled.shape}")
        print(f"類別分布 - 訓練集: {Counter(y_train)}")
        print(f"類別分布 - 測試集: {Counter(y_test)}")

        return X_train_scaled, X_test_scaled, y_train, y_test

    def calculate_scale_pos_weight(self, y_train):
        """Return the negative/positive ratio used as ``scale_pos_weight``.

        Args:
            y_train: Array-like of binary labels (0 = negative, 1 = positive).

        Returns:
            ``neg_count / pos_count`` as a float.

        Raises:
            ValueError: If ``y_train`` contains no positive (1) labels, since
                the ratio would be undefined.
        """
        labels = np.asarray(y_train)
        neg_count = int((labels == 0).sum())
        pos_count = int((labels == 1).sum())
        if pos_count == 0:
            raise ValueError(
                "y_train contains no positive samples; scale_pos_weight is undefined"
            )

        scale_pos_weight = neg_count / pos_count
        print(f"負樣本數: {neg_count}, 正樣本數: {pos_count}")
        print(f"建議的 scale_pos_weight: {scale_pos_weight:.2f}")
        return scale_pos_weight

    def _record_stage(self, stage, model, X_train, X_test, y_train, y_test, parameters):
        """Score ``model`` on both splits, append the result to the history, return it."""
        train_pred_proba = model.predict_proba(X_train)[:, 1]
        test_pred_proba = model.predict_proba(X_test)[:, 1]

        performance = {
            'stage': stage,
            'train_auc': roc_auc_score(y_train, train_pred_proba),
            'test_auc': roc_auc_score(y_test, test_pred_proba),
            'parameters': parameters,
        }
        self.performance_history.append(performance)
        return performance

    def _grid_search(self, estimator, param_grid, X_train, y_train):
        """Run a 5-fold ROC-AUC grid search and return the fitted search object."""
        # NOTE(review): the estimators passed in are built with n_jobs=-1 and
        # the search requests n_jobs=-1 as well; XGBoost's documentation
        # warns that parallelising both layers can cause thread contention.
        # Left as-is so the recorded parameters stay unchanged.
        grid_search = GridSearchCV(
            estimator, param_grid,
            cv=5, scoring='roc_auc', n_jobs=-1, verbose=1
        )
        grid_search.fit(X_train, y_train)
        return grid_search

    def train_baseline_model(self, X_train, X_test, y_train, y_test):
        """Stage 1: fit an ``XGBClassifier`` with near-default parameters.

        Returns:
            Tuple ``(model, performance)`` where ``performance`` is the dict
            appended to ``self.performance_history``.
        """
        print("\n=== 階段1: 基線模型 ===")

        baseline_params = {
            'objective': 'binary:logistic',
            'eval_metric': 'auc',
            'random_state': 42,
            'n_jobs': -1
        }

        model = xgb.XGBClassifier(**baseline_params)
        model.fit(X_train, y_train)

        performance = self._record_stage(
            '基線模型', model, X_train, X_test, y_train, y_test, baseline_params
        )

        print(f"基線模型 - 訓練集 AUC: {performance['train_auc']:.4f}")
        print(f"基線模型 - 測試集 AUC: {performance['test_auc']:.4f}")

        return model, performance

    def tune_core_parameters(self, X_train, X_test, y_train, y_test, scale_pos_weight):
        """Stage 2: grid-search ``scale_pos_weight``, ``min_child_weight``, ``gamma``.

        Args:
            scale_pos_weight: Centre value for the ``scale_pos_weight`` grid;
                0.8x, 1.0x and 1.2x of it are tried.

        Returns:
            Tuple ``(best_model, performance)``; ``performance['parameters']``
            holds the base parameters merged with the best grid values.
        """
        print("\n=== 階段2: 調整核心參數 ===")

        param_grid = {
            'scale_pos_weight': [scale_pos_weight * 0.8, scale_pos_weight, scale_pos_weight * 1.2],
            'min_child_weight': [1, 3, 5],
            'gamma': [0, 0.1, 0.2]
        }

        base_params = {
            'objective': 'binary:logistic',
            'eval_metric': 'auc',
            'random_state': 42,
            'n_jobs': -1
        }

        xgb_model = xgb.XGBClassifier(**base_params)

        print("正在進行網格搜索...")
        grid_search = self._grid_search(xgb_model, param_grid, X_train, y_train)

        best_model = grid_search.best_estimator_
        best_params = grid_search.best_params_

        performance = self._record_stage(
            '核心參數調整', best_model, X_train, X_test, y_train, y_test,
            {**base_params, **best_params}
        )

        print(f"最佳核心參數: {best_params}")
        print(f"核心參數調整後 - 訓練集 AUC: {performance['train_auc']:.4f}")
        print(f"核心參數調整後 - 測試集 AUC: {performance['test_auc']:.4f}")

        return best_model, performance

    def tune_structure_parameters(self, X_train, X_test, y_train, y_test, core_params):
        """Stage 3: grid-search ``max_depth``, ``learning_rate``, ``n_estimators``.

        The best estimator is stored on ``self.model``.

        Args:
            core_params: Keyword arguments for the base ``XGBClassifier``
                (normally ``performance['parameters']`` from stage 2).

        Returns:
            Tuple ``(best_model, performance)``; ``performance['parameters']``
            holds ``core_params`` merged with the best grid values.
        """
        print("\n=== 階段3: 調整結構參數 ===")

        param_grid = {
            'max_depth': [4, 6, 8],
            'learning_rate': [0.05, 0.1, 0.2],
            'n_estimators': [100, 200, 300]
        }

        xgb_model = xgb.XGBClassifier(**core_params)

        print("正在進行結構參數網格搜索...")
        grid_search = self._grid_search(xgb_model, param_grid, X_train, y_train)

        best_model = grid_search.best_estimator_
        best_params = grid_search.best_params_

        performance = self._record_stage(
            '結構參數調整', best_model, X_train, X_test, y_train, y_test,
            {**core_params, **best_params}
        )

        print(f"最佳結構參數: {best_params}")
        print(f"最終模型 - 訓練集 AUC: {performance['train_auc']:.4f}")
        print(f"最終模型 - 測試集 AUC: {performance['test_auc']:.4f}")

        self.model = best_model
        return best_model, performance

    def evaluate_model(self, X_test, y_test):
        """Print and return detailed metrics for the final model.

        Returns:
            Dict with keys ``auc``, ``classification_report`` (dict form),
            ``confusion_matrix``, ``predictions`` and
            ``prediction_probabilities``.

        Raises:
            RuntimeError: If no final model has been stored on ``self.model``.
        """
        # Fail with a clear message instead of an AttributeError on None.
        if self.model is None:
            raise RuntimeError("請先訓練模型")

        print("\n=== 最終模型評估 ===")

        y_pred = self.model.predict(X_test)
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]

        # Classification report (text form for display).
        print("\n分類報告:")
        print(classification_report(y_test, y_pred))

        auc_score = roc_auc_score(y_test, y_pred_proba)
        print(f"\nROC AUC Score: {auc_score:.4f}")

        cm = confusion_matrix(y_test, y_pred)
        print("\n混淆矩陣:")
        print(cm)

        return {
            'auc': auc_score,
            'classification_report': classification_report(y_test, y_pred, output_dict=True),
            'confusion_matrix': cm,
            'predictions': y_pred,
            'prediction_probabilities': y_pred_proba
        }

    def plot_performance_comparison(self):
        """Plot train/test AUC per stage and print an improvement report.

        Prints a message and returns without plotting when no stage has been
        recorded yet.
        """
        if not self.performance_history:
            print("尚無性能紀錄，請先訓練模型")
            return

        stages = [p['stage'] for p in self.performance_history]
        train_aucs = [p['train_auc'] for p in self.performance_history]
        test_aucs = [p['test_auc'] for p in self.performance_history]

        # Stage-over-stage change in test AUC; the first stage is the baseline.
        test_auc_improvements = [0] + [
            current - previous
            for previous, current in zip(test_aucs, test_aucs[1:])
        ]

        plt.figure(figsize=(12, 6))

        x = range(len(stages))
        width = 0.35

        # Left panel: grouped bars of train vs. test AUC.
        plt.subplot(1, 2, 1)
        plt.bar([i - width/2 for i in x], train_aucs, width, label='訓練集 AUC', alpha=0.8)
        plt.bar([i + width/2 for i in x], test_aucs, width, label='測試集 AUC', alpha=0.8)
        plt.xlabel('調參階段')
        plt.ylabel('AUC Score')
        plt.title('模型性能提升過程')
        plt.xticks(x, stages, rotation=45)
        plt.legend()
        plt.grid(True, alpha=0.3)

        # Right panel: per-stage AUC change, green for gains and red for losses.
        plt.subplot(1, 2, 2)
        colors = ['green' if imp >= 0 else 'red' for imp in test_auc_improvements]
        plt.bar(x, test_auc_improvements, color=colors, alpha=0.7)
        plt.xlabel('調參階段')
        plt.ylabel('AUC 改善量')
        plt.title('各階段AUC提升情況')
        plt.xticks(x, stages, rotation=45)
        plt.grid(True, alpha=0.3)
        plt.axhline(y=0, color='black', linestyle='-', linewidth=0.5)

        plt.tight_layout()
        plt.show()

        # Textual improvement report.
        print("\n=== 性能改善報告 ===")
        for i, perf in enumerate(self.performance_history):
            if i == 0:
                print(f"{perf['stage']}: 測試集 AUC = {perf['test_auc']:.4f} (基準)")
            else:
                improvement = test_auc_improvements[i]
                print(f"{perf['stage']}: 測試集 AUC = {perf['test_auc']:.4f} (提升 {improvement:+.4f})")

        baseline_auc = self.performance_history[0]['test_auc']
        total_improvement = self.performance_history[-1]['test_auc'] - baseline_auc
        print(f"\n總體改善: {total_improvement:+.4f} ({total_improvement/baseline_auc*100:+.2f}%)")

    def plot_feature_importance(self):
        """Plot the 15 most important features and print the top 10.

        Prints a message and returns when no final model is available.
        """
        if self.model is None:
            print("請先訓練模型")
            return

        feature_importance = pd.DataFrame({
            'feature': self.feature_names,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)

        plt.figure(figsize=(10, 8))
        sns.barplot(data=feature_importance.head(15), y='feature', x='importance')
        plt.title('特徵重要性排序 (Top 15)')
        plt.xlabel('重要性分數')
        plt.tight_layout()
        plt.show()

        print("特徵重要性排序:")
        for row in feature_importance.head(10).itertuples(index=False):
            print(f"{row.feature}: {row.importance:.4f}")

# 主要執行程序
def main():
    """Run the full demo: data generation, staged tuning, evaluation, plots."""
    divider = "=" * 50

    print("台灣金融業鷹眼模型 - XGBoost調參示範")
    print(divider)

    pipeline = XGBoostHawkeyeModel()

    # Steps 1-2: simulate the data set, then split and scale it.
    dataset = pipeline.generate_sample_data(n_samples=10000)
    X_train, X_test, y_train, y_test = pipeline.prepare_data(dataset)
    splits = (X_train, X_test, y_train, y_test)

    # Step 3: class-imbalance weight derived from the training labels.
    pos_weight = pipeline.calculate_scale_pos_weight(y_train)

    # Step 4: staged tuning. Only the performance records are needed here;
    # the final estimator is kept on the pipeline object itself.
    _, baseline_perf = pipeline.train_baseline_model(*splits)
    _, core_perf = pipeline.tune_core_parameters(*splits, pos_weight)
    _, final_perf = pipeline.tune_structure_parameters(
        *splits, core_perf['parameters']
    )

    # Step 5: detailed evaluation on the held-out split.
    evaluation_results = pipeline.evaluate_model(X_test, y_test)

    # Step 6: visualisations.
    pipeline.plot_performance_comparison()
    pipeline.plot_feature_importance()

    # Step 7: final summary.
    print("\n" + divider)
    print("最終結果摘要")
    print(divider)
    print(f"最終模型 AUC: {evaluation_results['auc']:.4f}")
    overall_gain = final_perf['test_auc'] - baseline_perf['test_auc']
    print(f"模型總體提升: {overall_gain:+.4f}")

    print("\n最佳參數組合:")
    for name, setting in final_perf['parameters'].items():
        print(f"  {name}: {setting}")

# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
