# Enhanced DVC Pipeline for Insurance Risk Analytics

## Overview
This notebook demonstrates a comprehensive DVC implementation with:
- Advanced pipeline monitoring and validation
- Real-time metrics tracking
- Automated error handling and recovery
- Data quality validation
- Performance optimization

---

In [1]:
# Enhanced imports and configuration
import os
import sys
import json
import yaml
import time
import logging
import subprocess
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional

# Configure logging
log_dir = Path('../logs')
log_dir.mkdir(exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('../logs/dvc_pipeline.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# Create necessary directories
os.makedirs('../logs', exist_ok=True)
os.makedirs('../outputs', exist_ok=True)
os.makedirs('../results', exist_ok=True)

In [2]:
class DVCPipelineManager:
    """Enhanced DVC Pipeline Management Class"""
    
    def __init__(self, project_root: str = '../'):
        self.project_root = Path(project_root)
        self.dvc_yaml_path = self.project_root / 'dvc.yaml'
        self.params_path = self.project_root / 'params.yaml'
        self.logger = logging.getLogger(self.__class__.__name__)
        
    def run_command(self, command: str, timeout: int = 300) -> Tuple[str, str, int]:
        """Execute shell command with enhanced error handling"""
        try:
            self.logger.info(f"Executing: {command}")
            result = subprocess.run(
                command, 
                shell=True, 
                capture_output=True, 
                text=True, 
                timeout=timeout,
                cwd=self.project_root
            )
            return result.stdout, result.stderr, result.returncode
        except subprocess.TimeoutExpired:
            self.logger.error(f"Command timed out: {command}")
            return '', f'Command timed out after {timeout}s', 1
        except Exception as e:
            self.logger.error(f"Error executing command: {e}")
            return '', str(e), 1
    
    def validate_environment(self) -> Dict[str, bool]:
        """Comprehensive environment validation"""
        checks = {}
        
        # Check DVC installation
        stdout, stderr, code = self.run_command('dvc version')
        checks['dvc_installed'] = code == 0
        
        # Check Git repository
        checks['git_repo'] = (self.project_root / '.git').exists()
        
        # Check DVC initialization
        checks['dvc_initialized'] = (self.project_root / '.dvc').exists()
        
        # Check configuration files
        checks['dvc_yaml_exists'] = self.dvc_yaml_path.exists()
        checks['params_yaml_exists'] = self.params_path.exists()
        
        # Check required directories
        required_dirs = ['data', 'src', 'outputs', 'results']
        for dir_name in required_dirs:
            checks[f'{dir_name}_dir_exists'] = (self.project_root / dir_name).exists()
        
        return checks
    
    def get_pipeline_status(self) -> Dict[str, any]:
        """Get comprehensive pipeline status"""
        status = {}
        
        # DVC status
        stdout, stderr, code = self.run_command('dvc status')
        status['pipeline_up_to_date'] = code == 0 and not stdout.strip()
        status['dvc_status_output'] = stdout if stdout else 'Up to date'
        
        # Pipeline stages
        if self.dvc_yaml_path.exists():
            with open(self.dvc_yaml_path, 'r') as f:
                config = yaml.safe_load(f)
            status['stages'] = list(config.get('stages', {}).keys())
            status['total_stages'] = len(status['stages'])
        else:
            status['stages'] = []
            status['total_stages'] = 0
        
        # Check outputs
        status['outputs_exist'] = self._check_outputs_exist()
        
        return status
    
    def _check_outputs_exist(self) -> Dict[str, bool]:
        """Check if expected outputs exist"""
        expected_outputs = {
            'processed_data': 'outputs/insurance_data_processed.csv',
            'feature_data': 'outputs/insurance_data_features.csv',
            'hypothesis_results': 'results/hypothesis_testing/',
            'model_results': 'results/models/',
            'recommendations': 'results/recommendations.md'
        }
        
        return {
            name: (self.project_root / path).exists() 
            for name, path in expected_outputs.items()
        }
    
    def execute_pipeline(self, stage: Optional[str] = None, force: bool = False) -> bool:
        """Execute pipeline with monitoring"""
        command = 'dvc repro'
        if stage:
            command += f' {stage}'
        if force:
            command += ' --force'
        
        self.logger.info(f"Starting pipeline execution: {command}")
        start_time = time.time()
        
        stdout, stderr, code = self.run_command(command, timeout=1800)  # 30 min timeout
        
        execution_time = time.time() - start_time
        
        if code == 0:
            self.logger.info(f"Pipeline completed successfully in {execution_time:.2f}s")
            return True
        else:
            self.logger.error(f"Pipeline failed: {stderr}")
            return False
    
    def generate_metrics_report(self) -> Dict[str, any]:
        """Generate comprehensive metrics report"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'pipeline_health': self.get_pipeline_health_score(),
            'model_metrics': self._load_model_metrics(),
            'data_quality': self._load_data_quality_metrics(),
            'business_impact': self._load_business_metrics()
        }
        
        # Save report
        report_path = self.project_root / 'results' / 'pipeline_report.json'
        with open(report_path, 'w') as f:
            json.dump(report, f, indent=2)
        
        return report
    
    def get_pipeline_health_score(self) -> float:
        """Calculate overall pipeline health score"""
        checks = self.validate_environment()
        status = self.get_pipeline_status()
        
        total_checks = len(checks) + len(status['outputs_exist'])
        passed_checks = sum(checks.values()) + sum(status['outputs_exist'].values())
        
        return (passed_checks / total_checks) * 100
    
    def _load_model_metrics(self) -> Dict[str, any]:
        """Load model performance metrics"""
        metrics_path = self.project_root / 'results' / 'models' / 'model_metrics.json'
        if metrics_path.exists():
            with open(metrics_path, 'r') as f:
                return json.load(f)
        return {}
    
    def _load_data_quality_metrics(self) -> Dict[str, any]:
        """Load data quality metrics"""
        metrics_path = self.project_root / 'outputs' / 'data_quality_metrics.json'
        if metrics_path.exists():
            with open(metrics_path, 'r') as f:
                return json.load(f)
        return {}
    
    def _load_business_metrics(self) -> Dict[str, any]:
        """Load business impact metrics"""
        metrics_path = self.project_root / 'results' / 'business_impact_metrics.json'
        if metrics_path.exists():
            with open(metrics_path, 'r') as f:
                return json.load(f)
        return {}

# Initialize pipeline manager
pipeline_manager = DVCPipelineManager()

In [3]:
# Pipeline Dashboard
def create_pipeline_dashboard():
    """Create comprehensive pipeline dashboard"""
    print('ENHANCED DVC PIPELINE DASHBOARD')
    print('=' * 60)
    
    # Environment validation
    print('\nENVIRONMENT VALIDATION')
    print('-' * 30)
    checks = pipeline_manager.validate_environment()
    for check, status in checks.items():
        status_text = '[PASS]' if status else '[FAIL]'
        print(f'{status_text} {check.replace("_", " ").title()}: {status}')
    
    # Pipeline status
    print('\nPIPELINE STATUS')
    print('-' * 30)
    status = pipeline_manager.get_pipeline_status()
    print(f'Total Stages: {status["total_stages"]}')
    print(f'Pipeline Status: {status["dvc_status_output"]}')
    
    # Output validation
    print('\nOUTPUT VALIDATION')
    print('-' * 30)
    for output, exists in status['outputs_exist'].items():
        status_text = '[EXISTS]' if exists else '[MISSING]'
        print(f'{status_text} {output.replace("_", " ").title()}: {exists}')
    
    # Health score
    health_score = pipeline_manager.get_pipeline_health_score()
    print(f'\nOVERALL HEALTH SCORE: {health_score:.1f}%')
    
    if health_score >= 90:
        print('STATUS: Excellent! Pipeline is in perfect condition')
    elif health_score >= 75:
        print('STATUS: Good! Pipeline is working well with minor issues')
    elif health_score >= 50:
        print('STATUS: Warning! Pipeline needs attention')
    else:
        print('STATUS: Critical! Pipeline requires immediate fixes')

# Execute dashboard
create_pipeline_dashboard()

2025-06-18 00:29:29,399 - INFO - Executing: dvc version


ENHANCED DVC PIPELINE DASHBOARD

ENVIRONMENT VALIDATION
------------------------------


2025-06-18 00:29:31,509 - INFO - Executing: dvc status


[PASS] Dvc Installed: True
[PASS] Git Repo: True
[PASS] Dvc Initialized: True
[PASS] Dvc Yaml Exists: True
[PASS] Params Yaml Exists: True
[PASS] Data Dir Exists: True
[PASS] Src Dir Exists: True
[PASS] Outputs Dir Exists: True
[PASS] Results Dir Exists: True

PIPELINE STATUS
------------------------------


2025-06-18 00:29:35,435 - INFO - Executing: dvc version


Total Stages: 5
Pipeline Status: Data and pipelines are up to date.


OUTPUT VALIDATION
------------------------------
[EXISTS] Processed Data: True
[EXISTS] Feature Data: True
[EXISTS] Hypothesis Results: True
[EXISTS] Model Results: True
[EXISTS] Recommendations: True


2025-06-18 00:29:38,123 - INFO - Executing: dvc status



OVERALL HEALTH SCORE: 100.0%
STATUS: Excellent! Pipeline is in perfect condition


In [4]:
# Advanced Pipeline Execution with Monitoring
def execute_pipeline_with_monitoring():
    """Execute pipeline with comprehensive monitoring"""
    print('ADVANCED PIPELINE EXECUTION')
    print('=' * 50)
    
    # Pre-execution validation
    print('\nPRE-EXECUTION VALIDATION')
    checks = pipeline_manager.validate_environment()
    critical_checks = ['dvc_installed', 'dvc_initialized', 'dvc_yaml_exists']
    
    for check in critical_checks:
        if not checks.get(check, False):
            print(f'[FAIL] Critical check failed: {check}')
            return False
    
    print('[PASS] All critical checks passed')
    
    # Execute pipeline
    print('\nEXECUTING PIPELINE')
    success = pipeline_manager.execute_pipeline()
    
    if success:
        print('[SUCCESS] Pipeline executed successfully')
        
        # Generate metrics report
        print('\nGENERATING METRICS REPORT')
        report = pipeline_manager.generate_metrics_report()
        print(f'Report saved with timestamp: {report["timestamp"]}')
        print(f'Final health score: {report["pipeline_health"]:.1f}%')
        
        return True
    else:
        print('[ERROR] Pipeline execution failed')
        return False

# Execute with monitoring (uncomment to run)
# execute_pipeline_with_monitoring()

In [5]:
# Metrics Visualization Dashboard
def create_metrics_visualization():
    """Create comprehensive metrics visualization"""
    print('METRICS VISUALIZATION DASHBOARD')
    print('=' * 50)
    
    # Load metrics
    model_metrics = pipeline_manager._load_model_metrics()
    
    if not model_metrics:
        print('[WARNING] No model metrics available yet')
        return
    
    # Create visualizations
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Model Performance Dashboard', fontsize=16, fontweight='bold')
    
    # Model comparison
    models = list(model_metrics.keys())
    metrics = ['accuracy', 'precision', 'recall', 'f1']
    
    for i, metric in enumerate(metrics):
        ax = axes[i//2, i%2]
        values = [model_metrics[model].get(metric, 0) for model in models]
        
        bars = ax.bar(models, values, color=['#FF6B6B', '#4ECDC4', '#45B7D1'])
        ax.set_title(f'{metric.upper()} Comparison', fontweight='bold')
        ax.set_ylabel(metric.capitalize())
        ax.set_ylim(0, 1)
        
        # Add value labels on bars
        for bar, value in zip(bars, values):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                   f'{value:.3f}', ha='center', va='bottom', fontweight='bold')
    
    plt.tight_layout()
    plt.savefig('../results/metrics_dashboard.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    # Print summary
    print('\nMODEL PERFORMANCE SUMMARY')
    print('-' * 40)
    for model, metrics in model_metrics.items():
        print(f'\n{model.upper()}:')
        for metric, value in metrics.items():
            print(f'  {metric}: {value:.3f}')

# Create visualization (uncomment to run)
# create_metrics_visualization()

## Summary 

### Enhanced Features Implemented

1. **Advanced Pipeline Management Class**
   - Comprehensive error handling
   - Environment validation
   - Real-time monitoring
   - Automated reporting

2. **Enhanced Configuration**
   - Structured parameters file
   - Comprehensive metrics tracking
   - Multiple model support
   - Advanced plotting configuration

3. **Monitoring and Visualization**
   - Real-time dashboard
   - Health score calculation
   - Metrics visualization
   - Automated report generation

### Usage Instructions

1. **Initialize Pipeline**: Run the dashboard to check environment
2. **Execute Pipeline**: Use the advanced monitoring function
3. **View Results**: Check the metrics visualization
4. **Monitor Health**: Regular health score monitoring
