# Production Deployment Guide

This notebook demonstrates how to deploy DLT models in production environments, including model serialization, deployment strategies, monitoring, and best practices for production ML systems.

## Table of Contents
1. [Model Serialization and Persistence](#serialization)
2. [Production Configuration](#config)
3. [Model Serving Strategies](#serving)
4. [Monitoring and Logging](#monitoring)
5. [Performance Optimization](#optimization)
6. [CI/CD Integration](#cicd)
7. [Best Practices](#best-practices)

In [17]:
import os
import sys
import json
import pickle
import joblib
import logging
from datetime import datetime
from pathlib import Path

# Add the src directory to Python path
sys.path.append('/home/rlfowler/Documents/myprojects/DLT/src')

from dlt.core.config import DLTConfig
from dlt.core.model import DLTModel
from dlt.core.pipeline import train, evaluate, predict
from dlt.core.trainer import DLTTrainer

print("Production deployment dependencies loaded successfully!")

Production deployment dependencies loaded successfully!


## 1. Model Serialization and Persistence {#serialization}

Proper model serialization is crucial for production deployment. We'll explore different serialization methods and their trade-offs.

In [20]:
# Create a sample model for demonstration
config = DLTConfig(
    model_type="sklearn.ensemble.RandomForestClassifier",
    model_params={
        "n_estimators": 100,
        "random_state": 42
    }
)

# Update with additional configuration
config.training.update({
    'epochs': 5,
    'validation_split': 0.2
})

config.data.update({
    'input_shape': [784],
    'output_shape': [10],
    'batch_size': 32
})

model = DLTModel.from_config(config)
print("Sample model created for deployment demonstration")

Sample model created for deployment demonstration


### Model Serialization Methods

In [22]:
# Create models directory for production artifacts
models_dir = Path('/home/rlfowler/Documents/myprojects/DLT/models/production')
models_dir.mkdir(parents=True, exist_ok=True)

# Method 1: Framework-specific serialization (recommended)
def save_model_framework_native(model, path):
    """Save model using framework's native serialization."""
    if hasattr(model, 'save_model'):
        model.save_model(path)
        print(f"Model saved using framework-native method: {path}")
    else:
        print("Framework-native save not available")

# Method 2: Joblib serialization (fast, good for sklearn models)
def save_model_joblib(model, path):
    """Save model using joblib."""
    joblib.dump(model, path)
    print(f"Model saved using joblib: {path}")

# Method 3: Pickle serialization (universal but less efficient; only load pickles from trusted sources)
def save_model_pickle(model, path):
    """Save model using pickle."""
    with open(path, 'wb') as f:
        pickle.dump(model, f)
    print(f"Model saved using pickle: {path}")

# Save model metadata
def save_model_metadata(config, path):
    """Save model configuration and metadata."""
    metadata = {
        'created_at': datetime.now().isoformat(),
        'config': config.model_dump() if hasattr(config, 'model_dump') else str(config),
        'version': '1.0.0',
        'framework': config.model_type if hasattr(config, 'model_type') else 'unknown'
    }
    
    with open(path, 'w') as f:
        json.dump(metadata, f, indent=2)
    print(f"Model metadata saved: {path}")

# Save the model and metadata
model_path = models_dir / 'model.joblib'
metadata_path = models_dir / 'metadata.json'

save_model_joblib(model, model_path)
save_model_metadata(config, metadata_path)

Model saved using joblib: /home/rlfowler/Documents/myprojects/DLT/models/production/model.joblib
Model metadata saved: /home/rlfowler/Documents/myprojects/DLT/models/production/metadata.json
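A saved artifact is only useful if the serving process can confirm it is loading the file that was written. The following is a minimal sketch of that idea, assuming a SHA-256 checksum is stored in the metadata sidecar and verified before deserialization. The helper names (`sha256_of_file`, `save_with_checksum`, `load_verified`) and the dictionary stand-in for the model are hypothetical and not part of DLT. It uses `pickle` from the standard library to stay self-contained; the same pattern should apply to a joblib file.

```python
import hashlib
import json
import pickle
import tempfile
from pathlib import Path


def sha256_of_file(path: Path) -> str:
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def save_with_checksum(obj, model_path: Path, metadata_path: Path) -> None:
    """Pickle `obj` and record the artifact's checksum in a JSON sidecar."""
    with open(model_path, "wb") as f:
        pickle.dump(obj, f)
    metadata = {"model_file": model_path.name, "sha256": sha256_of_file(model_path)}
    metadata_path.write_text(json.dumps(metadata, indent=2))


def load_verified(model_path: Path, metadata_path: Path):
    """Load the artifact only if its checksum matches the recorded one."""
    expected = json.loads(metadata_path.read_text())["sha256"]
    if sha256_of_file(model_path) != expected:
        raise ValueError(f"Checksum mismatch for {model_path}")
    with open(model_path, "rb") as f:
        return pickle.load(f)


# Stand-in for a trained model (hypothetical; any picklable object works)
demo_model = {"weights": [0.1, 0.2, 0.3]}

with tempfile.TemporaryDirectory() as tmp_dir:
    artifact = Path(tmp_dir) / "model.pkl"
    sidecar = Path(tmp_dir) / "metadata.json"
    save_with_checksum(demo_model, artifact, sidecar)
    restored = load_verified(artifact, sidecar)
    print(restored == demo_model)
```

A checksum detects corruption and accidental file swaps. It does not make an untrusted pickle safe to load.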


## 2. Production Configuration {#config}

Production environments require different configurations than development. Let's set up production-ready configurations.

In [23]:
# Production configuration template
production_config = {
    'environment': 'production',
    'logging': {
        'level': 'INFO',
        'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        'handlers': ['file', 'console'],
        'file_path': '/var/log/dlt/production.log'
    },
    'model_serving': {
        'batch_size': 64,
        'max_latency_ms': 100,
        'timeout_seconds': 30,
        'workers': 4
    },
    'monitoring': {
        'enable_metrics': True,
        'metrics_endpoint': '/metrics',
        'health_endpoint': '/health',
        'log_predictions': False,  # Privacy consideration
        'alert_thresholds': {
            'latency_p95_ms': 200,
            'error_rate_percent': 5.0,
            'throughput_rps': 100
        }
    },
    'security': {
        'enable_auth': True,
        'api_key_header': 'X-API-Key',
        'rate_limit_per_minute': 1000
    },
    'data_validation': {
        'enable_input_validation': True,
        'enable_output_validation': True,
        'drift_detection': True
    }
}

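# Save production config
config_path = Path('/home/rlfowler/Documents/myprojects/DLT/config/config_production.yaml')
config_path.parent.mkdir(parents=True, exist_ok=True)  # make sure the config directory exists

import yaml  # provided by the PyYAML package
with open(config_path, 'w') as f:
    yaml.safe_dump(production_config, f, default_flow_style=False, indent=2)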

print(f"Production configuration saved: {config_path}")
print("\nKey production settings:")
for key, value in production_config.items():
    print(f"  {key}: {type(value).__name__}")

Production configuration saved: /home/rlfowler/Documents/myprojects/DLT/config/config_production.yaml

Key production settings:
  environment: str
  logging: dict
  model_serving: dict
  monitoring: dict
  security: dict
  data_validation: dict
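Because the configuration is a plain dictionary, a typo or missing section would only surface when a service reads it. One possible safeguard is to validate the structure at startup. The sketch below assumes a small set of required sections and keys taken from the template above. `REQUIRED_SECTIONS` and `validate_production_config` are hypothetical names, and the rules are illustrative rather than a complete schema.

```python
# Required sections and keys (an assumption based on the template above)
REQUIRED_SECTIONS = {
    "logging": {"level", "format"},
    "model_serving": {"batch_size", "timeout_seconds", "workers"},
    "monitoring": {"alert_thresholds"},
    "security": {"enable_auth"},
}


def validate_production_config(cfg: dict) -> list:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for section, keys in REQUIRED_SECTIONS.items():
        if not isinstance(cfg.get(section), dict):
            problems.append(f"missing section: {section}")
            continue
        for key in sorted(keys - set(cfg[section])):
            problems.append(f"missing key: {section}.{key}")

    # Basic range checks on the serving settings
    serving = cfg.get("model_serving", {})
    if isinstance(serving, dict):
        for key in ("batch_size", "workers"):
            value = serving.get(key)
            if value is not None and (not isinstance(value, int) or value < 1):
                problems.append(f"model_serving.{key} must be a positive integer")
    return problems


# Deliberately incomplete config to show what gets reported
demo_config = {
    "logging": {"level": "INFO"},
    "model_serving": {"batch_size": 0, "timeout_seconds": 30, "workers": 4},
    "monitoring": {"alert_thresholds": {}},
}

demo_problems = validate_production_config(demo_config)
for problem in demo_problems:
    print(problem)
```

Returning a list of problems instead of raising on the first one lets a deployment script report every issue in a single run.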


## 3. Model Serving Strategies {#serving}

This section sketches two serving strategies: an online prediction service for request/response traffic and a batch prediction service for offline workloads.

In [24]:
# Strategy 1: REST API Service
class ModelAPIServer:
    """Minimal in-process prediction service; wrap it in a web framework to expose it over REST."""
    
    def __init__(self, model_path, config):
        self.model = joblib.load(model_path)
        self.config = config
        self.setup_logging()
    
    def setup_logging(self):
        logging.basicConfig(
            level=getattr(logging, self.config.get('logging', {}).get('level', 'INFO')),
            format=self.config.get('logging', {}).get('format', '%(message)s')
        )
        self.logger = logging.getLogger(__name__)
    
    def predict(self, data):
        """Make predictions on input data."""
        try:
            start_time = datetime.now()
            
            # Validate input
            if not self.validate_input(data):
                return {'error': 'Invalid input data'}
            
            # Make prediction
            if hasattr(self.model, 'predict'):
                predictions = self.model.predict(data)
            else:
                predictions = "Mock prediction for demonstration"
            
            # Calculate latency
            latency_ms = (datetime.now() - start_time).total_seconds() * 1000
            
            self.logger.info(f"Prediction completed in {latency_ms:.2f}ms")
            
            return {
                'predictions': predictions.tolist() if hasattr(predictions, 'tolist') else predictions,
                'latency_ms': latency_ms,
                'timestamp': datetime.now().isoformat()
            }
            
        except Exception as e:
            self.logger.error(f"Prediction error: {str(e)}")
            return {'error': str(e)}
    
    def validate_input(self, data):
        """Validate input data format."""
        # Basic validation - expand based on your needs
        if data is None:
            return False
        return True
    
    def health_check(self):
        """Health check endpoint."""
        return {
            'status': 'healthy',
            'timestamp': datetime.now().isoformat(),
            'model_loaded': self.model is not None
        }

# Initialize API server
api_server = ModelAPIServer(model_path, production_config)
print("Model API server initialized")

# Test the server
test_data = [[0.5, 0.3, 0.8, 0.1]]  # Sample input
result = api_server.predict(test_data)
print(f"\nTest prediction result: {result}")

health_status = api_server.health_check()
print(f"\nHealth check: {health_status}")

2025-09-29 09:48:41,319 - __main__ - ERROR - Prediction error: This RandomForestClassifier instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.


Model API server initialized

Test prediction result: {'error': "This RandomForestClassifier instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator."}

Health check: {'status': 'healthy', 'timestamp': '2025-09-29T09:48:41.320919', 'model_loaded': True}
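The output above shows a gap: the prediction fails because the estimator was never fitted, yet the health check still reports `healthy`, since it only tests that a model object exists. A readiness check that runs a small smoke prediction would catch this. The following is a sketch under that assumption. `readiness_check` and the two stub classes are hypothetical and stand in for real estimators.

```python
class NotFittedStub:
    """Hypothetical stand-in for an estimator that was never trained."""

    def predict(self, rows):
        raise RuntimeError("estimator is not fitted")


class FittedStub:
    """Hypothetical stand-in for a trained estimator."""

    def predict(self, rows):
        return [0 for _ in rows]


def readiness_check(model, smoke_input):
    """Report 'ready' only if a smoke prediction actually succeeds."""
    if model is None:
        return {"status": "unavailable", "reason": "model not loaded"}
    try:
        model.predict(smoke_input)
    except Exception as exc:
        return {"status": "unavailable", "reason": str(exc)}
    return {"status": "ready"}


smoke_input = [[0.5, 0.3, 0.8, 0.1]]
print(readiness_check(NotFittedStub(), smoke_input))
print(readiness_check(FittedStub(), smoke_input))
```

Keeping liveness ("the process is up") separate from readiness ("the model can serve") lets a load balancer hold traffic back from an instance that started with a broken artifact.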


In [25]:
# Strategy 2: Batch Prediction Service
class BatchPredictionService:
    """Service for processing batch predictions."""
    
    def __init__(self, model_path, config):
        self.model = joblib.load(model_path)
        self.config = config
        self.batch_size = config.get('model_serving', {}).get('batch_size', 64)
    
    def process_batch(self, input_file, output_file):
        """Process a batch of data from file."""
        try:
            # Simulate batch processing
            print(f"Processing batch: {input_file} -> {output_file}")
            print(f"Batch size: {self.batch_size}")
            
            # In real implementation, you would:
            # 1. Read data from input_file
            # 2. Process in batches
            # 3. Write results to output_file
            
            return {
                'status': 'completed',
                'processed_records': 1000,  # Mock value
                'processing_time_seconds': 45.2,  # Mock value
                'output_file': output_file
            }
            
        except Exception as e:
            return {'status': 'failed', 'error': str(e)}

# Initialize batch service
batch_service = BatchPredictionService(model_path, production_config)
result = batch_service.process_batch('input.csv', 'output.csv')
print(f"Batch processing result: {result}")

Processing batch: input.csv -> output.csv
Batch size: 64
Batch processing result: {'status': 'completed', 'processed_records': 1000, 'processing_time_seconds': 45.2, 'output_file': 'output.csv'}
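The batch service above only simulates its three steps (read, process in batches, write). One way to fill them in is sketched below using the standard-library `csv` module, assuming purely numeric rows without a header. `iter_batches`, `process_csv_in_batches`, and `threshold_predict` are hypothetical names, and the threshold rule is a placeholder for a real `model.predict`.

```python
import csv
import io
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield lists of up to `batch_size` items from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def process_csv_in_batches(reader, writer, predict_fn, batch_size=64):
    """Stream rows from `reader`, predict per batch, append the prediction to each row."""
    processed = 0
    for batch in iter_batches(reader, batch_size):
        features = [[float(value) for value in row] for row in batch]
        for row, prediction in zip(batch, predict_fn(features)):
            writer.writerow(row + [prediction])
        processed += len(batch)
    return processed


def threshold_predict(features):
    """Placeholder model: class 1 when the feature sum exceeds 1.0."""
    return [int(sum(row) > 1.0) for row in features]


# In-memory files keep the example self-contained
source = io.StringIO("0.5,0.3\n0.9,0.4\n0.1,0.2\n")
sink = io.StringIO()
count = process_csv_in_batches(
    csv.reader(source), csv.writer(sink), threshold_predict, batch_size=2
)
print(count)
print(sink.getvalue())
```

Streaming through an iterator keeps memory use bounded by the batch size, which matters when the input file is larger than RAM.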


## 4. Monitoring and Logging {#monitoring}

Production ML systems require comprehensive monitoring and logging.

In [26]:
import time
from collections import defaultdict, deque
from threading import Lock

class ProductionMonitor:
    """Monitor for tracking production metrics."""
    
    def __init__(self, config):
        self.config = config
        self.metrics = defaultdict(list)
        self.alerts = []
        self.lock = Lock()
        
        # Initialize metric storage
        self.latencies = deque(maxlen=1000)  # Keep last 1000 latencies
        self.error_count = 0
        self.success_count = 0
        self.prediction_count = 0
    
    def record_prediction(self, latency_ms, success=True):
        """Record a prediction event."""
        with self.lock:
            self.prediction_count += 1
            self.latencies.append(latency_ms)
            
            if success:
                self.success_count += 1
            else:
                self.error_count += 1
            
            # Check for alerts
            self.check_alerts(latency_ms, success)
    
    def check_alerts(self, latency_ms, success):
        """Check if any alert thresholds are exceeded."""
        thresholds = self.config.get('monitoring', {}).get('alert_thresholds', {})
        
        # Latency alert
        p95_threshold = thresholds.get('latency_p95_ms', 200)
        if len(self.latencies) >= 20:  # Need sufficient data
            latencies_sorted = sorted(self.latencies)
            p95_latency = latencies_sorted[int(0.95 * len(latencies_sorted))]
            if p95_latency > p95_threshold:
                self.add_alert(f"High P95 latency: {p95_latency:.2f}ms > {p95_threshold}ms")
        
        # Error rate alert
        if self.prediction_count >= 100:  # Need sufficient data
            error_rate = (self.error_count / self.prediction_count) * 100
            error_threshold = thresholds.get('error_rate_percent', 5.0)
            if error_rate > error_threshold:
                self.add_alert(f"High error rate: {error_rate:.2f}% > {error_threshold}%")
    
    def add_alert(self, message):
        """Add an alert."""
        alert = {
            'timestamp': datetime.now().isoformat(),
            'message': message,
            'severity': 'warning'
        }
        self.alerts.append(alert)
        print(f"ALERT: {message}")
    
    def get_metrics(self):
        """Get current metrics summary."""
        with self.lock:
            if not self.latencies:
                return {'status': 'no_data'}
            
            latencies_sorted = sorted(self.latencies)
            
            return {
                'total_predictions': self.prediction_count,
                'success_rate': (self.success_count / self.prediction_count) * 100 if self.prediction_count > 0 else 0,
                'error_rate': (self.error_count / self.prediction_count) * 100 if self.prediction_count > 0 else 0,
                'latency_stats': {
                    'mean_ms': sum(self.latencies) / len(self.latencies),
                    'p50_ms': latencies_sorted[int(0.5 * len(latencies_sorted))],
                    'p95_ms': latencies_sorted[int(0.95 * len(latencies_sorted))],
                    'p99_ms': latencies_sorted[int(0.99 * len(latencies_sorted))],
                },
                'alert_count': len(self.alerts),
                'last_updated': datetime.now().isoformat()
            }

# Initialize monitor
monitor = ProductionMonitor(production_config)

# Simulate some predictions with monitoring
import random

print("Simulating production traffic...")
for i in range(150):
    # Simulate varying latencies and occasional failures
    latency = random.uniform(50, 300)  # ms
    success = random.random() > 0.02  # 2% error rate
    
    monitor.record_prediction(latency, success)
    
    if i % 50 == 0:
        metrics = monitor.get_metrics()
        print(f"\nMetrics after {i+1} predictions:")
        print(f"  Success rate: {metrics['success_rate']:.1f}%")
        print(f"  Mean latency: {metrics['latency_stats']['mean_ms']:.1f}ms")
        print(f"  P95 latency: {metrics['latency_stats']['p95_ms']:.1f}ms")

# Final metrics
final_metrics = monitor.get_metrics()
print("\n=== Final Production Metrics ===")
print(json.dumps(final_metrics, indent=2))

Simulating production traffic...

Metrics after 1 predictions:
  Success rate: 100.0%
  Mean latency: 147.9ms
  P95 latency: 147.9ms
ALERT: High P95 latency: 298.31ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 276.69ms > 200ms
ALERT: High P95 latency: 282.59ms > 200ms
ALERT: High P95 latency: 282.59ms > 200ms
ALERT: High P95 latency: 282.59ms > 200ms
ALERT: High P95 latency: 28
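
The output above repeats the same P95 alert on nearly every prediction, because `check_alerts` re-evaluates the threshold for each event and fires whenever it is exceeded. A common remedy is a cooldown per alert type. The sketch below assumes that approach. `AlertThrottle` is a hypothetical helper, not part of the monitor above, and the injected clock exists only to make the example deterministic.

```python
import time


class AlertThrottle:
    """Suppress repeats of the same alert key within a cooldown window."""

    def __init__(self, cooldown_seconds=300.0, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock
        self._last_fired = {}

    def should_fire(self, key):
        """Return True if `key` has not fired within the cooldown window."""
        now = self.clock()
        last = self._last_fired.get(key)
        if last is not None and now - last < self.cooldown_seconds:
            return False
        self._last_fired[key] = now
        return True


# Fake clock so the example does not depend on wall time
fake_now = [0.0]
throttle = AlertThrottle(cooldown_seconds=60.0, clock=lambda: fake_now[0])

fired = []
for second in range(0, 150, 10):
    fake_now[0] = float(second)
    if throttle.should_fire("latency_p95"):
        fired.append(second)

print(fired)  # [0, 60, 120]
```

In the monitor, `add_alert` could consult such a throttle before appending, keyed by alert type (for example `latency_p95` or `error_rate`).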

## 5. Performance Optimization {#optimization}

This section benchmarks inference latency and searches for the batch size that gives the highest throughput.

In [28]:
# Performance optimization techniques
class PerformanceOptimizer:
    """Utilities for optimizing model performance."""
    
    @staticmethod
    def benchmark_model(model, test_data, num_iterations=100):
        """Benchmark model inference speed."""
        print(f"Benchmarking model with {num_iterations} iterations...")
        
        latencies = []
        
        for i in range(num_iterations):
            start_time = time.time()
            
            # Simulate prediction
            try:
                if hasattr(model, 'predict'):
                    _ = model.predict(test_data)
                else:
                    time.sleep(0.001)  # Simulate processing time
            except Exception:
                # If model is not fitted or has issues, simulate processing time
                time.sleep(0.001)
            
            latency_ms = (time.time() - start_time) * 1000
            latencies.append(latency_ms)
        
        latencies.sort()
        
        return {
            'mean_latency_ms': sum(latencies) / len(latencies),
            'median_latency_ms': latencies[len(latencies) // 2],
            'p95_latency_ms': latencies[int(0.95 * len(latencies))],
            'p99_latency_ms': latencies[int(0.99 * len(latencies))],
            'min_latency_ms': min(latencies),
            'max_latency_ms': max(latencies)
        }
    
    @staticmethod
    def optimize_batch_size(model, test_data, batch_sizes=(1, 8, 16, 32, 64)):
        """Find optimal batch size for throughput."""
        print("Optimizing batch size...")
        
        results = {}
        
        for batch_size in batch_sizes:
            print(f"Testing batch size: {batch_size}")
            
            # Simulate batch processing
            start_time = time.time()
            
            # Process multiple batches
            for _ in range(10):
                try:
                    if hasattr(model, 'predict'):
                        _ = model.predict(test_data[:batch_size] if len(test_data) >= batch_size else test_data)
                    else:
                        time.sleep(0.001 * batch_size)  # Simulate processing time
                except Exception:
                    # If model is not fitted, simulate processing time
                    time.sleep(0.001 * batch_size)
            
            total_time = time.time() - start_time
            throughput = (10 * batch_size) / total_time  # samples per second
            
            results[batch_size] = {
                'throughput_samples_per_sec': throughput,
                'avg_latency_per_sample_ms': (total_time / (10 * batch_size)) * 1000
            }
        
        # Find optimal batch size
        optimal_batch_size = max(results.keys(), key=lambda x: results[x]['throughput_samples_per_sec'])
        
        return {
            'results': results,
            'optimal_batch_size': optimal_batch_size,
            'optimal_throughput': results[optimal_batch_size]['throughput_samples_per_sec']
        }

# Run performance benchmarks
optimizer = PerformanceOptimizer()

# Benchmark single prediction latency
test_data = [[0.5, 0.3, 0.8, 0.1] for _ in range(10)]
latency_results = optimizer.benchmark_model(model, test_data)

print("=== Latency Benchmark Results ===")
for metric, value in latency_results.items():
    print(f"{metric}: {value:.2f}")

# Optimize batch size
batch_results = optimizer.optimize_batch_size(model, test_data)

print("\n=== Batch Size Optimization ===")
print(f"Optimal batch size: {batch_results['optimal_batch_size']}")
print(f"Optimal throughput: {batch_results['optimal_throughput']:.2f} samples/sec")

print("\nThroughput by batch size:")
for batch_size, metrics in batch_results['results'].items():
    print(f"  Batch {batch_size}: {metrics['throughput_samples_per_sec']:.2f} samples/sec")

Benchmarking model with 100 iterations...
=== Latency Benchmark Results ===
mean_latency_ms: 1.08
median_latency_ms: 1.08
p95_latency_ms: 1.11
p99_latency_ms: 1.13
min_latency_ms: 1.05
max_latency_ms: 1.13
Optimizing batch size...
Testing batch size: 1
Testing batch size: 8
Testing batch size: 16
Testing batch size: 32
Testing batch size: 64

=== Batch Size Optimization ===
Optimal batch size: 64
Optimal throughput: 997.65 samples/sec

Throughput by batch size:
  Batch 1: 924.06 samples/sec
  Batch 8: 989.61 samples/sec
  Batch 16: 994.83 samples/sec
  Batch 32: 997.20 samples/sec
  Batch 64: 997.65 samples/sec
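
The results above sit close to 1 ms per call and about 1,000 samples/sec at every batch size. That is what the `time.sleep(0.001)` fallback would produce, so they may reflect the fallback path rather than real inference (the model in this notebook is never fitted). The sketch below shows one alternative, assuming you would rather let failures propagate than time a substitute. It uses `time.perf_counter`, a warm-up phase, and `statistics.quantiles` for percentiles. `benchmark` and `demo_predict` are hypothetical names.

```python
import statistics
import time


def benchmark(predict_fn, batch, warmup=5, iterations=50):
    """Time `predict_fn(batch)`; exceptions propagate instead of being masked."""
    for _ in range(warmup):
        predict_fn(batch)  # warm caches; a broken model fails here

    latencies_ms = []
    for _ in range(iterations):
        start = time.perf_counter()
        predict_fn(batch)
        latencies_ms.append((time.perf_counter() - start) * 1000)

    cuts = statistics.quantiles(latencies_ms, n=100)  # 99 cut points
    total_seconds = sum(latencies_ms) / 1000
    return {
        "mean_ms": statistics.fmean(latencies_ms),
        "p50_ms": statistics.median(latencies_ms),
        "p95_ms": cuts[94],
        "p99_ms": cuts[98],
        "samples_per_sec": len(batch) * iterations / total_seconds,
    }


def demo_predict(batch):
    """Placeholder for model.predict: sums each row."""
    return [sum(row) for row in batch]


demo_batch = [[0.5, 0.3, 0.8, 0.1] for _ in range(32)]
stats = benchmark(demo_predict, demo_batch)
for name, value in stats.items():
    print(f"{name}: {value:.4f}")
```

Throughput here is computed from the number of rows actually passed to `predict_fn`, so it stays meaningful even when the test set is smaller than the nominal batch size.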


## 6. CI/CD Integration {#cicd}

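This section shows checks a CI/CD pipeline can run before promoting a model: performance and compatibility validation, artifact creation, and a simulated staging deployment.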

In [29]:
# CI/CD Pipeline Components

class ModelValidator:
    """Validate models before deployment."""
    
    def __init__(self, config):
        self.config = config
    
    def validate_model_performance(self, model, test_data, performance_threshold):
        """Validate model meets performance requirements."""
        print("Validating model performance...")
        
        # Simulate performance validation
        try:
            if hasattr(model, 'score'):
                # Placeholder labels for demonstration only; pass real
                # held-out labels here when validating an actual model
                score = model.score(test_data, [1] * len(test_data))
            else:
                # Mock score for demonstration
                score = 0.85
            
            passed = score >= performance_threshold
            
            return {
                'passed': passed,
                'score': score,
                'threshold': performance_threshold,
                'message': f"Model score {score:.3f} {'≥' if passed else '<'} threshold {performance_threshold:.3f}"
            }
            
        except Exception as e:
            return {
                'passed': False,
                'error': str(e),
                'message': f"Performance validation failed: {str(e)}"
            }
    
    def validate_model_compatibility(self, model):
        """Validate model is compatible with production environment."""
        print("Validating model compatibility...")
        
        checks = {
            'has_predict_method': hasattr(model, 'predict') or hasattr(model, '__call__'),
            'serializable': self._test_serialization(model),
            'memory_efficient': self._test_memory_usage(model)
        }
        
        all_passed = all(checks.values())
        
        return {
            'passed': all_passed,
            'checks': checks,
            'message': 'All compatibility checks passed' if all_passed else 'Some compatibility checks failed'
        }
    
    def _test_serialization(self, model):
        """Test if model can be serialized and deserialized."""
        try:
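            # Round-trip through joblib inside a temporary directory
            # (reopening a NamedTemporaryFile by name fails on Windows)
            import tempfile
            with tempfile.TemporaryDirectory() as tmp_dir:
                tmp_path = Path(tmp_dir) / 'model.joblib'
                joblib.dump(model, tmp_path)
                _ = joblib.load(tmp_path)
            return True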
        except Exception:
            return False
    
    def _test_memory_usage(self, model):
        """Test model memory usage."""
        try:
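            # Rough check only: sys.getsizeof reports the shallow size of the
            # object itself, not the arrays or estimators it references.
            # Use a memory profiler for a real measurement.
            size_mb = sys.getsizeof(model) / (1024 * 1024)
            return size_mb < 1000  # Less than 1GB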
        except Exception:
            return False

class DeploymentManager:
    """Manage model deployments."""
    
    def __init__(self, config):
        self.config = config
        self.validator = ModelValidator(config)
    
    def deploy_model(self, model, version, test_data):
        """Deploy model with validation pipeline."""
        print(f"Starting deployment pipeline for model version {version}...")
        
        deployment_result = {
            'version': version,
            'timestamp': datetime.now().isoformat(),
            'status': 'failed',
            'steps': []
        }
        
        # Step 1: Validate performance
        perf_result = self.validator.validate_model_performance(model, test_data, 0.8)
        deployment_result['steps'].append(('performance_validation', perf_result))
        
        if not perf_result['passed']:
            deployment_result['message'] = 'Deployment failed: Performance validation'
            return deployment_result
        
        # Step 2: Validate compatibility
        compat_result = self.validator.validate_model_compatibility(model)
        deployment_result['steps'].append(('compatibility_validation', compat_result))
        
        if not compat_result['passed']:
            deployment_result['message'] = 'Deployment failed: Compatibility validation'
            return deployment_result
        
        # Step 3: Create deployment artifacts
        artifacts_result = self._create_deployment_artifacts(model, version)
        deployment_result['steps'].append(('create_artifacts', artifacts_result))
        
        if not artifacts_result['passed']:
            deployment_result['message'] = 'Deployment failed: Artifact creation'
            return deployment_result
        
        # Step 4: Deploy to staging (simulation)
        staging_result = self._deploy_to_staging(version)
        deployment_result['steps'].append(('deploy_staging', staging_result))
        
        if not staging_result['passed']:
            deployment_result['message'] = 'Deployment failed: Staging deployment'
            return deployment_result
        
        deployment_result['status'] = 'success'
        deployment_result['message'] = f'Model version {version} deployed successfully'
        
        return deployment_result
    
    def _create_deployment_artifacts(self, model, version):
        """Create deployment artifacts."""
        try:
            print(f"Creating deployment artifacts for version {version}...")
            
            # Create version directory
            version_dir = Path(f'/home/rlfowler/Documents/myprojects/DLT/models/deployments/v{version}')
            version_dir.mkdir(parents=True, exist_ok=True)
            
            # Save model
            model_path = version_dir / 'model.joblib'
            joblib.dump(model, model_path)
            
            # Save deployment metadata
            metadata = {
                'version': version,
                'created_at': datetime.now().isoformat(),
                'model_file': 'model.joblib',
                'deployment_config': self.config
            }
            
            metadata_path = version_dir / 'deployment_metadata.json'
            with open(metadata_path, 'w') as f:
                json.dump(metadata, f, indent=2)
            
            return {
                'passed': True,
                'artifacts_path': str(version_dir),
                'message': f'Artifacts created in {version_dir}'
            }
            
        except Exception as e:
            return {
                'passed': False,
                'error': str(e),
                'message': f'Artifact creation failed: {str(e)}'
            }
    
    def _deploy_to_staging(self, version):
        """Deploy to staging environment."""
        try:
            print(f"Deploying version {version} to staging...")
            
            # Simulate staging deployment
            time.sleep(1)  # Simulate deployment time
            
            return {
                'passed': True,
                'staging_url': f'https://staging.api.example.com/v{version}',
                'message': f'Version {version} deployed to staging'
            }
            
        except Exception as e:
            return {
                'passed': False,
                'error': str(e),
                'message': f'Staging deployment failed: {str(e)}'
            }

# Test the deployment pipeline
deployment_manager = DeploymentManager(production_config)
test_data = [[0.5, 0.3, 0.8, 0.1] for _ in range(10)]

deployment_result = deployment_manager.deploy_model(model, "1.2.3", test_data)

print("=== Deployment Pipeline Results ===")
print(f"Status: {deployment_result['status'].upper()}")
print(f"Message: {deployment_result['message']}")
print(f"Version: {deployment_result['version']}")
print("\nDeployment Steps:")

for step_name, step_result in deployment_result['steps']:
    status = "✅ PASSED" if step_result['passed'] else "❌ FAILED"
    print(f"  {step_name}: {status}")
    if 'message' in step_result:
        print(f"    {step_result['message']}")

Starting deployment pipeline for model version 1.2.3...
Validating model performance...
Validating model compatibility...
Creating deployment artifacts for version 1.2.3...
Deploying version 1.2.3 to staging...
=== Deployment Pipeline Results ===
Status: SUCCESS
Message: Model version 1.2.3 deployed successfully
Version: 1.2.3

Deployment Steps:
  performance_validation: ✅ PASSED
    Model score 0.850 ≥ threshold 0.800
  compatibility_validation: ✅ PASSED
    All compatibility checks passed
  create_artifacts: ✅ PASSED
    Artifacts created in /home/rlfowler/Documents/myprojects/DLT/models/deployments/v1.2.3
  deploy_staging: ✅ PASSED
    Version 1.2.3 deployed to staging
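
The `memory_efficient` check in the pipeline relies on `sys.getsizeof`, which measures only the outer object and so passes for almost any model. A rough but more informative proxy is the size of the serialized artifact. The sketch below assumes that proxy is acceptable for a pre-deployment gate. `serialized_size_mb` is a hypothetical helper, and the dictionary stands in for a model wrapper holding a large payload.

```python
import pickle
import sys


def serialized_size_mb(obj):
    """Size of the pickled object in MiB; a proxy for its deep footprint."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) / (1024 * 1024)


# Stand-in for a model wrapper that references a large payload
wrapped = {"payload": list(range(200_000))}

shallow_bytes = sys.getsizeof(wrapped)  # counts only the outer dict
deep_mb = serialized_size_mb(wrapped)   # includes the referenced list

print(f"shallow size: {shallow_bytes} bytes")
print(f"serialized size: {deep_mb:.2f} MiB")
```

Serialized size is not the same as resident memory after loading, but it scales with the model's contents, which the shallow size does not.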

## 7. Best Practices {#best-practices}

Summary of production deployment best practices.

In [30]:
# Production Best Practices Checklist
best_practices = {
    "Model Management": [
        "✅ Version all models with semantic versioning",
        "✅ Store model metadata alongside model files",
        "✅ Use reproducible training processes",
        "✅ Implement model validation before deployment",
        "✅ Keep model registry updated"
    ],
    "Infrastructure": [
        "✅ Use containerization (Docker) for consistency",
        "✅ Implement horizontal scaling capabilities",
        "✅ Set up load balancing for high availability",
        "✅ Use environment-specific configurations",
        "✅ Implement circuit breakers for fault tolerance"
    ],
    "Monitoring & Observability": [
        "✅ Monitor prediction latency and throughput",
        "✅ Track model performance metrics",
        "✅ Implement data drift detection",
        "✅ Set up alerting for anomalies",
        "✅ Log predictions for debugging (when privacy allows)"
    ],
    "Security": [
        "✅ Implement authentication and authorization",
        "✅ Use API rate limiting",
        "✅ Encrypt data in transit and at rest",
        "✅ Validate and sanitize all inputs",
        "✅ Follow data privacy regulations"
    ],
    "Performance": [
        "✅ Optimize batch sizes for throughput",
        "✅ Use model quantization when appropriate",
        "✅ Implement caching for frequent predictions",
        "✅ Profile and optimize memory usage",
        "✅ Use async processing for non-real-time workloads"
    ],
    "CI/CD": [
        "✅ Automate testing and validation pipelines",
        "✅ Implement canary deployments",
        "✅ Use blue-green deployment strategies",
        "✅ Automate rollback procedures",
        "✅ Test in staging before production"
    ]
}

print("=== PRODUCTION ML DEPLOYMENT BEST PRACTICES ===")
print("\nImplement these practices to ensure robust, scalable, and maintainable ML systems:")
print()

for category, practices in best_practices.items():
    print(f"## {category}")
    for practice in practices:
        print(f"  {practice}")
    print()

print("\n=== DEPLOYMENT READINESS ASSESSMENT ===")
print("\nUse this checklist to assess your deployment readiness:")
print()

assessment_items = [
    "Model performance meets business requirements",
    "Model is tested on production-like data",
    "Infrastructure can handle expected load",
    "Monitoring and alerting are configured",
    "Security measures are implemented",
    "Rollback procedure is tested",
    "Documentation is complete",
    "Team is trained on operations"
]

for i, item in enumerate(assessment_items, 1):
    print(f"{i}. □ {item}")

print("\n✅ All items checked = Ready for production deployment!")

=== PRODUCTION ML DEPLOYMENT BEST PRACTICES ===

Implement these practices to ensure robust, scalable, and maintainable ML systems:

## Model Management
  ✅ Version all models with semantic versioning
  ✅ Store model metadata alongside model files
  ✅ Use reproducible training processes
  ✅ Implement model validation before deployment
  ✅ Keep model registry updated

## Infrastructure
  ✅ Use containerization (Docker) for consistency
  ✅ Implement horizontal scaling capabilities
  ✅ Set up load balancing for high availability
  ✅ Use environment-specific configurations
  ✅ Implement circuit breakers for fault tolerance

## Monitoring & Observability
  ✅ Monitor prediction latency and throughput
  ✅ Track model performance metrics
  ✅ Implement data drift detection
  ✅ Set up alerting for anomalies
  ✅ Log predictions for debugging (when privacy allows)

## Security
  ✅ Implement authentication and authorization
  ✅ Use API rate limiting
  ✅ Encrypt data in transit and at rest
  ✅ Validate and sanitize all inputs
  [output truncated]
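
One item from the Infrastructure list above, circuit breakers for fault tolerance, is easier to judge with a concrete example. The following is a minimal sketch in plain Python, not part of DLT: the `CircuitBreaker` class, its parameters, and the `flaky_predict` stand-in are all hypothetical names chosen for illustration. After `max_failures` consecutive errors the breaker rejects calls until `reset_after_s` seconds have passed, then lets a single trial call through.

```python
import time


class CircuitBreaker:
    """Minimal circuit breaker: opens after repeated failures, retries after a cooldown."""

    def __init__(self, max_failures=3, reset_after_s=30.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_after_s = reset_after_s
        self._clock = clock          # injectable so the cooldown can be tested
        self._failures = 0
        self._opened_at = None       # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_after_s:
                raise RuntimeError("circuit open: call rejected")
            # Cooldown elapsed: allow one trial call (half-open state).
            # One more failure re-opens the circuit immediately.
            self._opened_at = None
            self._failures = self.max_failures - 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.max_failures:
                self._opened_at = self._clock()
            raise
        self._failures = 0           # any success resets the failure count
        return result


def flaky_predict(features):
    # Hypothetical stand-in for a model backend that is currently down
    raise ConnectionError("model backend unavailable")


breaker = CircuitBreaker(max_failures=2, reset_after_s=60.0)
outcomes = []
for _ in range(3):
    try:
        breaker.call(flaky_predict, [0.1, 0.2])
    except ConnectionError:
        outcomes.append("backend error")
    except RuntimeError:
        outcomes.append("rejected by breaker")

print(outcomes)  # ['backend error', 'backend error', 'rejected by breaker']
```

The third call never reaches the backend, which is the point: a failing dependency stops receiving traffic while it recovers. A production service would more likely rely on a maintained library or on the serving platform's own breaker than on a hand-rolled class.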

## Summary

This notebook covered the essential aspects of deploying DLT models to production:

### Key Topics Covered:
1. **Model Serialization** - Different methods for saving and loading models
2. **Production Configuration** - Environment-specific settings and security
3. **Serving Strategies** - REST API and batch prediction services
4. **Monitoring** - Comprehensive metrics tracking and alerting
5. **Performance Optimization** - Benchmarking and optimization techniques
6. **CI/CD Integration** - Automated validation and deployment pipelines
7. **Best Practices** - Production deployment checklist

### Next Steps:
- Adapt the code examples to your specific model types and frameworks
- Set up proper infrastructure (containers, orchestration, monitoring)
- Implement security measures appropriate for your use case
- Test thoroughly in staging environments before production
- Establish monitoring and incident response procedures
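
The readiness checklist from the best-practices section can also be enforced automatically, for example as a gate in a staging pipeline. The sketch below assumes the checklist is kept as a mapping from item to a boolean; `readiness_report` is a hypothetical helper and the True/False values are made up for illustration.

```python
def readiness_report(checks):
    """Return (ready, unmet) for a mapping of checklist item -> bool."""
    unmet = [item for item, done in checks.items() if not done]
    return len(unmet) == 0, unmet


# Illustrative values only; in practice each flag would come from a real check
checks = {
    "Model performance meets business requirements": True,
    "Model is tested on production-like data": True,
    "Infrastructure can handle expected load": True,
    "Monitoring and alerting are configured": False,
    "Security measures are implemented": True,
    "Rollback procedure is tested": False,
    "Documentation is complete": True,
    "Team is trained on operations": True,
}

ready, unmet = readiness_report(checks)
for item, done in checks.items():
    print(f"{'☑' if done else '□'} {item}")
print("Ready for production" if ready else f"Blocked by {len(unmet)} unmet item(s)")
```

A pipeline step could exit with a non-zero status whenever `ready` is false, so that an unchecked item blocks promotion rather than relying on someone reading the list.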

### Additional Resources:
- Review the other notebooks for model development guidance
- Consult framework-specific documentation for advanced features
- Consider using MLOps platforms for enterprise deployments

Remember: Production ML is not just about the model - it's about building reliable, scalable, and maintainable systems that deliver value to users.

In [31]:
print("Test cell - Notebook kernel is running!")
import os
import sys
print(f"Working directory: {os.getcwd()}")
print(f"Python version: {sys.version}")
print("Basic imports working!")

Test cell - Notebook kernel is running!
Working directory: /home/rlfowler/Documents/myprojects/DLT/notebooks
Python version: 3.10.18 (main, Jun  4 2025, 08:56:00) [GCC 9.4.0]
Basic imports working!


In [32]:
# Basic imports for production deployment
import os
import sys
import json
import pickle
import joblib
import logging
import yaml
from datetime import datetime
from pathlib import Path

print("✅ Basic production deployment imports successful!")

✅ Basic production deployment imports successful!


In [33]:
# Create a mock model and config for demonstration
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

# Create sample data and train a mock model
X, y = make_classification(n_samples=100, n_features=10, n_informative=8, n_classes=3, random_state=42)
model = RandomForestClassifier(n_estimators=10, random_state=42)
model.fit(X, y)

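# Create mock config
class MockConfig:
    """Minimal mock config that exposes only a dict-style get()."""

    # Fixed settings returned by get(); defined once rather than on every call
    _configs = {
        'model': {'type': 'sklearn'},
        'experiment': {'name': 'mock_experiment'}
    }

    def get(self, key, default=None):
        return self._configs.get(key, default)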

config = MockConfig()

print("✅ Mock model and config created for demonstration!")
print(f"Model type: {type(model).__name__}")
# Note: this is accuracy on the training data, so it overstates real-world performance
print(f"Model score: {model.score(X, y):.3f}")

✅ Mock model and config created for demonstration!
Model type: RandomForestClassifier
Model score: 0.990
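
Before a model like this is shipped, the best-practices list recommends versioning it and storing metadata alongside the model file. The following is a minimal sketch of that idea, assuming a pickle artifact with a JSON sidecar that records the version and a SHA-256 checksum. `save_with_metadata`, `load_verified`, the file naming scheme, and the `1.0.0` version are illustrative choices, not part of DLT. A plain dict stands in for the model so the block runs without scikit-learn; any picklable object, including the `RandomForestClassifier` above, could be passed instead.

```python
import hashlib
import json
import pickle
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def save_with_metadata(obj, directory, name, version):
    """Pickle `obj` and write a JSON sidecar with version, timestamp, and checksum."""
    directory = Path(directory)
    directory.mkdir(parents=True, exist_ok=True)
    artifact_path = directory / f"{name}-{version}.pkl"
    payload = pickle.dumps(obj)
    artifact_path.write_bytes(payload)

    metadata = {
        "name": name,
        "version": version,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "sha256": hashlib.sha256(payload).hexdigest(),
        "object_type": type(obj).__name__,
    }
    metadata_path = artifact_path.with_suffix(".json")
    metadata_path.write_text(json.dumps(metadata, indent=2))
    return artifact_path, metadata_path


def load_verified(artifact_path):
    """Load an artifact only if its checksum matches the sidecar metadata."""
    artifact_path = Path(artifact_path)
    metadata = json.loads(artifact_path.with_suffix(".json").read_text())
    payload = artifact_path.read_bytes()
    if hashlib.sha256(payload).hexdigest() != metadata["sha256"]:
        raise ValueError(f"checksum mismatch for {artifact_path.name}")
    return pickle.loads(payload), metadata


# Hypothetical stand-in for a trained model
stand_in_model = {"weights": [0.1, 0.2, 0.3]}

with tempfile.TemporaryDirectory() as tmp:
    path, _ = save_with_metadata(stand_in_model, tmp, "mock_model", "1.0.0")
    restored, meta = load_verified(path)
    print(meta["version"], meta["object_type"], restored == stand_in_model)
```

The checksum catches accidental corruption or a mismatched file, but it does not make unpickling safe: pickle can execute arbitrary code on load, so artifacts should only be loaded from trusted storage.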


In [34]:
# Create production config for the demo
production_config = {
    'logging': {
        'level': 'INFO',
        'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    },
    'model_serving': {
        'batch_size': 64,
        'max_latency_ms': 100
    }
}

# Reference path for the serialized model (nothing is written to disk here)
model_path = Path('/home/rlfowler/Documents/myprojects/DLT/models/production/model.joblib')

print("✅ Production config and model path set up!")

✅ Production config and model path set up!
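
The config above is only a dictionary until something consumes it. As a rough sketch of how its values might be applied, the block below feeds the `logging` section into `logging.basicConfig` and uses `batch_size` and `max_latency_ms` to batch requests and flag slow predictions. The `timed_predict` and `chunked` helpers, the `model_serving` logger name, and the doubling lambda that stands in for a model are assumptions made for illustration; `production_config` is repeated so the block is self-contained.

```python
import logging
import time

production_config = {
    'logging': {
        'level': 'INFO',
        'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    },
    'model_serving': {
        'batch_size': 64,
        'max_latency_ms': 100
    }
}

# Translate the level name ('INFO') into the logging module's constant
logging.basicConfig(
    level=getattr(logging, production_config['logging']['level']),
    format=production_config['logging']['format'],
)
logger = logging.getLogger("model_serving")


def timed_predict(predict_fn, batch, max_latency_ms):
    """Run predict_fn on batch, log the latency, and report budget violations."""
    start = time.perf_counter()
    result = predict_fn(batch)
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    within_budget = elapsed_ms <= max_latency_ms
    if within_budget:
        logger.info("prediction took %.2f ms", elapsed_ms)
    else:
        logger.warning("prediction took %.2f ms (budget %d ms)", elapsed_ms, max_latency_ms)
    return result, elapsed_ms, within_budget


def chunked(items, size):
    """Yield successive batches of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


serving = production_config['model_serving']
rows = list(range(150))
batch_sizes = []
for batch in chunked(rows, serving['batch_size']):
    # The lambda is a placeholder for something like model.predict
    preds, elapsed_ms, ok = timed_predict(
        lambda b: [x * 2 for x in b], batch, serving['max_latency_ms']
    )
    batch_sizes.append(len(preds))

print(batch_sizes)  # [64, 64, 22]
```

Measured latencies will vary from run to run, so the sketch logs them rather than asserting on them. In a real service the warning branch would typically also increment a metric so that alerting can act on it.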
