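# Complete Guide to Training ML Anomaly Detection Models for 5G Networks

## Overview

This notebook explains how to train machine learning anomaly detection models for a 5G network environment. It covers the full training workflow, feature engineering, model selection, and evaluation.

### System Architecture
- **MLAnomalyDetector**: the core machine learning anomaly detector
- **MLFeatureExtractor**: the feature extractor
- **RealTimeAnomalyDetector**: the real-time anomaly detection system
- **TestSelector**: the integrated test system

### Supported ML Models
1. **Isolation Forest** (unsupervised), recommended for testing
2. **One-Class SVM** (unsupervised)
3. **Random Forest** (supervised), requires labeled data
4. **DBSCAN** (unsupervised clustering)

### Use Cases
- Anomaly detection in the 5G gNB random access (RA) procedure
- Network attack detection
- Signal interference detection
- Equipment fault diagnosis

## 1. Import the Required Libraries

First, import all the Python libraries needed for training.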

In [None]:
# Core numerical, data, and plotting libraries
import numpy as np
import pandas as pd
import matplotlib  # imported by name so that matplotlib.__version__ resolves
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import json
import joblib
import warnings
warnings.filterwarnings('ignore')

# scikit-learn models and utilities
import sklearn  # imported by name so that sklearn.__version__ resolves
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.svm import OneClassSVM
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (
    classification_report, confusion_matrix,
    roc_auc_score, roc_curve, precision_recall_curve
)

# Other standard-library modules
import time
import threading
import queue
from typing import Dict, List, Tuple, Optional, Callable

# Display and reproducibility settings
plt.style.use('default')
np.random.seed(42)

print("✅ All required libraries imported successfully")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"Matplotlib version: {matplotlib.__version__}")
print(f"Scikit-learn version: {sklearn.__version__}")

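## 2. Build the ML Anomaly Detector Class

`MLAnomalyDetector` is the core machine learning anomaly detector. It supports several ML algorithms behind a single training and prediction interface.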

In [None]:
class MLAnomalyDetector:
    """
    機器學習異常偵測器 - 使用多種ML算法進行5G網路異常偵測
    """
    
    def __init__(self, model_type: str = 'isolation_forest'):
        """
        初始化ML異常偵測器
        
        參數:
            model_type: 模型類型 ('isolation_forest', 'one_class_svm', 'random_forest', 'dbscan')
        """
        self.model_type = model_type
        self.model = None
        self.scaler = None
        self.is_trained = False
        self.feature_names = []
        self.threshold = 0.5
        
        # 初始化選定的模型
        self._initialize_model()
        print(f"✅ MLAnomalyDetector初始化完成，模型類型: {model_type}")
    
    def _initialize_model(self):
        """根據模型類型初始化對應的ML模型和數據標準化器"""
        
        if self.model_type == 'isolation_forest':
            # Isolation Forest: 基於隔離的異常偵測
            self.model = IsolationForest(
                contamination=0.1,      # 預期異常比例10%
                random_state=42,
                n_estimators=100,       # 決策樹數量
                max_samples='auto',     # 每棵樹的樣本數
                bootstrap=False
            )
            self.scaler = StandardScaler()
            
        elif self.model_type == 'one_class_svm':
            # One-Class SVM: 單類支持向量機
            self.model = OneClassSVM(
                kernel='rbf',           # 徑向基核函數
                gamma='scale',          # 核函數係數
                nu=0.1                  # 異常比例上界
            )
            self.scaler = StandardScaler()
            
        elif self.model_type == 'random_forest':
            # Random Forest: 隨機森林分類器（監督學習）
            self.model = RandomForestClassifier(
                n_estimators=100,       # 決策樹數量
                random_state=42,
                class_weight='balanced', # 平衡類別權重
                max_depth=10,           # 樹的最大深度
                min_samples_split=5,    # 節點分割所需最小樣本數
                min_samples_leaf=2      # 葉節點最小樣本數
            )
            self.scaler = RobustScaler()  # 對異常值更穩健
            
        elif self.model_type == 'dbscan':
            # DBSCAN: 基於密度的聚類算法
            self.model = DBSCAN(
                eps=0.5,                # 鄰域半徑
                min_samples=5,          # 核心點最小樣本數
                metric='euclidean'      # 距離度量
            )
            self.scaler = StandardScaler()
            
        else:
            raise ValueError(f"不支援的模型類型: {self.model_type}")
    
    def get_model_info(self) -> Dict:
        """獲取模型詳細資訊"""
        return {
            'model_type': self.model_type,
            'is_trained': self.is_trained,
            'feature_count': len(self.feature_names),
            'feature_names': self.feature_names,
            'threshold': self.threshold,
            'model_params': self.model.get_params() if self.model else None
        }

# 創建示例偵測器
detector_example = MLAnomalyDetector('isolation_forest')
print(f"模型資訊: {detector_example.get_model_info()}")
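
Both unsupervised scikit-learn estimators wrapped by the class share a sign convention that later cells rely on: `predict` returns `+1` for inliers and `-1` for outliers, and `decision_function` is positive for inliers and negative for outliers. The following is a minimal sketch on synthetic 2-D data; the data and the variable names (`X_train`, `X_probe`, `iso`, `svm`) are illustrative assumptions and not part of the original notebook.

```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM

rng = np.random.default_rng(42)
X_train = rng.normal(0.0, 1.0, size=(200, 2))   # synthetic "normal" samples
X_probe = np.array([[0.0, 0.0],                 # a typical point
                    [10.0, 10.0]])              # an obvious outlier

# Same hyperparameters as the detector class uses for these two models
iso = IsolationForest(contamination=0.1, n_estimators=100, random_state=42).fit(X_train)
svm = OneClassSVM(kernel='rbf', gamma='scale', nu=0.1).fit(X_train)

for name, model in [('isolation_forest', iso), ('one_class_svm', svm)]:
    labels = model.predict(X_probe)             # +1 = inlier, -1 = outlier
    scores = model.decision_function(X_probe)   # > 0 = inlier, < 0 = outlier
    print(name, labels, np.round(scores, 3))
```

Because the sign is the same for both models, negating `decision_function` gives a score that grows with abnormality in either case.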

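## 3. Feature Extraction and Data Preprocessing

Feature extraction is a key step in ML anomaly detection. We derive meaningful features from raw 5G network data such as RA statistics, signal strength, and timestamps.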

In [None]:
class MLFeatureExtractor:
    """
    機器學習特徵提取器 - 從5G網路數據中提取異常偵測特徵
    """
    
    def __init__(self):
        self.features = []
        self.labels = []
        
    def extract_ra_features(self, ra_stats: Dict) -> Dict:
        """
        從RA統計數據中提取特徵
        
        參數:
            ra_stats: RA統計資料字典
            
        返回:
            提取的RA特徵字典
        """
        features = {}
        
        # 基本統計特徵
        features['ra_success_rate'] = ra_stats.get('success_rate', 0)
        features['ra_initiated'] = ra_stats.get('ra_initiated', 0)
        features['ra_succeeded'] = ra_stats.get('ra_succeeded', 0)
        features['ra_failed'] = ra_stats.get('failed_attempts', 0)
        
        # 衍生特徵
        total_ra = ra_stats.get('ra_initiated', 1)
        features['ra_frequency'] = total_ra / 20  # 假設20秒測試時間
        features['failure_rate'] = ra_stats.get('failed_attempts', 0) / max(total_ra, 1)
        features['success_efficiency'] = ra_stats.get('ra_succeeded', 0) / max(total_ra, 1)
        
        return features
    
    def extract_timing_features(self, timestamps: List[float]) -> Dict:
        """
        從時間戳序列中提取時序特徵
        
        參數:
            timestamps: 時間戳列表
            
        返回:
            時序特徵字典
        """
        features = {}
        
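        if len(timestamps) < 2:
            # Return every key the normal path produces (including
            # interval_stability) so all feature vectors share one schema
            return {
                'avg_interval': 0, 'interval_variance': 0, 'interval_std': 0,
                'min_interval': 0, 'max_interval': 0, 'interval_range': 0,
                'interval_stability': 1
            }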
        
        # 計算時間間隔
        intervals = np.diff(timestamps)
        
        # 統計特徵
        features['avg_interval'] = np.mean(intervals)
        features['interval_variance'] = np.var(intervals)
        features['interval_std'] = np.std(intervals)
        features['min_interval'] = np.min(intervals)
        features['max_interval'] = np.max(intervals)
        features['interval_range'] = np.max(intervals) - np.min(intervals)
        
        # 規律性特徵
        if len(intervals) > 3:
            # 計算相鄰間隔的變化率
            interval_changes = np.abs(np.diff(intervals))
            features['interval_stability'] = 1 / (1 + np.mean(interval_changes))
        else:
            features['interval_stability'] = 1
        
        return features
    
    def extract_signal_features(self, signal_data: List[float]) -> Dict:
        """
        從信號強度數據中提取特徵
        
        參數:
            signal_data: 信號強度列表
            
        返回:
            信號特徵字典
        """
        features = {}
        
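        if not signal_data:
            # Return every key the normal path produces (including
            # signal_trend) so all feature vectors share one schema
            return {
                'signal_mean': 0, 'signal_std': 0, 'signal_variance': 0,
                'signal_min': 0, 'signal_max': 0, 'signal_range': 0,
                'signal_change_rate': 0, 'signal_trend': 0
            }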
        
        signal_array = np.array(signal_data)
        
        # 基本統計特徵
        features['signal_mean'] = np.mean(signal_array)
        features['signal_std'] = np.std(signal_array)
        features['signal_variance'] = np.var(signal_array)
        features['signal_min'] = np.min(signal_array)
        features['signal_max'] = np.max(signal_array)
        features['signal_range'] = np.max(signal_array) - np.min(signal_array)
        
        # Signal change features
        if len(signal_data) > 1:
            signal_diff = np.diff(signal_array)
            features['signal_change_rate'] = np.mean(np.abs(signal_diff))
            trend = np.corrcoef(np.arange(len(signal_array)), signal_array)[0, 1]
            # corrcoef yields NaN for a constant signal; treat that as no trend
            features['signal_trend'] = 0.0 if np.isnan(trend) else trend
        else:
            features['signal_change_rate'] = 0
            features['signal_trend'] = 0
            
        return features
    
    def extract_comprehensive_features(self, test_data: Dict) -> Dict:
        """
        提取綜合特徵向量
        
        參數:
            test_data: 測試數據字典
            
        返回:
            完整的特徵字典
        """
        all_features = {}
        
        # RA程序特徵
        if 'ra_stats' in test_data:
            ra_features = self.extract_ra_features(test_data['ra_stats'])
            all_features.update({f'ra_{k}': v for k, v in ra_features.items()})
        
        # 時序特徵
        if 'timestamps' in test_data:
            timing_features = self.extract_timing_features(test_data['timestamps'])
            all_features.update({f'timing_{k}': v for k, v in timing_features.items()})
        
        # 信號特徵
        if 'signal_data' in test_data:
            signal_features = self.extract_signal_features(test_data['signal_data'])
            all_features.update({f'signal_{k}': v for k, v in signal_features.items()})
        
        # 環境與配置特徵
        all_features['test_duration'] = test_data.get('test_duration', 0)
        all_features['attacker_present'] = 1 if test_data.get('attacker_active', False) else 0
        all_features['airplane_mode_toggles'] = test_data.get('airplane_toggles', 0)
        all_features['loop_number'] = test_data.get('loop_number', 0)
        
        # 測試類型編碼
        test_type = test_data.get('test_type', 'unknown')
        all_features['test_type_cots_only'] = 1 if test_type == 'cots_only' else 0
        all_features['test_type_standard'] = 1 if test_type == 'standard' else 0
        all_features['test_type_attacker_priority'] = 1 if test_type == 'attacker_priority' else 0
        
        return all_features

# 創建特徵提取器並示範使用
extractor = MLFeatureExtractor()

# 示例數據
sample_data = {
    'ra_stats': {
        'success_rate': 95.5,
        'ra_initiated': 12,
        'ra_succeeded': 11,
        'failed_attempts': 1
    },
    'timestamps': [1.0, 2.5, 4.1, 6.2, 8.0],
    'signal_data': [52.3, 54.1, 53.8, 55.2, 54.9],
    'test_duration': 20,
    'attacker_active': False,
    'airplane_toggles': 4,
    'loop_number': 1,
    'test_type': 'cots_only'
}

# 提取特徵
features = extractor.extract_comprehensive_features(sample_data)
print(f"✅ 成功提取 {len(features)} 個特徵")
print("特徵範例:")
for i, (key, value) in enumerate(list(features.items())[:10]):
    print(f"  {key}: {value:.3f}")
print("  ...")
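
Feature dictionaries only become a usable training matrix if every sample is laid out in the same column order and no key is missing. The sketch below shows one way to do that with a fixed, sorted schema and a default for absent keys. The helper names `build_schema` and `to_vector` and the two sample dictionaries are hypothetical; they are not part of `MLFeatureExtractor`.

```python
from typing import Dict, List


def build_schema(samples: List[Dict[str, float]]) -> List[str]:
    """Collect every feature name seen in any sample, in sorted order."""
    names = set()
    for sample in samples:
        names.update(sample)
    return sorted(names)


def to_vector(sample: Dict[str, float], schema: List[str],
              default: float = 0.0) -> List[float]:
    """Lay out one feature dict in schema order, filling missing keys."""
    return [float(sample.get(name, default)) for name in schema]


# Two hypothetical feature dicts; the second lacks 'timing_interval_stability'
samples = [
    {'ra_failure_rate': 0.08, 'timing_avg_interval': 1.75,
     'timing_interval_stability': 0.8},
    {'ra_failure_rate': 0.50, 'timing_avg_interval': 0.0},
]

schema = build_schema(samples)
matrix = [to_vector(sample, schema) for sample in samples]
print(schema)
print(matrix)
```

Storing the schema next to the trained model lets prediction-time code rebuild vectors in exactly the order the model was trained on.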

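## 4. Synthetic Training Data Generation

Because real 5G anomaly data is hard to obtain, we generate synthetic training data that simulates normal and anomalous network behavior.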

In [None]:
def generate_synthetic_training_data(num_normal=100, num_anomaly=30):
    """
    生成合成的5G網路訓練數據
    
    參數:
        num_normal: 正常樣本數量
        num_anomaly: 異常樣本數量
        
    返回:
        (normal_data, anomaly_data): 正常和異常數據列表
    """
    
    normal_data = []
    anomaly_data = []
    
    print(f"🔄 正在生成 {num_normal} 個正常樣本...")
    
    # 生成正常行為數據
    for i in range(num_normal):
        # 正常情況：高成功率、穩定信號、規律時間間隔
        ra_stats = {
            'success_rate': np.clip(np.random.normal(95, 5), 85, 100),  # 高成功率
            'ra_initiated': np.random.randint(8, 15),                   # 中等RA嘗試次數
            'ra_succeeded': lambda x: np.random.randint(max(1, int(x*0.85)), x+1),
            'failed_attempts': np.random.randint(0, 3)                  # 少量失敗
        }
        ra_stats['ra_succeeded'] = ra_stats['ra_succeeded'](ra_stats['ra_initiated'])
        ra_stats['success_rate'] = (ra_stats['ra_succeeded'] / ra_stats['ra_initiated']) * 100
        
        # 正常時間戳：相對規律的間隔
        base_interval = np.random.uniform(1.5, 3.0)
        timestamps = []
        current_time = 0
        for _ in range(np.random.randint(8, 12)):
            current_time += base_interval + np.random.normal(0, 0.3)  # 小的隨機變動
            timestamps.append(current_time)
        
        # 正常信號：穩定且在合理範圍內
        base_signal = np.random.uniform(48, 58)  # 正常信號強度範圍
        signal_data = [base_signal + np.random.normal(0, 2) for _ in range(10)]
        
        test_data = {
            'ra_stats': ra_stats,
            'test_duration': 20,
            'attacker_active': False,  # 正常情況無攻擊
            'airplane_toggles': np.random.randint(3, 7),
            'loop_number': i,
            'test_type': 'normal',
            'timestamps': timestamps,
            'signal_data': signal_data
        }
        normal_data.append(test_data)
    
    print(f"🔄 正在生成 {num_anomaly} 個異常樣本...")
    
    # 生成異常行為數據
    for i in range(num_anomaly):
        # 異常情況：低成功率、不穩定信號、不規律時間間隔
        
        # 選擇異常類型
        anomaly_type = np.random.choice(['attack', 'interference', 'hardware_failure'])
        
        if anomaly_type == 'attack':
            # 攻擊場景：大量RA嘗試、較低成功率
            ra_stats = {
                'success_rate': np.clip(np.random.normal(60, 15), 30, 85),
                'ra_initiated': np.random.randint(15, 25),
                'ra_succeeded': lambda x: np.random.randint(max(1, int(x*0.4)), int(x*0.8)+1),
                'failed_attempts': np.random.randint(5, 15)
            }
            base_signal = np.random.uniform(55, 75)  # 較高的信號（攻擊者信號）
            signal_variation = 10
            
        elif anomaly_type == 'interference':
            # 干擾場景：信號不穩定、成功率中等
            ra_stats = {
                'success_rate': np.clip(np.random.normal(70, 12), 45, 90),
                'ra_initiated': np.random.randint(10, 20),
                'ra_succeeded': lambda x: np.random.randint(max(1, int(x*0.5)), int(x*0.9)+1),
                'failed_attempts': np.random.randint(2, 8)
            }
            base_signal = np.random.uniform(30, 50)  # 較低的信號（干擾影響）
            signal_variation = 15
            
        else:  # hardware_failure
            # 硬體故障場景：極低成功率、極不穩定
            ra_stats = {
                'success_rate': np.clip(np.random.normal(40, 20), 10, 70),
                'ra_initiated': np.random.randint(20, 30),
                'ra_succeeded': lambda x: np.random.randint(1, max(2, int(x*0.5))),
                'failed_attempts': np.random.randint(10, 20)
            }
            base_signal = np.random.uniform(20, 40)  # 很低的信號（硬體問題）
            signal_variation = 20
        
        ra_stats['ra_succeeded'] = ra_stats['ra_succeeded'](ra_stats['ra_initiated'])
        ra_stats['success_rate'] = (ra_stats['ra_succeeded'] / ra_stats['ra_initiated']) * 100
        
        # 異常時間戳：不規律的間隔
        timestamps = []
        current_time = 0
        for _ in range(np.random.randint(5, 15)):
            # 更大的隨機變動，模擬不穩定
            interval = np.random.exponential(2) + np.random.uniform(0.5, 4)
            current_time += interval
            timestamps.append(current_time)
        
        # 異常信號：不穩定且可能超出正常範圍
        signal_data = [base_signal + np.random.normal(0, signal_variation) for _ in range(10)]
        
        test_data = {
            'ra_stats': ra_stats,
            'test_duration': 20,
            'attacker_active': True,  # set for every anomaly type; feeds the attacker_present feature, so it mirrors the label
            'airplane_toggles': np.random.randint(8, 15),
            'loop_number': i,
            'test_type': 'anomaly',
            'timestamps': timestamps,
            'signal_data': signal_data,
            'anomaly_type': anomaly_type
        }
        anomaly_data.append(test_data)
    
    print(f"✅ 合成數據生成完成")
    print(f"   正常樣本: {len(normal_data)}")
    print(f"   異常樣本: {len(anomaly_data)}")
    
    return normal_data, anomaly_data

# 生成示例訓練數據
normal_samples, anomaly_samples = generate_synthetic_training_data(100, 30)

# Analyze the generated data
print("\n📊 Data analysis:")

# Normal-sample statistics
normal_success_rates = [sample['ra_stats']['success_rate'] for sample in normal_samples]
print(f"Normal data - mean success rate: {np.mean(normal_success_rates):.1f}% (±{np.std(normal_success_rates):.1f})")

# Anomalous-sample statistics
anomaly_success_rates = [sample['ra_stats']['success_rate'] for sample in anomaly_samples]
print(f"Anomalous data - mean success rate: {np.mean(anomaly_success_rates):.1f}% (±{np.std(anomaly_success_rates):.1f})")

# Visualize the distributions (English labels: Matplotlib's default font lacks CJK glyphs)
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.hist(normal_success_rates, bins=20, alpha=0.7, label='Normal', color='green')
plt.hist(anomaly_success_rates, bins=20, alpha=0.7, label='Anomaly', color='red')
plt.xlabel('RA success rate (%)')
plt.ylabel('Count')
plt.title('Success rate distribution')
plt.legend()

plt.subplot(1, 2, 2)
normal_ra_counts = [sample['ra_stats']['ra_initiated'] for sample in normal_samples]
anomaly_ra_counts = [sample['ra_stats']['ra_initiated'] for sample in anomaly_samples]
plt.hist(normal_ra_counts, bins=15, alpha=0.7, label='Normal', color='green')
plt.hist(anomaly_ra_counts, bins=15, alpha=0.7, label='Anomaly', color='red')
plt.xlabel('RA attempts')
plt.ylabel('Count')
plt.title('RA attempt count distribution')
plt.legend()

plt.tight_layout()
plt.show()
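
When generating synthetic RA counters, it can help to derive the dependent quantities from the sampled ones, so that `ra_initiated`, `ra_succeeded`, `failed_attempts`, and `success_rate` always agree with each other. The sketch below does this with a local NumPy `Generator` rather than the global seed. The function `sample_ra_stats` is hypothetical; its default ranges copy the "normal" case of the generator in this section.

```python
import numpy as np


def sample_ra_stats(rng, initiated_range=(8, 15), min_success_ratio=0.85):
    """Draw one self-consistent set of RA counters (hypothetical helper)."""
    ra_initiated = int(rng.integers(*initiated_range))      # upper bound is exclusive
    lowest = max(1, int(ra_initiated * min_success_ratio))
    ra_succeeded = int(rng.integers(lowest, ra_initiated + 1))
    return {
        'ra_initiated': ra_initiated,
        'ra_succeeded': ra_succeeded,
        # Derived values, so the counters can never contradict each other
        'failed_attempts': ra_initiated - ra_succeeded,
        'success_rate': ra_succeeded / ra_initiated * 100,
    }


rng = np.random.default_rng(42)          # local generator, no global seed state
batch = [sample_ra_stats(rng) for _ in range(5)]
for stats in batch:
    print(stats)

# Re-creating the generator with the same seed reproduces the same batch
rng_again = np.random.default_rng(42)
batch_again = [sample_ra_stats(rng_again) for _ in range(5)]
```

Passing the generator explicitly keeps each data-generation call reproducible regardless of what other cells have drawn from `np.random`.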

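## 5. Unsupervised Model Training

Unsupervised algorithms are trained on normal data only, which suits scenarios where labeled anomalies are unavailable. This section implements Isolation Forest, One-Class SVM, and DBSCAN.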

In [None]:
# 為MLAnomalyDetector類別添加訓練方法
def train_unsupervised(self, X: np.ndarray, feature_names: List[str] = None):
    """
    無監督學習訓練（只使用正常數據）
    
    參數:
        X: 訓練特徵矩陣 (n_samples, n_features)
        feature_names: 特徵名稱列表
    """
    if self.model_type in ['random_forest']:
        print(f"⚠️ {self.model_type} 是監督學習模型，需要標籤數據")
        return False
    
    print(f"🚀 開始訓練無監督模型: {self.model_type}")
    print(f"訓練數據形狀: {X.shape}")
    
    # 數據標準化
    X_scaled = self.scaler.fit_transform(X)
    print(f"✅ 數據標準化完成")
    
    # 訓練模型
    if self.model_type == 'dbscan':
        # DBSCAN 進行聚類，異常點標記為 -1
        labels = self.model.fit_predict(X_scaled)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_anomalies = sum(labels == -1)
        print(f"✅ DBSCAN 聚類完成")
        print(f"   聚類數量: {n_clusters}")
        print(f"   異常點數量: {n_anomalies} ({n_anomalies/len(labels)*100:.1f}%)")
    else:
        # Isolation Forest 和 One-Class SVM
        self.model.fit(X_scaled)
        print(f"✅ {self.model_type} 訓練完成")
    
    self.is_trained = True
    self.feature_names = feature_names or []
    
    # 在訓練數據上進行預測以評估模型
    if self.model_type != 'dbscan':
        predictions = self.model.predict(X_scaled)
        n_anomalies = sum(predictions == -1)  # -1 表示異常
        print(f"訓練數據中檢測到的異常: {n_anomalies} ({n_anomalies/len(predictions)*100:.1f}%)")
    
    return True

def predict_anomaly_batch(self, test_data_list: List[Dict]) -> List[Dict]:
    """
    批量預測異常
    
    參數:
        test_data_list: 測試數據列表
        
    返回:
        預測結果列表
    """
    if not self.is_trained:
        return [{'error': 'Model not trained'} for _ in test_data_list]
    
    results = []
    extractor = MLFeatureExtractor()
    
    for test_data in test_data_list:
        try:
            # 提取特徵
            feature_vector = extractor.extract_comprehensive_features(test_data)
            
            # 確保特徵順序一致
            if self.feature_names:
                features = np.array([feature_vector.get(name, 0) for name in self.feature_names])
            else:
                sorted_keys = sorted(feature_vector.keys())
                features = np.array([feature_vector[key] for key in sorted_keys])
            
            features = features.reshape(1, -1)
            
            # 標準化
            features_scaled = self.scaler.transform(features)
            
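            # Predict
            if self.model_type in ['isolation_forest', 'one_class_svm']:
                prediction = self.model.predict(features_scaled)[0]

                # For both models decision_function is positive for inliers
                # and negative for outliers, so negate it to get a score
                # that grows with abnormality.
                decision_score = self.model.decision_function(features_scaled)[0]
                confidence = abs(decision_score)
                anomaly_score = -decision_score

                result = {
                    'is_anomaly': prediction == -1,
                    'confidence': confidence,
                    'anomaly_score': anomaly_score,
                    'prediction_value': prediction
                }

            elif self.model_type == 'random_forest':
                # Supervised model: use the predicted probability of class 1 (anomaly)
                anomaly_proba = self.model.predict_proba(features_scaled)[0, 1]
                is_anomaly = anomaly_proba >= self.threshold
                result = {
                    'is_anomaly': is_anomaly,
                    'confidence': max(anomaly_proba, 1 - anomaly_proba),
                    'anomaly_score': anomaly_proba,
                    'prediction_value': int(is_anomaly)
                }

            elif self.model_type == 'dbscan':
                result = {
                    'is_anomaly': False,
                    'confidence': 0,
                    'anomaly_score': 0,
                    'error': 'DBSCAN cannot predict single samples'
                }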
            
            results.append(result)
            
        except Exception as e:
            results.append({
                'is_anomaly': False,
                'confidence': 0,
                'anomaly_score': 0,
                'error': str(e)
            })
    
    return results

# 將方法添加到MLAnomalyDetector類別
MLAnomalyDetector.train_unsupervised = train_unsupervised
MLAnomalyDetector.predict_anomaly_batch = predict_anomaly_batch

# 訓練無監督模型
print("🔬 開始訓練無監督學習模型...")

# 提取正常數據的特徵
extractor = MLFeatureExtractor()
X_normal = []
feature_names = None

for data in normal_samples:
    features = extractor.extract_comprehensive_features(data)
    if feature_names is None:
        feature_names = sorted(features.keys())
    feature_vector = np.array([features[name] for name in feature_names])
    X_normal.append(feature_vector)

X_normal = np.array(X_normal)
print(f"正常數據特徵矩陣形狀: {X_normal.shape}")

# 訓練不同的無監督模型
models = {
    'isolation_forest': MLAnomalyDetector('isolation_forest'),
    'one_class_svm': MLAnomalyDetector('one_class_svm'),
    'dbscan': MLAnomalyDetector('dbscan')
}

trained_models = {}

for model_name, detector in models.items():
    print(f"\n{'='*50}")
    print(f"Training model: {model_name}")
    print(f"{'='*50}")

    success = detector.train_unsupervised(X_normal, feature_names)
    if success:
        trained_models[model_name] = detector
        print(f"✅ {model_name} trained successfully")
    else:
        print(f"❌ {model_name} training failed")

print(f"\n✅ Unsupervised training finished, {len(trained_models)} models trained")
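
scikit-learn's `DBSCAN` has no `predict` method, which is why `predict_anomaly_batch` returns an error entry for that model. One common workaround, sketched here as an assumption rather than as part of the original design, is to flag a new sample as anomalous when it lies farther than `eps` from every core sample found during fitting. The function `dbscan_is_anomaly` and the synthetic data are hypothetical.

```python
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X_train = rng.normal(loc=0.0, scale=1.0, size=(200, 2))   # synthetic normal data

scaler = StandardScaler().fit(X_train)
X_scaled = scaler.transform(X_train)

dbscan = DBSCAN(eps=0.5, min_samples=5, metric='euclidean').fit(X_scaled)
core_points = dbscan.components_      # coordinates of the core samples


def dbscan_is_anomaly(x_new) -> bool:
    """Return True if x_new is farther than eps from every core sample."""
    x_scaled = scaler.transform(np.asarray(x_new, dtype=float).reshape(1, -1))
    distances = np.linalg.norm(core_points - x_scaled, axis=1)
    return bool(distances.min() > dbscan.eps)


print("core samples:", len(core_points))
print("center point anomalous?", dbscan_is_anomaly([0.0, 0.0]))
print("far point anomalous?   ", dbscan_is_anomaly([8.0, 8.0]))
```

This mirrors how DBSCAN itself decides whether a point is reachable from a cluster, but it only approximates refitting: a new point never becomes a core sample.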

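## 6. Supervised Model Training

Supervised learning requires labeled normal and anomalous data and can give more precise classification. This section trains a Random Forest classifier.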

In [None]:
# 為MLAnomalyDetector類別添加監督學習訓練方法
def train_supervised(self, X: np.ndarray, y: np.ndarray, feature_names: List[str] = None):
    """
    監督學習訓練（使用標記的正常和異常數據）
    
    參數:
        X: 特徵矩陣 (n_samples, n_features)
        y: 標籤向量 (n_samples,) - 0表示正常，1表示異常
        feature_names: 特徵名稱列表
    """
    if self.model_type not in ['random_forest']:
        print(f"⚠️ {self.model_type} is not a supervised model")
        # Return a pair so callers can always unpack (success, metrics)
        return False, None
    
    print(f"🚀 開始訓練監督學習模型: {self.model_type}")
    print(f"訓練數據形狀: {X.shape}")
    print(f"類別分佈 - 正常: {sum(y==0)}, 異常: {sum(y==1)}")
    
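    # Split first, then fit the scaler on the training portion only,
    # so no test-set statistics leak into preprocessing
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    X_train = self.scaler.fit_transform(X_train)
    X_test = self.scaler.transform(X_test)
    print(f"✅ Data split and scaled - train: {X_train.shape[0]}, test: {X_test.shape[0]}")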
    
    # 訓練模型
    self.model.fit(X_train, y_train)
    print(f"✅ {self.model_type} 訓練完成")
    
    # Evaluate the model
    y_pred = self.model.predict(X_test)
    y_pred_proba = self.model.predict_proba(X_test)[:, 1]

    print("\n📊 Evaluation results:")
    print("Classification report:")
    print(classification_report(y_test, y_pred, target_names=['normal', 'anomaly']))

    # AUC score
    if len(np.unique(y)) == 2:
        auc_score = roc_auc_score(y_test, y_pred_proba)
        print(f"AUC score: {auc_score:.3f}")

    # Confusion matrix (labels fixed so the result is always 2x2)
    cm = confusion_matrix(y_test, y_pred, labels=[0, 1])
    print("\nConfusion matrix:")
    print("            Predicted")
    print("Actual    Normal  Anomaly")
    print(f"Normal    {cm[0,0]:6d}  {cm[0,1]:7d}")
    print(f"Anomaly   {cm[1,0]:6d}  {cm[1,1]:7d}")
    
    self.is_trained = True
    self.feature_names = feature_names or []
    
    # 特徵重要性分析
    if hasattr(self.model, 'feature_importances_') and feature_names:
        self._analyze_feature_importance()
    
    return True, {
        'auc_score': auc_score if len(np.unique(y)) == 2 else None,
        'confusion_matrix': cm,
        'classification_report': classification_report(y_test, y_pred, output_dict=True)
    }

def _analyze_feature_importance(self):
    """分析特徵重要性"""
    if not hasattr(self.model, 'feature_importances_'):
        return
    
    importances = self.model.feature_importances_
    feature_importance = list(zip(self.feature_names, importances))
    feature_importance.sort(key=lambda x: x[1], reverse=True)

    print("\n🔍 Top 10 most important features:")
    for i, (feature, importance) in enumerate(feature_importance[:10]):
        print(f"  {i+1:2d}. {feature:<25}: {importance:.4f}")

    # Plot the top 15 features
    top_features = feature_importance[:15]
    features, importances = zip(*top_features)

    plt.figure(figsize=(10, 6))
    plt.barh(range(len(features)), importances)
    plt.yticks(range(len(features)), features)
    plt.xlabel('Feature importance')
    plt.title('Random Forest feature importance ranking')
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.show()

# 將方法添加到MLAnomalyDetector類別
MLAnomalyDetector.train_supervised = train_supervised
MLAnomalyDetector._analyze_feature_importance = _analyze_feature_importance

# 準備監督學習數據
print("📚 準備監督學習數據...")

# 提取所有數據的特徵（正常 + 異常）
all_data = normal_samples + anomaly_samples
X_all = []
y_all = []

for i, data in enumerate(all_data):
    features = extractor.extract_comprehensive_features(data)
    feature_vector = np.array([features[name] for name in feature_names])
    X_all.append(feature_vector)
    
    # 標籤：正常=0，異常=1
    if i < len(normal_samples):
        y_all.append(0)  # 正常
    else:
        y_all.append(1)  # 異常

X_all = np.array(X_all)
y_all = np.array(y_all)

print(f"完整數據集形狀: {X_all.shape}")
print(f"標籤分佈 - 正常: {sum(y_all==0)}, 異常: {sum(y_all==1)}")

# Train the supervised model
print(f"\n{'='*50}")
print("Training supervised model: Random Forest")
print(f"{'='*50}")

rf_detector = MLAnomalyDetector('random_forest')
success, metrics = rf_detector.train_supervised(X_all, y_all, feature_names)

if success:
    trained_models['random_forest'] = rf_detector
    print("✅ Random Forest trained successfully")
else:
    print("❌ Random Forest training failed")

print("\n✅ Supervised training finished")
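
A single 80/20 split gives only one estimate of performance, and with roughly 30 anomalies that estimate can be noisy. The sketch below uses `cross_val_score` (imported in section 1) with stratified folds, and wraps the scaler and classifier in a pipeline so the scaler is refit on each training fold. The synthetic feature matrix and the names `pipeline`, `cv`, and `scores` are illustrative assumptions, not the notebook's own data.

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import RobustScaler

rng = np.random.default_rng(42)
X_normal = rng.normal(0.0, 1.0, size=(100, 4))    # stand-in for normal features
X_anomaly = rng.normal(2.5, 1.5, size=(30, 4))    # stand-in for anomalous features
X = np.vstack([X_normal, X_anomaly])
y = np.array([0] * 100 + [1] * 30)                # 0 = normal, 1 = anomaly

# Scaling lives inside the pipeline, so each fold fits it on training data only
pipeline = make_pipeline(
    RobustScaler(),
    RandomForestClassifier(n_estimators=100, class_weight='balanced',
                           max_depth=10, random_state=42),
)

# Stratified folds keep the normal/anomaly ratio roughly constant per fold
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_val_score(pipeline, X, y, cv=cv, scoring='f1')

print("F1 per fold:", np.round(scores, 3))
print(f"Mean F1: {scores.mean():.3f} (std {scores.std():.3f})")
```

The spread across folds gives a rough sense of how much a single-split score might vary.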

## 7. Model Validation and Evaluation

Evaluate the trained models: run predictions on test data and compare each model's detection ability.

In [None]:
# 生成測試數據
print("🧪 生成測試數據...")
test_normal, test_anomaly = generate_synthetic_training_data(20, 10)
test_data = test_normal + test_anomaly
test_labels = [0] * len(test_normal) + [1] * len(test_anomaly)

print(f"測試數據 - 正常: {len(test_normal)}, 異常: {len(test_anomaly)}")

# Evaluate all trained models
def evaluate_model(detector, test_data, test_labels, model_name):
    """Evaluate a single model on the test data."""
    print(f"\n🔍 Evaluating model: {model_name}")
    print("-" * 30)

    if not detector.is_trained:
        print("❌ Model is not trained")
        return None
    
    # 批量預測
    predictions = detector.predict_anomaly_batch(test_data)
    
    # 提取預測結果
    y_pred = []
    confidences = []
    
    for pred in predictions:
        if 'error' in pred:
            y_pred.append(0)  # 默認為正常
            confidences.append(0)
        else:
            y_pred.append(1 if pred['is_anomaly'] else 0)
            confidences.append(pred.get('confidence', 0))
    
    y_pred = np.array(y_pred)
    y_true = np.array(test_labels)
    
    # 計算評估指標
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    
    # Confusion matrix; labels=[0, 1] guarantees a 2x2 result even if
    # only one class appears in y_true and y_pred
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    tn, fp, fn, tp = cm.ravel()

    # Derived rates
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    false_positive_rate = fp / (fp + tn) if (fp + tn) > 0 else 0
    false_negative_rate = fn / (fn + tp) if (fn + tp) > 0 else 0

    print(f"Accuracy:              {accuracy:.3f}")
    print(f"Precision:             {precision:.3f}")
    print(f"Recall:                {recall:.3f}")
    print(f"F1 score:              {f1:.3f}")
    print(f"Specificity:           {specificity:.3f}")
    print(f"False positive rate:   {false_positive_rate:.3f}")
    print(f"False negative rate:   {false_negative_rate:.3f}")

    print("\nConfusion matrix:")
    print("            Predicted")
    print("Actual    Normal  Anomaly")
    print(f"Normal    {tn:6d}  {fp:7d}")
    print(f"Anomaly   {fn:6d}  {tp:7d}")
    
    # 返回評估結果
    return {
        'model_name': model_name,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'specificity': specificity,
        'fpr': false_positive_rate,
        'fnr': false_negative_rate,
        'confusion_matrix': cm,
        'predictions': y_pred,
        'confidences': confidences
    }

# 評估所有模型
evaluation_results = {}

for model_name, detector in trained_models.items():
    result = evaluate_model(detector, test_data, test_labels, model_name)
    if result:
        evaluation_results[model_name] = result

# Compare model performance
print(f"\n{'='*60}")
print("                    Model performance comparison")
print(f"{'='*60}")

if evaluation_results:
    # Build the comparison table
    metrics = ['accuracy', 'precision', 'recall', 'f1_score', 'specificity']

    print(f"{'Model':<20}", end='')
    for metric in metrics:
        print(f"{metric.upper():<12}", end='')
    print()
    print("-" * 80)

    for model_name, result in evaluation_results.items():
        print(f"{model_name:<20}", end='')
        for metric in metrics:
            print(f"{result[metric]:<12.3f}", end='')
        print()
    
    # Visual comparison (English labels: Matplotlib's default font lacks CJK glyphs)
    plt.figure(figsize=(15, 10))

    # Per-metric bar charts
    plt.subplot(2, 3, 1)
    models = list(evaluation_results.keys())
    accuracies = [evaluation_results[m]['accuracy'] for m in models]
    plt.bar(models, accuracies, color='skyblue')
    plt.title('Accuracy comparison')
    plt.ylabel('Accuracy')
    plt.xticks(rotation=45)

    plt.subplot(2, 3, 2)
    precisions = [evaluation_results[m]['precision'] for m in models]
    plt.bar(models, precisions, color='lightgreen')
    plt.title('Precision comparison')
    plt.ylabel('Precision')
    plt.xticks(rotation=45)

    plt.subplot(2, 3, 3)
    recalls = [evaluation_results[m]['recall'] for m in models]
    plt.bar(models, recalls, color='salmon')
    plt.title('Recall comparison')
    plt.ylabel('Recall')
    plt.xticks(rotation=45)

    plt.subplot(2, 3, 4)
    f1_scores = [evaluation_results[m]['f1_score'] for m in models]
    plt.bar(models, f1_scores, color='gold')
    plt.title('F1 score comparison')
    plt.ylabel('F1 score')
    plt.xticks(rotation=45)

    # ROC curve (supervised model only)
    plt.subplot(2, 3, 5)
    for model_name, result in evaluation_results.items():
        if model_name == 'random_forest':
            # Plot the ROC curve for the supervised model
            detector = trained_models[model_name]
            # Recompute the probability predictions on the test data
            X_test = []
            for data in test_data:
                features = extractor.extract_comprehensive_features(data)
                feature_vector = np.array([features[name] for name in feature_names])
                X_test.append(feature_vector)

            X_test = np.array(X_test)
            X_test_scaled = detector.scaler.transform(X_test)
            y_prob = detector.model.predict_proba(X_test_scaled)[:, 1]

            fpr, tpr, _ = roc_curve(test_labels, y_prob)
            auc = roc_auc_score(test_labels, y_prob)
            plt.plot(fpr, tpr, label=f'{model_name} (AUC = {auc:.3f})')

    plt.plot([0, 1], [0, 1], 'k--', label='Random classifier')
    plt.xlabel('False positive rate')
    plt.ylabel('True positive rate')
    plt.title('ROC curve')
    plt.legend()

    # Confidence distribution
    plt.subplot(2, 3, 6)
    for model_name, result in evaluation_results.items():
        if model_name != 'dbscan':  # DBSCAN produces no confidence value
            confidences = result['confidences']
            plt.hist(confidences, bins=20, alpha=0.5, label=model_name)

    plt.xlabel('Prediction confidence')
    plt.ylabel('Count')
    plt.title('Prediction confidence distribution')
    plt.legend()

    plt.tight_layout()
    plt.show()
    
    # Recommend the best model
    print("\n🏆 Model recommendation:")
    best_f1_model = max(evaluation_results.items(), key=lambda x: x[1]['f1_score'])
    best_accuracy_model = max(evaluation_results.items(), key=lambda x: x[1]['accuracy'])
    
    print(f"Best F1 score: {best_f1_model[0]} (F1 = {best_f1_model[1]['f1_score']:.3f})")
    print(f"Best accuracy: {best_accuracy_model[0]} (Accuracy = {best_accuracy_model[1]['accuracy']:.3f})")
    
else:
    print("❌ No models available to evaluate")
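
The ROC panel above is drawn only for `random_forest`, but a ROC curve needs nothing more than a continuous score per sample, so the unsupervised detectors can be ranked the same way if they expose one. The following is a minimal sketch and not part of the original notebook: `auc_from_scores` is a hypothetical helper that computes ROC AUC with the rank-based (Mann-Whitney) formulation in plain Python. It assumes label 1 means anomaly and that a higher score means more anomalous; for scikit-learn's `IsolationForest.decision_function`, where lower values are more abnormal, negate the scores first.

```python
from typing import Sequence


def auc_from_scores(labels: Sequence[int], scores: Sequence[float]) -> float:
    """ROC AUC as P(score of a random anomaly > score of a random normal sample).

    Ties count as half a win. Assumes labels are 0 (normal) or 1 (anomaly)
    and that a higher score means "more anomalous".
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one sample of each class")

    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Illustrative values only (not results from the notebook)
labels = [0, 0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8, 0.3]
auc_value = auc_from_scores(labels, scores)
print(f"AUC = {auc_value:.3f}")
```

The pairwise loop is quadratic in the number of samples, which is fine for a few hundred test cases; for larger sets `sklearn.metrics.roc_auc_score` (already imported in section 1) is the better choice.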

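## 8. Real-Time Anomaly Detection

This section builds the `RealTimeAnomalyDetector` class, which provides continuous monitoring, configurable thresholds, and an alert mechanism.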

In [None]:
class RealTimeAnomalyDetector:
    """
    Real-time anomaly detection system that plugs a trained ML model
    into the existing test framework.
    """
    
    def __init__(self, detector: Optional[MLAnomalyDetector] = None,
                 alert_callback: Optional[Callable] = None):
        """
        Initialize the real-time anomaly detector.
        
        Args:
            detector: A trained MLAnomalyDetector.
            alert_callback: Function called when an alert is raised.
        """
        self.detector = detector
        self.feature_extractor = MLFeatureExtractor()
        self.alert_callback = alert_callback
        self.is_monitoring = False
        # Bounded so that feed_data() can actually hit queue.Full;
        # maxsize=1000 is an assumed value, tune it to the data rate.
        self.data_queue = queue.Queue(maxsize=1000)
        self.monitoring_thread = None
        
        # Detection parameters
        self.alert_threshold = 0.7
        self.consecutive_anomalies_threshold = 3
        self.time_window_minutes = 5
        
        # Statistics
        self.anomaly_history = []
        self.total_predictions = 0
        self.total_anomalies = 0
        self.consecutive_anomalies = 0
        
        print("✅ RealTimeAnomalyDetector initialized")
    
    def set_detector(self, detector: MLAnomalyDetector):
        """Set the ML detector."""
        self.detector = detector
        print(f"✅ ML detector set: {detector.model_type}")
    
    def start_monitoring(self):
        """Start real-time monitoring."""
        if self.is_monitoring:
            print("⚠️ Monitoring is already running")
            return False
        
        if not self.detector or not self.detector.is_trained:
            print("❌ No trained model available")
            return False
        
        self.is_monitoring = True
        self.monitoring_thread = threading.Thread(target=self._monitoring_loop)
        self.monitoring_thread.daemon = True
        self.monitoring_thread.start()
        
        print(f"🚀 Real-time anomaly monitoring started (model: {self.detector.model_type})")
        return True
    
    def stop_monitoring(self):
        """Stop real-time monitoring."""
        self.is_monitoring = False
        if self.monitoring_thread:
            self.monitoring_thread.join(timeout=5)
        
        print("⏹️ Real-time anomaly monitoring stopped")
    
    def feed_data(self, test_data: Dict):
        """Feed a new test record into the monitor."""
        if not self.is_monitoring:
            return
        
        # Add a timestamp
        test_data['timestamp'] = datetime.now().isoformat()
        
        try:
            self.data_queue.put(test_data, timeout=1)
        except queue.Full:
            print("⚠️ Data queue is full, discarding old data")
            # Drop the oldest half of the queued records
            for _ in range(self.data_queue.qsize() // 2):
                try:
                    self.data_queue.get_nowait()
                except queue.Empty:
                    break
            self.data_queue.put(test_data)
    
    def _monitoring_loop(self):
        """Main monitoring loop."""
        print("🔄 Monitoring loop started")
        
        while self.is_monitoring:
            try:
                # Fetch the next record from the queue
                test_data = self.data_queue.get(timeout=1)
                
                # Run anomaly detection
                predictions = self.detector.predict_anomaly_batch([test_data])
                
                if predictions and not predictions[0].get('error'):
                    result = predictions[0]
                    
                    # Update statistics
                    self.total_predictions += 1
                    
                    if result.get('is_anomaly', False):
                        self.total_anomalies += 1
                        self.consecutive_anomalies += 1
                        
                        # Record the anomaly
                        anomaly_record = {
                            'timestamp': test_data.get('timestamp'),
                            'confidence': result.get('confidence', 0),
                            'anomaly_score': result.get('anomaly_score', 0),
                            'test_data': test_data,
                            'result': result,
                            'consecutive_count': self.consecutive_anomalies
                        }
                        self.anomaly_history.append(anomaly_record)
                        
                        # Decide whether to raise an alert
                        if (result.get('confidence', 0) >= self.alert_threshold or
                            self.consecutive_anomalies >= self.consecutive_anomalies_threshold):
                            self._trigger_alert(anomaly_record)
                        
                        print(f"🚨 Anomaly detected - confidence: {result.get('confidence', 0):.3f}, consecutive count: {self.consecutive_anomalies}")
                        
                    else:
                        self.consecutive_anomalies = 0  # Reset the consecutive-anomaly counter
                
                # Drop anomaly records that have aged out
                self._cleanup_old_records()
                
            except queue.Empty:
                continue
            except Exception as e:
                print(f"❌ Monitoring loop error: {e}")
    
    def _trigger_alert(self, anomaly_record: Dict):
        """Raise an anomaly alert."""
        alert_message = "⚠️ High-confidence anomaly alert!"
        alert_details = {
            'timestamp': anomaly_record['timestamp'],
            'confidence': anomaly_record['confidence'],
            'anomaly_score': anomaly_record['anomaly_score'],
            'consecutive_count': anomaly_record['consecutive_count'],
            'model_type': self.detector.model_type
        }
        
        print(f"🚨 {alert_message}")
        print(f"🚨 Details: {alert_details}")
        
        # Invoke the user-supplied alert callback
        if self.alert_callback:
            try:
                self.alert_callback(alert_message, alert_details, anomaly_record)
            except Exception as e:
                print(f"❌ Alert callback failed: {e}")
    
    def _cleanup_old_records(self):
        """Remove records older than the time window."""
        cutoff_time = datetime.now() - timedelta(minutes=self.time_window_minutes)
        
        self.anomaly_history = [
            record for record in self.anomaly_history
            if datetime.fromisoformat(record['timestamp']) > cutoff_time
        ]
    
    def get_statistics(self) -> Dict:
        """Return the current real-time statistics."""
        recent_anomalies = len([
            r for r in self.anomaly_history
            if datetime.fromisoformat(r['timestamp']) > datetime.now() - timedelta(minutes=self.time_window_minutes)
        ])
        
        anomaly_rate = (self.total_anomalies / max(self.total_predictions, 1)) * 100
        
        return {
            'total_predictions': self.total_predictions,
            'total_anomalies': self.total_anomalies,
            'anomaly_rate': anomaly_rate,
            'recent_anomalies': recent_anomalies,
            'consecutive_anomalies': self.consecutive_anomalies,
            'is_monitoring': self.is_monitoring,
            'queue_size': self.data_queue.qsize(),
            'model_type': self.detector.model_type if self.detector else None
        }
    
    def set_alert_threshold(self, threshold: float):
        """Set the confidence threshold for alerts."""
        self.alert_threshold = threshold
        print(f"🔧 Alert threshold set to {threshold}")
    
    def set_consecutive_threshold(self, count: int):
        """Set the consecutive-anomaly threshold for alerts."""
        self.consecutive_anomalies_threshold = count
        print(f"🔧 Consecutive-anomaly threshold set to {count}")

# Test the real-time anomaly detection system
print("\n🧪 Testing the real-time anomaly detection system...")

# Define the alert callback
def alert_callback(message: str, details: Dict, record: Dict):
    print(f"📢 Alert callback: {message}")
    print(f"📢 Model type: {details['model_type']}")
    print(f"📢 Anomaly score: {details['anomaly_score']:.3f}")

# Use the best model for real-time detection
if evaluation_results:
    best_model_name = max(evaluation_results.items(), key=lambda x: x[1]['f1_score'])[0]
    best_detector = trained_models[best_model_name]
    
    print(f"Best model selected for real-time detection: {best_model_name}")
    
    # Create the real-time detector
    rt_detector = RealTimeAnomalyDetector(best_detector, alert_callback)
    rt_detector.set_alert_threshold(0.6)  # Lower threshold for higher sensitivity
    rt_detector.set_consecutive_threshold(2)
    
    # Start monitoring
    rt_detector.start_monitoring()
    
    # Simulate a stream of test data
    print("\n🔄 Simulating a real-time data stream...")
    
    # Feed some normal records
    for i in range(5):
        normal_data = test_normal[i % len(test_normal)]
        rt_detector.feed_data(normal_data)
        time.sleep(0.1)
    
    # Feed some anomalous records
    for i in range(3):
        anomaly_data = test_anomaly[i % len(test_anomaly)]
        rt_detector.feed_data(anomaly_data)
        time.sleep(0.1)
    
    # Wait for processing to finish
    time.sleep(2)
    
    # Fetch the statistics
    stats = rt_detector.get_statistics()
    print("\n📊 Real-time detection statistics:")
    for key, value in stats.items():
        print(f"  {key}: {value}")
    
    # Stop monitoring
    rt_detector.stop_monitoring()
    
else:
    print("❌ No trained model available for the real-time detection test")
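
`feed_data` recovers from a full queue by discarding old records, but that branch only runs if the queue was created with a `maxsize`; a plain `queue.Queue()` is unbounded and never raises `queue.Full`. The sketch below shows a non-blocking variant of the same idea. It is an illustration rather than the notebook's implementation: `put_drop_oldest` is a hypothetical helper, it drops one record at a time instead of half the queue, and `maxsize=3` is an arbitrary demo value.

```python
import queue


def put_drop_oldest(q: queue.Queue, item) -> int:
    """Enqueue item without blocking; when the queue is full, discard the oldest entries.

    Returns the number of entries that were dropped to make room.
    """
    dropped = 0
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                q.get_nowait()  # discard the oldest entry to make room
                dropped += 1
            except queue.Empty:
                pass  # a consumer emptied the queue in the meantime; just retry


# maxsize=3 is an illustrative value
q = queue.Queue(maxsize=3)
dropped_total = sum(put_drop_oldest(q, i) for i in range(5))
remaining = [q.get_nowait() for _ in range(q.qsize())]
print(f"dropped={dropped_total}, remaining={remaining}")
```

Dropping the oldest records keeps the monitor working on the most recent measurements, which is usually the right trade-off for alerting, at the cost of gaps in the processed history.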
    

## 9. Saving and Loading Models

This section adds model serialization so that trained models can be saved and reused in later test runs.

In [None]:
# Add save and load methods to the MLAnomalyDetector class
def save_model(self, filepath: str):
    """Save the trained model to disk."""
    if not self.is_trained:
        print("⚠️ No trained model to save")
        return False
    
    model_data = {
        'model': self.model,
        'scaler': self.scaler,
        'model_type': self.model_type,
        'feature_names': self.feature_names,
        'threshold': self.threshold,
        'timestamp': datetime.now().isoformat(),
        'is_trained': self.is_trained
    }
    
    try:
        joblib.dump(model_data, filepath)
        print(f"✅ Model saved to {filepath}")
        return True
    except Exception as e:
        print(f"❌ Failed to save model: {e}")
        return False

def load_model(self, filepath: str):
    """Load a trained model from disk."""
    try:
        model_data = joblib.load(filepath)
        
        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.model_type = model_data['model_type']
        self.feature_names = model_data.get('feature_names', [])
        self.threshold = model_data.get('threshold', 0.5)
        self.is_trained = model_data.get('is_trained', True)
        
        print(f"✅ Model loaded from {filepath}")
        print(f"   Model type: {self.model_type}")
        print(f"   Feature count: {len(self.feature_names)}")
        print(f"   Trained at: {model_data.get('timestamp', 'Unknown')}")
        return True
        
    except Exception as e:
        print(f"❌ Failed to load model: {e}")
        return False

# Attach the methods to the MLAnomalyDetector class
MLAnomalyDetector.save_model = save_model
MLAnomalyDetector.load_model = load_model

# Save every trained model
print("💾 Saving trained models...")

model_save_dir = "/home/ksmo/ml_models"
import os
os.makedirs(model_save_dir, exist_ok=True)

saved_models = {}

for model_name, detector in trained_models.items():
    filepath = os.path.join(model_save_dir, f"5g_anomaly_{model_name}.joblib")
    success = detector.save_model(filepath)
    if success:
        saved_models[model_name] = filepath

print(f"\n✅ Saved {len(saved_models)} model(s):")
for model_name, filepath in saved_models.items():
    print(f"  {model_name}: {filepath}")

# Test model loading
print("\n🔄 Testing model loading...")

if saved_models:
    # Pick one model for the load test
    test_model_name = list(saved_models.keys())[0]
    test_filepath = saved_models[test_model_name]
    
    # Create a fresh detector and load the model into it
    new_detector = MLAnomalyDetector(test_model_name)
    success = new_detector.load_model(test_filepath)
    
    if success:
        print("✅ Model load test passed")
        
        # Verify that the loaded model can predict
        test_sample = test_data[0]
        predictions = new_detector.predict_anomaly_batch([test_sample])
        
        if predictions and not predictions[0].get('error'):
            result = predictions[0]
            print(f"   Prediction test: anomaly={result['is_anomaly']}, confidence={result.get('confidence', 0):.3f}")
        else:
            print("   ⚠️ Prediction test failed")
    else:
        print("❌ Model load test failed")

# Build the training summary (look up the best model once)
best_entry = (max(evaluation_results.items(), key=lambda x: x[1]['f1_score'])
              if evaluation_results else None)
model_summary = {
    'training_date': datetime.now().isoformat(),
    'models_trained': list(trained_models.keys()),
    'evaluation_results': evaluation_results,
    'best_model': {
        'name': best_entry[0] if best_entry else None,
        'f1_score': best_entry[1]['f1_score'] if best_entry else None
    },
    'training_data_summary': {
        'normal_samples': len(normal_samples),
        'anomaly_samples': len(anomaly_samples),
        'features_count': len(feature_names),
        'feature_names': feature_names
    }
}

# Save the training summary
summary_filepath = os.path.join(model_save_dir, "training_summary.json")
with open(summary_filepath, 'w', encoding='utf-8') as f:
    json.dump(model_summary, f, indent=2, ensure_ascii=False)

print(f"\n📋 Training summary saved to: {summary_filepath}")

# List the saved files
print("\n📁 Saved files:")
for filename in os.listdir(model_save_dir):
    filepath = os.path.join(model_save_dir, filename)
    file_size = os.path.getsize(filepath) / 1024  # KB
    print(f"  {filename:<35} ({file_size:.1f} KB)")
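
One thing to watch in the summary step: `json.dump` raises `TypeError` on values it does not know how to encode. If `evaluation_results` holds NumPy scalars or arrays (an assumption, since it is built in an earlier section), the write will fail. A minimal sketch of a `default=` fallback follows; `to_jsonable` is a hypothetical helper, and `FakeArray` exists only to keep the example free of third-party imports.

```python
import json


def to_jsonable(obj):
    """Fallback for json.dump/json.dumps: convert array-like or scalar-like objects.

    Duck-typed so it handles NumPy arrays and scalars (both provide .tolist())
    without importing NumPy here.
    """
    if hasattr(obj, "tolist"):
        return obj.tolist()
    if hasattr(obj, "item"):
        return obj.item()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


class FakeArray:
    """Stand-in for an array type, used only to keep this example dependency-free."""

    def __init__(self, values):
        self._values = values

    def tolist(self):
        return list(self._values)


# Illustrative values only
summary = {"f1_score": 0.5, "confidences": FakeArray([0.25, 0.75])}
text = json.dumps(summary, default=to_jsonable)
print(text)
```

In the cell above this would amount to passing `default=to_jsonable` to the existing `json.dump(model_summary, f, ...)` call.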
    

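## 10. Integrating with the Test System

This section integrates ML anomaly detection into the TestSelector system, including a configuration interface, statistics reporting, and alert handling.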

In [None]:
# Demonstrates how to use ML anomaly detection inside TestSelector

class TestSelectorMLIntegration:
    """
    Demonstrates the integration of TestSelector with ML anomaly detection.
    """
    
    def __init__(self):
        self.ml_enabled = False
        self.ml_detector = None
        self.real_time_detector = None
        
    def enable_ml_detection(self, model_path: str = None, model_type: str = 'isolation_forest'):
        """Enable ML anomaly detection."""
        try:
            self.ml_detector = MLAnomalyDetector(model_type=model_type)
            
            if model_path and os.path.exists(model_path):
                success = self.ml_detector.load_model(model_path)
                if success:
                    print(f"✅ Loaded pre-trained model: {model_path}")
                else:
                    print("❌ Model load failed; falling back to an untrained model")
            else:
                print("⚠️ No model path given or file not found; the model must be trained")
            
            # Initialize the real-time detector
            self.real_time_detector = RealTimeAnomalyDetector(
                detector=self.ml_detector,
                alert_callback=self._handle_ml_alert
            )
            
            self.ml_enabled = True
            print(f"✅ ML anomaly detection enabled (model: {model_type})")
            
        except Exception as e:
            print(f"❌ Failed to enable ML detection: {e}")
            self.ml_enabled = False
    
    def _handle_ml_alert(self, message: str, details: Dict, record: Dict):
        """Handle an ML anomaly alert."""
        print(f"🚨 ML anomaly alert: {message}")
        print(f"🚨 Confidence: {details['confidence']:.3f}")
        print(f"🚨 Anomaly score: {details['anomaly_score']:.3f}")
        print(f"🚨 Consecutive anomalies: {details['consecutive_count']}")
        
        # Automatic responses could be added here:
        # 1. Write to a dedicated log file
        # 2. Send an urgent notification to the administrator
        # 3. Trigger additional diagnostics
        # 4. Stop the test automatically in severe cases
    
    def enhanced_analyze_ra_procedure(self, ra_stats: Dict, loop_number: int, test_type: str = "standard") -> Dict:
        """Enhanced RA procedure analysis (with ML detection)."""
        
        # Start from the original RA statistics
        enhanced_stats = ra_stats.copy()
        
        # Run only if ML detection is enabled and the model is trained
        if self.ml_enabled and self.ml_detector and self.ml_detector.is_trained:
            print(f"🔍 Running ML analysis - loop {loop_number} ({test_type})")
            
            # Adjust the expected behavior to the test type
            if test_type == "cots_only":
                base_signal = 45
                signal_variation = 3
                expected_toggles = 3
            else:
                base_signal = 55
                signal_variation = 8
                expected_toggles = 5
            
            # Prepare the ML input
            current_time = time.time()
            test_data = {
                'ra_stats': ra_stats,
                'test_duration': 20,
                'attacker_active': test_type != "cots_only",
                'airplane_toggles': expected_toggles,
                'loop_number': loop_number,
                'test_type': test_type,
                'timestamps': [current_time - (20 - i*2) for i in range(min(ra_stats.get('ra_initiated', 5), 10))],
                'signal_data': [base_signal + (i * signal_variation / 10) +
                              np.random.normal(0, 2 if test_type == "cots_only" else 5)
                              for i in range(10)]
            }
            
            # Run ML anomaly detection
            try:
                predictions = self.ml_detector.predict_anomaly_batch([test_data])
                
                if predictions and not predictions[0].get('error'):
                    ml_result = predictions[0]
                    
                    # Stricter decision rule for COTS-only mode
                    if test_type == "cots_only":
                        adjusted_threshold = 0.8
                        is_anomaly_adjusted = (ml_result.get('is_anomaly', False) and
                                              ml_result.get('confidence', 0) > adjusted_threshold)
                        
                        if ml_result.get('is_anomaly', False) and not is_anomaly_adjusted:
                            ml_result['is_anomaly'] = False
                            ml_result['confidence'] = ml_result.get('confidence', 0) * 0.5
                    
                    # Merge the ML result into the RA statistics
                    enhanced_stats.update({
                        'ml_anomaly_detected': ml_result.get('is_anomaly', False),
                        'ml_confidence': ml_result.get('confidence', 0),
                        'ml_anomaly_score': ml_result.get('anomaly_score', 0),
                        'ml_model_type': self.ml_detector.model_type,
                        'ml_test_type': test_type
                    })
                    
                    # Feed the real-time detector
                    if self.real_time_detector and self.real_time_detector.is_monitoring:
                        self.real_time_detector.feed_data(test_data)
                    
                    # Report the ML analysis result
                    if ml_result.get('is_anomaly', False):
                        print(f"🔍 ML analysis: anomaly detected in loop {loop_number} ({test_type})!")
                        print(f"🔍 Confidence: {ml_result.get('confidence', 0):.3f}")
                        if test_type == "cots_only":
                            print("⚠️ Note: anomaly detected in COTS-only mode (unusual)")
                    else:
                        print(f"✅ ML analysis: loop {loop_number} behaved normally ({test_type})")
                        
                else:
                    # Guard against an empty prediction list before indexing
                    error = predictions[0].get('error', 'Unknown error') if predictions else 'No predictions returned'
                    print(f"❌ ML analysis failed: {error}")
                    
            except Exception as e:
                print(f"❌ ML analysis raised an exception: {e}")
                enhanced_stats.update({
                    'ml_anomaly_detected': False,
                    'ml_confidence': 0,
                    'ml_error': str(e)
                })
        else:
            if self.ml_enabled and self.ml_detector and not self.ml_detector.is_trained:
                print("ℹ️ ML is enabled but the model is untrained; skipping ML analysis")
        
        return enhanced_stats
    
    def get_ml_statistics(self) -> Dict:
        """Return the ML statistics."""
        if self.real_time_detector:
            return self.real_time_detector.get_statistics()
        return {}
    
    def configure_ml_parameters(self):
        """Show the ML configuration."""
        if not self.ml_enabled:
            print("❌ ML detection is not enabled")
            return
        
        print("\n🔧 ML detection parameters")
        print("-" * 30)
        
        if self.real_time_detector:
            current_stats = self.real_time_detector.get_statistics()
            print("Current state:")
            print(f"  Monitoring: {current_stats['is_monitoring']}")
            print(f"  Total predictions: {current_stats['total_predictions']}")
            print(f"  Total anomalies: {current_stats['total_anomalies']}")
            print(f"  Anomaly rate: {current_stats['anomaly_rate']:.2f}%")
            
            # Interactive parameter adjustment could be added here
            print("\nAdjustable parameters:")
            print(f"  Alert threshold: {self.real_time_detector.alert_threshold}")
            print(f"  Consecutive-anomaly threshold: {self.real_time_detector.consecutive_anomalies_threshold}")
            print(f"  Time window: {self.real_time_detector.time_window_minutes} minutes")

# Demonstrate the TestSelector and ML integration
print("\n🔗 Demonstrating TestSelector integration with ML anomaly detection")
print("=" * 50)

# Create the integration demo
test_selector_ml = TestSelectorMLIntegration()

# Load the best model
if saved_models and evaluation_results:
    best_model_name = max(evaluation_results.items(), key=lambda x: x[1]['f1_score'])[0]
    best_model_path = saved_models[best_model_name]
    
    print(f"Loading best model: {best_model_name}")
    test_selector_ml.enable_ml_detection(model_path=best_model_path, model_type=best_model_name)
    
    # Start real-time monitoring
    if test_selector_ml.real_time_detector:
        test_selector_ml.real_time_detector.start_monitoring()
        print("🚀 Real-time monitoring started")
    
    # Simulate RA procedure analysis
    print("\n🧪 Simulating RA procedure analysis...")
    
    for loop in range(1, 4):
        # Simulated RA statistics
        if loop <= 2:
            # First two loops: normal behavior
            mock_ra_stats = {
                'success_rate': np.random.normal(95, 3),
                'ra_initiated': np.random.randint(8, 12),
                'ra_succeeded': 10,
                'failed_attempts': 1
            }
            test_type = 'cots_only'
        else:
            # Third loop: anomalous behavior
            mock_ra_stats = {
                'success_rate': np.random.normal(65, 10),
                'ra_initiated': np.random.randint(18, 25),
                'ra_succeeded': 12,
                'failed_attempts': 8
            }
            test_type = 'standard'
        
        # Cap successes at the number of attempts so the rate cannot exceed 100%
        mock_ra_stats['ra_succeeded'] = min(mock_ra_stats['ra_succeeded'], mock_ra_stats['ra_initiated'])
        mock_ra_stats['success_rate'] = (mock_ra_stats['ra_succeeded'] / mock_ra_stats['ra_initiated']) * 100
        
        print(f"\n--- Loop {loop} ({test_type}) ---")
        print(f"RA stats: initiated={mock_ra_stats['ra_initiated']}, succeeded={mock_ra_stats['ra_succeeded']}, success rate={mock_ra_stats['success_rate']:.1f}%")
        
        # Run the enhanced analysis
        enhanced_stats = test_selector_ml.enhanced_analyze_ra_procedure(
            mock_ra_stats, loop, test_type
        )
        
        # Show the ML analysis result
        if 'ml_anomaly_detected' in enhanced_stats:
            ml_status = "anomalous" if enhanced_stats['ml_anomaly_detected'] else "normal"
            print(f"ML analysis result: {ml_status} (confidence: {enhanced_stats.get('ml_confidence', 0):.3f})")
        
        time.sleep(0.5)  # Simulate processing time
    
    # Fetch the final statistics
    time.sleep(1)  # Wait for processing to finish
    final_stats = test_selector_ml.get_ml_statistics()
    
    print("\n📊 Final ML statistics:")
    for key, value in final_stats.items():
        print(f"  {key}: {value}")
    
    # Stop monitoring
    if test_selector_ml.real_time_detector:
        test_selector_ml.real_time_detector.stop_monitoring()
        print("\n⏹️ Real-time monitoring stopped")
    
    # Show the configuration
    test_selector_ml.configure_ml_parameters()
    
else:
    print("❌ No trained model available for the integration demo")

print("\n✅ TestSelector ML integration demo complete")
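
The comments in `_handle_ml_alert` list writing to a dedicated log file as one possible automatic response. The sketch below shows one way that could look, under stated assumptions: `make_jsonl_alert_callback` is a hypothetical factory, the JSON Lines format and the file name `ml_alerts.jsonl` are choices made for this example, and the callback follows the `(message, details, record)` signature used by `RealTimeAnomalyDetector`.

```python
import json
import os
import tempfile
from typing import Callable, Dict


def make_jsonl_alert_callback(log_path: str) -> Callable[[str, Dict, Dict], None]:
    """Return an alert callback that appends one JSON object per alert to log_path."""

    def callback(message: str, details: Dict, record: Dict) -> None:
        entry = {"message": message, **details}
        # default=str keeps the write from failing on non-JSON types
        line = json.dumps(entry, ensure_ascii=False, default=str)
        with open(log_path, "a", encoding="utf-8") as f:
            f.write(line + "\n")

    return callback


# Demo with a temporary directory; the values are illustrative only
log_path = os.path.join(tempfile.mkdtemp(), "ml_alerts.jsonl")
on_alert = make_jsonl_alert_callback(log_path)
on_alert(
    "test alert",
    {"confidence": 0.91, "consecutive_count": 2, "model_type": "isolation_forest"},
    {},
)

with open(log_path, encoding="utf-8") as f:
    logged = [json.loads(line) for line in f]
print(logged)
```

Such a callback could be passed as `alert_callback` when constructing `RealTimeAnomalyDetector`. Because the detector invokes it from the monitoring thread, it should stay fast and should not raise.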
    

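## Summary and Usage Recommendations

### 🎯 Training Workflow Summary

This notebook walks through the complete training workflow for 5G network ML anomaly detection models:

1. **Feature engineering**: Extract more than 25 features from raw data such as RA statistics, signal strength, and timestamps
2. **Data synthesis**: Generate synthetic normal and anomalous training data that mimics real network behavior
3. **Model training**: Support both unsupervised (Isolation Forest, One-Class SVM, DBSCAN) and supervised (Random Forest) learning
4. **Model evaluation**: Assess performance with accuracy, precision, recall, and F1 score
5. **Real-time detection**: Build a multithreaded real-time monitoring system with adjustable thresholds and an alert mechanism
6. **System integration**: Integrate the ML functionality into the existing TestSelector test framework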
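
The four metrics in step 4 all derive from the confusion-matrix counts. As a worked illustration (a sketch with made-up counts, not results from this notebook), the hypothetical helper `classification_metrics` below computes them directly and shows why accuracy alone can mislead when anomalies are rare.

```python
from typing import Dict


def classification_metrics(tp: int, fp: int, tn: int, fn: int) -> Dict[str, float]:
    """Compute accuracy, precision, recall and F1, treating 'anomaly' as the positive class."""
    total = tp + fp + tn + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if (precision + recall) else 0.0)
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1_score": f1}


# A detector that finds 8 of 10 anomalies with 2 false alarms among 90 normal samples
good = classification_metrics(tp=8, fp=2, tn=88, fn=2)

# A detector that labels everything "normal": high accuracy, zero F1
lazy = classification_metrics(tp=0, fp=0, tn=90, fn=10)

print(good)
print(lazy)
```

The second case reaches 90% accuracy without detecting a single anomaly, which is why the model recommendation in section 7 ranks by F1 score as well as accuracy.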

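### 🏆 Model Selection Recommendations

- **Isolation Forest**: Recommended for initial testing and deployment; needs no labeled data and performs consistently
- **Random Forest**: Can offer the highest detection accuracy of the four, provided enough labeled anomaly data is available
- **One-Class SVM**: Suited to scenarios that demand high sensitivity to anomalies
- **DBSCAN**: Suited to discovering unknown anomaly patterns, but does not support single-sample prediction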
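
DBSCAN's lack of single-sample prediction comes from the algorithm itself: clusters are defined by the fitted points, and scikit-learn's `DBSCAN` offers `fit_predict` but no `predict`. A common workaround is to give a new sample the label of its nearest core sample if that core sample lies within `eps`, and to treat it as noise (label -1) otherwise. The sketch below illustrates that rule in plain Python; `assign_to_cluster` is a hypothetical helper, not part of this notebook. With scikit-learn, the core samples would come from `model.components_` and their labels from `model.labels_[model.core_sample_indices_]`.

```python
import math
from typing import Sequence


def assign_to_cluster(
    sample: Sequence[float],
    core_samples: Sequence[Sequence[float]],
    core_labels: Sequence[int],
    eps: float,
) -> int:
    """Return the label of the nearest core sample within eps, or -1 (noise/anomaly)."""
    best_label, best_dist = -1, float("inf")
    for point, label in zip(core_samples, core_labels):
        d = math.dist(sample, point)  # Euclidean distance
        if d <= eps and d < best_dist:
            best_label, best_dist = label, d
    return best_label


# Illustrative core samples from two clusters (made-up values)
core_samples = [(0.0, 0.0), (0.1, 0.0), (5.0, 5.0)]
core_labels = [0, 0, 1]

near_first = assign_to_cluster((0.2, 0.1), core_samples, core_labels, eps=0.5)
near_second = assign_to_cluster((5.2, 5.0), core_samples, core_labels, eps=0.5)
outlier = assign_to_cluster((2.5, 2.5), core_samples, core_labels, eps=0.5)
print(near_first, near_second, outlier)
```

This only approximates refitting: a new sample can never become a core sample itself, so the clusters stay frozen at training time.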

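### ⚙️ Deployment Recommendations

1. **Development**: Train an Isolation Forest model on synthetic data as a proof of concept
2. **Testing**: Collect real data from normal operation and retrain the unsupervised models
3. **Production**: Once enough anomaly cases have accumulated, train a supervised model to improve accuracy
4. **Maintenance**: Retrain the models periodically to track changes in the network environment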

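### 🔧 Parameter Tuning Guide

- **Alert threshold**: 0.8 or higher is recommended for COTS-only tests, 0.6 or higher for tests with an attacker
- **Consecutive-anomaly threshold**: 2 to 3 is recommended, to balance sensitivity against false alarms
- **Time window**: 5 to 10 minutes is recommended, to match the test loop period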
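
These recommendations can be combined into a single alert decision that mirrors the condition in `_monitoring_loop` (confidence at or above the threshold, or enough consecutive anomalies). The following is a sketch under assumptions: `should_alert` and `ALERT_THRESHOLDS` are hypothetical names, the test-type keys follow the `cots_only` and `standard` values used in section 10, and the 0.7 fallback is the default `alert_threshold` of `RealTimeAnomalyDetector`.

```python
# Per-test-type alert thresholds, taken from the recommendations above
ALERT_THRESHOLDS = {
    "cots_only": 0.8,  # no attacker expected, so demand higher confidence
    "standard": 0.6,   # attacker present, so favor sensitivity
}
DEFAULT_THRESHOLD = 0.7  # fallback for unknown test types


def should_alert(
    confidence: float,
    consecutive_anomalies: int,
    test_type: str,
    consecutive_threshold: int = 3,
) -> bool:
    """Alert on a confident single detection or on a run of consecutive anomalies."""
    threshold = ALERT_THRESHOLDS.get(test_type, DEFAULT_THRESHOLD)
    return confidence >= threshold or consecutive_anomalies >= consecutive_threshold


# The same detection is handled differently depending on the test type
cots_alert = should_alert(0.7, 1, "cots_only")
standard_alert = should_alert(0.7, 1, "standard")
streak_alert = should_alert(0.1, 3, "cots_only")
print(cots_alert, standard_alert, streak_alert)
```

Keeping the thresholds in one table makes them easy to review against the false-alarm rate observed in the field.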

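### 🚀 Future Improvements

1. **Incremental learning**: Implement online learning so the model keeps adapting to new network conditions
2. **Multimodal fusion**: Integrate more data sources (power, spectrum, traffic, and so on) to improve detection
3. **Deep learning**: Explore sequence models such as LSTM and GRU to capture more complex anomaly patterns
4. **Federated learning**: Share what is learned across multiple base stations while preserving privacy
5. **Explainability**: Make model decisions more interpretable to help engineers understand the cause of an anomaly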

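### 📋 Checklist

Before deployment, confirm that:
- [ ] The model has been trained on enough data
- [ ] The evaluation metrics meet business requirements (recommended: F1 > 0.8)
- [ ] The alert thresholds have been tuned for the actual environment
- [ ] The real-time monitoring system runs stably
- [ ] The alert callback is configured correctly
- [ ] The model files are backed up and can be loaded quickly

With the guidance in this notebook, you should be able to build and deploy a complete ML anomaly detection system for 5G networks.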