# ML Pipeline Orchestration

## Overview
Building enterprise-grade ML pipelines with:
- **Apache Airflow**: Workflow orchestration and scheduling
- **Feature Stores**: Feast, Tecton for feature management
- **Data Versioning**: DVC, MLflow for reproducibility
- **MLOps Patterns**: CI/CD for ML, model registry, A/B testing

## Why Pipeline Orchestration?
- **Reproducibility**: Track every step from data to deployment
- **Automation**: Schedule retraining, monitoring, rollbacks
- **Collaboration**: Teams share features and models
- **Compliance**: Audit trails for regulated industries

## Interview Focus
- DAG design patterns
- Feature engineering at scale
- Training-serving skew prevention
- Model deployment strategies
- Pipeline monitoring and alerting

In [None]:
# Installation
# pip install apache-airflow feast dvc mlflow great-expectations prefect

from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import torch
from typing import Dict, List, Any, Optional
import json
import logging
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

## Part 1: Apache Airflow DAG

### Complete ML Pipeline from Data to Deployment

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator

# Pipeline configuration
default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'email': ['alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

# Create DAG
dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='End-to-end ML training pipeline',
    # Daily at 02:00. On Airflow 2.4+, prefer the `schedule` argument.
    schedule_interval='0 2 * * *',
    # Static start date (placeholder value); days_ago() is deprecated and
    # a moving start date makes scheduling hard to reason about.
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['ml', 'production'],
)

# Task functions
def extract_data(**context):
    """Extract data from source systems."""
    logger.info("Extracting data from database...")
    
    # Simulate data extraction
    data = {
        'records': 10000,
        'features': ['age', 'income', 'tenure', 'usage'],
        'timestamp': datetime.utcnow().isoformat()
    }
    
    # Push to XCom for next task
    context['task_instance'].xcom_push(key='extraction_info', value=data)
    logger.info(f"Extracted {data['records']} records")
    return data

def validate_data(**context):
    """Validate data quality with Great Expectations."""
    logger.info("Validating data quality...")
    
    extraction_info = context['task_instance'].xcom_pull(
        task_ids='extract_data',
        key='extraction_info'
    )
    
    # Validation checks
    validations = {
        'null_check': True,
        'schema_check': True,
        'distribution_check': True,
        'outlier_check': True
    }
    
    if not all(validations.values()):
        raise ValueError("Data validation failed")
    
    logger.info("Data validation passed")
    return validations

def feature_engineering(**context):
    """Create features and store in feature store."""
    logger.info("Engineering features...")
    
    # Create features
    features = {
        'user_engagement_7d': 'avg(events) over 7 days',
        'user_engagement_30d': 'avg(events) over 30 days',
        'ltv_prediction': 'predicted lifetime value',
        'churn_risk': 'churn probability score'
    }
    
    # Push to feature store (simulated)
    context['task_instance'].xcom_push(key='features', value=features)
    logger.info(f"Created {len(features)} features")
    return features

def train_model(**context):
    """Train ML model with hyperparameter tuning."""
    logger.info("Training model...")
    
    features = context['task_instance'].xcom_pull(
        task_ids='feature_engineering',
        key='features'
    )
    
    # Simulate training
    model_metrics = {
        'accuracy': 0.92,
        'precision': 0.89,
        'recall': 0.91,
        'f1_score': 0.90,
        'auc_roc': 0.95,
        'training_time_seconds': 3600
    }
    
    context['task_instance'].xcom_push(key='model_metrics', value=model_metrics)
    logger.info(f"Model trained: AUC-ROC = {model_metrics['auc_roc']}")
    return model_metrics

def evaluate_model(**context):
    """Evaluate model on held-out test set."""
    logger.info("Evaluating model...")
    
    metrics = context['task_instance'].xcom_pull(
        task_ids='train_model',
        key='model_metrics'
    )
    
    # Quality gates
    min_auc_roc = 0.85
    min_precision = 0.80
    
    if metrics['auc_roc'] < min_auc_roc:
        raise ValueError(f"Model AUC-ROC {metrics['auc_roc']} below threshold {min_auc_roc}")
    
    if metrics['precision'] < min_precision:
        raise ValueError(f"Model precision {metrics['precision']} below threshold {min_precision}")
    
    logger.info("Model evaluation passed quality gates")
    return True

def register_model(**context):
    """Register model in MLflow registry."""
    logger.info("Registering model...")
    
    model_info = {
        'name': 'churn_predictor',
        'version': '2.3.0',
        'stage': 'staging',
        'artifacts': ['model.pt', 'preprocessor.pkl', 'feature_config.json'],
        'registered_at': datetime.utcnow().isoformat()
    }
    
    context['task_instance'].xcom_push(key='model_info', value=model_info)
    logger.info(f"Model registered: {model_info['name']} v{model_info['version']}")
    return model_info

def deploy_to_staging(**context):
    """Deploy model to staging environment."""
    logger.info("Deploying to staging...")
    
    model_info = context['task_instance'].xcom_pull(
        task_ids='register_model',
        key='model_info'
    )
    
    deployment = {
        'environment': 'staging',
        'endpoint': 'https://staging-api.company.com/predict',
        'replicas': 2,
        'deployed_at': datetime.utcnow().isoformat()
    }
    
    logger.info(f"Deployed to {deployment['environment']}")
    return deployment

def run_integration_tests(**context):
    """Run integration tests on staging."""
    logger.info("Running integration tests...")
    
    tests = {
        'latency_p95_ms': 45,  # Must be < 50ms
        'throughput_qps': 120,  # Must be > 100 QPS
        'error_rate': 0.001,    # Must be < 0.5%
        'prediction_accuracy': 0.91  # Must match training
    }
    
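    # Quality gates (explicit raises: assert statements are skipped under `python -O`)
    if tests['latency_p95_ms'] >= 50:
        raise ValueError("Latency too high")
    if tests['throughput_qps'] <= 100:
        raise ValueError("Throughput too low")
    if tests['error_rate'] >= 0.005:
        raise ValueError("Error rate too high")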
    
    logger.info("All integration tests passed")
    return tests

def notify_team(**context):
    """Send notification to team."""
    logger.info("Sending notification...")
    
    message = {
        'pipeline': 'ml_training_pipeline',
        'status': 'SUCCESS',
        'run_id': context['run_id'],
        'execution_date': context['execution_date'].isoformat(),
        'next_step': 'Manual approval for production deployment'
    }
    
    logger.info(f"Notification sent: {message}")
    return message

# Define task dependencies
extract = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

validate = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag,
)

engineer_features = PythonOperator(
    task_id='feature_engineering',
    python_callable=feature_engineering,
    dag=dag,
)

train = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)

evaluate = PythonOperator(
    task_id='evaluate_model',
    python_callable=evaluate_model,
    dag=dag,
)

register = PythonOperator(
    task_id='register_model',
    python_callable=register_model,
    dag=dag,
)

deploy_staging = PythonOperator(
    task_id='deploy_to_staging',
    python_callable=deploy_to_staging,
    dag=dag,
)

test = PythonOperator(
    task_id='run_integration_tests',
    python_callable=run_integration_tests,
    dag=dag,
)

notify = PythonOperator(
    task_id='notify_team',
    python_callable=notify_team,
    dag=dag,
)

# Define DAG structure
extract >> validate >> engineer_features >> train >> evaluate >> register >> deploy_staging >> test >> notify

print("✅ Airflow DAG defined")
print("\nDAG Structure:")
print("extract_data → validate_data → feature_engineering → train_model")
print("→ evaluate_model → register_model → deploy_to_staging")
print("→ run_integration_tests → notify_team")
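
The `>>` chain above only declares which task depends on which; the scheduler derives the run order from those dependencies. As a minimal sketch of that idea, independent of Airflow, the same dependencies can be resolved with the standard library's `graphlib` (Python 3.9+). The `dependencies` dict is an illustrative restatement of the DAG, not something Airflow produces.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks).
# The names match the task_ids used in the DAG above.
dependencies = {
    "validate_data": {"extract_data"},
    "feature_engineering": {"validate_data"},
    "train_model": {"feature_engineering"},
    "evaluate_model": {"train_model"},
    "register_model": {"evaluate_model"},
    "deploy_to_staging": {"register_model"},
    "run_integration_tests": {"deploy_to_staging"},
    "notify_team": {"run_integration_tests"},
}

# static_order() yields every task after all of its upstream tasks.
run_order = list(TopologicalSorter(dependencies).static_order())
print(" -> ".join(run_order))
```

Because this pipeline is a single linear chain, only one valid order exists. With fan-out (for example, two independent validation tasks), several orders would be valid and those tasks could run in parallel.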

## Part 2: Feature Store with Feast

### Centralized Feature Management

In [None]:
from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.feature_store import FeatureStore
from feast.types import Float32, Int64
from datetime import timedelta

class FeatureStoreManager:
    """Production feature store with online and offline serving."""
    
    def __init__(self, repo_path: str = "./feature_repo"):
        self.repo_path = Path(repo_path)
        self.repo_path.mkdir(parents=True, exist_ok=True)
        self._setup_feature_definitions()
    
    def _setup_feature_definitions(self):
        """Define feature entities and views.

        Written against the Field/schema API of recent Feast releases;
        older releases used Feature(...) and event_timestamp_column instead.
        """
        
        # Define entity (e.g., user_id)
        user = Entity(
            name="user_id",
            join_keys=["user_id"],
            value_type=ValueType.INT64,
            description="User identifier"
        )
        
        # Define data source
        user_stats_source = FileSource(
            path="data/user_stats.parquet",
            timestamp_field="event_timestamp",
        )
        
        # Define feature view
        user_engagement_features = FeatureView(
            name="user_engagement",
            entities=[user],
            ttl=timedelta(days=30),
            schema=[
                Field(name="sessions_7d", dtype=Int64),
                Field(name="sessions_30d", dtype=Int64),
                Field(name="avg_session_duration", dtype=Float32),
                Field(name="total_revenue", dtype=Float32),
            ],
            online=True,
            source=user_stats_source,
            tags={"team": "ml", "version": "v2"},
        )
        
        logger.info("Feature definitions created")
        return user_engagement_features
    
    def materialize_features(self, start_date: datetime, end_date: datetime):
        """Materialize features for offline training."""
        logger.info(f"Materializing features from {start_date} to {end_date}")
        
        # Simulated feature materialization
        features = pd.DataFrame({
            'user_id': range(1000),
            'sessions_7d': np.random.randint(0, 50, 1000),
            'sessions_30d': np.random.randint(0, 200, 1000),
            'avg_session_duration': np.random.uniform(5, 60, 1000),
            'total_revenue': np.random.uniform(0, 1000, 1000),
            'event_timestamp': datetime.now()
        })
        
        logger.info(f"Materialized {len(features)} feature rows")
        return features
    
    def get_online_features(self, entity_ids: List[int]) -> pd.DataFrame:
        """Get features for online inference."""
        logger.info(f"Fetching online features for {len(entity_ids)} entities")
        
        # Simulated online feature retrieval (sub-10ms in production)
        features = pd.DataFrame({
            'user_id': entity_ids,
            'sessions_7d': np.random.randint(0, 50, len(entity_ids)),
            'sessions_30d': np.random.randint(0, 200, len(entity_ids)),
            'avg_session_duration': np.random.uniform(5, 60, len(entity_ids)),
            'total_revenue': np.random.uniform(0, 1000, len(entity_ids)),
        })
        
        return features
    
    def get_historical_features(self, entity_df: pd.DataFrame) -> pd.DataFrame:
        """Point-in-time correct features for training."""
        logger.info("Fetching historical features with point-in-time correctness")
        
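        # Simulated: this reuses the online lookup, so it is NOT actually
        # point-in-time correct. A real implementation would call
        # FeatureStore.get_historical_features(), which joins each entity row
        # to the latest feature values at or before its event timestamp and
        # so keeps future data out of the training set.
        # De-duplicate IDs so the merge does not multiply rows.
        unique_ids = entity_df['user_id'].drop_duplicates().tolist()
        historical_features = self.get_online_features(unique_ids)
        
        return pd.merge(entity_df, historical_features, on='user_id')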

# Example usage
print("\n=== Feature Store Example ===")
fs = FeatureStoreManager()

# Materialize for training
offline_features = fs.materialize_features(
    start_date=datetime.now() - timedelta(days=30),
    end_date=datetime.now()
)
print(f"\nOffline features shape: {offline_features.shape}")
print(offline_features.head())

# Online serving for inference
online_features = fs.get_online_features([1, 2, 3, 4, 5])
print(f"\nOnline features shape: {online_features.shape}")
print(online_features.head())
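
The `get_historical_features` method above only simulates point-in-time correctness. As a rough sketch of what such a join does, `pandas.merge_asof` can attach to each label row the most recent feature value recorded at or before that row's timestamp. The two small frames and their dates are made up for illustration; a real feature store would perform this join against its offline store.

```python
import pandas as pd

# Hypothetical feature snapshots: the value of sessions_7d as of each timestamp.
feature_log = pd.DataFrame({
    "user_id": [1, 1, 1, 2, 2],
    "event_timestamp": pd.to_datetime(
        ["2024-01-01", "2024-01-08", "2024-01-15", "2024-01-03", "2024-01-10"]
    ),
    "sessions_7d": [3, 5, 9, 1, 4],
})

# Hypothetical training labels, each observed at its own point in time.
labels = pd.DataFrame({
    "user_id": [1, 2, 1],
    "event_timestamp": pd.to_datetime(["2024-01-10", "2024-01-05", "2024-01-02"]),
    "churned": [0, 1, 0],
})

# merge_asof requires both frames to be sorted by the "on" key.
training_df = pd.merge_asof(
    labels.sort_values("event_timestamp"),
    feature_log.sort_values("event_timestamp"),
    on="event_timestamp",
    by="user_id",
    direction="backward",  # only use feature values at or before the label time
)
print(training_df)
```

User 1's label on 2024-01-10 picks up the 2024-01-08 snapshot (`sessions_7d = 5`), not the later 2024-01-15 value, which is exactly the leakage a point-in-time join is meant to prevent.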

## Part 3: Data Versioning with DVC

### Git for Data and Models

In [None]:
class DataVersionControl:
    """Data versioning and experiment tracking."""
    
    def __init__(self, project_path: str = "."):
        self.project_path = Path(project_path)
        self.dvc_dir = self.project_path / ".dvc"
        self.dvc_dir.mkdir(exist_ok=True)
    
    def track_dataset(self, dataset_path: str, remote: str = "s3://ml-data"):
        """Version control a dataset."""
        logger.info(f"Tracking dataset: {dataset_path}")
        
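        # DVC commands (simulated: listed for reference, not executed)
        # `dvc add` writes its .gitignore next to the tracked file.
        gitignore_path = Path(dataset_path).parent / ".gitignore"
        commands = [
            f"dvc add {dataset_path}",
            f"git add {dataset_path}.dvc {gitignore_path}",
            f"git commit -m 'Add dataset {dataset_path}'",
            "dvc push"
        ]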
        
        metadata = {
            'path': dataset_path,
            'size_mb': 1024,
            'hash': 'abc123def456',
            'remote': remote,
            'tracked_at': datetime.utcnow().isoformat()
        }
        
        logger.info(f"Dataset tracked: {metadata}")
        return metadata
    
    def create_pipeline(self, name: str, stages: List[Dict]):
        """Define reproducible DVC pipeline."""
        logger.info(f"Creating pipeline: {name}")
        
        pipeline = {
            'stages': {
                'prepare': {
                    'cmd': 'python src/prepare.py',
                    'deps': ['data/raw.csv', 'src/prepare.py'],
                    'outs': ['data/prepared.csv']
                },
                'featurize': {
                    'cmd': 'python src/featurize.py',
                    'deps': ['data/prepared.csv', 'src/featurize.py'],
                    'outs': ['data/features.csv']
                },
                'train': {
                    'cmd': 'python src/train.py',
                    'deps': ['data/features.csv', 'src/train.py'],
                    'params': ['train.epochs', 'train.lr'],
                    'outs': ['models/model.pt'],
                    'metrics': ['metrics.json']
                },
                'evaluate': {
                    'cmd': 'python src/evaluate.py',
                    'deps': ['models/model.pt', 'data/test.csv', 'src/evaluate.py'],
                    'metrics': ['evaluation.json']
                }
            }
        }
        
        logger.info(f"Pipeline created with {len(pipeline['stages'])} stages")
        return pipeline
    
    def compare_experiments(self, experiment_ids: List[str]):
        """Compare metrics across experiments."""
        logger.info(f"Comparing {len(experiment_ids)} experiments")
        
        # Simulated experiment comparison
        experiments = []
        for exp_id in experiment_ids:
            experiments.append({
                'experiment': exp_id,
                'accuracy': np.random.uniform(0.85, 0.95),
                'precision': np.random.uniform(0.80, 0.92),
                'recall': np.random.uniform(0.82, 0.93),
                'f1_score': np.random.uniform(0.81, 0.92),
                'train_time_s': np.random.randint(1800, 7200)
            })
        
        df = pd.DataFrame(experiments)
        return df

# Example usage
print("\n=== Data Version Control ===")
dvc = DataVersionControl()

# Track dataset
dataset_meta = dvc.track_dataset('data/training_set_v3.parquet')
print(f"\nDataset metadata: {dataset_meta}")

# Create pipeline
pipeline = dvc.create_pipeline('ml_pipeline', [])
print(f"\nPipeline stages: {list(pipeline['stages'].keys())}")

# Compare experiments
comparison = dvc.compare_experiments(['exp-001', 'exp-002', 'exp-003'])
print("\nExperiment comparison:")
print(comparison.to_string(index=False))
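
The `hash` field in the simulated metadata above stands in for a content hash. DVC identifies tracked files by a hash of their contents (MD5 by default) and stores it in the small `.dvc` file that Git versions. The sketch below illustrates only that idea with the standard library; `file_md5` is a hypothetical helper, not part of DVC, and the CSV content is made up.

```python
import hashlib
import tempfile
from pathlib import Path


def file_md5(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the MD5 hex digest of a file, read in chunks to bound memory."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    dataset = Path(tmp) / "train.csv"

    dataset.write_text("user_id,churned\n1,0\n2,1\n")
    v1 = file_md5(dataset)

    # Appending a row changes the content, so the hash changes.
    dataset.write_text("user_id,churned\n1,0\n2,1\n3,0\n")
    v2 = file_md5(dataset)

    # Restoring the original content restores the original hash.
    dataset.write_text("user_id,churned\n1,0\n2,1\n")
    v1_again = file_md5(dataset)

print("changed data, new hash:", v1 != v2)
print("same data, same hash:", v1 == v1_again)
```

Because the identifier depends only on content, checking out an old Git commit pins the exact dataset version that commit was trained on.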

## Part 4: MLflow Model Registry

### Centralized Model Management

In [None]:
import mlflow
from mlflow.tracking import MlflowClient

class ModelRegistry:
    """MLflow-based model registry for lifecycle management."""
    
    def __init__(self, tracking_uri: str = "http://localhost:5000"):
        self.tracking_uri = tracking_uri
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()
    
    def log_experiment(self, experiment_name: str, params: Dict, metrics: Dict, artifacts: List[str]):
        """Log training experiment with all metadata."""
        mlflow.set_experiment(experiment_name)
        
        with mlflow.start_run() as run:
            # Log parameters
            mlflow.log_params(params)
            
            # Log metrics
            mlflow.log_metrics(metrics)
            
            # Log artifacts (models, plots, configs)
            for artifact in artifacts:
                mlflow.log_artifact(artifact)
            
            # Log model
            # mlflow.pytorch.log_model(model, "model")
            
            logger.info(f"Logged experiment: {run.info.run_id}")
            return run.info.run_id
    
    def register_model(self, model_name: str, run_id: str, stage: str = "Staging"):
        """Register model in registry."""
        model_uri = f"runs:/{run_id}/model"
        
        # Register model
        model_details = mlflow.register_model(model_uri, model_name)
        
        # Transition to stage (note: model stages are deprecated as of
        # MLflow 2.9 in favor of model version aliases)
        self.client.transition_model_version_stage(
            name=model_name,
            version=model_details.version,
            stage=stage
        )
        
        logger.info(f"Registered {model_name} v{model_details.version} in {stage}")
        return model_details
    
    def promote_model(self, model_name: str, version: int, from_stage: str, to_stage: str):
        """Promote model to production."""
        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage=to_stage,
            archive_existing_versions=True  # Archive old production models
        )
        
        logger.info(f"Promoted {model_name} v{version} from {from_stage} to {to_stage}")
        return True
    
    def get_model_lineage(self, model_name: str, version: int):
        """Get full lineage: data, code, params, metrics."""
        model_version = self.client.get_model_version(model_name, version)
        run = self.client.get_run(model_version.run_id)
        
        lineage = {
            'model_name': model_name,
            'version': version,
            'run_id': run.info.run_id,
            'parameters': run.data.params,
            'metrics': run.data.metrics,
            'artifacts': run.info.artifact_uri,
            'tags': run.data.tags,
            'created_at': run.info.start_time
        }
        
        return lineage

# Example usage
print("\n=== Model Registry ===")
print("MLflow model registry provides:")
print("- Model versioning and lineage")
print("- Stage transitions (None → Staging → Production → Archived)")
print("- Experiment tracking with parameters and metrics")
print("- Artifact storage for models, configs, and plots")
print("\nTypical workflow:")
print("1. Train model and log to MLflow")
print("2. Register model in registry (Staging stage)")
print("3. Run A/B test in staging")
print("4. Promote to Production if metrics improve")
print("5. Archive old production model")
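
The stage workflow printed above can be tried out without an MLflow server. The following is a toy in-memory stand-in, assuming that a transition with `archive_existing=True` archives any other version currently in the target stage (the behavior the `promote_model` method relies on). `InMemoryRegistry` is a hypothetical class for illustration only and is not an MLflow API.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class InMemoryRegistry:
    """Toy stand-in for a model registry: maps version number -> stage."""
    stages: Dict[int, str] = field(default_factory=dict)

    def register(self) -> int:
        """Create the next version; new versions start in stage 'None'."""
        version = len(self.stages) + 1
        self.stages[version] = "None"
        return version

    def transition(self, version: int, stage: str, archive_existing: bool = False) -> None:
        """Move a version to a stage, optionally archiving the versions already there."""
        if version not in self.stages:
            raise KeyError(f"Unknown version: {version}")
        if archive_existing:
            for other, current in self.stages.items():
                if other != version and current == stage:
                    self.stages[other] = "Archived"
        self.stages[version] = stage


registry = InMemoryRegistry()

v1 = registry.register()
registry.transition(v1, "Staging")
registry.transition(v1, "Production", archive_existing=True)

v2 = registry.register()
registry.transition(v2, "Staging")
registry.transition(v2, "Production", archive_existing=True)

print(registry.stages)
```

After the second promotion, version 1 is archived and version 2 is the only model in Production, so serving code that loads "the Production model" always resolves to a single version.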

## Part 5: Deployment Strategies

### Safe Production Rollouts

In [None]:
class DeploymentStrategy:
    """Safe deployment patterns for ML models."""
    
    @staticmethod
    def canary_deployment(new_model_traffic: float = 0.1):
        """Gradually increase traffic to new model."""
        
        stages = [
            {'traffic_pct': 10, 'duration_hours': 2, 'rollback_threshold': 0.05},
            {'traffic_pct': 25, 'duration_hours': 4, 'rollback_threshold': 0.03},
            {'traffic_pct': 50, 'duration_hours': 8, 'rollback_threshold': 0.02},
            {'traffic_pct': 100, 'duration_hours': 24, 'rollback_threshold': 0.01}
        ]
        
        return {
            'strategy': 'canary',
            'stages': stages,
            'monitoring': ['error_rate', 'latency_p95', 'prediction_drift']
        }
    
    @staticmethod
    def blue_green_deployment():
        """Instant switchover with quick rollback."""
        
        return {
            'strategy': 'blue_green',
            'blue_environment': 'production-v1',
            'green_environment': 'production-v2',
            'steps': [
                '1. Deploy v2 to green environment',
                '2. Run smoke tests on green',
                '3. Switch load balancer to green',
                '4. Monitor metrics for 1 hour',
                '5. If stable, keep blue for 24h then decommission'
            ]
        }
    
    @staticmethod
    def ab_testing(treatment_pct: float = 0.5, duration_days: int = 7):
        """A/B test new model against baseline."""
        
        return {
            'strategy': 'ab_test',
            'control_model': 'v1.0',
            'treatment_model': 'v2.0',
            'traffic_split': {
                'control': 100 - round(treatment_pct * 100),
                'treatment': round(treatment_pct * 100)
            },
            'duration_days': duration_days,
            'metrics': [
                'prediction_accuracy',
                'user_engagement',
                'revenue_per_user',
                'latency_p95'
            ],
            'success_criteria': {
                'min_improvement': 0.02,  # 2% lift required
                'significance_level': 0.05,  # 95% confidence
                'min_sample_size': 10000
            }
        }
    
    @staticmethod
    def shadow_deployment():
        """Run new model in parallel without affecting users."""
        
        return {
            'strategy': 'shadow',
            'production_model': 'v1.0',
            'shadow_model': 'v2.0',
            'traffic': '100% to production, duplicate to shadow',
            'duration_days': 3,
            'comparison_metrics': [
                'prediction_difference',
                'latency_comparison',
                'resource_usage'
            ]
        }

# Example deployment plans
print("\n=== Deployment Strategies ===")

canary = DeploymentStrategy.canary_deployment()
print("\n1. Canary Deployment:")
for stage in canary['stages']:
    print(f"   - {stage['traffic_pct']}% traffic for {stage['duration_hours']}h")

ab_test = DeploymentStrategy.ab_testing()
print(f"\n2. A/B Testing:")
print(f"   - Control: {ab_test['control_model']}")
print(f"   - Treatment: {ab_test['treatment_model']}")
print(f"   - Duration: {ab_test['duration_days']} days")
print(f"   - Min improvement: {ab_test['success_criteria']['min_improvement']*100}%")

shadow = DeploymentStrategy.shadow_deployment()
print(f"\n3. Shadow Deployment:")
print(f"   - Production: {shadow['production_model']} (serves users)")
print(f"   - Shadow: {shadow['shadow_model']} (observes only)")
print(f"   - Duration: {shadow['duration_days']} days")
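
The canary plan above lists stages and thresholds but not the logic that acts on them. A minimal sketch of such a controller follows, assuming `rollback_threshold` means the maximum tolerated error rate for a stage (the source does not define it) and that one aggregated error rate is observed per stage. `run_canary` and the observed rates are hypothetical.

```python
from typing import Dict, List, Optional, Union

# Stage values copied from canary_deployment() above.
CANARY_STAGES = [
    {"traffic_pct": 10, "duration_hours": 2, "rollback_threshold": 0.05},
    {"traffic_pct": 25, "duration_hours": 4, "rollback_threshold": 0.03},
    {"traffic_pct": 50, "duration_hours": 8, "rollback_threshold": 0.02},
    {"traffic_pct": 100, "duration_hours": 24, "rollback_threshold": 0.01},
]


def run_canary(
    stages: List[Dict], observed_error_rates: List[float]
) -> Dict[str, Union[str, Optional[int]]]:
    """Walk through the stages in order and roll back at the first stage
    whose observed error rate exceeds that stage's rollback_threshold."""
    if len(stages) != len(observed_error_rates):
        raise ValueError("Need exactly one observed error rate per stage")

    for stage, error_rate in zip(stages, observed_error_rates):
        if error_rate > stage["rollback_threshold"]:
            return {
                "status": "rolled_back",
                "failed_at_pct": stage["traffic_pct"],
                "final_traffic_pct": 0,
            }

    return {
        "status": "promoted",
        "failed_at_pct": None,
        "final_traffic_pct": stages[-1]["traffic_pct"],
    }


healthy = run_canary(CANARY_STAGES, [0.010, 0.010, 0.005, 0.004])
degraded = run_canary(CANARY_STAGES, [0.010, 0.040, 0.005, 0.004])
print(healthy)
print(degraded)
```

The thresholds tighten as traffic grows, so an error rate of 4% passes the 10% stage but triggers a rollback at the 25% stage.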

## Key Takeaways

### Enterprise ML Pipeline Components:
1. ✅ **Orchestration**: Airflow DAGs with retries, alerts, dependencies
2. ✅ **Feature Store**: Centralized features, point-in-time correctness
3. ✅ **Data Versioning**: DVC for reproducible data and model tracking
4. ✅ **Model Registry**: MLflow for lifecycle management
5. ✅ **Deployment**: Canary, blue-green, A/B testing, shadow
6. ✅ **Quality Gates**: Automated validation at each stage
7. ✅ **Monitoring**: Metrics, alerts, rollback triggers
8. ✅ **Compliance**: Audit trails, lineage tracking

### Production Best Practices:
- **Separation of Concerns**: Data, features, training, serving
- **Automation**: CI/CD for ML, automated retraining
- **Observability**: Metrics at every pipeline stage
- **Reproducibility**: Version everything (data, code, models, config)
- **Safety**: Quality gates, gradual rollouts, quick rollbacks

## Interview Questions

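1. **What is training-serving skew and how do you prevent it?**
   - Skew: Features computed differently in training vs serving (different code paths, data sources, or freshness)
   - Prevention: Train and serve from the same feature definitions in a feature store; use point-in-time joins so training rows see only data available at prediction time
   - Example: A 7-day average must use the 7 days before each prediction timestamp, never data from after it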

2. **How do you design a retraining pipeline?**
   - Trigger: Schedule (daily), drift detection, or manual
   - Steps: Extract new data → validate → engineer features → train → evaluate
   - Gates: Data quality, model performance, integration tests
   - Deploy: Canary rollout with monitoring

3. **Explain the trade-offs between deployment strategies.**
   - Canary: Gradual, safe, but slow rollout
   - Blue-Green: Fast switchover, but requires 2x infrastructure
   - A/B: Rigorous testing, but needs statistical significance
   - Shadow: Zero risk, but doesn't validate business impact

4. **What should be in a feature store?**
   - Online store: Low-latency serving (<10ms) with Redis/DynamoDB
   - Offline store: Historical features for training (Parquet/S3)
   - Feature definitions: Transformations, owners, SLAs
   - Monitoring: Drift detection, freshness, correctness
   - Point-in-time joins: Prevent data leakage
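
The "statistical significance" requirement noted for A/B testing in question 3 can be made concrete with a two-proportion z-test. This is a minimal sketch using only the standard library, assuming the success metric is a binary outcome such as a conversion; the counts are invented, and `two_proportion_z_test` is a hypothetical helper rather than a library function.

```python
import math
from typing import Tuple


def two_proportion_z_test(
    successes_a: int, n_a: int, successes_b: int, n_b: int
) -> Tuple[float, float]:
    """Return (z, two-sided p-value) for the difference in success rates
    between control (a) and treatment (b), using a pooled standard error."""
    p_a = successes_a / n_a
    p_b = successes_b / n_b
    pooled = (successes_a + successes_b) / (n_a + n_b)
    std_err = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / std_err
    # Two-sided p-value under the standard normal: 2 * (1 - Phi(|z|)) = erfc(|z| / sqrt(2))
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Hypothetical counts: 10,000 users per arm, 10% vs 11% conversion.
z, p_value = two_proportion_z_test(1000, 10_000, 1100, 10_000)

significance_level = 0.05  # same value as in the ab_testing() success criteria
print(f"z = {z:.2f}, p = {p_value:.4f}, significant = {p_value < significance_level}")
```

A significant result only says the difference is unlikely to be noise; the separate minimum-improvement and minimum-sample-size criteria still decide whether the new model is worth promoting.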