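# Lab-2.2.2: A/B Testing and Intelligent Version Control

## 🎯 Learning Objectives

- Implement an enterprise-grade model version control strategy
- Understand A/B test traffic allocation mechanisms
- Build a framework for statistical significance testing
- Implement progressive deployment (Canary/Blue-Green)

## 🏢 Enterprise Case Study: A/B Testing PayPal's Risk Control Models

PayPal runs several versions of its risk control model in production at the same time:
- **Conservative model v2.1**: high precision, low recall (70% of traffic)
- **Aggressive model v2.2**: high recall, may block legitimate transactions (20% of traffic)
- **Experimental model v3.0**: validates a new algorithm (10% of traffic)

Continuous A/B testing is used to tune model performance while keeping risk under control.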

In [None]:
import os
import sys
import json
import uuid
import time
import random
import hashlib
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
from scipy import stats
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("🚀 A/B Testing and Intelligent Version Control - Environment Check")
print(f"Python version: {sys.version}")
print(f"Working directory: {os.getcwd()}")

# Check required dependencies
required_packages = ['numpy', 'pandas', 'scipy']
for package in required_packages:
    try:
        __import__(package)
        print(f"✅ {package}: installed")
    except ImportError:
        print(f"❌ {package}: not installed")

print("\n✅ Environment check complete")

## 📊 Enterprise-Grade Model Version Control System

In [None]:
from enum import Enum
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field

class ModelStatus(Enum):
    """模型狀態枚舉"""
    DEVELOPMENT = "development"
    TESTING = "testing"
    STAGING = "staging"
    CANARY = "canary"
    PRODUCTION = "production"
    DEPRECATED = "deprecated"
    RETIRED = "retired"

class TrafficSplitStrategy(Enum):
    """流量分配策略"""
    PERCENTAGE = "percentage"
    USER_ID_HASH = "user_id_hash"
    GEOGRAPHIC = "geographic"
    FEATURE_FLAG = "feature_flag"
    TIME_BASED = "time_based"
    GRADUAL_ROLLOUT = "gradual_rollout"

@dataclass
class ModelVersion:
    """模型版本信息"""
    model_id: str
    version: str
    name: str
    description: str
    created_at: datetime
    created_by: str
    status: ModelStatus
    config_path: str
    model_path: str
    performance_baseline: Dict[str, float]
    metadata: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)
    traffic_allocation: float = 0.0
    deployment_config: Dict[str, Any] = field(default_factory=dict)

class ModelVersionControl:
    """模型版本控制系統"""
    
    def __init__(self, storage_path: str = "./model_versions"):
        self.storage_path = Path(storage_path)
        # parents=True so that a nested storage path can be created in one call
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.versions: Dict[str, ModelVersion] = {}
        self.version_history: Dict[str, List[str]] = {}  # model_id -> [versions]
        self._load_versions()
    
    def register_version(self, version: ModelVersion) -> bool:
        """註冊新版本"""
        try:
            version_key = f"{version.model_id}:{version.version}"
            
            # 檢查版本是否已存在
            if version_key in self.versions:
                logger.warning(f"版本 {version_key} 已存在")
                return False
            
            # 驗證版本信息
            self._validate_version(version)
            
            # 註冊版本
            self.versions[version_key] = version
            
            # 更新版本歷史
            if version.model_id not in self.version_history:
                self.version_history[version.model_id] = []
            self.version_history[version.model_id].append(version.version)
            
            # 保存到存儲
            self._save_versions()
            
            logger.info(f"✅ 版本 {version_key} 註冊成功")
            return True
            
        except Exception as e:
            logger.error(f"❌ 版本註冊失敗: {e}")
            return False
    
    def promote_version(self, model_id: str, version: str, target_status: ModelStatus) -> bool:
        """升級版本狀態"""
        version_key = f"{model_id}:{version}"
        
        if version_key not in self.versions:
            logger.error(f"版本 {version_key} 不存在")
            return False
        
        current_version = self.versions[version_key]
        old_status = current_version.status
        
        # 檢查狀態轉換是否合法
        if not self._is_valid_status_transition(old_status, target_status):
            logger.error(f"非法的狀態轉換: {old_status.value} -> {target_status.value}")
            return False
        
        # 更新狀態
        current_version.status = target_status
        
        # 如果升級到生產環境，降級其他版本
        if target_status == ModelStatus.PRODUCTION:
            self._demote_other_production_versions(model_id, version)
        
        self._save_versions()
        
        logger.info(f"🔄 版本 {version_key} 狀態更新: {old_status.value} -> {target_status.value}")
        return True
    
    def get_versions_by_status(self, model_id: str, status: ModelStatus) -> List[ModelVersion]:
        """按狀態獲取版本"""
        return [v for k, v in self.versions.items() 
                if v.model_id == model_id and v.status == status]
    
    def get_latest_version(self, model_id: str, status: Optional[ModelStatus] = None) -> Optional[ModelVersion]:
        """獲取最新版本"""
        versions = [v for k, v in self.versions.items() if v.model_id == model_id]
        
        if status:
            versions = [v for v in versions if v.status == status]
        
        if not versions:
            return None
        
        return max(versions, key=lambda v: v.created_at)
    
    def compare_versions(self, model_id: str, version1: str, version2: str) -> Dict[str, Any]:
        """比較兩個版本"""
        v1_key = f"{model_id}:{version1}"
        v2_key = f"{model_id}:{version2}"
        
        if v1_key not in self.versions or v2_key not in self.versions:
            return {"error": "版本不存在"}
        
        v1 = self.versions[v1_key]
        v2 = self.versions[v2_key]
        
        comparison = {
            "version1": {
                "version": v1.version,
                "status": v1.status.value,
                "created_at": v1.created_at.isoformat(),
                "performance": v1.performance_baseline,
                "traffic_allocation": v1.traffic_allocation
            },
            "version2": {
                "version": v2.version,
                "status": v2.status.value,
                "created_at": v2.created_at.isoformat(),
                "performance": v2.performance_baseline,
                "traffic_allocation": v2.traffic_allocation
            }
        }
        
        # 性能比較
        performance_diff = {}
        for metric in set(v1.performance_baseline.keys()) & set(v2.performance_baseline.keys()):
            diff = v2.performance_baseline[metric] - v1.performance_baseline[metric]
            performance_diff[metric] = {
                "absolute_diff": diff,
                "relative_diff": diff / v1.performance_baseline[metric] if v1.performance_baseline[metric] != 0 else 0
            }
        
        comparison["performance_diff"] = performance_diff
        
        return comparison
    
    def _validate_version(self, version: ModelVersion):
        """驗證版本信息"""
        if not version.model_id or not version.version:
            raise ValueError("模型ID和版本號不能為空")
        
        if not version.config_path or not version.model_path:
            raise ValueError("配置路徑和模型路徑不能為空")
    
    def _is_valid_status_transition(self, from_status: ModelStatus, to_status: ModelStatus) -> bool:
        """檢查狀態轉換是否合法"""
        valid_transitions = {
            ModelStatus.DEVELOPMENT: [ModelStatus.TESTING, ModelStatus.DEPRECATED],
            ModelStatus.TESTING: [ModelStatus.STAGING, ModelStatus.DEVELOPMENT, ModelStatus.DEPRECATED],
            ModelStatus.STAGING: [ModelStatus.CANARY, ModelStatus.TESTING, ModelStatus.DEPRECATED],
            ModelStatus.CANARY: [ModelStatus.PRODUCTION, ModelStatus.STAGING, ModelStatus.DEPRECATED],
            ModelStatus.PRODUCTION: [ModelStatus.DEPRECATED],
            ModelStatus.DEPRECATED: [ModelStatus.RETIRED],
            ModelStatus.RETIRED: []
        }
        
        return to_status in valid_transitions.get(from_status, [])
    
    def _demote_other_production_versions(self, model_id: str, exclude_version: str):
        """降級其他生產版本"""
        for key, version in self.versions.items():
            if (version.model_id == model_id and 
                version.version != exclude_version and 
                version.status == ModelStatus.PRODUCTION):
                version.status = ModelStatus.DEPRECATED
                logger.info(f"🔽 版本 {key} 自動降級為 DEPRECATED")
    
    def _save_versions(self):
        """保存版本信息到文件"""
        versions_file = self.storage_path / "versions.json"
        
        data = {}
        for key, version in self.versions.items():
            data[key] = {
                "model_id": version.model_id,
                "version": version.version,
                "name": version.name,
                "description": version.description,
                "created_at": version.created_at.isoformat(),
                "created_by": version.created_by,
                "status": version.status.value,
                "config_path": version.config_path,
                "model_path": version.model_path,
                "performance_baseline": version.performance_baseline,
                "metadata": version.metadata,
                "dependencies": version.dependencies,
                "traffic_allocation": version.traffic_allocation,
                "deployment_config": version.deployment_config
            }
        
        with open(versions_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
    
    def _load_versions(self):
        """從文件載入版本信息"""
        versions_file = self.storage_path / "versions.json"
        
        if not versions_file.exists():
            return
        
        try:
            with open(versions_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            
            for key, version_data in data.items():
                version = ModelVersion(
                    model_id=version_data['model_id'],
                    version=version_data['version'],
                    name=version_data['name'],
                    description=version_data['description'],
                    created_at=datetime.fromisoformat(version_data['created_at']),
                    created_by=version_data['created_by'],
                    status=ModelStatus(version_data['status']),
                    config_path=version_data['config_path'],
                    model_path=version_data['model_path'],
                    performance_baseline=version_data['performance_baseline'],
                    metadata=version_data.get('metadata', {}),
                    dependencies=version_data.get('dependencies', []),
                    traffic_allocation=version_data.get('traffic_allocation', 0.0),
                    deployment_config=version_data.get('deployment_config', {})
                )
                
                self.versions[key] = version
                
                # 重建版本歷史
                if version.model_id not in self.version_history:
                    self.version_history[version.model_id] = []
                self.version_history[version.model_id].append(version.version)
                
        except Exception as e:
            logger.error(f"載入版本信息失敗: {e}")

# 初始化版本控制系統
version_control = ModelVersionControl()
print("\n✅ 模型版本控制系統初始化完成")
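
The transition table in `_is_valid_status_transition` means a version cannot jump straight from staging to production; it has to pass through canary first. The following is a minimal standalone sketch of that rule, not the notebook's own class: `Stage`, `ALLOWED`, `can_transition`, and `promotion_path` are hypothetical names introduced here for illustration, and the table is copied from the method above.

```python
from collections import deque
from enum import Enum


class Stage(Enum):
    """Hypothetical stand-in for the notebook's ModelStatus enum."""
    DEVELOPMENT = "development"
    TESTING = "testing"
    STAGING = "staging"
    CANARY = "canary"
    PRODUCTION = "production"
    DEPRECATED = "deprecated"
    RETIRED = "retired"


# Same transitions as _is_valid_status_transition in ModelVersionControl.
ALLOWED = {
    Stage.DEVELOPMENT: [Stage.TESTING, Stage.DEPRECATED],
    Stage.TESTING: [Stage.STAGING, Stage.DEVELOPMENT, Stage.DEPRECATED],
    Stage.STAGING: [Stage.CANARY, Stage.TESTING, Stage.DEPRECATED],
    Stage.CANARY: [Stage.PRODUCTION, Stage.STAGING, Stage.DEPRECATED],
    Stage.PRODUCTION: [Stage.DEPRECATED],
    Stage.DEPRECATED: [Stage.RETIRED],
    Stage.RETIRED: [],
}


def can_transition(src: Stage, dst: Stage) -> bool:
    """True if a single promote step from src to dst is legal."""
    return dst in ALLOWED.get(src, [])


def promotion_path(src: Stage, dst: Stage) -> list:
    """Breadth-first search for the shortest legal sequence of stages.

    Returns an empty list when dst cannot be reached from src.
    """
    queue = deque([[src]])
    seen = {src}
    while queue:
        path = queue.popleft()
        if path[-1] is dst:
            return path
        for nxt in ALLOWED[path[-1]]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(path + [nxt])
    return []


# A staging model needs two promote steps to reach production.
path = promotion_path(Stage.STAGING, Stage.PRODUCTION)
print(" -> ".join(stage.value for stage in path))
```

With `ModelVersionControl`, this corresponds to calling `promote_version` once per step along the path; a direct staging-to-production call returns `False` and logs an error.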

## 🧪 A/B Testing Framework

In [None]:
# A/B 測試相關數據結構
@dataclass
class ABTestConfig:
    """A/B 測試配置"""
    test_id: str
    name: str
    description: str
    control_version: str
    treatment_versions: List[str]
    traffic_split: Dict[str, float]
    split_strategy: TrafficSplitStrategy
    start_time: datetime
    end_time: Optional[datetime]
    success_metrics: List[str]
    minimum_sample_size: int
    significance_level: float = 0.05
    statistical_power: float = 0.8
    early_stopping_enabled: bool = True
    status: str = "active"
    created_by: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class ExperimentResult:
    """實驗結果"""
    version: str
    sample_size: int
    metrics: Dict[str, float]
    confidence_intervals: Dict[str, Tuple[float, float]]
    timestamp: datetime

print("\n✅ A/B 測試數據結構定義完成")
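
`ABTestConfig` takes `minimum_sample_size`, `significance_level`, and `statistical_power` as independent inputs, but in practice the first is usually derived from the other two plus the smallest effect worth detecting. The sketch below uses the common normal-approximation formula for a two-sided, two-sample comparison, n = 2((z_{1-α/2} + z_{power}) / d)² per group, where d is the standardized effect size (Cohen's d). The function name `required_sample_size` is an assumption made for this example and is not part of the notebook.

```python
import math
from statistics import NormalDist


def required_sample_size(effect_size: float,
                         significance_level: float = 0.05,
                         statistical_power: float = 0.8) -> int:
    """Per-group sample size for a two-sided, two-sample test.

    Uses the normal approximation, so it slightly underestimates the
    size an exact t-based calculation would give for small samples.
    """
    if effect_size <= 0:
        raise ValueError("effect_size must be positive")
    z_alpha = NormalDist().inv_cdf(1 - significance_level / 2)
    z_beta = NormalDist().inv_cdf(statistical_power)
    return math.ceil(2 * ((z_alpha + z_beta) / effect_size) ** 2)


# Detecting a small effect (d = 0.2) at the ABTestConfig defaults
# (alpha = 0.05, power = 0.8) needs 393 samples per version.
print(required_sample_size(0.2))
```

Halving the detectable effect roughly quadruples the required sample, which is why a 10% traffic slice for an experimental model can take much longer to reach a conclusion than the control arm.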

## 🚦 Traffic Routing and Allocation Strategies

In [None]:
class TrafficRouter:
    """流量路由器"""
    
    def route(self, config: ABTestConfig, request_context: Dict[str, Any]) -> str:
        """根據策略路由流量"""
        if config.split_strategy == TrafficSplitStrategy.PERCENTAGE:
            return self._percentage_split(config, request_context)
        elif config.split_strategy == TrafficSplitStrategy.USER_ID_HASH:
            return self._user_id_hash_split(config, request_context)
        elif config.split_strategy == TrafficSplitStrategy.GEOGRAPHIC:
            return self._geographic_split(config, request_context)
        elif config.split_strategy == TrafficSplitStrategy.FEATURE_FLAG:
            return self._feature_flag_split(config, request_context)
        elif config.split_strategy == TrafficSplitStrategy.TIME_BASED:
            return self._time_based_split(config, request_context)
        elif config.split_strategy == TrafficSplitStrategy.GRADUAL_ROLLOUT:
            return self._gradual_rollout_split(config, request_context)
        else:
            return self._percentage_split(config, request_context)
    
    def _percentage_split(self, config: ABTestConfig, request_context: Dict[str, Any]) -> str:
        """Percentage-based traffic allocation."""
        rand = random.random()  # uniform in [0.0, 1.0)
        cumulative = 0.0
        
        for version, traffic in config.traffic_split.items():
            cumulative += traffic
            # Strict comparison so a version with 0% traffic is never selected
            if rand < cumulative:
                return version
        
        # Fall back to the last version if floating-point rounding left a gap
        return list(config.traffic_split.keys())[-1]
    
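    def _user_id_hash_split(self, config: ABTestConfig, request_context: Dict[str, Any]) -> str:
        """Traffic allocation based on a hash of the user ID."""
        user_id = request_context.get('user_id')
        # Only fall back when the ID is really missing; a numeric ID of 0 is still valid
        if user_id is None or user_id == "":
            return self._percentage_split(config, request_context)
        
        # Hashing keeps a given user on the same version for the whole test
        hash_value = int(hashlib.md5(f"{user_id}:{config.test_id}".encode()).hexdigest(), 16)
        hash_ratio = (hash_value % 10000) / 10000.0  # one of 10,000 buckets in [0.0, 1.0)
        
        cumulative = 0.0
        for version, traffic in config.traffic_split.items():
            cumulative += traffic
            # Strict comparison: a 70% share covers buckets 0..6999, not 0..7000
            if hash_ratio < cumulative:
                return version
        
        return list(config.traffic_split.keys())[-1]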
    
    def _geographic_split(self, config: ABTestConfig, request_context: Dict[str, Any]) -> str:
        """基於地理位置的流量分配"""
        region = request_context.get('region', 'default')
        
        # 地理位置映射規則（可配置）
        region_mapping = {
            'US': config.treatment_versions[0] if config.treatment_versions else config.control_version,
            'EU': config.treatment_versions[1] if len(config.treatment_versions) > 1 else config.control_version,
            'ASIA': config.control_version
        }
        
        return region_mapping.get(region, config.control_version)
    
    def _feature_flag_split(self, config: ABTestConfig, request_context: Dict[str, Any]) -> str:
        """基於特徵標誌的流量分配"""
        feature_flags = request_context.get('feature_flags', {})
        
        # 檢查特定的特徵標誌
        if feature_flags.get('use_experimental_model', False):
            return config.treatment_versions[0] if config.treatment_versions else config.control_version
        
        if feature_flags.get('is_premium_user', False):
            return config.treatment_versions[-1] if config.treatment_versions else config.control_version
        
        return config.control_version
    
    def _time_based_split(self, config: ABTestConfig, request_context: Dict[str, Any]) -> str:
        """Time-based traffic allocation."""
        current_hour = datetime.now().hour
        
        # Use the treatment version during working hours (09:00-17:59 local time)
        if 9 <= current_hour <= 17:
            return config.treatment_versions[0] if config.treatment_versions else config.control_version
        else:
            return config.control_version
    
    def _gradual_rollout_split(self, config: ABTestConfig, request_context: Dict[str, Any]) -> str:
        """Gradual rollout traffic allocation."""
        # Derive the current treatment share from the time elapsed since the test started
        elapsed_hours = (datetime.now() - config.start_time).total_seconds() / 3600
        rollout_duration_hours = 24  # reach full rollout within 24 hours
        
        # Clamp to [0, 1] so a start_time in the future cannot yield a negative share
        target_traffic = max(0.0, min(elapsed_hours / rollout_duration_hours, 1.0))
        
        # The control version keeps whatever traffic has not been rolled out yet
        adjusted_split = {}
        control_traffic = 1.0 - target_traffic
        
        adjusted_split[config.control_version] = control_traffic
        
        # Treatment versions share the rolled-out traffic in their original proportions
        treatment_total = sum(config.traffic_split[v] for v in config.treatment_versions)
        for version in config.treatment_versions:
            if treatment_total > 0:
                version_ratio = config.traffic_split[version] / treatment_total
                adjusted_split[version] = target_traffic * version_ratio
            else:
                adjusted_split[version] = 0.0
        
        # Route using the adjusted split
        temp_config = ABTestConfig(
            test_id=config.test_id,
            name=config.name,
            description=config.description,
            control_version=config.control_version,
            treatment_versions=config.treatment_versions,
            traffic_split=adjusted_split,
            split_strategy=TrafficSplitStrategy.PERCENTAGE,
            start_time=config.start_time,
            end_time=config.end_time,  # required: ABTestConfig.end_time has no default value
            success_metrics=config.success_metrics,
            minimum_sample_size=config.minimum_sample_size
        )
        
        return self._percentage_split(temp_config, request_context)

print("\n✅ 流量路由器初始化完成")
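
The user-ID hash strategy is easiest to see in isolation. Here is a minimal sketch, assuming the 70/20/10 split from the case study. `assign_variant` is a hypothetical helper written for this example, and it uses integer bucket thresholds rather than accumulated floats, a small deviation from `_user_id_hash_split` made to avoid rounding at the boundaries.

```python
import hashlib
from collections import Counter


def assign_variant(user_id: str, test_id: str, split: dict) -> str:
    """Deterministically map a user to a variant using 10,000 hash buckets."""
    digest = hashlib.md5(f"{user_id}:{test_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 10_000
    threshold = 0
    for variant, share in split.items():
        threshold += round(share * 10_000)  # integer bucket boundary
        if bucket < threshold:
            return variant
    return list(split)[-1]  # only reached if the shares sum to less than 1.0


split = {"v2.1": 0.7, "v2.2": 0.2, "v3.0": 0.1}

# The same user always lands on the same variant within one test.
first = assign_variant("user-42", "demo_test", split)
assert all(assign_variant("user-42", "demo_test", split) == first for _ in range(100))

# Across many users the observed shares approach the configured split.
counts = Counter(assign_variant(f"user-{i}", "demo_test", split) for i in range(20_000))
for variant, share in split.items():
    print(f"{variant}: configured {share:.0%}, observed {counts[variant] / 20_000:.1%}")
```

Including `test_id` in the hashed string means a user's bucket in one experiment is independent of their bucket in another, so the same users are not always the ones exposed to treatment.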

## 📈 Statistical Analysis Engine

In [None]:
from scipy import stats
import numpy as np
from typing import Dict, List, Any
from collections import defaultdict

class StatisticsEngine:
    """統計分析引擎"""
    
    def analyze_versions(self, version_data: Dict[str, List[float]], 
                        control_version: str, significance_level: float = 0.05) -> Dict[str, Any]:
        """分析版本間的統計差異"""
        
        if control_version not in version_data:
            return {"error": f"控制版本 {control_version} 無數據"}
        
        control_data = np.array(version_data[control_version])
        
        if len(control_data) == 0:
            return {"error": "控制版本無數據"}
        
        analysis = {
            "control_version": control_version,
            "significance_level": significance_level,
            "version_stats": {},
            "comparisons": []
        }
        
        # 計算各版本的統計信息
        for version, data in version_data.items():
            data_array = np.array(data)
            
            if len(data_array) == 0:
                continue
            
            stats_info = {
                "sample_size": len(data_array),
                "mean": float(np.mean(data_array)),
                "std": float(np.std(data_array, ddof=1)),
                "median": float(np.median(data_array)),
                "min": float(np.min(data_array)),
                "max": float(np.max(data_array)),
                "confidence_interval": self._calculate_confidence_interval(
                    data_array, significance_level
                )
            }
            
            analysis["version_stats"][version] = stats_info
        
        # 進行統計比較
        for version, data in version_data.items():
            if version == control_version:
                continue
            
            treatment_data = np.array(data)
            
            if len(treatment_data) == 0:
                continue
            
            comparison = self._compare_versions(
                control_data, treatment_data, 
                control_version, version, 
                significance_level
            )
            
            analysis["comparisons"].append(comparison)
        
        return analysis
    
    def _calculate_confidence_interval(self, data: np.ndarray, 
                                     significance_level: float) -> Tuple[float, float]:
        """計算置信區間"""
        if len(data) < 2:
            return (float(data[0]), float(data[0])) if len(data) == 1 else (0.0, 0.0)
        
        confidence_level = 1 - significance_level
        
        mean = np.mean(data)
        sem = stats.sem(data)  # 標準誤差
        
        # 使用 t 分佈計算置信區間
        t_critical = stats.t.ppf((1 + confidence_level) / 2, len(data) - 1)
        margin_error = t_critical * sem
        
        return (float(mean - margin_error), float(mean + margin_error))
    
    def _compare_versions(self, control_data: np.ndarray, treatment_data: np.ndarray,
                         control_version: str, treatment_version: str,
                         significance_level: float) -> Dict[str, Any]:
        """比較兩個版本的統計差異"""
        
        # 執行雙樣本 t 檢定
        t_stat, p_value = stats.ttest_ind(treatment_data, control_data, equal_var=False)
        
        # 計算效應大小 (Cohen's d)
        cohens_d = self._calculate_cohens_d(control_data, treatment_data)
        
        # 計算相對提升
        control_mean = np.mean(control_data)
        treatment_mean = np.mean(treatment_data)
        
        if control_mean != 0:
            relative_improvement = (treatment_mean - control_mean) / control_mean
        else:
            relative_improvement = 0
        
        # 計算統計功效
        statistical_power = self._calculate_statistical_power(
            control_data, treatment_data, significance_level
        )
        
        return {
            "control_version": control_version,
            "treatment_version": treatment_version,
            "control_mean": float(control_mean),
            "treatment_mean": float(treatment_mean),
            "absolute_difference": float(treatment_mean - control_mean),
            "relative_improvement": float(relative_improvement),
            "t_statistic": float(t_stat),
            "p_value": float(p_value),
            # Cast numpy.bool_ to a plain bool so the result stays JSON-serializable
            "is_significant": bool(p_value < significance_level),
            "cohens_d": float(cohens_d),
            "effect_size_interpretation": self._interpret_effect_size(cohens_d),
            "statistical_power": float(statistical_power),
            "confidence_level": float(1 - significance_level)
        }
    
    def _calculate_cohens_d(self, control_data: np.ndarray, treatment_data: np.ndarray) -> float:
        """計算 Cohen's d 效應大小"""
        control_mean = np.mean(control_data)
        treatment_mean = np.mean(treatment_data)
        
        # 合併標準差
        n1, n2 = len(control_data), len(treatment_data)
        
        if n1 < 2 or n2 < 2:
            return 0.0
        
        s1, s2 = np.std(control_data, ddof=1), np.std(treatment_data, ddof=1)
        pooled_std = np.sqrt(((n1 - 1) * s1**2 + (n2 - 1) * s2**2) / (n1 + n2 - 2))
        
        if pooled_std == 0:
            return 0.0
        
        return (treatment_mean - control_mean) / pooled_std
    
    def _interpret_effect_size(self, cohens_d: float) -> str:
        """解釋效應大小"""
        abs_d = abs(cohens_d)
        
        if abs_d < 0.2:
            return "negligible"
        elif abs_d < 0.5:
            return "small"
        elif abs_d < 0.8:
            return "medium"
        else:
            return "large"
    
    def _calculate_statistical_power(self, control_data: np.ndarray, 
                                   treatment_data: np.ndarray, 
                                   significance_level: float) -> float:
        """Approximate the statistical power of the comparison."""
        # Simplified calculation based on the observed effect size
        effect_size = abs(self._calculate_cohens_d(control_data, treatment_data))
        n = min(len(control_data), len(treatment_data))
        
        if n < 2 or effect_size == 0:
            return 0.0
        
        # Normal approximation for a two-sided test with n samples per group.
        # This is a simplification; a real analysis should use a more exact method.
        z_alpha = stats.norm.ppf(1 - significance_level / 2)
        z_beta = effect_size * np.sqrt(n / 2) - z_alpha
        power = stats.norm.cdf(z_beta)
        
        return float(max(0.0, min(1.0, power)))

# 初始化統計分析引擎
statistics_engine = StatisticsEngine()
print("\n✅ 統計分析引擎初始化完成")
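
`_compare_versions` delegates to `stats.ttest_ind(..., equal_var=False)`, which is Welch's unequal-variance t-test, and `_calculate_cohens_d` uses a pooled standard deviation. The arithmetic behind both can be checked by hand on a tiny data set. The sketch below uses only the standard library; `welch_t` and `cohens_d` are illustrative helpers rather than the engine's methods, and the p-value step (which needs the t distribution) is deliberately left out.

```python
import math
from statistics import mean, variance


def welch_t(control, treatment):
    """Return (t statistic, Welch-Satterthwaite degrees of freedom)."""
    n1, n2 = len(control), len(treatment)
    # Squared standard error of each group mean (sample variance / n)
    se1, se2 = variance(control) / n1, variance(treatment) / n2
    t = (mean(treatment) - mean(control)) / math.sqrt(se1 + se2)
    df = (se1 + se2) ** 2 / (se1 ** 2 / (n1 - 1) + se2 ** 2 / (n2 - 1))
    return t, df


def cohens_d(control, treatment):
    """Standardized mean difference using the pooled sample variance."""
    n1, n2 = len(control), len(treatment)
    pooled_var = ((n1 - 1) * variance(control)
                  + (n2 - 1) * variance(treatment)) / (n1 + n2 - 2)
    return (mean(treatment) - mean(control)) / math.sqrt(pooled_var)


control = [1.0, 2.0, 3.0, 4.0, 5.0]      # mean 3, sample variance 2.5
treatment = [2.0, 3.0, 4.0, 5.0, 6.0]    # mean 4, sample variance 2.5

t_stat, df = welch_t(control, treatment)
d = cohens_d(control, treatment)
print(f"t = {t_stat:.3f}, df = {df:.1f}, d = {d:.3f}")  # t = 1.000, df = 8.0, d = 0.632
```

The example shows why effect size and significance are reported separately: d ≈ 0.63 would be labelled "medium" by `_interpret_effect_size`, yet with five samples per group a t statistic of 1.0 is far from significant.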

## 🧪 Complete A/B Testing Framework

In [None]:
class ABTestFramework:
    """A/B 測試框架"""
    
    def __init__(self, version_control: ModelVersionControl):
        self.version_control = version_control
        self.active_tests: Dict[str, ABTestConfig] = {}
        self.test_results: Dict[str, List[ExperimentResult]] = defaultdict(list)
        self.traffic_router = TrafficRouter()
        self.statistics_engine = StatisticsEngine()
    
    def create_ab_test(self, config: ABTestConfig) -> bool:
        """創建 A/B 測試"""
        try:
            # 驗證配置
            self._validate_ab_test_config(config)
            
            # 檢查流量分配總和
            total_traffic = sum(config.traffic_split.values())
            if abs(total_traffic - 1.0) > 0.001:
                raise ValueError(f"流量分配總和必須為1.0，當前為 {total_traffic}")
            
            # 更新版本的流量分配
            for version, traffic in config.traffic_split.items():
                model_version = self._get_model_version(version)
                if model_version:
                    model_version.traffic_allocation = traffic
            
            # 註冊測試
            self.active_tests[config.test_id] = config
            
            logger.info(f"✅ A/B 測試 {config.name} 創建成功")
            return True
            
        except Exception as e:
            logger.error(f"❌ A/B 測試創建失敗: {e}")
            return False
    
    def route_traffic(self, test_id: str, request_context: Dict[str, Any]) -> str:
        """路由流量到指定版本"""
        if test_id not in self.active_tests:
            raise ValueError(f"A/B 測試 {test_id} 不存在")
        
        config = self.active_tests[test_id]
        
        return self.traffic_router.route(config, request_context)
    
    def record_result(self, test_id: str, version: str, metrics: Dict[str, float]):
        """記錄實驗結果"""
        if test_id not in self.active_tests:
            logger.warning(f"A/B 測試 {test_id} 不存在")
            return
        
        result = ExperimentResult(
            version=version,
            sample_size=1,  # 單次記錄
            metrics=metrics,
            confidence_intervals={},  # 稍後計算
            timestamp=datetime.now()
        )
        
        self.test_results[test_id].append(result)
    
    def analyze_test_results(self, test_id: str, metric_name: str) -> Dict[str, Any]:
        """分析測試結果"""
        if test_id not in self.active_tests:
            return {"error": "測試不存在"}
        
        config = self.active_tests[test_id]
        results = self.test_results[test_id]
        
        if not results:
            return {"error": "無測試結果"}
        
        # 按版本聚合結果
        version_results = defaultdict(list)
        for result in results:
            if metric_name in result.metrics:
                version_results[result.version].append(result.metrics[metric_name])
        
        # 統計分析
        analysis = self.statistics_engine.analyze_versions(
            version_results, 
            config.control_version,
            config.significance_level
        )
        
        # 添加實驗配置信息
        analysis['test_config'] = {
            'test_id': test_id,
            'name': config.name,
            'control_version': config.control_version,
            'treatment_versions': config.treatment_versions,
            'significance_level': config.significance_level,
            'minimum_sample_size': config.minimum_sample_size
        }
        
        return analysis
    
    def check_early_stopping(self, test_id: str, metric_name: str) -> Dict[str, Any]:
        """檢查是否滿足早期停止條件"""
        if test_id not in self.active_tests:
            return {"should_stop": False, "reason": "測試不存在"}
        
        config = self.active_tests[test_id]
        
        if not config.early_stopping_enabled:
            return {"should_stop": False, "reason": "未啟用早期停止"}
        
        analysis = self.analyze_test_results(test_id, metric_name)
        
        if "error" in analysis:
            return {"should_stop": False, "reason": analysis["error"]}
        
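        # Check that every version has enough samples.
        # The loop variable is not named `stats` to avoid shadowing scipy.stats.
        for version, version_stats in analysis['version_stats'].items():
            if version_stats['sample_size'] < config.minimum_sample_size:
                return {
                    "should_stop": False, 
                    "reason": f"Insufficient sample size: {version} has only {version_stats['sample_size']} samples"
                }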
        
        # 檢查統計顯著性
        has_significant_result = False
        for comparison in analysis['comparisons']:
            if comparison['is_significant']:
                has_significant_result = True
                break
        
        if has_significant_result:
            return {
                "should_stop": True,
                "reason": "檢測到統計顯著性差異",
                "analysis": analysis
            }
        
        return {"should_stop": False, "reason": "尚未達到停止條件"}
    
    def get_test_summary(self, test_id: str) -> Dict[str, Any]:
        """獲取測試摘要"""
        if test_id not in self.active_tests:
            return {"error": "測試不存在"}
        
        config = self.active_tests[test_id]
        results = self.test_results[test_id]
        
        # 統計基本信息
        version_counts = defaultdict(int)
        for result in results:
            version_counts[result.version] += 1
        
        duration = None
        if config.end_time:
            duration = (config.end_time - config.start_time).total_seconds() / 3600  # 小時
        else:
            duration = (datetime.now() - config.start_time).total_seconds() / 3600
        
        return {
            "test_id": test_id,
            "name": config.name,
            "status": config.status,
            "duration_hours": round(duration, 2),
            "total_samples": len(results),
            "version_samples": dict(version_counts),
            "traffic_split": config.traffic_split,
            "success_metrics": config.success_metrics,
            "created_by": config.created_by
        }
    
    def _validate_ab_test_config(self, config: ABTestConfig):
        """驗證 A/B 測試配置"""
        # 檢查必要字段
        if not config.test_id or not config.name:
            raise ValueError("測試ID和名稱不能為空")
        
        # 檢查版本是否存在
        all_versions = [config.control_version] + config.treatment_versions
        for version in all_versions:
            if not self._get_model_version(version):
                raise ValueError(f"版本 {version} 不存在")
        
        # 檢查流量分配
        if set(config.traffic_split.keys()) != set(all_versions):
            raise ValueError("流量分配必須包含所有版本")
    
    def _get_model_version(self, version_key: str) -> Optional[ModelVersion]:
        """獲取模型版本"""
        return self.version_control.versions.get(version_key)

print("\n✅ A/B 測試框架初始化完成")

## 🏦 PayPal Risk Control Model A/B Test Demo

In [None]:
# End-to-end demo: A/B testing PayPal risk control models
print("🏦 PayPal Risk Control Model A/B Test Demo")
print("=" * 80)

# 1. Register model versions
print("\n📋 Step 1: Registering risk control model versions...")

# PayPal 風控模型版本
fraud_models = [
    {
        "model_id": "fraud_detection",
        "version": "v2.1",
        "name": "保守風控模型",
        "description": "高精確度，低召回率的風控模型，適合保守策略",
        "performance": {"precision": 0.95, "recall": 0.75, "f1_score": 0.84, "auc": 0.92}
    },
    {
        "model_id": "fraud_detection",
        "version": "v2.2",
        "name": "激進風控模型",
        "description": "高召回率的風控模型，可能有誤殺但能捕獲更多欺詐",
        "performance": {"precision": 0.85, "recall": 0.92, "f1_score": 0.88, "auc": 0.94}
    },
    {
        "model_id": "fraud_detection",
        "version": "v3.0",
        "name": "實驗風控模型",
        "description": "基於深度學習的新算法，正在驗證效果",
        "performance": {"precision": 0.90, "recall": 0.88, "f1_score": 0.89, "auc": 0.96}
    }
]

for model_info in fraud_models:
    version = ModelVersion(
        model_id=model_info["model_id"],
        version=model_info["version"],
        name=model_info["name"],
        description=model_info["description"],
        created_at=datetime.now() - timedelta(days=random.randint(1, 30)),
        created_by="paypal_ml_team",
        status=ModelStatus.STAGING,
        config_path=f"/models/{model_info['model_id']}/{model_info['version']}/config.json",
        model_path=f"/models/{model_info['model_id']}/{model_info['version']}/model.pth",
        performance_baseline=model_info["performance"],
        metadata={
            "training_data": "paypal_fraud_2024_q1",
            "feature_count": random.randint(50, 100),
            "model_size_mb": random.randint(10, 50)
        },
        deployment_config={
            "max_batch_size": 32,
            "timeout_ms": 100,
            "memory_limit_mb": 2048
        }
    )
    
    success = version_control.register_version(version)
    if success:
        print(f"  ✅ {model_info['name']} {model_info['version']} registered")

print(f"\n📊 Registered versions: {len(version_control.versions)}")

In [None]:
# 2. Promote versions toward production
print("\n🚀 Step 2: Promoting model versions...")

# Promote v2.1 to production (the current primary model)
success = version_control.promote_version("fraud_detection", "v2.1", ModelStatus.PRODUCTION)
if success:
    print("  ✅ v2.1 (conservative model) promoted to production")

# Promote v2.2 to canary so it can be tested
success = version_control.promote_version("fraud_detection", "v2.2", ModelStatus.CANARY)
if success:
    print("  ✅ v2.2 (aggressive model) promoted to canary")

# v3.0 stays in staging
print("  ℹ️ v3.0 (experimental model) remains in staging")

# List the versions currently in each status
production_versions = version_control.get_versions_by_status("fraud_detection", ModelStatus.PRODUCTION)
canary_versions = version_control.get_versions_by_status("fraud_detection", ModelStatus.CANARY)
staging_versions = version_control.get_versions_by_status("fraud_detection", ModelStatus.STAGING)

print("\n📈 Versions by status:")
print(f"  Production: {len(production_versions)} version(s)")
print(f"  Canary: {len(canary_versions)} version(s)")
print(f"  Staging: {len(staging_versions)} version(s)")

In [None]:
# 3. Create the A/B test configuration
print("\n🧪 Step 3: Creating the A/B test configuration...")

# Initialize the A/B testing framework
ab_test_framework = ABTestFramework(version_control)

# Main A/B test: conservative vs. aggressive model
main_ab_test = ABTestConfig(
    test_id="paypal_fraud_conservative_vs_aggressive",
    name="PayPal fraud models: conservative vs. aggressive strategy",
    description="Compare the conservative (high-precision) and aggressive (high-recall) models on real transactions",
    control_version="fraud_detection:v2.1",
    treatment_versions=["fraud_detection:v2.2"],
    traffic_split={
        "fraud_detection:v2.1": 0.7,  # conservative model: 70% of traffic
        "fraud_detection:v2.2": 0.3   # aggressive model: 30% of traffic
    },
    split_strategy=TrafficSplitStrategy.USER_ID_HASH,
    start_time=datetime.now(),
    success_metrics=["fraud_detection_rate", "false_positive_rate", "processing_time"],
    minimum_sample_size=1000,
    significance_level=0.05,
    statistical_power=0.8,
    early_stopping_enabled=True,
    created_by="paypal_fraud_team",
    metadata={
        "business_goal": "Balance fraud detection rate against user experience",
        "risk_tolerance": "medium",
        "expected_duration_days": 14
    }
)

# Register the A/B test
success = ab_test_framework.create_ab_test(main_ab_test)
if success:
    print(f"  ✅ {main_ab_test.name} created")
    print(f"    Test ID: {main_ab_test.test_id}")
    print(f"    Traffic split: {main_ab_test.traffic_split}")
    print(f"    Split strategy: {main_ab_test.split_strategy.value}")

print(f"\n📊 Active A/B tests: {len(ab_test_framework.active_tests)}")
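
The configuration above selects `TrafficSplitStrategy.USER_ID_HASH`. One common way to implement such a strategy is to hash the user ID into a number in [0, 1) and walk the cumulative traffic shares. The sketch below is illustrative only and is not necessarily how `ABTestFramework.route_traffic` works; the function `assign_version` and the choice of SHA-256 are assumptions.

```python
import hashlib
from typing import Dict


def assign_version(user_id: str, test_id: str, traffic_split: Dict[str, float]) -> str:
    """Deterministically map a user to a version according to traffic_split."""
    # Salt with the test ID so that different tests bucket users independently
    digest = hashlib.sha256(f"{test_id}:{user_id}".encode("utf-8")).hexdigest()
    # First 8 hex digits -> integer in [0, 2**32) -> float in [0, 1)
    bucket = int(digest[:8], 16) / 2**32
    cumulative = 0.0
    for version, share in traffic_split.items():
        cumulative += share
        if bucket < cumulative:
            return version
    # Guard against floating-point round-off in the cumulative sum
    return list(traffic_split)[-1]


split = {"fraud_detection:v2.1": 0.7, "fraud_detection:v2.2": 0.3}
print(assign_version("user_12345", "paypal_fraud_conservative_vs_aggressive", split))
```

Because the assignment depends only on the user ID and the test ID, the same user lands on the same version on every request, with no per-user state to store.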

In [None]:
# 4. Simulate transaction traffic and model predictions
print("\n💳 Step 4: Simulating PayPal transaction traffic...")

def simulate_fraud_detection(model_version: str, transaction_data: Dict[str, Any]) -> Dict[str, float]:
    """Simulate one fraud-model prediction and return its per-transaction metrics."""
    
    # Each simulated model has its own behavior
    if "v2.1" in model_version:  # conservative model
        base_fraud_rate = 0.02  # 2% fraud detection rate
        false_positive_rate = 0.01  # 1% false-positive rate
        processing_time = random.uniform(80, 120)  # 80-120 ms
    elif "v2.2" in model_version:  # aggressive model
        base_fraud_rate = 0.035  # 3.5% fraud detection rate
        false_positive_rate = 0.025  # 2.5% false-positive rate
        processing_time = random.uniform(90, 130)  # 90-130 ms
    else:  # any other version behaves like the conservative model
        base_fraud_rate = 0.02
        false_positive_rate = 0.01
        processing_time = random.uniform(80, 120)
    
    # Adjust the detection rate for transaction risk
    risk_factor = 1.0
    
    # High-risk region
    if transaction_data.get('region') in ['HIGH_RISK_REGION']:
        risk_factor *= 2.0
    
    # Large transaction
    amount = transaction_data.get('amount', 100)
    if amount > 1000:
        risk_factor *= 1.5
    
    # New user
    if transaction_data.get('user_age_days', 365) < 30:
        risk_factor *= 1.3
    
    # Final detection probability, capped at 10%
    final_fraud_rate = min(base_fraud_rate * risk_factor, 0.1)
    
    # Detection outcome (0 or 1)
    is_flagged = 1 if random.random() < final_fraud_rate else 0
    
    # A transaction that was not detected as fraud can still be flagged by
    # mistake; that is a false positive
    is_false_positive = 0
    if is_flagged == 0 and random.random() < false_positive_rate:
        is_flagged = 1
        is_false_positive = 1
    
    return {
        "fraud_detection_rate": float(is_flagged),
        # Driven by the model's own false_positive_rate rather than a fixed 30% of all flags
        "false_positive_rate": float(is_false_positive),
        "processing_time": processing_time
    }

# Simulate a batch of transactions
print("\n🔄 Generating simulated transactions...")

transaction_count = 3000
results_collected = 0

for i in range(transaction_count):
    # Build one random transaction
    transaction = {
        "transaction_id": f"txn_{i+1:06d}",
        "user_id": f"user_{random.randint(1000, 50000)}",
        "amount": random.uniform(10, 5000),
        "region": random.choice(["US", "EU", "ASIA", "HIGH_RISK_REGION"]),
        "user_age_days": random.randint(1, 1000),
        "payment_method": random.choice(["credit_card", "bank_transfer", "digital_wallet"]),
        "timestamp": datetime.now().isoformat()
    }
    
    # Route the transaction through the main A/B test
    try:
        assigned_version = ab_test_framework.route_traffic(
            "paypal_fraud_conservative_vs_aggressive",
            transaction
        )
        
        # Run the simulated model prediction
        prediction_results = simulate_fraud_detection(assigned_version, transaction)
        
        # Record the outcome against the assigned version
        ab_test_framework.record_result(
            "paypal_fraud_conservative_vs_aggressive",
            assigned_version,
            prediction_results
        )
        
        results_collected += 1
        
    except Exception as e:
        logger.warning(f"Transaction {transaction['transaction_id']} failed: {e}")
    
    # Progress indicator
    if (i + 1) % 500 == 0:
        print(f"\r  Progress: {i+1}/{transaction_count} transactions", end="")

print("\n\n✅ Transaction simulation complete")
print(f"  Total transactions: {transaction_count}")
print(f"  Processed successfully: {results_collected}")
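
Before reading any metric, it can be worth checking that the observed split matches the configured 70/30 target. A large deviation (a "sample ratio mismatch") usually points to a routing bug rather than a model effect. The sketch below uses a chi-square goodness-of-fit test for the two-version case with the standard library only. The function name is hypothetical and the counts in the usage line are made-up illustration values, not results from this notebook.

```python
import math
from typing import Dict


def sample_ratio_mismatch_p_value(observed: Dict[str, int],
                                  expected_split: Dict[str, float]) -> float:
    """Chi-square goodness-of-fit p-value for a two-version split (1 degree of freedom)."""
    if len(observed) != 2:
        raise ValueError("This sketch only handles exactly two versions")
    total = sum(observed.values())
    chi_square = sum(
        (observed[v] - total * expected_split[v]) ** 2 / (total * expected_split[v])
        for v in observed
    )
    # With 1 degree of freedom the chi-square survival function is erfc(sqrt(x / 2))
    return math.erfc(math.sqrt(chi_square / 2))


# Illustrative counts only
p_value = sample_ratio_mismatch_p_value(
    {"fraud_detection:v2.1": 2080, "fraud_detection:v2.2": 920},
    {"fraud_detection:v2.1": 0.7, "fraud_detection:v2.2": 0.3},
)
print(f"SRM p-value: {p_value:.4f}")
```

A very small p-value suggests the routing does not follow the configured split, in which case the metric comparison should not be trusted until the cause is found.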

In [None]:
# 5. Analyze the A/B test results
print("\n📊 Step 5: Analyzing A/B test results...")

# Analyze the main A/B test
print("\n🎯 Main A/B test results (conservative vs. aggressive model):")

main_test_metrics = ["fraud_detection_rate", "false_positive_rate", "processing_time"]

for metric in main_test_metrics:
    print(f"\n📈 Metric: {metric}")
    
    analysis = ab_test_framework.analyze_test_results(
        "paypal_fraud_conservative_vs_aggressive",
        metric
    )
    
    if "error" in analysis:
        print(f"  ❌ Analysis failed: {analysis['error']}")
        continue
    
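    # Per-version statistics. The loop variable is deliberately not named
    # `stats`, which would shadow the `scipy.stats` module imported earlier.
    print("  📊 Per-version statistics:")
    for version, version_stats in analysis["version_stats"].items():
        version_name = "Conservative model v2.1" if "v2.1" in version else "Aggressive model v2.2"
        print(f"    🔹 {version_name}:")
        print(f"      Sample size: {version_stats['sample_size']}")
        print(f"      Mean: {version_stats['mean']:.4f}")
        print(f"      Std dev: {version_stats['std']:.4f}")
        print(f"      Confidence interval: [{version_stats['confidence_interval'][0]:.4f}, {version_stats['confidence_interval'][1]:.4f}]")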
    
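    # Statistical comparison against the control version
    if analysis["comparisons"]:
        comparison = analysis["comparisons"][0]  # one treatment, so one comparison
        
        print("  🔬 Statistical comparison:")
        print(f"    Absolute difference: {comparison['absolute_difference']:.4f}")
        print(f"    Relative improvement: {comparison['relative_improvement']:.2%}")
        print(f"    p-value: {comparison['p_value']:.6f}")
        print(f"    Statistically significant: {'yes' if comparison['is_significant'] else 'no'}")
        print(f"    Effect size: {comparison['cohens_d']:.4f} ({comparison['effect_size_interpretation']})")
        print(f"    Statistical power: {comparison['statistical_power']:.3f}")
        
        # Business interpretation
        if metric == "fraud_detection_rate":
            if comparison['is_significant'] and comparison['relative_improvement'] > 0:
                print("  💡 Insight: the aggressive model's fraud detection rate is significantly higher than the conservative model's")
            elif comparison['is_significant'] and comparison['relative_improvement'] < 0:
                print("  💡 Insight: the conservative model's fraud detection rate is significantly higher than the aggressive model's")
        elif metric == "false_positive_rate":
            if comparison['is_significant'] and comparison['relative_improvement'] > 0:
                print("  ⚠️ Business risk: the aggressive model's false-positive rate is significantly higher than the conservative model's")
            elif comparison['is_significant'] and comparison['relative_improvement'] < 0:
                print("  ✅ Business advantage: the aggressive model's false-positive rate is significantly lower than the conservative model's")
        elif metric == "processing_time":
            if comparison['is_significant'] and comparison['relative_improvement'] > 0:
                print("  🐌 Performance issue: the aggressive model is significantly slower than the conservative model")
            elif comparison['is_significant'] and comparison['relative_improvement'] < 0:
                print("  🚀 Performance advantage: the aggressive model is significantly faster than the conservative model")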
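
The output above reports a p-value and Cohen's d for each metric. As a rough illustration of where such numbers come from, the sketch below computes a Welch t statistic, a two-sided p-value, and Cohen's d using only the standard library. It is an assumption-laden sketch, not the framework's own code: the p-value uses a normal approximation to the t distribution, which is only reasonable for large samples, and `welch_t_and_cohens_d` is a hypothetical name.

```python
import math
import statistics
from typing import Sequence, Tuple


def welch_t_and_cohens_d(control: Sequence[float],
                         treatment: Sequence[float]) -> Tuple[float, float, float]:
    """Return (t statistic, approximate two-sided p-value, Cohen's d)."""
    n1, n2 = len(control), len(treatment)
    m1, m2 = statistics.fmean(control), statistics.fmean(treatment)
    v1, v2 = statistics.variance(control), statistics.variance(treatment)
    # Welch's t statistic: does not assume equal variances in the two groups
    t_stat = (m2 - m1) / math.sqrt(v1 / n1 + v2 / n2)
    # Normal approximation to the t distribution (large-sample assumption)
    p_value = 2 * (1 - statistics.NormalDist().cdf(abs(t_stat)))
    # Cohen's d: mean difference in units of the pooled standard deviation
    pooled_sd = math.sqrt(((n1 - 1) * v1 + (n2 - 1) * v2) / (n1 + n2 - 2))
    cohens_d = (m2 - m1) / pooled_sd
    return t_stat, p_value, cohens_d


t_stat, p_value, cohens_d = welch_t_and_cohens_d([1, 2, 3, 4, 5], [2, 3, 4, 5, 6])
print(f"t = {t_stat:.3f}, p = {p_value:.3f}, d = {cohens_d:.3f}")
```

A common rule of thumb reads |d| near 0.2 as a small effect, 0.5 as medium, and 0.8 as large. For sample sizes as small as the toy input here, an exact t distribution such as the one behind `scipy.stats.ttest_ind` is the better choice.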

In [None]:
# 6. Check the early-stopping conditions
print("\n⏰ Step 6: Checking early-stopping conditions...")

# Check whether the main test can be stopped early on each metric
for metric in main_test_metrics:
    early_stop_result = ab_test_framework.check_early_stopping(
        "paypal_fraud_conservative_vs_aggressive",
        metric
    )
    
    print(f"\n🔍 Early-stopping check for {metric}:")
    print(f"  Should stop: {early_stop_result['should_stop']}")
    print(f"  Reason: {early_stop_result['reason']}")
    
    if early_stop_result['should_stop'] and 'analysis' in early_stop_result:
        print(f"  🎯 Suggestion: {metric} has reached statistical significance, so the test can end early")
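
Checking significance repeatedly while data is still arriving inflates the false-positive rate of the test itself. The rule that `check_early_stopping` applies is not shown in this section, so the following is only a sketch of one conservative option: require the minimum sample size in every version, then compare the p-value against a Bonferroni-corrected threshold. `should_stop_early` and `planned_looks` are hypothetical names.

```python
from typing import Any, Dict


def should_stop_early(sample_sizes: Dict[str, int],
                      p_value: float,
                      minimum_sample_size: int = 1000,
                      significance_level: float = 0.05,
                      planned_looks: int = 5) -> Dict[str, Any]:
    """Decide whether a test may stop early under a Bonferroni-corrected threshold."""
    if min(sample_sizes.values()) < minimum_sample_size:
        return {"should_stop": False, "reason": "minimum sample size not reached"}
    # Split the overall alpha evenly across the planned interim looks
    threshold = significance_level / planned_looks
    if p_value < threshold:
        return {"should_stop": True,
                "reason": f"p-value {p_value:.4g} is below {threshold:.4g}"}
    return {"should_stop": False, "reason": "difference not yet significant"}


print(should_stop_early({"v2.1": 2100, "v2.2": 900}, p_value=0.004))
```

Bonferroni is simple but strict. Group-sequential boundaries such as O'Brien-Fleming spend less alpha on early looks and are a common alternative.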

In [None]:
# 7. Test summary and version comparison
print("\n📋 Step 7: Test summary and version comparison...")

# Fetch the test summary
print("\n📊 A/B test summary:")

summary = ab_test_framework.get_test_summary("paypal_fraud_conservative_vs_aggressive")

if "error" not in summary:
    print(f"\n🧪 {summary['name']}:")
    print(f"  Status: {summary['status']}")
    print(f"  Running time: {summary['duration_hours']:.2f} hours")
    print(f"  Total samples: {summary['total_samples']}")
    print("  Samples per version:")
    for version, count in summary['version_samples'].items():
        version_name = "Conservative model v2.1" if "v2.1" in version else "Aggressive model v2.2"
        percentage = count / summary['total_samples'] * 100 if summary['total_samples'] > 0 else 0
        print(f"    {version_name}: {count} ({percentage:.1f}%)")
    print(f"  Target traffic split: {summary['traffic_split']}")
    print(f"  Created by: {summary['created_by']}")

# Compare version performance
print("\n🏆 Model version performance comparison:")

comparison_result = version_control.compare_versions("fraud_detection", "v2.1", "v2.2")

if "error" not in comparison_result:
    print("\n📈 Conservative model v2.1 vs. aggressive model v2.2:")
    
    v1_info = comparison_result["version1"]
    v2_info = comparison_result["version2"]
    
    print(f"  Version 1 ({v1_info['version']}): status {v1_info['status']}, traffic {v1_info['traffic_allocation']:.1%}")
    print(f"  Version 2 ({v2_info['version']}): status {v2_info['status']}, traffic {v2_info['traffic_allocation']:.1%}")
    
    if "performance_diff" in comparison_result:
        print("  🎯 Performance differences:")
        for metric, diff in comparison_result["performance_diff"].items():
            direction = "up" if diff["relative_diff"] > 0 else "down"
            print(f"    {metric}: {direction} {abs(diff['relative_diff']):.2%} (absolute difference: {diff['absolute_diff']:.4f})")

In [None]:
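# 8. Business recommendations
print("\n💼 Step 8: Business recommendations...")

# Turn the test results into business recommendations
def generate_business_recommendations(test_id: str, metric: str) -> List[str]:
    """Generate business recommendations from the test results for one metric."""
    recommendations = []
    
    analysis = ab_test_framework.analyze_test_results(test_id, metric)
    
    if "error" in analysis or not analysis["comparisons"]:
        return ["Insufficient data; keep collecting samples"]
    
    comparison = analysis["comparisons"][0]
    
    # Metric-specific recommendations
    if metric == "fraud_detection_rate":
        if comparison["is_significant"]:
            if comparison["relative_improvement"] > 0.1:  # more than 10% higher
                recommendations.append("🚀 Strongly recommended: the aggressive model detects significantly more fraud; increase its traffic share")
            elif comparison["relative_improvement"] > 0.05:  # 5-10% higher
                recommendations.append("✅ Recommended: the aggressive model performs better; consider gradually increasing its traffic")
            elif comparison["relative_improvement"] < -0.05:  # more than 5% lower
                recommendations.append("⚠️ Warning: the aggressive model detects less fraud; reduce its traffic or roll back to the conservative model")
        else:
            recommendations.append("ℹ️ Observation: no significant difference in fraud detection rate; look at the other metrics")
    
    elif metric == "false_positive_rate":
        if comparison["is_significant"]:
            if comparison["relative_improvement"] > 0.2:  # false-positive rate more than 20% higher
                recommendations.append("🚨 High risk: the aggressive model's false-positive rate is too high and may hurt user experience")
            elif comparison["relative_improvement"] < -0.1:  # false-positive rate more than 10% lower
                recommendations.append("🎉 Advantage: the aggressive model has a lower false-positive rate and a better user experience")
    
    elif metric == "processing_time":
        if comparison["is_significant"]:
            if comparison["relative_improvement"] > 0.2:  # processing time more than 20% longer
                recommendations.append("⏱️ Performance issue: the aggressive model is too slow and may reduce system throughput")
            elif comparison["relative_improvement"] < -0.1:  # processing time more than 10% shorter
                recommendations.append("🚀 Performance advantage: the aggressive model is faster")
    
    # Statistical power check
    if comparison["statistical_power"] < 0.8:
        recommendations.append(f"📊 Statistical note: power is {comparison['statistical_power']:.2f}, below 0.8; collect more samples")
    
    return recommendations

print("\n🎯 Recommendations from the main A/B test:")

all_recommendations = []

for metric in main_test_metrics:
    recommendations = generate_business_recommendations(
        "paypal_fraud_conservative_vs_aggressive",
        metric
    )
    
    if recommendations:
        print(f"\n📈 Based on {metric}:")
        for rec in recommendations:
            print(f"  {rec}")
            all_recommendations.append(rec)

# Overall recommendation
print("\n🏆 Overall business recommendation:")

# Tally signals by each recommendation's leading label (the text before the
# first colon), so that a word in the message body is never counted as a signal
def count_labels(labels: Tuple[str, ...]) -> int:
    return sum(1 for rec in all_recommendations if rec.split(":", 1)[0].endswith(labels))

positive_signals = count_labels(("Strongly recommended", "Recommended"))
negative_signals = count_labels(("Warning", "High risk"))
neutral_signals = count_labels(("Observation", "Statistical note"))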

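if positive_signals > negative_signals:
    print("  ✅ Overall: aggressive model v2.2 performs well; gradually raise its traffic share to 50%")
    print("  📋 Action plan:")
    print("    1. Raise the aggressive model's traffic from 30% to 40%")
    print("    2. Monitor key metrics for 48 hours")
    print("    3. If nothing abnormal shows up, raise it further to 50%")
    print("    4. Prepare a full rollout plan for aggressive model v2.2")
elif negative_signals > positive_signals:
    print("  ⚠️ Overall: the aggressive model carries risk; reduce its traffic share or roll back")
    print("  📋 Action plan:")
    print("    1. Lower the aggressive model's traffic from 30% to 10%")
    print("    2. Investigate the root cause")
    print("    3. Tune the aggressive model's configuration")
    print("    4. Consider a larger-scale test of experimental model v3.0")
else:
    print("  🤔 Overall: results are mixed; more data and analysis are needed")
    print("  📋 Action plan:")
    print("    1. Keep the current traffic split")
    print("    2. Extend the test to 14 days")
    print("    3. Monitor additional business metrics")
    print("    4. Prepare a parallel test of experimental model v3.0")

print("\n✅ PayPal fraud-model A/B test demo complete!")
print("\n📋 Demo summary:")
print(f"  🎭 Registered versions: {len(version_control.versions)}")
print(f"  🧪 A/B tests: {len(ab_test_framework.active_tests)}")
print(f"  💳 Simulated transactions: {transaction_count}")
print(f"  📊 Metrics analyzed: {len(main_test_metrics)}")
print(f"  💡 Business recommendations: {len(all_recommendations)}")
print("  🚀 Covered the full flow: version control, traffic allocation, statistical analysis, business decisions")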

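## 📝 Lab Summary and Next Steps

### 🎯 Learning Objectives Covered

✅ **Enterprise model version control**
- Built end-to-end version status management (Development → Testing → Staging → Canary → Production)
- Implemented safe promotion and demotion of versions
- Designed version comparison and performance-baseline management

✅ **A/B test traffic allocation**
- Implemented several traffic-split strategies: percentage, user-ID hash, geographic, feature flag, time-based, and gradual rollout
- Built consistent routing (the same user is always assigned to the same version)
- Designed dynamic traffic adjustment

✅ **Statistical significance testing framework**
- Implemented the two-sample t-test and effect-size calculation (Cohen's d)
- Built confidence-interval and statistical-power analysis
- Designed early stopping and sample-size calculation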
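
As a worked illustration of the sample-size calculation mentioned above, the sketch below applies the standard normal-approximation formula for comparing two proportions, n = (z_{1-α/2} + z_{1-β})² · [p₁(1-p₁) + p₂(1-p₂)] / (p₂ - p₁)². It assumes equal allocation between the two groups, which the 70/30 split in this lab does not satisfy, so treat the result as a lower bound for the smaller group. The function name is hypothetical.

```python
import math
from statistics import NormalDist


def sample_size_per_group(p_control: float,
                          p_treatment: float,
                          significance_level: float = 0.05,
                          power: float = 0.8) -> int:
    """Samples needed per group to detect p_control vs. p_treatment (two-sided test)."""
    z_alpha = NormalDist().inv_cdf(1 - significance_level / 2)
    z_beta = NormalDist().inv_cdf(power)
    variance_sum = p_control * (1 - p_control) + p_treatment * (1 - p_treatment)
    effect = p_treatment - p_control
    return math.ceil((z_alpha + z_beta) ** 2 * variance_sum / effect ** 2)


# Base rates of the two simulated models in this lab: 2% vs. 3.5%
print(sample_size_per_group(0.02, 0.035))
```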

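✅ **Progressive deployment (Canary/Blue-Green)**
- Implemented canary deployment with gradually increasing traffic
- Built automated decisions driven by statistical results
- Designed a safe rollback strategy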
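
The canary pattern summarized above can be sketched as a loop over increasing traffic shares with a health check at each step. This is a simplified illustration under stated assumptions, not the lab's implementation: `run_canary_rollout` and the `is_healthy` callback are hypothetical, and a real rollout would wait and collect metrics between steps.

```python
from typing import Callable, List


def run_canary_rollout(steps: List[float],
                       is_healthy: Callable[[float], bool]) -> float:
    """Walk through increasing traffic shares; return the final share, or 0.0 after a rollback."""
    current = 0.0
    for share in steps:
        current = share
        if not is_healthy(share):
            # Roll back: send all traffic to the stable version again
            return 0.0
    return current


# Hypothetical health check that starts failing once the canary reaches 50% of traffic
final_share = run_canary_rollout([0.1, 0.3, 0.5, 1.0], lambda share: share < 0.5)
print(f"Final canary share: {final_share:.0%}")
```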

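### 🚀 Core Technical Deliverables

1. **ModelVersionControl**: enterprise version management system
2. **ABTestFramework**: complete A/B testing framework
3. **TrafficRouter**: intelligent traffic router
4. **StatisticsEngine**: statistical analysis engine
5. **Business decision framework**: data-driven, automated decision support

### 💼 PayPal-Grade Enterprise Features

- **Risk control**: multi-layered safety checks and rollback mechanisms
- **Compliance**: complete version tracking and experiment records
- **Scalability**: support for multiple parallel A/B tests
- **Automation**: automated decisions based on statistical significance
- **Business focus**: technical metrics translated into business insights and recommendations

### 📊 Business Value

Through the PayPal fraud-model case, we demonstrated:
- **Risk balance**: finding the best trade-off between fraud detection rate and user experience
- **Evidence-based decisions**: an objective decision framework grounded in statistics
- **Fast iteration**: shortening the model optimization cycle through A/B testing
- **Loss control**: catching problems early and rolling back promptly to reduce business risk

### 🎓 Next Steps

Next up is **Lab-2.2.3: Model Lifecycle Management**, where we will learn to:
- Build a complete model registry with automatic discovery
- Implement performance monitoring and automated evaluation
- Design automatic model updates and drift detection
- Handle model retirement and resource reclamation

### 💡 Further Questions

1. How can A/B testing resources be coordinated across multiple business lines?
2. How should the testing strategy adapt to seasonal fluctuations in the business?
3. How do you balance experiment speed against statistical reliability?
4. In the tightly regulated financial industry, how do you keep A/B tests compliant?

### 🔗 Integration with the Previous Lab

This lab builds on the multi-model management from Lab-2.2.1 and adds:
- **Version control**: version management for the multi-model repository
- **Safe deployment**: lower risk when deploying new versions, through A/B testing
- **Data-driven selection**: model choice optimized against real business data

---

**🎉 Congratulations on completing enterprise-grade A/B testing and version control! You now have a working grasp of PayPal-level model experimentation and version management techniques.**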