# Performance Monitoring - ML Pipeline Platform

Monitor API performance, model metrics, and system health in real time.

**Prerequisites**: All services must be running (start them with `docker-compose up -d`).

In [None]:
# Core imports
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
from datetime import datetime, timedelta
import json
from IPython.display import display, clear_output
import warnings
warnings.filterwarnings('ignore')

# Configuration
# Service endpoints. The defaults match the local docker-compose stack; export
# API_URL / PROMETHEUS_URL / GRAFANA_URL / MLFLOW_URL before starting the kernel
# to monitor another deployment. Trailing slashes are stripped because every
# call site appends a path such as "/health".
import os

API_URL = os.environ.get("API_URL", "http://localhost:8000").rstrip("/")
PROMETHEUS_URL = os.environ.get("PROMETHEUS_URL", "http://localhost:9090").rstrip("/")
GRAFANA_URL = os.environ.get("GRAFANA_URL", "http://localhost:3001").rstrip("/")
MLFLOW_URL = os.environ.get("MLFLOW_URL", "http://localhost:5000").rstrip("/")

print("Environment ready")

## 1. Service Health Check

In [None]:
def check_service_health():
    """Probe each backing service's health endpoint.

    Returns:
        dict mapping service name ("API", "Prometheus", "MLflow") to a
        human-readable status string:
          * "✅ Healthy" when the endpoint answers HTTP 200,
          * "⚠️ Unhealthy (Status: <code>)" for any other HTTP status,
          * "❌ Unavailable" when the request itself fails (connection
            refused, timeout, ...).
        For the API, " (Model: <name>)" is appended when its JSON health
        payload contains a ``model_loaded`` key.
    """
    services = {
        "API": f"{API_URL}/health",
        "Prometheus": f"{PROMETHEUS_URL}/-/healthy",
        "MLflow": f"{MLFLOW_URL}/health",
    }

    results = {}
    for name, url in services.items():
        try:
            response = requests.get(url, timeout=5)
        except requests.exceptions.RequestException:
            results[name] = "❌ Unavailable"
            continue

        if response.status_code != 200:
            results[name] = f"⚠️ Unhealthy (Status: {response.status_code})"
            continue

        results[name] = "✅ Healthy"

        # Content-Type may carry parameters (e.g. "application/json; charset=utf-8"),
        # so compare only the media-type prefix, case-insensitively.
        content_type = response.headers.get('content-type', '').lower()
        if name == "API" and content_type.startswith('application/json'):
            try:
                data = response.json()
            except ValueError:
                # The service answered 200, so a malformed body does not make it unavailable.
                data = {}
            if isinstance(data, dict) and 'model_loaded' in data:
                results[name] += f" (Model: {data.get('model_name', 'Unknown')})"

    return results

# Check services and print one status line per service
print("SERVICE HEALTH STATUS")
print("="*50)
health_results = check_service_health()
for service, status in health_results.items():
    print(f"{service:15s}: {status}")

# Collect every service that is down or reporting unhealthy. Matching on the
# status wording (not the icon prefix) keeps this check independent of how the
# status icons are encoded.
problem_services = [
    svc for svc, state in health_results.items()
    if "Unavailable" in state or "Unhealthy" in state
]
if problem_services:
    print(f"\n⚠️ Some services are unavailable or unhealthy ({', '.join(problem_services)}). Run: docker-compose up -d")

## 2. API Performance Metrics

In [None]:
def get_api_metrics():
    """Scrape the API's Prometheus ``/metrics`` endpoint and total key counters.

    A metric family is usually exported as several labelled series (one per
    handler/method/status, ...). Values are summed over every series whose
    metric *name* contains the tracked pattern, so each total covers all label
    combinations rather than whichever series happened to be listed last.

    Returns:
        dict containing whichever of ``total_requests``, ``total_duration``
        (seconds), ``total_predictions``, ``cache_hits`` and ``cache_misses``
        were found (absent families are simply missing), or ``None`` when the
        endpoint is unreachable or answers with a non-200 status.
    """
    # (pattern contained in the metric name, result key); checked in order and
    # the first matching pattern wins.
    tracked = (
        ('http_requests_total', 'total_requests'),
        ('http_request_duration_seconds_sum', 'total_duration'),
        ('predictions_total', 'total_predictions'),
        ('model_cache_hits_total', 'cache_hits'),
        ('model_cache_misses_total', 'cache_misses'),
    )

    try:
        response = requests.get(f"{API_URL}/metrics", timeout=5)
    except requests.exceptions.RequestException:
        return None
    if response.status_code != 200:
        return None

    metrics = {}
    for raw_line in response.text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue  # blank line or "# HELP" / "# TYPE" comment

        # Sample line format: name[{label="value",...}] value [timestamp]
        if '{' in line:
            name, _, remainder = line.partition('{')
            remainder = remainder.rpartition('}')[2]
        else:
            parts = line.split(None, 1)
            name = parts[0]
            remainder = parts[1] if len(parts) > 1 else ''
        name = name.strip()

        fields = remainder.split()
        if not fields:
            continue
        try:
            value = float(fields[0])  # float() also accepts "NaN" and "+Inf"
        except ValueError:
            continue  # skip an unparsable sample instead of discarding everything

        for pattern, key in tracked:
            if pattern in name:
                metrics[key] = metrics.get(key, 0.0) + value
                break
    return metrics

# Fetch a fresh snapshot from the API's /metrics endpoint and summarise it
metrics = get_api_metrics()
if not metrics:
    print("No metrics available yet. Make some API calls first.")
else:
    print("API METRICS SUMMARY")
    print("=" * 50)
    print(f"Total Requests: {metrics.get('total_requests', 0):.0f}")
    print(f"Total Predictions: {metrics.get('total_predictions', 0):.0f}")

    # A hit rate is only meaningful once the cache has served at least one lookup
    cache_hits, cache_misses = metrics.get('cache_hits', 0), metrics.get('cache_misses', 0)
    if cache_hits + cache_misses > 0:
        cache_rate = cache_hits / (cache_hits + cache_misses) * 100
        print(f"Cache Hit Rate: {cache_rate:.1f}%")

    # Mean latency = cumulative request time divided by the request count
    if metrics.get('total_requests', 0) > 0:
        avg_duration = metrics.get('total_duration', 0) / metrics.get('total_requests', 1)
        print(f"Avg Response Time: {avg_duration * 1000:.1f}ms")

## 3. Load Testing

In [None]:
def load_test(num_requests=50, concurrent=5):
    """Send identical /predict requests through a thread pool and report latency.

    Args:
        num_requests: total number of requests to send.
        concurrent: number of worker threads, i.e. the maximum number of
            requests in flight at once.

    Returns:
        A list with one ``{"success": bool, "duration": float}`` dict per
        request, in completion order. ``success`` is True only for HTTP 200;
        ``duration`` is elapsed seconds and is recorded for failures too.
    """
    # Import the two names rather than ``import concurrent.futures``: that
    # statement would rebind the local name ``concurrent`` (the worker-count
    # parameter) to the package, so the executor would be handed a module as
    # ``max_workers`` and the load test could never start.
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # Sample request
    request_data = {
        "features": {
            "amount": 150.0,
            "merchant_category": "electronics",
            "hour_of_day": 14,
            "is_weekend": 0,
            "risk_score": 0.3,
            "days_since_last": 3,
            "num_transactions_today": 2
        },
        "model_name": "fraud_detector"
    }

    def make_request(_):
        # perf_counter is monotonic, so durations are immune to wall-clock adjustments
        start = time.perf_counter()
        try:
            response = requests.post(f"{API_URL}/predict", json=request_data, timeout=10)
        except requests.exceptions.RequestException:
            return {"success": False, "duration": time.perf_counter() - start}
        return {"success": response.status_code == 200, "duration": time.perf_counter() - start}

    print(f"Starting load test: {num_requests} requests with {concurrent} concurrent workers")
    print("Progress: ", end="", flush=True)

    results = []
    with ThreadPoolExecutor(max_workers=concurrent) as executor:
        futures = [executor.submit(make_request, i) for i in range(num_requests)]
        for done, future in enumerate(as_completed(futures), start=1):
            results.append(future.result())
            if done % 10 == 0:
                print(f"{done}", end=" ", flush=True)

    print("\n\nLOAD TEST RESULTS")
    print("="*50)

    successes = [r for r in results if r["success"]]
    failures = [r for r in results if not r["success"]]
    durations = [r["duration"] for r in successes]

    # Guard the percentage so num_requests=0 does not divide by zero
    success_pct = len(successes) / num_requests * 100 if num_requests else 0.0
    print(f"Success Rate: {len(successes)}/{num_requests} ({success_pct:.1f}%)")

    if durations:
        p50, p95, p99 = np.percentile(durations, [50, 95, 99])
        print(f"\nResponse Times (successful):")
        print(f"  Min: {min(durations)*1000:.1f}ms")
        print(f"  Avg: {np.mean(durations)*1000:.1f}ms")
        print(f"  Max: {max(durations)*1000:.1f}ms")
        print(f"  P50: {p50*1000:.1f}ms")
        print(f"  P95: {p95*1000:.1f}ms")
        print(f"  P99: {p99*1000:.1f}ms")

    if failures:
        print(f"\n⚠️ {len(failures)} requests failed")

    return results

# Run the load test; its per-request `results` feed the response-time analysis cell
try:
    results = load_test(concurrent=5, num_requests=50)
except Exception as err:
    print(f"Load test failed: {err}",
          "Make sure the API is running and a model is loaded.",
          sep="\n")

## 4. Response Time Analysis

In [None]:
# Visualize response times from the load test. Needs `results` from the
# load-test cell; successful requests are plotted in completion order.
if 'results' in globals() and results:
    durations_ms = [r['duration'] * 1000 for r in results if r['success']]

    if not durations_ms:
        # Report explicitly instead of finishing the cell with no output
        print("All load-test requests failed; there are no response times to plot.")
    else:
        mean_ms = np.mean(durations_ms)
        p95_ms = np.percentile(durations_ms, 95)

        fig, axes = plt.subplots(1, 2, figsize=(12, 5))

        # Histogram with mean and P95 markers
        axes[0].hist(durations_ms, bins=20, alpha=0.7, color='blue', edgecolor='black')
        axes[0].axvline(mean_ms, color='red', linestyle='--', label=f'Mean: {mean_ms:.1f}ms')
        axes[0].axvline(p95_ms, color='orange', linestyle='--', label=f'P95: {p95_ms:.1f}ms')
        axes[0].set_xlabel('Response Time (ms)')
        axes[0].set_ylabel('Frequency')
        axes[0].set_title('Response Time Distribution')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)

        # Per-request latency sequence with the mean as reference line
        axes[1].plot(durations_ms, alpha=0.7)
        axes[1].axhline(mean_ms, color='red', linestyle='--', alpha=0.5)
        axes[1].set_xlabel('Request Number')
        axes[1].set_ylabel('Response Time (ms)')
        axes[1].set_title('Response Time Over Time')
        axes[1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()
else:
    print("No load test results to visualize. Run the load test cell first.")

## 5. Real-time Monitoring

In [None]:
def monitor_realtime(duration_seconds=30, interval=2):
    """Monitor the API in real time by probing /predict at a fixed interval.

    On every tick, until ``duration_seconds`` have elapsed, this snapshots the
    API's metrics with ``get_api_metrics``, sends one prediction request with
    randomized features, and redraws the cell output with the latest response
    time, prediction and running session statistics. It then sleeps
    ``interval`` seconds. A keyboard interrupt stops monitoring early.

    Args:
        duration_seconds: total monitoring window in seconds.
        interval: pause between ticks in seconds.

    Returns:
        List of metric snapshots (dicts from ``get_api_metrics`` with an added
        ``timestamp``), one per tick on which the metrics endpoint answered.
    """
    print(f"Monitoring for {duration_seconds} seconds...")
    print("Press Interrupt to stop early\n")

    start_time = time.monotonic()
    metrics_history = []
    request_times_ms = []  # client-measured latency of each successful /predict call
    requests_sent = 0
    failed_requests = 0

    try:
        while time.monotonic() - start_time < duration_seconds:
            # Snapshot the server-side metrics
            metrics = get_api_metrics()
            if metrics:
                metrics['timestamp'] = datetime.now()
                metrics_history.append(metrics)

            # Build a randomized test request. NumPy scalars are cast to
            # builtins because the JSON encoder used by requests rejects types
            # such as numpy.int64 (what np.random.choice([0, 1]) returns).
            test_request = {
                "features": {
                    "amount": float(np.random.uniform(10, 1000)),
                    "merchant_category": str(np.random.choice(['electronics', 'grocery', 'gas'])),
                    "hour_of_day": int(np.random.randint(0, 24)),
                    "is_weekend": int(np.random.choice([0, 1])),
                    "risk_score": float(np.random.uniform(0, 1)),
                    "days_since_last": int(np.random.randint(1, 30)),
                    "num_transactions_today": int(np.random.randint(1, 10))
                },
                "model_name": "fraud_detector"
            }

            requests_sent += 1
            request_start = time.perf_counter()
            try:
                response = requests.post(f"{API_URL}/predict", json=test_request, timeout=5)
                request_time = (time.perf_counter() - request_start) * 1000
                result = response.json() if response.status_code == 200 else None
            except (requests.exceptions.RequestException, ValueError):
                # Network failure or non-JSON body: count it and keep monitoring.
                # Catching narrowly lets KeyboardInterrupt reach the outer handler.
                result = None

            if not isinstance(result, dict):
                failed_requests += 1
            else:
                request_times_ms.append(request_time)

                # Display current status
                clear_output(wait=True)
                print(f"⏱️ Time Elapsed: {time.monotonic() - start_time:.0f}s / {duration_seconds}s")
                print("\n📊 Latest Request:")
                print(f"  Response Time: {request_time:.1f}ms")
                print(f"  Prediction: {result.get('prediction', 'N/A')}")
                probability = result.get('probability')
                if isinstance(probability, (int, float)):
                    print(f"  Confidence: {probability:.3f}")

                if len(request_times_ms) > 1:
                    print("\n📈 Session Stats:")
                    print(f"  Total Requests: {requests_sent}")
                    print(f"  Failed Requests: {failed_requests}")
                    # Average of the last 10 successful client-side latencies
                    print(f"  Avg Response (last 10): {np.mean(request_times_ms[-10:]):.1f}ms")

            time.sleep(interval)

    except KeyboardInterrupt:
        print("\nMonitoring stopped by user")

    print(f"\n✅ Monitoring complete. Made {requests_sent} requests ({failed_requests} failed).")
    return metrics_history

# Start live monitoring: 20 s window, one probe per second. The metric
# snapshots collected along the way are kept in `metrics_history`.
metrics_history = monitor_realtime(interval=1, duration_seconds=20)

## 6. Model Performance Tracking

In [None]:
# Check current model info, then (if MLflow is available) list recent registered versions
try:
    response = requests.get(f"{API_URL}/models/current", timeout=5)
    if response.status_code == 200:
        model_info = response.json()

        print("CURRENT MODEL INFO")
        print("="*50)
        for key, value in model_info.items():
            print(f"{key}: {value}")

        # Get model versions from MLflow. The import sits inside the try so a
        # missing mlflow package is reported instead of crashing the cell.
        try:
            import mlflow
            from mlflow.tracking import MlflowClient

            mlflow.set_tracking_uri(MLFLOW_URL)
            client = MlflowClient()

            model_name = model_info.get('name', 'fraud_detector')
            versions = client.search_model_versions(f"name='{model_name}'")
            # Search results are not guaranteed to be newest-first; sort by the
            # numeric version so the slice really is the latest five.
            latest_versions = sorted(versions, key=lambda v: int(v.version), reverse=True)[:5]

            if latest_versions:
                print("\nMODEL VERSION HISTORY")
                print("="*50)

                version_data = []
                for v in latest_versions:
                    # A version registered without a source run has no run_id;
                    # show zeros for it instead of aborting the whole table.
                    # Named run_metrics so the notebook-level `metrics` variable
                    # (API metrics snapshot) is not overwritten.
                    run_metrics = client.get_run(v.run_id).data.metrics if v.run_id else {}
                    version_data.append({
                        'Version': v.version,
                        'Stage': v.current_stage,
                        'F1': run_metrics.get('f1', 0),
                        'AUC': run_metrics.get('auc_roc', 0),
                        'Accuracy': run_metrics.get('accuracy', 0)
                    })

                df_versions = pd.DataFrame(version_data)
                print(df_versions.to_string(index=False))

        except ImportError:
            print("\nmlflow is not installed; skipping model version history.")
        except Exception as e:
            print(f"\nCould not fetch MLflow model versions: {e}")

    else:
        print("Could not get current model info. Is a model loaded?")

except requests.exceptions.RequestException:
    print("API is not responding. Make sure services are running.")

## 7. System Resource Usage

In [None]:
def get_docker_stats(timeout=30):
    """Print a one-shot resource-usage table for running Docker containers.

    Runs ``docker stats --no-stream`` with a table format of container ID,
    name, CPU %, memory usage and memory %, and prints its output.

    Args:
        timeout: seconds to wait for the docker CLI; without a limit an
            unresponsive Docker daemon would block the cell indefinitely.

    Returns:
        The table text on success; ``None`` if the command could not be run or
        exited with an error (the reason is printed).
    """
    import subprocess

    try:
        # Run docker stats command
        result = subprocess.run(
            ['docker', 'stats', '--no-stream', '--format',
             'table {{.Container}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}'],
            capture_output=True, text=True, timeout=timeout
        )
    except (OSError, subprocess.SubprocessError) as e:
        # OSError: docker binary missing or not executable; SubprocessError: timeout
        print(f"Error getting Docker stats: {e}")
        print("\nAlternative: Run 'docker stats' in a terminal")
        return None

    if result.returncode != 0:
        print("Could not get Docker stats. Make sure Docker is running.")
        # Surface docker's own diagnostic (e.g. cannot connect to the daemon)
        if result.stderr.strip():
            print(result.stderr.strip())
        return None

    print("DOCKER CONTAINER STATS")
    print("="*80)
    print(result.stdout)
    return result.stdout

# Assign the result so Jupyter does not echo the returned table a second time
docker_stats = get_docker_stats()

## 8. Alert Simulation

In [None]:
def check_alerts(latency_warn_ms=500, latency_critical_ms=1000, simulate_error_rate=True):
    """Evaluate simple alert rules against the running API and print the result.

    Rules:
      * /health does not answer -> CRITICAL; answers non-200 -> CRITICAL.
      * /health latency above ``latency_critical_ms`` -> HIGH LATENCY,
        otherwise above ``latency_warn_ms`` -> WARNING.
      * /models/current does not answer 200 -> WARNING (checked only when the
        API responded to /health).
      * When ``simulate_error_rate`` is true and more than 100 requests have
        been recorded, a *simulated* error-rate check: the rate is a random
        draw, not a measurement, so its alert is labelled "(simulated)".

    Returns:
        list of alert strings; empty when no rule fired.
    """
    alerts = []

    # Check API availability and response time via the health endpoint
    api_responding = False
    try:
        start = time.perf_counter()
        response = requests.get(f"{API_URL}/health", timeout=5)
        response_time = (time.perf_counter() - start) * 1000
    except requests.exceptions.RequestException:
        alerts.append("❌ CRITICAL: API is not responding")
    else:
        api_responding = True
        if response.status_code != 200:
            alerts.append(f"❌ CRITICAL: API health check returned status {response.status_code}")
        if response_time > latency_critical_ms:
            alerts.append(f"⚠️ HIGH LATENCY: API response time {response_time:.0f}ms > {latency_critical_ms}ms")
        elif response_time > latency_warn_ms:
            alerts.append(f"⚠️ WARNING: API response time {response_time:.0f}ms > {latency_warn_ms}ms")

    # Check model status; skipped when the API is down, which is already reported
    if api_responding:
        try:
            response = requests.get(f"{API_URL}/models/current", timeout=5)
        except requests.exceptions.RequestException:
            alerts.append("⚠️ WARNING: Could not query model status")
        else:
            if response.status_code != 200:
                alerts.append("⚠️ WARNING: No model loaded in API")

    # Error-rate check. No real error-rate source is parsed yet: the rate is a
    # random draw, so the alert text states that it is simulated.
    if simulate_error_rate:
        metrics = get_api_metrics()
        if metrics and metrics.get('total_requests', 0) > 100:
            error_rate = np.random.uniform(0, 0.1)
            if error_rate > 0.05:
                alerts.append(f"⚠️ HIGH ERROR RATE (simulated): {error_rate*100:.1f}% > 5%")

    print("ALERT STATUS")
    print("="*50)

    if alerts:
        for alert in alerts:
            print(alert)
    else:
        print("✅ No alerts - All systems operating normally")

    return alerts

alerts = check_alerts()

## 9. Performance Summary Dashboard

In [None]:
# Create summary dashboard
print("\n" + "="*60)
print("PERFORMANCE MONITORING DASHBOARD")
print("="*60)
print(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print()

# Service Status
print("📡 SERVICE STATUS")
print("-"*40)
health = check_service_health()
for service, status in health.items():
    print(f"  {service:15s}: {status}")

# Performance Metrics
print("\n⚡ PERFORMANCE METRICS")
print("-"*40)
metrics = get_api_metrics()
if metrics:
    print(f"  Total Requests: {metrics.get('total_requests', 0):.0f}")
    # Default a missing miss counter to 0 (not 1) and skip the ratio until the
    # cache has seen a lookup, so the rate is neither skewed nor a 0/0 error.
    cache_lookups = metrics.get('cache_hits', 0) + metrics.get('cache_misses', 0)
    if cache_lookups > 0:
        print(f"  Cache Hit Rate: {metrics.get('cache_hits', 0)/cache_lookups*100:.1f}%")
    else:
        print("  Cache Hit Rate: N/A")
    if metrics.get('total_requests', 0) > 0:
        print(f"  Avg Response: {metrics.get('total_duration', 0)/metrics.get('total_requests', 1)*1000:.1f}ms")
else:
    print("  No metrics available")

# Alert Summary: uses `alerts` from the alert-check cell, if that cell has run
print("\n🚨 ALERTS")
print("-"*40)
if 'alerts' not in globals():
    print("  Alert check has not been run yet")
elif alerts:
    for alert in alerts[:3]:  # Show the first 3 alerts
        print(f"  {alert}")
else:
    print("  ✅ No active alerts")

# Quick Links
print("\n🔗 QUICK LINKS")
print("-"*40)
print(f"  API Docs: {API_URL}/docs")
print(f"  MLflow UI: {MLFLOW_URL}")
print(f"  Grafana: {GRAFANA_URL}")
print(f"  Prometheus: {PROMETHEUS_URL}")

print("\n" + "="*60)

## Summary

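This monitoring notebook provides:

1. **Health Checks**: Verify that the API, Prometheus, and MLflow services are up
2. **Performance Metrics**: Track request counts, cache hit rate, and average API response time
3. **Load Testing**: Generate traffic to test system capacity and measure latency percentiles
4. **Real-time Monitoring**: Watch system behavior live
5. **Alert Detection**: Identify potential issues (the error-rate check is simulated)
6. **Resource Usage**: Monitor Docker container resources
7. **Model Tracking**: Inspect the loaded model and compare recent MLflow model versions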

**Next Steps**:
- Set up automated alerts in Grafana
- Configure Prometheus recording rules
- Implement SLO/SLI tracking
- Add custom business metrics
- Set up distributed tracing