# Lab 3-2: E2E ML Pipeline 통합

데이터 로드부터 모델 배포까지 완전 자동화된 MLOps 파이프라인을 구축합니다.

## 파이프라인 구조

```
┌─────────────┐   ┌─────────────┐   ┌─────────────┐
│  load_data  │──▶│  preprocess │──▶│ train_model │
└─────────────┘   └─────────────┘   └──────┬──────┘
                                          │
                                          ▼
                                   ┌─────────────┐
                                   │  evaluate   │
                                   └──────┬──────┘
                                          │
                              ┌───────────┴───────────┐
                              │                       │
                              ▼ (deploy)              ▼ (skip)
                       ┌─────────────┐         ┌──────────┐
                       │   deploy    │         │  alert   │
                       │  (KServe)   │         │          │
                       └─────────────┘         └──────────┘
```

## 1. 환경 설정

In [None]:
# 패키지 설치
!pip install kfp==1.8.22 mlflow==2.9.2 scikit-learn pandas -q
print("✅ 패키지 설치 완료!")

In [None]:
# Imports: the KFP SDK (pipeline DSL, function-to-component helper, compiler)
# and `os`, which the next cell uses to read environment variables.
import kfp
from kfp import dsl
from kfp.components import create_component_from_func
from kfp import compiler
import os

# Show which KFP SDK version is actually loaded in this kernel.
print(f"KFP Version: {kfp.__version__}")

## 2. 환경 변수 설정

In [None]:
# Per-user settings (change these to match your own environment!).
# Each value is taken from an environment variable when it is set and falls
# back to the literal default otherwise.
USER_NAMESPACE = os.environ.get('NAMESPACE', 'kubeflow-user01')  # your own namespace

MLFLOW_TRACKING_URI = os.environ.get(
    'MLFLOW_TRACKING_URI',
    'http://mlflow-server-service.mlflow-system.svc.cluster.local:5000',
)

# Echo the effective settings so they can be checked before running the pipeline.
print(
    f"Namespace: {USER_NAMESPACE}",
    f"MLflow URI: {MLFLOW_TRACKING_URI}",
    sep="\n",
)

## 3. 컴포넌트 정의

### 3.1 데이터 로드

In [None]:
def load_data(data_source: str = "sklearn") -> str:
    """Load the raw dataset and write it to a CSV file.

    Args:
        data_source: "sklearn" fetches the California housing dataset through
            scikit-learn's ``fetch_california_housing``; any other value is
            handed to ``pandas.read_csv`` as the location of a CSV file.

    Returns:
        Path of the CSV file that was written.
    """
    import pandas as pd
    from sklearn.datasets import fetch_california_housing

    print("=" * 50)
    print("  Step 1: Load Data")
    print("=" * 50)

    if data_source == "sklearn":
        data = fetch_california_housing()
        df = pd.DataFrame(data.data, columns=data.feature_names)
        df['target'] = data.target
    else:
        df = pd.read_csv(data_source)

    # NOTE(review): this path is on the local filesystem of the step's own
    # container. A downstream step that runs in a different container can only
    # read it if a shared volume is mounted at /tmp; otherwise the data should
    # be passed as a KFP artifact (OutputPath / InputPath) - confirm.
    output_path = "/tmp/raw_data.csv"
    df.to_csv(output_path, index=False)

    print(f"  ✅ Data loaded: {len(df)} rows, {len(df.columns)} columns")
    print(f"  ✅ Saved to: {output_path}")

    return output_path


# Build the component explicitly rather than with the bare decorator so the
# step's container actually has the libraries imported inside the function:
# KFP's default component image is a plain Python image without pandas or
# scikit-learn, and the bare decorator installs nothing.
load_data = create_component_from_func(
    load_data,
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"],
)

### 3.2 전처리

In [None]:
def preprocess(data_path: str, test_size: float = 0.2) -> str:
    """Split the dataset into train/test sets and standardize the features.

    Args:
        data_path: Location of a CSV file that contains a ``target`` column;
            every other column is used as a feature.
        test_size: Fraction of rows held out for the test set.

    Returns:
        Directory that holds ``X_train.npy``, ``X_test.npy``, ``y_train.npy``
        and ``y_test.npy``.
    """
    import pandas as pd
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    import os

    print("=" * 50)
    print("  Step 2: Preprocess")
    print("=" * 50)

    # NOTE(review): data_path is produced by another step; it is only readable
    # here if both steps share the filesystem that holds it - confirm.
    df = pd.read_csv(data_path)

    X = df.drop('target', axis=1)
    y = df['target']

    # Fixed seed keeps the split reproducible between runs.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42
    )

    # Fit the scaler on the training rows only, then apply it to the test rows,
    # so no test-set statistics leak into the features.
    # NOTE(review): the fitted scaler is not saved anywhere, so a serving
    # component would have no way to apply the same scaling - confirm intent.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    output_dir = "/tmp/processed"
    os.makedirs(output_dir, exist_ok=True)

    np.save(f"{output_dir}/X_train.npy", X_train_scaled)
    np.save(f"{output_dir}/X_test.npy", X_test_scaled)
    np.save(f"{output_dir}/y_train.npy", y_train.values)
    np.save(f"{output_dir}/y_test.npy", y_test.values)

    print(f"  ✅ Train: {len(X_train)}, Test: {len(X_test)}")
    print(f"  ✅ Saved to: {output_dir}")

    return output_dir


# Build the component explicitly rather than with the bare decorator so the
# step's container actually has the libraries imported inside the function:
# KFP's default component image is a plain Python image without pandas,
# numpy or scikit-learn, and the bare decorator installs nothing.
preprocess = create_component_from_func(
    preprocess,
    base_image="python:3.9",
    packages_to_install=["pandas", "numpy", "scikit-learn"],
)

### 3.3 모델 학습 (MLflow 연동)

In [None]:
def train_model(
    data_dir: str,
    mlflow_tracking_uri: str,
    experiment_name: str = "e2e-pipeline",
    n_estimators: int = 100,
    max_depth: int = 10
) -> str:
    """Train a random-forest regressor and record the run in MLflow.

    Args:
        data_dir: Directory that holds ``X_train.npy``, ``X_test.npy``,
            ``y_train.npy`` and ``y_test.npy``.
        mlflow_tracking_uri: Address of the MLflow tracking server.
        experiment_name: MLflow experiment the run is recorded under.
        n_estimators: Number of trees in the forest.
        max_depth: Maximum depth of each tree.

    Returns:
        The MLflow run ID of the training run.
    """
    import numpy as np
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.metrics import mean_squared_error, r2_score
    import os

    print("=" * 50)
    print("  Step 3: Train Model")
    print("=" * 50)

    # Set the URI both as an environment variable and through the API below.
    os.environ['MLFLOW_TRACKING_URI'] = mlflow_tracking_uri

    # Load the arrays written by the preprocessing step.
    # NOTE(review): data_dir is produced by another step; it is only readable
    # here if both steps share the filesystem that holds it - confirm.
    X_train = np.load(f"{data_dir}/X_train.npy")
    X_test = np.load(f"{data_dir}/X_test.npy")
    y_train = np.load(f"{data_dir}/y_train.npy")
    y_test = np.load(f"{data_dir}/y_test.npy")

    # Point MLflow at the tracking server and select the experiment.
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run() as run:
        run_id = run.info.run_id

        # Log the hyperparameters.
        mlflow.log_params({
            "n_estimators": n_estimators,
            "max_depth": max_depth,
            "random_state": 42
        })

        # Train the model.
        model = RandomForestRegressor(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=42,
            n_jobs=-1
        )
        model.fit(X_train, y_train)

        # Evaluate on the held-out test set; RMSE is derived once and reused.
        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_test, y_pred)

        # Log the metrics; the evaluation step later reads "r2" from this run.
        mlflow.log_metrics({"mse": mse, "rmse": rmse, "r2": r2})

        # Store the fitted model as a run artifact under "model".
        mlflow.sklearn.log_model(model, "model")

        print(f"  ✅ R2: {r2:.4f}, RMSE: {rmse:.4f}")
        print(f"  ✅ Run ID: {run_id}")

    return run_id


# Build the component explicitly rather than with the bare decorator so the
# step's container actually has the libraries imported inside the function:
# KFP's default component image is a plain Python image without mlflow,
# numpy or scikit-learn, and the bare decorator installs nothing. The base
# image is set because the mlflow version pinned in this notebook needs a
# newer Python than that default image provides.
# NOTE(review): if the MLflow artifact store is S3/MinIO-backed, the matching
# client library (e.g. boto3) would have to be added here as well - confirm.
train_model = create_component_from_func(
    train_model,
    base_image="python:3.9",
    packages_to_install=["mlflow==2.9.2", "scikit-learn", "numpy"],
)

### 3.4 모델 평가

In [None]:
def evaluate_model(
    run_id: str,
    mlflow_tracking_uri: str,
    r2_threshold: float = 0.8
) -> str:
    """Decide whether a trained model should be deployed.

    Args:
        run_id: MLflow run ID whose ``r2`` metric is inspected.
        mlflow_tracking_uri: Address of the MLflow tracking server.
        r2_threshold: Minimum R2 score required for deployment.

    Returns:
        "deploy" when the run's R2 is at least ``r2_threshold``, else "skip".
    """
    import mlflow
    import os

    print("=" * 50)
    print("  Step 4: Evaluate Model")
    print("=" * 50)

    os.environ['MLFLOW_TRACKING_URI'] = mlflow_tracking_uri
    mlflow.set_tracking_uri(mlflow_tracking_uri)

    client = mlflow.tracking.MlflowClient()
    run = client.get_run(run_id)

    # A run without an "r2" metric is scored as 0, so it ends up as "skip"
    # for any positive threshold.
    r2 = float(run.data.metrics.get("r2", 0))

    print(f"  Run ID: {run_id}")
    print(f"  R2 Score: {r2:.4f}")
    print(f"  Threshold: {r2_threshold}")

    if r2 >= r2_threshold:
        decision = "deploy"
        print("  ✅ Decision: DEPLOY")
    else:
        decision = "skip"
        print("  ⚠️ Decision: SKIP")

    return decision


# Build the component explicitly rather than with the bare decorator so the
# step's container actually has mlflow installed: KFP's default component
# image is a plain Python image without it, and the bare decorator installs
# nothing. The base image is set because the mlflow version pinned in this
# notebook needs a newer Python than that default image provides.
evaluate_model = create_component_from_func(
    evaluate_model,
    base_image="python:3.9",
    packages_to_install=["mlflow==2.9.2"],
)

### 3.5 모델 배포 / 알림

In [None]:
@create_component_from_func
def deploy_model(run_id: str, model_name: str, namespace: str, mlflow_tracking_uri: str):
    """Report the deployment of a model as a KServe InferenceService.

    This step is a placeholder: it only prints what would be deployed and
    performs no deployment call. ``mlflow_tracking_uri`` is accepted but not
    used here.

    Args:
        run_id: MLflow run ID of the model being deployed.
        model_name: Name reported for the deployed model.
        namespace: Namespace reported as the deployment target.
        mlflow_tracking_uri: Address of the MLflow tracking server (unused).
    """
    divider = "=" * 50
    for banner_line in (divider, "  Step 5: Deploy Model", divider):
        print(banner_line)

    target = f"model '{model_name}' to namespace '{namespace}'"
    print(f"  ✅ Deploying {target}")
    print(f"  ✅ Run ID: {run_id}")
    # See deployer.py for the actual deployment code.


@create_component_from_func
def send_alert(run_id: str, message: str = "Model did not meet threshold"):
    """Print an alert for a model that fell short of the quality threshold.

    Args:
        run_id: MLflow run ID of the model the alert is about.
        message: Text of the alert.
    """
    divider = "=" * 50
    for banner_line in (divider, "  Step 5 (Alt): Send Alert", divider):
        print(banner_line)

    # Alert text first, then the run it refers to.
    print(f"  ⚠️ ALERT: {message}")
    print(f"  Run ID: {run_id}")

## 4. 파이프라인 정의

In [None]:
@dsl.pipeline(
    name='E2E ML Pipeline',
    description='End-to-End Machine Learning Pipeline'
)
def e2e_ml_pipeline(
    data_source: str = "sklearn",
    mlflow_tracking_uri: str = "http://mlflow-server-service.mlflow-system.svc.cluster.local:5000",
    experiment_name: str = "e2e-pipeline",
    model_name: str = "california-model",
    namespace: str = "kubeflow-user01",
    n_estimators: int = 100,
    max_depth: int = 10,
    r2_threshold: float = 0.8
):
    """Chain load -> preprocess -> train -> evaluate, then deploy or alert."""
    # Steps 1-2: load the raw data, then split and scale it.
    raw_data = load_data(data_source=data_source)
    processed = preprocess(data_path=raw_data.output)

    # Step 3: train the model; the step's output is the MLflow run ID.
    training = train_model(
        data_dir=processed.output,
        mlflow_tracking_uri=mlflow_tracking_uri,
        experiment_name=experiment_name,
        n_estimators=n_estimators,
        max_depth=max_depth,
    )
    trained_run_id = training.output

    # Step 4: evaluate; the step's output is the string "deploy" or "skip".
    evaluation = evaluate_model(
        run_id=trained_run_id,
        mlflow_tracking_uri=mlflow_tracking_uri,
        r2_threshold=r2_threshold,
    )
    decision = evaluation.output

    # Step 5: exactly one of the two branches runs, depending on the decision.
    with dsl.Condition(decision == "deploy"):
        deploy_model(
            run_id=trained_run_id,
            model_name=model_name,
            namespace=namespace,
            mlflow_tracking_uri=mlflow_tracking_uri,
        )

    with dsl.Condition(decision == "skip"):
        send_alert(run_id=trained_run_id)


print("✅ 파이프라인 정의 완료!")

## 5. 파이프라인 컴파일

In [None]:
# Compile the pipeline function into a YAML package that can also be uploaded
# through the Kubeflow UI.
pipeline_file = 'e2e_pipeline.yaml'

compiler.Compiler().compile(pipeline_func=e2e_ml_pipeline, package_path=pipeline_file)

print("✅ Pipeline compiled: {}".format(pipeline_file))

## 6. 파이프라인 실행

In [None]:
# KFP Client 생성
try:
    client = kfp.Client()
    print(f"✅ KFP Client connected")
    print(f"   Host: {client._host}")
    print(f"   Namespace: {client.get_user_namespace()}")
except Exception as e:
    print(f"⚠️ Could not connect to KFP: {e}")
    client = None

In [None]:
# 파이프라인 실행
if client:
    run = client.create_run_from_pipeline_func(
        e2e_ml_pipeline,
        arguments={
            'data_source': 'sklearn',
            'experiment_name': 'e2e-pipeline',
            'model_name': 'california-model',
            'namespace': USER_NAMESPACE,
            'n_estimators': 100,
            'max_depth': 10,
            'r2_threshold': 0.75
        },
        experiment_name='e2e-experiment',
        run_name='e2e-run-001'
    )
    
    print(f"\n✅ Pipeline submitted!")
    print(f"   Run ID: {run.run_id}")
else:
    print("\n⚠️ Client not connected. Upload YAML manually via Kubeflow UI.")

## ✅ Lab 3-2 완료!

### 학습한 내용
- 다단계 파이프라인 구성
- MLflow 통합
- 조건부 분기 (`dsl.Condition`)
- KServe 자동 배포 (이 노트북의 `deploy_model`은 출력만 하는 스텁이며, 실제 배포 코드는 `deployer.py` 참조)

### 다음 단계
- Day 3 프로젝트 실습