In [1]:
# Install Java 11
!apt-get update
!apt-get install -y openjdk-11-jdk-headless -qq > /dev/null

# Download Spark 3.5.4 with Hadoop 3.3
!wget https://downloads.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
!tar xf spark-3.5.4-bin-hadoop3.tgz

# Set environment variables for Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"
os.environ["SPARK_VERSION"] = "3.5"

# Install PySpark 3.5.0, PyDeequ, Pandas and SQLite
!pip install pyspark==3.5.0 pydeequ pandas faker
!apt-get install sqlite3

# Download SQLite JDBC driver
!wget https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.42.0.0/sqlite-jdbc-3.42.0.0.jar -O /content/sqlite-jdbc-3.42.0.0.jar

# Download Deequ 2.0.7 JAR for Spark 3.5
!wget https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.7-spark-3.5/deequ-2.0.7-spark-3.5.jar -O /content/deequ-2.0.7-spark-3.5.jar


Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:3 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,659 kB]
Hit:4 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,696 kB]
Hit:6 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:7 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:8 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:10 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,230 kB]
Get:11 http://security.ubuntu.com/ubuntu jammy-security/restricted amd64 Packages [3,657 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,634 kB]
Get:13 http://archive.ubuntu.com/ubuntu jammy-backports InRelea

In [2]:
!pip install tabulate

Collecting tabulate
  Downloading tabulate-0.9.0-py3-none-any.whl.metadata (34 kB)
Downloading tabulate-0.9.0-py3-none-any.whl (35 kB)
Installing collected packages: tabulate
Successfully installed tabulate-0.9.0


In [10]:
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, min, max, col, count
from pydeequ.analyzers import AnalysisRunner, AnalyzerContext
from pydeequ.analyzers import Size, Completeness, Mean, Maximum, Minimum, CountDistinct, Distinctness
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
import pandas as pd
from datetime import datetime
import json
from tabulate import tabulate
import sqlite3

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data_quality_validation.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def create_spark_session():
    """Initialize Spark session with required configurations"""
    logger.info("Initializing Spark session...")
    try:
        spark = (SparkSession.builder
            .config("spark.jars", "/content/sqlite-jdbc-3.42.0.0.jar,/content/deequ-2.0.7-spark-3.5.jar")
            .config("spark.driver.memory", "16g")
            .getOrCreate())
        logger.info("Spark session created successfully")
        return spark
    except Exception as e:
        logger.error(f"Failed to create Spark session: {str(e)}")
        raise

def load_dataframes(spark):
    """Load CSV files into Spark DataFrames with error handling"""
    logger.info("Loading data from CSV files...")
    try:
        patients_df = spark.read.csv('patients.csv', header=True, inferSchema=True)
        encounters_df = spark.read.csv('encounters.csv', header=True, inferSchema=True)
        procedures_df = spark.read.csv('procedures.csv', header=True, inferSchema=True)

        # Log basic statistics
        logger.info(f"Patients dataset loaded: {patients_df.count()} records")
        logger.info(f"Encounters dataset loaded: {encounters_df.count()} records")
        logger.info(f"Procedures dataset loaded: {procedures_df.count()} records")

        return patients_df, encounters_df, procedures_df
    except Exception as e:
        logger.error(f"Error loading data: {str(e)}")
        raise

def verify_dataset(spark, df, dataset_name):
    """Run verification checks on a dataset using PyDeequ with enhanced details"""
    logger.info(f"Verifying {dataset_name} dataset using PyDeequ...")

    # Dataset-specific ID field
    id_field = "Id" if dataset_name == "patients" else "PATIENT"

    # Create a check with the spark session included
    check = Check(spark, CheckLevel.Error, f"{dataset_name} Data Quality Checks")

    # Base checks for all datasets
    base_check = check.hasSize(lambda x: x > 0) \
                      .isComplete(id_field)

    # Dataset-specific checks
    if dataset_name == "patients":
        # Add patient-specific checks
        verification_check = base_check \
            .isContainedIn("GENDER", ["M", "F"]) \
            .isComplete("BIRTHDATE")

    elif dataset_name == "encounters":
        # Add encounter-specific checks
        verification_check = base_check \
            .isNonNegative("BASE_ENCOUNTER_COST") \
            .isComplete("START") \
            .isComplete("STOP")

    elif dataset_name == "procedures":
        # Add procedure-specific checks
        verification_check = base_check \
            .isNonNegative("BASE_COST") \
            .isComplete("DESCRIPTION") \
            .isComplete("START") \
            .isComplete("STOP")

    # Run the verification suite with the constructed check
    verification_result = VerificationSuite(spark) \
        .onData(df) \
        .addCheck(verification_check) \
        .run()

    # Convert the results to a DataFrame
    result_df = VerificationResult.checkResultsAsDataFrame(spark, verification_result)

    # Process verification results
    verification_results = []

    for row in result_df.collect():
        status = "Success" if row["constraint_status"] == "Success" else "Error"
        constraint = row["constraint"]

        # Enhance the details column with meaningful information based on the constraint type
        details = row["constraint_message"] if row["constraint_message"] else ""

        if status == "Success" and not details:
            # Add appropriate success messages based on constraint type
            if "SizeConstraint" in constraint:
                record_count = df.count()
                details = f"Dataset contains {record_count:,} records, which meets the size requirement."

            elif "CompletenessConstraint" in constraint:
                # Extract column name from constraint description using safer regex
                import re
                match = re.search(r"Completeness\(([^,]+)", constraint)
                if match:
                    column_name = match.group(1)
                    # Only proceed if the column name is actually in the dataframe
                    if column_name in df.columns:
                        completeness = df.filter(df[column_name].isNotNull()).count() / float(df.count()) * 100
                        details = f"Column '{column_name}' is {completeness:.2f}% complete."
                    else:
                        details = f"Column completeness check passed for {column_name}."
                else:
                    details = f"Column completeness check passed."

            elif "ComplianceConstraint" in constraint and "contained in" in constraint:
                # For gender validation
                gender_counts = df.groupBy("GENDER").count().collect()
                gender_info = ", ".join([f"{row['GENDER']}: {row['count']:,}" for row in gender_counts])
                details = f"Gender distribution: {gender_info}"

            elif "ComplianceConstraint" in constraint and "non-negative" in constraint:
                # Extract cost column from the constraint
                if "BASE_ENCOUNTER_COST" in constraint:
                    cost_col = "BASE_ENCOUNTER_COST"
                elif "BASE_COST" in constraint:
                    cost_col = "BASE_COST"
                else:
                    cost_col = None

                if cost_col and cost_col in df.columns:
                    cost_stats = df.select(
                        mean(cost_col).alias("mean"),
                        min(cost_col).alias("min"),
                        max(cost_col).alias("max")
                    ).collect()[0]

                    cost_type = "encounter" if cost_col == "BASE_ENCOUNTER_COST" else "procedure"
                    details = f"All {cost_type} costs are non-negative. Min: ${cost_stats['min']:.2f}, Max: ${cost_stats['max']:.2f}, Mean: ${cost_stats['mean']:.2f}"
                else:
                    details = "All values are non-negative."
            else:
                details = "Check passed successfully."

        verification_results.append({
            "check_description": constraint,
            "status": status,
            "details": details
        })

    return verification_results

def analyze_dataset(spark, df, dataset_name):
    """Analyze a single dataset using PyDeequ analyzers"""
    logger.info(f"Analyzing {dataset_name} dataset using PyDeequ...")
    try:
        # Initialize analysis runners for different metrics
        analysis_results = []

        # Basic size analysis
        size_analyzer = AnalysisRunner(spark) \
            .onData(df) \
            .addAnalyzer(Size()) \
            .run()

        size_metrics = AnalyzerContext.successMetricsAsDataFrame(spark, size_analyzer)

        # Extract row count
        for row in size_metrics.collect():
            if row["name"] == "Size":
                analysis_results.append({
                    "analyzer": "Record Count",
                    "value": row["value"]
                })

        # Column completeness analysis
        completeness_analyzer = AnalysisRunner(spark) \
            .onData(df) \
            .addAnalyzer(Completeness("*")) \
            .run()

        completeness_metrics = AnalyzerContext.successMetricsAsDataFrame(spark, completeness_analyzer)

        # Extract completeness metrics
        for row in completeness_metrics.collect():
            if row["name"] == "Completeness":
                column_name = row["instance"]
                analysis_results.append({
                    "analyzer": f"Completeness ({column_name})",
                    "value": row["value"]
                })

        # Dataset-specific analysis
        if dataset_name == "patients":
            # Gender distribution - using direct Spark operations
            gender_dist = df.groupBy("GENDER").count().collect()
            row_count = df.count()

            for row in gender_dist:
                analysis_results.append({
                    "analyzer": f"Gender Distribution ({row['GENDER']})",
                    "value": row['count'] / float(row_count)
                })

        elif dataset_name == "encounters":
            # Cost statistics
            cost_col = "BASE_ENCOUNTER_COST"

            cost_analyzer = AnalysisRunner(spark) \
                .onData(df) \
                .addAnalyzer(Minimum(cost_col)) \
                .addAnalyzer(Maximum(cost_col)) \
                .addAnalyzer(Mean(cost_col)) \
                .run()

            cost_metrics = AnalyzerContext.successMetricsAsDataFrame(spark, cost_analyzer)

            # Extract cost metrics
            for row in cost_metrics.collect():
                if row["name"] == "Minimum" and row["instance"] == cost_col:
                    analysis_results.append({
                        "analyzer": "Min Encounter Cost",
                        "value": float(row["value"])
                    })
                elif row["name"] == "Maximum" and row["instance"] == cost_col:
                    analysis_results.append({
                        "analyzer": "Max Encounter Cost",
                        "value": float(row["value"])
                    })
                elif row["name"] == "Mean" and row["instance"] == cost_col:
                    analysis_results.append({
                        "analyzer": "Mean Encounter Cost",
                        "value": float(row["value"])
                    })

        elif dataset_name == "procedures":
            # Cost statistics
            cost_col = "BASE_COST"

            cost_analyzer = AnalysisRunner(spark) \
                .onData(df) \
                .addAnalyzer(Minimum(cost_col)) \
                .addAnalyzer(Maximum(cost_col)) \
                .addAnalyzer(Mean(cost_col)) \
                .run()

            cost_metrics = AnalyzerContext.successMetricsAsDataFrame(spark, cost_analyzer)

            # Extract cost metrics
            for row in cost_metrics.collect():
                if row["name"] == "Minimum" and row["instance"] == cost_col:
                    analysis_results.append({
                        "analyzer": "Min Procedure Cost",
                        "value": float(row["value"])
                    })
                elif row["name"] == "Maximum" and row["instance"] == cost_col:
                    analysis_results.append({
                        "analyzer": "Max Procedure Cost",
                        "value": float(row["value"])
                    })
                elif row["name"] == "Mean" and row["instance"] == cost_col:
                    analysis_results.append({
                        "analyzer": "Mean Procedure Cost",
                        "value": float(row["value"])
                    })

        return analysis_results

    except Exception as e:
        logger.error(f"Error analyzing {dataset_name} dataset with PyDeequ: {str(e)}")
        logger.exception("Stack trace:")
        raise

def save_to_sqlite(df, table_name, db_path):
    """Save DataFrame to SQLite with logging"""
    try:
        pandas_df = df.toPandas()
        conn = sqlite3.connect(db_path)
        pandas_df.to_sql(table_name, conn, if_exists='replace', index=False)
        conn.close()
        logger.info(f"Successfully saved {table_name} to SQLite")
    except Exception as e:
        logger.error(f"Error saving {table_name} to SQLite: {str(e)}")
        raise

def print_results_summary(results):
    """Print formatted results summary"""
    print("\n=== DATA QUALITY VALIDATION SUMMARY ===\n")

    # Print dataset metrics
    print("Dataset Metrics:")
    metrics_table = []
    for dataset, metrics in results["dataset_metrics"].items():
        for metric_name, value in metrics.items():
            metrics_table.append([dataset, metric_name, value])
    print(tabulate(metrics_table,
                  headers=["Dataset", "Metric", "Value"],
                  tablefmt="grid"))

    # Print analysis results
    print("\nAnalysis Results:")
    analysis_table = []
    for dataset, analyses in results["analysis_results"].items():
        for analysis in analyses:
            value = analysis['value']
            formatted_value = f"{value:.2%}" if isinstance(value, float) and value <= 1.0 else f"{value:,.2f}"
            analysis_table.append([
                dataset,
                analysis["analyzer"],
                formatted_value
            ])
    print(tabulate(analysis_table,
                  headers=["Dataset", "Analyzer", "Value"],
                  tablefmt="grid"))

    # Print verification results
    print("\nVerification Results:")
    verification_table = []
    for dataset, verifications in results["verification_results"].items():
        for verification in verifications:
            verification_table.append([
                dataset,
                verification["check_description"],
                verification["status"],
                verification["details"]
            ])
    print(tabulate(verification_table,
                  headers=["Dataset", "Check Description", "Status", "Details"],
                  tablefmt="grid"))

def main():
    """Main execution function"""
    try:
        logger.info("Starting data quality validation process")

        # Initialize Spark
        spark = create_spark_session()

        # Load data
        patients_df, encounters_df, procedures_df = load_dataframes(spark)

        # Initialize results dictionary
        results = {
            "timestamp": datetime.now().isoformat(),
            "dataset_metrics": {},
            "analysis_results": {},
            "verification_results": {}
        }

        # Process each dataset
        datasets = {
            "patients": patients_df,
            "encounters": encounters_df,
            "procedures": procedures_df
        }

        for dataset_name, df in datasets.items():
            # Add basic metrics
            results["dataset_metrics"][dataset_name] = {
                "record_count": df.count(),
                "column_count": len(df.columns)
            }

            # Run analysis
            results["analysis_results"][dataset_name] = analyze_dataset(spark, df, dataset_name)

            # Run verification
            results["verification_results"][dataset_name] = verify_dataset(spark, df, dataset_name)

        # Save results to JSON
        with open('data_quality_results.json', 'w') as f:
            json.dump(results, f, indent=4)

        # Save to SQLite
        database_path = 'healthcare_data.db'
        for dataset_name, df in datasets.items():
            save_to_sqlite(df, dataset_name, database_path)

        # Print results summary
        print_results_summary(results)

        logger.info("Data quality validation process completed successfully")

    except Exception as e:
        logger.error(f"Error in main execution: {str(e)}")
        raise
    finally:
        if 'spark' in locals():
            spark.stop()
            logger.info("Spark session stopped")

if __name__ == "__main__":
    main()


=== DATA QUALITY VALIDATION SUMMARY ===

Dataset Metrics:
+------------+--------------+---------+
| Dataset    | Metric       |   Value |
| patients   | record_count |     974 |
+------------+--------------+---------+
| patients   | column_count |      20 |
+------------+--------------+---------+
| encounters | record_count |   27891 |
+------------+--------------+---------+
| encounters | column_count |      14 |
+------------+--------------+---------+
| procedures | record_count |   47701 |
+------------+--------------+---------+
| procedures | column_count |       9 |
+------------+--------------+---------+

Analysis Results:
+------------+-------------------------+------------+
| Dataset    | Analyzer                | Value      |
| patients   | Record Count            | 974.00     |
+------------+-------------------------+------------+
| patients   | Gender Distribution (F) | 49.28%     |
+------------+-------------------------+------------+
| patients   | Gender Distribution (M)