In [None]:
# Display the module search path of the interpreter running this kernel.
import sys
sys.path

In [None]:
# Display the installation prefix of the interpreter running this kernel.
sys.prefix

In [None]:
import os
import shutil

from pyspark.sql.session import SparkSession

# Mark this run as "local" through the spark-expectations environment variable,
# so that the spark session can be set up accordingly for local testing.
os.environ.update({"SPARKEXPECTATIONS_ENV": "local"})

In [None]:
# Setting up Kafka locally to stream the spark-expectations stats to Kafka.
# os.system never raises when the script fails, so keep the returned status and
# warn explicitly; otherwise a failed start only shows up later, when the Kafka
# read cells cannot reach the broker.
kafka_start_status = os.system("sh ./docker_scripts/docker_kafka_start_script.sh")
if kafka_start_status != 0:
    print(
        f"WARNING: Kafka start script returned status {kafka_start_status}; "
        "cells that read from Kafka may fail"
    )

# Keep the status as the cell output, as before.
kafka_start_status

In [None]:
# Build (or reuse) the spark session used throughout the notebook. It is configured with
# the Kafka connector jars, the Delta Lake extension and catalog, and /tmp based locations
# for the warehouse, Derby and Ivy.
_kafka_connector_jars = (
    "./jars/spark-sql-kafka-0-10_2.12-3.0.0.jar,"
    "./jars/kafka-clients-3.0.0.jar,"
    "./jars/commons-pool2-2.8.0.jar,"
    "./jars/spark-token-provider-kafka-0-10_2.12-3.0.0.jar"
)

spark = (
    SparkSession.builder.config("spark.jars", _kafka_connector_jars)  # type: ignore
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.warehouse.dir", "/tmp/hive/warehouse")
    .config("spark.driver.extraJavaOptions", "-Dderby.system.home=/tmp/derby")
    .config("spark.jars.ivy", "/tmp/ivy2")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate()
)

In [None]:
# Clean up the data of previous runs, if any exists.
# First drop the database (and every table in it) from the catalog ...
spark.sql("drop database if exists  dq_spark_local cascade")

# ... then remove its directory from the warehouse. shutil.rmtree replaces the former
# `rm -rf` shell call, so no shell is spawned; ignore_errors=True keeps the "missing
# directory is fine" behaviour of `rm -rf`.
shutil.rmtree("/tmp/hive/warehouse/dq_spark_local.db", ignore_errors=True)

In [None]:
# Create the database used for the spark expectations local testing (if it is not
# there yet) and make it the current database of the session.
_create_db_sql = "create database if not exists spark_catalog.dq_spark_local"
_use_db_sql = " use spark_catalog.dq_spark_local"

spark.sql(_create_db_sql)
spark.sql(_use_db_sql)

In [None]:
# Right after the cleanup the database is expected to contain zero tables.
_tables_before_run = spark.sql("show tables")
_tables_before_run.show()

In [None]:
# Column definitions of the rules table; interpolated into the CREATE TABLE statement.
RULES_TABLE_SCHEMA = """ ( product_id STRING,
    table_name STRING,
    rule_type STRING,
    rule STRING,
    column_name STRING,
    expectation STRING,
    action_if_failed STRING,
    tag STRING,
    description STRING,
    enable_for_source_dq_validation BOOLEAN, 
    enable_for_target_dq_validation BOOLEAN,
    is_active BOOLEAN,
    enable_error_drop_alert BOOLEAN,
    error_drop_threshold INT )
"""

# Rule rows, interpolated into the INSERT statement. Values follow the column order of
# RULES_TABLE_SCHEMA. The three groups below are the row_dq, agg_dq and query_dq rules.
# The descriptions of "discount_threshold" and "row_count" used to contradict their
# expectations ("less than 40" for `discount*100 < 60`, "should not be greater than
# 10000" for `count(*)>=10000`); they now describe what the expectation actually checks.
# NOTE(review): the "ship_mode_in_set" expectation lists 'standard class' twice; a third
# ship mode may have been intended. Left unchanged because another value would alter results.
RULES_DATA = """ 
    ("your_product", "dq_spark_local.customer_order",  "row_dq", "customer_id_is_not_null", "customer_id", "customer_id is not null","drop", "validity", "customer_id should not be null", true, true,true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "row_dq", "sales_greater_than_two", "sales", "sales > 2", "drop", "accuracy", "sales value should be greater than two", true, true, true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "row_dq", "discount_threshold", "discount", "discount*100 < 60","drop", "validity", "discount should be less than 60 percent", true, true, true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "row_dq", "ship_mode_in_set", "ship_mode", "lower(trim(ship_mode)) in('second class', 'standard class', 'standard class')", "drop", "validity", "ship_mode mode belongs in the sets", true, true, true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "row_dq", "profit_threshold", "profit", "profit>0", "drop", "validity", "profit threshold should be greater than 0", true, true, true, true, 0)
    
    ,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_sales", "sales", "sum(sales)>10000", "ignore", "validity", "sum of sales should be greater than 10000",  true, true, true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "agg_dq", "sum_of_quantity", "quantity", "sum(quantity)>10000", "ignore", "validity", "sum of quantity should be greater than 10000", true, true, true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "agg_dq", "distinct_of_ship_mode", "ship_mode", "count(distinct ship_mode)<=3", "ignore", "validity", "ship_mode's should not be more than 3", true, true, true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "agg_dq", "row_count", "*", "count(*)>=10000", "ignore", "validity", "row count should be at least 10000", true, true, true, false, 0)

    ,("your_product", "dq_spark_local.customer_order", "query_dq", "product_missing_count_threshold", "*", "((select count(distinct product_id) from product) - (select count(distinct product_id) from order))>(select count(distinct product_id) from product)*0.2", "ignore", "validity", "row count threshold", true, true, true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "query_dq", "product_category", "*", "(select count(distinct category) from product) < 5", "ignore", "validity", "distinct product category", true, true, true, false, 0)
    ,("your_product", "dq_spark_local.customer_order", "query_dq", "row_count_in_order", "*", "(select count(*) from order)<10000", "ignore", "accuracy", "count of the row in order dataset", true, true, true, false, 0)
    
"""

In [None]:
# Create the Delta rules table from the schema defined above, then load the rules into it.
_create_rules_table_sql = f" CREATE TABLE dq_spark_local.dq_rules {RULES_TABLE_SCHEMA} USING DELTA"
_insert_rules_sql = f" INSERT INTO dq_spark_local.dq_rules  values {RULES_DATA} "

spark.sql(_create_rules_table_sql)
spark.sql(_insert_rules_sql)

In [None]:
# Display every rule, ordered by rule type, without truncating the columns.
_rules_by_type = spark.sql("select * from dq_spark_local.dq_rules order by rule_type")
_rules_by_type.show(truncate=False)

In [None]:
from pyspark.sql import DataFrame
from spark_expectations import _log
from spark_expectations.examples.base_setup import set_up_delta
from spark_expectations.core.expectations import (
    SparkExpectations,
    WrappedDataFrameWriter,
)
from spark_expectations.config.user_config import Constants as user_config

In [None]:
# Writer configuration shared by the stats table and the target/error tables:
# append mode, delta format.
writer = (
    WrappedDataFrameWriter()
    .mode("append")
    .format("delta")
)

# The spark expectations object for "your_product", fed with the rules table created above.
# Passing stats_streaming_options={user_config.se_enable_streaming: False} is left out on purpose.
se: SparkExpectations = SparkExpectations(
    product_id="your_product",
    rules_df=spark.table("dq_spark_local.dq_rules"),
    stats_table="dq_spark_local.dq_stats",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
)

In [None]:
# Notification settings handed to spark expectations, grouped by channel and merged
# into one dict (key order is the same as listing them all in a single literal).
_email_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_email_smtp_host: "mailhost.com",
    user_config.se_notifications_email_smtp_port: 25,
    user_config.se_notifications_email_from: "",
    user_config.se_notifications_email_to_other_mail_id: "",
    user_config.se_notifications_email_subject: "spark expectations - data quality - notifications",
}
_slack_conf = {
    user_config.se_notifications_enable_slack: False,
    user_config.se_notifications_slack_webhook_url: "",
}
_trigger_conf = {
    user_config.se_notifications_on_start: True,
    user_config.se_notifications_on_completion: True,
    user_config.se_notifications_on_fail: True,
    user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
    user_config.se_notifications_on_error_drop_threshold: 15,
}

user_conf = {**_email_conf, **_slack_conf, **_trigger_conf}

In [None]:
# Load the product CSV (header row, inferred schema) and expose it as the "product" view.
_df_product: DataFrame = spark.read.options(header="true", inferSchema="true").csv(
    "./resources/product.csv"
)
_df_product.createOrReplaceTempView("product")

# Same for the customer CSV, exposed as the "customer" view.
_df_customer: DataFrame = spark.read.options(header="true", inferSchema="true").csv(
    "./resources/customer.csv"
)
_df_customer.createOrReplaceTempView("customer")

In [None]:
# Load the order CSV (header row, inferred schema) and expose it as the "order" view.
_df_order: DataFrame = spark.read.options(header="true", inferSchema="true").csv(
    "./resources/order.csv"
)
_df_order.createOrReplaceTempView("order")

In [None]:
# Number of rows in the order data (displayed as the cell output).
_order_row_count = _df_order.count()
_order_row_count

In [None]:
# Attach the spark expectations checks, with the config and all required parameters,
# to the function producing the dataframe.
# Note: the function has to return a dataframe for spark expectations to run,
# otherwise it will throw an error.
@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    target_table_view="order",
    user_conf=user_conf,
    write_to_table=True,
)
def run_se() -> DataFrame:
    """Return the order dataframe that was loaded and registered as the "order" view earlier."""
    return _df_order

In [None]:
# Call the decorated function: this builds the dataframe and runs the spark expectations
# configured on it (target table "dq_spark_local.customer_order", write_to_table=True).
run_se()

# Order of execution for the rules:

* Source Query DQ
* Source Agg DQ
* Row DQ
* Target Query DQ
* Target Agg DQ

In [None]:
# List the tables that exist in the current database after the run.
_tables_after_run = spark.sql("show tables")
_tables_after_run.show()

In [None]:
# Look at the rows of the stats table, without truncating the columns.
_stats_rows = spark.sql("select * from dq_spark_local.dq_stats")
_stats_rows.show(truncate=False)

In [None]:
# Print the schema of the stats table.
_stats_for_schema = spark.sql("select * from dq_spark_local.dq_stats")
_stats_for_schema.printSchema()

In [None]:
# Row count of the target table.
_target_count = spark.sql("select count(*) from dq_spark_local.customer_order")
_target_count.show()

In [None]:
# Look at the rows of the target table, without truncating the columns.
_target_rows = spark.sql("select * from dq_spark_local.customer_order")
_target_rows.show(truncate=False)

In [None]:
# Row count of the error table.
_error_count = spark.sql("select count(*) from dq_spark_local.customer_order_error")
_error_count.show()

In [None]:
# Look at up to 1000 rows of the error table, without truncating the columns.
_error_rows = spark.sql("select * from dq_spark_local.customer_order_error")
_error_rows.show(n=1000, truncate=False)

In [None]:
# Review the stats published to Kafka: batch-read the "dq-sparkexpectations-stats" topic
# from the earliest to the latest offset and show each record value as a string.
_kafka_stats_raw = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "dq-sparkexpectations-stats")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)
_kafka_stats_raw.selectExpr("cast(value as string) as stats_records").show(truncate=False)

In [None]:
# Switch off the "sales_greater_than_two" rule by setting its is_active flag to false.
_disable_sales_rule_sql = "update dq_spark_local.dq_rules set is_active=false where rule='sales_greater_than_two'"
spark.sql(_disable_sales_rule_sql)

In [None]:
# Display the rules table again, ordered by rule type and rule name, to check the update.
_rules_after_update = spark.sql("select * from dq_spark_local.dq_rules order by rule_type, rule")
_rules_after_update.show(truncate=False)

In [None]:
# Second run. The writer and the spark expectations object are built again from the
# rules table, which now has "sales_greater_than_two" deactivated.
writer = (
    WrappedDataFrameWriter()
    .mode("append")
    .format("delta")
)

# Passing stats_streaming_options={user_config.se_enable_streaming: False} is left out on purpose.
se: SparkExpectations = SparkExpectations(
    product_id="your_product",
    rules_df=spark.table("dq_spark_local.dq_rules"),
    stats_table="dq_spark_local.dq_stats",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
)


# Running for the second time to see the changes in the stats table
@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    target_table_view="order",
    user_conf=user_conf,
    write_to_table=True,
)
def run_se() -> DataFrame:
    """Return the order dataframe that was loaded and registered as the "order" view earlier."""
    return _df_order


run_se()

In [None]:
# Look at the stats table again; it now also holds the row written by the second run.
_stats_after_second_run = spark.sql("select * from dq_spark_local.dq_stats")
_stats_after_second_run.show(truncate=False)

In [None]:
# Count the orders with sales<=2, i.e. the rows the deactivated
# "sales_greater_than_two" rule would have rejected.
_low_sales_orders = spark.sql("select * from order where sales<=2")
should_not_fail_records = _low_sales_orders.count()
print(should_not_fail_records)

In [None]:
# Check that the number of orders with sales<=2 matches the expected difference.
# NOTE(review): 3869 and 3804 look like the error counts of the first and the second
# run for the bundled ./resources data - confirm before relying on them.
_expected_should_not_fail_records = 3869 - 3804

if should_not_fail_records == _expected_should_not_fail_records:
    print("Test Passed")
else:
    # A mismatch used to print nothing at all, which was easy to mistake for success.
    print(
        f"Test Failed: expected {_expected_should_not_fail_records} records with sales<=2, "
        f"found {should_not_fail_records}"
    )

In [None]:
# Row count of the target table after the second run.
_target_count_second_run = spark.sql("select count(*) from dq_spark_local.customer_order")
_target_count_second_run.show()

In [None]:
# Row count of the error table after the second run.
_error_count_second_run = spark.sql("select count(*) from dq_spark_local.customer_order_error")
_error_count_second_run.show()

In [None]:
# Review the stats published to Kafka once more: batch-read the whole topic
# (earliest to latest offset) and show each record value as a string.
_kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "dq-sparkexpectations-stats",
    "startingOffsets": "earliest",
    "endingOffsets": "latest",
}
_kafka_stats_again = spark.read.format("kafka").options(**_kafka_source_options).load()
_kafka_stats_again.selectExpr("cast(value as string) as stats_records").show(truncate=False)

In [None]:
# Failure Scenario
# Add a row_dq rule whose expectation refers to a dummy column, so that the next run fails.
_insert_dummy_rule_sql = "insert into dq_spark_local.dq_rules values ('your_product', 'dq_spark_local.customer_order', 'row_dq', 'dummy', 'dummy', 'dummy', 'drop', 'dummy', 'dummy', true, true, true, false, 0)"
spark.sql(_insert_dummy_rule_sql)

In [None]:
# Third run (failure scenario). The writer and the spark expectations object are built
# again from the rules table, which now contains the dummy rule.
writer = (
    WrappedDataFrameWriter()
    .mode("append")
    .format("delta")
)

# Passing stats_streaming_options={user_config.se_enable_streaming: False} is left out on purpose.
se: SparkExpectations = SparkExpectations(
    product_id="your_product",
    rules_df=spark.table("dq_spark_local.dq_rules"),
    stats_table="dq_spark_local.dq_stats",
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
)


# Running for the third time, now with the dummy rule in place
@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    target_table_view="order",
    user_conf=user_conf,
    write_to_table=True,
)
def run_se() -> DataFrame:
    """Return the order dataframe that was loaded and registered as the "order" view earlier."""
    return _df_order


# The failure is the point of this cell: print the error instead of stopping the notebook.
try:
    run_se()
except Exception as exc:
    print(exc)

In [None]:
# Exploring the stats table
# Input, error and output record counts summed over all rows of the stats table.
_stats_totals_sql = """
    SELECT SUM(input_count) AS total_input, 
    SUM(error_count) AS total_errors, 
    SUM(output_count) AS total_output 
    FROM  dq_spark_local.dq_stats
    """
spark.sql(_stats_totals_sql).show()


In [None]:
# Average success and error percentage per product, rounded to two decimals.
_avg_percentages_sql = """
    SELECT product_id, round(avg(success_percentage), 2) AS avg_success, 
    round(avg(error_percentage), 2) AS avg_error 
    FROM dq_spark_local.dq_stats group by product_id
"""
spark.sql(_avg_percentages_sql).show()

In [None]:
# Explode the dq_status column into key/value pairs and count how often each
# (product, status type, status) combination occurs.
_status_counts_sql = """
    SELECT product_id, key AS dq_status_type, value AS dq_status, COUNT(*) AS count
FROM dq_spark_local.dq_stats 
LATERAL VIEW explode(dq_status) AS key, value
GROUP BY product_id, key, value
order by product_id, key, value;
"""
spark.sql(_status_counts_sql).show()

In [None]:
# Explode row_dq_res_summary and keep the entries whose failed_row_count is above zero.
_row_dq_failures_sql = """
    SELECT product_id, table_name, rule.rule_type, rule.description, rule.rule, rule.failed_row_count
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(row_dq_res_summary) exploded_table AS rule
    WHERE CAST(rule.failed_row_count AS INT) > 0
"""
spark.sql(_row_dq_failures_sql).show(truncate=False)

In [None]:
# Entries with failed_row_count > 0 across all five result columns of the stats table
# (source/final agg_dq, source/final query_dq and row_dq). Each UNION ALL branch explodes
# one column and labels its rows through the dq_type literal. Shows up to 1000 rows.
spark.sql("""
    SELECT product_id, table_name, 'source_agg_dq' AS dq_type, rule.rule AS rule_type, rule.description, CAST(rule.failed_row_count AS INT) AS failed_row_count
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(source_agg_dq_results) exploded_table AS rule
    WHERE CAST(rule.failed_row_count AS INT) > 0
    UNION ALL
    SELECT product_id, table_name, 'final_agg_dq' AS dq_type, rule.rule AS rule_type, rule.description, CAST(rule.failed_row_count AS INT) AS failed_row_count
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(final_agg_dq_results) exploded_table AS rule
    WHERE CAST(rule.failed_row_count AS INT) > 0
    UNION ALL
    SELECT product_id, table_name, 'source_query_dq' AS dq_type, rule.rule AS rule_type, rule.description, CAST(rule.failed_row_count AS INT) AS failed_row_count
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(source_query_dq_results) exploded_table AS rule
    WHERE CAST(rule.failed_row_count AS INT) > 0
    UNION ALL
    SELECT product_id, table_name, 'final_query_dq' AS dq_type, rule.rule AS rule_type, rule.description, CAST(rule.failed_row_count AS INT) AS failed_row_count
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(final_query_dq_results) exploded_table AS rule
    WHERE CAST(rule.failed_row_count AS INT) > 0
    UNION ALL
    SELECT product_id, table_name, 'row_dq' AS dq_type, rule.rule AS rule_type, rule.description, CAST(rule.failed_row_count AS INT) AS failed_row_count
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(row_dq_res_summary) exploded_table AS rule
    WHERE CAST(rule.failed_row_count AS INT) > 0
""").show(truncate=False, n=1000)

In [None]:
# Number of times each rule shows up per product, table and dq_type: the five result
# columns are exploded and unioned, then grouped over all selected columns.
# NOTE(review): the failed_row_count > 0 filter belongs to the last (row_dq) branch only,
# so the four agg/query branches are counted unfiltered - confirm this is intended.
spark.sql("""
    select *, count(*) as failed_count from (
    SELECT product_id, table_name, 'source_agg_dq' AS dq_type, rule.rule AS rule_type, rule.description
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(source_agg_dq_results) exploded_table AS rule
    UNION ALL
    SELECT product_id, table_name, 'final_agg_dq' AS dq_type, rule.rule AS rule_type, rule.description
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(final_agg_dq_results) exploded_table AS rule
    UNION ALL
    SELECT product_id, table_name, 'source_query_dq' AS dq_type, rule.rule AS rule_type, rule.description
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(source_query_dq_results) exploded_table AS rule
    UNION ALL
    SELECT product_id, table_name, 'final_query_dq' AS dq_type, rule.rule AS rule_type, rule.description
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(final_query_dq_results) exploded_table AS rule
    UNION ALL
    SELECT product_id, table_name, 'row_dq' AS dq_type, rule.rule AS rule_type, rule.description
    FROM dq_spark_local.dq_stats 
    LATERAL VIEW explode(row_dq_res_summary) exploded_table AS rule
    WHERE CAST(rule.failed_row_count AS INT) > 0) group by all
""").show(truncate=False)

In [None]:
# Optional teardown: uncomment the next line to stop the spark session.
# spark.stop()

In [None]:
# Optional teardown: uncomment the next line to run the Kafka stop script.
# os.system("sh ./docker_scripts/docker_kafka_stop_script.sh")