# 🚀 Production ML Pipeline with MLOps

An advanced, end-to-end ML pipeline that applies production MLOps best practices.

## Topics
1. Data versioning and validation
2. Feature store patterns
3. Model registry
4. A/B testing framework
5. Monitoring and alerting

**Level**: Advanced  
**Time Required**: ~60 minutes

In [None]:
import sys
sys.path.insert(0, '../../')

from data_science_master_system import *
from datetime import datetime
import pandas as pd
import numpy as np
import json
import hashlib
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# Confirm the setup cell ran (string previously stored as mis-decoded UTF-8).
print("✅ Ready!")

## 1. Data Versioning

In [None]:
class DataVersioner:
    """Simple file-based data versioning system.

    Each saved version is written as a Parquet file under ``storage_path`` and
    described by an entry in a JSON registry (``registry.json``) keyed by
    dataset name.

    Args:
        storage_path: Directory holding the Parquet snapshots and the
            registry. Created (including missing parents) if it does not exist.
    """

    def __init__(self, storage_path: str = './data_versions'):
        self.storage_path = Path(storage_path)
        # parents=True so nested locations like 'artifacts/data_versions' work.
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.registry_path = self.storage_path / 'registry.json'
        self.registry = self._load_registry()

    def _load_registry(self) -> dict:
        """Return the persisted registry, or an empty dict if none exists yet."""
        if self.registry_path.exists():
            # Context manager closes the handle deterministically.
            with open(self.registry_path, encoding='utf-8') as f:
                return json.load(f)
        return {}

    def _save_registry(self) -> None:
        """Write the registry to disk; the file is flushed and closed on exit."""
        with open(self.registry_path, 'w', encoding='utf-8') as f:
            json.dump(self.registry, f, indent=2)

    def _compute_hash(self, df) -> str:
        """Return an 8-hex-char MD5 fingerprint of the DataFrame's row hashes.

        ``hash_pandas_object`` hashes each row (and, by pandas' default, the
        index), so reordering or re-indexing changes the fingerprint.
        """
        return hashlib.md5(pd.util.hash_pandas_object(df).values).hexdigest()[:8]

    def save_version(self, df, name: str, description: str = '') -> str:
        """Save a new version of a dataset.

        Args:
            df: DataFrame to snapshot.
            name: Dataset name; versions are numbered per name starting at 1.
            description: Free-text note stored in the registry.

        Returns:
            The new version id, e.g. ``'customer_churn_v1'``.
        """
        version_id = f"{name}_v{len(self.registry.get(name, [])) + 1}"
        data_hash = self._compute_hash(df)

        # Save data
        file_path = self.storage_path / f"{version_id}.parquet"
        df.to_parquet(file_path)

        # Update registry
        self.registry.setdefault(name, []).append({
            'version_id': version_id,
            'timestamp': datetime.now().isoformat(),
            'hash': data_hash,
            'shape': list(df.shape),
            'description': description,
            'file_path': str(file_path)
        })

        self._save_registry()
        print(f"✅ Saved {version_id}")
        return version_id

    def load_version(self, version_id: str):
        """Load a specific version by id.

        Raises:
            ValueError: If no dataset has a version with this id.
        """
        for versions in self.registry.values():
            for v in versions:
                if v['version_id'] == version_id:
                    return pd.read_parquet(v['file_path'])
        raise ValueError(f"Version {version_id} not found")

    def list_versions(self, name: str) -> list:
        """List all versions of a dataset (a copy; mutating it won't alter the registry)."""
        return list(self.registry.get(name, []))

# Example usage
versioner = DataVersioner('./data_versions')

loader = DataLoader()
df = loader.read('../data/csv/customer_churn.csv')

versioner.save_version(df, 'customer_churn', 'Initial raw data')

## 2. Feature Store Pattern

In [None]:
class FeatureStore:
    """Simple file-based feature store for ML features.

    Each feature group is stored as a Parquet file containing the entity key
    column plus the registered feature columns. Group metadata lives in
    ``self.catalog`` and is persisted to ``catalog.json`` in ``storage_path``,
    so registrations survive a restart. This matches how the data and model
    registries persist their metadata.

    Args:
        storage_path: Directory for Parquet files and the catalog. Created
            (including missing parents) if it does not exist.
    """

    def __init__(self, storage_path: str = './feature_store'):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.catalog_path = self.storage_path / 'catalog.json'
        self.catalog = self._load_catalog()

    def _load_catalog(self) -> dict:
        """Return the persisted catalog, or an empty dict if none exists yet."""
        if self.catalog_path.exists():
            with open(self.catalog_path, encoding='utf-8') as f:
                return json.load(f)
        return {}

    def _save_catalog(self) -> None:
        """Write the catalog to disk; the file is flushed and closed on exit."""
        with open(self.catalog_path, 'w', encoding='utf-8') as f:
            json.dump(self.catalog, f, indent=2)

    def register_feature_group(self, name: str, df: pd.DataFrame,
                               entity_key: str, features: list, description: str = ''):
        """Register (or overwrite) a feature group.

        Args:
            name: Feature group name; also the Parquet file stem.
            df: Source frame containing ``entity_key`` and every column in ``features``.
            entity_key: Column used to look up rows in ``get_features``.
            features: Feature column names to store.
            description: Free-text note stored in the catalog.
        """
        # Copy so later mutation of the caller's list can't alter the catalog.
        feature_cols = list(features)

        # Save features
        file_path = self.storage_path / f"{name}.parquet"
        df[[entity_key] + feature_cols].to_parquet(file_path)

        self.catalog[name] = {
            'entity_key': entity_key,
            'features': feature_cols,
            'description': description,
            'created_at': datetime.now().isoformat(),
            'file_path': str(file_path)
        }
        self._save_catalog()
        print(f"✅ Registered feature group: {name}")

    def get_features(self, name: str, entity_ids: list = None):
        """Retrieve a feature group, optionally filtered to specific entities.

        Args:
            name: Registered feature group name.
            entity_ids: If given, only rows whose entity key is in this list.

        Raises:
            ValueError: If the feature group is not registered.
        """
        if name not in self.catalog:
            raise ValueError(f"Feature group {name} not found")

        df = pd.read_parquet(self.catalog[name]['file_path'])

        if entity_ids is not None:
            entity_key = self.catalog[name]['entity_key']
            df = df[df[entity_key].isin(entity_ids)]

        return df

    def list_feature_groups(self):
        """Return one catalog row per feature group, omitting the file path."""
        return pd.DataFrame([
            {'name': k, **{kk: vv for kk, vv in v.items() if kk != 'file_path'}}
            for k, v in self.catalog.items()
        ])

# Example usage
# Example usage: a local store rooted at ./feature_store
store = FeatureStore(storage_path='./feature_store')

# Derive engineered features directly on the shared `df`
df['tenure_years'] = df['tenure_months'].div(12)
df['charges_ratio'] = df['total_charges'].div(df['monthly_charges'].add(1))

store.register_feature_group(
    name='customer_features',
    df=df,
    entity_key='customer_id',
    features=['tenure_years', 'charges_ratio', 'num_support_tickets'],
    description='Customer engagement features',
)

# Look up two customers by their entity key
features = store.get_features(
    'customer_features',
    entity_ids=['CUST_00001', 'CUST_00002'],
)
print("\nRetrieved features:")
display(features)

## 3. Model Registry

In [None]:
import joblib

class ModelRegistry:
    """Model registry for ML model lifecycle management.

    Model artifacts are serialized with joblib into ``storage_path``. Their
    metadata (stage, metrics, params, ...) is kept in a JSON registry
    (``registry.json``) keyed by model name, with one entry per version.

    Args:
        storage_path: Directory for artifacts and the registry file. Created
            (including missing parents) if it does not exist.
    """

    # Allowed lifecycle stages; new versions start in 'development'.
    STAGES = ['development', 'staging', 'production', 'archived']

    def __init__(self, storage_path: str = './model_registry'):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.registry_path = self.storage_path / 'registry.json'
        self.registry = self._load_registry()

    def _load_registry(self) -> dict:
        """Return the persisted registry, or an empty dict if none exists yet."""
        if self.registry_path.exists():
            with open(self.registry_path, encoding='utf-8') as f:
                return json.load(f)
        return {}

    def _save_registry(self) -> None:
        """Write the registry to disk; the file is flushed and closed on exit."""
        with open(self.registry_path, 'w', encoding='utf-8') as f:
            # default=str stringifies values json can't encode natively
            # (such as numpy integer scalars in metrics/params).
            json.dump(self.registry, f, indent=2, default=str)

    def register_model(self, name: str, model, metrics: dict,
                       params: dict = None, description: str = '') -> str:
        """Register a new model version in the 'development' stage.

        Returns:
            The new model id, ``f"{name}_v{version}"``.
        """
        versions = self.registry.setdefault(name, [])
        version = len(versions) + 1
        model_id = f"{name}_v{version}"

        # Save model
        model_path = self.storage_path / f"{model_id}.joblib"
        joblib.dump(model, model_path)

        versions.append({
            'model_id': model_id,
            'version': version,
            'stage': 'development',
            'metrics': metrics,
            'params': params or {},
            'description': description,
            'created_at': datetime.now().isoformat(),
            'model_path': str(model_path)
        })

        self._save_registry()
        print(f"✅ Registered {model_id}")
        return model_id

    def transition_stage(self, model_id: str, stage: str, archive_existing: bool = True):
        """Transition a model version to a new stage.

        Args:
            model_id: Id returned by ``register_model``.
            stage: One of ``STAGES``.
            archive_existing: When promoting to 'production', move any other
                production version of the same model to 'archived'. This
                keeps a single production version, so a rollback to an older
                version is honoured by ``get_production_model``.

        Raises:
            ValueError: If ``stage`` is invalid or ``model_id`` is unknown.
        """
        if stage not in self.STAGES:
            raise ValueError(f"Invalid stage: {stage}")

        for versions in self.registry.values():
            target = next((m for m in versions if m['model_id'] == model_id), None)
            if target is None:
                continue
            if stage == 'production' and archive_existing:
                for m in versions:
                    if m is not target and m['stage'] == 'production':
                        m['stage'] = 'archived'
                        print(f"✅ {m['model_id']} → archived")
            target['stage'] = stage
            self._save_registry()
            print(f"✅ {model_id} → {stage}")
            return
        # Previously a typo'd id was silently ignored.
        raise ValueError(f"Model {model_id} not found")

    def get_production_model(self, name: str):
        """Load the newest version of ``name`` that is in the 'production' stage.

        joblib.load unpickles the artifact, so only load from trusted storage.

        Raises:
            ValueError: If no version of ``name`` is in production.
        """
        for m in reversed(self.registry.get(name, [])):
            if m['stage'] == 'production':
                return joblib.load(m['model_path'])
        raise ValueError(f"No production model for {name}")

    def list_models(self, name: str = None):
        """List registered versions (all models, or only ``name``) as a DataFrame."""
        return pd.DataFrame([
            {'name': n, **m}
            for n, versions in self.registry.items()
            if name is None or n == name
            for m in versions
        ])

# Example usage
registry = ModelRegistry('./model_registry')

# Train and register a model
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Features: every numeric column except the identifier and the target.
X = df.drop(columns=['customer_id', 'churn']).select_dtypes(include=[np.number])
y = df['churn']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

y_pred = model.predict(X_test)

# register_model returns the id it actually assigned. The registry persists on
# disk, so a re-run yields e.g. 'churn_classifier_v2'; promote that version,
# not a hard-coded one.
model_id = registry.register_model(
    name='churn_classifier',
    model=model,
    metrics={'accuracy': accuracy_score(y_test, y_pred), 'f1': f1_score(y_test, y_pred)},
    params={'n_estimators': 100},
    description='Initial RF model'
)

# Promote to production
registry.transition_stage(model_id, 'production')

print("\nRegistered models:")
display(registry.list_models())

## 4. A/B Testing Framework

In [None]:
class ABTest:
    """A/B testing framework for ML models.

    Routing is deterministic. The MD5 of ``str(record_id)`` picks a bucket in
    [0, 100), and buckets below ``split_ratio * 100`` go to model B while the
    rest go to model A. The same id therefore always lands on the same variant.

    Args:
        model_a: Control model; must expose ``predict(X)``.
        model_b: Challenger model; must expose ``predict(X)``.
        split_ratio: Fraction of traffic (0 to 1) routed to model B.

    Raises:
        ValueError: If ``split_ratio`` is outside [0, 1].
    """

    def __init__(self, model_a, model_b, split_ratio: float = 0.5):
        if not 0.0 <= split_ratio <= 1.0:
            raise ValueError(f"split_ratio must be in [0, 1], got {split_ratio}")
        self.model_a = model_a
        self.model_b = model_b
        self.split_ratio = split_ratio
        self.results = {'A': [], 'B': []}
        # record_id -> the same result dict stored in self.results, so
        # record_outcome is O(1) instead of scanning every logged prediction.
        self._by_id = {}

    def predict(self, X, record_id: str):
        """Predict with the variant assigned to ``record_id``.

        Only the first prediction of ``model.predict(X)`` is used, so ``X``
        is expected to hold a single row.

        Returns:
            ``(prediction, variant)`` where variant is ``'A'`` or ``'B'``.
        """
        # Deterministic split based on record_id
        hash_val = int(hashlib.md5(str(record_id).encode()).hexdigest(), 16)
        variant = 'B' if (hash_val % 100) < (self.split_ratio * 100) else 'A'
        model = self.model_b if variant == 'B' else self.model_a

        pred = model.predict(X)[0]
        entry = {'id': record_id, 'prediction': pred}
        self.results[variant].append(entry)
        # On a repeated id, outcomes attach to its first prediction.
        self._by_id.setdefault(record_id, entry)
        return pred, variant

    def record_outcome(self, record_id: str, actual: int) -> bool:
        """Attach the observed outcome to the prediction logged for ``record_id``.

        Returns:
            True if a matching prediction was found, False otherwise.
        """
        entry = self._by_id.get(record_id)
        if entry is None:
            return False
        entry['actual'] = actual
        return True

    def evaluate(self):
        """Per-variant sample count and accuracy over predictions with outcomes.

        Variants with no recorded outcomes are omitted from the result.
        """
        results = {}
        for variant in ['A', 'B']:
            outcomes = [r for r in self.results[variant] if 'actual' in r]
            if outcomes:
                correct = sum(1 for r in outcomes if r['prediction'] == r['actual'])
                results[variant] = {
                    'count': len(outcomes),
                    'accuracy': correct / len(outcomes)
                }
        return results

# Example usage
model_a = RandomForestClassifier(n_estimators=50, random_state=42)
model_b = RandomForestClassifier(n_estimators=200, random_state=42)

model_a.fit(X_train, y_train)
model_b.fit(X_train, y_train)

ab_test = ABTest(model_a, model_b, split_ratio=0.5)

# Simulate predictions: route each test row through the A/B split, then
# immediately feed back its true label as the observed outcome.
for i in range(len(X_test)):
    record_id = f"test_{i}"
    pred, variant = ab_test.predict(X_test.iloc[[i]], record_id)
    ab_test.record_outcome(record_id, y_test.iloc[i])

results = ab_test.evaluate()
# Header previously held mis-decoded UTF-8 for the chart emoji.
print("\n📊 A/B Test Results:")
for variant, metrics in results.items():
    print(f"  Model {variant}: {metrics['count']} samples, {metrics['accuracy']:.1%} accuracy")

## 5. Monitoring & Alerting

In [None]:
class ModelMonitor:
    """Monitor model performance in production.

    Logged predictions are appended to ``self.history``, an in-memory list
    that is never trimmed. Accuracy is computed over records whose ``actual``
    is not ``None`` and compared against ``baseline_metrics['accuracy']``.

    Args:
        baseline_metrics: Reference metrics; only the ``'accuracy'`` key is read.
        alert_threshold: Accuracy drop (baseline minus current) above which
            ``check_drift`` reports ``'alert'``.
    """

    def __init__(self, baseline_metrics: dict, alert_threshold: float = 0.1):
        self.baseline = baseline_metrics
        self.alert_threshold = alert_threshold
        self.history = []

    @staticmethod
    def _accuracy(records) -> float:
        """Fraction of (non-empty) ``records`` whose prediction equals the actual."""
        correct = sum(1 for r in records if r['prediction'] == r['actual'])
        return correct / len(records)

    def log_prediction(self, prediction, actual=None, features=None):
        """Log a prediction (and, if known, its actual outcome) for monitoring."""
        self.history.append({
            'timestamp': datetime.now(),
            'prediction': prediction,
            'actual': actual,
            'features': features
        })

    def check_drift(self, window_size: int = 100):
        """Compare accuracy over the last ``window_size`` predictions to the baseline.

        Returns a dict whose ``'status'`` is ``'insufficient_data'``,
        ``'no_outcomes'``, ``'ok'`` or ``'alert'``. For the last two it also
        holds the current and baseline accuracy, the drift and the sample count.
        If the baseline has no ``'accuracy'`` key, drift is measured against 1.0.

        Raises:
            ValueError: If ``window_size`` is less than 1.
        """
        # window_size=0 would otherwise pass the size check, and
        # history[-0:] is the *entire* history; negatives slice nonsensically.
        if window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")

        if len(self.history) < window_size:
            return {'status': 'insufficient_data'}

        recent = self.history[-window_size:]
        outcomes = [r for r in recent if r['actual'] is not None]

        if not outcomes:
            return {'status': 'no_outcomes'}

        current_accuracy = self._accuracy(outcomes)
        drift = self.baseline.get('accuracy', 1.0) - current_accuracy

        return {
            'status': 'alert' if drift > self.alert_threshold else 'ok',
            'current_accuracy': current_accuracy,
            'baseline_accuracy': self.baseline.get('accuracy'),
            'drift': drift,
            'samples': len(outcomes)
        }

    def get_metrics_summary(self):
        """Summarize accuracy over the full history; ``{}`` if no outcomes are known."""
        outcomes = [r for r in self.history if r['actual'] is not None]
        if not outcomes:
            return {}

        return {
            'total_predictions': len(self.history),
            'total_outcomes': len(outcomes),
            'accuracy': self._accuracy(outcomes)
        }

# Example usage
# Illustrative baseline; in practice use the accuracy recorded at registration.
monitor = ModelMonitor(
    baseline_metrics={'accuracy': 0.85},
    alert_threshold=0.1
)

# Simulate production predictions
for i in range(len(X_test)):
    pred = model.predict(X_test.iloc[[i]])[0]
    monitor.log_prediction(prediction=pred, actual=y_test.iloc[i])

# Check for drift
drift_status = monitor.check_drift(window_size=50)
# Header previously held mis-decoded UTF-8 for the chart emoji.
print("\n📊 Drift Check:")
for k, v in drift_status.items():
    print(f"  {k}: {v}")

## 🎯 Key MLOps Practices

1. **Data Versioning** - Track data changes
2. **Feature Stores** - Reusable feature pipelines
3. **Model Registry** - Lifecycle management
4. **A/B Testing** - Safe rollouts
5. **Monitoring** - Catch drift early