# Day 2 (2-4h): 배포 연동 & MLOps

## 1. 환경 설정 및 라이브러리 설치

In [None]:
# Install the required libraries (IPython `!` shell escapes; run inside Jupyter/IPython)
!pip install -q mlflow
!pip install -q fastapi uvicorn
!pip install -q onnx onnxruntime
!pip install -q tensorrt  # GPU environments only
!pip install -q tritonclient[all]  # Triton Inference Server client
!pip install -q docker
!pip install -q prometheus-client
!pip install -q pydantic

In [None]:
import json
import os
import pickle
import time
import warnings
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats

warnings.filterwarnings('ignore')

# ML Libraries
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn

# MLOps Libraries
import mlflow
import mlflow.sklearn
import mlflow.pytorch

# Model Conversion
import onnx
import onnxruntime as ort

# API Development
from pydantic import BaseModel, Field
from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
from fastapi.responses import JSONResponse

# Monitoring
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    CollectorRegistry,
    Counter,
    Gauge,
    Histogram,
    generate_latest,
)

print("Libraries loaded successfully")

## 2. 모델 서빙을 위한 FastAPI 구현

In [None]:
# FastAPI model-serving implementation.
# The Pydantic request schemas are defined FIRST: ModelServingAPI uses them as
# parameter annotations on its methods, and Python evaluates those annotations
# while the class body executes, so they must already exist at that point.


class PredictionRequest(BaseModel):
    """Request body for POST /predict: a single feature vector."""

    features: List[float] = Field(..., description="Input features for prediction")
    request_id: Optional[str] = Field(None, description="Optional request ID for tracking")


class BatchPredictionRequest(BaseModel):
    """Request body for POST /batch_predict: a list of feature vectors."""

    batch_features: List[List[float]] = Field(..., description="Batch of input features")
    request_id: Optional[str] = Field(None, description="Optional request ID for tracking")


class ModelServingAPI:
    """Serve a fitted estimator through FastAPI, exposing Prometheus metrics.

    Attributes:
        app: The FastAPI application with the routes from `_setup_routes`.
        model: Estimator with a `predict` method (and optionally `predict_proba`).
            `None` until the caller assigns one; prediction routes return 503 until then.
        scaler: Optional object with a `transform` method applied to features
            before prediction.
        model_metadata: Dict returned verbatim by `GET /model/info`.
        registry: Prometheus registry holding this instance's metrics.

    NOTE(review): the route handlers are `async` but call `model.predict`
    synchronously, so a slow model blocks the event loop while it runs.
    """

    def __init__(self, registry=None):
        """Create the FastAPI app, the Prometheus metrics and the routes.

        Args:
            registry: Prometheus `CollectorRegistry` to register the metrics in.
                Defaults to a fresh private registry, so creating several
                instances (e.g. re-running a notebook cell) does not raise
                "Duplicated timeseries". Pass `prometheus_client.REGISTRY` to
                use the process-wide default registry instead.
        """
        self.app = FastAPI(title="Manufacturing AI Model API", version="1.0.0")
        self.model = None
        self.scaler = None
        self.model_metadata = {}
        self.registry = registry if registry is not None else CollectorRegistry()

        # Prometheus metrics, all bound to self.registry
        self.prediction_counter = Counter(
            'model_predictions_total',
            'Total number of predictions',
            registry=self.registry,
        )
        self.prediction_latency = Histogram(
            'model_prediction_duration_seconds',
            'Prediction latency in seconds',
            registry=self.registry,
        )
        self.active_model_gauge = Gauge(
            'active_model_version',
            'Currently active model version',
            registry=self.registry,
        )

        self._setup_routes()

    def _setup_routes(self):
        """Register the HTTP routes on `self.app`."""

        @self.app.get("/")
        async def root():
            return {"message": "Manufacturing AI Model API", "status": "running"}

        @self.app.get("/health")
        async def health_check():
            return {
                "status": "healthy",
                "model_loaded": self.model is not None,
                "timestamp": datetime.now().isoformat()
            }

        @self.app.get("/model/info")
        async def model_info():
            return self.model_metadata

        @self.app.post("/predict")
        async def predict(request: PredictionRequest):
            return await self._predict(request)

        @self.app.post("/batch_predict")
        async def batch_predict(request: BatchPredictionRequest):
            return await self._batch_predict(request)

        @self.app.get("/metrics")
        async def metrics():
            # Prometheus scrapes the plain-text exposition format. Returning the
            # bytes directly would make FastAPI JSON-encode them into a string.
            return Response(
                content=generate_latest(self.registry),
                media_type=CONTENT_TYPE_LATEST,
            )

    async def _predict(self, request: PredictionRequest):
        """Score one feature vector.

        Returns:
            Dict with the predicted label, class probabilities (or `None` if the
            model has no `predict_proba`), the latency in ms and a timestamp.

        Raises:
            HTTPException: 503 if no model is loaded; 500 if preprocessing or
                prediction fails (e.g. wrong number of features).
        """
        if self.model is None:
            raise HTTPException(status_code=503, detail="Model not loaded")

        start_time = time.perf_counter()

        try:
            # Preprocess: one sample -> shape (1, n_features)
            features = np.asarray(request.features, dtype=float).reshape(1, -1)
            if self.scaler is not None:
                features = self.scaler.transform(features)

            # Classification is assumed: the label is returned as an int.
            prediction = int(self.model.predict(features)[0])

            if hasattr(self.model, 'predict_proba'):
                probabilities = self.model.predict_proba(features)[0].tolist()
            else:
                probabilities = None
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e)) from e

        # Measure once so the histogram and the response report the same latency
        elapsed = time.perf_counter() - start_time
        self.prediction_counter.inc()
        self.prediction_latency.observe(elapsed)

        return {
            "prediction": prediction,
            "probabilities": probabilities,
            "latency_ms": elapsed * 1000,
            "timestamp": datetime.now().isoformat()
        }

    async def _batch_predict(self, request: BatchPredictionRequest):
        """Score a batch of feature vectors.

        An empty batch returns an empty prediction list instead of an error.

        Raises:
            HTTPException: 503 if no model is loaded; 500 if preprocessing or
                prediction fails (e.g. ragged rows or wrong feature count).
        """
        if self.model is None:
            raise HTTPException(status_code=503, detail="Model not loaded")

        start_time = time.perf_counter()

        if not request.batch_features:
            # Nothing to score. Scalers and models reject 0-row input with a
            # confusing error, so answer directly.
            predictions, probabilities = [], None
        else:
            try:
                features = np.asarray(request.batch_features, dtype=float)
                if self.scaler is not None:
                    features = self.scaler.transform(features)

                predictions = self.model.predict(features).tolist()

                if hasattr(self.model, 'predict_proba'):
                    probabilities = self.model.predict_proba(features).tolist()
                else:
                    probabilities = None
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e)) from e

        elapsed = time.perf_counter() - start_time
        self.prediction_counter.inc(len(predictions))
        self.prediction_latency.observe(elapsed)

        return {
            "predictions": predictions,
            "probabilities": probabilities,
            "batch_size": len(predictions),
            "latency_ms": elapsed * 1000,
            "timestamp": datetime.now().isoformat()
        }

# Print a summary of the routes ModelServingAPI registers (no instance is created here)
print("\n".join([
    "FastAPI Model Serving Structure Created",
    "\nAPI Endpoints:",
    "  GET  /          - API root",
    "  GET  /health    - Health check",
    "  GET  /model/info - Model metadata",
    "  POST /predict   - Single prediction",
    "  POST /batch_predict - Batch predictions",
    "  GET  /metrics   - Prometheus metrics",
]))

## 3. ONNX/TensorRT 변환 및 최적화

In [None]:
class ModelOptimizer:
    """Helpers to export, optimize, benchmark and quantize models."""

    # optimize_onnx() level index -> ONNX Runtime GraphOptimizationLevel member name
    _ORT_OPT_LEVELS = (
        'ORT_DISABLE_ALL',
        'ORT_ENABLE_BASIC',
        'ORT_ENABLE_EXTENDED',
        'ORT_ENABLE_ALL',
    )

    def __init__(self):
        # Reserved for storing results; none of the methods below populate it.
        self.optimization_results = {}

    def pytorch_to_onnx(self, model, input_shape, output_path="model.onnx", opset_version=11):
        """Export a PyTorch model to ONNX and validate the result.

        Args:
            model: `torch.nn.Module` to export. It is switched to eval mode in place.
            input_shape: Shape of the dummy input used for tracing. Dimension 0
                is exported as a dynamic batch axis.
            output_path: Destination `.onnx` file.
            opset_version: ONNX opset to target (default 11, as before).

        Returns:
            `output_path`.
        """
        model.eval()

        # Create the dummy input on the model's device (CPU if it has no parameters)
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')
        dummy_input = torch.randn(*input_shape, device=device)

        torch.onnx.export(
            model,
            dummy_input,
            output_path,
            export_params=True,
            opset_version=opset_version,
            do_constant_folding=True,
            input_names=['input'],
            output_names=['output'],
            dynamic_axes={
                'input': {0: 'batch_size'},
                'output': {0: 'batch_size'}
            }
        )

        # Validate the exported graph
        onnx_model = onnx.load(output_path)
        onnx.checker.check_model(onnx_model)

        print(f"Model converted to ONNX: {output_path}")
        return output_path

    def optimize_onnx(self, onnx_path, optimization_level=3):
        """Create an ONNX Runtime session with graph optimizations enabled.

        Args:
            onnx_path: Path to the `.onnx` model.
            optimization_level: 0=disable all, 1=basic, 2=extended, 3=all.

        Returns:
            `onnxruntime.InferenceSession`.

        Raises:
            ValueError: If `optimization_level` is outside 0..3.
        """
        if not 0 <= optimization_level < len(self._ORT_OPT_LEVELS):
            raise ValueError(
                f"optimization_level must be in 0..{len(self._ORT_OPT_LEVELS) - 1}, "
                f"got {optimization_level}"
            )

        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = getattr(
            ort.GraphOptimizationLevel,
            self._ORT_OPT_LEVELS[optimization_level]
        )

        # Prefer CUDA only if *onnxruntime* was built with it. A CUDA-enabled
        # torch install does not imply onnxruntime-gpu.
        providers = ['CPUExecutionProvider']
        if 'CUDAExecutionProvider' in ort.get_available_providers():
            providers.insert(0, 'CUDAExecutionProvider')

        return ort.InferenceSession(onnx_path, sess_options, providers=providers)

    def benchmark_inference(self, model_func, input_data, n_iterations=100, n_warmup=10):
        """Measure the wall-clock latency of `model_func(input_data)`.

        All calls run under `torch.no_grad()`, so PyTorch models are timed
        without autograd bookkeeping. Non-torch callables are unaffected.

        Args:
            model_func: Callable taking `input_data`.
            input_data: Argument passed on every call.
            n_iterations: Number of timed calls (must be >= 1).
            n_warmup: Untimed calls made first (default 10, as before).

        Returns:
            Dict of latency statistics in milliseconds plus `throughput_fps`.

        Raises:
            ValueError: If `n_iterations` < 1.
        """
        if n_iterations < 1:
            raise ValueError(f"n_iterations must be >= 1, got {n_iterations}")

        latencies = []
        with torch.no_grad():
            # Warm-up (caches, lazy init)
            for _ in range(n_warmup):
                model_func(input_data)

            # Benchmark
            for _ in range(n_iterations):
                start = time.perf_counter()
                model_func(input_data)
                latencies.append((time.perf_counter() - start) * 1000)  # ms

        latencies = np.asarray(latencies)
        mean_latency = np.mean(latencies)

        return {
            'mean_latency_ms': mean_latency,
            'std_latency_ms': np.std(latencies),
            'min_latency_ms': np.min(latencies),
            'max_latency_ms': np.max(latencies),
            'p50_latency_ms': np.percentile(latencies, 50),
            'p95_latency_ms': np.percentile(latencies, 95),
            'p99_latency_ms': np.percentile(latencies, 99),
            'throughput_fps': 1000 / mean_latency
        }

    @staticmethod
    def _serialized_size_mb(model):
        """Return the size in MB of `model.state_dict()` serialized with `torch.save`.

        Uses serialization rather than summing `parameters()`/`buffers()`:
        dynamically quantized Linear layers keep their weights in packed params,
        which neither of those iterators reports.
        """
        import io

        buffer = io.BytesIO()
        torch.save(model.state_dict(), buffer)
        return buffer.getbuffer().nbytes / 1024 / 1024

    def quantize_model(self, model, calibration_data=None):
        """Apply PyTorch dynamic INT8 quantization and print a size comparison.

        Args:
            model: `torch.nn.Module` to quantize (not modified; a copy is quantized).
            calibration_data: Unused. Dynamic quantization computes activation
                scales at runtime. Kept for interface compatibility.

        Returns:
            The quantized model.
        """
        quantized_model = torch.quantization.quantize_dynamic(
            model,
            {nn.Linear, nn.Conv2d},
            dtype=torch.qint8
        )

        original_size = self._serialized_size_mb(model)
        quantized_size = self._serialized_size_mb(quantized_model)

        print(f"Original model size: {original_size:.2f} MB")
        print(f"Quantized model size: {quantized_size:.2f} MB")
        if quantized_size > 0:
            print(f"Compression ratio: {original_size/quantized_size:.2f}x")
        else:
            print("Compression ratio: n/a")

        return quantized_model

# A small PyTorch model used by the optimization demo
class SimpleNN(nn.Module):
    """Fully connected network: Linear -> ReLU -> Linear -> ReLU -> Linear.

    Args:
        input_size: Number of input features (last dimension of the input).
        hidden_size: Width of both hidden layers.
        output_size: Number of outputs (no final activation is applied).
    """

    def __init__(self, input_size=10, hidden_size=64, output_size=3):
        super().__init__()
        # Creation order is deliberate: it fixes the RNG draws used for weight
        # initialization and the state_dict key order.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run `x` through both hidden layers (each followed by ReLU), then the output layer."""
        hidden = x
        for hidden_layer in (self.fc1, self.fc2):
            hidden = self.relu(hidden_layer(hidden))
        return self.fc3(hidden)

# Model optimization demonstration
print("Model Optimization Demonstration")
print("="*50)

# Build a PyTorch model and the optimizer helper
model = SimpleNN()
optimizer_tool = ModelOptimizer()

# 1. ONNX export
print("\n1. Converting to ONNX...")
onnx_path = optimizer_tool.pytorch_to_onnx(
    model,
    input_shape=(1, 10),
    output_path="optimized_model.onnx"
)

# 2. Dynamic INT8 quantization
print("\n2. Quantizing model...")
quantized_model = optimizer_tool.quantize_model(model)

# 3. Benchmark: real wall-clock timings on this machine, not simulated numbers
print("\n3. Benchmark Results:")
dummy_input = torch.randn(1, 10)

# Time pure inference: without no_grad, autograd bookkeeping inflates the
# latency of both models and distorts the comparison.
with torch.no_grad():
    original_results = optimizer_tool.benchmark_inference(
        model,
        dummy_input,
        n_iterations=100
    )
    quantized_results = optimizer_tool.benchmark_inference(
        quantized_model,
        dummy_input,
        n_iterations=100
    )

# Compare the two models side by side
comparison_df = pd.DataFrame({
    'Original': original_results,
    'Quantized': quantized_results
}).T

print("\nPerformance Comparison:")
print(comparison_df[['mean_latency_ms', 'p95_latency_ms', 'throughput_fps']].round(2))

## 4. MLflow를 활용한 모델 버전 관리

In [None]:
class MLOpsManager:
    """Convenience wrapper around MLflow experiment tracking and the model registry."""

    def __init__(self, experiment_name="manufacturing_ai"):
        """Activate the MLflow experiment `experiment_name` (MLflow creates it if missing)."""
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)
        # Session-local record of models registered through register_model()
        self.model_registry = {}

    def log_experiment(self,
                       model,
                       metrics,
                       params,
                       artifacts=None,
                       model_name="model",
                       environment="production"):
        """Log one training run to MLflow and return its run ID.

        Args:
            model: A `torch.nn.Module` (logged with the PyTorch flavor) or any
                other model, which is logged with the scikit-learn flavor.
            metrics: Mapping of metric name -> numeric value.
            params: Mapping of parameter name -> value.
            artifacts: Optional iterable of local file paths to upload.
            model_name: Artifact path the model is stored under inside the run.
                Pass the same value as `artifact_path` to `register_model()`.
            environment: Value of the run's "environment" tag (defaults to the
                previously hard-coded "production").

        Returns:
            The MLflow run ID.
        """
        with mlflow.start_run() as run:
            mlflow.log_params(params)
            mlflow.log_metrics(metrics)

            # Pick the MLflow flavor from the model type
            if isinstance(model, torch.nn.Module):
                mlflow.pytorch.log_model(model, model_name)
            else:
                mlflow.sklearn.log_model(model, model_name)

            for artifact_path in artifacts or ():
                mlflow.log_artifact(artifact_path)

            mlflow.set_tag("environment", environment)
            mlflow.set_tag("model_type", type(model).__name__)

            return run.info.run_id

    def register_model(self, run_id, model_name, stage="Staging", artifact_path="model"):
        """Register a run's logged model and move the new version to `stage`.

        Args:
            run_id: MLflow run that logged the model.
            model_name: Registered-model name to create or append a version to.
            stage: Registry stage to transition the new version to.
            artifact_path: Path the model was logged under in the run. It must
                match the `model_name` given to `log_experiment()` (e.g.
                "rf_model"). The default "model" preserves the old behavior.

        Returns:
            The `ModelVersion` created by MLflow.

        NOTE(review): registry stages are deprecated in MLflow >= 2.9 in favor
        of aliases; confirm against the MLflow version in use.
        """
        model_uri = f"runs:/{run_id}/{artifact_path}"

        mv = mlflow.register_model(model_uri, model_name)

        client = mlflow.tracking.MlflowClient()
        client.transition_model_version_stage(
            name=model_name,
            version=mv.version,
            stage=stage
        )

        self.model_registry[model_name] = {
            'version': mv.version,
            'stage': stage,
            'run_id': run_id
        }

        return mv

    def load_production_model(self, model_name):
        """Load the latest Production version of `model_name` as a pyfunc model.

        Returns:
            Tuple `(pyfunc_model, model_version)`.

        Raises:
            IndexError: If no version is in the Production stage. The type is
                the same as before; only the message is now descriptive.
        """
        client = mlflow.tracking.MlflowClient()

        versions = client.get_latest_versions(model_name, stages=["Production"])
        if not versions:
            raise IndexError(f"No Production version registered for model '{model_name}'")
        model_version = versions[0]

        model_uri = f"models:/{model_name}/{model_version.version}"
        model = mlflow.pyfunc.load_model(model_uri)

        return model, model_version

    def compare_models(self, run_ids):
        """Tabulate key metrics for the given runs (missing metrics read as 0)."""
        client = mlflow.tracking.MlflowClient()

        comparison_data = []
        for run_id in run_ids:
            metrics = client.get_run(run_id).data.metrics
            comparison_data.append({
                'run_id': run_id,
                'accuracy': metrics.get('accuracy', 0),
                'f1_score': metrics.get('f1_score', 0),
                'latency_ms': metrics.get('latency_ms', 0),
                'model_size_mb': metrics.get('model_size_mb', 0)
            })

        return pd.DataFrame(comparison_data)

    def automated_model_promotion(self,
                                  model_name,
                                  metric_threshold=None):
        """Promote the latest Staging version to Production if it meets every threshold.

        Args:
            model_name: Registered-model name.
            metric_threshold: Mapping metric -> minimum value. Defaults to
                `{'accuracy': 0.95}`. A metric missing from the run counts as 0.

        Returns:
            True if a version was promoted, else False.
        """
        # None sentinel instead of a shared mutable default dict
        if metric_threshold is None:
            metric_threshold = {'accuracy': 0.95}

        client = mlflow.tracking.MlflowClient()

        staging_models = client.get_latest_versions(
            model_name,
            stages=["Staging"]
        )

        if not staging_models:
            print("No models in Staging")
            return False

        staging_model = staging_models[0]
        run = client.get_run(staging_model.run_id)

        # Check every threshold (not short-circuited) so all failures are reported
        promote = True
        for metric, threshold in metric_threshold.items():
            value = run.data.metrics.get(metric, 0)
            if value < threshold:
                promote = False
                print(f"Metric {metric}={value:.3f} below threshold {threshold}")

        if not promote:
            return False

        # Promote to Production, archiving whatever was there before
        client.transition_model_version_stage(
            name=model_name,
            version=staging_model.version,
            stage="Production",
            archive_existing_versions=True
        )
        print(f"Model {model_name} v{staging_model.version} promoted to Production")
        return True

# MLOps workflow simulation
print("MLOps Workflow Simulation")
print("="*50)

# Create the MLOps manager
mlops = MLOpsManager(experiment_name="manufacturing_quality_prediction")

# Train and log sample models
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Synthetic 3-class dataset
X, y = make_classification(n_samples=1000, n_features=10, n_classes=3,
                           n_informative=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Hyper-parameter settings to try
experiments = [
    {'n_estimators': 50, 'max_depth': 5},
    {'n_estimators': 100, 'max_depth': 10},
    {'n_estimators': 200, 'max_depth': 15}
]

run_ids = []
# Rows for the comparison table: the metrics actually measured for each run
comparison_data = []
for exp_params in experiments:
    # Train
    model = RandomForestClassifier(**exp_params, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Log to MLflow
    metrics = {
        'accuracy': accuracy,
        'f1_score': f1,
        'test_samples': len(X_test)
    }

    run_id = mlops.log_experiment(
        model=model,
        metrics=metrics,
        params=exp_params,
        model_name="rf_model"
    )

    run_ids.append(run_id)
    comparison_data.append({
        'run_id': run_id[:8] + '...',
        'n_estimators': exp_params['n_estimators'],
        'max_depth': exp_params['max_depth'],
        'accuracy': accuracy,
        'f1_score': f1
    })
    print(f"Experiment logged: n_estimators={exp_params['n_estimators']}, "
          f"accuracy={accuracy:.3f}, run_id={run_id[:8]}...")

# Compare models using the real metrics collected above
# (mlops.compare_models(run_ids) would read the same values back from MLflow)
print("\nModel Comparison:")
comparison_df = pd.DataFrame(comparison_data)
print(comparison_df)

## 5. 모델 모니터링 및 드리프트 감지

In [None]:
class ModelMonitoring:
    """Monitor prediction quality over time and detect input-feature drift.

    Drift is measured per feature against a reference sample (e.g. training
    data) with a two-sample Kolmogorov-Smirnov test or the Population
    Stability Index (PSI).
    """

    def __init__(self):
        self.reference_data = None    # dict set by set_reference_data(); None until then
        self.monitoring_metrics = []  # one dict per monitor_prediction_performance() call
        self.drift_history = []       # one summary dict per detect_data_drift() call

    def set_reference_data(self, X_reference, y_reference=None):
        """Store the reference sample that later batches are compared against.

        Args:
            X_reference: 2-D array-like of shape (n_samples, n_features).
            y_reference: Optional labels; stored but not used by the drift checks.
        """
        X_reference = np.asarray(X_reference)
        self.reference_data = {
            'X': X_reference,
            'y': y_reference,
            'statistics': self._calculate_statistics(X_reference)
        }

    def _calculate_statistics(self, X):
        """Return per-column mean/std/min/max and the 25/50/75th percentiles of `X`."""
        return {
            'mean': np.mean(X, axis=0),
            'std': np.std(X, axis=0),
            'min': np.min(X, axis=0),
            'max': np.max(X, axis=0),
            'quantiles': np.percentile(X, [25, 50, 75], axis=0)
        }

    def detect_data_drift(self, X_current, method='ks_test', threshold=0.05,
                          psi_threshold=0.2, drift_ratio_threshold=0.3):
        """Compare each feature of `X_current` with the reference data.

        Args:
            X_current: 2-D array-like with the same number of columns as the
                reference data.
            method: 'ks_test' (drift if p-value < `threshold`) or 'psi'
                (drift if PSI > `psi_threshold`).
            threshold: Significance level for the KS test.
            psi_threshold: PSI cut-off (0.2 is a common rule of thumb).
            drift_ratio_threshold: Overall drift is flagged when more than this
                fraction of features drifted.

        Returns:
            Summary dict (also appended to `self.drift_history`).

        Raises:
            RuntimeError: If `set_reference_data()` has not been called.
            ValueError: On an unknown `method` or a feature-count mismatch.
        """
        if self.reference_data is None:
            raise RuntimeError("Reference data not set; call set_reference_data() first")
        if method not in ('ks_test', 'psi'):
            raise ValueError(f"Unknown drift method {method!r}; use 'ks_test' or 'psi'")

        X_reference = self.reference_data['X']
        X_current = np.asarray(X_current)
        if X_current.ndim != 2 or X_current.shape[1] != X_reference.shape[1]:
            raise ValueError(
                f"X_current must be 2-D with {X_reference.shape[1]} columns, "
                f"got shape {X_current.shape}"
            )

        drift_results = {}
        for feature_idx in range(X_current.shape[1]):
            reference_feature = X_reference[:, feature_idx]
            current_feature = X_current[:, feature_idx]

            if method == 'ks_test':
                # Kolmogorov-Smirnov two-sample test
                statistic, p_value = stats.ks_2samp(reference_feature, current_feature)
                drift_detected = bool(p_value < threshold)
            else:
                # Population Stability Index
                statistic = self._calculate_psi(reference_feature, current_feature)
                p_value = None
                drift_detected = bool(statistic > psi_threshold)

            drift_results[f'feature_{feature_idx}'] = {
                'drift_detected': drift_detected,
                'statistic': statistic,
                'p_value': p_value
            }

        # Overall verdict: share of drifted features
        total_features = len(drift_results)
        drifted_features = sum(1 for v in drift_results.values() if v['drift_detected'])
        drift_ratio = drifted_features / total_features if total_features else 0.0

        drift_summary = {
            'timestamp': datetime.now(),
            'overall_drift': drift_ratio > drift_ratio_threshold,
            'drifted_features': drifted_features,
            'total_features': total_features,
            'drift_ratio': drift_ratio,
            'drift_ratio_threshold': drift_ratio_threshold,
            'details': drift_results
        }

        self.drift_history.append(drift_summary)

        return drift_summary

    def _calculate_psi(self, reference, current, n_bins=10):
        """Compute the Population Stability Index between two 1-D samples.

        Both samples are histogrammed over `n_bins` equal-width bins spanning
        their combined range. Add-one smoothing keeps empty bins out of the log.
        """
        min_val = min(reference.min(), current.min())
        max_val = max(reference.max(), current.max())
        bins = np.linspace(min_val, max_val, n_bins + 1)

        ref_counts, _ = np.histogram(reference, bins=bins)
        cur_counts, _ = np.histogram(current, bins=bins)

        # Smoothed bin probabilities
        ref_probs = (ref_counts + 1) / (len(reference) + n_bins)
        cur_probs = (cur_counts + 1) / (len(current) + n_bins)

        return np.sum((cur_probs - ref_probs) * np.log(cur_probs / ref_probs))

    def monitor_prediction_performance(self,
                                       y_true,
                                       y_pred,
                                       timestamp=None):
        """Record accuracy/F1 for one batch and warn on degradation.

        Once more than 10 batches exist, the mean accuracy of the last 5 is
        compared with the mean of the first 5. A drop below 95% of that
        baseline prints a warning.

        Returns:
            The metrics dict appended to `self.monitoring_metrics`.
        """
        if timestamp is None:
            timestamp = datetime.now()

        metrics = {
            'timestamp': timestamp,
            'accuracy': accuracy_score(y_true, y_pred),
            'f1_score': f1_score(y_true, y_pred, average='weighted'),
            'samples': len(y_true)
        }

        self.monitoring_metrics.append(metrics)

        if len(self.monitoring_metrics) > 10:
            recent_accuracy = np.mean(
                [m['accuracy'] for m in self.monitoring_metrics[-5:]]
            )
            baseline_accuracy = np.mean(
                [m['accuracy'] for m in self.monitoring_metrics[:5]]
            )

            if recent_accuracy < baseline_accuracy * 0.95:
                print(f"⚠️ Performance degradation detected: "
                      f"{recent_accuracy:.3f} < {baseline_accuracy*0.95:.3f}")

        return metrics

    def visualize_monitoring(self):
        """Plot performance, volume, drift-ratio history and the latest per-feature drift."""
        if not self.monitoring_metrics:
            print("No monitoring data available")
            return

        fig, axes = plt.subplots(2, 2, figsize=(12, 8))

        # Performance metrics over time
        metrics_df = pd.DataFrame(self.monitoring_metrics)
        axes[0, 0].plot(metrics_df['accuracy'], label='Accuracy', marker='o')
        axes[0, 0].plot(metrics_df['f1_score'], label='F1 Score', marker='s')
        axes[0, 0].set_xlabel('Time')
        axes[0, 0].set_ylabel('Score')
        axes[0, 0].set_title('Model Performance Over Time')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # Sample volume over time
        axes[0, 1].bar(range(len(metrics_df)), metrics_df['samples'],
                       color='steelblue', alpha=0.7)
        axes[0, 1].set_xlabel('Time')
        axes[0, 1].set_ylabel('Number of Samples')
        axes[0, 1].set_title('Prediction Volume')
        axes[0, 1].grid(True, alpha=0.3)

        if self.drift_history:
            # Drift ratio history, with the threshold used by the latest check
            drift_ratios = [d['drift_ratio'] for d in self.drift_history]
            drift_threshold = self.drift_history[-1].get('drift_ratio_threshold', 0.3)
            axes[1, 0].plot(drift_ratios, color='red', marker='o')
            axes[1, 0].axhline(y=drift_threshold, color='red', linestyle='--',
                               alpha=0.5, label='Drift Threshold')
            axes[1, 0].set_xlabel('Check Number')
            axes[1, 0].set_ylabel('Drift Ratio')
            axes[1, 0].set_title('Feature Drift Detection')
            axes[1, 0].legend()
            axes[1, 0].grid(True, alpha=0.3)

            # Per-feature status of the latest check
            latest_drift = self.drift_history[-1]
            feature_status = [
                1 if v['drift_detected'] else 0
                for v in latest_drift['details'].values()
            ]

            axes[1, 1].bar(range(len(feature_status)), feature_status,
                           color=['red' if x else 'green' for x in feature_status])
            axes[1, 1].set_xlabel('Feature Index')
            axes[1, 1].set_ylabel('Drift Detected')
            axes[1, 1].set_title('Latest Feature Drift Status')
            axes[1, 1].set_ylim([0, 1.5])
            axes[1, 1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

# Model monitoring simulation
print("Model Monitoring Simulation")
print("="*50)

monitor = ModelMonitoring()

# Reference data = training features
monitor.set_reference_data(X_train)

# Seeded generator so the simulated drift and error pattern are reproducible
rng = np.random.default_rng(42)
# Number of classes, taken from the labels instead of hard-coding 3
n_classes = len(np.unique(y_test))

# Simulate data changing over time
for i in range(5):
    # Gradual drift: the mean shift grows with each step
    drift_factor = i * 0.5
    X_current = X_test + rng.normal(drift_factor, 0.5, X_test.shape)

    drift_result = monitor.detect_data_drift(X_current)

    print(f"\nTime step {i+1}:")
    print(f"  Drift detected: {drift_result['overall_drift']}")
    print(f"  Drifted features: {drift_result['drifted_features']}/{drift_result['total_features']}")

    # Performance monitoring (simulated)
    y_pred_sim = model.predict(X_test)
    if i > 2:
        # Inject errors: corrupt exactly 10% of predictions. Sampling without
        # replacement avoids duplicate indices, and each chosen label is shifted
        # to a different class.
        error_idx = rng.choice(len(y_pred_sim), size=int(len(y_pred_sim) * 0.1), replace=False)
        y_pred_sim[error_idx] = (y_pred_sim[error_idx] + 1) % n_classes

    perf_metrics = monitor.monitor_prediction_performance(y_test, y_pred_sim)
    print(f"  Accuracy: {perf_metrics['accuracy']:.3f}")

# Visualize the monitoring results
monitor.visualize_monitoring()

## 6. A/B 테스트 및 섀도우 배포

In [None]:
class DeploymentStrategy:
    """Simulated canary, A/B and shadow deployment strategies.

    The model arguments are only labels. Every success rate, prediction and
    latency below is drawn from numpy's global RNG, not from real models.
    """

    def __init__(self):
        self.models = {}           # reserved; not used by the methods below
        self.traffic_split = {}    # reserved; not used by the methods below
        self.ab_test_results = []  # every ab_test() result, in call order

    def canary_deployment(self,
                          new_model,
                          old_model,
                          initial_traffic=0.1,
                          increment=0.1,
                          success_threshold=0.95,
                          n_requests=1000):
        """Simulate a canary rollout that shifts traffic stepwise to `new_model`.

        Each stage routes a share of `n_requests` to the new model. The rollout
        aborts as soon as the new model's simulated success rate falls below
        `success_threshold`. It succeeds after a stage at 100% traffic passes.

        Returns:
            Tuple `(succeeded, deployment_log)`.

        Raises:
            ValueError: If `initial_traffic` is not in (0, 1] or `increment` <= 0
                (either would make the rollout meaningless or never terminate).
        """
        if not 0 < initial_traffic <= 1.0:
            raise ValueError(f"initial_traffic must be in (0, 1], got {initial_traffic}")
        if increment <= 0:
            raise ValueError(f"increment must be > 0, got {increment}")

        current_traffic = initial_traffic
        deployment_log = []

        print("Starting Canary Deployment")
        print("="*40)

        while True:
            # Split traffic; round() so float error (e.g. 0.7999...) does not lose a request
            n_new = int(round(n_requests * current_traffic))
            n_old = n_requests - n_new

            # Simulated success rates
            new_success_rate = np.random.uniform(0.93, 0.98)
            old_success_rate = np.random.uniform(0.90, 0.95)

            overall_success = (
                new_success_rate * n_new +
                old_success_rate * n_old
            ) / n_requests

            deployment_log.append({
                'traffic_split': current_traffic,
                'new_model_requests': n_new,
                'old_model_requests': n_old,
                'new_model_success': new_success_rate,
                'old_model_success': old_success_rate,
                'overall_success': overall_success
            })

            print(f"Traffic: {current_traffic:.0%} to new model")
            print(f"  New model success: {new_success_rate:.3f}")
            print(f"  Old model success: {old_success_rate:.3f}")
            print(f"  Overall success: {overall_success:.3f}")

            if new_success_rate < success_threshold:
                print(f"❌ Rollback! New model below threshold: "
                      f"{new_success_rate:.3f} < {success_threshold}")
                return False, deployment_log

            # The 100% stage passed: rollout complete. Without this exit the loop
            # would repeat the 100% stage forever, since min(1.0, ...) keeps
            # the traffic at 1.0.
            if current_traffic >= 1.0:
                break

            print(f"✓ Proceeding to next stage\n")
            next_traffic = current_traffic + increment
            # Snap to 100% when float accumulation lands just below 1.0 (e.g. ten 0.1 steps)
            current_traffic = 1.0 if next_traffic >= 1.0 - 1e-9 else next_traffic

        print("✅ Canary deployment successful!")
        return True, deployment_log

    def ab_test(self,
                model_a,
                model_b,
                n_samples=1000,
                significance_level=0.05):
        """Simulate an A/B test and compare success rates with a two-proportion z-test.

        Returns:
            Dict with both rates, their difference (B - A), z-score, two-sided
            p-value, significance flag, a (1 - significance_level) confidence
            interval for the difference, and the winner ('Model A', 'Model B'
            or 'No winner'). The dict is also appended to `self.ab_test_results`.
        """
        # Simulated outcomes
        success_a = np.random.binomial(n_samples, 0.92)
        success_b = np.random.binomial(n_samples, 0.94)

        rate_a = success_a / n_samples
        rate_b = success_b / n_samples
        difference = rate_b - rate_a

        # Hypothesis test: under H0 (equal rates) the pooled rate gives the SE
        pooled_rate = (success_a + success_b) / (2 * n_samples)
        se_pooled = np.sqrt(pooled_rate * (1 - pooled_rate) * (2 / n_samples))
        if se_pooled > 0:
            z_score = difference / se_pooled
            p_value = 2 * stats.norm.sf(abs(z_score))
        else:
            # Both arms all-success or all-failure: rates are identical
            z_score = 0.0
            p_value = 1.0

        # Confidence interval: unpooled SE, with the critical value matching significance_level
        se_unpooled = np.sqrt(
            rate_a * (1 - rate_a) / n_samples + rate_b * (1 - rate_b) / n_samples
        )
        z_crit = stats.norm.ppf(1 - significance_level / 2)
        ci_lower = difference - z_crit * se_unpooled
        ci_upper = difference + z_crit * se_unpooled

        significant = p_value < significance_level
        if significant and rate_b > rate_a:
            winner = 'Model B'
        elif significant and rate_a > rate_b:
            winner = 'Model A'
        else:
            winner = 'No winner'

        result = {
            'model_a_success_rate': rate_a,
            'model_b_success_rate': rate_b,
            'difference': difference,
            'z_score': z_score,
            'p_value': p_value,
            'significant': significant,
            'confidence_interval': (ci_lower, ci_upper),
            'winner': winner
        }

        self.ab_test_results.append(result)

        return result

    def shadow_deployment(self, primary_model, shadow_model, n_requests=100):
        """Simulate shadow (mirrored) traffic and measure agreement between two models.

        Returns:
            Tuple `(summary, per_request_dataframe)`.
        """
        shadow_results = []

        for i in range(n_requests):
            # Simulated request payload (10 features). The simulated predictors
            # below do not read it, but generating it keeps the RNG stream realistic.
            request_data = np.random.randn(10)

            # Primary model prediction (simulated)
            primary_start = time.perf_counter()
            primary_pred = np.random.choice([0, 1, 2])
            primary_latency = (time.perf_counter() - primary_start) * 1000

            # Shadow model prediction (simulated). A real system would run this
            # off the request path; here it simply runs after the primary call.
            shadow_start = time.perf_counter()
            shadow_pred = np.random.choice([0, 1, 2])
            shadow_latency = (time.perf_counter() - shadow_start) * 1000

            shadow_results.append({
                'request_id': i,
                'primary_prediction': primary_pred,
                'shadow_prediction': shadow_pred,
                'agreement': primary_pred == shadow_pred,
                'primary_latency_ms': primary_latency,
                'shadow_latency_ms': shadow_latency
            })

        df = pd.DataFrame(shadow_results)

        summary = {
            'total_requests': n_requests,
            'agreement_rate': df['agreement'].mean(),
            'primary_avg_latency': df['primary_latency_ms'].mean(),
            'shadow_avg_latency': df['shadow_latency_ms'].mean(),
            'latency_difference': df['shadow_latency_ms'].mean() - df['primary_latency_ms'].mean()
        }

        return summary, df

# Run each simulated deployment strategy once and print its headline numbers
title_rule, section_rule = "=" * 50, "-" * 40

deployment = DeploymentStrategy()

print("\n" + title_rule)
print("DEPLOYMENT STRATEGY SIMULATION")
print(title_rule)

# 1. Canary: shift traffic to model_v2 in 20% steps, starting at 10%
print("\n1. CANARY DEPLOYMENT")
print(section_rule)
success, canary_log = deployment.canary_deployment(
    new_model="model_v2", old_model="model_v1",
    initial_traffic=0.1, increment=0.2,
)

# 2. A/B test between the two versions, 2000 simulated samples per arm
print("\n2. A/B TESTING")
print(section_rule)
ab_result = deployment.ab_test(model_a="model_v1", model_b="model_v2", n_samples=2000)

ci_bounds = ab_result['confidence_interval']
print("\n".join([
    f"Model A success rate: {ab_result['model_a_success_rate']:.3f}",
    f"Model B success rate: {ab_result['model_b_success_rate']:.3f}",
    f"Difference: {ab_result['difference']:.3f}",
    f"P-value: {ab_result['p_value']:.4f}",
    f"Statistically significant: {ab_result['significant']}",
    f"95% CI: [{ci_bounds[0]:.3f}, {ci_bounds[1]:.3f}]",
    f"Winner: {ab_result['winner']}",
]))

# 3. Shadow: mirror 100 simulated requests to model_v2 alongside model_v1
print("\n3. SHADOW DEPLOYMENT")
print(section_rule)
shadow_summary, shadow_df = deployment.shadow_deployment(
    primary_model="model_v1", shadow_model="model_v2", n_requests=100,
)

print("\n".join([
    f"Total requests: {shadow_summary['total_requests']}",
    f"Agreement rate: {shadow_summary['agreement_rate']:.1%}",
    f"Primary avg latency: {shadow_summary['primary_avg_latency']:.2f} ms",
    f"Shadow avg latency: {shadow_summary['shadow_avg_latency']:.2f} ms",
    f"Latency difference: {shadow_summary['latency_difference']:.2f} ms",
]))

## 실습 과제

1. **REST API 구현**: FastAPI를 사용해 실제 모델 서빙 API 구축
2. **모델 최적화**: ONNX/TensorRT로 모델을 변환하고 성능 비교
3. **MLflow 통합**: MLflow로 실험 추적 및 모델 레지스트리 구축
4. **모니터링 대시보드**: Grafana/Prometheus로 실시간 모니터링 구현
5. **자동 재학습 파이프라인**: 드리프트 감지 시 자동 재학습 트리거