# 📈 Pipeline Monitoring and Operations

Welcome to the **final tutorial** in our Data Ingestion Pipeline series! In this notebook, you'll learn how to monitor, operate, and maintain production data pipelines: collecting metrics in SQLite, scoring data quality, raising threshold-based alerts, and plotting the results in a dashboard.

## 🎯 Learning Objectives

By the end of this tutorial, you will:
- ✅ Implement comprehensive pipeline monitoring and observability
- ✅ Create real-time dashboards and visualizations
- ✅ Set up automated alerting and notification systems
- ✅ Perform pipeline performance analysis and optimization
- ✅ Establish operational procedures and best practices
- ✅ Build a complete monitoring and alerting framework

---

## 🔍 Why Pipeline Monitoring Matters

In production environments, data pipelines are critical business infrastructure. The comparison below shows what you risk without monitoring and what you gain with it:

### ❌ **Without Monitoring**
- **Silent Failures**: Pipelines fail without anyone knowing
- **Data Quality Issues**: Bad data propagates downstream
- **Performance Degradation**: Slow pipelines impact business operations
- **Resource Waste**: Inefficient resource utilization
- **Compliance Risks**: Missing SLA requirements

### ✅ **With Proper Monitoring**
- **Proactive Issue Detection**: Catch problems before they impact users
- **Data Quality Assurance**: Continuous quality monitoring
- **Performance Optimization**: Identify and fix bottlenecks
- **Resource Efficiency**: Optimize compute and storage usage
- **Compliance Confidence**: Meet SLA and regulatory requirements

### 📊 **Key Monitoring Dimensions**
```
🔄 OPERATIONAL METRICS     📊 DATA QUALITY METRICS     💰 BUSINESS METRICS
├── Pipeline Success Rate  ├── Data Completeness       ├── Records Processed
├── Execution Time         ├── Data Accuracy           ├── Revenue Impact
├── Resource Usage         ├── Schema Compliance       ├── Customer Impact
├── Error Rates            ├── Freshness               ├── SLA Compliance
└── Throughput             └── Consistency             └── Cost Efficiency
```
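
To make the operational column concrete, here is a minimal sketch of how success rate, error rate, and throughput could be derived from per-run summaries. The `RunSummary` record and the `operational_metrics` helper are hypothetical names introduced for illustration only; they are not part of the pipeline code used later in this notebook.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class RunSummary:
    """Hypothetical per-run record (an assumption, not the tutorial's schema)."""
    succeeded: bool
    records_in: int
    records_failed: int
    duration_s: float


def operational_metrics(runs: List[RunSummary]) -> Dict[str, float]:
    """Aggregate run summaries into three operational metrics."""
    if not runs:
        return {"success_rate": 0.0, "error_rate": 0.0, "throughput": 0.0}

    records = sum(r.records_in for r in runs)
    failed = sum(r.records_failed for r in runs)
    duration = sum(r.duration_s for r in runs)

    return {
        # Share of runs that finished without failing
        "success_rate": sum(r.succeeded for r in runs) / len(runs),
        # Share of input records that could not be processed
        "error_rate": failed / records if records else 0.0,
        # Records per second across all runs
        "throughput": records / duration if duration > 0 else 0.0,
    }


runs = [RunSummary(True, 100, 2, 4.0), RunSummary(False, 100, 8, 6.0)]
print(operational_metrics(runs))
```

The guards against empty input and zero duration matter in practice: a run that fails immediately would otherwise raise `ZeroDivisionError` inside the code that is supposed to report the failure.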

In [None]:
# Import all necessary libraries
import sys
import os
import pandas as pd
import numpy as np
import json
import logging
import time
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Any, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')

# Add src to path for imports
sys.path.append(os.path.join('..', 'src'))

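# Import our pipeline components
try:
    from pipeline.pipeline_manager import PipelineManager
    from storage.database_manager import DatabaseManager
    from utils.config import config
    from utils.helpers import ensure_directory_exists, format_duration
    print("✅ Pipeline components imported successfully!")
except ImportError as e:
    print(f"⚠️ Import error: {e}")
    print("📝 Note: Some components may not be available in this demo environment")

    # Fallback so the monitoring classes below still run without the src package.
    # Assumption: the real helper creates the directory when it is missing.
    def ensure_directory_exists(path):
        if path:
            os.makedirs(path, exist_ok=True)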

# Set up plotting
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 8)

print("📈 Pipeline monitoring environment ready!")
print(f"🕐 Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

## 📊 Step 1: Monitoring Infrastructure Setup

Let's build a comprehensive monitoring system that tracks all aspects of our pipeline performance.
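
One way to think about the alerting part of such a system is as a table of threshold rules. The sketch below is an illustration under that assumption, not the implementation used in this notebook: the threshold values (5% error rate, 300 s execution time, 80% quality score, 10 records/second) and the alert names mirror the ones used in this tutorial, while `ALERT_RULES` and `evaluate_rules` are hypothetical names.

```python
import operator

# (metric key, comparison that signals a breach, threshold, alert type, severity)
ALERT_RULES = [
    ("error_rate", operator.gt, 0.05, "HIGH_ERROR_RATE", "HIGH"),
    ("execution_time", operator.gt, 300, "SLOW_EXECUTION", "MEDIUM"),
    ("quality_score", operator.lt, 80, "LOW_DATA_QUALITY", "HIGH"),
    ("throughput", operator.lt, 10, "LOW_THROUGHPUT", "MEDIUM"),
]


def evaluate_rules(metrics, rules=ALERT_RULES):
    """Return (alert_type, severity, value, threshold) for every breached rule."""
    triggered = []
    for key, breached, threshold, alert_type, severity in rules:
        value = metrics.get(key)
        # Metrics that were not reported are skipped rather than treated as breaches
        if value is not None and breached(value, threshold):
            triggered.append((alert_type, severity, value, threshold))
    return triggered


sample = {"error_rate": 0.10, "execution_time": 12.0,
          "quality_score": 75.0, "throughput": 50.0}
for alert in evaluate_rules(sample):
    print(alert)
```

Keeping the rules in data rather than in separate `if` blocks makes it easier to add a rule or load thresholds from configuration, at the cost of slightly less specific alert messages.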

In [None]:
# Comprehensive Pipeline Monitor Class
class PipelineMonitor:
    """Comprehensive pipeline monitoring and observability system"""
    
    def __init__(self, db_path="../data/monitoring.db"):
        self.db_path = db_path
        self.setup_monitoring_database()
        self.setup_logging()
        
        # Monitoring configuration
        self.metrics_config = {
            'collection_interval': 30,  # seconds
            'retention_days': 30,
            'alert_thresholds': {
                'error_rate': 0.05,  # 5%
                'execution_time': 300,  # 5 minutes
                'quality_score': 80,  # 80%
                'throughput_min': 10  # records/second
            }
        }
        
        print(f"📈 Pipeline Monitor initialized: {self.db_path}")
    
    def setup_monitoring_database(self):
        """Setup monitoring database tables"""
        ensure_directory_exists(os.path.dirname(self.db_path))
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Pipeline execution metrics
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS pipeline_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    pipeline_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    metric_name TEXT NOT NULL,
                    metric_value REAL NOT NULL,
                    metric_type TEXT NOT NULL,
                    stage TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # Data quality metrics
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS quality_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    pipeline_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    total_records INTEGER NOT NULL,
                    valid_records INTEGER NOT NULL,
                    quality_score REAL NOT NULL,
                    data_source TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # Alert history
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    pipeline_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    alert_type TEXT NOT NULL,
                    severity TEXT NOT NULL,
                    message TEXT NOT NULL,
                    resolved BOOLEAN DEFAULT FALSE,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # System resource metrics
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS resource_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    cpu_usage REAL,
                    memory_usage REAL,
                    disk_usage REAL,
                    network_io REAL,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            conn.commit()
    
    def setup_logging(self):
        """Setup structured logging for monitoring"""
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        
        # Create logger
        self.logger = logging.getLogger('PipelineMonitor')
        self.logger.setLevel(logging.INFO)
        
        # getLogger returns the same logger on every call, so skip handler setup
        # when it is already configured; otherwise re-running this cell
        # duplicates every log line
        if self.logger.handlers:
            return
        
        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(logging.Formatter(log_format))
        self.logger.addHandler(console_handler)
        
        # File handler
        ensure_directory_exists('../logs')
        file_handler = logging.FileHandler('../logs/monitoring.log')
        file_handler.setFormatter(logging.Formatter(log_format))
        self.logger.addHandler(file_handler)
    
    def record_pipeline_metric(self, pipeline_id: str, metric_name: str, 
                             metric_value: float, metric_type: str, 
                             stage: str = None):
        """Record a pipeline performance metric"""
        timestamp = datetime.now().isoformat()
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO pipeline_metrics 
                (pipeline_id, timestamp, metric_name, metric_value, metric_type, stage)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (pipeline_id, timestamp, metric_name, metric_value, metric_type, stage))
            conn.commit()
        
        self.logger.info(f"📊 Metric recorded: {metric_name}={metric_value} ({metric_type})")
    
    def record_quality_metric(self, pipeline_id: str, total_records: int, 
                            valid_records: int, quality_score: float, 
                            data_source: str = None):
        """Record data quality metrics"""
        timestamp = datetime.now().isoformat()
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO quality_metrics 
                (pipeline_id, timestamp, total_records, valid_records, quality_score, data_source)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (pipeline_id, timestamp, total_records, valid_records, quality_score, data_source))
            conn.commit()
        
        self.logger.info(f"📊 Quality metric recorded: {quality_score:.1f}% ({valid_records}/{total_records})")
    
    def create_alert(self, pipeline_id: str, alert_type: str, 
                    severity: str, message: str):
        """Create an alert for pipeline issues"""
        timestamp = datetime.now().isoformat()
        
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO alerts 
                (pipeline_id, timestamp, alert_type, severity, message)
                VALUES (?, ?, ?, ?, ?)
            ''', (pipeline_id, timestamp, alert_type, severity, message))
            conn.commit()
        
        severity_icon = {'LOW': '🟡', 'MEDIUM': '🟠', 'HIGH': '🔴', 'CRITICAL': '🚨'}
        icon = severity_icon.get(severity, '⚠️')
        
        self.logger.warning(f"{icon} ALERT [{severity}]: {alert_type} - {message}")
        
        # In production, this would trigger notifications (email, Slack, etc.)
        self.send_notification(alert_type, severity, message)
    
    def send_notification(self, alert_type: str, severity: str, message: str):
        """Send notification for alerts (simulated)"""
        # In production, implement actual notification logic
        notification_methods = {
            'CRITICAL': ['email', 'sms', 'slack'],
            'HIGH': ['email', 'slack'],
            'MEDIUM': ['slack'],
            'LOW': ['log_only']
        }
        
        methods = notification_methods.get(severity, ['log_only'])
        self.logger.info(f"📢 Notification sent via: {', '.join(methods)}")
    
    def check_alert_conditions(self, pipeline_id: str, metrics: Dict[str, Any]):
        """Check if any alert conditions are met"""
        thresholds = self.metrics_config['alert_thresholds']
        
        # Check error rate
        if 'error_rate' in metrics and metrics['error_rate'] > thresholds['error_rate']:
            self.create_alert(
                pipeline_id, 'HIGH_ERROR_RATE', 'HIGH',
                f"Error rate {metrics['error_rate']:.1%} exceeds threshold {thresholds['error_rate']:.1%}"
            )
        
        # Check execution time
        if 'execution_time' in metrics and metrics['execution_time'] > thresholds['execution_time']:
            self.create_alert(
                pipeline_id, 'SLOW_EXECUTION', 'MEDIUM',
                f"Execution time {metrics['execution_time']:.1f}s exceeds threshold {thresholds['execution_time']}s"
            )
        
        # Check data quality
        if 'quality_score' in metrics and metrics['quality_score'] < thresholds['quality_score']:
            self.create_alert(
                pipeline_id, 'LOW_DATA_QUALITY', 'HIGH',
                f"Data quality {metrics['quality_score']:.1f}% below threshold {thresholds['quality_score']}%"
            )
        
        # Check throughput
        if 'throughput' in metrics and metrics['throughput'] < thresholds['throughput_min']:
            self.create_alert(
                pipeline_id, 'LOW_THROUGHPUT', 'MEDIUM',
                f"Throughput {metrics['throughput']:.1f} records/s below threshold {thresholds['throughput_min']}"
            )

# Initialize monitoring system
monitor = PipelineMonitor()

print("✅ Pipeline monitoring system initialized!")
print(f"📊 Monitoring database: {monitor.db_path}")
print(f"⚙️ Alert thresholds configured: {monitor.metrics_config['alert_thresholds']}")
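
The monitor configuration sets `retention_days` to 30, but nothing in the class enforces it. A minimal sketch of a pruning step follows, assuming the four tables created above and their ISO-8601 `timestamp` column; `prune_old_rows` is a hypothetical helper, and whether unresolved alerts should be exempt from pruning is a policy decision this sketch does not make.

```python
import sqlite3
from datetime import datetime, timedelta

# Tables from the monitoring schema that carry an ISO-8601 `timestamp` column
MONITORING_TABLES = ("pipeline_metrics", "quality_metrics", "alerts", "resource_metrics")


def prune_old_rows(conn, retention_days, now=None, tables=MONITORING_TABLES):
    """Delete rows older than the retention window; return deleted counts per table."""
    now = now or datetime.now()
    # ISO-8601 strings in the same format sort chronologically, so a string
    # comparison in SQL is enough here
    cutoff = (now - timedelta(days=retention_days)).isoformat()

    deleted = {}
    for table in tables:
        # Table names come from the fixed tuple above, never from user input
        cursor = conn.execute(f"DELETE FROM {table} WHERE timestamp < ?", (cutoff,))
        deleted[table] = cursor.rowcount
    conn.commit()
    return deleted
```

With the objects from this notebook, a call might look like `prune_old_rows(sqlite3.connect(monitor.db_path), monitor.metrics_config['retention_days'])`, run once per day or at the start of each pipeline execution.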

## 🔄 Step 2: Monitored Pipeline Execution

Let's create a pipeline wrapper that automatically collects monitoring data during execution.
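
Each stage in the wrapper repeats the same pattern: note the start time, do the work, store the elapsed time, and record it as a metric. As a sketch of how that pattern could be factored out, the context manager below times a block and reports the result through a callback. `timed_stage` and its `record` parameter are hypothetical names, not part of the notebook's classes.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_stage(stage_times, stage_name, record=None):
    """Time the enclosed block and store the duration under stage_name.

    record, if given, is called as record(metric_name, value, metric_type, stage).
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # The finally clause runs even when the stage raises, so failed
        # stages still report how long they ran
        elapsed = time.perf_counter() - start
        stage_times[stage_name] = elapsed
        if record is not None:
            record(f"{stage_name}_time", elapsed, "duration", stage_name)


stage_times = {}
recorded = []
with timed_stage(stage_times, "ingestion", record=lambda *args: recorded.append(args)):
    total = sum(range(100_000))  # stand-in for real stage work

print(stage_times, recorded)
```

To connect it to the monitor from Step 1, `record` could be `functools.partial(monitor.record_pipeline_metric, pipeline_id)`, since that method takes the pipeline ID followed by the same four arguments.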

In [None]:
# Monitored Pipeline Wrapper
class MonitoredPipeline:
    """Pipeline wrapper with comprehensive monitoring"""
    
    def __init__(self, monitor: PipelineMonitor):
        self.monitor = monitor
        # Include microseconds so pipelines created within the same second get distinct IDs
        self.pipeline_id = f"MONITORED-{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
        self.execution_start = None
        self.stage_times = {}
        
    def execute_monitored_pipeline(self, sample_data_size: int = 100):
        """Execute pipeline with comprehensive monitoring"""
        self.execution_start = datetime.now()
        self.monitor.logger.info(f"🚀 Starting monitored pipeline: {self.pipeline_id}")
        
        try:
            # Generate sample data for demonstration
            sample_data = self.generate_sample_data(sample_data_size)
            
            # Stage 1: Data Ingestion (Monitored)
            ingested_data = self.monitored_ingestion_stage(sample_data)
            
            # Stage 2: Data Validation (Monitored)
            validated_data = self.monitored_validation_stage(ingested_data)
            
            # Stage 3: Data Transformation (Monitored)
            transformed_data = self.monitored_transformation_stage(validated_data)
            
            # Stage 4: Data Storage (Monitored)
            storage_result = self.monitored_storage_stage(transformed_data)
            
            # Calculate overall metrics
            total_time = (datetime.now() - self.execution_start).total_seconds()
            throughput = len(transformed_data) / total_time if total_time > 0 else 0
            
            # Record overall pipeline metrics
            self.monitor.record_pipeline_metric(
                self.pipeline_id, 'total_execution_time', total_time, 'duration'
            )
            self.monitor.record_pipeline_metric(
                self.pipeline_id, 'throughput', throughput, 'rate'
            )
            self.monitor.record_pipeline_metric(
                self.pipeline_id, 'records_processed', len(transformed_data), 'count'
            )
            
            # Check alert conditions
            pipeline_metrics = {
                'execution_time': total_time,
                'throughput': throughput,
                'error_rate': 0.0,  # No errors in this demo
                'quality_score': transformed_data['quality_score'].mean() if 'quality_score' in transformed_data.columns else 100
            }
            
            self.monitor.check_alert_conditions(self.pipeline_id, pipeline_metrics)
            
            self.monitor.logger.info(f"✅ Pipeline completed successfully in {total_time:.2f}s")
            
            return {
                'success': True,
                'pipeline_id': self.pipeline_id,
                'execution_time': total_time,
                'records_processed': len(transformed_data),
                'throughput': throughput,
                'stage_times': self.stage_times,
                'final_data': transformed_data
            }
            
        except Exception as e:
            total_time = (datetime.now() - self.execution_start).total_seconds()
            
            # Record failure metrics
            self.monitor.record_pipeline_metric(
                self.pipeline_id, 'execution_failed', 1, 'boolean'
            )
            
            # Create critical alert
            self.monitor.create_alert(
                self.pipeline_id, 'PIPELINE_FAILURE', 'CRITICAL',
                f"Pipeline execution failed: {str(e)}"
            )
            
            self.monitor.logger.error(f"❌ Pipeline failed after {total_time:.2f}s: {str(e)}")
            
            return {
                'success': False,
                'pipeline_id': self.pipeline_id,
                'execution_time': total_time,
                'error': str(e)
            }
    
    def generate_sample_data(self, size: int) -> pd.DataFrame:
        """Generate sample data for monitoring demonstration"""
        np.random.seed(42)  # For reproducible results
        
        # Generate realistic e-commerce data
        products = ['iPhone 15', 'MacBook Pro', 'AirPods Pro', 'iPad Air', 'Apple Watch', 
                   'Nintendo Switch', 'PlayStation 5', 'Xbox Series X', 'Kindle Paperwhite']
        
        # max(1, ...) keeps the customer pool non-empty when size < 2
        customers = [f"Customer_{i:03d}" for i in range(1, max(1, min(size // 2, 50)) + 1)]
        sources = ['website', 'mobile_app', 'store', 'phone']
        locations = ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix', 'Philadelphia']
        
        data = {
            'order_id': [f"ORD-2024-{i:06d}" for i in range(1, size + 1)],
            'customer_name': np.random.choice(customers, size),
            'product': np.random.choice(products, size),
            'quantity': np.random.randint(1, 5, size),
            'price': np.round(np.random.uniform(50, 2000, size), 2),
            'order_date': pd.date_range(start='2024-01-01', periods=size, freq='h'),  # 'H' is deprecated in pandas 2.2+
            'source': np.random.choice(sources, size),
            'store_location': np.random.choice(locations, size),
            'customer_email': [f"customer{i}@example.com" for i in np.random.randint(1, 100, size)]
        }
        
        # Add some data quality issues for realistic monitoring
        df = pd.DataFrame(data)
        
        # Introduce some missing values (5%)
        missing_indices = np.random.choice(df.index, size=int(size * 0.05), replace=False)
        df.loc[missing_indices, 'customer_email'] = None
        
        # Introduce some invalid prices (2%)
        invalid_indices = np.random.choice(df.index, size=int(size * 0.02), replace=False)
        df.loc[invalid_indices, 'price'] = -1
        
        return df
    
    def monitored_ingestion_stage(self, data: pd.DataFrame) -> pd.DataFrame:
        """Ingestion stage with monitoring"""
        stage_start = time.time()
        self.monitor.logger.info("📥 Starting ingestion stage")
        
        # Simulate ingestion processing
        time.sleep(0.1)  # Simulate processing time
        
        # Add ingestion metadata
        data['ingested_at'] = datetime.now().isoformat()
        data['pipeline_id'] = self.pipeline_id
        
        stage_time = time.time() - stage_start
        self.stage_times['ingestion'] = stage_time
        
        # Record stage metrics
        self.monitor.record_pipeline_metric(
            self.pipeline_id, 'ingestion_time', stage_time, 'duration', 'ingestion'
        )
        self.monitor.record_pipeline_metric(
            self.pipeline_id, 'ingestion_records', len(data), 'count', 'ingestion'
        )
        
        self.monitor.logger.info(f"✅ Ingestion completed: {len(data)} records in {stage_time:.2f}s")
        return data
    
    def monitored_validation_stage(self, data: pd.DataFrame) -> pd.DataFrame:
        """Validation stage with monitoring"""
        stage_start = time.time()
        self.monitor.logger.info("🔍 Starting validation stage")
        
        # Perform validation
        total_records = len(data)
        
        # Check for invalid prices
        valid_prices = data['price'] > 0
        
        # Check for missing emails
        has_email = data['customer_email'].notna()
        
        # Score per record: a row failing both checks counts once, so the
        # stored counts agree with the is_valid column
        is_valid = valid_prices & has_email
        # Cast to a plain int: numpy integers are not reliably stored as INTEGER by sqlite3
        valid_records = int(is_valid.sum())
        quality_score = (valid_records / total_records) * 100 if total_records else 0.0
        
        stage_time = time.time() - stage_start
        self.stage_times['validation'] = stage_time
        
        # Record validation metrics
        self.monitor.record_pipeline_metric(
            self.pipeline_id, 'validation_time', stage_time, 'duration', 'validation'
        )
        self.monitor.record_quality_metric(
            self.pipeline_id, total_records, valid_records, quality_score
        )
        
        # Add validation results to data
        data['is_valid'] = is_valid
        data['validation_score'] = is_valid.astype(int) * 100
        
        self.monitor.logger.info(f"✅ Validation completed: {quality_score:.1f}% quality in {stage_time:.2f}s")
        
        if quality_score < 90:
            self.monitor.logger.warning(f"⚠️ Data quality below 90%: {quality_score:.1f}%")
        
        return data
    
    def monitored_transformation_stage(self, data: pd.DataFrame) -> pd.DataFrame:
        """Transformation stage with monitoring"""
        stage_start = time.time()
        self.monitor.logger.info("🧹 Starting transformation stage")
        
        # Clean invalid data
        original_count = len(data)
        data = data[data['price'] > 0].copy()  # Remove invalid prices
        cleaned_count = len(data)
        
        # Add transformations
        data['total_amount'] = data['quantity'] * data['price']
        data['order_size'] = pd.cut(
            data['total_amount'],
            bins=[0, 100, 500, 1000, 2000, float('inf')],
            labels=['XSmall', 'Small', 'Medium', 'Large', 'XLarge']
        )
        
        # Customer segmentation
        def categorize_customer(amount):
            if amount >= 1500: return 'VIP'
            elif amount >= 800: return 'Premium'
            elif amount >= 300: return 'Standard'
            else: return 'Budget'
        
        data['customer_segment'] = data['total_amount'].apply(categorize_customer)
        
        # Add quality score
        data['quality_score'] = data['validation_score']  # Use validation score
        
        # Add transformation metadata
        data['transformed_at'] = datetime.now().isoformat()
        
        stage_time = time.time() - stage_start
        self.stage_times['transformation'] = stage_time
        
        # Record transformation metrics
        self.monitor.record_pipeline_metric(
            self.pipeline_id, 'transformation_time', stage_time, 'duration', 'transformation'
        )
        self.monitor.record_pipeline_metric(
            self.pipeline_id, 'records_cleaned', original_count - cleaned_count, 'count', 'transformation'
        )
        self.monitor.record_pipeline_metric(
            self.pipeline_id, 'transformation_records', cleaned_count, 'count', 'transformation'
        )
        
        self.monitor.logger.info(f"✅ Transformation completed: {cleaned_count} records in {stage_time:.2f}s")
        
        if original_count != cleaned_count:
            removed = original_count - cleaned_count
            self.monitor.logger.warning(f"⚠️ Removed {removed} invalid records during transformation")
        
        return data
    
    def monitored_storage_stage(self, data: pd.DataFrame) -> Dict[str, Any]:
        """Storage stage with monitoring"""
        stage_start = time.time()
        self.monitor.logger.info("💾 Starting storage stage")
        
        # Simulate storage operations
        time.sleep(0.05)  # Simulate storage time
        
        # In production, this would save to actual storage
        storage_operations = [
            f"Saved {len(data)} records to database",
            "Exported data to CSV file",
            "Created backup copy"
        ]
        
        stage_time = time.time() - stage_start
        self.stage_times['storage'] = stage_time
        
        # Record storage metrics
        self.monitor.record_pipeline_metric(
            self.pipeline_id, 'storage_time', stage_time, 'duration', 'storage'
        )
        self.monitor.record_pipeline_metric(
            self.pipeline_id, 'records_stored', len(data), 'count', 'storage'
        )
        
        self.monitor.logger.info(f"✅ Storage completed: {len(data)} records in {stage_time:.2f}s")
        
        return {
            'success': True,
            'records_stored': len(data),
            'operations': storage_operations,
            'storage_time': stage_time
        }

# Initialize monitored pipeline
monitored_pipeline = MonitoredPipeline(monitor)
print(f"🔄 Monitored pipeline initialized: {monitored_pipeline.pipeline_id}")

## 🚀 Step 3: Execute Monitored Pipeline

Let's run our pipeline with comprehensive monitoring and see the metrics in action.

In [None]:
# Execute monitored pipeline with different scenarios
print("🚀 EXECUTING MONITORED PIPELINE SCENARIOS")
print("=" * 60)

# Scenario 1: Normal execution
print("\n📊 Scenario 1: Normal Pipeline Execution")
print("-" * 50)

result1 = monitored_pipeline.execute_monitored_pipeline(sample_data_size=150)

print(f"\n📈 Execution Results:")
print(f"  Success: {'✅ Yes' if result1['success'] else '❌ No'}")
print(f"  Pipeline ID: {result1['pipeline_id']}")
print(f"  Execution Time: {result1['execution_time']:.2f}s")
print(f"  Records Processed: {result1['records_processed']:,}")
print(f"  Throughput: {result1['throughput']:.1f} records/second")

print(f"\n⏱️ Stage Breakdown:")
for stage, time_taken in result1['stage_times'].items():
    percentage = (time_taken / result1['execution_time']) * 100
    print(f"  {stage.title()}: {time_taken:.3f}s ({percentage:.1f}%)")

# Scenario 2: Large dataset (stress test)
print("\n\n📊 Scenario 2: Large Dataset Processing")
print("-" * 50)

large_pipeline = MonitoredPipeline(monitor)
result2 = large_pipeline.execute_monitored_pipeline(sample_data_size=1000)

print(f"\n📈 Large Dataset Results:")
print(f"  Success: {'✅ Yes' if result2['success'] else '❌ No'}")
print(f"  Pipeline ID: {result2['pipeline_id']}")
print(f"  Execution Time: {result2['execution_time']:.2f}s")
print(f"  Records Processed: {result2['records_processed']:,}")
print(f"  Throughput: {result2['throughput']:.1f} records/second")

# Scenario 3: Small dataset (fast execution)
print("\n\n📊 Scenario 3: Small Dataset Processing")
print("-" * 50)

small_pipeline = MonitoredPipeline(monitor)
result3 = small_pipeline.execute_monitored_pipeline(sample_data_size=50)

print(f"\n📈 Small Dataset Results:")
print(f"  Success: {'✅ Yes' if result3['success'] else '❌ No'}")
print(f"  Pipeline ID: {result3['pipeline_id']}")
print(f"  Execution Time: {result3['execution_time']:.2f}s")
print(f"  Records Processed: {result3['records_processed']:,}")
print(f"  Throughput: {result3['throughput']:.1f} records/second")

# Store results for analysis
execution_results = [result1, result2, result3]

print(f"\n✅ Completed {len(execution_results)} monitored pipeline executions")
print(f"📊 All metrics have been recorded in the monitoring database")

## 📊 Step 4: Real-Time Monitoring Dashboard

Let's create comprehensive dashboards to visualize our pipeline performance and health metrics.
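
Before plotting, it often helps to reduce the long-format metric rows to per-metric summaries. The sketch below does this in SQL, assuming the `pipeline_metrics` table layout from Step 1, where run-level metrics have no `stage` value. `summarize_metrics` is a hypothetical helper and is not used by the dashboard class that follows.

```python
import sqlite3


def summarize_metrics(conn, since_iso):
    """Return count, average, min, and max per run-level metric since a cutoff."""
    rows = conn.execute(
        """
        SELECT metric_name,
               COUNT(*),
               AVG(metric_value),
               MIN(metric_value),
               MAX(metric_value)
        FROM pipeline_metrics
        WHERE timestamp >= ? AND stage IS NULL
        GROUP BY metric_name
        ORDER BY metric_name
        """,
        (since_iso,),
    ).fetchall()
    return {
        name: {"runs": count, "avg": avg, "min": low, "max": high}
        for name, count, avg, low, high in rows
    }
```

Filtering on `stage IS NULL` keeps stage-level timings and counts out of the run-level summary; a second query grouped by `stage` and restricted to `metric_type = 'duration'` would cover the per-stage view.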

In [None]:
# Monitoring Dashboard Generator
class MonitoringDashboard:
    """Generate comprehensive monitoring dashboards"""
    
    def __init__(self, monitor: PipelineMonitor):
        self.monitor = monitor
    
    def get_pipeline_metrics(self, hours_back: int = 24) -> pd.DataFrame:
        """Get pipeline metrics from the last N hours"""
        cutoff_time = (datetime.now() - timedelta(hours=hours_back)).isoformat()
        
        with sqlite3.connect(self.monitor.db_path) as conn:
            query = '''
                SELECT * FROM pipeline_metrics 
                WHERE timestamp >= ? 
                ORDER BY timestamp DESC
            '''
            return pd.read_sql_query(query, conn, params=[cutoff_time])
    
    def get_quality_metrics(self, hours_back: int = 24) -> pd.DataFrame:
        """Get quality metrics from the last N hours"""
        cutoff_time = (datetime.now() - timedelta(hours=hours_back)).isoformat()
        
        with sqlite3.connect(self.monitor.db_path) as conn:
            query = '''
                SELECT * FROM quality_metrics 
                WHERE timestamp >= ? 
                ORDER BY timestamp DESC
            '''
            return pd.read_sql_query(query, conn, params=[cutoff_time])
    
    def get_alerts(self, hours_back: int = 24) -> pd.DataFrame:
        """Get alerts from the last N hours"""
        cutoff_time = (datetime.now() - timedelta(hours=hours_back)).isoformat()
        
        with sqlite3.connect(self.monitor.db_path) as conn:
            query = '''
                SELECT * FROM alerts 
                WHERE timestamp >= ? 
                ORDER BY timestamp DESC
            '''
            return pd.read_sql_query(query, conn, params=[cutoff_time])
    
    def create_performance_dashboard(self):
        """Create comprehensive performance dashboard"""
        # Get data
        pipeline_metrics = self.get_pipeline_metrics()
        quality_metrics = self.get_quality_metrics()
        alerts = self.get_alerts()
        
        if pipeline_metrics.empty:
            print("⚠️ No pipeline metrics available for dashboard")
            return
        
        # Create dashboard
        fig = plt.figure(figsize=(20, 16))
        
        # 1. Pipeline Execution Times
        ax1 = plt.subplot(3, 3, 1)
        execution_times = pipeline_metrics[pipeline_metrics['metric_name'] == 'total_execution_time'].copy()
        if not execution_times.empty:
            execution_times['timestamp'] = pd.to_datetime(execution_times['timestamp'])
            ax1.plot(execution_times['timestamp'], execution_times['metric_value'], 
                    marker='o', linewidth=2, markersize=6)
            ax1.set_title('Pipeline Execution Times', fontsize=14, fontweight='bold')
            ax1.set_ylabel('Time (seconds)')
            ax1.tick_params(axis='x', rotation=45)
            ax1.grid(True, alpha=0.3)
        
        # 2. Throughput Over Time
        ax2 = plt.subplot(3, 3, 2)
        throughput_data = pipeline_metrics[pipeline_metrics['metric_name'] == 'throughput'].copy()
        if not throughput_data.empty:
            throughput_data['timestamp'] = pd.to_datetime(throughput_data['timestamp'])
            ax2.plot(throughput_data['timestamp'], throughput_data['metric_value'], 
                    marker='s', linewidth=2, markersize=6, color='green')
            ax2.set_title('Pipeline Throughput', fontsize=14, fontweight='bold')
            ax2.set_ylabel('Records/Second')
            ax2.tick_params(axis='x', rotation=45)
            ax2.grid(True, alpha=0.3)
        
        # 3. Records Processed
        ax3 = plt.subplot(3, 3, 3)
        # The query returns newest rows first; sort so bar 0 is the oldest run
        records_data = pipeline_metrics[pipeline_metrics['metric_name'] == 'records_processed'].sort_values('timestamp')
        if not records_data.empty:
            records_data['timestamp'] = pd.to_datetime(records_data['timestamp'])
            bars = ax3.bar(range(len(records_data)), records_data['metric_value'], 
                          color='skyblue', alpha=0.7)
            ax3.set_title('Records Processed per Run', fontsize=14, fontweight='bold')
            ax3.set_ylabel('Number of Records')
            ax3.set_xlabel('Pipeline Run')
            
            # Add value labels on bars
            for i, bar in enumerate(bars):
                height = bar.get_height()
                ax3.text(bar.get_x() + bar.get_width()/2., height + height*0.01,
                        f'{int(height)}', ha='center', va='bottom', fontweight='bold')
        
        # 4. Data Quality Trends
        ax4 = plt.subplot(3, 3, 4)
        if not quality_metrics.empty:
            quality_metrics['timestamp'] = pd.to_datetime(quality_metrics['timestamp'])
            ax4.plot(quality_metrics['timestamp'], quality_metrics['quality_score'], 
                    marker='D', linewidth=2, markersize=6, color='orange')
            ax4.axhline(y=80, color='red', linestyle='--', alpha=0.7, label='Threshold (80%)')
            ax4.set_title('Data Quality Score Trends', fontsize=14, fontweight='bold')
            ax4.set_ylabel('Quality Score (%)')
            ax4.set_ylim(0, 100)
            ax4.tick_params(axis='x', rotation=45)
            ax4.grid(True, alpha=0.3)
            ax4.legend()
        
        # 5. Stage Performance Breakdown
        ax5 = plt.subplot(3, 3, 5)
        stage_metrics = pipeline_metrics[pipeline_metrics['stage'].notna()]
        if not stage_metrics.empty:
            stage_avg = stage_metrics.groupby('stage')['metric_value'].mean()
            colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4']
            bars = ax5.bar(stage_avg.index, stage_avg.values, color=colors[:len(stage_avg)])
            ax5.set_title('Average Stage Performance', fontsize=14, fontweight='bold')
            ax5.set_ylabel('Average Time (seconds)')
            ax5.tick_params(axis='x', rotation=45)
            
            # Add value labels
            for bar in bars:
                height = bar.get_height()
                ax5.text(bar.get_x() + bar.get_width()/2., height + height*0.01,
                        f'{height:.3f}s', ha='center', va='bottom', fontweight='bold')
        
        # 6. Alert Summary
        ax6 = plt.subplot(3, 3, 6)
        if not alerts.empty:
            alert_counts = alerts['severity'].value_counts()
            colors_alert = {'LOW': 'yellow', 'MEDIUM': 'orange', 'HIGH': 'red', 'CRITICAL': 'darkred'}
            pie_colors = [colors_alert.get(severity, 'gray') for severity in alert_counts.index]
            
            wedges, texts, autotexts = ax6.pie(alert_counts.values, labels=alert_counts.index, 
                                              autopct='%1.0f%%', colors=pie_colors, startangle=90)
            ax6.set_title('Alert Distribution by Severity', fontsize=14, fontweight='bold')
            
            for autotext in autotexts:
                autotext.set_color('white')
                autotext.set_fontweight('bold')
        else:
            ax6.text(0.5, 0.5, '✅ No Alerts', ha='center', va='center', 
                    transform=ax6.transAxes, fontsize=16, fontweight='bold', color='green')
            ax6.set_title('Alert Status', fontsize=14, fontweight='bold')
        
        # 7. Pipeline Success Rate
        ax7 = plt.subplot(3, 3, 7)
        total_runs = len(pipeline_metrics['pipeline_id'].unique())
        failed_runs = len(pipeline_metrics[pipeline_metrics['metric_name'] == 'execution_failed']['pipeline_id'].unique())
        success_runs = total_runs - failed_runs
        success_rate = (success_runs / total_runs) * 100 if total_runs > 0 else 100
        
        # Donut-style gauge: the colored wedge spans success_rate percent of the ring,
        # starting at 12 o'clock and running clockwise
        color = 'green' if success_rate >= 95 else 'orange' if success_rate >= 80 else 'red'
        ax7.pie([success_rate, 100 - success_rate], colors=[color, 'lightgray'],
                startangle=90, counterclock=False, wedgeprops={'width': 0.3})
        
        # Show the numeric value in the center of the ring
        ax7.text(0, 0, f'{success_rate:.1f}%', ha='center', va='center', 
                fontsize=20, fontweight='bold')
        ax7.set_title('Pipeline Success Rate', fontsize=14, fontweight='bold')
        
        # 8. Resource Utilization (Simulated)
        ax8 = plt.subplot(3, 3, 8)
        # Simulate resource metrics
        resources = ['CPU', 'Memory', 'Disk I/O', 'Network']
        utilization = [65, 45, 30, 25]  # Simulated percentages
        colors_resource = ['red' if u > 80 else 'orange' if u > 60 else 'green' for u in utilization]
        
        bars = ax8.barh(resources, utilization, color=colors_resource, alpha=0.7)
        ax8.set_title('Resource Utilization', fontsize=14, fontweight='bold')
        ax8.set_xlabel('Utilization (%)')
        ax8.set_xlim(0, 100)
        
        # Add percentage labels
        for bar, util in zip(bars, utilization):
            ax8.text(util + 2, bar.get_y() + bar.get_height()/2, 
                    f'{util}%', va='center', fontweight='bold')
        
        # 9. Recent Pipeline Activity
        ax9 = plt.subplot(3, 3, 9)
        recent_pipelines = pipeline_metrics['pipeline_id'].unique()[-10:]  # Last 10 pipelines
        pipeline_status = []
        
        for pid in recent_pipelines:
            pipeline_data = pipeline_metrics[pipeline_metrics['pipeline_id'] == pid]
            has_failure = 'execution_failed' in pipeline_data['metric_name'].values
            pipeline_status.append('Failed' if has_failure else 'Success')
        
        # Create timeline visualization
        y_pos = range(len(recent_pipelines))
        colors_status = ['red' if status == 'Failed' else 'green' for status in pipeline_status]
        
        ax9.barh(y_pos, [1]*len(recent_pipelines), color=colors_status, alpha=0.7)
        ax9.set_yticks(y_pos)
        ax9.set_yticklabels([f'Run {i+1}' for i in range(len(recent_pipelines))])
        ax9.set_title('Recent Pipeline Runs', fontsize=14, fontweight='bold')
        ax9.set_xlabel('Status')
        ax9.set_xlim(0, 1.2)
        
        # Add status labels
        for i, status in enumerate(pipeline_status):
            ax9.text(0.5, i, status, ha='center', va='center', 
                    fontweight='bold', color='white')
        
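        plt.suptitle('📊 Pipeline Monitoring Dashboard', fontsize=20, fontweight='bold', y=0.98)
        # Reserve the top strip of the figure so the subplots do not overlap the suptitle
        plt.tight_layout(rect=[0, 0, 1, 0.96])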
        plt.show()
        
        # Print summary statistics
        self.print_dashboard_summary(pipeline_metrics, quality_metrics, alerts)
    
    def print_dashboard_summary(self, pipeline_metrics: pd.DataFrame, 
                               quality_metrics: pd.DataFrame, alerts: pd.DataFrame):
        """Print dashboard summary statistics"""
        print("\n📊 DASHBOARD SUMMARY STATISTICS")
        print("=" * 50)
        
        # Pipeline statistics
        total_runs = len(pipeline_metrics['pipeline_id'].unique())
        avg_execution_time = pipeline_metrics[pipeline_metrics['metric_name'] == 'total_execution_time']['metric_value'].mean()
        total_records = pipeline_metrics[pipeline_metrics['metric_name'] == 'records_processed']['metric_value'].sum()
        avg_throughput = pipeline_metrics[pipeline_metrics['metric_name'] == 'throughput']['metric_value'].mean()
        
        print(f"🔄 Pipeline Performance:")
        print(f"  Total Runs: {total_runs}")
        print(f"  Average Execution Time: {avg_execution_time:.2f}s")
        print(f"  Total Records Processed: {total_records:,.0f}")
        print(f"  Average Throughput: {avg_throughput:.1f} records/second")
        
        # Quality statistics
        if not quality_metrics.empty:
            avg_quality = quality_metrics['quality_score'].mean()
            min_quality = quality_metrics['quality_score'].min()
            max_quality = quality_metrics['quality_score'].max()
            
            print(f"\n📊 Data Quality:")
            print(f"  Average Quality Score: {avg_quality:.1f}%")
            print(f"  Quality Range: {min_quality:.1f}% - {max_quality:.1f}%")
        
        # Alert statistics
        if not alerts.empty:
            alert_counts = alerts['severity'].value_counts()
            print(f"\n🚨 Alerts:")
            for severity, count in alert_counts.items():
                print(f"  {severity}: {count}")
        else:
            print(f"\n✅ No alerts in the monitoring period")
        
        # Performance insights
        print(f"\n💡 Performance Insights:")
        if avg_throughput > 50:
            print(f"  ✅ Excellent throughput performance")
        elif avg_throughput > 20:
            print(f"  ⚡ Good throughput performance")
        else:
            print(f"  ⚠️ Consider optimizing pipeline performance")
        
        if not quality_metrics.empty and avg_quality >= 95:
            print(f"  ✅ Excellent data quality maintained")
        elif not quality_metrics.empty and avg_quality >= 80:
            print(f"  ⚡ Good data quality")
        elif not quality_metrics.empty:
            print(f"  ⚠️ Data quality needs attention")

# Create and display monitoring dashboard
dashboard = MonitoringDashboard(monitor)
dashboard.create_performance_dashboard()

## 🚨 Step 5: Advanced Alerting System

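Let's implement an alerting system that evaluates rule-based conditions (pipeline failures, slow runs, low throughput, poor data quality, zero records processed, and resource exhaustion), suppresses repeat alerts with a per-rule cooldown, and routes each alert to notification channels according to its severity. The notifications here are simulated: each channel only writes a log entry.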
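
Routing by severity is the part of this design that is easiest to get subtly wrong, so here is a minimal, self-contained sketch of it. It assumes the four severity names used in this tutorial and a channel config with `enabled` and `severity_threshold` keys. The function name `channels_for` and the sample config are illustrative and not part of the class that follows.

```python
# Numeric rank for each severity name; unknown names rank 0
SEVERITY_LEVELS = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4}


def channels_for(severity, channels):
    """Return the names of enabled channels whose threshold the severity meets."""
    level = SEVERITY_LEVELS.get(severity, 0)
    return [
        name for name, config in channels.items()
        if config['enabled']
        and level >= SEVERITY_LEVELS.get(config['severity_threshold'], 0)
    ]


# Hypothetical channel configuration for illustration
example_channels = {
    'email': {'enabled': True, 'severity_threshold': 'MEDIUM'},
    'slack': {'enabled': True, 'severity_threshold': 'LOW'},
    'sms': {'enabled': True, 'severity_threshold': 'CRITICAL'},
    'pagerduty': {'enabled': False, 'severity_threshold': 'CRITICAL'},
}

print(channels_for('LOW', example_channels))       # ['slack']
print(channels_for('HIGH', example_channels))      # ['email', 'slack']
print(channels_for('CRITICAL', example_channels))  # ['email', 'slack', 'sms']
```

One consequence of defaulting unknown names to rank 0: a misspelled `severity_threshold` makes that channel receive every alert, so validating the configuration up front may be worthwhile.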

In [None]:
# Advanced Alerting System
class AdvancedAlertingSystem:
    """Sophisticated alerting system with multiple notification channels"""
    
    def __init__(self, monitor: PipelineMonitor):
        self.monitor = monitor
        self.alert_rules = self.setup_alert_rules()
        self.notification_channels = self.setup_notification_channels()
        self.alert_history = []
        
    def setup_alert_rules(self) -> Dict[str, Dict[str, Any]]:
        """Setup comprehensive alert rules"""
        return {
            'pipeline_failure': {
                'condition': lambda metrics: not metrics.get('success', True),
                'severity': 'CRITICAL',
                'message_template': 'Pipeline {pipeline_id} failed: {error}',
                'cooldown_minutes': 5,
                'escalation_minutes': 15
            },
            'high_execution_time': {
                'condition': lambda metrics: metrics.get('execution_time', 0) > 300,
                'severity': 'HIGH',
                'message_template': 'Pipeline {pipeline_id} execution time {execution_time:.1f}s exceeds threshold',
                'cooldown_minutes': 10,
                'escalation_minutes': 30
            },
            'low_throughput': {
                'condition': lambda metrics: metrics.get('throughput', float('inf')) < 10,
                'severity': 'MEDIUM',
                'message_template': 'Pipeline {pipeline_id} throughput {throughput:.1f} records/s is below threshold',
                'cooldown_minutes': 15,
                'escalation_minutes': 60
            },
            'data_quality_degradation': {
                'condition': lambda metrics: metrics.get('quality_score', 100) < 80,
                'severity': 'HIGH',
                'message_template': 'Data quality {quality_score:.1f}% below acceptable threshold',
                'cooldown_minutes': 5,
                'escalation_minutes': 20
            },
            'no_data_processed': {
                'condition': lambda metrics: metrics.get('records_processed', 1) == 0,
                'severity': 'HIGH',
                'message_template': 'Pipeline {pipeline_id} processed zero records',
                'cooldown_minutes': 5,
                'escalation_minutes': 15
            },
            'resource_exhaustion': {
                'condition': lambda metrics: (
                    metrics.get('cpu_usage', 0) > 90 or 
                    metrics.get('memory_usage', 0) > 90 or
                    metrics.get('disk_usage', 0) > 95
                ),
                'severity': 'CRITICAL',
                'message_template': 'Resource exhaustion detected: CPU {cpu_usage}%, Memory {memory_usage}%, Disk {disk_usage}%',
                'cooldown_minutes': 2,
                'escalation_minutes': 10
            }
        }
    
    def setup_notification_channels(self) -> Dict[str, Dict[str, Any]]:
        """Setup notification channels configuration"""
        return {
            'email': {
                'enabled': True,
                'recipients': ['admin@company.com', 'data-team@company.com'],
                'severity_threshold': 'MEDIUM'
            },
            'slack': {
                'enabled': True,
                'webhook_url': 'https://hooks.slack.com/services/...',
                'channel': '#data-alerts',
                'severity_threshold': 'LOW'
            },
            'sms': {
                'enabled': True,
                'phone_numbers': ['+1234567890'],
                'severity_threshold': 'CRITICAL'
            },
            'pagerduty': {
                'enabled': False,
                'integration_key': 'your-pagerduty-key',
                'severity_threshold': 'CRITICAL'
            }
        }
    
    def evaluate_alerts(self, pipeline_id: str, metrics: Dict[str, Any]):
        """Evaluate all alert rules against current metrics"""
        triggered_alerts = []
        
        for rule_name, rule_config in self.alert_rules.items():
            try:
                if rule_config['condition'](metrics):
                    # Check cooldown period
                    if self._is_in_cooldown(rule_name, pipeline_id):
                        continue
                    
                    # Create alert
                    alert = self._create_alert(
                        pipeline_id, rule_name, rule_config, metrics
                    )
                    triggered_alerts.append(alert)
                    
                    # Send notifications
                    self._send_notifications(alert)
                    
                    # Record in history
                    self.alert_history.append(alert)
                    
            except Exception as e:
                self.monitor.logger.error(f"Error evaluating alert rule {rule_name}: {e}")
        
        return triggered_alerts
    
    def _is_in_cooldown(self, rule_name: str, pipeline_id: str) -> bool:
        """Check if alert is in cooldown period"""
        cooldown_minutes = self.alert_rules[rule_name]['cooldown_minutes']
        cutoff_time = datetime.now() - timedelta(minutes=cooldown_minutes)
        
        # Check recent alerts for this rule and pipeline
        recent_alerts = [
            alert for alert in self.alert_history
            if (alert['rule_name'] == rule_name and 
                alert['pipeline_id'] == pipeline_id and
                datetime.fromisoformat(alert['timestamp']) > cutoff_time)
        ]
        
        return len(recent_alerts) > 0
    
    def _create_alert(self, pipeline_id: str, rule_name: str, 
                     rule_config: Dict[str, Any], metrics: Dict[str, Any]) -> Dict[str, Any]:
        """Create alert object"""
        timestamp = datetime.now().isoformat()
        
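        # Format message with metrics. Placeholders missing from `metrics`
        # (for example 'error' or 'memory_usage') render as 'N/A' instead of
        # raising KeyError, which evaluate_alerts would log before dropping the alert.
        class _MissingAsNA(dict):
            def __missing__(self, key):
                return 'N/A'
        
        message_data = _MissingAsNA({'pipeline_id': pipeline_id, **metrics})
        message = rule_config['message_template'].format_map(message_data)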
        
        alert = {
            'id': f"ALERT-{timestamp}-{rule_name}",
            'timestamp': timestamp,
            'pipeline_id': pipeline_id,
            'rule_name': rule_name,
            'severity': rule_config['severity'],
            'message': message,
            'metrics': metrics,
            'resolved': False
        }
        
        # Save to monitoring database
        self.monitor.create_alert(
            pipeline_id, rule_name, rule_config['severity'], message
        )
        
        return alert
    
    def _send_notifications(self, alert: Dict[str, Any]):
        """Send notifications through configured channels"""
        severity_levels = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4}
        alert_severity_level = severity_levels.get(alert['severity'], 0)
        
        for channel_name, channel_config in self.notification_channels.items():
            if not channel_config['enabled']:
                continue
            
            threshold_level = severity_levels.get(channel_config['severity_threshold'], 0)
            
            if alert_severity_level >= threshold_level:
                self._send_notification(channel_name, channel_config, alert)
    
    def _send_notification(self, channel_name: str, channel_config: Dict[str, Any], 
                          alert: Dict[str, Any]):
        """Send notification to specific channel (simulated)"""
        severity_icons = {
            'LOW': '🟡',
            'MEDIUM': '🟠', 
            'HIGH': '🔴',
            'CRITICAL': '🚨'
        }
        
        icon = severity_icons.get(alert['severity'], '⚠️')
        
        if channel_name == 'email':
            self._send_email_notification(channel_config, alert, icon)
        elif channel_name == 'slack':
            self._send_slack_notification(channel_config, alert, icon)
        elif channel_name == 'sms':
            self._send_sms_notification(channel_config, alert, icon)
        elif channel_name == 'pagerduty':
            self._send_pagerduty_notification(channel_config, alert, icon)
    
    def _send_email_notification(self, config: Dict[str, Any], 
                                alert: Dict[str, Any], icon: str):
        """Send email notification (simulated)"""
        subject = f"{icon} [{alert['severity']}] Pipeline Alert: {alert['rule_name']}"
        body = f"""
        Alert Details:
        - Timestamp: {alert['timestamp']}
        - Pipeline ID: {alert['pipeline_id']}
        - Severity: {alert['severity']}
        - Message: {alert['message']}
        
        Metrics:
        {json.dumps(alert['metrics'], indent=2)}
        
        Please investigate and take appropriate action.
        """
        
        self.monitor.logger.info(f"📧 EMAIL sent to {config['recipients']}: {subject}")
        # In production: send actual email using SMTP
    
    def _send_slack_notification(self, config: Dict[str, Any], 
                                alert: Dict[str, Any], icon: str):
        """Send Slack notification (simulated)"""
        message = f"{icon} *[{alert['severity']}]* Pipeline Alert\n" \
                 f"*Rule:* {alert['rule_name']}\n" \
                 f"*Pipeline:* {alert['pipeline_id']}\n" \
                 f"*Message:* {alert['message']}"
        
        self.monitor.logger.info(f"💬 SLACK sent to {config['channel']}: {alert['rule_name']}")
        # In production: send to Slack webhook
    
    def _send_sms_notification(self, config: Dict[str, Any], 
                              alert: Dict[str, Any], icon: str):
        """Send SMS notification (simulated)"""
        message = f"{icon} [{alert['severity']}] {alert['rule_name']}: {alert['message'][:100]}..."
        
        self.monitor.logger.info(f"📱 SMS sent to {config['phone_numbers']}: {alert['rule_name']}")
        # In production: send SMS using Twilio or similar service
    
    def _send_pagerduty_notification(self, config: Dict[str, Any], 
                                    alert: Dict[str, Any], icon: str):
        """Send PagerDuty notification (simulated)"""
        self.monitor.logger.info(f"📟 PAGERDUTY incident created: {alert['rule_name']}")
        # In production: create PagerDuty incident
    
    def get_alert_summary(self, hours_back: int = 24) -> Dict[str, Any]:
        """Get alert summary for the specified time period"""
        cutoff_time = datetime.now() - timedelta(hours=hours_back)
        
        recent_alerts = [
            alert for alert in self.alert_history
            if datetime.fromisoformat(alert['timestamp']) > cutoff_time
        ]
        
        # Group by severity
        severity_counts = {}
        rule_counts = {}
        
        for alert in recent_alerts:
            severity = alert['severity']
            rule_name = alert['rule_name']
            
            severity_counts[severity] = severity_counts.get(severity, 0) + 1
            rule_counts[rule_name] = rule_counts.get(rule_name, 0) + 1
        
        return {
            'total_alerts': len(recent_alerts),
            'severity_breakdown': severity_counts,
            'rule_breakdown': rule_counts,
            'recent_alerts': recent_alerts[-10:],  # Last 10 alerts
            'time_period_hours': hours_back
        }

# Initialize advanced alerting system
alerting_system = AdvancedAlertingSystem(monitor)

print("🚨 Advanced Alerting System initialized!")
print(f"📋 Alert Rules: {len(alerting_system.alert_rules)}")
print(f"📢 Notification Channels: {len(alerting_system.notification_channels)}")

# Test alerting system with various scenarios
print("\n🧪 Testing Alert Scenarios:")
print("-" * 40)

# Scenario 1: Normal metrics (no alerts)
normal_metrics = {
    'success': True,
    'execution_time': 45.2,
    'throughput': 25.5,
    'quality_score': 95.2,
    'records_processed': 150
}

alerts1 = alerting_system.evaluate_alerts('TEST-001', normal_metrics)
print(f"Normal metrics: {len(alerts1)} alerts triggered")

# Scenario 2: Slow execution
slow_metrics = {
    'success': True,
    'execution_time': 350.0,  # Exceeds threshold
    'throughput': 15.2,
    'quality_score': 88.5,
    'records_processed': 200
}

alerts2 = alerting_system.evaluate_alerts('TEST-002', slow_metrics)
print(f"Slow execution: {len(alerts2)} alerts triggered")

# Scenario 3: Poor data quality
quality_metrics = {
    'success': True,
    'execution_time': 65.3,
    'throughput': 22.1,
    'quality_score': 75.0,  # Below threshold
    'records_processed': 180
}

alerts3 = alerting_system.evaluate_alerts('TEST-003', quality_metrics)
print(f"Poor quality: {len(alerts3)} alerts triggered")

# Scenario 4: Pipeline failure
failure_metrics = {
    'success': False,  # Pipeline failed
    'execution_time': 120.0,
    'error': 'Database connection timeout',
    'records_processed': 0
}

alerts4 = alerting_system.evaluate_alerts('TEST-004', failure_metrics)
print(f"Pipeline failure: {len(alerts4)} alerts triggered")

# Get alert summary
alert_summary = alerting_system.get_alert_summary()
print(f"\n📊 Alert Summary (24h):")
print(f"  Total Alerts: {alert_summary['total_alerts']}")
print(f"  By Severity: {alert_summary['severity_breakdown']}")
print(f"  By Rule: {alert_summary['rule_breakdown']}")
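
Each scenario above uses a different pipeline ID, so the cooldown logic is never exercised. The sketch below isolates that check so it can be tried with fixed timestamps. It assumes alerts are dicts with `rule_name`, `pipeline_id`, and an ISO-format `timestamp`, as in the class above. The function `in_cooldown` and its `now` parameter are illustrative additions that make the behavior deterministic.

```python
from datetime import datetime, timedelta


def in_cooldown(history, rule_name, pipeline_id, cooldown_minutes, now):
    """True if the same rule already fired for this pipeline within the window."""
    cutoff = now - timedelta(minutes=cooldown_minutes)
    return any(
        alert['rule_name'] == rule_name
        and alert['pipeline_id'] == pipeline_id
        and datetime.fromisoformat(alert['timestamp']) > cutoff
        for alert in history
    )


# Hypothetical history: one failure alert recorded at 12:00
history = [{
    'rule_name': 'pipeline_failure',
    'pipeline_id': 'TEST-004',
    'timestamp': datetime(2024, 1, 1, 12, 0).isoformat(),
}]

three_min_later = datetime(2024, 1, 1, 12, 3)
ten_min_later = datetime(2024, 1, 1, 12, 10)

# Inside the 5-minute window: a repeat alert would be suppressed
print(in_cooldown(history, 'pipeline_failure', 'TEST-004', 5, three_min_later))  # True
# Outside the window: the rule may fire again
print(in_cooldown(history, 'pipeline_failure', 'TEST-004', 5, ten_min_later))    # False
# A different pipeline is tracked independently
print(in_cooldown(history, 'pipeline_failure', 'TEST-005', 5, three_min_later))  # False
```

Passing `now` explicitly, rather than calling `datetime.now()` inside the function, is a design choice that keeps the check testable without waiting for real time to pass.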

## 🔧 Step 6: Performance Analysis and Optimization

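Let's analyze pipeline performance over a recent time window: summary statistics and trend direction for execution time, throughput, record counts, and data quality, plus a per-stage breakdown. The results then feed a set of rule-based optimization recommendations.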
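
The trend labels in this step come from the sign of the correlation between a metric's values and their position in the time-ordered series. As a rough, dependency-free sketch of that idea (the function name `classify_trend` and the 0.1 threshold default are assumptions for illustration, and treating a constant series as stable is this sketch's own choice):

```python
import math


def classify_trend(values, threshold=0.1):
    """Label a time-ordered series as increasing, decreasing, or stable."""
    n = len(values)
    if n < 2:
        return 'insufficient_data'

    # Positions 0..n-1 stand in for time
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n

    cov = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(values))
    var_x = sum((x - mean_x) ** 2 for x in range(n))
    var_y = sum((y - mean_y) ** 2 for y in values)

    # A constant series has no variance, so the correlation is undefined
    if var_y == 0:
        return 'stable'

    # Pearson correlation coefficient between position and value
    r = cov / math.sqrt(var_x * var_y)

    if abs(r) < threshold:
        return 'stable'
    return 'increasing' if r > 0 else 'decreasing'


print(classify_trend([40.0, 42.5, 47.1, 55.0]))  # increasing
print(classify_trend([30.0, 26.0, 21.5, 18.0]))  # decreasing
print(classify_trend([5.0, 5.0, 5.0]))           # stable
print(classify_trend([12.0]))                    # insufficient_data
```

Note that the correlation coefficient reflects how consistently the values move in one direction, not how large the change is. A metric that creeps up by a fraction of a percent on every run is still labeled `increasing`, so it can help to read the label alongside the mean and standard deviation.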

In [None]:
# Performance Analysis and Optimization
class PerformanceAnalyzer:
    """Analyze pipeline performance and suggest optimizations"""
    
    def __init__(self, monitor: PipelineMonitor):
        self.monitor = monitor
    
    def analyze_performance_trends(self, days_back: int = 7) -> Dict[str, Any]:
        """Analyze performance trends over time"""
        cutoff_time = (datetime.now() - timedelta(days=days_back)).isoformat()
        
        with sqlite3.connect(self.monitor.db_path) as conn:
            # Get pipeline metrics
            pipeline_query = '''
                SELECT * FROM pipeline_metrics 
                WHERE timestamp >= ? 
                ORDER BY timestamp
            '''
            pipeline_metrics = pd.read_sql_query(pipeline_query, conn, params=[cutoff_time])
            
            # Get quality metrics
            quality_query = '''
                SELECT * FROM quality_metrics 
                WHERE timestamp >= ? 
                ORDER BY timestamp
            '''
            quality_metrics = pd.read_sql_query(quality_query, conn, params=[cutoff_time])
        
        if pipeline_metrics.empty:
            return {'error': 'No performance data available'}
        
        # Convert timestamps
        pipeline_metrics['timestamp'] = pd.to_datetime(pipeline_metrics['timestamp'])
        if not quality_metrics.empty:
            quality_metrics['timestamp'] = pd.to_datetime(quality_metrics['timestamp'])
        
        # Analyze execution times
        execution_times = pipeline_metrics[pipeline_metrics['metric_name'] == 'total_execution_time']
        throughput_data = pipeline_metrics[pipeline_metrics['metric_name'] == 'throughput']
        records_data = pipeline_metrics[pipeline_metrics['metric_name'] == 'records_processed']
        
        analysis = {
            'time_period_days': days_back,
            'total_pipeline_runs': len(pipeline_metrics['pipeline_id'].unique()),
            'metrics_collected': len(pipeline_metrics)
        }
        
        # Execution time analysis
        if not execution_times.empty:
            analysis['execution_time'] = {
                'mean': execution_times['metric_value'].mean(),
                'median': execution_times['metric_value'].median(),
                'std': execution_times['metric_value'].std(),
                'min': execution_times['metric_value'].min(),
                'max': execution_times['metric_value'].max(),
                'trend': self._calculate_trend(execution_times)
            }
        
        # Throughput analysis
        if not throughput_data.empty:
            analysis['throughput'] = {
                'mean': throughput_data['metric_value'].mean(),
                'median': throughput_data['metric_value'].median(),
                'std': throughput_data['metric_value'].std(),
                'min': throughput_data['metric_value'].min(),
                'max': throughput_data['metric_value'].max(),
                'trend': self._calculate_trend(throughput_data)
            }
        
        # Records processed analysis
        if not records_data.empty:
            analysis['records_processed'] = {
                'total': records_data['metric_value'].sum(),
                'mean_per_run': records_data['metric_value'].mean(),
                'median_per_run': records_data['metric_value'].median(),
                'trend': self._calculate_trend(records_data)
            }
        
        # Quality analysis
        if not quality_metrics.empty:
            analysis['data_quality'] = {
                'mean_quality_score': quality_metrics['quality_score'].mean(),
                'median_quality_score': quality_metrics['quality_score'].median(),
                'min_quality_score': quality_metrics['quality_score'].min(),
                'max_quality_score': quality_metrics['quality_score'].max(),
                'quality_trend': self._calculate_trend(quality_metrics, 'quality_score')
            }
        
        # Stage performance analysis
        stage_analysis = self._analyze_stage_performance(pipeline_metrics)
        analysis['stage_performance'] = stage_analysis
        
        return analysis
    
    def _calculate_trend(self, data: pd.DataFrame, value_col: str = 'metric_value') -> str:
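        """Classify trend direction as 'increasing', 'decreasing', or 'stable'"""
        if len(data) < 2:
            return 'insufficient_data'
        
        # Correlate the values with their position in the time-ordered series
        x = np.arange(len(data))
        y = data[value_col].values
        
        # A constant series has zero variance, so its correlation is undefined (NaN)
        if np.std(y) == 0:
            return 'stable'
        
        # Pearson correlation coefficient between position and value
        correlation = np.corrcoef(x, y)[0, 1]
        
        if np.isnan(correlation) or abs(correlation) < 0.1: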
            return 'stable'
        elif correlation > 0:
            return 'increasing'
        else:
            return 'decreasing'
    
    def _analyze_stage_performance(self, pipeline_metrics: pd.DataFrame) -> Dict[str, Any]:
        """Analyze performance by pipeline stage"""
        stage_metrics = pipeline_metrics[pipeline_metrics['stage'].notna()]
        
        if stage_metrics.empty:
            return {'error': 'No stage-specific metrics available'}
        
        stage_analysis = {}
        
        for stage in stage_metrics['stage'].unique():
            stage_data = stage_metrics[stage_metrics['stage'] == stage]
            
            stage_analysis[stage] = {
                'mean_time': stage_data['metric_value'].mean(),
                'median_time': stage_data['metric_value'].median(),
                'std_time': stage_data['metric_value'].std(),
                'min_time': stage_data['metric_value'].min(),
                'max_time': stage_data['metric_value'].max(),
                'execution_count': len(stage_data),
                'trend': self._calculate_trend(stage_data)
            }
        
        return stage_analysis
    
    def generate_optimization_recommendations(self, analysis: Dict[str, Any]) -> List[Dict[str, str]]:
        """Generate optimization recommendations based on performance analysis"""
        recommendations = []
        
        # Execution time recommendations
        if 'execution_time' in analysis:
            exec_time = analysis['execution_time']
            
            if exec_time['mean'] > 300:  # 5 minutes
                recommendations.append({
                    'category': 'Performance',
                    'priority': 'High',
                    'issue': 'Long execution times',
                    'recommendation': f"Average execution time is {exec_time['mean']:.1f}s. Consider parallel processing, data partitioning, or infrastructure scaling.",
                    'impact': 'Reduce pipeline execution time'
                })
            
            if exec_time['std'] > exec_time['mean'] * 0.5:  # High variability
                recommendations.append({
                    'category': 'Consistency',
                    'priority': 'Medium',
                    'issue': 'Inconsistent execution times',
                    'recommendation': f"High variability in execution times (std: {exec_time['std']:.1f}s). Investigate resource contention or data size variations.",
                    'impact': 'Improve pipeline predictability'
                })
            
            if exec_time['trend'] == 'increasing':
                recommendations.append({
                    'category': 'Performance',
                    'priority': 'High',
                    'issue': 'Degrading performance trend',
                    'recommendation': 'Execution times are increasing over time. Review recent changes and consider performance optimization.',
                    'impact': 'Prevent further performance degradation'
                })
        
        # Throughput recommendations
        if 'throughput' in analysis:
            throughput = analysis['throughput']
            
            if throughput['mean'] < 20:  # Low throughput
                recommendations.append({
                    'category': 'Performance',
                    'priority': 'Medium',
                    'issue': 'Low throughput',
                    'recommendation': f"Average throughput is {throughput['mean']:.1f} records/s. Consider batch size optimization or parallel processing.",
                    'impact': 'Increase data processing rate'
                })
            
            if throughput['trend'] == 'decreasing':
                recommendations.append({
                    'category': 'Performance',
                    'priority': 'High',
                    'issue': 'Declining throughput',
                    'recommendation': 'Throughput is decreasing over time. Investigate data growth, resource constraints, or code changes.',
                    'impact': 'Maintain processing efficiency'
                })
        
        # Data quality recommendations
        if 'data_quality' in analysis:
            quality = analysis['data_quality']
            
            if quality['mean_quality_score'] < 90:
                recommendations.append({
                    'category': 'Data Quality',
                    'priority': 'High',
                    'issue': 'Low data quality',
                    'recommendation': f"Average data quality is {quality['mean_quality_score']:.1f}%. Review data sources and validation rules.",
                    'impact': 'Improve downstream data reliability'
                })
            
            if quality['quality_trend'] == 'decreasing':
                recommendations.append({
                    'category': 'Data Quality',
                    'priority': 'Critical',
                    'issue': 'Degrading data quality',
                    'recommendation': 'Data quality is declining over time. Immediate investigation required for data sources.',
                    'impact': 'Prevent data quality deterioration'
                })
        
        # Stage-specific recommendations
        # Skip when stage data is missing, empty, or reported an error (max() fails on an empty dict)
        if analysis.get('stage_performance') and 'error' not in analysis['stage_performance']:
            stages = analysis['stage_performance']
            
            # Find slowest stage
            slowest_stage = max(stages.keys(), key=lambda x: stages[x]['mean_time'])
            slowest_time = stages[slowest_stage]['mean_time']
            
            if slowest_time > 60:  # More than 1 minute
                recommendations.append({
                    'category': 'Performance',
                    'priority': 'Medium',
                    'issue': f'Slow {slowest_stage} stage',
                    'recommendation': f"The {slowest_stage} stage takes {slowest_time:.1f}s on average. Consider optimizing this stage specifically.",
                    'impact': f'Reduce {slowest_stage} stage execution time'
                })
            
            # Check for stages with increasing trends
            for stage, metrics in stages.items():
                if metrics['trend'] == 'increasing':
                    recommendations.append({
                        'category': 'Performance',
                        'priority': 'Medium',
                        'issue': f'{stage} stage performance degrading',
                        'recommendation': f"The {stage} stage is taking longer over time. Review recent changes to this stage.",
                        'impact': f'Stabilize {stage} stage performance'
                    })
        
        # General recommendations if no specific issues found
        if not recommendations:
            recommendations.append({
                'category': 'Optimization',
                'priority': 'Low',
                'issue': 'Proactive optimization',
                'recommendation': 'Pipeline performance is good. Consider proactive optimizations like caching, indexing, or resource scaling for future growth.',
                'impact': 'Prepare for increased data volumes'
            })
        
        return recommendations
    
    def create_performance_report(self, analysis: Dict[str, Any],
                                  recommendations: List[Dict[str, str]]) -> str:
        """Create comprehensive performance analysis report"""
        report = []
        report.append("# 📊 Pipeline Performance Analysis Report")
        report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"Analysis Period: {analysis.get('time_period_days', 'N/A')} days")
        report.append("")
        
        # Executive Summary
        report.append("## 🎯 Executive Summary")
        report.append(f"- **Total Pipeline Runs**: {analysis.get('total_pipeline_runs', 0):,}")
        report.append(f"- **Metrics Collected**: {analysis.get('metrics_collected', 0):,}")
        
        if 'execution_time' in analysis:
            exec_time = analysis['execution_time']
            report.append(f"- **Average Execution Time**: {exec_time['mean']:.2f}s")
            report.append(f"- **Execution Time Range**: {exec_time['min']:.2f}s - {exec_time['max']:.2f}s")
        
        if 'throughput' in analysis:
            throughput = analysis['throughput']
            report.append(f"- **Average Throughput**: {throughput['mean']:.1f} records/second")
        
        if 'data_quality' in analysis:
            quality = analysis['data_quality']
            report.append(f"- **Average Data Quality**: {quality['mean_quality_score']:.1f}%")
        
        report.append("")
        
        # Performance Trends
        report.append("## 📈 Performance Trends")
        
        if 'execution_time' in analysis:
            trend = analysis['execution_time']['trend']
            trend_icon = {'increasing': '📈', 'decreasing': '📉', 'stable': '➡️'}.get(trend, '❓')
            report.append(f"- **Execution Time Trend**: {trend_icon} {trend.title()}")
        
        if 'throughput' in analysis:
            trend = analysis['throughput']['trend']
            trend_icon = {'increasing': '📈', 'decreasing': '📉', 'stable': '➡️'}.get(trend, '❓')
            report.append(f"- **Throughput Trend**: {trend_icon} {trend.title()}")
        
        if 'data_quality' in analysis:
            trend = analysis['data_quality']['quality_trend']
            trend_icon = {'increasing': '📈', 'decreasing': '📉', 'stable': '➡️'}.get(trend, '❓')
            report.append(f"- **Data Quality Trend**: {trend_icon} {trend.title()}")
        
        report.append("")
        
        # Stage Performance
        if 'stage_performance' in analysis and 'error' not in analysis['stage_performance']:
            report.append("## ⚡ Stage Performance Analysis")
            stages = analysis['stage_performance']
            
            for stage, metrics in stages.items():
                report.append(f"### {stage.title()} Stage")
                report.append(f"- **Average Time**: {metrics['mean_time']:.3f}s")
                report.append(f"- **Time Range**: {metrics['min_time']:.3f}s - {metrics['max_time']:.3f}s")
                report.append(f"- **Executions**: {metrics['execution_count']}")
                trend_icon = {'increasing': '📈', 'decreasing': '📉', 'stable': '➡️'}.get(metrics['trend'], '❓')
                report.append(f"- **Trend**: {trend_icon} {metrics['trend'].title()}")
                report.append("")
        
        # Recommendations
        report.append("## 💡 Optimization Recommendations")
        
        if recommendations:
            # Group by priority
            priority_order = ['Critical', 'High', 'Medium', 'Low']
            for priority in priority_order:
                priority_recs = [r for r in recommendations if r['priority'] == priority]
                if priority_recs:
                    priority_icon = {'Critical': '🚨', 'High': '🔴', 'Medium': '🟠', 'Low': '🟡'}.get(priority, '⚪')
                    report.append(f"### {priority_icon} {priority} Priority")
                    
                    for i, rec in enumerate(priority_recs, 1):
                        report.append(f"**{i}. {rec['issue']}**")
                        report.append(f"- **Category**: {rec['category']}")
                        report.append(f"- **Recommendation**: {rec['recommendation']}")
                        report.append(f"- **Expected Impact**: {rec['impact']}")
                        report.append("")
        else:
            report.append("✅ No critical performance issues identified.")
            report.append("")
        
        # Next Steps
        report.append("## 🚀 Next Steps")
        report.append("1. **Review High-Priority Recommendations**: Address critical and high-priority items first")
        report.append("2. **Implement Monitoring**: Set up automated alerts for performance degradation")
        report.append("3. **Regular Analysis**: Schedule weekly performance reviews")
        report.append("4. **Capacity Planning**: Plan for future data growth and scaling needs")
        report.append("5. **Performance Testing**: Implement load testing for pipeline validation")
        report.append("")
        
        report.append("---")
        report.append("*Report generated by Pipeline Performance Analyzer*")
        
        return "\n".join(report)

# Initialize performance analyzer
performance_analyzer = PerformanceAnalyzer(monitor)

print("🔧 Performance Analyzer initialized!")
print("📊 Analyzing pipeline performance...")

# Perform performance analysis
analysis = performance_analyzer.analyze_performance_trends(days_back=1)  # Last 24 hours

if 'error' not in analysis:
    # Generate recommendations
    recommendations = performance_analyzer.generate_optimization_recommendations(analysis)
    
    # Create performance report
    performance_report = performance_analyzer.create_performance_report(analysis, recommendations)
    
    print("\n📋 PERFORMANCE ANALYSIS RESULTS")
    print("=" * 50)
    print(performance_report)
    
    # Save report
    report_filename = f"../data/output/performance_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
    try:
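        # The report contains emoji, so write it as UTF-8 regardless of the platform default
        with open(report_filename, 'w', encoding='utf-8') as f: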
            f.write(performance_report)
        print(f"\n💾 Performance report saved to: {report_filename}")
    except Exception as e:
        print(f"\n⚠️ Could not save performance report: {e}")
    
    print(f"\n🎯 Summary: {len(recommendations)} optimization recommendations generated")
    
    # Show top recommendations
    high_priority = [r for r in recommendations if r['priority'] in ['Critical', 'High']]
    if high_priority:
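        print("\n🚨 High Priority Actions:")
        for i, rec in enumerate(high_priority[:3], 1):
            text = rec['recommendation']
            # Append an ellipsis only when the text is actually truncated
            summary = text if len(text) <= 100 else text[:100] + '...'
            print(f"  {i}. {rec['issue']}: {summary}")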
else:
    print(f"\n⚠️ Performance analysis error: {analysis['error']}")
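
The analysis above labels each metric's `trend` as `increasing`, `decreasing`, or `stable`. The sketch below is one plausible way to derive such a label, and may differ from what `analyze_performance_trends` actually does: it fits a least-squares line through the values and compares the fitted change across the window with the series mean. The function name `classify_trend` and the 5% `tolerance` are assumptions made for this illustration.

```python
from typing import Sequence


def classify_trend(values: Sequence[float], tolerance: float = 0.05) -> str:
    """Label a series as 'increasing', 'decreasing', or 'stable'.

    Hypothetical helper: the slope of a least-squares line is scaled to the
    total fitted change over the window and compared with the series mean.
    """
    n = len(values)
    if n < 2:
        return 'stable'  # not enough points to estimate a slope

    mean_x = (n - 1) / 2
    mean_y = sum(values) / n

    # Ordinary least-squares slope with x = 0, 1, ..., n-1
    covariance = sum((i - mean_x) * (v - mean_y) for i, v in enumerate(values))
    variance = sum((i - mean_x) ** 2 for i in range(n))
    slope = covariance / variance

    # Fitted change across the whole window, relative to the mean level
    scale = abs(mean_y) or 1.0
    relative_change = slope * (n - 1) / scale

    if relative_change > tolerance:
        return 'increasing'
    if relative_change < -tolerance:
        return 'decreasing'
    return 'stable'


print(classify_trend([1.0, 2.0, 3.0, 4.0]))
print(classify_trend([10.0, 10.1, 9.9, 10.0]))
```

Using a relative tolerance keeps small run-to-run noise from flipping the label; the right tolerance depends on how variable your pipeline's metrics normally are.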

## 📋 Step 7: Operational Procedures and Best Practices

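This step defines an `OperationalProcedures` class that bundles standard procedures, deployment and incident checklists, and runbooks for common failure scenarios, together with an automated health check that reads from the monitoring database.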
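
The health check in the class below simulates resource utilization with random numbers. As a rough sketch of what a real measurement could look like, the following uses only the standard library's `shutil.disk_usage` and applies the same 80% / 90% thresholds. The helper names `classify_usage` and `check_disk_usage` are hypothetical and not part of the notebook's classes; CPU and memory would need a similar probe (for example from a library such as `psutil`), which is left out here.

```python
import shutil
from typing import Any, Dict


def classify_usage(used_pct: float, warn_pct: float = 80.0, fail_pct: float = 90.0) -> str:
    """Map a utilization percentage to PASSED / WARNING / FAILED."""
    if used_pct > fail_pct:
        return 'FAILED'
    if used_pct > warn_pct:
        return 'WARNING'
    return 'PASSED'


def check_disk_usage(path: str = '.') -> Dict[str, Any]:
    """Measure disk usage for the filesystem that contains `path`."""
    usage = shutil.disk_usage(path)  # named tuple: total, used, free (bytes)
    # Guard against a zero-sized filesystem to avoid dividing by zero
    used_pct = (usage.used / usage.total * 100) if usage.total else 0.0
    return {
        'status': classify_usage(used_pct),
        'message': f'Disk usage at {used_pct:.1f}%',
        'details': {'disk_usage': round(used_pct, 1)},
    }


print(check_disk_usage())
```

The result uses the same `status` / `message` / `details` shape as the other checks, so a function like this could stand in for the simulated values once you point `path` at the volume your pipeline writes to.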

In [None]:
# Operational Procedures and Best Practices
class OperationalProcedures:
    """Comprehensive operational procedures for production pipelines"""
    
    def __init__(self, monitor: PipelineMonitor):
        self.monitor = monitor
        self.procedures = self.define_procedures()
        self.checklists = self.create_checklists()
        self.runbooks = self.create_runbooks()
    
    def define_procedures(self) -> Dict[str, Dict[str, Any]]:
        """Define standard operational procedures"""
        return {
            'daily_health_check': {
                'frequency': 'daily',
                'description': 'Daily pipeline health assessment',
                'steps': [
                    'Check pipeline execution status for last 24 hours',
                    'Review data quality metrics',
                    'Verify alert status and resolve any issues',
                    'Check resource utilization trends',
                    'Validate data freshness and completeness'
                ],
                'success_criteria': [
                    'All pipelines executed successfully',
                    'Data quality > 95%',
                    'No unresolved critical alerts',
                    'Resource utilization < 80%'
                ]
            },
            'weekly_performance_review': {
                'frequency': 'weekly',
                'description': 'Weekly performance analysis and optimization',
                'steps': [
                    'Generate performance analysis report',
                    'Review execution time trends',
                    'Analyze throughput patterns',
                    'Identify optimization opportunities',
                    'Plan performance improvements'
                ],
                'success_criteria': [
                    'Performance report generated',
                    'Trends analyzed and documented',
                    'Action items identified for next week'
                ]
            },
            'monthly_capacity_planning': {
                'frequency': 'monthly',
                'description': 'Monthly capacity and scaling assessment',
                'steps': [
                    'Analyze data volume growth trends',
                    'Review resource utilization patterns',
                    'Forecast future capacity needs',
                    'Plan infrastructure scaling',
                    'Update capacity planning documentation'
                ],
                'success_criteria': [
                    'Capacity forecast completed',
                    'Scaling plan documented',
                    'Budget requirements identified'
                ]
            },
            'incident_response': {
                'frequency': 'as_needed',
                'description': 'Response to pipeline incidents and failures',
                'steps': [
                    'Assess incident severity and impact',
                    'Implement immediate containment measures',
                    'Investigate root cause',
                    'Implement fix and validate resolution',
                    'Document incident and lessons learned'
                ],
                'success_criteria': [
                    'Incident resolved within SLA',
                    'Root cause identified',
                    'Prevention measures implemented'
                ]
            }
        }
    
    def create_checklists(self) -> Dict[str, List[Dict[str, str]]]:
        """Create operational checklists"""
        return {
            'pipeline_deployment': [
                {'task': 'Code review completed and approved', 'type': 'verification'},
                {'task': 'Unit tests passing (>95% coverage)', 'type': 'testing'},
                {'task': 'Integration tests passing', 'type': 'testing'},
                {'task': 'Performance tests completed', 'type': 'testing'},
                {'task': 'Security scan completed', 'type': 'security'},
                {'task': 'Configuration validated', 'type': 'configuration'},
                {'task': 'Database migrations applied', 'type': 'database'},
                {'task': 'Monitoring and alerts configured', 'type': 'monitoring'},
                {'task': 'Rollback plan prepared', 'type': 'contingency'},
                {'task': 'Stakeholders notified', 'type': 'communication'}
            ],
            'incident_response': [
                {'task': 'Incident severity assessed', 'type': 'assessment'},
                {'task': 'Incident commander assigned', 'type': 'organization'},
                {'task': 'Stakeholders notified', 'type': 'communication'},
                {'task': 'Initial containment implemented', 'type': 'containment'},
                {'task': 'Root cause investigation started', 'type': 'investigation'},
                {'task': 'Fix implemented and tested', 'type': 'resolution'},
                {'task': 'Service restored and validated', 'type': 'validation'},
                {'task': 'Post-incident review scheduled', 'type': 'follow_up'},
                {'task': 'Documentation updated', 'type': 'documentation'}
            ],
            'data_quality_investigation': [
                {'task': 'Quality issue scope identified', 'type': 'assessment'},
                {'task': 'Affected data sources identified', 'type': 'investigation'},
                {'task': 'Data lineage traced', 'type': 'investigation'},
                {'task': 'Validation rules reviewed', 'type': 'analysis'},
                {'task': 'Source data quality verified', 'type': 'verification'},
                {'task': 'Transformation logic validated', 'type': 'verification'},
                {'task': 'Corrective actions implemented', 'type': 'resolution'},
                {'task': 'Data quality monitoring enhanced', 'type': 'prevention'}
            ]
        }
    
    def create_runbooks(self) -> Dict[str, Dict[str, Any]]:
        """Create detailed runbooks for common scenarios"""
        return {
            'pipeline_failure': {
                'title': 'Pipeline Execution Failure Response',
                'severity': 'HIGH',
                'description': 'Steps to diagnose and resolve pipeline execution failures',
                'steps': [
                    {
                        'step': 1,
                        'action': 'Check pipeline logs',
                        'details': 'Review execution logs for error messages and stack traces',
                        'commands': ['tail -f /logs/pipeline.log', 'grep ERROR /logs/pipeline.log']
                    },
                    {
                        'step': 2,
                        'action': 'Verify data source availability',
                        'details': 'Check if all data sources are accessible and contain expected data',
                        'commands': ['curl -I <api_endpoint>', 'ls -la /data/input/']
                    },
                    {
                        'step': 3,
                        'action': 'Check system resources',
                        'details': 'Verify CPU, memory, and disk space availability',
                        'commands': ['top', 'df -h', 'free -m']
                    },
                    {
                        'step': 4,
                        'action': 'Restart pipeline with debug mode',
                        'details': 'Attempt restart with verbose logging enabled',
                        'commands': ['python scripts/run_pipeline.py --log-level DEBUG']
                    },
                    {
                        'step': 5,
                        'action': 'Escalate if unresolved',
                        'details': 'Contact senior engineer if issue persists after 30 minutes',
                        'commands': []
                    }
                ]
            },
            'data_quality_degradation': {
                'title': 'Data Quality Issue Response',
                'severity': 'MEDIUM',
                'description': 'Steps to investigate and resolve data quality issues',
                'steps': [
                    {
                        'step': 1,
                        'action': 'Identify affected data',
                        'details': 'Determine which datasets and time periods are affected',
                        'commands': ['python scripts/data_quality_check.py --date-range']
                    },
                    {
                        'step': 2,
                        'action': 'Check source data quality',
                        'details': 'Verify if quality issues originate from source systems',
                        'commands': ['python scripts/source_validation.py']
                    },
                    {
                        'step': 3,
                        'action': 'Review validation rules',
                        'details': 'Check if validation rules need updates or fixes',
                        'commands': ['python scripts/validate_rules.py']
                    },
                    {
                        'step': 4,
                        'action': 'Implement data fixes',
                        'details': 'Apply data corrections and re-run affected pipelines',
                        'commands': ['python scripts/data_correction.py', 'python scripts/reprocess_data.py']
                    }
                ]
            },
            'performance_degradation': {
                'title': 'Performance Degradation Response',
                'severity': 'MEDIUM',
                'description': 'Steps to diagnose and resolve performance issues',
                'steps': [
                    {
                        'step': 1,
                        'action': 'Analyze performance metrics',
                        'details': 'Review execution times, throughput, and resource usage',
                        'commands': ['python scripts/performance_analysis.py']
                    },
                    {
                        'step': 2,
                        'action': 'Identify bottlenecks',
                        'details': 'Determine which pipeline stages are causing delays',
                        'commands': ['python scripts/bottleneck_analysis.py']
                    },
                    {
                        'step': 3,
                        'action': 'Check resource constraints',
                        'details': 'Verify if system resources are limiting performance',
                        'commands': ['htop', 'iotop', 'nethogs']
                    },
                    {
                        'step': 4,
                        'action': 'Apply performance optimizations',
                        'details': 'Implement immediate performance improvements',
                        'commands': ['python scripts/optimize_pipeline.py']
                    }
                ]
            }
        }
    
    def execute_health_check(self) -> Dict[str, Any]:
        """Execute comprehensive pipeline health check"""
        health_check = {
            'timestamp': datetime.now().isoformat(),
            'overall_status': 'HEALTHY',
            'checks': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # Check 1: Recent pipeline executions
            recent_pipelines = self._check_recent_executions()
            health_check['checks']['recent_executions'] = recent_pipelines
            
            # Check 2: Data quality status
            quality_status = self._check_data_quality()
            health_check['checks']['data_quality'] = quality_status
            
            # Check 3: Alert status
            alert_status = self._check_alert_status()
            health_check['checks']['alerts'] = alert_status
            
            # Check 4: Resource utilization
            resource_status = self._check_resource_utilization()
            health_check['checks']['resources'] = resource_status
            
            # Check 5: Data freshness
            freshness_status = self._check_data_freshness()
            health_check['checks']['data_freshness'] = freshness_status
            
            # Determine overall status
            failed_checks = [name for name, check in health_check['checks'].items() 
                           if check['status'] == 'FAILED']
            warning_checks = [name for name, check in health_check['checks'].items() 
                            if check['status'] == 'WARNING']
            
            if failed_checks:
                health_check['overall_status'] = 'UNHEALTHY'
                health_check['issues'].extend([f"Failed check: {check}" for check in failed_checks])
            elif warning_checks:
                health_check['overall_status'] = 'WARNING'
                health_check['issues'].extend([f"Warning in check: {check}" for check in warning_checks])
            
            # Generate recommendations
            health_check['recommendations'] = self._generate_health_recommendations(health_check)
            
        except Exception as e:
            health_check['overall_status'] = 'ERROR'
            health_check['issues'].append(f"Health check error: {str(e)}")
        
        return health_check
    
    def _check_recent_executions(self) -> Dict[str, Any]:
        """Check recent pipeline executions"""
        try:
            cutoff_time = (datetime.now() - timedelta(hours=24)).isoformat()
            
            with sqlite3.connect(self.monitor.db_path) as conn:
                query = '''
                    SELECT pipeline_id, COUNT(*) as execution_count
                    FROM pipeline_metrics 
                    WHERE timestamp >= ? AND metric_name = 'total_execution_time'
                    GROUP BY pipeline_id
                '''
                results = pd.read_sql_query(query, conn, params=[cutoff_time])
            
            if results.empty:
                return {
                    'status': 'WARNING',
                    'message': 'No pipeline executions in the last 24 hours',
                    'details': {'execution_count': 0}
                }
            
            total_executions = results['execution_count'].sum()
            unique_pipelines = len(results)
            
            return {
                'status': 'PASSED',
                'message': f'{total_executions} pipeline executions across {unique_pipelines} pipelines',
                'details': {
                    'total_executions': int(total_executions),
                    'unique_pipelines': unique_pipelines
                }
            }
            
        except Exception as e:
            return {
                'status': 'FAILED',
                'message': f'Error checking recent executions: {str(e)}',
                'details': {}
            }
    
    def _check_data_quality(self) -> Dict[str, Any]:
        """Check recent data quality metrics"""
        try:
            cutoff_time = (datetime.now() - timedelta(hours=24)).isoformat()
            
            with sqlite3.connect(self.monitor.db_path) as conn:
                query = '''
                    SELECT AVG(quality_score) as avg_quality, MIN(quality_score) as min_quality
                    FROM quality_metrics 
                    WHERE timestamp >= ?
                '''
                result = pd.read_sql_query(query, conn, params=[cutoff_time])
            
            # AVG() over zero rows yields NULL, which pandas may surface as None or NaN
            if result.empty or pd.isna(result['avg_quality'].iloc[0]):
                return {
                    'status': 'WARNING',
                    'message': 'No quality metrics available',
                    'details': {}
                }
            
            avg_quality = result['avg_quality'].iloc[0]
            min_quality = result['min_quality'].iloc[0]
            
            if avg_quality < 80:
                status = 'FAILED'
                message = f'Poor average data quality: {avg_quality:.1f}%'
            elif min_quality < 70:
                status = 'WARNING'
                message = f'Some low quality data detected (min: {min_quality:.1f}%)'
            else:
                status = 'PASSED'
                message = f'Good data quality (avg: {avg_quality:.1f}%)'
            
            return {
                'status': status,
                'message': message,
                'details': {
                    'average_quality': round(float(avg_quality), 1),
                    'minimum_quality': round(float(min_quality), 1)
                }
            }
            
        except Exception as e:
            return {
                'status': 'FAILED',
                'message': f'Error checking data quality: {str(e)}',
                'details': {}
            }
    
    def _check_alert_status(self) -> Dict[str, Any]:
        """Check recent alert status"""
        try:
            cutoff_time = (datetime.now() - timedelta(hours=24)).isoformat()
            
            with sqlite3.connect(self.monitor.db_path) as conn:
                query = '''
                    SELECT severity, COUNT(*) as count
                    FROM alerts 
                    WHERE timestamp >= ? AND resolved = 0
                    GROUP BY severity
                '''
                results = pd.read_sql_query(query, conn, params=[cutoff_time])
            
            if results.empty:
                return {
                    'status': 'PASSED',
                    'message': 'No unresolved alerts in the last 24 hours',
                    'details': {'unresolved_alerts': 0}
                }
            
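            # Convert NumPy integers to plain ints so the details dict prints and serializes cleanly
            alert_counts = {severity: int(count) for severity, count in zip(results['severity'], results['count'])}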
            total_alerts = results['count'].sum()
            
            if alert_counts.get('CRITICAL', 0) > 0:
                status = 'FAILED'
                message = f'{alert_counts["CRITICAL"]} critical alerts unresolved'
            elif alert_counts.get('HIGH', 0) > 0:
                status = 'WARNING'
                message = f'{alert_counts["HIGH"]} high severity alerts unresolved'
            else:
                status = 'WARNING'
                message = f'{total_alerts} alerts need attention'
            
            return {
                'status': status,
                'message': message,
                'details': {
                    'unresolved_alerts': int(total_alerts),
                    'by_severity': alert_counts
                }
            }
            
        except Exception as e:
            return {
                'status': 'FAILED',
                'message': f'Error checking alerts: {str(e)}',
                'details': {}
            }
    
    def _check_resource_utilization(self) -> Dict[str, Any]:
        """Check system resource utilization (simulated with random values)"""
        # Tutorial placeholder: in production, replace the random values below
        # with real measurements from the host or your monitoring agent.
        import random
        
        cpu_usage = random.uniform(20, 85)
        memory_usage = random.uniform(30, 75)
        disk_usage = random.uniform(40, 90)
        
        max_usage = max(cpu_usage, memory_usage, disk_usage)
        
        if max_usage > 90:
            status = 'FAILED'
            message = 'Critical resource utilization detected'
        elif max_usage > 80:
            status = 'WARNING'
            message = 'High resource utilization'
        else:
            status = 'PASSED'
            message = 'Resource utilization normal'
        
        return {
            'status': status,
            'message': message,
            'details': {
                'cpu_usage': round(cpu_usage, 1),
                'memory_usage': round(memory_usage, 1),
                'disk_usage': round(disk_usage, 1)
            }
        }
    
    def _check_data_freshness(self) -> Dict[str, Any]:
        """Check data freshness"""
        try:
            # Check when the last pipeline ran
            with sqlite3.connect(self.monitor.db_path) as conn:
                query = '''
                    SELECT MAX(timestamp) as last_execution
                    FROM pipeline_metrics 
                    WHERE metric_name = 'total_execution_time'
                '''
                result = pd.read_sql_query(query, conn)
            
            # MAX() over zero rows yields NULL, which pandas may surface as None or NaT
            if result.empty or pd.isna(result['last_execution'].iloc[0]):
                return {
                    'status': 'FAILED',
                    'message': 'No pipeline executions found',
                    'details': {}
                }
            
            last_execution = pd.to_datetime(result['last_execution'].iloc[0])
            hours_since = (datetime.now() - last_execution).total_seconds() / 3600
            
            if hours_since > 24:
                status = 'FAILED'
                message = f'Data is stale (last update: {hours_since:.1f} hours ago)'
            elif hours_since > 12:
                status = 'WARNING'
                message = f'Data is getting stale (last update: {hours_since:.1f} hours ago)'
            else:
                status = 'PASSED'
                message = f'Data is fresh (last update: {hours_since:.1f} hours ago)'
            
            return {
                'status': status,
                'message': message,
                'details': {
                    'last_execution': last_execution.isoformat(),
                    'hours_since_last_update': round(hours_since, 1)
                }
            }
            
        except Exception as e:
            return {
                'status': 'FAILED',
                'message': f'Error checking data freshness: {str(e)}',
                'details': {}
            }
    
    def _generate_health_recommendations(self, health_check: Dict[str, Any]) -> List[str]:
        """Generate recommendations based on health check results"""
        recommendations = []
        
        for check_name, check_result in health_check['checks'].items():
            if check_result['status'] == 'FAILED':
                if check_name == 'recent_executions':
                    recommendations.append('Investigate why pipelines are not executing')
                elif check_name == 'data_quality':
                    recommendations.append('Review data sources and validation rules')
                elif check_name == 'alerts':
                    recommendations.append('Resolve critical alerts immediately')
                elif check_name == 'resources':
                    recommendations.append('Scale infrastructure or optimize resource usage')
                elif check_name == 'data_freshness':
                    recommendations.append('Check pipeline scheduling and execution')
            
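            elif check_result['status'] == 'WARNING':
                if check_name == 'recent_executions':
                    recommendations.append('Confirm pipelines are scheduled and have run in the last 24 hours')
                elif check_name == 'data_quality':
                    recommendations.append('Monitor data quality trends closely')
                elif check_name == 'alerts':
                    recommendations.append('Review and resolve pending alerts')
                elif check_name == 'resources':
                    recommendations.append('Plan for resource scaling')
                elif check_name == 'data_freshness':
                    recommendations.append('Verify the next scheduled run before data becomes stale')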
        
        if not recommendations:
            recommendations.append('System is healthy - continue regular monitoring')
        
        return recommendations
    
    def generate_operational_report(self) -> str:
        """Generate comprehensive operational report"""
        health_check = self.execute_health_check()
        
        report = []
        report.append("# 📋 Pipeline Operational Status Report")
        report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("")
        
        # Overall Status
        status_icon = {
            'HEALTHY': '✅',
            'WARNING': '⚠️',
            'UNHEALTHY': '❌',
            'ERROR': '🚨'
        }.get(health_check['overall_status'], '❓')
        
        report.append(f"## {status_icon} Overall Status: {health_check['overall_status']}")
        report.append("")
        
        # Health Check Results
        report.append("## 🔍 Health Check Results")
        
        for check_name, check_result in health_check['checks'].items():
            check_icon = {
                'PASSED': '✅',
                'WARNING': '⚠️',
                'FAILED': '❌'
            }.get(check_result['status'], '❓')
            
            report.append(f"### {check_icon} {check_name.replace('_', ' ').title()}")
            report.append(f"- **Status**: {check_result['status']}")
            report.append(f"- **Message**: {check_result['message']}")
            
            if check_result['details']:
                report.append(f"- **Details**: {check_result['details']}")
            
            report.append("")
        
        # Issues
        if health_check['issues']:
            report.append("## 🚨 Issues Identified")
            for i, issue in enumerate(health_check['issues'], 1):
                report.append(f"{i}. {issue}")
            report.append("")
        
        # Recommendations
        report.append("## 💡 Recommendations")
        for i, recommendation in enumerate(health_check['recommendations'], 1):
            report.append(f"{i}. {recommendation}")
        report.append("")
        
        # Operational Procedures
        report.append("## 📋 Available Procedures")
        for proc_name, proc_config in self.procedures.items():
            report.append(f"### {proc_name.replace('_', ' ').title()}")
            report.append(f"- **Frequency**: {proc_config['frequency']}")
            report.append(f"- **Description**: {proc_config['description']}")
            report.append("")
        
        # Runbooks
        report.append("## 📖 Available Runbooks")
        for runbook_name, runbook_config in self.runbooks.items():
            severity_icon = {
                'HIGH': '🔴',
                'MEDIUM': '🟠',
                'LOW': '🟡'
            }.get(runbook_config['severity'], '⚪')
            
            report.append(f"### {severity_icon} {runbook_config['title']}")
            report.append(f"- **Severity**: {runbook_config['severity']}")
            report.append(f"- **Description**: {runbook_config['description']}")
            report.append("")
        
        report.append("---")
        report.append("*Report generated by Pipeline Operational Procedures*")
        
        return "\n".join(report)

# Initialize operational procedures
ops_procedures = OperationalProcedures(monitor)

print("📋 Operational Procedures initialized!")
print(f"📝 Procedures defined: {len(ops_procedures.procedures)}")
print(f"✅ Checklists created: {len(ops_procedures.checklists)}")
print(f"📖 Runbooks available: {len(ops_procedures.runbooks)}")

# Execute health check
print("\n🔍 Executing comprehensive health check...")
health_result = ops_procedures.execute_health_check()

print("\n📊 Health Check Results:")
print(f"  Overall Status: {health_result['overall_status']}")
print(f"  Checks Performed: {len(health_result['checks'])}")
print(f"  Issues Found: {len(health_result['issues'])}")
print(f"  Recommendations: {len(health_result['recommendations'])}")

# Generate operational report
operational_report = ops_procedures.generate_operational_report()

print("\n📋 OPERATIONAL STATUS REPORT")
print("=" * 60)
print(operational_report)

# Save operational report
ops_report_filename = f"../data/output/operational_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
try:
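    # The report contains emoji, so write UTF-8 explicitly rather than relying
    # on the platform default encoding (which can raise UnicodeEncodeError).
    with open(ops_report_filename, 'w', encoding='utf-8') as f: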
        f.write(operational_report)
    print(f"\n💾 Operational report saved to: {ops_report_filename}")
except Exception as e:
    print(f"\n⚠️ Could not save operational report: {e}")

## 🎯 Key Takeaways

Congratulations! You've completed the comprehensive **Pipeline Monitoring and Operations** tutorial. You've built a production-ready monitoring and alerting system that provides:

### ✅ **Monitoring Infrastructure**
- **📊 Comprehensive Metrics Collection**: Pipeline performance, data quality, and system health
- **🗄️ Monitoring Database**: Structured storage for all monitoring data
- **📈 Real-time Dashboards**: Visual monitoring with multiple chart types
- **📋 Performance Analysis**: Trend analysis and bottleneck identification

### ✅ **Advanced Alerting System**
- **🚨 Multi-level Alerts**: Critical, High, Medium, and Low severity levels
- **📢 Multiple Notification Channels**: Email, Slack, SMS, and PagerDuty integration
- **⏰ Smart Cooldown**: Prevents alert spam with configurable cooldown periods
- **🔄 Escalation Logic**: Automatic escalation for unresolved issues
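
The cooldown idea can be sketched in a few lines. This is a minimal illustration, not the alert manager built earlier in the notebook: the `AlertCooldown` class, its `should_send` method, and the rule and pipeline names are hypothetical, and the 15-minute window is an assumed default.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple


class AlertCooldown:
    """Suppress repeat alerts for the same (rule, pipeline) pair within a time window.

    Hypothetical helper for illustration; not part of the tutorial's alert manager.
    """

    def __init__(self, cooldown_minutes: int = 15) -> None:
        self.cooldown = timedelta(minutes=cooldown_minutes)
        self._last_sent: Dict[Tuple[str, str], datetime] = {}

    def should_send(self, rule: str, pipeline: str, now: Optional[datetime] = None) -> bool:
        now = now or datetime.now()
        key = (rule, pipeline)
        last = self._last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            return False  # still cooling down: drop the duplicate
        self._last_sent[key] = now  # remember when this alert last went out
        return True


cooldown = AlertCooldown(cooldown_minutes=15)
start = datetime(2024, 1, 1, 9, 0)
print(cooldown.should_send("high_error_rate", "sales_ingest", start))                          # True
print(cooldown.should_send("high_error_rate", "sales_ingest", start + timedelta(minutes=5)))   # False
print(cooldown.should_send("high_error_rate", "sales_ingest", start + timedelta(minutes=20)))  # True
```

Keying the window on the rule and pipeline together means a noisy pipeline cannot silence the same rule firing for a different pipeline.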

### ✅ **Performance Optimization**
- **📊 Trend Analysis**: Identify performance patterns and degradation
- **🎯 Bottleneck Detection**: Pinpoint slow stages and optimization opportunities
- **💡 Automated Recommendations**: Rule-based optimization suggestions derived from collected metrics
- **📈 Capacity Planning**: Forecast future resource needs
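
One simple way to detect degradation is to fit a straight line to recent execution times and look at its slope. The sketch below assumes durations are recorded once per run, in seconds. The function names and the 5% growth threshold are illustrative assumptions, not the analyzer from this notebook.

```python
from statistics import mean
from typing import Sequence


def trend_slope(values: Sequence[float]) -> float:
    """Least-squares slope of values against their run index (units per run)."""
    n = len(values)
    if n < 2:
        return 0.0
    x_mean = (n - 1) / 2
    y_mean = mean(values)
    numerator = sum((i - x_mean) * (v - y_mean) for i, v in enumerate(values))
    denominator = sum((i - x_mean) ** 2 for i in range(n))
    return numerator / denominator


def is_degrading(durations: Sequence[float], max_growth_per_run: float = 0.05) -> bool:
    """Flag durations that grow faster than the given fraction of their mean per run.

    The 0.05 threshold is an assumed default; tune it against real history.
    """
    if len(durations) < 2:
        return False
    baseline = mean(durations)
    if baseline <= 0:
        return False
    return trend_slope(durations) / baseline > max_growth_per_run


durations_sec = [100.0, 110.0, 120.0, 130.0, 140.0]
print(trend_slope(durations_sec))   # 10.0 seconds slower per run
print(is_degrading(durations_sec))  # True
```

Dividing the slope by the mean duration makes the threshold relative, so the same setting works for a 10-second job and a 2-hour job.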

### ✅ **Operational Excellence**
- **📋 Standard Procedures**: Daily, weekly, and monthly operational tasks
- **✅ Operational Checklists**: Ensure consistent execution of procedures
- **📖 Detailed Runbooks**: Step-by-step incident response procedures
- **🔍 Health Checks**: Automated system health assessment
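
A health check ends by rolling individual check results up into one overall status. The sketch below uses the status labels that appear in the operational report (`PASSED`, `WARNING`, `FAILED` for checks, and `HEALTHY`, `WARNING`, `UNHEALTHY` overall). The "worst status wins" rule and the `overall_status` function are assumptions for illustration, so the notebook's `execute_health_check` may aggregate differently.

```python
from typing import Any, Dict


def overall_status(checks: Dict[str, Dict[str, Any]]) -> str:
    """Roll individual check results up into one status (worst result wins).

    Hypothetical helper; the aggregation rule is an assumption.
    """
    statuses = {result.get('status') for result in checks.values()}
    if 'FAILED' in statuses:
        return 'UNHEALTHY'
    if 'WARNING' in statuses:
        return 'WARNING'
    return 'HEALTHY'


checks = {
    'recent_executions': {'status': 'PASSED', 'message': 'ok', 'details': {}},
    'data_freshness': {'status': 'WARNING', 'message': 'getting stale', 'details': {}},
}
print(overall_status(checks))  # WARNING

checks['alerts'] = {'status': 'FAILED', 'message': 'critical alert open', 'details': {}}
print(overall_status(checks))  # UNHEALTHY
```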

---

## 🏆 What You've Accomplished

Through this 7-part tutorial series, you've built a **complete, enterprise-grade data ingestion pipeline** with:

### 🎓 **Technical Skills Mastered**
1. **📥 Data Ingestion**: Multi-source data collection (CSV, JSON, APIs, databases)
2. **🔍 Data Validation**: Comprehensive quality scoring and business rule validation
3. **🧹 Data Transformation**: Cleaning, standardization, and enrichment
4. **💾 Data Storage**: Database operations and file management
5. **🔄 Pipeline Orchestration**: End-to-end workflow management
6. **📈 Monitoring & Alerting**: Production-ready observability
7. **🛠️ Operations**: Incident response and maintenance procedures

### 🏗️ **Architecture Patterns Learned**
- **Modular Design**: Loosely coupled, reusable components
- **Error Handling**: Graceful failure management and recovery
- **Observability**: Comprehensive logging, metrics, and tracing
- **Scalability**: Horizontal scaling and performance optimization
- **Reliability**: Fault tolerance and data consistency

### 💼 **Business Value Delivered**
- **Automated Data Processing**: Hands-off data pipeline operations
- **Data Quality Assurance**: Consistent, reliable data for analytics
- **Operational Efficiency**: Reduced manual intervention and faster issue resolution
- **Scalable Foundation**: Ready for enterprise-scale data volumes
- **Compliance Ready**: Audit trails and data lineage tracking

---

## 🚀 Next Steps for Production

To deploy this pipeline in production, consider these enhancements:

### 🔒 **Security & Compliance**
- **Data Encryption**: Encrypt data at rest and in transit
- **Access Control**: Implement role-based access control (RBAC)
- **Audit Logging**: Comprehensive audit trails for compliance
- **Data Privacy**: GDPR/CCPA compliance features

### ☁️ **Cloud Deployment**
- **Container Orchestration**: Deploy with Docker and Kubernetes
- **Cloud Services**: Leverage AWS/GCP/Azure managed services
- **Auto-scaling**: Implement horizontal and vertical scaling
- **Multi-region**: Deploy across multiple regions for reliability

### 🔄 **Advanced Features**
- **Stream Processing**: Real-time data processing with Kafka/Kinesis
- **Machine Learning**: Automated anomaly detection and data profiling
- **Data Lineage**: Complete data lineage tracking and visualization
- **Schema Evolution**: Automatic schema migration and compatibility

### 📊 **Enterprise Integration**
- **Data Catalog**: Integrate with enterprise data catalogs
- **Workflow Orchestration**: Use Apache Airflow or Prefect
- **BI Integration**: Connect to Tableau, Power BI, or Looker
- **API Gateway**: Expose pipeline APIs for external consumption

---

## 📚 Recommended Learning Path

Continue your data engineering journey with these topics:

### 🎯 **Immediate Next Steps**
1. **Apache Airflow**: Workflow orchestration and scheduling
2. **Apache Kafka**: Stream processing and event-driven architecture
3. **Docker & Kubernetes**: Containerization and orchestration
4. **Cloud Platforms**: AWS/GCP/Azure data services

### 🏗️ **Advanced Topics**
1. **Apache Spark**: Big data processing and analytics
2. **Data Mesh**: Decentralized data architecture
3. **MLOps**: Machine learning pipeline operations
4. **Data Governance**: Enterprise data management

### 📖 **Recommended Resources**
- **Books**: "Designing Data-Intensive Applications" by Martin Kleppmann
- **Courses**: Data Engineering courses on Coursera, Udacity, or Pluralsight
- **Certifications**: AWS/GCP/Azure data engineering certifications
- **Communities**: Join data engineering Slack communities and forums

---

## 🎉 Congratulations!

You've successfully completed a comprehensive data engineering tutorial series and built a production-ready data ingestion pipeline with enterprise-grade monitoring and operations capabilities.

**You're now equipped with the skills and knowledge to:**
- ✅ Design and implement scalable data pipelines
- ✅ Ensure data quality and reliability
- ✅ Monitor and operate production data systems
- ✅ Handle incidents and optimize performance
- ✅ Build career-ready data engineering solutions

**Keep building, keep learning, and welcome to the world of Data Engineering! 🚀**

---

*Thank you for completing the Data Ingestion Pipeline Tutorial Series!*