# Healthcare Data Quality Assessment Framework
## Based on Kahn's Framework for FHIR Data

Welcome to the Healthcare Data Quality Assessment Framework demonstration! This notebook shows how to assess healthcare data quality along the three core dimensions of the Kahn framework:

### 🎯 **The Kahn Framework Dimensions**

1. **Completeness** - Are data values present?
   - Missing required fields
   - Null/empty values in critical fields
   - Incomplete record structures

2. **Conformance** - Do data values adhere to format and domain constraints?
   - Data type validation
   - Format pattern matching
   - Domain value constraints
   - Range validation

3. **Plausibility** - Are data values believable?
   - Clinical logic validation
   - Temporal consistency
   - Cross-field dependencies
   - Statistical outlier detection
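
To make the three dimensions concrete, here is a minimal sketch that applies one check per dimension to a single toy record. The record, the helper names, and the allowed gender codes are illustrative assumptions, not part of the framework's API.

```python
# Hypothetical toy record; field names mirror those used later in this notebook.
record = {
    "patient_id": "PAT000123",
    "first_name": None,   # deliberately missing
    "gender": "X",        # deliberately outside the allowed domain
    "age": 172,           # deliberately implausible
}

def completeness_issues(rec, required):
    """Completeness: required fields that are absent, null, or empty."""
    return [field for field in required if rec.get(field) in (None, "")]

def conformance_issues(rec, allowed_genders=("M", "F", "O", "U")):
    """Conformance: populated values that fall outside the allowed domain."""
    gender = rec.get("gender")
    return ["gender"] if gender is not None and gender not in allowed_genders else []

def plausibility_issues(rec, max_age=150):
    """Plausibility: values that are well-formed but not believable."""
    age = rec.get("age")
    return ["age"] if age is not None and not (0 <= age <= max_age) else []

print("completeness:", completeness_issues(record, ["patient_id", "first_name", "age"]))
print("conformance: ", conformance_issues(record))
print("plausibility:", plausibility_issues(record))
```

Each helper returns the offending field names, so one record can fail several dimensions at once without the checks interfering with each other.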

### 🤖 **AI-Powered Approach**

This framework combines:
- **Rule-based validation** for known data quality issues
- **Machine learning anomaly detection** for unknown patterns
- **Comprehensive reporting** with scorecards and visualizations
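
As a rough illustration of the machine learning side, the sketch below fits scikit-learn's `IsolationForest` to synthetic vital signs and checks whether two injected extreme rows are flagged. The data, the `contamination` value, and the variable names are assumptions chosen for the demo, not settings taken from the framework.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)

# 200 ordinary readings: heart rate (bpm) and temperature (degrees Celsius)
normal = np.column_stack([
    rng.normal(75, 8, size=200),
    rng.normal(37.0, 0.4, size=200),
])

# Two injected anomalies: an extreme heart rate and an extreme temperature
injected = np.array([[350.0, 37.0], [80.0, 55.0]])
X = np.vstack([normal, injected])

# contamination is the assumed share of anomalous rows (a tuning choice)
model = IsolationForest(n_estimators=100, contamination=0.05, random_state=0)
labels = model.fit_predict(X)        # -1 = anomaly, 1 = normal
scores = model.decision_function(X)  # lower scores are more anomalous

print("flagged row indices:", np.where(labels == -1)[0])
print("scores of injected rows:", scores[-2:])
```

Unlike a fixed rule, the model needs no per-field thresholds, but it also flags roughly the `contamination` share of rows whether or not they are truly wrong, so its output is best treated as a review queue rather than a verdict.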

### 📋 **What You'll Learn**

- How to generate synthetic FHIR-like healthcare data
- How to set up systematic data quality rules
- How to train AI models for anomaly detection
- How to create comprehensive quality reports
- How to identify and fix data quality issues

Let's dive in! 🚀

## 1. Import Required Libraries and Dependencies

First, let's import all the necessary libraries for our healthcare data quality assessment framework.

In [None]:
# Core data manipulation and analysis
import pandas as pd
import numpy as np
import json
import warnings
from datetime import datetime, timedelta
from pathlib import Path
import sys
import os

# Add our framework to the path
sys.path.append('../src')

# Machine Learning libraries
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import joblib

# Deep Learning (TensorFlow/Keras)
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    print("✓ TensorFlow imported successfully")
except ImportError:
    print("⚠️ TensorFlow not available - autoencoder features will be limited")

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Our Healthcare Data Quality Framework
try:
    from healthcare_dq_framework.core.framework import DataQualityFramework, create_healthcare_framework
    from healthcare_dq_framework.core.dimensions import QualityDimensionType
    from healthcare_dq_framework.data.synthetic_generator import SyntheticFHIRDataGenerator
    from healthcare_dq_framework.validators.rule_based import RuleBasedValidator, create_healthcare_validator
    from healthcare_dq_framework.validators.ml_based import MLAnomalyDetector
    from healthcare_dq_framework.reporting.scorecard import DataQualityScorecard
    print("✓ Healthcare Data Quality Framework imported successfully")
except ImportError as e:
    print(f"⚠️ Framework import error: {e}")
    print("Note: We'll define components inline for this demo")

# Configure display options
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 100)

# Plotting configuration
plt.style.use('default')
sns.set_palette("husl")
warnings.filterwarnings('ignore')

print("🎉 Setup complete (check for any import warnings above)")
print(f"Python version: {sys.version}")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")


## 2. Generate Synthetic Healthcare FHIR Data

Now let's create synthetic healthcare data that resembles FHIR resources. We'll intentionally introduce various data quality issues to test our framework's detection capabilities.

### 🏥 **What We'll Generate:**
- **Patient demographics** (age, gender, contact info)
- **Encounters** (visits, admissions, procedures)
- **Vital signs** (temperature, blood pressure, heart rate)
- **Lab results** (blood work, chemistry panels)
- **Conditions** (diagnoses with ICD-10 codes)

### 🚨 **Intentional Quality Issues:**
- Missing required fields
- Invalid formats (IDs, dates, codes)
- Impossible values (negative ages, extreme vitals)
- Clinical inconsistencies (pregnancy in males)
- Statistical outliers
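
The injection idea can be sketched in isolation before reading the full generator. This is a simplified, hypothetical helper (`inject_issue` is not part of the framework): it corrupts a record with a given probability and returns the issue label, which makes it possible to measure later how many injected problems the checks actually catch.

```python
import random

def inject_issue(record, rng, rate=0.1):
    """Return (copy_of_record, issue_label); the label is None when untouched."""
    corrupted = dict(record)  # never mutate the caller's record
    if rng.random() >= rate:
        return corrupted, None

    issue = rng.choice(["missing", "format", "logic"])
    if issue == "missing":
        corrupted["first_name"] = None               # completeness problem
    elif issue == "format":
        corrupted["patient_id"] = "INVALID"          # conformance problem
    else:
        corrupted["age"] = rng.randint(151, 200)     # plausibility problem
    return corrupted, issue

rng = random.Random(42)  # local generator keeps the demo reproducible
clean = {"patient_id": "PAT000001", "first_name": "Ada", "age": 40}

results = [inject_issue(clean, rng) for _ in range(1000)]
labels = [issue for _, issue in results if issue is not None]
print(f"{len(labels)} of {len(results)} records corrupted")
```

Using a dedicated `random.Random` instance instead of the module-level functions keeps the injection reproducible even if other code draws random numbers in between.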

In [None]:
# Create synthetic data generator (inline definition for this demo)
from faker import Faker
import random

class SimpleSyntheticDataGenerator:
    """Simplified synthetic healthcare data generator for demo purposes"""
    
    def __init__(self, seed=42):
        self.fake = Faker()
        Faker.seed(seed)
        random.seed(seed)
        np.random.seed(seed)
        
        # Medical reference data
        self.icd10_codes = [
            'I10', 'E11.9', 'Z00.00', 'I25.10', 'J44.1', 'N18.6', 'I50.9',
            'F41.9', 'M79.3', 'K21.9', 'J45.9', 'E78.5', 'I48.91', 'G93.1',
            'O80.1', 'O21.0'  # Added pregnancy codes for testing
        ]
        
        self.encounter_types = ['inpatient', 'outpatient', 'emergency', 'observation']
        self.genders = ['M', 'F']
        
    def generate_patients(self, n=100, introduce_issues=True):
        """Generate patient data with optional quality issues"""
        patients = []
        
        for i in range(n):
            birth_date = self.fake.date_of_birth(minimum_age=0, maximum_age=100)
            age = (datetime.now().date() - birth_date).days // 365
            
            patient = {
                'patient_id': f"PAT{i+1:06d}",
                'first_name': self.fake.first_name(),
                'last_name': self.fake.last_name(),
                'birth_date': birth_date.strftime('%Y-%m-%d'),
                'age': age,
                'gender': random.choice(self.genders),
                'phone': self.fake.phone_number(),
                'email': self.fake.email()
            }
            
            # Introduce quality issues (10% of records)
            if introduce_issues and random.random() < 0.1:
                issue_type = random.choice(['missing', 'format', 'logic'])
                
                if issue_type == 'missing':
                    patient['first_name'] = None  # Missing required field
                elif issue_type == 'format':
                    patient['patient_id'] = f"INVALID{i}"  # Wrong format
                elif issue_type == 'logic':
                    patient['age'] = random.randint(151, 200)  # Impossible age (above the 0-150 valid range)
            
            patients.append(patient)
        
        return pd.DataFrame(patients)
    
    def generate_encounters(self, patients_df, encounters_per_patient=(1, 3), introduce_issues=True):
        """Generate encounter data"""
        encounters = []
        encounter_counter = 1
        
        for _, patient in patients_df.iterrows():
            num_encounters = random.randint(*encounters_per_patient)
            
            for j in range(num_encounters):
                admission_date = self.fake.date_between(start_date='-2y', end_date='today')
                
                encounter = {
                    'encounter_id': f"ENC{encounter_counter:08d}",
                    'patient_id': patient['patient_id'],
                    'encounter_type': random.choice(self.encounter_types),
                    'admission_date': admission_date.strftime('%Y-%m-%d'),
                    'discharge_date': (admission_date + timedelta(days=random.randint(0, 30))).strftime('%Y-%m-%d'),
                    'primary_diagnosis': random.choice(self.icd10_codes),
                    'attending_physician': self.fake.name()
                }
                
                # Introduce quality issues
                if introduce_issues and random.random() < 0.1:
                    issue_type = random.choice(['missing', 'format', 'temporal'])
                    
                    if issue_type == 'missing':
                        encounter['patient_id'] = None
                    elif issue_type == 'format':
                        encounter['encounter_id'] = f"WRONG{j}"
                    elif issue_type == 'temporal':
                        # Discharge before admission
                        encounter['discharge_date'] = (admission_date - timedelta(days=1)).strftime('%Y-%m-%d')
                
                encounters.append(encounter)
                encounter_counter += 1
        
        return pd.DataFrame(encounters)
    
    def generate_vital_signs(self, encounters_df, introduce_issues=True):
        """Generate vital signs data"""
        vitals = []
        
        for _, encounter in encounters_df.iterrows():
            vital = {
                'vital_id': f"VIT{len(vitals)+1:08d}",
                'encounter_id': encounter['encounter_id'],
                'patient_id': encounter['patient_id'],
                'temperature_c': round(random.uniform(36.0, 38.5), 1),
                'heart_rate': random.randint(60, 100),
                'systolic_bp': random.randint(90, 140),
                'diastolic_bp': random.randint(60, 90),
                'weight_kg': round(random.uniform(40, 120), 1),
                'height_cm': round(random.uniform(150, 200), 1)
            }
            
            # Introduce quality issues
            if introduce_issues and random.random() < 0.1:
                issue_type = random.choice(['outlier', 'impossible', 'missing'])
                
                if issue_type == 'outlier':
                    vital['heart_rate'] = random.randint(300, 400)  # Statistical outlier
                elif issue_type == 'impossible':
                    vital['temperature_c'] = random.uniform(50, 60)  # Impossible temperature
                elif issue_type == 'missing':
                    vital['heart_rate'] = None
            
            vitals.append(vital)
        
        return pd.DataFrame(vitals)

# Initialize generator and create sample data
print("🔧 Initializing synthetic data generator...")
generator = SimpleSyntheticDataGenerator(seed=42)

print("📊 Generating synthetic healthcare data...")
patients_df = generator.generate_patients(n=500, introduce_issues=True)
encounters_df = generator.generate_encounters(patients_df, introduce_issues=True)
vitals_df = generator.generate_vital_signs(encounters_df, introduce_issues=True)

print(f"✅ Generated data:")
print(f"   - {len(patients_df)} patients")
print(f"   - {len(encounters_df)} encounters") 
print(f"   - {len(vitals_df)} vital sign records")

# Display sample data
print("\n📋 Sample Patient Data:")
print(patients_df.head())

print("\n📋 Sample Encounter Data:")
print(encounters_df.head())

print("\n📋 Sample Vital Signs Data:")
print(vitals_df.head())

In [None]:
# Create a merged dataset for comprehensive quality assessment
print("🔄 Creating merged dataset for analysis...")

# Merge encounters with patients
merged_data = encounters_df.merge(
    patients_df[['patient_id', 'age', 'gender', 'birth_date']], 
    on='patient_id', 
    how='left'
)

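# Merge with vital signs (take the last recorded row per encounter).
# drop_duplicates keeps whole rows; groupby(...).last() would return the last
# non-null value of each column and could mix values from different rows.
latest_vitals = vitals_df.drop_duplicates('encounter_id', keep='last')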
merged_data = merged_data.merge(
    latest_vitals[['encounter_id', 'temperature_c', 'heart_rate', 'systolic_bp', 'diastolic_bp', 'weight_kg', 'height_cm']], 
    on='encounter_id', 
    how='left'
)

print(f"📊 Merged dataset shape: {merged_data.shape}")
print(f"📋 Columns: {list(merged_data.columns)}")

# Display data quality overview
print("\n🔍 Data Quality Overview:")
print("Missing values per column:")
print(merged_data.isnull().sum())

print("\n📈 Basic statistics:")
print(merged_data.describe())
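
When picking the latest vitals per encounter, it helps to know that `groupby(...).last()` and `drop_duplicates(..., keep='last')` are not equivalent once nulls are involved. The small sketch below uses made-up data to show the difference.

```python
import pandas as pd

# Hypothetical vitals: encounter E1 has two rows, the later one lacks a heart rate
vitals = pd.DataFrame({
    "encounter_id": ["E1", "E1", "E2"],
    "heart_rate": [80.0, None, 72.0],
    "temperature_c": [37.0, 39.5, 36.8],
})

# GroupBy.last() takes the last non-null value of each column independently
per_column_last = vitals.groupby("encounter_id").last().reset_index()

# drop_duplicates(keep="last") keeps the last row as recorded, nulls included
last_row = vitals.drop_duplicates("encounter_id", keep="last").reset_index(drop=True)

print(per_column_last)
print(last_row)
```

For E1, the first approach pairs the earlier heart rate with the later temperature, which hides the missing value from the completeness checks. The second approach preserves it.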

## 3. Define Data Quality Rules and Metrics

Let's establish comprehensive data quality rules that our framework will use to assess the healthcare data. These rules cover all three Kahn dimensions.

### 📏 **Rule Categories:**

**Completeness Rules:**
- Required fields must be present
- Critical healthcare fields cannot be null

**Conformance Rules:**
- Patient IDs must follow format: PAT######
- Encounter IDs must follow format: ENC########
- ICD-10 codes must match standard pattern
- Numeric fields must be within valid ranges

**Plausibility Rules:**
- Ages must be between 0 and 150 years
- Vital signs must be within physiological ranges
- No pregnancy diagnoses for male patients
- Discharge dates must not precede admission dates
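
As a small standalone illustration of the ID format rules, the sketch below checks a few sample values with the standard `re` module. The `ID_PATTERNS` table and the `conforms` helper are hypothetical names used only here.

```python
import re

# One compiled pattern per field, matching the formats listed above
ID_PATTERNS = {
    "patient_id": re.compile(r"PAT\d{6}"),
    "encounter_id": re.compile(r"ENC\d{8}"),
}

def conforms(field, value):
    """True when value is a string that matches the field's pattern in full."""
    return isinstance(value, str) and ID_PATTERNS[field].fullmatch(value) is not None

samples = [
    ("patient_id", "PAT000042"),
    ("patient_id", "INVALID7"),
    ("encounter_id", "ENC00000001"),
    ("encounter_id", "ENC123"),
    ("patient_id", "PAT000042\n"),  # trailing newline
]
for field, value in samples:
    print(f"{field:<13} {value!r:<16} {conforms(field, value)}")
```

`fullmatch` is used instead of `^...$` with `match` because `$` also matches just before a trailing newline, so a value such as `"PAT000042\n"` would otherwise slip through.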

In [None]:
# Define comprehensive data quality rules
import re

class HealthcareDataQualityRules:
    """Comprehensive healthcare data quality rules"""
    
    @staticmethod
    def check_required_fields(data, required_fields):
        """Check for missing required fields"""
        issues = []
        missing_fields = set(required_fields) - set(data.columns)
        
        for field in missing_fields:
            issues.append({
                'rule': 'required_field_missing',
                'severity': 'critical',
                'field': field,
                'description': f'Required field {field} is missing',
                'count': 1
            })
        
        # Check for null values in required fields
        for field in required_fields:
            if field in data.columns:
                null_count = data[field].isnull().sum()
                if null_count > 0:
                    issues.append({
                        'rule': 'required_field_null',
                        'severity': 'major',
                        'field': field,
                        'description': f'Required field {field} has {null_count} null values',
                        'count': null_count,
                        'percentage': (null_count / len(data)) * 100
                    })
        
        return issues
    
    @staticmethod
    def check_format_patterns(data):
        """Check format patterns for IDs and codes"""
        issues = []
        
        # Patient ID format check
        if 'patient_id' in data.columns:
            pattern = r'^PAT\d{6}$'
            # na=True skips nulls here; missing values are a completeness issue, not a format issue
            invalid_ids = data[~data['patient_id'].str.match(pattern, na=True)]
            if len(invalid_ids) > 0:
                issues.append({
                    'rule': 'patient_id_format_violation',
                    'severity': 'major',
                    'field': 'patient_id',
                    'description': f'Patient ID format violation (expected: PAT######)',
                    'count': len(invalid_ids),
                    'percentage': (len(invalid_ids) / len(data)) * 100,
                    'sample_values': invalid_ids['patient_id'].dropna().head(3).tolist()
                })
        
        # Encounter ID format check
        if 'encounter_id' in data.columns:
            pattern = r'^ENC\d{8}$'
            # na=True skips nulls here; missing values are a completeness issue, not a format issue
            invalid_ids = data[~data['encounter_id'].str.match(pattern, na=True)]
            if len(invalid_ids) > 0:
                issues.append({
                    'rule': 'encounter_id_format_violation',
                    'severity': 'major',
                    'field': 'encounter_id',
                    'description': f'Encounter ID format violation (expected: ENC########)',
                    'count': len(invalid_ids),
                    'percentage': (len(invalid_ids) / len(data)) * 100,
                    'sample_values': invalid_ids['encounter_id'].dropna().head(3).tolist()
                })
        
        # ICD-10 code format check
        if 'primary_diagnosis' in data.columns:
            pattern = r'^[A-Z]\d{2}(\.\d{1,2})?$'
            # na=True skips nulls here; missing values are a completeness issue, not a format issue
            invalid_codes = data[~data['primary_diagnosis'].str.match(pattern, na=True)]
            if len(invalid_codes) > 0:
                issues.append({
                    'rule': 'icd10_format_violation',
                    'severity': 'major',
                    'field': 'primary_diagnosis',
                    'description': f'ICD-10 code format violation',
                    'count': len(invalid_codes),
                    'percentage': (len(invalid_codes) / len(data)) * 100,
                    'sample_values': invalid_codes['primary_diagnosis'].dropna().head(3).tolist()
                })
        
        return issues
    
    @staticmethod
    def check_range_constraints(data):
        """Check range constraints for numeric fields"""
        issues = []
        
        # Define valid ranges for healthcare data
        ranges = {
            'age': (0, 150),
            'temperature_c': (30.0, 45.0),
            'heart_rate': (30, 200),
            'systolic_bp': (50, 300),
            'diastolic_bp': (30, 200),
            'weight_kg': (0, 500),
            'height_cm': (0, 300)
        }
        
        for field, (min_val, max_val) in ranges.items():
            if field in data.columns:
                out_of_range = data[(data[field] < min_val) | (data[field] > max_val)]
                if len(out_of_range) > 0:
                    issues.append({
                        'rule': f'{field}_range_violation',
                        'severity': 'major',
                        'field': field,
                        'description': f'{field} values outside valid range ({min_val}-{max_val})',
                        'count': len(out_of_range),
                        'percentage': (len(out_of_range) / len(data)) * 100,
                        'expected_range': f'{min_val}-{max_val}',
                        'sample_values': out_of_range[field].dropna().head(3).tolist()
                    })
        
        return issues
    
    @staticmethod
    def check_clinical_logic(data):
        """Check clinical logic rules"""
        issues = []
        
        # Check for pregnancy diagnosis in male patients
        if 'gender' in data.columns and 'primary_diagnosis' in data.columns:
            pregnancy_codes = ['O80.1', 'O21.0']  # Pregnancy-related ICD-10 codes
            male_pregnancy = data[
                (data['gender'] == 'M') & 
                (data['primary_diagnosis'].isin(pregnancy_codes))
            ]
            
            if len(male_pregnancy) > 0:
                issues.append({
                    'rule': 'pregnancy_in_male',
                    'severity': 'critical',
                    'field': 'primary_diagnosis',
                    'description': 'Pregnancy diagnosis in male patient',
                    'count': len(male_pregnancy),
                    'percentage': (len(male_pregnancy) / len(data)) * 100
                })
        
        return issues
    
    @staticmethod
    def check_temporal_consistency(data):
        """Check temporal consistency"""
        issues = []
        
        # Check discharge before admission
        if 'admission_date' in data.columns and 'discharge_date' in data.columns:
            try:
                admission_dates = pd.to_datetime(data['admission_date'], errors='coerce')
                discharge_dates = pd.to_datetime(data['discharge_date'], errors='coerce')
                
                invalid_dates = data[discharge_dates < admission_dates]
                if len(invalid_dates) > 0:
                    issues.append({
                        'rule': 'discharge_before_admission',
                        'severity': 'critical',
                        'field': 'discharge_date',
                        'description': 'Discharge date before admission date',
                        'count': len(invalid_dates),
                        'percentage': (len(invalid_dates) / len(data)) * 100
                    })
            except Exception as e:
                print(f"Error checking temporal consistency: {e}")
        
        # Check age vs birth date consistency
        if 'age' in data.columns and 'birth_date' in data.columns:
            try:
                birth_dates = pd.to_datetime(data['birth_date'], errors='coerce')
                calculated_ages = (datetime.now() - birth_dates).dt.days // 365
                
                age_mismatches = data[abs(data['age'] - calculated_ages) > 2]  # Allow 2-year tolerance
                if len(age_mismatches) > 0:
                    issues.append({
                        'rule': 'age_birth_date_mismatch',
                        'severity': 'major',
                        'field': 'age',
                        'description': 'Age does not match birth date',
                        'count': len(age_mismatches),
                        'percentage': (len(age_mismatches) / len(data)) * 100
                    })
            except Exception as e:
                print(f"Error checking age consistency: {e}")
        
        return issues

# Initialize rule checker
rules = HealthcareDataQualityRules()

print("✅ Healthcare data quality rules defined!")
print("\n📋 Available rule categories:")
print("   - Required field validation")
print("   - Format pattern validation") 
print("   - Range constraint validation")
print("   - Clinical logic validation")
print("   - Temporal consistency validation")

## 4. Implement Plausibility Checks

Plausibility assessment determines whether data values are believable and reasonable within their clinical context. Let's run our plausibility rules on the synthetic data.
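
Cross-field dependencies are one kind of plausibility check that single-field ranges cannot express. As a hedged sketch, the helper below derives body mass index (BMI) from weight and height and flags combinations outside an assumed band. The 10 to 80 bounds are illustrative assumptions, not clinical guidance.

```python
def bmi(weight_kg, height_cm):
    """Body mass index: weight in kilograms divided by height in metres squared."""
    height_m = height_cm / 100
    return weight_kg / (height_m ** 2)

def bmi_is_plausible(weight_kg, height_cm, low=10.0, high=80.0):
    """Flag weight/height pairs whose BMI falls outside the assumed band."""
    if height_cm <= 0:
        return False  # BMI is undefined for a non-positive height
    return low <= bmi(weight_kg, height_cm) <= high

# Both pairs pass wide per-field ranges (weight 0-500 kg, height 0-300 cm),
# but only the first is believable as a combination.
for weight, height in [(70, 175), (120, 60)]:
    print(weight, height, round(bmi(weight, height), 1), bmi_is_plausible(weight, height))
```

The second pair shows why cross-field rules matter: 120 kg and 60 cm are each inside a generous individual range, yet together they describe an impossible body.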

In [None]:
# Run plausibility checks on our synthetic data
print("🔍 Running Plausibility Checks on Healthcare Data...")
print("=" * 60)

# Check range constraints (plausibility)
print("\n📊 Range Constraint Violations:")
range_issues = rules.check_range_constraints(merged_data)

if range_issues:
    for issue in range_issues:
        print(f"❌ {issue['description']}")
        print(f"   Field: {issue['field']}")
        print(f"   Violations: {issue['count']} ({issue['percentage']:.1f}%)")
        print(f"   Expected Range: {issue['expected_range']}")
        print(f"   Sample Invalid Values: {issue['sample_values']}")
        print()
else:
    print("✅ No range constraint violations found!")

# Check clinical logic violations
print("\n🏥 Clinical Logic Violations:")
clinical_issues = rules.check_clinical_logic(merged_data)

if clinical_issues:
    for issue in clinical_issues:
        print(f"❌ {issue['description']}")
        print(f"   Rule: {issue['rule']}")
        print(f"   Violations: {issue['count']} ({issue['percentage']:.1f}%)")
        print(f"   Severity: {issue['severity']}")
        print()
else:
    print("✅ No clinical logic violations found!")

# Check temporal consistency
print("\n⏰ Temporal Consistency Violations:")
temporal_issues = rules.check_temporal_consistency(merged_data)

if temporal_issues:
    for issue in temporal_issues:
        print(f"❌ {issue['description']}")
        print(f"   Rule: {issue['rule']}")
        print(f"   Violations: {issue['count']} ({issue['percentage']:.1f}%)")
        print(f"   Severity: {issue['severity']}")
        print()
else:
    print("✅ No temporal consistency violations found!")

# Statistical outlier detection
print("\n📈 Statistical Outlier Detection:")
numeric_columns = ['age', 'temperature_c', 'heart_rate', 'systolic_bp', 'diastolic_bp', 'weight_kg', 'height_cm']
outlier_summary = {}

for col in numeric_columns:
    if col in merged_data.columns:
        # Z-score method (threshold = 3)
        z_scores = np.abs((merged_data[col] - merged_data[col].mean()) / merged_data[col].std())
        outliers = merged_data[z_scores > 3]
        
        if len(outliers) > 0:
            outlier_summary[col] = {
                'count': len(outliers),
                'percentage': (len(outliers) / len(merged_data)) * 100,
                'sample_values': outliers[col].head(3).tolist(),
                'z_scores': z_scores[z_scores > 3].head(3).tolist()
            }

if outlier_summary:
    for col, stats in outlier_summary.items():
        print(f"❗ {col}: {stats['count']} outliers ({stats['percentage']:.1f}%)")
        print(f"   Sample values: {stats['sample_values']}")
        print(f"   Z-scores: {[f'{z:.2f}' for z in stats['z_scores']]}")
        print()
else:
    print("✅ No statistical outliers detected!")

# Summary: add up violating records (each issue's 'count'), not the number of issue entries
plausibility_score = 100
total_violations = sum(
    issue['count']
    for issues in [range_issues, clinical_issues, temporal_issues]
    for issue in issues
)
if total_violations > 0:
    plausibility_score = max(0, 100 - (total_violations / len(merged_data)) * 100)

print(f"\n🎯 Plausibility Assessment Summary:")
print(f"   Total Records Assessed: {len(merged_data)}")
print(f"   Total Plausibility Violations: {total_violations}")
print(f"   Plausibility Score: {plausibility_score:.1f}/100")
print(f"   Assessment: {'Excellent' if plausibility_score >= 90 else 'Good' if plausibility_score >= 70 else 'Needs Improvement'}")
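
The z-score check in the cell above relies on the mean and standard deviation, both of which are pulled toward the very outliers being searched for. A common robust alternative is the modified z-score based on the median and the median absolute deviation (MAD). The sketch below compares the two on a small made-up sample; the 3.5 cutoff is a commonly cited convention and is treated here as an assumption.

```python
from statistics import mean, median, stdev

def modified_z_scores(values):
    """Modified z-score: 0.6745 * (x - median) / MAD."""
    med = median(values)
    mad = median(abs(v - med) for v in values)
    if mad == 0:
        # More than half the values are identical, so the score is undefined;
        # this sketch reports no outliers rather than dividing by zero.
        return [0.0] * len(values)
    return [0.6745 * (v - med) / mad for v in values]

heart_rates = [72, 75, 80, 68, 90, 85, 78, 350]  # hypothetical sample

mu, sigma = mean(heart_rates), stdev(heart_rates)
classic = [(v - mu) / sigma for v in heart_rates]
robust = modified_z_scores(heart_rates)

classic_flagged = [v for v, z in zip(heart_rates, classic) if abs(z) > 3]
robust_flagged = [v for v, z in zip(heart_rates, robust) if abs(z) > 3.5]

print("classic z-score flags: ", classic_flagged)
print("modified z-score flags:", robust_flagged)
```

In a sample this small, the single extreme value inflates the standard deviation so much that its own classic z-score stays below 3, while the median-based score is barely affected by it. On the larger merged dataset the two methods may well agree, so this is a safeguard for small or heavily contaminated groups rather than a required replacement.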

## 5. Implement Conformance Validation

Conformance assessment evaluates whether data adheres to format, type, and domain constraints. This includes checking ID formats, data types, and standardized code formats.
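
Date fields are a typical conformance target. As a minimal sketch, assuming dates are expected as `YYYY-MM-DD` strings, the hypothetical helper below accepts only values that are both real calendar dates and written in exactly that form.

```python
from datetime import datetime

def is_iso_date(value):
    """True only for real calendar dates written exactly as YYYY-MM-DD."""
    if not isinstance(value, str):
        return False
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        return False  # wrong layout or a non-existent date
    # strptime tolerates missing zero padding, so require an exact round trip
    return parsed.strftime("%Y-%m-%d") == value

for candidate in ["2024-02-29", "2024-02-30", "02/29/2024", "2024-2-9", None]:
    print(repr(candidate), is_iso_date(candidate))
```

The round-trip comparison is there because `strptime` accepts `2024-2-9` even though it is not zero padded, which a strict format rule should reject.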

In [None]:
# Run conformance validation on our synthetic data
print("🔧 Running Conformance Validation on Healthcare Data...")
print("=" * 60)

# Check format patterns
print("\n📝 Format Pattern Violations:")
format_issues = rules.check_format_patterns(merged_data)

if format_issues:
    for issue in format_issues:
        print(f"❌ {issue['description']}")
        print(f"   Field: {issue['field']}")
        print(f"   Violations: {issue['count']} ({issue['percentage']:.1f}%)")
        print(f"   Sample Invalid Values: {issue['sample_values']}")
        print()
else:
    print("✅ No format pattern violations found!")

# Check data types
print("\n🔢 Data Type Validation:")
type_issues = []

expected_types = {
    'age': 'numeric',
    'temperature_c': 'numeric', 
    'heart_rate': 'numeric',
    'systolic_bp': 'numeric',
    'diastolic_bp': 'numeric',
    'weight_kg': 'numeric',
    'height_cm': 'numeric'
}

for field, expected_type in expected_types.items():
    if field in merged_data.columns:
        if expected_type == 'numeric':
            # Check if field can be converted to numeric
            non_numeric = pd.to_numeric(merged_data[field], errors='coerce').isna()
            non_numeric_count = non_numeric.sum() - merged_data[field].isna().sum()  # Exclude already null values
            
            if non_numeric_count > 0:
                type_issues.append({
                    'field': field,
                    'expected_type': expected_type,
                    'violations': non_numeric_count,
                    'percentage': (non_numeric_count / len(merged_data)) * 100
                })

if type_issues:
    for issue in type_issues:
        print(f"❌ {issue['field']}: Expected {issue['expected_type']}, found {issue['violations']} invalid values ({issue['percentage']:.1f}%)")
else:
    print("✅ All data types conform to expectations!")

# Check domain values
print("\n🏷️ Domain Value Validation:")
domain_issues = []

valid_domains = {
    'gender': ['M', 'F', 'O', 'U'],
    'encounter_type': ['inpatient', 'outpatient', 'emergency', 'observation']
}

for field, valid_values in valid_domains.items():
    if field in merged_data.columns:
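        # Nulls belong to the completeness checks, so only populated values are tested here
        invalid_values = merged_data[merged_data[field].notna() & ~merged_data[field].isin(valid_values)]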
        
        if len(invalid_values) > 0:
            domain_issues.append({
                'field': field,
                'valid_values': valid_values,
                'violations': len(invalid_values),
                'percentage': (len(invalid_values) / len(merged_data)) * 100,
                'sample_invalid': invalid_values[field].unique()[:3].tolist()
            })

if domain_issues:
    for issue in domain_issues:
        print(f"❌ {issue['field']}: {issue['violations']} invalid domain values ({issue['percentage']:.1f}%)")
        print(f"   Valid values: {issue['valid_values']}")
        print(f"   Sample invalid: {issue['sample_invalid']}")
        print()
else:
    print("✅ All domain values conform to valid sets!")

# Check referential integrity
print("\n🔗 Referential Integrity Validation:")
ref_issues = []

# Check if all encounter patient_ids exist in patients
if 'patient_id' in merged_data.columns:
    patient_ids = set(patients_df['patient_id'].dropna())
    encounter_patient_ids = set(merged_data['patient_id'].dropna())
    
    invalid_refs = encounter_patient_ids - patient_ids
    if invalid_refs:
        ref_issues.append({
            'description': 'Encounters reference non-existent patients',
            'invalid_references': list(invalid_refs)[:5],  # Show first 5
            'count': len(invalid_refs)
        })

if ref_issues:
    for issue in ref_issues:
        print(f"❌ {issue['description']}")
        print(f"   Invalid references: {issue['count']}")
        print(f"   Sample invalid IDs: {issue['invalid_references']}")
        print()
else:
    print("✅ Referential integrity maintained!")

# Summary
conformance_score = 100
total_format_violations = sum(issue['count'] for issue in format_issues)
total_type_violations = sum(issue['violations'] for issue in type_issues)
total_domain_violations = sum(issue['violations'] for issue in domain_issues)
total_ref_violations = sum(issue['count'] for issue in ref_issues)

total_conformance_violations = total_format_violations + total_type_violations + total_domain_violations + total_ref_violations

if total_conformance_violations > 0:
    conformance_score = max(0, 100 - (total_conformance_violations / len(merged_data)) * 100)

print(f"\n🎯 Conformance Assessment Summary:")
print(f"   Total Records Assessed: {len(merged_data)}")
print(f"   Format Violations: {total_format_violations}")
print(f"   Type Violations: {total_type_violations}")
print(f"   Domain Violations: {total_domain_violations}")
print(f"   Reference Violations: {total_ref_violations}")
print(f"   Total Conformance Violations: {total_conformance_violations}")
print(f"   Conformance Score: {conformance_score:.1f}/100")
print(f"   Assessment: {'Excellent' if conformance_score >= 90 else 'Good' if conformance_score >= 70 else 'Needs Improvement'}")
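
The conformance score above divides the summed violation counts by the number of records, so a record that breaks two rules is counted twice. One alternative, sketched here on made-up data, is to score the share of records with no violation at all by combining boolean masks. The column values and variable names are assumptions for the demo.

```python
import pandas as pd

# Hypothetical data: the second row breaks both the ID format and the gender domain
df = pd.DataFrame({
    "patient_id": ["PAT000001", "INVALID2", "PAT000003", None],
    "gender": ["M", "X", "F", "F"],
})

# One boolean mask per rule; nulls are excluded because completeness covers them
bad_id = df["patient_id"].notna() & ~df["patient_id"].str.fullmatch(r"PAT\d{6}", na=False)
bad_gender = df["gender"].notna() & ~df["gender"].isin(["M", "F", "O", "U"])

violation_total = int(bad_id.sum() + bad_gender.sum())   # counts the bad row twice
count_based_score = max(0.0, 100 - violation_total / len(df) * 100)

any_violation = bad_id | bad_gender                      # counts each record once
record_score = 100 * (1 - any_violation.mean())

print(f"violations: {violation_total}, count-based score: {count_based_score:.1f}")
print(f"records with a violation: {int(any_violation.sum())}, record-level score: {record_score:.1f}")
```

Both numbers are defensible: the count-based score reflects the total repair effort, while the record-level score answers how many rows are usable as they stand.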

## 6. Implement Completeness Assessment

Completeness assessment evaluates the presence of data values. We'll check for missing required fields, null values in critical columns, and overall data completeness rates.
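
The per-column completeness rate can also be computed in a vectorized way. The sketch below, on a small made-up frame, treats both nulls and empty strings as missing; the data and variable names are illustrative.

```python
import pandas as pd

# Hypothetical frame with nulls and one empty string
df = pd.DataFrame({
    "patient_id": ["PAT000001", None, "PAT000003", "PAT000004"],
    "gender": ["M", "", "F", None],
    "heart_rate": [72.0, 88.0, None, 64.0],
})

# A cell is missing when it is null or an empty string
missing = df.isna() | df.eq("")

# The mean of a boolean column is the share of True values
completeness_rate = (1 - missing.mean()) * 100

print(completeness_rate.round(1))
```

Here `gender` comes out at 50% complete because the empty string and the null are both counted, which a plain `isnull()` tally would miss.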

In [None]:
# Run completeness assessment on our synthetic data
print("📋 Running Completeness Assessment on Healthcare Data...")
print("=" * 60)

# Define required fields for healthcare data
required_fields = ['patient_id', 'encounter_id', 'age', 'gender']
critical_fields = ['patient_id', 'encounter_id', 'age', 'gender', 'encounter_type', 'admission_date']

# Check required fields
print("\n✅ Required Field Validation:")
completeness_issues = rules.check_required_fields(merged_data, required_fields)

if completeness_issues:
    for issue in completeness_issues:
        print(f"❌ {issue['description']}")
        print(f"   Field: {issue['field']}")
        print(f"   Severity: {issue['severity']}")
        if 'percentage' in issue:
            print(f"   Missing: {issue['count']} records ({issue['percentage']:.1f}%)")
        print()
else:
    print("✅ All required fields are present and populated!")

# Detailed completeness analysis
print("\n📊 Detailed Completeness Analysis:")
completeness_stats = {}

for column in merged_data.columns:
    total_records = len(merged_data)
    null_count = merged_data[column].isnull().sum()
    empty_count = (merged_data[column] == '').sum() if merged_data[column].dtype == 'object' else 0
    missing_count = null_count + empty_count
    
    completeness_rate = ((total_records - missing_count) / total_records) * 100
    
    completeness_stats[column] = {
        'total_records': total_records,
        'missing_count': missing_count,
        'completeness_rate': completeness_rate,
        'is_critical': column in critical_fields
    }

# Display completeness statistics
print(f"{'Field':<20} {'Missing':<8} {'Rate':<8} {'Status':<10} {'Priority'}")
print("-" * 60)

for field, stats in completeness_stats.items():
    missing = stats['missing_count']
    rate = stats['completeness_rate']
    priority = 'Critical' if stats['is_critical'] else 'Standard'
    
    if rate >= 95:
        status = '✅ Excellent'
    elif rate >= 90:
        status = '🟡 Good'
    elif rate >= 80:
        status = '🟠 Fair'
    else:
        status = '❌ Poor'
    
    print(f"{field:<20} {missing:<8} {rate:<7.1f}% {status:<10} {priority}")

# Calculate overall completeness score
critical_completeness = [stats['completeness_rate'] for field, stats in completeness_stats.items() if stats['is_critical']]
standard_completeness = [stats['completeness_rate'] for field, stats in completeness_stats.items() if not stats['is_critical']]

overall_completeness = 0
if critical_completeness:
    critical_avg = np.mean(critical_completeness)
    standard_avg = np.mean(standard_completeness) if standard_completeness else 100
    
    # Weight critical fields more heavily (70% weight)
    overall_completeness = (critical_avg * 0.7) + (standard_avg * 0.3)

# Field-level completeness issues
print(f"\n🎯 Completeness Issues by Severity:")

critical_issues = [(field, stats) for field, stats in completeness_stats.items() 
                  if stats['is_critical'] and stats['completeness_rate'] < 95]
standard_issues = [(field, stats) for field, stats in completeness_stats.items() 
                  if not stats['is_critical'] and stats['completeness_rate'] < 90]

if critical_issues:
    print(f"\n❌ Critical Field Issues:")
    for field, stats in critical_issues:
        print(f"   {field}: {stats['missing_count']} missing ({100-stats['completeness_rate']:.1f}% incomplete)")

if standard_issues:
    print(f"\n🟡 Standard Field Issues:")
    for field, stats in standard_issues:
        print(f"   {field}: {stats['missing_count']} missing ({100-stats['completeness_rate']:.1f}% incomplete)")

if not critical_issues and not standard_issues:
    print("✅ No significant completeness issues found!")

# Completeness patterns analysis
print(f"\n📈 Completeness Patterns:")

# Records with multiple missing critical fields
critical_missing_counts = merged_data[critical_fields].isnull().sum(axis=1)
records_multiple_missing = (critical_missing_counts >= 2).sum()

if records_multiple_missing > 0:
    print(f"⚠️  {records_multiple_missing} records missing 2+ critical fields")
    
    # Show which fields are missing, grouped by how many critical fields a record lacks
    missing_patterns = merged_data[critical_fields].isnull().groupby(critical_missing_counts).sum()
    print("   Missing fields by number of critical fields absent:")
    for missing_count, pattern in missing_patterns.iterrows():
        if missing_count >= 2:
            missing_fields = pattern[pattern > 0].index.tolist()
            # pattern.sum() counts missing cells, so count the records separately
            n_records = (critical_missing_counts == missing_count).sum()
            print(f"   - {missing_count} fields missing: {', '.join(missing_fields)} ({n_records} records)")

# Summary
print(f"\n🎯 Completeness Assessment Summary:")
print(f"   Total Records: {len(merged_data)}")
print(f"   Total Fields: {len(merged_data.columns)}")
print(f"   Critical Fields: {len(critical_fields)}")
print(f"   Overall Completeness Score: {overall_completeness:.1f}/100")
print(f"   Critical Fields Avg Completeness: {np.mean(critical_completeness):.1f}%")
print(f"   Standard Fields Avg Completeness: {np.mean(standard_completeness):.1f}%")
print(f"   Assessment: {'Excellent' if overall_completeness >= 95 else 'Good' if overall_completeness >= 85 else 'Needs Improvement'}")

# Store completeness score for final assessment
completeness_score = overall_completeness

## 7. Prepare Data for Anomaly Detection

Now we'll prepare our healthcare data for machine learning-based anomaly detection. This involves feature engineering, encoding categorical variables, and creating numerical representations suitable for ML models.
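
The derived clinical features can be sketched on a tiny example. The values below are made up for illustration, and the `vitals` frame is a hypothetical stand-in for the merged dataset. Unlike the cell below, this sketch imputes `weight_kg` before deriving BMI, which is a choice made here only to keep the example short.

```python
import numpy as np
import pandas as pd

# Hypothetical vitals for three encounters; one weight is missing
vitals = pd.DataFrame({
    "weight_kg": [70.0, 90.0, np.nan],
    "height_cm": [175.0, 180.0, 160.0],
    "systolic_bp": [120.0, 150.0, 110.0],
    "diastolic_bp": [80.0, 90.0, 70.0],
})

# Median imputation: record the missing count BEFORE filling,
# because it is always zero afterwards
n_missing = int(vitals["weight_kg"].isna().sum())
median_weight = vitals["weight_kg"].median()
vitals["weight_kg"] = vitals["weight_kg"].fillna(median_weight)
print(f"weight_kg: filled {n_missing} value(s) with median {median_weight}")

# BMI = weight (kg) / height (m)^2
vitals["bmi"] = vitals["weight_kg"] / (vitals["height_cm"] / 100) ** 2

# Pulse pressure and the common one-third approximation of mean arterial pressure
vitals["pulse_pressure"] = vitals["systolic_bp"] - vitals["diastolic_bp"]
vitals["mean_arterial_pressure"] = vitals["diastolic_bp"] + vitals["pulse_pressure"] / 3

print(vitals.round(2))
```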

In [None]:
# Prepare data for machine learning-based anomaly detection
print("🤖 Preparing Data for ML-Based Anomaly Detection...")
print("=" * 60)

# Create a copy of the data for ML processing
ml_data = merged_data.copy()

# Feature engineering and preprocessing
print("\n🔧 Feature Engineering:")

# 1. Handle datetime features
datetime_columns = ['admission_date', 'discharge_date', 'birth_date']
for col in datetime_columns:
    if col in ml_data.columns:
        ml_data[col] = pd.to_datetime(ml_data[col], errors='coerce')
        
        # Extract useful features
        ml_data[f'{col}_year'] = ml_data[col].dt.year
        ml_data[f'{col}_month'] = ml_data[col].dt.month
        ml_data[f'{col}_day'] = ml_data[col].dt.day
        ml_data[f'{col}_dayofweek'] = ml_data[col].dt.dayofweek
        
        print(f"✅ Extracted datetime features from {col}")

# 2. Calculate derived features
if 'admission_date' in ml_data.columns and 'discharge_date' in ml_data.columns:
    ml_data['length_of_stay'] = (ml_data['discharge_date'] - ml_data['admission_date']).dt.days
    print("✅ Calculated length of stay")

if 'birth_date' in ml_data.columns:
    ml_data['calculated_age'] = (datetime.now() - ml_data['birth_date']).dt.days // 365
    ml_data['age_birth_date_diff'] = abs(ml_data['age'] - ml_data['calculated_age'])
    print("✅ Calculated age consistency metrics")

# 3. Create BMI if height and weight available
if 'weight_kg' in ml_data.columns and 'height_cm' in ml_data.columns:
    ml_data['bmi'] = ml_data['weight_kg'] / ((ml_data['height_cm'] / 100) ** 2)
    print("✅ Calculated BMI")

# 4. Blood pressure metrics
if 'systolic_bp' in ml_data.columns and 'diastolic_bp' in ml_data.columns:
    ml_data['pulse_pressure'] = ml_data['systolic_bp'] - ml_data['diastolic_bp']
    ml_data['mean_arterial_pressure'] = ml_data['diastolic_bp'] + (ml_data['pulse_pressure'] / 3)
    print("✅ Calculated blood pressure metrics")

# Remove original datetime columns (keep derived features)
ml_data = ml_data.drop(columns=datetime_columns, errors='ignore')

# 5. Identify and prepare categorical and numerical columns
print(f"\n📊 Data Preprocessing:")

categorical_columns = []
numerical_columns = []

for col in ml_data.columns:
    if ml_data[col].dtype in ['object', 'category']:
        categorical_columns.append(col)
    elif pd.api.types.is_numeric_dtype(ml_data[col]):
        # Covers every numeric width, including the int32 columns that .dt accessors can return
        numerical_columns.append(col)

print(f"Categorical columns: {len(categorical_columns)}")
print(f"Numerical columns: {len(numerical_columns)}")

# 6. Handle missing values
print(f"\n🔧 Handling Missing Values:")

# For numerical columns: fill with median
for col in numerical_columns:
    n_missing = ml_data[col].isnull().sum()  # Count before filling; it is zero afterwards
    if n_missing > 0:
        median_val = ml_data[col].median()
        ml_data[col] = ml_data[col].fillna(median_val)
        print(f"   {col}: filled {n_missing} missing values with median ({median_val})")

# For categorical columns: fill with mode or 'Unknown'
for col in categorical_columns:
    n_missing = ml_data[col].isnull().sum()  # Count before filling; it is zero afterwards
    if n_missing > 0:
        mode_val = ml_data[col].mode()
        fill_val = mode_val.iloc[0] if len(mode_val) > 0 else 'Unknown'
        ml_data[col] = ml_data[col].fillna(fill_val)
        print(f"   {col}: filled {n_missing} missing values with '{fill_val}'")

# 7. Encode categorical variables
print(f"\n🏷️ Encoding Categorical Variables:")

label_encoders = {}
for col in categorical_columns:
    if col in ml_data.columns:
        le = LabelEncoder()
        ml_data[f'{col}_encoded'] = le.fit_transform(ml_data[col].astype(str))
        label_encoders[col] = le
        print(f"   {col}: {len(le.classes_)} unique values encoded")

# Drop original categorical columns
ml_data = ml_data.drop(columns=categorical_columns, errors='ignore')

# 8. Feature scaling preparation
print(f"\n📏 Preparing for Feature Scaling:")

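# Get final feature columns, excluding identifiers in both raw and label-encoded form
# (string IDs are renamed to '<col>_encoded' by the encoding step above)
id_columns = ['patient_id', 'encounter_id', 'vital_id']
excluded_columns = set(id_columns) | {f'{col}_encoded' for col in id_columns}
feature_columns = [col for col in ml_data.columns if col not in excluded_columns]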
feature_data = ml_data[feature_columns].copy()

print(f"Final feature set: {len(feature_columns)} features")
print(f"Feature data shape: {feature_data.shape}")

# Check for any remaining missing values
remaining_missing = feature_data.isnull().sum().sum()
if remaining_missing > 0:
    print(f"⚠️ Warning: {remaining_missing} missing values remain")
    # Fill any remaining missing values with 0
    feature_data = feature_data.fillna(0)
else:
    print("✅ No missing values in feature data")

# 9. Scale features for ML algorithms
print(f"\n⚖️ Scaling Features:")

scaler = StandardScaler()
scaled_features = scaler.fit_transform(feature_data)
scaled_feature_df = pd.DataFrame(scaled_features, columns=feature_columns, index=feature_data.index)

print(f"✅ Features scaled using StandardScaler")
print(f"Scaled feature statistics:")
print(f"   Mean: {scaled_features.mean():.6f}")
print(f"   Std: {scaled_features.std():.6f}")

# 10. Summary of preprocessing
print(f"\n🎯 Data Preparation Summary:")
print(f"   Original dataset shape: {merged_data.shape}")
print(f"   Processed dataset shape: {scaled_feature_df.shape}")
print(f"   Features created: {len(feature_columns)}")
print(f"   Categorical variables encoded: {len(categorical_columns)}")
print(f"   Records ready for ML: {len(scaled_feature_df)}")

# Display sample of processed data
print(f"\n📋 Sample of Processed Features:")
print(scaled_feature_df.head())

print(f"\n📊 Feature Correlation Matrix (top correlations):")
correlation_matrix = scaled_feature_df.corr()
# Get top correlations (excluding self-correlations)
correlations = []
for i in range(len(correlation_matrix.columns)):
    for j in range(i+1, len(correlation_matrix.columns)):
        corr_val = correlation_matrix.iloc[i, j]
        if abs(corr_val) > 0.5:  # Only show strong correlations
            correlations.append((correlation_matrix.columns[i], correlation_matrix.columns[j], corr_val))

correlations.sort(key=lambda x: abs(x[2]), reverse=True)
for feat1, feat2, corr in correlations[:5]:  # Top 5 correlations
    print(f"   {feat1} ↔ {feat2}: {corr:.3f}")

if not correlations:
    print("   No strong correlations (>0.5) found between features")

## 8. Train Isolation Forest Model for Anomaly Detection

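Isolation Forest is an unsupervised machine learning algorithm that detects anomalies by isolating observations with random feature splits: records that can be separated from the rest of the data in only a few splits receive a more anomalous score. This makes it well suited to finding multivariate outliers in healthcare data.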
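
Before running it on the full dataset, here is a small self-contained sketch of how scikit-learn's `IsolationForest` behaves. The two-feature synthetic data and the 5% contamination value are assumptions chosen for illustration only; they are not the settings used in the cell below.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(42)

# 200 "typical" records with two standardized features (synthetic)
normal = rng.normal(loc=0.0, scale=1.0, size=(200, 2))
# Two implausible records far from the bulk of the data
outliers = np.array([[8.0, 8.0], [-9.0, 7.5]])
X = np.vstack([normal, outliers])

model = IsolationForest(n_estimators=100, contamination=0.05, random_state=42)
model.fit(X)

labels = model.predict(X)            # 1 = inlier, -1 = anomaly
scores = model.decision_function(X)  # lower values = more anomalous

print("flagged:", int((labels == -1).sum()), "of", len(X))
print("labels for the two injected outliers:", labels[-2:])
```

Note that `contamination` fixes the share of training records that get flagged, so some ordinary records near the edge of the distribution are labelled anomalous as well. The score from `decision_function` is often more informative than the binary label.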

In [None]:
# Train Isolation Forest for anomaly detection
print("🌲 Training Isolation Forest for Anomaly Detection...")
print("=" * 60)

# Split data for training and validation
print("\n📊 Splitting Data:")
X_train, X_test = train_test_split(scaled_feature_df, test_size=0.2, random_state=42)

print(f"Training set: {X_train.shape[0]} records")
print(f"Test set: {X_test.shape[0]} records")

# Initialize and train Isolation Forest
print(f"\n🎯 Training Isolation Forest Model:")

# Parameters for Isolation Forest
contamination_rate = 0.1  # Expected proportion of anomalies (10%)
n_estimators = 100  # Number of trees
random_state = 42

isolation_forest = IsolationForest(
    contamination=contamination_rate,
    n_estimators=n_estimators,
    random_state=random_state,
    n_jobs=-1  # Use all available cores
)

# Train the model
print(f"Parameters:")
print(f"   Contamination rate: {contamination_rate}")
print(f"   Number of estimators: {n_estimators}")
print(f"   Random state: {random_state}")

isolation_forest.fit(X_train)
print("✅ Isolation Forest model trained successfully!")

# Get anomaly scores and predictions for training data
train_scores = isolation_forest.decision_function(X_train)
train_predictions = isolation_forest.predict(X_train)

# Get anomaly scores and predictions for test data
test_scores = isolation_forest.decision_function(X_test)
test_predictions = isolation_forest.predict(X_test)

print(f"\n📈 Training Results:")
print(f"   Training anomalies detected: {(train_predictions == -1).sum()} ({(train_predictions == -1).mean()*100:.1f}%)") 
print(f"   Training anomaly score range: {train_scores.min():.3f} to {train_scores.max():.3f}")
print(f"   Training anomaly score mean: {train_scores.mean():.3f}")

print(f"\n📈 Test Results:")
print(f"   Test anomalies detected: {(test_predictions == -1).sum()} ({(test_predictions == -1).mean()*100:.1f}%)")
print(f"   Test anomaly score range: {test_scores.min():.3f} to {test_scores.max():.3f}")
print(f"   Test anomaly score mean: {test_scores.mean():.3f}")

# Analyze detected anomalies
print(f"\n🔍 Analyzing Detected Anomalies:")

# Get anomalous records from test set
test_anomalies_idx = X_test.index[test_predictions == -1]
test_anomalies = merged_data.loc[test_anomalies_idx]

if len(test_anomalies) > 0:
    print(f"\n📋 Sample Anomalous Records:")
    
    # Show key characteristics of anomalous records
    anomaly_characteristics = []
    
    for idx in test_anomalies_idx[:5]:  # Show first 5 anomalies
        record = merged_data.loc[idx]
        characteristics = {
            'record_id': idx,
            'patient_id': record.get('patient_id', 'N/A'),
            'age': record.get('age', 'N/A'),
            'gender': record.get('gender', 'N/A'),
            'heart_rate': record.get('heart_rate', 'N/A'),
            'temperature_c': record.get('temperature_c', 'N/A'),
            'anomaly_score': test_scores[X_test.index.get_loc(idx)]
        }
        anomaly_characteristics.append(characteristics)
    
    anomaly_df = pd.DataFrame(anomaly_characteristics)
    print(anomaly_df)
    
    # Check if any of our intentionally introduced issues were caught
    print(f"\n🎯 Anomaly Detection Analysis:")
    
    # Check for extreme values that should be anomalies
    extreme_heart_rates = test_anomalies[test_anomalies['heart_rate'] > 200] if 'heart_rate' in test_anomalies.columns else pd.DataFrame()
    extreme_temps = test_anomalies[test_anomalies['temperature_c'] > 40] if 'temperature_c' in test_anomalies.columns else pd.DataFrame()
    impossible_ages = test_anomalies[test_anomalies['age'] > 130] if 'age' in test_anomalies.columns else pd.DataFrame()
    
    print(f"   Extreme heart rates detected: {len(extreme_heart_rates)}")
    print(f"   Extreme temperatures detected: {len(extreme_temps)}")
    print(f"   Impossible ages detected: {len(impossible_ages)}")
    
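    # Count each flagged record once, even if it matches several checks
    valid_anomaly_idx = extreme_heart_rates.index.union(extreme_temps.index).union(impossible_ages.index)
    total_valid_anomalies = len(valid_anomaly_idx)
    if total_valid_anomalies > 0:
        precision = total_valid_anomalies / len(test_anomalies)
        print(f"   Anomaly detection precision (known extreme values only): {precision:.2%}")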
    
else:
    print("No anomalies detected in test set")

# Feature importance approximation
print(f"\n🎯 Feature Importance Analysis:")

# Isolation Forest does not expose feature importances directly, so as a rough
# proxy we compare each feature's variance among anomalous vs. normal records
feature_importance_scores = []

normal_records = X_test[test_predictions == 1]
anomalous_records = X_test[test_predictions == -1]

# Variance needs at least two records per group
if len(anomalous_records) > 1 and len(normal_records) > 1:
    for feature_name in feature_columns:
        normal_var = normal_records[feature_name].var()
        anomalous_var = anomalous_records[feature_name].var()
        
        # Proxy importance: absolute difference in variance between the two groups
        importance = abs(anomalous_var - normal_var)
        feature_importance_scores.append((feature_name, importance))

# Sort by importance
feature_importance_scores.sort(key=lambda x: x[1], reverse=True)

print("Top 10 most important features for anomaly detection:")
for i, (feature, importance) in enumerate(feature_importance_scores[:10]):
    print(f"   {i+1:2d}. {feature:<25} {importance:.4f}")

# Save the trained model
print(f"\n💾 Saving Trained Model:")
model_filename = '../data/isolation_forest_model.joblib'
joblib.dump({
    'model': isolation_forest,
    'scaler': scaler,
    'feature_columns': feature_columns,
    'label_encoders': label_encoders
}, model_filename)

print(f"✅ Model saved to: {model_filename}")

# Model performance summary
print(f"\n🎯 Model Performance Summary:")
print(f"   Model Type: Isolation Forest")
print(f"   Training Samples: {len(X_train)}")
print(f"   Test Samples: {len(X_test)}")
print(f"   Features: {len(feature_columns)}")
print(f"   Contamination Rate: {contamination_rate}")
print(f"   Anomalies Detected: {(test_predictions == -1).sum()}/{len(X_test)} ({(test_predictions == -1).mean()*100:.1f}%)")
print(f"   Model Status: ✅ Ready for inference")

## 9. Train Autoencoder for Pattern-Based Anomaly Detection

Autoencoders learn to compress and reconstruct data. Records that cannot be reconstructed well (high reconstruction error) are likely anomalies. This provides a complementary approach to Isolation Forest.
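
The reconstruction-error idea can be sketched without TensorFlow. In the snippet below a one-component PCA projection, computed with NumPy's SVD, stands in for the autoencoder: it is a linear encode/decode step, not the neural network trained in the cell below, and the synthetic data is an assumption made for illustration. The thresholding logic (95th percentile of training errors) is the same.

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic "normal" records that lie close to a line in 3-D feature space
direction = np.array([1.0, 2.0, 3.0])
t = rng.normal(size=(200, 1))
X_train = t * direction + rng.normal(scale=0.05, size=(200, 3))

# Linear stand-in for an autoencoder: encode onto the top principal
# component, then decode back to the original space
mean = X_train.mean(axis=0)
_, _, vt = np.linalg.svd(X_train - mean, full_matrices=False)
components = vt[:1]  # shape (1, 3): a one-dimensional bottleneck

def reconstruction_error(X):
    """Per-record mean squared error between input and reconstruction."""
    centered = X - mean
    encoded = centered @ components.T   # compress to 1 dimension
    decoded = encoded @ components      # reconstruct in 3 dimensions
    return np.mean((centered - decoded) ** 2, axis=1)

# Threshold: 95th percentile of the errors seen on training data
threshold = np.percentile(reconstruction_error(X_train), 95)

X_new = np.array([
    [0.5, 1.0, 1.5],   # follows the learned pattern
    [2.0, -2.0, 1.0],  # breaks the pattern
])
errors = reconstruction_error(X_new)
is_anomaly = errors > threshold
print("errors:", errors)
print("anomaly flags:", is_anomaly)
```

The first record lies on the learned line and reconstructs almost perfectly; the second does not, so its error lands far above the threshold and it is flagged.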

In [None]:
# Train Autoencoder for pattern-based anomaly detection
print("🧠 Training Autoencoder for Pattern-Based Anomaly Detection...")
print("=" * 60)

# Check if TensorFlow is available
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    tf_available = True
    print("✅ TensorFlow available")
except ImportError:
    tf_available = False
    print("⚠️ TensorFlow not available - using simplified pattern detection")

if tf_available:
    # Define autoencoder architecture
    print(f"\n🏗️ Building Autoencoder Architecture:")
    
    input_dim = X_train.shape[1]
    encoding_dim = max(8, input_dim // 4)  # Target ~4:1 compression, with a floor of 8 units
    
    print(f"   Input dimension: {input_dim}")
    print(f"   Encoding dimension: {encoding_dim}")
    print(f"   Compression ratio: {input_dim / encoding_dim:.1f}:1")
    
    # Build the autoencoder model
    input_layer = keras.Input(shape=(input_dim,))
    
    # Encoder
    encoded = layers.Dense(encoding_dim * 2, activation='relu')(input_layer)
    encoded = layers.Dropout(0.2)(encoded)
    encoded = layers.Dense(encoding_dim, activation='relu')(encoded)
    
    # Decoder
    decoded = layers.Dense(encoding_dim * 2, activation='relu')(encoded)
    decoded = layers.Dropout(0.2)(decoded)
    decoded = layers.Dense(input_dim, activation='linear')(decoded)
    
    # Create autoencoder model
    autoencoder = keras.Model(input_layer, decoded)
    autoencoder.compile(optimizer='adam', loss='mse', metrics=['mae'])
    
    print(f"✅ Autoencoder model built")
    autoencoder.summary()  # Prints the summary itself; wrapping it in print() would also print "None"
    
    # Train the autoencoder
    print(f"\n🎯 Training Autoencoder:")
    
    # Use only 'normal' data for training (exclude obvious anomalies).
    # Features are standardized, so |value| >= 3 means 3+ standard deviations from the mean
    normal_mask = pd.Series(True, index=X_train.index)
    for col in ['age', 'heart_rate']:
        if col in X_train.columns:
            normal_mask &= X_train[col].abs() < 3
    
    X_train_normal = X_train[normal_mask] if normal_mask.sum() > 0 else X_train
    
    print(f"   Training on {len(X_train_normal)} 'normal' records")
    
    # Train the model
    history = autoencoder.fit(
        X_train_normal, X_train_normal,
        epochs=50,
        batch_size=32,
        shuffle=True,
        validation_split=0.1,
        verbose=0  # Reduce output
    )
    
    print(f"✅ Autoencoder training completed")
    print(f"   Final training loss: {history.history['loss'][-1]:.4f}")
    print(f"   Final validation loss: {history.history['val_loss'][-1]:.4f}")
    
    # Calculate reconstruction errors
    print(f"\n📊 Calculating Reconstruction Errors:")
    
    # Get reconstruction errors for training and test data
    train_predictions_ae = autoencoder.predict(X_train, verbose=0)
    test_predictions_ae = autoencoder.predict(X_test, verbose=0)
    
    # Calculate Mean Squared Error for each record.
    # Work on NumPy arrays so the errors can be indexed by position later on
    train_mse = np.mean(np.power(X_train.values - train_predictions_ae, 2), axis=1)
    test_mse = np.mean(np.power(X_test.values - test_predictions_ae, 2), axis=1)
    
    print(f"   Training MSE range: {train_mse.min():.4f} to {train_mse.max():.4f}")
    print(f"   Test MSE range: {test_mse.min():.4f} to {test_mse.max():.4f}")
    
    # Set anomaly threshold (95th percentile of training errors)
    threshold = np.percentile(train_mse, 95)
    print(f"   Anomaly threshold (95th percentile): {threshold:.4f}")
    
    # Identify anomalies in test set
    test_anomalies_ae = test_mse > threshold
    num_anomalies_ae = test_anomalies_ae.sum()
    
    print(f"   Test anomalies detected: {num_anomalies_ae} ({num_anomalies_ae/len(X_test)*100:.1f}%)")
    
    # Compare with Isolation Forest results
    print(f"\n🔍 Comparing Autoencoder vs Isolation Forest:")
    
    # Get anomalies detected by both methods
    if_anomalies = test_predictions == -1
    ae_anomalies = test_anomalies_ae
    
    both_methods = if_anomalies & ae_anomalies
    either_method = if_anomalies | ae_anomalies
    
    print(f"   Isolation Forest anomalies: {if_anomalies.sum()}")
    print(f"   Autoencoder anomalies: {ae_anomalies.sum()}")
    print(f"   Detected by both methods: {both_methods.sum()}")
    print(f"   Detected by either method: {either_method.sum()}")
    
    if both_methods.sum() > 0:
        agreement = both_methods.sum() / either_method.sum()
        print(f"   Method agreement: {agreement:.2%}")
    
    # Analyze high-confidence anomalies
    print(f"\n🎯 High-Confidence Anomaly Analysis:")
    
    # Get records detected by both methods (high confidence)
    high_confidence_idx = X_test.index[both_methods]
    
    if len(high_confidence_idx) > 0:
        print(f"   High-confidence anomalies: {len(high_confidence_idx)}")
        
        # Show characteristics of high-confidence anomalies
        for idx in high_confidence_idx[:3]:  # Show first 3
            record = merged_data.loc[idx]
            ae_error = test_mse[X_test.index.get_loc(idx)]
            if_score = test_scores[X_test.index.get_loc(idx)]
            
            print(f"   Record {idx}:")
            print(f"     Age: {record.get('age', 'N/A')}")
            print(f"     Heart Rate: {record.get('heart_rate', 'N/A')}")
            print(f"     Temperature: {record.get('temperature_c', 'N/A')}")
            print(f"     AE Reconstruction Error: {ae_error:.4f}")
            print(f"     IF Anomaly Score: {if_score:.4f}")
            print()
    
    # Save autoencoder model
    print(f"\n💾 Saving Autoencoder Model:")
    autoencoder_filename = '../data/autoencoder_model.h5'
    autoencoder.save(autoencoder_filename)
    
    # Save threshold and other metadata
    ae_metadata = {
        'threshold': threshold,
        'input_dim': input_dim,
        'encoding_dim': encoding_dim,
        'training_samples': len(X_train_normal)
    }
    
    metadata_filename = '../data/autoencoder_metadata.json'
    with open(metadata_filename, 'w') as f:
        json.dump(ae_metadata, f)
    
    print(f"✅ Autoencoder saved to: {autoencoder_filename}")
    print(f"✅ Metadata saved to: {metadata_filename}")
    
else:
    # Simplified pattern detection without TensorFlow
    print(f"\n🔧 Using Simplified Pattern Detection (without TensorFlow):")
    
    # Statistical-based pattern detection
    # Look for records that deviate significantly from typical patterns
    
    pattern_scores = []
    
    for idx in X_test.index:
        score = 0
        record = X_test.loc[idx]
        
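        # Check for statistical outliers in multiple dimensions
        for col in X_test.columns:
            col_std = X_train[col].std()
            if col_std == 0 or np.isnan(col_std):
                continue  # Constant feature: the z-score is undefined
            z_score = abs(record[col] - X_train[col].mean()) / col_std
            if z_score > 2:  # More than 2 standard deviations from the training mean
                score += z_score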
        
        pattern_scores.append(score)
    
    pattern_scores = np.array(pattern_scores)
    
    # Set threshold for pattern anomalies
    pattern_threshold = np.percentile(pattern_scores, 90)  # Top 10% as anomalies
    pattern_anomalies = pattern_scores > pattern_threshold
    
    print(f"   Pattern anomalies detected: {pattern_anomalies.sum()} ({pattern_anomalies.sum()/len(X_test)*100:.1f}%)")
    print(f"   Pattern threshold: {pattern_threshold:.2f}")
    
    # Compare with Isolation Forest
    if_anomalies = test_predictions == -1
    agreement = (if_anomalies == pattern_anomalies).mean()
    print(f"   Agreement with Isolation Forest: {agreement:.2%}")

# Summary
print(f"\n🎯 Pattern-Based Anomaly Detection Summary:")
if tf_available:
    print(f"   Method: Deep Autoencoder")
    print(f"   Architecture: {input_dim} → {encoding_dim * 2} → {encoding_dim} → {encoding_dim * 2} → {input_dim}")
    print(f"   Training samples: {len(X_train_normal)}")
    print(f"   Anomaly threshold: {threshold:.4f}")
    print(f"   Test anomalies: {num_anomalies_ae}/{len(X_test)} ({num_anomalies_ae/len(X_test)*100:.1f}%)")
else:
    print(f"   Method: Statistical Pattern Detection")
    print(f"   Anomaly threshold: {pattern_threshold:.2f}")
    print(f"   Test anomalies: {pattern_anomalies.sum()}/{len(X_test)} ({pattern_anomalies.sum()/len(X_test)*100:.1f}%)")

print(f"   Status: ✅ Ready for inference")

## 10. Create Data Quality Assessment Pipeline

Now let's combine all our validation approaches (rule-based and ML-based) into a comprehensive data quality assessment pipeline that provides a unified view of data quality issues.
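
Two pieces of arithmetic in the pipeline are worth seeing in isolation: the violation-based score used for conformance and plausibility, and the confidence assigned to ML-flagged records. The sketch below restates both as standalone functions. The function names are hypothetical, and returning 0 for an empty dataset is an assumption added here to avoid a division by zero.

```python
def violation_score(total_violations: int, total_records: int) -> float:
    """Score out of 100 that loses one point per 1% of records in violation."""
    if total_records == 0:
        return 0.0  # Assumption: an empty dataset earns no credit
    return max(0.0, 100.0 - (total_violations / total_records) * 100.0)

def anomaly_confidence(flagged_by_if: bool, flagged_by_ae: bool) -> float:
    """Base 0.5, plus 0.3 if Isolation Forest flags the record, plus 0.2 if the autoencoder does."""
    confidence = 0.5
    if flagged_by_if:
        confidence += 0.3
    if flagged_by_ae:
        confidence += 0.2
    return min(1.0, confidence)

# 15 violations in 200 records cost 7.5 points
print(violation_score(15, 200))
# More violations than records: the score is clamped at zero
print(violation_score(500, 200))
# Confidence grows with the number of methods that agree
print(anomaly_confidence(True, False))
print(anomaly_confidence(False, True))
print(anomaly_confidence(True, True))
```

Because violations are summed across rules, a single record that breaks several rules is counted several times, which is why the clamp at zero is needed.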

In [None]:
# Create comprehensive data quality assessment pipeline
print("🔄 Creating Comprehensive Data Quality Assessment Pipeline...")
print("=" * 70)

class HealthcareDataQualityPipeline:
    """
    Comprehensive healthcare data quality assessment pipeline
    combining rule-based validation with ML-based anomaly detection
    """
    
    def __init__(self):
        self.rules_engine = HealthcareDataQualityRules()
        self.isolation_forest = None
        self.scaler = None
        self.feature_columns = None
        self.label_encoders = None
        self.autoencoder = None
        self.ae_threshold = None
        
        # Results storage
        self.assessment_results = {}
        
    def load_models(self, model_path='../data/'):
        """Load trained ML models"""
        try:
            # Load Isolation Forest
            if_data = joblib.load(f'{model_path}isolation_forest_model.joblib')
            self.isolation_forest = if_data['model']
            self.scaler = if_data['scaler']
            self.feature_columns = if_data['feature_columns']
            self.label_encoders = if_data['label_encoders']
            print("✅ Isolation Forest model loaded")
            
            # Load Autoencoder if available
            try:
                if tf_available:
                    self.autoencoder = keras.models.load_model(f'{model_path}autoencoder_model.h5')
                    with open(f'{model_path}autoencoder_metadata.json', 'r') as f:
                        ae_data = json.load(f)
                    self.ae_threshold = ae_data['threshold']
                    print("✅ Autoencoder model loaded")
                else:
                    print("⚠️ Autoencoder not available (TensorFlow not installed)")
            except Exception as e:
                print(f"⚠️ Autoencoder model could not be loaded: {e}")
                
        except Exception as e:
            print(f"❌ Error loading models: {e}")
    
    def assess_completeness(self, data):
        """Assess data completeness"""
        required_fields = ['patient_id', 'encounter_id', 'age', 'gender']
        issues = self.rules_engine.check_required_fields(data, required_fields)
        
        # Calculate completeness score
        total_fields = len(data.columns)
        missing_data_points = data.isnull().sum().sum()
        total_data_points = len(data) * total_fields
        completeness_score = ((total_data_points - missing_data_points) / total_data_points) * 100
        
        return {
            'dimension': 'Completeness',
            'score': completeness_score,
            'issues': issues,
            'total_records': len(data),
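            # Count records (not issues) that are missing at least one required field
            'failed_records': int(data[[f for f in required_fields if f in data.columns]].isnull().any(axis=1).sum())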
        }
    
    def assess_conformance(self, data):
        """Assess data conformance"""
        issues = []
        issues.extend(self.rules_engine.check_format_patterns(data))
        issues.extend(self.rules_engine.check_range_constraints(data))
        
        # Calculate conformance score
        total_violations = sum(issue.get('count', 1) for issue in issues)
        conformance_score = max(0, 100 - (total_violations / len(data)) * 100)
        
        return {
            'dimension': 'Conformance',
            'score': conformance_score,
            'issues': issues,
            'total_records': len(data),
            'failed_records': total_violations
        }
    
    def assess_plausibility(self, data):
        """Assess data plausibility"""
        issues = []
        issues.extend(self.rules_engine.check_clinical_logic(data))
        issues.extend(self.rules_engine.check_temporal_consistency(data))
        
        # Calculate plausibility score
        total_violations = sum(issue.get('count', 1) for issue in issues)
        plausibility_score = max(0, 100 - (total_violations / len(data)) * 100)
        
        return {
            'dimension': 'Plausibility',
            'score': plausibility_score,
            'issues': issues,
            'total_records': len(data),
            'failed_records': total_violations
        }
    
    def detect_ml_anomalies(self, data):
        """Detect anomalies using ML models"""
        if self.isolation_forest is None:
            # Include 'total_anomalies' so callers can rely on the same keys in every case
            return {'ml_anomalies': [], 'ml_confidence': [], 'total_anomalies': 0}
        
        try:
            # Prepare features the same way as training
            ml_data = self.prepare_ml_features(data)
            
            # Get predictions from Isolation Forest
            if_scores = self.isolation_forest.decision_function(ml_data)
            if_predictions = self.isolation_forest.predict(ml_data)
            
            # Get autoencoder predictions if available
            ae_anomalies = np.zeros(len(ml_data), dtype=bool)
            if self.autoencoder is not None:
                try:
                    ae_reconstructions = self.autoencoder.predict(ml_data, verbose=0)
                    ae_errors = np.mean(np.power(ml_data - ae_reconstructions, 2), axis=1)
                    ae_anomalies = ae_errors > self.ae_threshold
                except Exception as e:
                    print(f"Warning: Autoencoder prediction failed: {e}")
            
            # Combine results
            ml_anomalies = []
            ml_confidence = []
            
            for i, (if_pred, if_score) in enumerate(zip(if_predictions, if_scores)):
                is_anomaly = if_pred == -1 or ae_anomalies[i]
                
                if is_anomaly:
                    # Calculate confidence based on how many methods agree
                    confidence = 0.5  # Base confidence
                    if if_pred == -1:
                        confidence += 0.3
                    if ae_anomalies[i]:
                        confidence += 0.2
                    
                    ml_anomalies.append({
                        'record_index': data.index[i],
                        'if_score': if_score,
                        'ae_anomaly': bool(ae_anomalies[i]),
                        'confidence': min(1.0, confidence)
                    })
                    ml_confidence.append(confidence)
            
            return {
                'ml_anomalies': ml_anomalies,
                'ml_confidence': ml_confidence,
                'total_anomalies': len(ml_anomalies)
            }
            
        except Exception as e:
            print(f"Error in ML anomaly detection: {e}")
            return {'ml_anomalies': [], 'ml_confidence': [], 'total_anomalies': 0}
    
    def prepare_ml_features(self, data):
        """Prepare features for ML models"""
        # This is a simplified version - in practice you'd want more robust preprocessing
        ml_data = data.copy()
        
        # Select numeric columns that exist in both training and test data
        available_features = [col for col in self.feature_columns if col in ml_data.columns]
        
        if not available_features:
            # Fallback to basic numeric columns
            numeric_cols = data.select_dtypes(include=[np.number]).columns
            available_features = [col for col in numeric_cols if col not in ['patient_id', 'encounter_id']]
        
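        # Create feature matrix (missing values are imputed with 0)
        feature_matrix = ml_data[available_features].fillna(0)
        
        # Scale features
        try:
            scaled_features = self.scaler.transform(feature_matrix)
            return scaled_features
        except Exception as e:
            # Fallback: standardize with this batch's own statistics.
            # Constant or single-row columns get a std of 1 to avoid division by zero.
            print(f"Warning: fitted scaler could not be applied ({e}); using fallback scaling")
            std = feature_matrix.std().replace(0, 1).fillna(1)
            return ((feature_matrix - feature_matrix.mean()) / std).to_numpy()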
    
    def run_comprehensive_assessment(self, data):
        """Run complete data quality assessment"""
        print(f"\n📊 Running Comprehensive Data Quality Assessment on {len(data)} records...")
        
        # Rule-based assessments
        print("\n1️⃣ Assessing Completeness...")
        completeness_results = self.assess_completeness(data)
        print(f"   Score: {completeness_results['score']:.1f}/100")
        
        print("\n2️⃣ Assessing Conformance...")
        conformance_results = self.assess_conformance(data)
        print(f"   Score: {conformance_results['score']:.1f}/100")
        
        print("\n3️⃣ Assessing Plausibility...")
        plausibility_results = self.assess_plausibility(data)
        print(f"   Score: {plausibility_results['score']:.1f}/100")
        
        # ML-based anomaly detection
        print("\n4️⃣ Running ML Anomaly Detection...")
        ml_results = self.detect_ml_anomalies(data)
        print(f"   Anomalies detected: {ml_results['total_anomalies']}")
        
        # Calculate overall score
        dimension_scores = [
            completeness_results['score'],
            conformance_results['score'],
            plausibility_results['score']
        ]
        
        # Weight the scores (can be customized)
        weights = [0.3, 0.4, 0.3]  # Completeness, Conformance, Plausibility
        overall_score = sum(score * weight for score, weight in zip(dimension_scores, weights))
        
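        # Adjust for ML anomalies: subtract the anomaly rate in percentage points, capped at 10
        anomaly_rate = ml_results['total_anomalies'] / len(data) if len(data) else 0.0
        anomaly_penalty = min(10, anomaly_rate * 100)
        overall_score = max(0, overall_score - anomaly_penalty)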
        
        # Store results
        self.assessment_results = {
            'overall_score': overall_score,
            'completeness': completeness_results,
            'conformance': conformance_results,
            'plausibility': plausibility_results,
            'ml_anomalies': ml_results,
            'assessment_date': datetime.now().isoformat(),
            'total_records': len(data)
        }
        
        return self.assessment_results

# Initialize and run the pipeline
print("\n🚀 Initializing Healthcare Data Quality Pipeline...")
pipeline = HealthcareDataQualityPipeline()

# Load trained models
pipeline.load_models()

# Run comprehensive assessment on our test data
assessment_results = pipeline.run_comprehensive_assessment(merged_data)

print("\n🎯 Comprehensive Assessment Complete!")
print(f"{'='*50}")
print(f"Overall Data Quality Score: {assessment_results['overall_score']:.1f}/100")
print("\nDimension Breakdown:")
print(f"   📋 Completeness: {assessment_results['completeness']['score']:.1f}/100")
print(f"   🔧 Conformance: {assessment_results['conformance']['score']:.1f}/100")
print(f"   🎯 Plausibility: {assessment_results['plausibility']['score']:.1f}/100")
print(f"   🤖 ML Anomalies: {assessment_results['ml_anomalies']['total_anomalies']} detected")

# Overall assessment
if assessment_results['overall_score'] >= 90:
    status = "🟢 Excellent"
elif assessment_results['overall_score'] >= 80:
    status = "🔵 Good"
elif assessment_results['overall_score'] >= 70:
    status = "🟡 Acceptable"
else:
    status = "🔴 Needs Improvement"

print(f"\nOverall Assessment: {status}")

## 11. Run Quality Assessment on Sample Data

Let's run our complete framework on the sample data and examine the detailed results, including specific issues found and their severity levels.
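
To help interpret the numbers in the detailed results, it is worth recalling how the pipeline above arrives at the overall score: a weighted sum of the three dimension scores (weights 0.3, 0.4, 0.3) minus an ML anomaly penalty capped at 10 points. The following is a minimal standalone sketch of that arithmetic. The function name `overall_quality_score`, its signature, and the input values are assumptions made for illustration and are not part of the framework.

```python
def overall_quality_score(completeness, conformance, plausibility,
                          total_anomalies, total_records,
                          weights=(0.3, 0.4, 0.3)):
    """Weighted dimension score minus a capped ML anomaly penalty.

    Follows the arithmetic in run_comprehensive_assessment; the function
    name and signature are hypothetical, for illustration only.
    """
    scores = (completeness, conformance, plausibility)
    weighted = sum(s * w for s, w in zip(scores, weights))
    # Penalty is the anomaly rate in percentage points, capped at 10
    anomaly_rate = total_anomalies / total_records if total_records else 0.0
    penalty = min(10, anomaly_rate * 100)
    return max(0, weighted - penalty)


# Made-up inputs: 27 + 32 + 21 = 80 weighted, minus a 3-point penalty
example_score = overall_quality_score(90.0, 80.0, 70.0,
                                      total_anomalies=30, total_records=1000)
print(f"{example_score:.1f}")  # prints 77.0
```

Because the penalty is capped, even a dataset where half of the records are flagged loses at most 10 points, so the rule-based dimensions dominate the final score.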

In [None]:
# Analyze detailed assessment results
print("🔍 Detailed Analysis of Data Quality Assessment Results")
print("=" * 60)

# Function to analyze and display issues; returns the total violation count
def analyze_issues(issues, dimension_name):
    if not issues:
        print(f"✅ No {dimension_name.lower()} issues found!")
        return 0
    
    print(f"\n❌ {dimension_name} Issues Found:")
    print(f"{'Rule':<30} {'Severity':<10} {'Count':<8} {'Percentage':<12}")
    print("-" * 70)
    
    total_issues = 0
    for issue in issues:
        rule = issue.get('rule', 'Unknown')
        severity = issue.get('severity', 'Unknown')
        count = issue.get('count', 1)
        percentage = issue.get('percentage', 0)
        
        print(f"{rule:<30} {severity:<10} {count:<8} {percentage:<11.1f}%")
        total_issues += count
    
    print(f"\nTotal {dimension_name} violations: {total_issues}")
    return total_issues

# Analyze each dimension
print("\n📋 COMPLETENESS ANALYSIS:")
completeness_issues = analyze_issues(assessment_results['completeness']['issues'], 'Completeness')

print("\n🔧 CONFORMANCE ANALYSIS:")
conformance_issues = analyze_issues(assessment_results['conformance']['issues'], 'Conformance')

print("\n🎯 PLAUSIBILITY ANALYSIS:")
plausibility_issues = analyze_issues(assessment_results['plausibility']['issues'], 'Plausibility')

# Analyze ML anomalies in detail
print("\n🤖 ML ANOMALY ANALYSIS:")
ml_anomalies = assessment_results['ml_anomalies']['ml_anomalies']

if ml_anomalies:
    print(f"Total ML anomalies detected: {len(ml_anomalies)}")
    print("\nSample anomalous records:")
    print(f"{'Record ID':<12} {'IF Score':<10} {'AE Anomaly':<12} {'Confidence':<12}")
    print("-" * 50)
    
    for i, anomaly in enumerate(ml_anomalies[:10]):  # Show first 10
        record_id = anomaly['record_index']
        if_score = anomaly['if_score']
        ae_anomaly = '✓' if anomaly['ae_anomaly'] else '✗'
        confidence = anomaly['confidence']
        
        print(f"{record_id:<12} {if_score:<10.3f} {ae_anomaly:<12} {confidence:<12.2f}")
    
    # Show characteristics of the first five anomalous records
    print("\n🔍 Characteristics of Anomalous Records:")
    anomaly_indices = [anomaly['record_index'] for anomaly in ml_anomalies[:5]]
    
    for idx in anomaly_indices:
        if idx in merged_data.index:
            record = merged_data.loc[idx]
            print(f"\nRecord {idx}:")
            print(f"   Age: {record.get('age', 'N/A')}")
            print(f"   Gender: {record.get('gender', 'N/A')}")
            print(f"   Heart Rate: {record.get('heart_rate', 'N/A')}")
            print(f"   Temperature: {record.get('temperature_c', 'N/A')}")
            print(f"   Systolic BP: {record.get('systolic_bp', 'N/A')}")
else:
    print("✅ No ML anomalies detected!")

# Create issue severity summary
print("\n📊 ISSUE SEVERITY SUMMARY:")
all_issues = (assessment_results['completeness']['issues'] + 
              assessment_results['conformance']['issues'] + 
              assessment_results['plausibility']['issues'])

severity_counts = {'critical': 0, 'major': 0, 'minor': 0, 'warning': 0}
for issue in all_issues:
    severity = issue.get('severity', 'unknown')
    count = issue.get('count', 1)
    if severity in severity_counts:
        severity_counts[severity] += count

print(f"{'Severity':<10} {'Count':<8} {'Impact':<20}")
print("-" * 40)
for severity, count in severity_counts.items():
    if count > 0:
        impact = {
            'critical': 'Immediate attention required',
            'major': 'Significant data quality impact',
            'minor': 'Moderate impact',
            'warning': 'Low impact, monitor'
        }.get(severity, 'Unknown impact')
        
        print(f"{severity.capitalize():<10} {count:<8} {impact:<20}")

# Calculate failure rates by record
print("\n📈 RECORD-LEVEL FAILURE ANALYSIS:")

# Track row positions of records that have issues
problematic_records = set()

# Add records with rule violations.
# NOTE: the issue summaries do not carry record IDs, so this step is only a
# placeholder: it marks up to the first 10 row positions per failing rule.
# Treat the resulting rates as a rough estimate, not an exact count.
for issue in all_issues:
    if issue.get('count', 0) > 0:
        problematic_records.update(range(min(10, issue.get('count', 0), len(merged_data))))

# Add ML anomaly records (converted from index labels to row positions)
for anomaly in ml_anomalies:
    if anomaly['record_index'] in merged_data.index:
        problematic_records.add(merged_data.index.get_loc(anomaly['record_index']))

total_problematic = len(problematic_records)
failure_rate = (total_problematic / len(merged_data)) * 100 if len(merged_data) else 0.0

print(f"Records with quality issues (estimate): {total_problematic}/{len(merged_data)} ({failure_rate:.1f}%)")
print(f"Clean records (estimate): {len(merged_data) - total_problematic}/{len(merged_data)} ({100-failure_rate:.1f}%)")

# Recommendations based on assessment
print("\n💡 RECOMMENDATIONS:")

recommendations = []

if assessment_results['overall_score'] < 70:
    recommendations.append("🚨 URGENT: Overall data quality is below acceptable threshold")

if assessment_results['completeness']['score'] < 85:
    recommendations.append("📋 Improve data collection processes to reduce missing values")

if assessment_results['conformance']['score'] < 85:
    recommendations.append("🔧 Implement stricter data validation at data entry points")

if assessment_results['plausibility']['score'] < 85:
    recommendations.append("🎯 Add clinical decision support to prevent implausible values")

if len(ml_anomalies) > len(merged_data) * 0.05:  # More than 5% anomalies
    recommendations.append("🤖 Investigate patterns in ML-detected anomalies for systematic issues")

if severity_counts['critical'] > 0:
    recommendations.append("⚠️ Address critical issues immediately - they may indicate systematic problems")

if not recommendations:
    recommendations.append("✅ Data quality is good - continue monitoring and maintain current processes")

for i, rec in enumerate(recommendations, 1):
    print(f"{i}. {rec}")

print("\n🎯 FINAL ASSESSMENT:")
print(f"Data Quality Score: {assessment_results['overall_score']:.1f}/100")
print(f"Status: {status}")
print(f"Total Issues Found: {sum(severity_counts.values())}")
print(f"ML Anomalies: {len(ml_anomalies)}")
print(f"Assessment Date: {assessment_results['assessment_date']}")

# Save detailed results
results_filename = '../data/detailed_assessment_results.json'
with open(results_filename, 'w') as f:
    # default=str converts values json cannot serialize (e.g. numpy integers) to strings
    json.dump(assessment_results, f, indent=2, default=str)

print(f"\n💾 Detailed results saved to: {results_filename}")

## 12. Generate Data Quality Report and Scorecard

Now let's create comprehensive reports and scorecards that provide actionable insights for healthcare data quality improvement.
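
The reports in this section group rule violations by severity and map scores to status bands at 90, 80, and 70. Below is a minimal sketch of both steps on hand-written issue records. The helper names `status_band` and `violations_by_severity` and the sample issues are assumptions for illustration; the `rule`, `severity`, and `count` keys follow the issue dictionaries used throughout this notebook.

```python
from collections import Counter


def status_band(score):
    """Map a 0-100 score to the status bands used in this notebook."""
    if score >= 90:
        return "Excellent"
    if score >= 80:
        return "Good"
    if score >= 70:
        return "Acceptable"
    return "Needs Improvement"


def violations_by_severity(issues):
    """Sum violation counts per severity; a missing count defaults to 1."""
    totals = Counter()
    for issue in issues:
        totals[issue.get('severity', 'unknown')] += issue.get('count', 1)
    return dict(totals)


# Hypothetical issue records, for illustration only
sample_issues = [
    {'rule': 'missing_birth_date', 'severity': 'critical', 'count': 12},
    {'rule': 'invalid_gender_code', 'severity': 'major', 'count': 5},
    {'rule': 'heart_rate_out_of_range', 'severity': 'major', 'count': 3},
]

print(violations_by_severity(sample_issues))
print(status_band(84.5))
```

Keeping the band thresholds in one helper is a design choice that avoids the same `if`/`elif` ladder drifting apart across the scorecard, the console summary, and the HTML export.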

In [None]:
# Generate comprehensive data quality reports and scorecards
print("📊 Generating Data Quality Reports and Scorecards...")
print("=" * 60)

class DataQualityReporter:
    """Generate comprehensive data quality reports and scorecards"""
    
    def __init__(self, assessment_results):
        self.results = assessment_results
        
    def generate_executive_scorecard(self):
        """Generate executive-level scorecard"""
        print("\n📋 EXECUTIVE DATA QUALITY SCORECARD")
        print("=" * 50)
        
        # Overall score with visual indicator
        overall_score = self.results['overall_score']
        if overall_score >= 90:
            indicator = "🟢 EXCELLENT"
        elif overall_score >= 80:
            indicator = "🔵 GOOD"
        elif overall_score >= 70:
            indicator = "🟡 ACCEPTABLE"
        else:
            indicator = "🔴 CRITICAL"
        
        print(f"\n🎯 OVERALL DATA QUALITY SCORE: {overall_score:.1f}/100 {indicator}")
        
        # Dimension scores
        print("\n📊 DIMENSION BREAKDOWN:")
        dimensions = [
            ("Completeness", self.results['completeness']['score'], "📋"),
            ("Conformance", self.results['conformance']['score'], "🔧"),
            ("Plausibility", self.results['plausibility']['score'], "🎯")
        ]
        
        # Use a separate loop variable so the overall score is not overwritten
        for name, dim_score, emoji in dimensions:
            status = self._get_status_indicator(dim_score)
            print(f"   {emoji} {name:<15}: {dim_score:>6.1f}/100 {status}")
        
        # Key metrics
        total_records = self.results['total_records']
        ml_anomalies = len(self.results['ml_anomalies']['ml_anomalies'])
        anomaly_pct = ml_anomalies / total_records * 100 if total_records else 0.0
        
        print("\n📈 KEY METRICS:")
        print(f"   Total Records Assessed: {total_records:,}")
        print(f"   ML Anomalies Detected: {ml_anomalies:,} ({anomaly_pct:.1f}%)")
        
        return {
            'overall_score': overall_score,
            'status': indicator,
            'dimensions': {d[0]: d[1] for d in dimensions},
            'total_records': total_records,
            'ml_anomalies': ml_anomalies
        }
    
    def generate_operational_report(self):
        """Generate detailed operational report"""
        print("\n🔧 OPERATIONAL DATA QUALITY REPORT")
        print("=" * 50)
        
        # Issues by severity
        all_issues = (self.results['completeness']['issues'] + 
                     self.results['conformance']['issues'] + 
                     self.results['plausibility']['issues'])
        
        severity_summary = {'critical': [], 'major': [], 'minor': [], 'warning': []}
        
        for issue in all_issues:
            severity = issue.get('severity', 'unknown')
            if severity in severity_summary:
                severity_summary[severity].append(issue)
        
        print("\n🚨 ISSUES BY SEVERITY:")
        for severity, issues in severity_summary.items():
            if issues:
                count = sum(issue.get('count', 1) for issue in issues)
                print(f"\n   {severity.upper()} ({count} issues):")
                for issue in issues[:3]:  # Show top 3 per severity
                    rule = issue.get('rule', 'Unknown')
                    description = issue.get('description', 'No description')
                    print(f"     • {rule}: {description}")
                
                if len(issues) > 3:
                    print(f"     ... and {len(issues) - 3} more")
        
        # ML anomaly patterns
        ml_anomalies = self.results['ml_anomalies']['ml_anomalies']
        if ml_anomalies:
            print("\n🤖 ML ANOMALY PATTERNS:")
            
            # Confidence distribution
            confidences = [a['confidence'] for a in ml_anomalies]
            high_conf = sum(1 for c in confidences if c > 0.8)
            med_conf = sum(1 for c in confidences if 0.5 <= c <= 0.8)
            low_conf = sum(1 for c in confidences if c < 0.5)
            
            print(f"   High Confidence (>0.8): {high_conf}")
            print(f"   Medium Confidence (0.5-0.8): {med_conf}")
            print(f"   Low Confidence (<0.5): {low_conf}")
        
        return severity_summary
    
    def generate_technical_details(self):
        """Generate technical details for data engineers"""
        print("\n⚙️ TECHNICAL ASSESSMENT DETAILS")
        print("=" * 50)
        
        # Rule execution summary
        print("\n📋 RULE EXECUTION SUMMARY:")
        
        dimensions = ['completeness', 'conformance', 'plausibility']
        for dim in dimensions:
            dim_results = self.results[dim]
            issues = dim_results['issues']
            
            # Only rules that reported at least one issue appear in the issue list,
            # so this counts rules with violations rather than all rules executed
            print(f"\n   {dim.upper()}:")
            print(f"     Rules with Violations: {len(set(issue.get('rule') for issue in issues))}")
            print(f"     Total Violations: {sum(issue.get('count', 1) for issue in issues)}")
            print(f"     Failed Records: {dim_results.get('failed_records', 0)}")
            print(f"     Dimension Score: {dim_results['score']:.1f}/100")
        
        # ML model performance
        print("\n🤖 ML MODEL PERFORMANCE:")
        ml_results = self.results['ml_anomalies']
        print(f"   Isolation Forest Anomalies: {len([a for a in ml_results['ml_anomalies'] if a.get('if_score', 0) < 0])}")
        print(f"   Autoencoder Anomalies: {len([a for a in ml_results['ml_anomalies'] if a.get('ae_anomaly', False)])}")
        print(f"   Total Unique Anomalies: {ml_results['total_anomalies']}")
        
        if ml_results['ml_anomalies']:
            avg_confidence = np.mean([a['confidence'] for a in ml_results['ml_anomalies']])
            print(f"   Average Confidence: {avg_confidence:.2f}")
    
    def generate_business_impact_analysis(self):
        """Generate business impact analysis"""
        print("\n💼 BUSINESS IMPACT ANALYSIS")
        print("=" * 50)
        
        score = self.results['overall_score']
        total_records = self.results['total_records']
        
        # Calculate potential impact
        if score < 60:
            risk_level = "🔴 HIGH RISK"
            impact = "Significant impact on clinical decision-making and reporting"
        elif score < 80:
            risk_level = "🟡 MODERATE RISK"
            impact = "Moderate impact on data reliability and analytics"
        else:
            risk_level = "🟢 LOW RISK"
            impact = "Minimal impact on business operations"
        
        print(f"\n🎯 BUSINESS RISK ASSESSMENT: {risk_level}")
        print(f"Impact: {impact}")
        
        # Calculate affected records
        all_issues = (self.results['completeness']['issues'] + 
                     self.results['conformance']['issues'] + 
                     self.results['plausibility']['issues'])
        
        total_violations = sum(issue.get('count', 1) for issue in all_issues)
        # Violation counts only approximate affected records: one record can violate
        # several rules, so cap the estimate at the number of records assessed
        affected_records = min(total_violations, total_records)
        affected_percentage = (affected_records / total_records) * 100 if total_records else 0.0
        
        print("\n📊 AFFECTED DATA:")
        print(f"   Records with Issues: ~{affected_records:,} ({affected_percentage:.1f}%)")
        print(f"   Clean Records: ~{total_records - affected_records:,} ({100-affected_percentage:.1f}%)")
        
        # Cost estimates (example - would be customized per organization)
        if affected_percentage > 10:
            print("\n💰 ESTIMATED IMPACT:")
            print(f"   Manual review effort: {total_violations * 2:.0f} person-minutes")
            print("   Potential compliance risk: High")
            print("   Recommendation: Immediate remediation required")
        
        return {
            'risk_level': risk_level,
            'impact_description': impact,
            'affected_percentage': affected_percentage,
            'total_violations': total_violations
        }
    
    def _get_status_indicator(self, score):
        """Get status indicator for score"""
        if score >= 90:
            return "✅"
        elif score >= 80:
            return "🟦"
        elif score >= 70:
            return "🟨"
        else:
            return "🟥"
    
    def export_html_report(self, filename='../data/data_quality_report.html'):
        """Export comprehensive HTML report"""
        html_content = f"""
<!DOCTYPE html>
<html>
<head>
    <title>Healthcare Data Quality Assessment Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 40px; }}
        .header {{ background: #f8f9fa; padding: 20px; border-radius: 8px; }}
        .score {{ font-size: 48px; font-weight: bold; color: {'#28a745' if self.results['overall_score'] >= 80 else '#dc3545'}; }}
        .dimension {{ background: #e9ecef; padding: 15px; margin: 10px 0; border-radius: 5px; }}
        .issue {{ background: #fff3cd; padding: 10px; margin: 5px 0; border-left: 4px solid #ffc107; }}
        .critical {{ border-left-color: #dc3545; background: #f8d7da; }}
        .major {{ border-left-color: #fd7e14; background: #fde2e4; }}
    </style>
</head>
<body>
    <div class="header">
        <h1>Healthcare Data Quality Assessment Report</h1>
        <div class="score">{self.results['overall_score']:.1f}/100</div>
        <p>Assessment Date: {self.results['assessment_date']}</p>
    </div>
    
    <h2>Dimension Scores</h2>
    <div class="dimension">
        <h3>Completeness: {self.results['completeness']['score']:.1f}/100</h3>
        <p>Missing data and null value assessment</p>
    </div>
    
    <div class="dimension">
        <h3>Conformance: {self.results['conformance']['score']:.1f}/100</h3>
        <p>Format, type, and domain constraint validation</p>
    </div>
    
    <div class="dimension">
        <h3>Plausibility: {self.results['plausibility']['score']:.1f}/100</h3>
        <p>Clinical logic and temporal consistency validation</p>
    </div>
    
    <h2>Key Issues</h2>
        """
        
        # Add issues
        all_issues = (self.results['completeness']['issues'] + 
                     self.results['conformance']['issues'] + 
                     self.results['plausibility']['issues'])
        
        for issue in all_issues[:10]:  # Top 10 issues
            severity_class = issue.get('severity', 'minor')
            html_content += f"""
    <div class="issue {severity_class}">
        <strong>{issue.get('rule', 'Unknown Rule')}</strong><br>
        {issue.get('description', 'No description')}<br>
        <small>Count: {issue.get('count', 1)} | Severity: {issue.get('severity', 'Unknown')}</small>
    </div>
            """
        
        html_content += """
</body>
</html>
        """
        
        with open(filename, 'w') as f:
            f.write(html_content)
        
        print(f"📄 HTML report exported to: {filename}")
        return filename

# Generate comprehensive reports
reporter = DataQualityReporter(assessment_results)

# Executive scorecard
exec_scorecard = reporter.generate_executive_scorecard()

# Operational report
operational_summary = reporter.generate_operational_report()

# Technical details
reporter.generate_technical_details()

# Business impact analysis
business_impact = reporter.generate_business_impact_analysis()

# Export HTML report
html_report_path = reporter.export_html_report()

# Create summary scorecard for easy sharing
summary_scorecard = {
    'assessment_date': assessment_results['assessment_date'],
    'overall_score': assessment_results['overall_score'],
    'total_records': assessment_results['total_records'],
    'dimensions': {
        'completeness': assessment_results['completeness']['score'],
        'conformance': assessment_results['conformance']['score'],
        'plausibility': assessment_results['plausibility']['score']
    },
    'ml_anomalies': len(assessment_results['ml_anomalies']['ml_anomalies']),
    'business_risk': business_impact['risk_level'],
    'key_recommendations': [
        "Implement data validation at entry points" if assessment_results['conformance']['score'] < 85 else None,
        "Improve completeness monitoring" if assessment_results['completeness']['score'] < 85 else None,
        "Add clinical decision support" if assessment_results['plausibility']['score'] < 85 else None,
        "Investigate ML anomaly patterns" if len(assessment_results['ml_anomalies']['ml_anomalies']) > 10 else None
    ]
}

# Remove None recommendations
summary_scorecard['key_recommendations'] = [r for r in summary_scorecard['key_recommendations'] if r]

# Save summary scorecard
with open('../data/summary_scorecard.json', 'w') as f:
    json.dump(summary_scorecard, f, indent=2)

print("\n💾 Summary scorecard saved to: ../data/summary_scorecard.json")
print("\n✅ All reports generated successfully!")
print("\n📋 Generated Reports:")
print("   1. Executive Scorecard (console output)")
print("   2. Operational Report (console output)")
print("   3. Technical Details (console output)")
print("   4. Business Impact Analysis (console output)")
print(f"   5. HTML Report: {html_report_path}")
print("   6. Summary Scorecard: ../data/summary_scorecard.json")

## 13. Data Quality Visualization and Trending

Create comprehensive visualizations to help understand data quality patterns, trends, and distributions. These visualizations are essential for:

- **Executive Dashboards**: High-level quality metrics and trends
- **Operational Monitoring**: Real-time quality indicators and alerts
- **Technical Analysis**: Detailed issue patterns and anomaly distributions
- **Historical Tracking**: Quality improvement over time
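
Historical tracking presupposes that each assessment's scores are persisted somewhere. One possible approach, sketched here under stated assumptions, is to append every run to a JSON Lines file and read the file back in date order before plotting. The file name `quality_history.jsonl`, the helper names, and the scores in the demo are hypothetical and are not part of the framework.

```python
import json
import tempfile
from pathlib import Path


def append_score_history(history_path, assessment_date, scores):
    """Append one assessment's dimension scores as a single JSON line."""
    record = {'assessment_date': assessment_date, **scores}
    with open(history_path, 'a', encoding='utf-8') as f:
        f.write(json.dumps(record) + '\n')


def load_score_history(history_path):
    """Return all recorded assessments, oldest first."""
    lines = Path(history_path).read_text(encoding='utf-8').splitlines()
    records = [json.loads(line) for line in lines if line.strip()]
    # ISO 8601 date strings sort chronologically as plain strings
    return sorted(records, key=lambda r: r['assessment_date'])


# Demo with made-up scores, written to a temporary directory
with tempfile.TemporaryDirectory() as tmp_dir:
    history_file = Path(tmp_dir) / 'quality_history.jsonl'
    append_score_history(history_file, '2024-02-29',
                         {'completeness': 85.0, 'conformance': 79.0, 'plausibility': 90.5})
    append_score_history(history_file, '2024-01-31',
                         {'completeness': 82.0, 'conformance': 75.5, 'plausibility': 88.0})
    history = load_score_history(history_file)

for entry in history:
    print(entry['assessment_date'], entry['completeness'])
```

The resulting list of dictionaries can be passed to `pd.DataFrame(history)` and plotted per dimension, which would replace simulated trend values with recorded ones.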

In [None]:
# Create comprehensive data quality visualizations
print("📊 Creating Data Quality Visualizations...")
print("=" * 60)

import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.patches import Rectangle
import numpy as np

# Set up plotting style
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 10

class DataQualityVisualizer:
    """Create comprehensive visualizations for data quality assessment"""
    
    def __init__(self, assessment_results, synthetic_data):
        self.results = assessment_results
        self.data = synthetic_data
        
    def create_executive_dashboard(self):
        """Create executive-level dashboard"""
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Healthcare Data Quality Executive Dashboard', fontsize=16, fontweight='bold')
        
        # 1. Overall Score Gauge
        self._create_score_gauge(ax1, self.results['overall_score'], 'Overall Quality Score')
        
        # 2. Dimension Comparison
        dimensions = ['Completeness', 'Conformance', 'Plausibility']
        scores = [
            self.results['completeness']['score'],
            self.results['conformance']['score'],
            self.results['plausibility']['score']
        ]
        
        bars = ax2.bar(dimensions, scores, color=['#2E8B57', '#4169E1', '#FF6347'])
        ax2.set_title('Quality Dimensions Comparison', fontweight='bold')
        ax2.set_ylabel('Score (0-100)')
        ax2.set_ylim(0, 100)
        ax2.axhline(y=80, color='orange', linestyle='--', alpha=0.7, label='Target (80)')
        ax2.axhline(y=90, color='green', linestyle='--', alpha=0.7, label='Excellence (90)')
        
        # Add value labels on bars
        for bar, score in zip(bars, scores):
            height = bar.get_height()
            ax2.text(bar.get_x() + bar.get_width()/2., height + 1,
                    f'{score:.1f}', ha='center', va='bottom', fontweight='bold')
        
        ax2.legend()
        
        # 3. Issue Severity Distribution
        all_issues = (self.results['completeness']['issues'] + 
                     self.results['conformance']['issues'] + 
                     self.results['plausibility']['issues'])
        
        severity_counts = {'critical': 0, 'major': 0, 'minor': 0, 'warning': 0}
        for issue in all_issues:
            severity = issue.get('severity', 'warning')
            # Count unrecognized severities as warnings instead of raising KeyError
            if severity not in severity_counts:
                severity = 'warning'
            severity_counts[severity] += issue.get('count', 1)
        
        # Filter out zero counts for cleaner visualization
        non_zero_severity = {k: v for k, v in severity_counts.items() if v > 0}
        
        if non_zero_severity:
            # Look up colors by severity so each level keeps its color when others are absent
            severity_colors = {'critical': '#dc3545', 'major': '#fd7e14',
                               'minor': '#ffc107', 'warning': '#17a2b8'}
            colors = [severity_colors[k] for k in non_zero_severity]
            wedges, texts, autotexts = ax3.pie(list(non_zero_severity.values()),
                                             labels=list(non_zero_severity.keys()),
                                             colors=colors, autopct='%1.1f%%',
                                             startangle=90)
            ax3.set_title('Issues by Severity', fontweight='bold')
        else:
            ax3.text(0.5, 0.5, 'No Issues Detected', ha='center', va='center', 
                    transform=ax3.transAxes, fontsize=12)
            ax3.set_title('Issues by Severity', fontweight='bold')
        
        # 4. ML Anomaly Confidence Distribution
        ml_anomalies = self.results['ml_anomalies']['ml_anomalies']
        if ml_anomalies:
            confidences = [a['confidence'] for a in ml_anomalies]
            ax4.hist(confidences, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
            ax4.axvline(np.mean(confidences), color='red', linestyle='--', 
                       label=f'Mean: {np.mean(confidences):.2f}')
            ax4.set_title('ML Anomaly Confidence Distribution', fontweight='bold')
            ax4.set_xlabel('Confidence Score')
            ax4.set_ylabel('Frequency')
            ax4.legend()
        else:
            ax4.text(0.5, 0.5, 'No ML Anomalies Detected', ha='center', va='center', 
                    transform=ax4.transAxes, fontsize=12)
            ax4.set_title('ML Anomaly Confidence Distribution', fontweight='bold')
        
        plt.tight_layout()
        plt.savefig('../data/executive_dashboard.png', dpi=300, bbox_inches='tight')
        plt.show()
        
    def create_operational_monitoring(self):
        """Create operational monitoring dashboard"""
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Operational Data Quality Monitoring', fontsize=16, fontweight='bold')
        
        # 1. Records by Quality Status
        total_records = self.results['total_records']
        all_issues = (self.results['completeness']['issues'] + 
                     self.results['conformance']['issues'] + 
                     self.results['plausibility']['issues'])
        
        total_violations = sum(issue.get('count', 1) for issue in all_issues)
        # Violation counts only approximate affected records, so cap at the record total
        records_with_issues = min(total_violations, total_records)
        clean_records = total_records - records_with_issues
        
        status_data = ['Clean Records', 'Records with Issues']
        status_counts = [clean_records, records_with_issues]
        colors = ['#28a745', '#dc3545']
        
        bars = ax1.bar(status_data, status_counts, color=colors)
        ax1.set_title('Record Quality Status', fontweight='bold')
        ax1.set_ylabel('Number of Records')
        
        # Add percentage labels, offset relative to the tallest bar
        for bar, count in zip(bars, status_counts):
            percentage = (count / total_records) * 100 if total_records else 0.0
            ax1.text(bar.get_x() + bar.get_width()/2., bar.get_height() + max(status_counts) * 0.01,
                    f'{count:,}\n({percentage:.1f}%)', ha='center', va='bottom')
        
        # 2. Issues by Dimension
        dim_issue_counts = {}
        for dim in ['completeness', 'conformance', 'plausibility']:
            issues = self.results[dim]['issues']
            dim_issue_counts[dim] = sum(issue.get('count', 1) for issue in issues)
        
        dims = list(dim_issue_counts.keys())
        counts = list(dim_issue_counts.values())
        
        bars = ax2.barh(dims, counts, color=['#2E8B57', '#4169E1', '#FF6347'])
        ax2.set_title('Issues by Quality Dimension', fontweight='bold')
        ax2.set_xlabel('Number of Issues')
        
        # Add count labels
        for bar, count in zip(bars, counts):
            ax2.text(bar.get_width() + 5, bar.get_y() + bar.get_height()/2,
                    f'{count:,}', ha='left', va='center')
        
        # 3. Top Issues by Frequency
        issue_summary = {}
        for issue in all_issues:
            rule = issue.get('rule', 'Unknown')
            count = issue.get('count', 1)
            if rule in issue_summary:
                issue_summary[rule] += count
            else:
                issue_summary[rule] = count

        # Get top 10 issues
        top_issues = sorted(issue_summary.items(), key=lambda x: x[1], reverse=True)[:10]

        if top_issues:
            issue_names = [name[:30] + '...' if len(name) > 30 else name for name, _ in top_issues]
            issue_counts = [count for _, count in top_issues]

            y_pos = np.arange(len(issue_names))
            bars = ax3.barh(y_pos, issue_counts, color='lightcoral')
            ax3.set_yticks(y_pos)
            ax3.set_yticklabels(issue_names, fontsize=8)
            ax3.set_xlabel('Frequency')
            ax3.set_title('Top Data Quality Issues', fontweight='bold')

            # Add count labels
            for bar, count in zip(bars, issue_counts):
                ax3.text(bar.get_width() + max(issue_counts)*0.01, bar.get_y() + bar.get_height()/2,
                        f'{count}', ha='left', va='center', fontsize=8)
        else:
            ax3.text(0.5, 0.5, 'No Issues Detected', ha='center', va='center',
                    transform=ax3.transAxes, fontsize=12)
            ax3.set_title('Top Data Quality Issues', fontweight='bold')

        # 4. Quality Score Trend (simulated historical data)
        # In a real implementation, this would show actual historical trends
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun']

        # Simulate trend data around current scores with some variation
        current_scores = {
            'completeness': self.results['completeness']['score'],
            'conformance': self.results['conformance']['score'],
            'plausibility': self.results['plausibility']['score']
        }

        for dim, current_score in current_scores.items():
            # Simulate historical progression toward current score
            trend = np.linspace(current_score - 15, current_score, len(months))
            trend += np.random.normal(0, 2, len(months))  # Add some noise
            trend = np.clip(trend, 0, 100)  # Keep within valid range

            ax4.plot(months, trend, marker='o', label=dim.capitalize(), linewidth=2)

        ax4.set_title('Quality Score Trends (6 Months)', fontweight='bold')
        ax4.set_ylabel('Quality Score')
        ax4.set_ylim(0, 100)
        ax4.axhline(y=80, color='orange', linestyle='--', alpha=0.5, label='Target')
        ax4.legend()
        ax4.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig('../data/operational_monitoring.png', dpi=300, bbox_inches='tight')
        plt.show()

    def create_technical_analysis(self):
        """Create technical analysis visualizations"""
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Technical Data Quality Analysis', fontsize=16, fontweight='bold')

        # 1. Feature Correlation Heatmap (for numeric columns)
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 1:
            correlation_matrix = self.data[numeric_cols].corr()
            mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))

            sns.heatmap(correlation_matrix, mask=mask, annot=True, fmt='.2f',
                       cmap='coolwarm', center=0, ax=ax1, cbar_kws={'shrink': .8})
            ax1.set_title('Feature Correlation Matrix', fontweight='bold')
        else:
            ax1.text(0.5, 0.5, 'Insufficient numeric columns for correlation',
                    ha='center', va='center', transform=ax1.transAxes)
            ax1.set_title('Feature Correlation Matrix', fontweight='bold')

        # 2. Missing Data Pattern
        missing_data = self.data.isnull().sum()
        missing_data = missing_data[missing_data > 0]

        if len(missing_data) > 0:
            ax2.bar(range(len(missing_data)), missing_data.values, color='lightcoral')
            ax2.set_xticks(range(len(missing_data)))
            ax2.set_xticklabels(missing_data.index, rotation=45, ha='right')
            ax2.set_title('Missing Data by Column', fontweight='bold')
            ax2.set_ylabel('Number of Missing Values')
        else:
            ax2.text(0.5, 0.5, 'No Missing Data Detected', ha='center', va='center',
                    transform=ax2.transAxes, fontsize=12)
            ax2.set_title('Missing Data by Column', fontweight='bold')

        # 3. Anomaly Score Distribution
        ml_anomalies = self.results['ml_anomalies']['ml_anomalies']
        if ml_anomalies:
            if_scores = [a.get('if_score', 0) for a in ml_anomalies]
            ax3.scatter(range(len(if_scores)), if_scores, alpha=0.6, c='red')
            ax3.axhline(y=0, color='black', linestyle='--', alpha=0.5, label='Normal Threshold')
            ax3.set_title('Isolation Forest Anomaly Scores', fontweight='bold')
            ax3.set_xlabel('Record Index')
            ax3.set_ylabel('Anomaly Score')
            ax3.legend()
        else:
            ax3.text(0.5, 0.5, 'No Anomalies Detected', ha='center', va='center',
                    transform=ax3.transAxes, fontsize=12)
            ax3.set_title('Isolation Forest Anomaly Scores', fontweight='bold')

        # 4. Data Type Distribution
        type_counts = {}
        for col in self.data.columns:
            dtype = str(self.data[col].dtype)
            if 'int' in dtype or 'float' in dtype:
                category = 'Numeric'
            elif 'object' in dtype:
                category = 'Text/Categorical'
            elif 'datetime' in dtype:
                category = 'DateTime'
            else:
                category = 'Other'

            type_counts[category] = type_counts.get(category, 0) + 1

        if type_counts:
            ax4.pie(type_counts.values(), labels=type_counts.keys(), autopct='%1.1f%%',
                   startangle=90, colors=['skyblue', 'lightgreen', 'yellow', 'pink'])
            ax4.set_title('Data Type Distribution', fontweight='bold')

        plt.tight_layout()
        plt.savefig('../data/technical_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()

    def _create_score_gauge(self, ax, score, title):
        """Create a gauge chart for score visualization"""
        # Create gauge background
        theta = np.linspace(0, np.pi, 100)
        r = 1

        # Color zones
        zones = [
            (0, 60, '#dc3545'),    # Poor (0-60) - Red
            (60, 80, '#ffc107'),   # Fair (60-80) - Yellow
            (80, 90, '#17a2b8'),   # Good (80-90) - Blue
            (90, 100, '#28a745')   # Excellent (90-100) - Green
        ]

        for start, end, color in zones:
            start_angle = np.pi * (1 - start/100)
            end_angle = np.pi * (1 - end/100)
            zone_theta = np.linspace(end_angle, start_angle, 50)
            ax.fill_between(zone_theta, 0.8, 1.0, color=color, alpha=0.7)

        # Score needle
        needle_angle = np.pi * (1 - score/100)
        ax.plot([needle_angle, needle_angle], [0, 0.9], 'k-', linewidth=4)
        ax.plot(needle_angle, 0, 'ko', markersize=8)

        # Labels
        ax.text(np.pi/2, 0.5, f'{score:.1f}', ha='center', va='center',
               fontsize=24, fontweight='bold')
        ax.text(np.pi/2, 0.3, title, ha='center', va='center', fontsize=12)

        # Zone labels
        ax.text(np.pi*0.9, 1.1, '0', ha='center', va='center')
        ax.text(np.pi*0.1, 1.1, '100', ha='center', va='center')

        ax.set_xlim(0, np.pi)
        ax.set_ylim(0, 1.2)
        ax.set_aspect('equal')
        ax.axis('off')

    def create_summary_infographic(self):
        """Create a summary infographic"""
        fig, ax = plt.subplots(1, 1, figsize=(12, 16))
        ax.set_xlim(0, 10)
        ax.set_ylim(0, 20)
        ax.axis('off')

        # Title
        ax.text(5, 19, 'Healthcare Data Quality Assessment',
               ha='center', va='center', fontsize=20, fontweight='bold')
        ax.text(5, 18.5, f"Assessment Date: {self.results['assessment_date']}",
               ha='center', va='center', fontsize=12)

        # Overall Score Box
        score = self.results['overall_score']
        color = '#28a745' if score >= 80 else '#dc3545'

        rect = Rectangle((3.5, 16), 3, 1.5, linewidth=2, edgecolor=color,
                        facecolor=color, alpha=0.3)
        ax.add_patch(rect)
        ax.text(5, 16.75, f'{score:.1f}/100', ha='center', va='center',
               fontsize=24, fontweight='bold', color=color)
        ax.text(5, 16.3, 'Overall Quality Score', ha='center', va='center', fontsize=10)

        # Key Metrics
        metrics_y = 14.5
        metrics = [
            f"Total Records: {self.results['total_records']:,}",
            f"ML Anomalies: {len(self.results['ml_anomalies']['ml_anomalies'])}",
            f"Rule Violations: {sum(len(self.results[dim]['issues']) for dim in ['completeness', 'conformance', 'plausibility'])}"
        ]

        for i, metric in enumerate(metrics):
            ax.text(5, metrics_y - i*0.5, metric, ha='center', va='center', fontsize=12)

        # Dimension Scores
        dim_y = 12
        dimensions = [
            ('Completeness', self.results['completeness']['score'], '#2E8B57'),
            ('Conformance', self.results['conformance']['score'], '#4169E1'),
            ('Plausibility', self.results['plausibility']['score'], '#FF6347')
        ]

        for i, (name, score, color) in enumerate(dimensions):
            y_pos = dim_y - i*1.5

            # Progress bar background
            bg_rect = Rectangle((2, y_pos-0.2), 6, 0.4, linewidth=1,
                              edgecolor='gray', facecolor='lightgray')
            ax.add_patch(bg_rect)

            # Progress bar fill
            fill_width = 6 * (score/100)
            fill_rect = Rectangle((2, y_pos-0.2), fill_width, 0.4,
                                linewidth=0, facecolor=color, alpha=0.8)
            ax.add_patch(fill_rect)

            # Labels
            ax.text(1.5, y_pos, name, ha='right', va='center', fontsize=12, fontweight='bold')
            ax.text(8.5, y_pos, f'{score:.1f}', ha='left', va='center', fontsize=12, fontweight='bold')

        # Recommendations
        ax.text(5, 6.5, 'Key Recommendations', ha='center', va='center',
               fontsize=14, fontweight='bold')

        recommendations = [
            "• Implement automated data validation",
            "• Set up real-time monitoring dashboards",
            "• Train staff on data quality best practices",
            "• Schedule regular quality assessments"
        ]

        for i, rec in enumerate(recommendations):
            ax.text(1, 6 - i*0.5, rec, ha='left', va='center', fontsize=10)

        plt.savefig('../data/quality_summary_infographic.png', dpi=300, bbox_inches='tight')
        plt.show()

# Create visualizations
visualizer = DataQualityVisualizer(assessment_results, final_assessment_data)

print("\n📊 Creating Executive Dashboard...")
visualizer.create_executive_dashboard()

print("\n🔧 Creating Operational Monitoring Dashboard...")
visualizer.create_operational_monitoring()

print("\n⚙️ Creating Technical Analysis Dashboard...")
visualizer.create_technical_analysis()

print("\n📋 Creating Summary Infographic...")
visualizer.create_summary_infographic()

print("\n✅ All visualizations created successfully!")
print("\n📁 Saved visualizations:")
print("   • Executive Dashboard: ../data/executive_dashboard.png")
print("   • Operational Monitoring: ../data/operational_monitoring.png")
print("   • Technical Analysis: ../data/technical_analysis.png")
print("   • Summary Infographic: ../data/quality_summary_infographic.png")

## 14. Framework Testing and Validation

Test the complete framework with new data to validate its effectiveness and demonstrate real-world applicability. This section covers:

- **New Data Generation**: Create fresh synthetic data with different quality characteristics
- **Framework Reusability**: Test the framework with minimal configuration changes
- **Performance Validation**: Measure assessment speed and accuracy
- **Edge Case Testing**: Validate behavior with extreme data quality scenarios
- **Production Readiness**: Demonstrate scalability and reliability considerations

In [None]:
# Test and validate the complete framework with new data
print("🧪 Framework Testing and Validation")
print("=" * 60)

import time
from datetime import datetime, timedelta

class FrameworkValidator:
    """Validate framework performance and reliability"""

    def __init__(self, framework):
        self.framework = framework
        self.test_results = {}

    def test_with_clean_data(self):
        """Test framework with high-quality data"""
        print("\n🟢 Test 1: High-Quality Data Scenario")
        print("-" * 40)

        # Generate clean data with minimal issues
        clean_generator = SyntheticFHIRDataGenerator(
            missing_rate=0.01,      # Very low missing data
            invalid_rate=0.005,     # Very few invalid formats
            anomaly_rate=0.01       # Very few anomalies
        )

        clean_data = clean_generator.generate_synthetic_data(1000)
        print(f"Generated {len(clean_data)} clean records")

        # Run assessment
        start_time = time.time()
        results = self.framework.assess_all_dimensions(clean_data)
        end_time = time.time()

        processing_time = end_time - start_time

        print(f"\n📊 Clean Data Results:")
        print(f"   Overall Score: {results['overall_score']:.1f}/100 (Expected: >90)")
        print(f"   Processing Time: {processing_time:.2f} seconds")
        print(f"   Records/Second: {len(clean_data)/processing_time:.0f}")

        # Validate expectations
        expected_score = 90
        actual_score = results['overall_score']

        if actual_score >= expected_score:
            print(f"   ✅ PASS: High quality data scored appropriately")
        else:
            print(f"   ❌ FAIL: Expected score >={expected_score}, got {actual_score:.1f}")

        self.test_results['clean_data'] = {
            'score': actual_score,
            'processing_time': processing_time,
            'records_per_second': len(clean_data)/processing_time,
            'passed': actual_score >= expected_score
        }

        return results

    def test_with_poor_data(self):
        """Test framework with low-quality data"""
        print("\n🔴 Test 2: Poor-Quality Data Scenario")
        print("-" * 40)

        # Generate data with significant quality issues
        poor_generator = SyntheticFHIRDataGenerator(
            missing_rate=0.25,      # High missing data
            invalid_rate=0.15,      # Many invalid formats
            anomaly_rate=0.20       # Many anomalies
        )

        poor_data = poor_generator.generate_synthetic_data(1000)
        print(f"Generated {len(poor_data)} poor-quality records")

        # Run assessment
        start_time = time.time()
        results = self.framework.assess_all_dimensions(poor_data)
        end_time = time.time()

        processing_time = end_time - start_time

        print(f"\n📊 Poor Data Results:")
        print(f"   Overall Score: {results['overall_score']:.1f}/100 (Expected: <50)")
        print(f"   Processing Time: {processing_time:.2f} seconds")

        # Count detected issues
        total_issues = sum(
            len(results[dim]['issues'])
            for dim in ['completeness', 'conformance', 'plausibility']
        )

        print(f"   Detected Issues: {total_issues} (Expected: >20)")

        # Validate expectations
        expected_max_score = 50
        expected_min_issues = 20
        actual_score = results['overall_score']

        score_pass = actual_score <= expected_max_score
        issues_pass = total_issues >= expected_min_issues

        if score_pass:
            print(f"   ✅ PASS: Poor quality data scored appropriately")
        else:
            print(f"   ❌ FAIL: Expected score <={expected_max_score}, got {actual_score:.1f}")

        if issues_pass:
            print(f"   ✅ PASS: Sufficient issues detected")
        else:
            print(f"   ❌ FAIL: Expected >={expected_min_issues} issues, got {total_issues}")

        self.test_results['poor_data'] = {
            'score': actual_score,
            'processing_time': processing_time,
            'total_issues': total_issues,
            'score_passed': score_pass,
            'issues_passed': issues_pass
        }

        return results

    def test_scalability(self):
        """Test framework scalability with different data sizes"""
        print("\n📈 Test 3: Scalability Testing")
        print("-" * 40)

        test_sizes = [100, 500, 1000, 2000]
        scalability_results = []

        generator = SyntheticFHIRDataGenerator()

        for size in test_sizes:
            print(f"\n   Testing with {size} records...")

            # Generate test data
            test_data = generator.generate_synthetic_data(size)

            # Measure processing time
            start_time = time.time()
            results = self.framework.assess_all_dimensions(test_data)
            end_time = time.time()

            processing_time = end_time - start_time
            records_per_second = size / processing_time

            scalability_results.append({
                'size': size,
                'time': processing_time,
                'rps': records_per_second
            })

            print(f"     Time: {processing_time:.2f}s, Rate: {records_per_second:.0f} records/sec")

        # Analyze scalability
        print(f"\n📊 Scalability Analysis:")

        # Check if processing scales linearly
        small_rps = scalability_results[0]['rps']
        large_rps = scalability_results[-1]['rps']
        efficiency_ratio = large_rps / small_rps

        print(f"   Small dataset rate: {small_rps:.0f} records/sec")
        print(f"   Large dataset rate: {large_rps:.0f} records/sec")
        print(f"   Efficiency ratio: {efficiency_ratio:.2f} (closer to 1.0 is better)")

        if efficiency_ratio >= 0.8:
            print(f"   ✅ PASS: Good scalability performance")
        else:
            print(f"   ⚠️  WARNING: Performance degrades with scale")

        self.test_results['scalability'] = {
            'results': scalability_results,
            'efficiency_ratio': efficiency_ratio,
            'passed': efficiency_ratio >= 0.8
        }

        return scalability_results

    def test_edge_cases(self):
        """Test framework with edge cases"""
        print("\n⚠️  Test 4: Edge Case Testing")
        print("-" * 40)

        edge_case_results = {}

        # Test 1: Empty dataset
        print("\n   🔍 Testing empty dataset...")
        try:
            empty_df = pd.DataFrame()
            results = self.framework.assess_all_dimensions(empty_df)
            print(f"     ❌ UNEXPECTED: Empty dataset should raise error")
            edge_case_results['empty_dataset'] = False
        except Exception as e:
            print(f"     ✅ EXPECTED: Empty dataset handled gracefully ({type(e).__name__})")
            edge_case_results['empty_dataset'] = True

        # Test 2: Single record
        print("\n   🔍 Testing single record...")
        try:
            generator = SyntheticFHIRDataGenerator()
            single_record = generator.generate_synthetic_data(1)
            results = self.framework.assess_all_dimensions(single_record)
            print(f"     ✅ SUCCESS: Single record processed")
            edge_case_results['single_record'] = True
        except Exception as e:
            print(f"     ❌ FAILED: Single record failed ({type(e).__name__})")
            edge_case_results['single_record'] = False

        # Test 3: All missing data
        print("\n   🔍 Testing all missing data...")
        try:
            all_missing = pd.DataFrame({
                'patient_id': [None] * 100,
                'age': [None] * 100,
                'gender': [None] * 100
            })
            results = self.framework.assess_all_dimensions(all_missing)
            expected_low_score = results['overall_score'] < 30
            print(f"     Score: {results['overall_score']:.1f}/100")
            if expected_low_score:
                print(f"     ✅ SUCCESS: All missing data scored appropriately low")
                edge_case_results['all_missing'] = True
            else:
                print(f"     ❌ FAILED: All missing data should score very low")
                edge_case_results['all_missing'] = False
        except Exception as e:
            print(f"     ❌ FAILED: All missing data failed ({type(e).__name__})")
            edge_case_results['all_missing'] = False

        # Test 4: Perfect data
        print("\n   🔍 Testing perfect data...")
        try:
            perfect_generator = SyntheticFHIRDataGenerator(
                missing_rate=0.0,
                invalid_rate=0.0,
                anomaly_rate=0.0
            )
            perfect_data = perfect_generator.generate_synthetic_data(100)
            results = self.framework.assess_all_dimensions(perfect_data)
            expected_high_score = results['overall_score'] >= 95
            print(f"     Score: {results['overall_score']:.1f}/100")
            if expected_high_score:
                print(f"     ✅ SUCCESS: Perfect data scored appropriately high")
                edge_case_results['perfect_data'] = True
            else:
                print(f"     ⚠️  WARNING: Perfect data should score very high")
                edge_case_results['perfect_data'] = False
        except Exception as e:
            print(f"     ❌ FAILED: Perfect data failed ({type(e).__name__})")
            edge_case_results['perfect_data'] = False

        self.test_results['edge_cases'] = edge_case_results
        return edge_case_results

    def test_repeatability(self):
        """Test that framework produces consistent results"""
        print("\n🔄 Test 5: Repeatability Testing")
        print("-" * 40)

        generator = SyntheticFHIRDataGenerator(random_seed=42)  # Fixed seed
        test_data = generator.generate_synthetic_data(500)

        # Run assessment multiple times
        scores = []
        for i in range(3):
            results = self.framework.assess_all_dimensions(test_data)
            scores.append(results['overall_score'])
            print(f"   Run {i+1}: {results['overall_score']:.1f}/100")

        # Check consistency
        score_std = np.std(scores)
        max_acceptable_std = 2.0  # Allow small variation due to ML randomness

        print(f"\n📊 Repeatability Analysis:")
        print(f"   Score Standard Deviation: {score_std:.2f}")
        print(f"   Max Acceptable: {max_acceptable_std}")

        if score_std <= max_acceptable_std:
            print(f"   ✅ PASS: Results are consistent")
            repeatability_passed = True
        else:
            print(f"   ❌ FAIL: Results vary too much between runs")
            repeatability_passed = False

        self.test_results['repeatability'] = {
            'scores': scores,
            'std_dev': score_std,
            'passed': repeatability_passed
        }

        return scores

    def generate_test_report(self):
        """Generate comprehensive test report"""
        print("\n📋 COMPREHENSIVE TEST REPORT")
        print("=" * 60)

        total_tests = len(self.test_results)
        passed_tests = 0

        for test_name, results in self.test_results.items():
            print(f"\n🧪 {test_name.replace('_', ' ').title()}:")

            if test_name == 'clean_data':
                status = "✅ PASS" if results['passed'] else "❌ FAIL"
                print(f"   Status: {status}")
                print(f"   Score: {results['score']:.1f}/100")
                print(f"   Performance: {results['records_per_second']:.0f} records/sec")
                if results['passed']:
                    passed_tests += 1

            elif test_name == 'poor_data':
                status = "✅ PASS" if (results['score_passed'] and results['issues_passed']) else "❌ FAIL"
                print(f"   Status: {status}")
                print(f"   Score: {results['score']:.1f}/100")
                print(f"   Issues Detected: {results['total_issues']}")
                if results['score_passed'] and results['issues_passed']:
                    passed_tests += 1

            elif test_name == 'scalability':
                status = "✅ PASS" if results['passed'] else "⚠️  WARNING"
                print(f"   Status: {status}")
                print(f"   Efficiency Ratio: {results['efficiency_ratio']:.2f}")
                if results['passed']:
                    passed_tests += 1

            elif test_name == 'edge_cases':
                edge_passed = sum(results.values())
                edge_total = len(results)
                status = "✅ PASS" if edge_passed == edge_total else "⚠️  PARTIAL"
                print(f"   Status: {status}")
                print(f"   Passed: {edge_passed}/{edge_total} edge cases")
                if edge_passed >= edge_total * 0.75:  # 75% pass rate
                    passed_tests += 1

            elif test_name == 'repeatability':
                status = "✅ PASS" if results['passed'] else "❌ FAIL"
                print(f"   Status: {status}")
                print(f"   Score Variation: {results['std_dev']:.2f}")
                if results['passed']:
                    passed_tests += 1

        # Overall assessment
        print(f"\n🎯 OVERALL FRAMEWORK ASSESSMENT:")
        print(f"   Tests Passed: {passed_tests}/{total_tests}")
        print(f"   Success Rate: {(passed_tests/total_tests)*100:.1f}%")

        if passed_tests == total_tests:
            overall_status = "🟢 EXCELLENT - Framework ready for production"
        elif passed_tests >= total_tests * 0.8:
            overall_status = "🔵 GOOD - Framework ready with minor considerations"
        elif passed_tests >= total_tests * 0.6:
            overall_status = "🟡 ACCEPTABLE - Framework needs improvements"
        else:
            overall_status = "🔴 NEEDS WORK - Framework requires significant improvements"

        print(f"   Overall Status: {overall_status}")

        # Recommendations
        print(f"\n💡 RECOMMENDATIONS:")
        if passed_tests == total_tests:
            print(f"   • Framework is production-ready")
            print(f"   • Consider adding more edge case tests")
            print(f"   • Set up automated testing pipeline")
        else:
            print(f"   • Address failing test cases before production deployment")
            print(f"   • Improve error handling for edge cases")
            print(f"   • Optimize performance for large datasets")

        return {
            'total_tests': total_tests,
            'passed_tests': passed_tests,
            'success_rate': (passed_tests/total_tests)*100,
            'overall_status': overall_status
        }

# Run comprehensive framework testing
print("🚀 Starting Comprehensive Framework Testing...")

# Initialize validator
validator = FrameworkValidator(framework)

# Run all tests
print("\n" + "="*60)
clean_results = validator.test_with_clean_data()

print("\n" + "="*60)
poor_results = validator.test_with_poor_data()

print("\n" + "="*60)
scalability_results = validator.test_scalability()

print("\n" + "="*60)
edge_case_results = validator.test_edge_cases()

print("\n" + "="*60)
repeatability_results = validator.test_repeatability()

# Generate final test report
print("\n" + "="*60)
test_summary = validator.generate_test_report()

# Save test results
test_output = {
    'test_date': datetime.now().isoformat(),
    'framework_version': '1.0.0',
    'test_summary': test_summary,
    'detailed_results': validator.test_results
}

with open('../data/framework_test_results.json', 'w') as f:
    json.dump(test_output, f, indent=2, default=str)

print(f"\n💾 Test results saved to: ../data/framework_test_results.json")
print(f"\n🎉 Framework testing complete! The Healthcare Data Quality Framework has been successfully validated.")
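
The efficiency ratio reported by the scalability test can be illustrated in isolation. The following is a minimal sketch, not the framework's own code: `measure_throughput`, `efficiency_ratio`, and `dummy_assess` are hypothetical names, and `dummy_assess` only stands in for `assess_all_dimensions`. It uses `time.perf_counter`, which is generally better suited to short timings than `time.time`, and guards against a zero elapsed time on very fast runs.

```python
import time


def measure_throughput(assess, records):
    """Return records processed per second for one call to `assess`."""
    start = time.perf_counter()
    assess(records)
    elapsed = time.perf_counter() - start
    # Guard against a zero reading when the call finishes almost instantly.
    return len(records) / max(elapsed, 1e-9)


def efficiency_ratio(small_rps, large_rps):
    """Ratio of large-dataset to small-dataset throughput (1.0 = linear scaling)."""
    return large_rps / small_rps


def dummy_assess(records):
    """Hypothetical stand-in for the real assessment: count records missing an age."""
    return sum(1 for record in records if record.get("age") is None)


small = [{"age": i} for i in range(100)]
large = [{"age": i} for i in range(2000)]

ratio = efficiency_ratio(
    measure_throughput(dummy_assess, small),
    measure_throughput(dummy_assess, large),
)
print(f"Efficiency ratio: {ratio:.2f}")
```

A single timing per size is noisy, so repeating each measurement and taking the median would likely give a steadier ratio than the one-shot comparison sketched here.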

## 🎉 Conclusion and Summary

### Healthcare Data Quality Assessment Framework - Complete Implementation

Congratulations! You have successfully implemented and demonstrated a comprehensive **Healthcare Data Quality Assessment Framework** based on Kahn's three-dimensional model. This framework provides:

#### 🏗️ **Framework Architecture**
- **Completeness Dimension**: Detects missing data, null values, and incomplete records
- **Conformance Dimension**: Validates data formats, types, and domain constraints  
- **Plausibility Dimension**: Checks clinical logic, temporal consistency, and medical validity
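
To make the three dimensions concrete, here is a minimal sketch of one check per dimension over plain dictionaries. It is an illustration under stated assumptions rather than the framework's implementation: the helper names are hypothetical, the field names follow the `patient_id`, `age`, and `gender` columns used in the edge case test, the gender values follow the FHIR administrative gender codes, and the 0 to 120 age range is an assumed plausibility bound.

```python
# FHIR administrative gender codes; treated here as the allowed domain.
ALLOWED_GENDERS = {"male", "female", "other", "unknown"}
REQUIRED_FIELDS = ("patient_id", "age", "gender")


def is_complete(record):
    """Completeness: every required field holds a non-empty value."""
    return all(record.get(field) not in (None, "") for field in REQUIRED_FIELDS)


def is_conformant(record):
    """Conformance: age is an integer and gender is in the allowed domain."""
    return isinstance(record.get("age"), int) and record.get("gender") in ALLOWED_GENDERS


def is_plausible(record):
    """Plausibility: age falls in an assumed believable range of 0 to 120."""
    age = record.get("age")
    return isinstance(age, int) and 0 <= age <= 120


def pass_rate(records, check):
    """Percentage of records that pass `check` (0.0 for an empty list)."""
    if not records:
        return 0.0
    return 100.0 * sum(check(record) for record in records) / len(records)


patients = [
    {"patient_id": "p1", "age": 42, "gender": "female"},
    {"patient_id": "p2", "age": None, "gender": "male"},   # missing age
    {"patient_id": "p3", "age": 67, "gender": "F"},        # gender outside domain
    {"patient_id": "p4", "age": 212, "gender": "male"},    # implausible age
]

scores = {
    "completeness": pass_rate(patients, is_complete),
    "conformance": pass_rate(patients, is_conformant),
    "plausibility": pass_rate(patients, is_plausible),
}
print(scores)
```

Note that record `p4` is complete and conformant but still fails plausibility, which is why the three dimensions are scored separately.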

#### 🤖 **Dual Validation Approach**
- **Rule-Based Validation**: Traditional deterministic rules for known quality issues
- **ML-Based Anomaly Detection**: Unsupervised learning to discover unknown patterns
- **Ensemble Scoring**: Combined confidence scores for comprehensive assessment
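
One way such an ensemble score could be formed is sketched below. This is an assumption-laden illustration, not the framework's scoring formula: `ensemble_confidence`, the 0.6/0.4 weights, and the 0.5 scaling constant are all hypothetical. The only external fact relied on is that scikit-learn's `IsolationForest.decision_function` returns negative values for outliers and positive values for inliers.

```python
def ensemble_confidence(rule_violations, if_score, rule_weight=0.6, ml_weight=0.4):
    """Combine rule and ML signals into a 0..1 confidence that a record is bad.

    rule_violations: number of rule-based checks the record failed.
    if_score: decision_function-style score (negative = more anomalous).
    Weights and the 0.5 scale are illustrative assumptions.
    """
    rule_signal = 1.0 if rule_violations > 0 else 0.0
    # Map scores in [-0.5, 0] linearly onto [1, 0]; clamp everything else.
    ml_signal = min(1.0, max(0.0, -if_score / 0.5))
    return rule_weight * rule_signal + ml_weight * ml_signal


# A record flagged by both approaches gets the highest confidence.
both = ensemble_confidence(rule_violations=2, if_score=-0.25)
# A record flagged only by the ML model still surfaces, with lower confidence.
ml_only = ensemble_confidence(rule_violations=0, if_score=-0.5)
# A record that passes rules and looks like an inlier scores zero.
clean = ensemble_confidence(rule_violations=0, if_score=0.1)
print(both, ml_only, clean)
```

Weighting the rule signal more heavily reflects the idea that deterministic rule failures are certain, whereas anomaly scores are only suggestive; different weights may suit other settings.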

#### 📊 **Comprehensive Reporting**
- **Executive Scorecards**: High-level quality metrics for leadership
- **Operational Dashboards**: Real-time monitoring and alerting
- **Technical Analysis**: Detailed issue patterns and ML insights
- **Business Impact**: Risk assessment and cost implications

#### 🔬 **Production Readiness**
- **Scalability Testing**: Throughput measured on datasets of 100 to 2,000 records
- **Edge Case Handling**: Empty, single-record, all-missing, and zero-defect datasets exercised
- **Repeatability**: Score variation across three runs checked against a 2.0-point standard deviation limit
- **Extensibility**: Modular design for easy customization
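
The repeatability property depends on controlling every source of randomness, not only the data generator. The sketch below illustrates the idea with the standard library alone; `noisy_assess` is a hypothetical stand-in for an assessment whose ML component uses random numbers, and the base score of 85.0 is made up for the example.

```python
import random
import statistics


def noisy_assess(seed=None):
    """Hypothetical assessment whose score includes a random ML component."""
    rng = random.Random(seed)  # seed=None draws a fresh seed on every call
    return 85.0 + rng.uniform(-1.0, 1.0)


# With a fixed seed, every run yields the same score, so the spread is zero.
seeded_scores = [noisy_assess(seed=42) for _ in range(3)]
seeded_std = statistics.pstdev(seeded_scores)

# Without a seed, scores differ between runs but stay within the noise band.
unseeded_scores = [noisy_assess() for _ in range(3)]
unseeded_std = statistics.pstdev(unseeded_scores)

print(f"Seeded std: {seeded_std:.2f}, unseeded std: {unseeded_std:.2f}")
```

`statistics.pstdev` computes the population standard deviation, matching the default behavior of `np.std`. For scikit-learn estimators such as `IsolationForest`, passing a fixed `random_state` plays the same role as the seed here.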

### 🚀 **Next Steps for Implementation**

1. **Data Integration**: Connect to your actual FHIR data sources
2. **Rule Customization**: Adapt validation rules to your specific requirements
3. **ML Model Training**: Train models on your organization's historical data
4. **Dashboard Deployment**: Set up automated reporting and monitoring
5. **Workflow Integration**: Embed quality checks into data pipelines
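
As an illustration of step 5, a quality gate can stop a pipeline when scores fall below agreed thresholds. The sketch below assumes a results dictionary shaped like the one used throughout this notebook (`overall_score` plus a `score` per dimension); `quality_gate`, `DataQualityError`, and the 80/60 thresholds are hypothetical choices for the example.

```python
class DataQualityError(Exception):
    """Raised when assessment results fall below the configured thresholds."""


def quality_gate(results, min_overall=80.0, min_dimension=60.0):
    """Return `results` unchanged if all thresholds are met, else raise."""
    failures = []
    if results["overall_score"] < min_overall:
        failures.append(f"overall {results['overall_score']:.1f} < {min_overall}")
    for dim in ("completeness", "conformance", "plausibility"):
        score = results[dim]["score"]
        if score < min_dimension:
            failures.append(f"{dim} {score:.1f} < {min_dimension}")
    if failures:
        raise DataQualityError("; ".join(failures))
    return results


good = {
    "overall_score": 91.0,
    "completeness": {"score": 95.0},
    "conformance": {"score": 90.0},
    "plausibility": {"score": 88.0},
}
bad = dict(good, overall_score=72.0, plausibility={"score": 41.0})

quality_gate(good)  # passes silently
try:
    quality_gate(bad)
    gate_message = ""
except DataQualityError as exc:
    gate_message = str(exc)
print(gate_message)
```

Raising an exception rather than returning a flag means a pipeline step fails loudly by default, which is usually the safer choice for downstream clinical consumers.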

### 📚 **Learning Outcomes**

Through this comprehensive demonstration, you have learned:
- How to implement Kahn's data quality framework in practice
- Techniques for combining rule-based and ML-based validation
- Methods for generating actionable quality reports and scorecards
- Best practices for healthcare data quality assessment
- Approaches for validating and testing data quality frameworks

### 🔧 **Framework Components Summary**

| Component | Purpose | Key Features |
|-----------|---------|--------------|
| `dimensions.py` | Core quality dimensions | Kahn's framework implementation |
| `framework.py` | Main orchestration | Unified assessment interface |
| `synthetic_generator.py` | Test data creation | FHIR-like synthetic data |
| `rule_based.py` | Traditional validation | Healthcare-specific rules |
| `ml_based.py` | Anomaly detection | Isolation Forest + Autoencoder |
| `scorecard.py` | Reporting system | Comprehensive output formats |

This framework serves as both a learning tool and a foundation for production healthcare data quality systems. The modular design allows for easy adaptation to specific organizational needs while maintaining the rigor of established data quality assessment methodologies.

**🏥 Ready to improve your healthcare data quality!** 🏥