#### Reading the YAML Config File

In [0]:
import yaml
from pyspark.sql import SparkSession

In [0]:
# Load the data-quality rules. The file is opened explicitly as UTF-8 so that
# parsing does not depend on the platform's default locale encoding.
with open("config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

In [0]:
# Dump the parsed configuration to confirm the YAML file was loaded.
print(config)

{'data_quality_checks': {'workspace.default.source_table_perfect': {'uniqueness': ['customer_id', 'order_id'], 'completeness': ['order_id', 'order_date'], 'validity': [{'column': 'order_date', 'check': "order_date > '2023-01-01'"}], 'row_count': {'min': 10, 'max': 100000}}, 'workspace.default.source_table_uniqueness_fail': {'uniqueness': ['customer_id', 'order_id'], 'completeness': ['order_id', 'order_date'], 'validity': [{'column': 'order_date', 'check': "order_date >= '2024-01-01'"}], 'row_count': {'min': 5, 'max': 10}}, 'workspace.default.source_table_completeness_validity_fail': {'uniqueness': ['customer_id', 'order_id'], 'completeness': ['order_id', 'order_date'], 'validity': [{'column': 'order_date', 'check': "order_date >= '2024-01-01'"}], 'row_count': {'min': 5, 'max': 10}}, 'workspace.default.source_table_row_count_fail': {'uniqueness': ['customer_id', 'order_id'], 'completeness': ['order_id', 'order_date'], 'validity': [{'column': 'order_date', 'check': "order_date >= '2024-0

In [0]:
# Show the (table name, rule set) pairs stored under "data_quality_checks".
config["data_quality_checks"].items()

dict_items([('workspace.default.source_table_perfect', {'uniqueness': ['customer_id', 'order_id'], 'completeness': ['order_id', 'order_date'], 'validity': [{'column': 'order_date', 'check': "order_date > '2023-01-01'"}], 'row_count': {'min': 10, 'max': 100000}}), ('workspace.default.source_table_uniqueness_fail', {'uniqueness': ['customer_id', 'order_id'], 'completeness': ['order_id', 'order_date'], 'validity': [{'column': 'order_date', 'check': "order_date >= '2024-01-01'"}], 'row_count': {'min': 5, 'max': 10}}), ('workspace.default.source_table_completeness_validity_fail', {'uniqueness': ['customer_id', 'order_id'], 'completeness': ['order_id', 'order_date'], 'validity': [{'column': 'order_date', 'check': "order_date >= '2024-01-01'"}], 'row_count': {'min': 5, 'max': 10}}), ('workspace.default.source_table_row_count_fail', {'uniqueness': ['customer_id', 'order_id'], 'completeness': ['order_id', 'order_date'], 'validity': [{'column': 'order_date', 'check': "order_date >= '2024-01-01'"

### Generate Sample Tables to check this Data Validation Code

**The Perfect Table** passes every validation check, so no failures are expected for it.

In [0]:
# Column names shared by the sample order rows.
columns = ["customer_id", "order_id", "order_date", "total_amount", "status"]

# Ten rows with distinct ids, no missing values and order dates in January 2024.
perfect_data = [
    (1, "ORD001", "2024-01-01", 150.75, "processed"),
    (2, "ORD002", "2024-01-02", 200.00, "shipped"),
    (3, "ORD003", "2024-01-03", 50.25, "delivered"),
    (4, "ORD004", "2024-01-04", 300.50, "processed"),
    (5, "ORD005", "2024-01-05", 75.10, "shipped"),
    (6, "ORD006", "2024-01-06", 120.00, "delivered"),
    (7, "ORD007", "2024-01-07", 180.00, "processed"),
    (8, "ORD008", "2024-01-08", 90.00, "shipped"),
    (9, "ORD009", "2024-01-09", 60.50, "delivered"),
    (10, "ORD010", "2024-01-10", 250.00, "processed"),
]
perfect_df = spark.createDataFrame(perfect_data, schema=columns)

# Persist the rows as a Delta table, replacing whatever was stored before.
(
    perfect_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("source_table_perfect")
)

**The Uniqueness Failure Dataset** has a duplicate "customer_id" and a duplicate "order_id", which should fail the uniqueness check.

In [0]:
# Column names shared by the sample order rows.
columns = ["customer_id", "order_id", "order_date", "total_amount", "status"]

# Five rows; the fourth repeats the customer_id and order_id of the third.
uniqueness_fail_data = [
    (1, "ORD001", "2024-01-01", 150.75, "processed"),
    (2, "ORD002", "2024-01-02", 200.00, "shipped"),
    (3, "ORD003", "2024-01-03", 50.25, "delivered"),
    (3, "ORD003", "2024-01-04", 300.50, "processed"),  # same ids as the row above
    (5, "ORD005", "2024-01-05", 75.10, "shipped"),
]
uniqueness_fail_df = spark.createDataFrame(uniqueness_fail_data, schema=columns)

# Persist the rows as a Delta table, replacing whatever was stored before.
(
    uniqueness_fail_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("source_table_uniqueness_fail")
)

**The Completeness and Validity Failure Dataset**
includes **NULL** values in key columns and a date that violates the configured validity rule, which should trigger failures.

In [0]:
# Column names shared by the sample order rows.
columns = ["customer_id", "order_id", "order_date", "total_amount", "status"]

# Five rows containing two missing values and one out-of-range date.
completeness_validity_fail_data = [
    (1, "ORD001", "2024-01-01", 150.75, "processed"),
    (2, "ORD002", None, 200.00, "shipped"),  # order_date is missing
    (3, "ORD003", "2022-12-31", 50.25, "delivered"),  # date older than the validity cutoff in config.yaml
    (4, None, "2024-01-04", 300.50, "processed"),  # order_id is missing
    (5, "ORD005", "2024-01-05", 75.10, "shipped"),
]
completeness_validity_fail_df = spark.createDataFrame(
    completeness_validity_fail_data, schema=columns
)

# Persist the rows as a Delta table, replacing whatever was stored before.
(
    completeness_validity_fail_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("source_table_completeness_validity_fail")
)

**The Row Count Failure Dataset**
has a very small number of rows, which should fall below the minimum row count threshold set in our config.yaml and will hence trigger a failure.

In [0]:
# Column names shared by the sample order rows.
columns = ["customer_id", "order_id", "order_date", "total_amount", "status"]

# Only two rows, so the table is smaller than the configured minimum row count.
row_count_fail_data = [
    (1, "ORD001", "2024-01-01", 150.75, "processed"),
    (2, "ORD002", "2024-01-02", 200.00, "shipped"),
]
row_count_fail_df = spark.createDataFrame(row_count_fail_data, schema=columns)

# Persist the rows as a Delta table, replacing whatever was stored before.
(
    row_count_fail_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("source_table_row_count_fail")
)

#### Data Quality Checks and Logging Checks

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, sum, lit

def run_data_quality_checks(df, dq_rules):
    """
    Executes data quality checks on a DataFrame based on provided rules.

    Args:
        df: DataFrame to validate. Only ``count()``, ``select()`` and
            ``filter()`` are called on it.
        dq_rules: Mapping with any of these optional keys:
            - ``uniqueness``: list of column names.
            - ``completeness``: list of column names.
            - ``validity``: list of mappings with a ``column`` name and a
              ``check`` filter expression.
            - ``row_count``: mapping with optional ``min`` / ``max`` bounds.
            A key that is present but empty (null) is treated as having no
            entries.

    Returns:
        dict: ``{check_name: {"status": "PASS" | "FAIL", "message": str}}`` in
        the order uniqueness, completeness, validity, row count.

    Notes:
        For an empty DataFrame the percentage checks are reported as 100%
        (there are no rows that could violate them) instead of dividing by
        zero; use the ``row_count`` rule to flag empty tables.
    """
    results = {}

    # Every check needs the total row count, so compute it once instead of
    # re-counting the table inside each loop iteration.
    total_count = df.count()

    def _percentage_check(label, column, count_matching):
        """Record a PASS/FAIL entry for the share of rows counted by ``count_matching``."""
        if total_count == 0:
            # Nothing to scan and nothing to divide by: vacuously 100%.
            pct = 100.0
        else:
            pct = (count_matching() / total_count) * 100
        results[f"{label.lower()}_{column}"] = {
            "status": "PASS" if pct == 100 else "FAIL",
            "message": f"{label} for '{column}': {pct:.2f}%"
        }

    # Check Uniqueness
    # NOTE: countDistinct skips NULLs, so a NULL in the column lowers this
    # percentage even when the non-null values are all distinct.
    for column in dq_rules.get('uniqueness') or []:
        _percentage_check(
            "Uniqueness",
            column,
            lambda: df.select(countDistinct(column)).first()[0],
        )

    # Check Completeness (share of non-null values)
    for column in dq_rules.get('completeness') or []:
        _percentage_check(
            "Completeness",
            column,
            lambda: df.filter(col(column).isNotNull()).count(),
        )

    # Check Validity (using a generic expression); rows for which the
    # expression is not true, including NULL results, are not counted as valid.
    for rule in dq_rules.get('validity') or []:
        check_expression = rule['check']
        _percentage_check(
            "Validity",
            rule['column'],
            lambda: df.filter(check_expression).count(),
        )

    # Check Row Count (bounds are inclusive; a missing bound is unbounded)
    if 'row_count' in dq_rules:
        bounds = dq_rules['row_count'] or {}
        min_rows = bounds.get('min', 0)
        max_rows = bounds.get('max', float('inf'))
        status = "PASS" if min_rows <= total_count <= max_rows else "FAIL"
        results["row_count_check"] = {
            "status": status,
            "message": f"Row count: {total_count} (Expected between {min_rows} and {max_rows})"
        }

    return results

def log_results(results, table_name):
    """
    Persists the outcome of each data quality check as one row of a Delta table.

    Every entry of ``results`` becomes a (table_name, check_name, status, message)
    record that is appended to the ``dq_metrics_results`` table.
    """
    # Column layout of the log table
    schema = "table_name STRING, check_name STRING, status STRING, message STRING"

    # One tuple per executed check
    rows = [
        (table_name, check_name, outcome['status'], outcome['message'])
        for check_name, outcome in results.items()
    ]

    # Append mode keeps the records written by earlier calls
    log_df = spark.createDataFrame(rows, schema=schema)
    log_df.write.format("delta").mode("append").saveAsTable("dq_metrics_results")
    print("DQ results logged successfully!")

#### Scanning YAML and doing Data Quality Validation for the Tables

In [0]:
# Validate every table listed in the config, one after another
for table_name, dq_rules in config['data_quality_checks'].items():
    print(f"Running DQ checks for table: {table_name}")

    # Load the table; if that fails, report the error and move on to the next one
    try:
        df = spark.table(table_name)
    except Exception as e:
        print(f"Error reading table {table_name}: {e}")
        continue

    # Evaluate the rules and persist the outcome
    results = run_data_quality_checks(df, dq_rules)
    log_results(results, table_name)

    # Keep only the checks that did not pass
    failures = {
        name: outcome
        for name, outcome in results.items()
        if outcome['status'] == 'FAIL'
    }
    if not failures:
        print(f"All DQ checks passed for {table_name}.")
        continue

    print(f"DQ checks failed for {table_name}. Triggering alert.")

    # Alert text: a header followed by one bullet per failed check
    email_body = f"Data Quality Alert for table '{table_name}':\n\n" + "".join(
        f"- {name}: {outcome['message']}\n" for name, outcome in failures.items()
    )

    # The alert is currently just printed as subject and body
    print(f"Data Quality Alert: {table_name} - FAILED", email_body)

Running DQ checks for table: workspace.default.source_table_perfect
DQ results logged successfully!
All DQ checks passed for workspace.default.source_table_perfect.
Running DQ checks for table: workspace.default.source_table_uniqueness_fail
DQ results logged successfully!
DQ checks failed for workspace.default.source_table_uniqueness_fail. Triggering alert.
Data Quality Alert: workspace.default.source_table_uniqueness_fail - FAILED Data Quality Alert for table 'workspace.default.source_table_uniqueness_fail':

- uniqueness_customer_id: Uniqueness for 'customer_id': 80.00%
- uniqueness_order_id: Uniqueness for 'order_id': 80.00%

Running DQ checks for table: workspace.default.source_table_completeness_validity_fail
DQ results logged successfully!
DQ checks failed for workspace.default.source_table_completeness_validity_fail. Triggering alert.
Data Quality Alert: workspace.default.source_table_completeness_validity_fail - FAILED Data Quality Alert for table 'workspace.default.source_tabl

#### Checking Generated Logs for Better Insights

In [0]:
# Display the rows currently stored in the results log table.
spark.read.table("workspace.default.dq_metrics_results").display()

table_name,check_name,status,message
workspace.default.source_table_completeness_validity_fail,uniqueness_customer_id,PASS,Uniqueness for 'customer_id': 100.00%
workspace.default.source_table_completeness_validity_fail,uniqueness_order_id,FAIL,Uniqueness for 'order_id': 80.00%
workspace.default.source_table_completeness_validity_fail,completeness_order_id,FAIL,Completeness for 'order_id': 80.00%
workspace.default.source_table_completeness_validity_fail,completeness_order_date,FAIL,Completeness for 'order_date': 80.00%
workspace.default.source_table_completeness_validity_fail,validity_order_date,FAIL,Validity for 'order_date': 60.00%
workspace.default.source_table_completeness_validity_fail,row_count_check,PASS,Row count: 5 (Expected between 5 and 10)
workspace.default.source_table_uniqueness_fail,uniqueness_customer_id,FAIL,Uniqueness for 'customer_id': 80.00%
workspace.default.source_table_uniqueness_fail,uniqueness_order_id,FAIL,Uniqueness for 'order_id': 80.00%
workspace.default.source_table_uniqueness_fail,completeness_order_id,PASS,Completeness for 'order_id': 100.00%
workspace.default.source_table_uniqueness_fail,completeness_order_date,PASS,Completeness for 'order_date': 100.00%


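**Note -** As expected, each table gets 6 checks based on our YAML file parameters (2 uniqueness, 2 completeness, 1 validity and 1 row count), and we ran them for 4 tables, so a first run logs 24 records in total. Because results are appended to `dq_metrics_results`, every re-run of the notebook adds another 24 records.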