### Hybrid Pipeline 雙階段混和
> 非監督篩式清洗 + 監督式分類

> autoencoder(AE) + catboost

#### 載入外部程式庫

anaconda 需額外載入
> tensorflow

> catboost

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import classification_report
from tensorflow.keras.models import Model           #AE
from tensorflow.keras.layers import Input, Dense    #AE
from tensorflow.keras.optimizers import Adam        #AE
from catboost import CatBoostClassifier             #catboost
# Evaluater
import sys
import os
import itertools


#### 資料前處理

> Autoencoder 需要標準化

> catboost 基本無須前處理

> 警示上戶須加上特徵值

#### 建立 Autoencoder (AE)

In [None]:
def build_autoencoder(input_dim):
    """建立簡單的 AE 模型"""
    input_layer = Input(shape=(input_dim,))
    
    # Encoder: 壓縮特徵
    encoded = Dense(16, activation='relu')(input_layer)
    encoded = Dense(8, activation='relu')(encoded)
    
    # Decoder: 還原特徵
    decoded = Dense(16, activation='relu')(encoded)
    output_layer = Dense(input_dim, activation='sigmoid')(decoded) # 若資料正規化到 0-1 用 sigmoid
    
    autoencoder = Model(inputs=input_layer, outputs=output_layer)
    autoencoder.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
    return autoencoder

#### 定義資料集

In [None]:
X_unlabeled_scaled = "" # 標準化無標記綜合資料(含特徵)
X_unlabeled = ""        # 無標記綜合資料(含特徵)
X_known_anomalies = ""  # 已知警示帳戶(含特徵)

#### Step1 : 非監督清洗

In [None]:
# 建立並訓練 AE
# 注意：這裡使用全部未標記資料訓練，模型會傾向學習"大眾(正常)"的模式
input_dim = X_unlabeled_scaled.shape[1]
ae_model = build_autoencoder(input_dim)
ae_model.fit(
    X_unlabeled_scaled, X_unlabeled_scaled,
    epochs=20, 
    batch_size=256, 
    shuffle=True, 
    verbose=0
)

# 計算重建誤差 (Reconstruction Error / MSE)
reconstructions = ae_model.predict(X_unlabeled_scaled)
mse = np.mean(np.power(X_unlabeled_scaled - reconstructions, 2), axis=1)

# 設定閾值：選取誤差最小的前 80% 作為「可靠的正常樣本 (Reliable Negatives)」
# 剩下的 20% 被視為模糊地帶，暫時不參與訓練，避免誤導模型
threshold = np.percentile(mse, 80) 
mask_reliable = mse < threshold

X_reliable_negatives = X_unlabeled[mask_reliable] # 取得原始數值(非 scaled)
print(f"篩選出 {len(X_reliable_negatives)} 筆可靠正常樣本，剔除 {len(X_unlabeled) - len(X_reliable_negatives)} 筆潛在雜訊。")

#### Step2 : 監督式分類

In [None]:
# 建立&切分訓練集
# 建構訓練集：可靠正常 (Label 0) + 已知異常 (Label 1)
X_train_final = np.vstack([X_reliable_negatives, X_known_anomalies])
y_train_final = np.hstack([np.zeros(len(X_reliable_negatives)), np.ones(len(X_known_anomalies))])

# 切分驗證集 (為了調參使用)
X_train, X_test, y_train, y_test = train_test_split(X_train_final, y_train_final, test_size=0.2, stratify=y_train_final, random_state=42)

#### 參數調整

In [None]:
# 定義 CatBoost 模型
# auto_class_weights='Balanced' 對於異常偵測極為重要，自動處理樣本不平衡
model = CatBoostClassifier(
    auto_class_weights='Balanced', # 覆蓋預設值 (None -> Balanced)
    eval_metric='AUC',             # 覆蓋預設值 (Logloss -> AUC)
)

In [None]:
# 參數調整
# 定義參數網格 (Parameter Grid)
param_dist = {
    'iterations': [200, 500],           # 樹的數量
    'learning_rate': [0.01, 0.05, 0.1], # 學習率
    'depth': [4, 6, 8],                 # 樹的深度 (太深容易 overfitting)
    'l2_leaf_reg': [1, 3, 5, 7],        # L2 正則化係數
    'random_strength': [1, 5, 10],      # 防止過擬合的隨機性強度
    'bagging_temperature': [0, 1]       # 貝葉斯自助抽樣的強度
}

##### Evaluater評分

In [None]:
# 假設您的環境設定
sys.path.append(os.path.dirname(os.getcwd()))
from Util import Evaluater

# 產生所有參數組合 (Cartesian Product)
keys, values = zip(*param_dist.items())
param_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

print(f"總共將測試 {len(param_combinations)} 組參數...\n")

# 2. 開始迴圈訓練
best_f1 = 0 # 假設您最在意 F1 score (若是異常偵測通常看 Recall 或 F1)
best_params = {}

for params in param_combinations:
    print("="*60)
    print(f"正在測試參數: {params}")
    
    # 訓練模型
    model.fit(X_train, y_train, eval_set=(X_test, y_test))
    
    # 3. 使用您的 Evaluater 進行評分
    # 這裡假設 Evaluater 會直接 print 出報表
    print(f"--- 評估結果 (Params: {params}) ---")
    
    # 呼叫您的模組
    # 如果 Evaluater.evaluate_model 有回傳字典(metrics dict)，您可以接住它來自動選最好的
    metrics = Evaluater.evaluate_model(model, (X_train, X_test, y_train, y_test))
    
    # 擴充功能：如果您的 Evaluater 有回傳值 (例如 dict)，可以自動記錄最佳參數
    # if metrics and metrics['f1_score'] > best_f1:
    #     best_f1 = metrics['f1_score']
    #     best_params = params

print("="*60)
print("所有參數測試完畢。請根據上方 Evaluater 的輸出報表選擇最佳組合。")

#### 結果產出

In [None]:
best_model = None # 選擇最好的參數

In [None]:
X_uncertain = X_unlabeled[~mask_reliable] # 第一階段被剔除的高誤差群體
probs = best_model.predict_proba(X_uncertain)[:, 1]

# 找出高風險帳戶 (例如預測機率 > 0.9)
high_risk_indices = np.where(probs > 0.9)[0]
print(f"\n在模糊地帶資料中，模型額外發現了 {len(high_risk_indices)} 個高風險異常。")