# Building Coverage Match System - Main Execution Notebook

This notebook provides the main execution interface for the building coverage match system,
combining the original functionality with advanced parallel processing and custom hooks.

## Features
- **Multi-source Data Loading**: Parallel loading from AIP, Atlas, and Snowflake
- **Advanced RAG Processing**: Multi-threaded claim analysis
- **Custom Hooks**: Pre- and post-processing customization
- **Performance Monitoring**: Built-in performance tracking
- **Comprehensive Validation**: Data quality checks throughout the pipeline

## 1. Import Required Libraries and Modules

In [None]:
# Standard library imports
import pandas as pd
import numpy as np
import time
import logging
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Original system imports (backward compatibility)
from coverage_configs.src.environment import DatabricksEnv
from coverage_configs.src.credentials import get_credentials
from coverage_configs.src.sql import get_sql_query
from coverage_configs.src.rag_params import get_rag_params
from coverage_configs.src.prompts import get_prompt

# New modular system imports
from modules.core.pipeline import CoveragePipeline
from modules.core.loader import ConfigLoader
from modules.core.monitor import PipelinePerformanceMonitor
from modules.core.validator import PipelineValidator

# Emit a start-of-run banner so the notebook output records when execution began.
_BANNER_RULE = "=" * 70

print("Building Coverage Match System - Enhanced with Parallel Processing")
print(_BANNER_RULE)
print("Execution started at: " + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
print(_BANNER_RULE)

## 2. Configuration Setup

In [None]:
# Legacy Databricks environment settings handed to DatabricksEnv.
# The placeholder values must be replaced with real workspace details before running.
databricks_dict = {
    'environment': 'production',
    'region': 'us-east-1',
    'workspace_url': 'your-workspace-url',
    'resource_group': 'your-resource-group',
    'subscription_id': 'your-subscription-id',
    # Add your actual databricks configuration here
}

print("Loading original environment configuration...")
env = DatabricksEnv(databricks_dict)

# Individual override sections, assembled below into the 'pipeline' override tree.
_parallel_section = {
    'enabled': True,
    'max_workers': 6,  # Adjust based on your system capacity
    'batch_size': 50,
}
_source_section = {
    'enabled_sources': ['aip', 'atlas', 'snowflake'],
    'parallel_loading': True,
    'timeout_seconds': 300,
}
_hooks_section = {
    'pre_processing_enabled': True,
    'post_processing_enabled': True,
    'pre_hook_path': 'custom_hooks/pre_processing.py',
    'post_hook_path': 'custom_hooks/post_processing.py',
}
_monitoring_section = {
    'performance_tracking': True,
    'detailed_reporting': True,
}
config_overrides = {
    'pipeline': {
        'parallel_processing': _parallel_section,
        'source_loading': _source_section,
        'hooks': _hooks_section,
        'monitoring': _monitoring_section,
    }
}

# Merge the environment attributes with the overrides via ConfigLoader.
print("Loading enhanced configuration...")
config = ConfigLoader.load_config(env.__dict__, config_overrides)


def _enabled_label(flag):
    """Return the ✅/❌ label used in the configuration summary for a boolean flag."""
    return '✅ Enabled' if flag else '❌ Disabled'


_pipeline_cfg = config['pipeline']
print("\n📋 Configuration Summary:")
print(f"  Parallel Processing: {_enabled_label(_pipeline_cfg['parallel_processing']['enabled'])}")
print(f"  Max Workers: {_pipeline_cfg['parallel_processing']['max_workers']}")
print(f"  Data Sources: {', '.join(_pipeline_cfg['source_loading']['enabled_sources'])}")
print(f"  Pre-processing Hook: {_enabled_label(_pipeline_cfg['hooks']['pre_processing_enabled'])}")
print(f"  Post-processing Hook: {_enabled_label(_pipeline_cfg['hooks']['post_processing_enabled'])}")
print(f"  Performance Monitoring: {_enabled_label(_pipeline_cfg['monitoring']['performance_tracking'])}")

## 3. Initialize Pipeline Components

In [None]:
# Create the performance monitor and time pipeline construction.
print("\n🔧 Initializing pipeline components...")
monitor = PipelinePerformanceMonitor()
monitor.start_operation('pipeline_initialization')

# Output-data validator used by the data quality validation step.
validator = PipelineValidator()

# Only construction is guarded. Previously the statistics display shared this try
# block, so an error while reporting stats called end_operation a second time and
# was misreported as an initialization failure.
try:
    hooks_cfg = config['pipeline']['hooks']
    pipeline = CoveragePipeline(
        credentials_dict=env.credentials_dict,
        sql_queries=env.sql_queries,
        rag_params=env.rag_params,
        crypto_spark=env.crypto_spark,
        logger=env.logger,
        SQL_QUERY_CONFIGS=env.SQL_QUERY_CONFIGS,
        # A hook path is passed only when its corresponding flag is enabled.
        pre_hook_path=hooks_cfg['pre_hook_path'] if hooks_cfg['pre_processing_enabled'] else None,
        post_hook_path=hooks_cfg['post_hook_path'] if hooks_cfg['post_processing_enabled'] else None,
        max_workers=config['pipeline']['parallel_processing']['max_workers']
    )
except Exception as e:
    # Close the timing window before re-raising so the monitor is not left open.
    monitor.end_operation('pipeline_initialization')
    print(f"❌ Pipeline initialization failed: {e}")
    raise

# Construction succeeded: close the timing window exactly once.
monitor.end_operation('pipeline_initialization')
print("✅ Pipeline initialized successfully!")

# Display pipeline statistics
pipeline_stats = pipeline.get_pipeline_stats()
print("\n📊 Pipeline Statistics:")
for key, value in pipeline_stats.items():
    print(f"  {key}: {value}")

## 4. Define Building Coverage Conditions

In [None]:
# Loss-description rules used to select building-coverage claims (same as the
# original system). Each rule has the form "<KEYWORD> in LOSSDESC".
bldg_conditions = [
    f"{keyword} in LOSSDESC"
    for keyword in (
        "BLDG",
        "BUILDING",
        "STRUCTURE",
        "ROOF",
        "WALL",
        "FOUNDATION",
        "FLOOR",
        "CEILING",
    )
]

print(f"\n🏗️ Building Coverage Conditions ({len(bldg_conditions)} rules):")
for rule_number, rule in enumerate(bldg_conditions, start=1):
    print(f"  {rule_number}. {rule}")

# Additional processing parameters.
# NOTE(review): in this notebook run_pipeline is called with bldg_conditions only;
# confirm whether these settings are consumed anywhere.
processing_params = dict(
    confidence_threshold=0.7,
    batch_processing=True,
    quality_validation=True,
    performance_optimization=True,
)

print("\n⚙️ Processing Parameters:")
for param_name, param_value in processing_params.items():
    print(f"  {param_name}: {param_value}")

## 5. Execute Enhanced Pipeline

In [None]:
print("\n🚀 Starting enhanced building coverage match processing...")
print("=" * 70)


def _print_value_distribution(title, column, total):
    """Print the count and percentage share of each distinct non-null value in *column*.

    Args:
        title: Heading printed (after a blank line) above the distribution.
        column: pandas Series whose ``value_counts()`` is reported.
        total: Row count used as the percentage denominator (must be non-zero).
    """
    print(f"\n{title}")
    for value, count in column.value_counts().items():
        percentage = (count / total) * 100
        print(f"  {value}: {count:,} ({percentage:.1f}%)")


# Time the full run with the pipeline monitor and a local monotonic clock
# (perf_counter is unaffected by wall-clock adjustments, unlike time.time).
monitor.start_operation('full_pipeline_execution')
overall_start_time = time.perf_counter()

# Only the pipeline call is guarded. Previously the reporting code shared this try
# block, so a display error would call end_operation a second time and replace a
# valid result with an empty DataFrame, which later cells would then skip saving.
try:
    final_df = pipeline.run_pipeline(bldg_conditions)
except Exception as e:
    monitor.end_operation('full_pipeline_execution')
    print(f"\n❌ Pipeline execution failed: {e}")
    print("\n🔍 Error Details:")
    import traceback
    traceback.print_exc()

    # Downstream cells test final_df.empty to decide whether to continue.
    final_df = pd.DataFrame()
else:
    monitor.end_operation('full_pipeline_execution')
    overall_end_time = time.perf_counter()
    total_execution_time = overall_end_time - overall_start_time

    # Normalise a missing result so later cells can rely on final_df.empty.
    if final_df is None:
        final_df = pd.DataFrame()

    print("\n" + "=" * 70)
    print("🎉 PIPELINE EXECUTION COMPLETED SUCCESSFULLY!")
    print("=" * 70)

    if not final_df.empty:
        total_claims = len(final_df)
        print("\n📈 Results Summary:")
        print(f"  Total Claims Processed: {total_claims:,}")
        print(f"  Dataset Shape: {final_df.shape}")
        print(f"  Execution Time: {total_execution_time:.2f} seconds")
        # Guard against a zero elapsed time before dividing.
        if total_execution_time > 0:
            print(f"  Processing Rate: {total_claims / total_execution_time:.1f} claims/second")
        else:
            print("  Processing Rate: n/a (elapsed time below timer resolution)")

        print(f"\n📋 Output Columns ({len(final_df.columns)}):")
        for i, col in enumerate(final_df.columns, 1):
            print(f"  {i:2d}. {col}")

        # Prefer the key prediction columns; otherwise show the first five columns.
        print("\n🔍 Sample Results (first 5 rows):")
        key_columns = ['CLAIMNO', 'prediction', 'confidence']
        if all(col in final_df.columns for col in key_columns):
            display_columns = key_columns
        else:
            display_columns = final_df.columns[:5]
        print(final_df[display_columns].head().to_string(index=False))

        if 'prediction' in final_df.columns:
            _print_value_distribution("📊 Prediction Distribution:", final_df['prediction'], total_claims)

        if 'final_quality_flag' in final_df.columns:
            _print_value_distribution("🏷️ Quality Distribution:", final_df['final_quality_flag'], total_claims)
    else:
        print("⚠️ No results generated - check data sources and configuration")

## 6. Performance Analysis and Monitoring

In [None]:
print("\n📊 PERFORMANCE ANALYSIS")
print("=" * 50)

# Detailed per-operation report from the monitor.
monitor.print_pipeline_report()

# Aggregate statistics used for the metrics below.
performance_summary = monitor.get_pipeline_summary()

# Heuristic constants for the derived metrics below. They are assumptions, not
# measurements; tune them for the deployment before drawing conclusions.
TARGET_CLAIMS_PER_SECOND = 2.0  # processing rate reported as 100% "efficiency"
ASSUMED_SEQUENTIAL_SECONDS_PER_CLAIM = 0.5  # assumed per-claim cost without parallelism

print("\n🎯 Key Performance Metrics:")
if performance_summary['total_time'] > 0:
    print(f"  Total Execution Time: {performance_summary['total_time']:.2f} seconds")
    print(f"  Claims Processing Rate: {performance_summary['processing_rate_claims_per_second']:.2f} claims/second")
    print(f"  Average Time per Claim: {performance_summary['average_time_per_claim']:.3f} seconds")

    if len(final_df) > 0:
        # Rate relative to the target rate, capped at 100%.
        efficiency_score = min(
            100,
            (performance_summary['processing_rate_claims_per_second'] / TARGET_CLAIMS_PER_SECOND) * 100,
        )
        print(f"  Processing Efficiency: {efficiency_score:.1f}%")

        # psutil is optional. virtual_memory() reports host-wide memory, not this
        # process's usage, so the label says "System".
        try:
            import psutil
            memory_info = psutil.virtual_memory()
            print(f"  System Memory Usage: {memory_info.percent:.1f}%")
            print(f"  Available Memory: {memory_info.available / (1024**3):.1f} GB")
        except ImportError:
            print("  Memory Monitoring: Not available (psutil not installed)")

# Compare against an *estimated* sequential run based on ASSUMED_SEQUENTIAL_SECONDS_PER_CLAIM.
if len(final_df) > 0 and performance_summary['total_time'] > 0:
    estimated_sequential_time = len(final_df) * ASSUMED_SEQUENTIAL_SECONDS_PER_CLAIM
    time_saved = estimated_sequential_time - performance_summary['total_time']
    improvement_percent = (time_saved / estimated_sequential_time) * 100

    print("\n⚡ Parallel Processing Benefits:")
    print(f"  Estimated Sequential Time: {estimated_sequential_time:.2f} seconds")
    print(f"  Actual Parallel Time: {performance_summary['total_time']:.2f} seconds")
    if time_saved >= 0:
        print(f"  Time Saved: {time_saved:.2f} seconds ({improvement_percent:.1f}% improvement)")
    else:
        # A negative "saving" was previously reported as an improvement.
        print(f"  Time Lost: {-time_saved:.2f} seconds (slower than the sequential estimate)")

## 7. Data Quality Validation

In [None]:
if not final_df.empty:
    print("\n🔍 DATA QUALITY VALIDATION")
    print("=" * 40)

    # Structured checks delegated to the pipeline validator.
    validation_results = validator.validate_output_data(final_df)

    # The icon follows the verdict; previously a ✅ was printed even for FAILED.
    is_valid = validation_results['is_valid']
    print(f"\n{'✅' if is_valid else '❌'} Validation Status: {'PASSED' if is_valid else 'FAILED'}")

    if validation_results['errors']:
        print("\n❌ Validation Errors:")
        for error in validation_results['errors']:
            print(f"  • {error}")

    if validation_results['warnings']:
        print("\n⚠️ Validation Warnings:")
        for warning in validation_results['warnings']:
            print(f"  • {warning}")

    if validation_results['statistics']:
        print("\n📈 Validation Statistics:")
        for stat_name, stat_value in validation_results['statistics'].items():
            print(f"  {stat_name}: {stat_value}")

    print("\n🔬 Additional Quality Checks:")
    total_rows = len(final_df)

    # Missing values in critical columns: ✅ none, ⚠️ under 5%, ❌ 5% or more.
    critical_columns = ['CLAIMNO', 'prediction', 'confidence']
    for col in critical_columns:
        if col in final_df.columns:
            missing_count = final_df[col].isnull().sum()
            missing_percent = (missing_count / total_rows) * 100
            status = "✅" if missing_count == 0 else "⚠️" if missing_percent < 5 else "❌"
            print(f"  {status} {col}: {missing_count} missing ({missing_percent:.1f}%)")

    if 'confidence' in final_df.columns:
        confidence = final_df['confidence']
        high_conf = (confidence >= 0.8).sum()
        medium_conf = ((confidence >= 0.6) & (confidence < 0.8)).sum()
        low_conf = (confidence < 0.6).sum()
        # NaN scores satisfy none of the comparisons above; count them separately so
        # the buckets account for every row.
        missing_conf = confidence.isnull().sum()

        print("\n🎯 Confidence Distribution:")
        print(f"  High Confidence (≥0.8): {high_conf:,} ({(high_conf/total_rows*100):.1f}%)")
        print(f"  Medium Confidence (0.6-0.8): {medium_conf:,} ({(medium_conf/total_rows*100):.1f}%)")
        print(f"  Low Confidence (<0.6): {low_conf:,} ({(low_conf/total_rows*100):.1f}%)")
        if missing_conf:
            print(f"  Missing Confidence: {missing_conf:,} ({(missing_conf/total_rows*100):.1f}%)")

else:
    print("\n⚠️ No data available for quality validation")

## 8. Save Results

In [None]:
if not final_df.empty:
    print("\n💾 SAVING RESULTS")
    print("=" * 30)

    monitor.start_operation('data_saving')

    # Only the write itself is guarded. Previously an error while *reporting* the
    # save (e.g. a missing timing key) fell into this handler: it closed the monitor
    # a second time and wrote a redundant fallback CSV after a successful save.
    try:
        # Save to multiple destinations using the multi-writer
        save_config = {
            'table': 'building_coverage_predictions_enhanced',
            'schema': 'dbo',
            'write_mode': 'append',
            'output_dir': 'output',
            'filename': f'building_coverage_predictions_{datetime.now().strftime("%Y%m%d_%H%M%S")}'
        }

        # Always save locally; add the SQL warehouse when credentials for it exist.
        destinations = ['local']
        if env.credentials_dict.get('sql_warehouse'):
            destinations.append('sql_warehouse')

        save_results = pipeline.storage_writer.save_data_parallel(
            final_df,
            destinations=destinations,
            max_workers=2,
            write_config=save_config
        )
    except Exception as e:
        monitor.end_operation('data_saving')
        print(f"❌ Error saving results: {e}")

        # Fallback: save to a local CSV. The output directory is created first
        # because pandas' to_csv does not create missing parent directories.
        try:
            import os
            os.makedirs('output', exist_ok=True)
            fallback_filename = f"output/building_coverage_fallback_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            final_df.to_csv(fallback_filename, index=False)
            print(f"💾 Fallback save successful: {fallback_filename}")
        except Exception as fallback_error:
            print(f"❌ Fallback save also failed: {fallback_error}")
    else:
        monitor.end_operation('data_saving')

        print("\n📁 Save Results:")
        for destination, result in save_results.items():
            if destination == '_timing_info':
                continue
            # NOTE(review): success is inferred from the text "Success" in each
            # destination's result; confirm against save_data_parallel's contract.
            status = "✅" if "Success" in str(result) else "❌"
            print(f"  {status} {destination}: {result}")

        if '_timing_info' in save_results:
            timing_info = save_results['_timing_info']
            print("\n⏱️ Save Performance:")
            print(f"  Total Save Time: {timing_info['total_time']:.2f} seconds")
            print(f"  Records Saved: {timing_info['records_written']:,}")
            # Guard against a zero save time before dividing.
            if timing_info['total_time'] > 0:
                print(f"  Save Rate: {timing_info['records_written'] / timing_info['total_time']:.0f} records/second")
            else:
                print("  Save Rate: n/a (save time below timer resolution)")

else:
    print("\n⚠️ No results to save")

## 9. Execution Summary and Final Report

In [None]:
print("\n" + "=" * 70)
print("📋 FINAL EXECUTION SUMMARY")
print("=" * 70)

# An empty result is reported as a failed run.
execution_successful = not final_df.empty
print(f"\n🎯 Execution Status: {'✅ SUCCESSFUL' if execution_successful else '❌ FAILED'}")
print(f"⏰ Completion Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

avg_confidence = None
if execution_successful:
    pipeline_cfg = config['pipeline']

    # Describe each hook individually; previously a single enabled hook was shown as 'None'.
    pre_hook_on = pipeline_cfg['hooks']['pre_processing_enabled']
    post_hook_on = pipeline_cfg['hooks']['post_processing_enabled']
    if pre_hook_on and post_hook_on:
        hooks_label = 'Pre & Post'
    elif pre_hook_on:
        hooks_label = 'Pre only'
    elif post_hook_on:
        hooks_label = 'Post only'
    else:
        hooks_label = 'None'

    print("\n📊 Summary Statistics:")
    print(f"  ✅ Total Claims Processed: {len(final_df):,}")
    print(f"  ⚡ Processing Mode: {'Parallel' if pipeline_cfg['parallel_processing']['enabled'] else 'Sequential'}")
    print(f"  🔧 Workers Used: {pipeline_cfg['parallel_processing']['max_workers']}")
    print(f"  📁 Data Sources: {len(pipeline_cfg['source_loading']['enabled_sources'])}")
    print(f"  🪝 Custom Hooks: {hooks_label}")

    # Performance highlights
    final_summary = monitor.get_pipeline_summary()
    if final_summary['total_time'] > 0:
        print("\n⚡ Performance Highlights:")
        print(f"  🚀 Total Execution Time: {final_summary['total_time']:.2f} seconds")
        print(f"  📈 Processing Rate: {final_summary['processing_rate_claims_per_second']:.2f} claims/second")
        print(f"  🎯 Average per Claim: {final_summary['average_time_per_claim']:.3f} seconds")

    # Quality assessment: anything not flagged 'GOOD' (including missing flags) needs review.
    if 'final_quality_flag' in final_df.columns:
        good_quality = (final_df['final_quality_flag'] == 'GOOD').sum()
        quality_percentage = (good_quality / len(final_df)) * 100
        print("\n🏷️ Quality Assessment:")
        print(f"  ✅ Good Quality: {good_quality:,} ({quality_percentage:.1f}%)")
        print(f"  ⚠️  Needs Review: {len(final_df) - good_quality:,} ({100 - quality_percentage:.1f}%)")

    # Confidence assessment (the mean is reused by the recommendations below).
    if 'confidence' in final_df.columns:
        avg_confidence = final_df['confidence'].mean()
        high_confidence = (final_df['confidence'] >= 0.8).sum()
        print("\n🎯 Confidence Assessment:")
        print(f"  📊 Average Confidence: {avg_confidence:.1%}")
        print(f"  🔥 High Confidence (≥80%): {high_confidence:,} ({(high_confidence/len(final_df)*100):.1f}%)")

# System recommendations. The "performing well" message is printed only when no
# improvement was suggested; previously it always followed the suggestions.
print("\n💡 System Recommendations:")
if execution_successful:
    recommendations = []
    if final_summary.get('processing_rate_claims_per_second', 0) < 1.0:
        recommendations.append("  🔧 Consider increasing max_workers for better performance")
    if avg_confidence is not None and avg_confidence < 0.7:
        recommendations.append("  📚 Consider tuning RAG parameters for higher confidence")
    if len(final_df) > 10000:
        recommendations.append("  💾 Consider implementing batch processing for large datasets")
    if not recommendations:
        recommendations.append("  ✅ System is performing well - continue with current configuration")
    for recommendation in recommendations:
        print(recommendation)
else:
    print("  🔍 Check data source connectivity and configuration")
    print("  📋 Review error logs above for specific issues")
    print("  🛠️ Consider running in debug mode for detailed troubleshooting")

# Final performance report (detailed)
print("\n" + "=" * 50)
print("📈 DETAILED PERFORMANCE REPORT")
print("=" * 50)
monitor.print_pipeline_report()

print("\n" + "=" * 70)
print("🎉 BUILDING COVERAGE MATCH SYSTEM EXECUTION COMPLETE")
print("=" * 70)