# End-to-End MLOps Implementation Guide for Health Insurance Claims

This notebook walks through MLOps practices for the health insurance claims prediction project: environment setup, data validation, feature engineering, model training and registration, CI/CD scaffolding, monitoring, retraining, and governance.

## 1. Problem Definition & Environment Setup

### Business Objective
- Predict health insurance claim costs accurately
- Reduce prediction error to optimize premium pricing
- Automate the end-to-end ML pipeline

### Technical Stack
- **Languages & Frameworks**: Python, scikit-learn, FastAPI
- **MLOps Tools**: MLflow, Docker, Kubernetes
- **Cloud Platform**: AWS (can be adapted for GCP/Azure)
- **Monitoring**: Prometheus, Grafana
- **CI/CD**: GitHub Actions

In [None]:
# Install required packages
!pip install mlflow boto3 great-expectations feast evidently prometheus-client

# Import required libraries
import os
import mlflow
from pathlib import Path

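# Configure MLflow
# NOTE: this assumes a tracking server is already listening on port 5000
# (for example, started with `mlflow server --host 127.0.0.1 --port 5000`).
# Without one, set_experiment() fails with a "Connection refused" error.
MLFLOW_TRACKING_URI = "http://localhost:5000"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment("health-insurance-claims")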

# Configure project paths
BASE_DIR = Path.cwd().parent
DATA_DIR = BASE_DIR / "data"
MODELS_DIR = BASE_DIR / "models"
MLRUNS_DIR = BASE_DIR / "mlruns"

# Create necessary directories (parents=True also creates missing parent folders)
for dir_path in [DATA_DIR, MODELS_DIR, MLRUNS_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

print("Environment setup completed successfully!")

MlflowException: API request to http://localhost:5000/api/2.0/mlflow/experiments/get-by-name failed with exception HTTPConnectionPool(host='localhost', port=5000): Max retries exceeded with url: /api/2.0/mlflow/experiments/get-by-name?experiment_name=health-insurance-claims (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7a14d91bd0a0>: Failed to establish a new connection: [Errno 111] Connection refused'))
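
The `Connection refused` error above means nothing was listening at `http://localhost:5000` when `mlflow.set_experiment` ran. One possible workaround, sketched here, is to probe the server first and fall back to a local file-based store. The helper names (`server_reachable`, `choose_tracking_uri`) and the fallback directory are illustrative assumptions, not part of the original setup.

```python
import socket
from pathlib import Path
from urllib.parse import urlparse


def server_reachable(uri: str, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to the URI's host and port succeeds."""
    parsed = urlparse(uri)
    host = parsed.hostname or "localhost"
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def choose_tracking_uri(primary: str, fallback_dir: Path) -> str:
    """Use the remote server when it answers, else a local file-based store."""
    if server_reachable(primary):
        return primary
    # Path.as_uri() needs an absolute path, hence resolve()
    return fallback_dir.resolve().as_uri()


# Hypothetical usage: pass the result to mlflow.set_tracking_uri(...)
tracking_uri = choose_tracking_uri("http://localhost:5000", Path("mlruns"))
print(f"Using tracking URI: {tracking_uri}")
```

The probe only checks that the port accepts connections; it does not confirm that the listener is actually an MLflow server.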

## 2. Data Engineering Pipeline

In this section, we'll implement:
1. Data ingestion from various sources
2. Data validation using Great Expectations
3. Data versioning using DVC
4. Cloud storage integration

In [None]:
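import pandas as pd
# NOTE: ge.from_pandas and great_expectations.dataset belong to the legacy
# Dataset API, which recent Great Expectations releases no longer ship. This
# cell will not import under the 1.x version installed above; pin an older
# release or port the checks to the current API before running it.
import great_expectations as ge
from great_expectations.dataset import PandasDataset

# Data ingestion function
def load_data(file_path: str) -> PandasDataset:
    """Load a CSV file and wrap it as a Great Expectations dataset"""
    df = pd.read_csv(file_path)
    ge_df = ge.from_pandas(df)
    return ge_df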

# Data validation with Great Expectations
def validate_data(ge_df: PandasDataset):
    """Validate data using Great Expectations"""
    # Define expectations
    ge_df.expect_column_values_to_not_be_null("age")
    ge_df.expect_column_values_to_be_between("age", 0, 100)
    ge_df.expect_column_values_to_be_between("bmi", 10, 50)
    ge_df.expect_column_values_to_be_in_set("smoker", ["yes", "no"])
    
    # Save validation results
    results = ge_df.validate()
    print(f"Validation successful: {results.success}")
    return results

# Load and validate sample data
try:
    data = load_data(str(DATA_DIR / "health_claims.csv"))
    validation_results = validate_data(data)
    
    # Log validation results to MLflow
    with mlflow.start_run(run_name="data_validation"):
        mlflow.log_metric("validation_success", int(validation_results.success))
        mlflow.log_param("data_version", "v1.0")
        mlflow.log_param("n_rows", len(data))
        
except FileNotFoundError:
    print("Sample data file not found. Please ensure data is available in the data directory.")
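
The run above logs a hard-coded `data_version` of `"v1.0"`, and the DVC step listed for this section is not shown. As a lightweight stand-in, the sketch below derives a version tag from the file's content hash, so any change to the CSV yields a new identifier. The function names and the choice of SHA-256 are assumptions for illustration; this is not how DVC itself is invoked.

```python
import hashlib
from pathlib import Path


def file_checksum(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks to bound memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def data_version(path: Path, length: int = 12) -> str:
    """Shorten the digest into a readable version tag."""
    return file_checksum(path)[:length]
```

In the cell above, this could replace the literal, for example `mlflow.log_param("data_version", data_version(DATA_DIR / "health_claims.csv"))`.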

## 3. Feature Engineering

This section covers:
1. Feature transformation pipeline
2. Feature store integration with Feast
3. Feature versioning and tracking

In [None]:
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# Define feature engineering pipeline
def create_feature_pipeline():
    """Create feature engineering pipeline"""
    numeric_features = ['age', 'bmi']
    categorical_features = ['smoker', 'region', 'gender']
    
    numeric_transformer = Pipeline(steps=[
        ('scaler', StandardScaler())
    ])
    
    categorical_transformer = Pipeline(steps=[
        # scikit-learn 1.2 renamed `sparse` to `sparse_output`; the old name was later removed
        ('onehot', OneHotEncoder(drop='first', sparse_output=False))
    ])
    
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ])
    
    return preprocessor

# Create and fit the feature pipeline
def process_features(df: pd.DataFrame):
    """Process features using the pipeline"""
    feature_pipeline = create_feature_pipeline()
    
    with mlflow.start_run(run_name="feature_engineering"):
        # Fit and transform the data
        features_processed = feature_pipeline.fit_transform(df)
        
        # Log feature engineering parameters
        mlflow.log_param("numeric_features", ['age', 'bmi'])
        mlflow.log_param("categorical_features", ['smoker', 'region', 'gender'])
        
        # Log the feature pipeline
        mlflow.sklearn.log_model(feature_pipeline, "feature_pipeline")
        
        return features_processed, feature_pipeline

# Example usage (if data is available)
try:
    features_processed, pipeline = process_features(data)
    print("Feature engineering completed successfully!")
except NameError:
    print("Please run the data loading cell first.")
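
To make the two transformers concrete, here is a plain-Python sketch of what they compute on a single column: standardization with the population standard deviation, and one-hot encoding that drops the first category in sorted order. The helper names and toy values are assumptions for illustration; the pipeline above remains the actual implementation.

```python
from statistics import fmean, pstdev


def standardize(values):
    """Scale values to zero mean and unit variance, using the population std."""
    mean = fmean(values)
    std = pstdev(values)
    if std == 0:
        return [0.0 for _ in values]  # constant column: avoid division by zero
    return [(v - mean) / std for v in values]


def one_hot_drop_first(values):
    """One-hot encode a column, dropping the first category in sorted order."""
    categories = sorted(set(values))
    kept = categories[1:]
    rows = [[1.0 if v == c else 0.0 for c in kept] for v in values]
    return kept, rows


ages = [20, 40, 60]
smoker = ["yes", "no", "yes"]
print(standardize(ages))
print(one_hot_drop_first(smoker))
```

Dropping one category per feature avoids a redundant column: for a two-valued feature such as `smoker`, a single indicator already carries all the information.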

## 4. Model Development

This section covers:
1. Model training with MLflow tracking
2. Model evaluation and selection
3. Hyperparameter tuning

In [None]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np

def train_and_evaluate_model(X, y, model_name="RandomForestRegressor"):
    """Train and evaluate model with MLflow tracking"""
    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    
    with mlflow.start_run(run_name=f"train_{model_name}"):
        # Initialize and train model
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)
        
        # Make predictions
        y_pred = model.predict(X_test)
        
        # Calculate metrics
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_test, y_pred)
        
        # Log parameters
        mlflow.log_params({
            "model_type": model_name,
            "n_estimators": 100,
            "random_state": 42
        })
        
        # Log metrics
        mlflow.log_metrics({
            "mse": mse,
            "rmse": rmse,
            "r2": r2
        })
        
        # Log model
        mlflow.sklearn.log_model(model, "model")
        
        print("Model Training Results:")
        print(f"MSE: {mse:.2f}")
        print(f"RMSE: {rmse:.2f}")
        print(f"R2 Score: {r2:.2f}")
        
        return model, (X_test, y_test)

# Train model if data is available
try:
    target = data['claim_amount']
    model, test_data = train_and_evaluate_model(features_processed, target)
    print("\nModel training completed successfully!")
except NameError:
    print("Please run the feature engineering cell first.")
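
Hyperparameter tuning is listed for this section but not shown in the cell above. The sketch below illustrates the idea with a plain exhaustive grid search: every parameter combination is scored and the best one is kept. The `score_fn` callback and the toy scoring function are assumptions; in this notebook's setting the callback would wrap something like a cross-validated fit of `RandomForestRegressor`.

```python
from itertools import product
from typing import Any, Callable, Dict, List, Tuple


def grid_search(
    param_grid: Dict[str, List[Any]],
    score_fn: Callable[[Dict[str, Any]], float],
) -> Tuple[Dict[str, Any], float]:
    """Score every combination in param_grid; return the best params and score.

    Higher scores are treated as better.
    """
    names = list(param_grid)
    best_params: Dict[str, Any] = {}
    best_score = float("-inf")
    for combo in product(*(param_grid[name] for name in names)):
        params = dict(zip(names, combo))
        score = score_fn(params)
        if score > best_score:
            best_params, best_score = params, score
    return best_params, best_score


# Toy stand-in for a cross-validated score: peaks at n_estimators=200, max_depth=8
def toy_score(params: Dict[str, Any]) -> float:
    return -abs(params["n_estimators"] - 200) / 100 - abs(params["max_depth"] - 8) / 4


best, score = grid_search(
    {"n_estimators": [100, 200, 400], "max_depth": [4, 8, 16]},
    toy_score,
)
print(best, score)
```

Each evaluated combination could also be logged as its own MLflow run so the search history stays visible next to the final model.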

## 5. Model Registry

This section covers:
1. Model versioning with MLflow
2. Model staging (Development/Staging/Production)
3. Model metadata tracking

In [None]:
# Model Registry Management
from mlflow.tracking import MlflowClient
from mlflow.entities import ViewType

def register_model(run_id, model_name="health_insurance_model"):
    """Register model with MLflow"""
    client = MlflowClient()
    
    # Register the model
    result = mlflow.register_model(
        f"runs:/{run_id}/model",
        model_name
    )
    
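    # Transition to staging
    # NOTE: registry stages are deprecated in recent MLflow releases in favor of
    # model version aliases and tags; this call may emit a deprecation warning.
    client.transition_model_version_stage(
        name=model_name,
        version=result.version,
        stage="Staging"
    )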
    
    return result

def get_latest_model(model_name="health_insurance_model", stage="Staging"):
    """Get the latest model from registry"""
    model = mlflow.pyfunc.load_model(
        model_uri=f"models:/{model_name}/{stage}"
    )
    return model

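# Find the most recent training run and register its model.
# Earlier cells also create "data_validation" and "feature_engineering" runs,
# which have no "model" artifact, so filter on the run name and sort by start time.
try:
    latest_run = mlflow.search_runs(
        experiment_ids=[mlflow.get_experiment_by_name("health-insurance-claims").experiment_id],
        filter_string="tags.mlflow.runName LIKE 'train_%'",
        run_view_type=ViewType.ACTIVE_ONLY,
        max_results=1,
        order_by=["attributes.start_time DESC"]
    )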
    
    if not latest_run.empty:
        run_id = latest_run.iloc[0].run_id
        model_info = register_model(run_id)
        print(f"Model registered successfully with version: {model_info.version}")
except Exception as e:
    print(f"Error registering model: {str(e)}")
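
`register_model` above moves every new version to Staging unconditionally. A common refinement is to gate promotion on a metric comparison with the version currently serving. The sketch below is one way to express that rule; the function name, the RMSE-based criterion, and the 2% margin are illustrative assumptions rather than anything MLflow prescribes.

```python
from typing import Optional


def should_promote(
    candidate_rmse: float,
    current_rmse: Optional[float],
    min_relative_improvement: float = 0.02,
) -> bool:
    """Promote when nothing is deployed yet or the candidate's RMSE is lower by the margin."""
    if current_rmse is None:
        return True  # nothing deployed yet, so any valid candidate wins
    if current_rmse <= 0:
        return False  # current model is already perfect on this metric
    improvement = (current_rmse - candidate_rmse) / current_rmse
    return improvement >= min_relative_improvement
```

The two RMSE values could be read from the `rmse` metric that the training cell logs for each run, and the stage transition called only when the function returns `True`.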

## 6. CI/CD Pipeline

This section demonstrates:
1. Setting up GitHub Actions workflow
2. Creating Docker container
3. Implementing automated tests
4. Configuring deployment pipeline

In [None]:
# Create GitHub Actions workflow file
github_workflow = """
name: MLOps Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test-and-deploy:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    
    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.9'
    
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    
    - name: Run tests
      run: |
        python -m pytest tests/
    
    - name: Build Docker image
      run: docker build -t health-insurance-api .
    
    - name: Deploy to staging
      if: github.ref == 'refs/heads/main'
      run: |
        # Add deployment commands here
        echo "Deploying to staging..."
"""

# Write workflow file
workflow_path = Path(".github/workflows/mlops.yml")
workflow_path.parent.mkdir(parents=True, exist_ok=True)
workflow_path.write_text(github_workflow)

print("GitHub Actions workflow created at: .github/workflows/mlops.yml")

# Display Dockerfile content
dockerfile_content = """
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "app.api:app", "--host", "0.0.0.0", "--port", "8000"]
"""

print("\nDockerfile content:")
print(dockerfile_content)
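
The workflow runs `python -m pytest tests/`, but no tests are shown. A minimal sketch of what such a file might contain follows: a small request validator and pytest-style test functions for it. The file name `tests/test_validation.py`, the `validate_claim_request` helper, and the field ranges (borrowed from the validation cell in section 2) are assumptions, not part of the original project.

```python
# tests/test_validation.py (hypothetical file name)
from typing import Any, Dict, List


def validate_claim_request(payload: Dict[str, Any]) -> List[str]:
    """Return a list of validation errors; an empty list means the payload is acceptable."""
    errors = []
    age = payload.get("age")
    if not isinstance(age, (int, float)) or not 0 <= age <= 100:
        errors.append("age must be a number between 0 and 100")
    bmi = payload.get("bmi")
    if not isinstance(bmi, (int, float)) or not 10 <= bmi <= 50:
        errors.append("bmi must be a number between 10 and 50")
    if payload.get("smoker") not in {"yes", "no"}:
        errors.append("smoker must be 'yes' or 'no'")
    return errors


def test_valid_payload_has_no_errors():
    assert validate_claim_request({"age": 35, "bmi": 24.1, "smoker": "no"}) == []


def test_out_of_range_age_is_rejected():
    errors = validate_claim_request({"age": 130, "bmi": 24.1, "smoker": "no"})
    assert errors == ["age must be a number between 0 and 100"]


def test_missing_fields_are_reported():
    assert len(validate_claim_request({})) == 3
```

In a real project the validator would live in the application package and the test file would import it; both are kept together here only to keep the sketch self-contained.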

## 7. Monitoring System

This section covers:
1. Setting up Prometheus metrics
2. Implementing data drift detection
3. Creating performance monitoring dashboard

In [None]:
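from prometheus_client import Counter, Histogram, start_http_server
# NOTE: evidently.dashboard and evidently.tabs belong to Evidently's early
# Dashboard API, which later releases removed. These imports will fail under
# the 0.7.x version installed above; pin an older release or port the drift
# report to the current API before running this cell.
from evidently.dashboard import Dashboard
from evidently.tabs import DataDriftTab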

# Prometheus metrics
prediction_counter = Counter('model_predictions_total', 'Total number of predictions')
prediction_latency = Histogram('prediction_latency_seconds', 'Time spent processing prediction')

# Data drift detection
def create_drift_dashboard(reference_data, current_data, output_path="drift_dashboard.html"):
    """Create data drift dashboard using Evidently"""
    dashboard = Dashboard(tabs=[DataDriftTab()])
    dashboard.calculate(reference_data, current_data)
    dashboard.save(output_path)
    
    return dashboard

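# Example monitoring setup
def setup_monitoring(port=8001):
    """Start a Prometheus metrics endpoint and record one sample observation"""
    # Default to 8001 so the exporter does not collide with the API on port 8000
    start_http_server(port)
    print(f"Prometheus metrics server started at port {port}")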
    
    # Example Prometheus metrics
    prediction_counter.inc()
    with prediction_latency.time():
        # Simulate prediction
        import time
        time.sleep(0.1)

# Create sample drift report if data is available
try:
    # Split data into reference and current
    split_idx = len(data) // 2
    reference_data = data[:split_idx]
    current_data = data[split_idx:]
    
    # Create and save drift dashboard
    drift_dashboard = create_drift_dashboard(reference_data, current_data)
    print("Drift dashboard created successfully!")
except NameError:
    print("Please run the data loading cell first to create drift dashboard.")
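
Independently of any dashboard library, drift on a single numeric feature can be summarized with the Population Stability Index (PSI), which compares the binned distribution of current data against a reference sample. The sketch below is a minimal standard-library version; the function name, the equal-width binning, and the smoothing constant `eps` are assumptions. A widely quoted rule of thumb treats values above roughly 0.25 as a significant shift, but suitable thresholds depend on the data.

```python
import math
from typing import List, Sequence


def population_stability_index(
    reference: Sequence[float],
    current: Sequence[float],
    bins: int = 10,
    eps: float = 1e-6,
) -> float:
    """PSI between two numeric samples, using equal-width bins over the reference range."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / bins or 1.0  # guard against a constant reference column

    def bin_shares(values: Sequence[float]) -> List[float]:
        counts = [0] * bins
        for v in values:
            # Clamp so values outside the reference range land in the edge bins
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # Floor each share at eps so the logarithm below is always defined
        return [max(c / len(values), eps) for c in counts]

    ref_shares = bin_shares(reference)
    cur_shares = bin_shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_shares, cur_shares))
```

Applied to this notebook, the function could be called per numeric column, for example on the `age` values of `reference_data` and `current_data`.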

## 8. Retraining Pipeline

This section covers:
1. Setting up automated retraining triggers
2. Implementing model retraining workflow
3. Validating retrained models

In [None]:
from datetime import datetime, timedelta
import json

class ModelRetrainingPipeline:
    def __init__(self, model_name, performance_threshold=0.7):
        self.model_name = model_name
        self.performance_threshold = performance_threshold
    
    def check_performance_degradation(self, current_performance):
        """Check if model performance is below threshold"""
        return current_performance < self.performance_threshold
    
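    def check_data_drift(self, reference_data, current_data, drift_threshold=0.1):
        """Check for significant data drift (placeholder logic, see note below)"""
        # Using Evidently for drift detection
        drift_report = Dashboard(tabs=[DataDriftTab()])
        drift_report.calculate(reference_data, current_data)
        
        # NOTE: simplified placeholder. `dataframes` is not a documented Dashboard
        # attribute and `drift_threshold` is never applied; replace this line with
        # a real drift score compared against the threshold before relying on it.
        drift_detected = any(drift_report.dataframes[0]['metrics'])
        return drift_detected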
    
    def retrain_model(self, new_data):
        """Retrain model with new data"""
        with mlflow.start_run(run_name=f"retraining_{datetime.now().strftime('%Y%m%d')}"):
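            # Preprocess new data
            # NOTE: prepare_data() only splits off the target; categorical columns
            # such as 'smoker' must still go through the feature pipeline from
            # section 3 before RandomForestRegressor can fit them.
            X, y = self.prepare_data(new_data)
            
            # Train new model
            model = RandomForestRegressor(random_state=42)
            model.fit(X, y)
            
            # Log metrics
            # This is R^2 on the training set itself, so it is optimistic; use
            # validate_retrained_model() for a held-out estimate.
            train_score = model.score(X, y)
            mlflow.log_metric("training_score", train_score)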
            
            # Log model
            mlflow.sklearn.log_model(model, "retrained_model")
            
            return model, train_score
    
    def validate_retrained_model(self, new_model, validation_data):
        """Validate the retrained model"""
        X_val, y_val = self.prepare_data(validation_data)
        score = new_model.score(X_val, y_val)
        
        if score > self.performance_threshold:
            return True, score
        return False, score
    
    @staticmethod
    def prepare_data(data):
        """Prepare data for model training"""
        X = data.drop('claim_amount', axis=1)
        y = data['claim_amount']
        return X, y

# Example usage
retraining_pipeline = ModelRetrainingPipeline(
    model_name="health_insurance_model",
    performance_threshold=0.7
)

# Simulate retraining decision
try:
    current_performance = 0.65  # Simulated degraded performance
    if retraining_pipeline.check_performance_degradation(current_performance):
        print("Performance degradation detected. Initiating retraining...")
        
        # In practice, you would:
        # 1. Load new training data
        # 2. Retrain model
        # 3. Validate new model
        # 4. Deploy if validation passes
except Exception as e:
    print(f"Error in retraining pipeline: {str(e)}")
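
The simulation above checks only a performance threshold. The sketch below shows one way the trigger could combine performance, a drift score, and model age into a single decision, with a cooldown to avoid retraining loops. The function name, the default thresholds, and the cooldown and maximum-age values are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_retrain(
    current_score: float,
    drift_score: float,
    last_trained: datetime,
    now: Optional[datetime] = None,
    performance_threshold: float = 0.7,
    drift_threshold: float = 0.25,
    cooldown: timedelta = timedelta(days=1),
    max_age: timedelta = timedelta(days=30),
) -> bool:
    """Decide whether to retrain based on performance, drift, and model age."""
    now = now or datetime.now()
    age = now - last_trained
    if age < cooldown:
        return False  # avoid retraining loops right after a fresh model
    if age >= max_age:
        return True  # scheduled refresh regardless of metrics
    return current_score < performance_threshold or drift_score > drift_threshold
```

A scheduler or monitoring alert could call this function periodically and invoke `ModelRetrainingPipeline.retrain_model` only when it returns `True`.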

## 9. Security and Governance

This section covers:
1. Implementing IAM roles and permissions
2. Setting up audit logging
3. Ensuring HIPAA compliance
4. Implementing model governance

In [None]:
import logging
from datetime import datetime
import hashlib
import json

class ModelGovernance:
    def __init__(self):
        # Setup logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            filename='model_governance.log'
        )
        self.logger = logging.getLogger('ModelGovernance')
    
    def log_model_access(self, user_id, model_version, action):
        """Log model access"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'user_id': user_id,
            'model_version': model_version,
            'action': action
        }
        self.logger.info(f"Model Access: {json.dumps(log_entry)}")
    
    def verify_data_compliance(self, data, sensitive_columns=('ssn', 'phone')):
        """Reject data that still contains known direct-identifier columns.

        This is a basic column-name check, not a full HIPAA compliance review.
        """
        # Check for sensitive columns
        for col in sensitive_columns:
            if col in data.columns:
                raise ValueError(f"Sensitive column {col} must be encrypted or removed")
        
        return True
    
    def hash_sensitive_data(self, value):
        """Hash sensitive data"""
        return hashlib.sha256(str(value).encode()).hexdigest()
    
    def audit_model_training(self, model_params, training_data_hash):
        """Audit model training process"""
        audit_record = {
            'timestamp': datetime.now().isoformat(),
            'model_params': model_params,
            'training_data_hash': training_data_hash,
            'environment': self.get_environment_info()
        }
        self.logger.info(f"Model Training Audit: {json.dumps(audit_record)}")
    
    @staticmethod
    def get_environment_info():
        """Get information about the training environment"""
        import sys
        import platform
        
        return {
            'python_version': sys.version,
            'platform': platform.platform(),
            'mlflow_version': mlflow.__version__,
            'mlflow_tracking_uri': mlflow.get_tracking_uri()
        }

# Example usage
governance = ModelGovernance()

# Simulate model access logging
governance.log_model_access(
    user_id="data_scientist_1",
    model_version="v1.0.0",
    action="model_training"
)

# Example of data compliance check
try:
    if 'data' in globals():
        governance.verify_data_compliance(data)
        print("Data compliance verification passed!")
    
    # Audit model training
    model_params = {
        'algorithm': 'RandomForest',
        'parameters': {'n_estimators': 100, 'random_state': 42}
    }
    data_hash = "sample_hash_value"  # In practice, calculate this from actual data
    governance.audit_model_training(model_params, data_hash)
    print("Model training audit logged successfully!")
    
except Exception as e:
    print(f"Governance check failed: {str(e)}")
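
`hash_sensitive_data` applies a plain SHA-256, which is deterministic and unkeyed, so low-entropy identifiers such as phone numbers could be recovered by hashing every candidate value. A keyed hash is one common mitigation. The sketch below uses HMAC-SHA256 from the standard library; the function name and the way the key is supplied are assumptions, and key management is out of scope here.

```python
import hashlib
import hmac


def pseudonymize(value: object, secret_key: bytes) -> str:
    """Return a keyed HMAC-SHA256 digest of the value as a hex string."""
    if not secret_key:
        raise ValueError("secret_key must not be empty")
    return hmac.new(secret_key, str(value).encode("utf-8"), hashlib.sha256).hexdigest()


# Hypothetical key for demonstration only; load real keys from a secrets manager
demo_key = b"replace-with-a-real-secret"
token = pseudonymize("555-0100", demo_key)
print(len(token))
```

The same input and key always produce the same token, so records can still be joined on the pseudonymized column, while anyone without the key cannot rebuild the mapping by brute force alone.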

## Conclusion

This notebook has walked through the main building blocks of an MLOps workflow for the health insurance claims prediction project:

1. Environment setup and configuration
2. Data engineering and validation
3. Feature engineering and tracking
4. Model development with MLflow
5. Model versioning and registry
6. CI/CD pipeline setup
7. Monitoring and observability
8. Automated retraining pipeline
9. Security and governance implementation

Next steps:
1. Deploy the model to production
2. Set up automated monitoring alerts
3. Implement automated retraining triggers
4. Establish regular security audits