<img src="../Images/pyspark_iceberg.png" width="500" height="500">
<h3>Data Quality with PySpark and Iceberg: the WAP (Write-Audit-Publish) Methodology</h3>
<ul>
    <li><h4>I recommend running this notebook on Google Colab: PySpark is preinstalled there, the Iceberg JAR files are ready to go, and setup is easier.</h4></li>
    <li><h4>I use Spark 3.5.3 and the Iceberg artifact version 3.5_2.12:1.7.1 (built for Spark 3.5 and Scala 2.12, Iceberg release 1.7.1).</h4></li>
    <li><h4>These Spark and Iceberg versions are compatible with each other. If you want to use another Spark version, check its compatibility with Iceberg first to avoid errors.</h4></li>
    <li><h4>To download the Iceberg runtime and extensions JAR files, pick the proper versions from this link: <a href="https://repo1.maven.org/maven2/org/apache/iceberg/">Iceberg Maven repository</a></h4></li>
    <li><h4>I use the New York City yellow taxi trip data for January 2024. The original data was too big, so I put a random sample of it in the data folder of this repo. If you need the full data, download it from this link: <a href="https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page">New York City yellow taxi trip data</a></h4></li>
</ul>
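
<h5>The version pairing above is encoded in the Maven coordinates themselves. As a minimal sketch (the helper name <code>iceberg_spark_coordinate</code> is hypothetical and not part of any library), a coordinate can be assembled from the Spark, Scala, and Iceberg versions, assuming the <code>artifact-sparkMajor.Minor_scala:iceberg</code> naming that the artifacts in this notebook follow:</h5>

```python
def iceberg_spark_coordinate(artifact, spark_version, scala_version, iceberg_version):
    """Build a Maven coordinate for an Iceberg Spark artifact (hypothetical helper).

    Only the major.minor part of the Spark version appears in the artifact name.
    """
    spark_minor = ".".join(spark_version.split(".")[:2])
    return f"org.apache.iceberg:{artifact}-{spark_minor}_{scala_version}:{iceberg_version}"


# The versions used in this notebook: Spark 3.5.3, Scala 2.12, Iceberg 1.7.1
runtime = iceberg_spark_coordinate("iceberg-spark-runtime", "3.5.3", "2.12", "1.7.1")
print(runtime)  # org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1
```

<h5>Whether a given combination actually exists still has to be checked against the Maven repository linked above; the helper only formats the string.</h5>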

In [None]:
# Import required libraries
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Initialize Spark and Iceberg configurations
warehouse_path = "./warehouse"
catalog_name = "demo"
iceberg_spark_jar = 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1'
iceberg_spark_ext = 'org.apache.iceberg:iceberg-spark-extensions-3.5_2.12:1.7.1'

conf = SparkConf().setAppName("Data Quality - WAP termo - Apache Iceberg") \
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .set("spark.jars.packages" , iceberg_spark_jar + "," + iceberg_spark_ext) \
    .set(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .set(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path) \
    .set(f"spark.sql.catalog.{catalog_name}.type", "hadoop")\
    .set("spark.sql.defaultCatalog", catalog_name)

# create spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Variables
database_name = "nyc"
table_name = "yellow_taxi_trips"
write_table_name = "yellow_taxi_trips_write"

table_full_name = database_name + "." + table_name
write_table_full_name = database_name + "." + write_table_name

# data_dir_path initializes our table; write_data_dir_path holds the new data for the write section, which changes the table and which we want to audit.
data_dir_path = "../Data/nyc_yellow_trip_data_2024.csv"
write_data_dir_path = "../Data/nyc_yellow_trip_data_2024_only_to_write.csv"

# We use two branches: main, which users read, and etl_job_v_1, which we want to audit before publishing.
branch_name = "etl_job_v_1"
main_branch_name = "main"

In [None]:
# Setup: create the database if it does not exist and drop the tables if they exist (to ensure clean runs)
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
spark.sql(f"DROP TABLE IF EXISTS {table_full_name}")
spark.sql(f"DROP TABLE IF EXISTS {write_table_full_name}")

In [None]:
# Step 1: Initialize the Iceberg table
df = spark.read.option("multiline","true") \
          .csv(data_dir_path, header=True)

# save dataframe as an iceberg table
df.write.saveAsTable(table_full_name)

# Check the initial data: at this point the table should contain two vendor IDs.
# After the write section, the audit branch should contain three different vendor IDs.
spark.sql(f"""
SELECT VendorId, sum(total_amount) vendor_total_amount
FROM {table_full_name}
GROUP BY VendorId
""").show(5,False)

In [None]:
# enable wap on table by using TBLPROPERTIES.
spark.sql(f"""
ALTER TABLE {table_full_name}
SET TBLPROPERTIES (
    'write.wap.enabled'='true'
)
""")

# create a new branch to write data and audit it.
spark.sql(f"""
ALTER TABLE {table_full_name}
CREATE BRANCH {branch_name}
""")

# Set the new branch as the session's WAP branch, so that subsequent writes to the table go to it.
spark.conf.set('spark.wap.branch', branch_name)


<h4>Write Section</h4>

<h5>Write operations can include various actions, such as deleting data, adding new data, or evolving the schema: any change to your data that you want to audit.</h5>
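
<h5>Whatever the write operation is, it is staged on the audit branch only while <code>spark.wap.branch</code> is set on the session. One possible way to limit that setting to a block of code is a small context manager. This is a sketch, not part of Iceberg or PySpark: <code>wap_branch</code> is a hypothetical helper, and it assumes WAP is already enabled on the table and the branch already exists.</h5>

```python
from contextlib import contextmanager


@contextmanager
def wap_branch(spark, branch):
    """Route Iceberg writes to `branch` inside the with-block (hypothetical helper).

    On exit, the previous value of spark.wap.branch is restored, or the
    setting is removed if it was not set before.
    """
    key = "spark.wap.branch"
    previous = spark.conf.get(key, None)  # None when the setting is absent
    spark.conf.set(key, branch)
    try:
        yield
    finally:
        if previous is None:
            spark.conf.unset(key)
        else:
            spark.conf.set(key, previous)


# Usage, assuming the `spark` session and the variables defined in this notebook:
# with wap_branch(spark, branch_name):
#     spark.sql(f"INSERT INTO {table_full_name} SELECT * FROM {write_table_full_name}")
```

<h5>Restoring the setting in a <code>finally</code> clause means a failed write does not leave the session pointing at the audit branch.</h5>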

In [None]:
# Read the data to be written from the CSV file into a Spark DataFrame.
write_df = spark.read.option("multiline" , "true")\
                .csv(write_data_dir_path, header= True)

# save it as an iceberg table 
write_df.write.saveAsTable(write_table_full_name)

# Insert the new data into the table; because spark.wap.branch is set, the write lands on the audit branch, not on main.
spark.sql(f"""
INSERT INTO {table_full_name}
SELECT * FROM {write_table_full_name}
""")

# Check that the added data is present on the new branch: we should see 3 vendor IDs here.
spark.sql(f"""
SELECT VendorId, sum(total_amount) vendor_total_amount
FROM  {table_full_name} VERSION AS OF '{branch_name}'
GROUP BY VendorId
""").show(5,False)

# Check the main branch: it should not have the new data, because we have not audited and published it yet. We should see 2 vendor IDs here.
spark.sql(f"""
SELECT VendorId, sum(total_amount) vendor_total_amount
FROM {table_full_name} VERSION AS OF '{main_branch_name}'
GROUP BY VendorId
""").show(5,False)

<h4>Audit Section</h4>

<h5>In the audit section we check the health of the data. You can run any queries on your data, or use the <strong>Great Expectations</strong> library to validate your expectations of the added data. If no error or exception is raised, your tests and audits have passed and you can publish the branch to main, so that your users can see the new data. If you do see an error, you can investigate your data and find the problem before the publish section.</h5>
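
<h5>When there are several expectations, it can help to run them all and report every failure at once instead of stopping at the first one. The following is a minimal plain-Python sketch of that idea; <code>run_audits</code> is a hypothetical helper, not the Great Expectations API, and <code>vendor_ids</code> here is a stand-in for the values collected from the audit branch.</h5>

```python
from typing import Callable, Dict, List


def run_audits(checks: Dict[str, Callable[[], bool]]) -> List[str]:
    """Run every named check and return the names of the checks that failed.

    A check fails if it returns a falsy value or raises an exception.
    """
    failed = []
    for name, check in checks.items():
        try:
            passed = bool(check())
        except Exception:
            passed = False
        if not passed:
            failed.append(name)
    return failed


# Stand-in for the distinct vendor IDs read from the audit branch
vendor_ids = {"1", "2", "6"}

failed = run_audits({
    "vendor IDs match the expected set": lambda: vendor_ids == {"1", "2", "6"},
    "no missing vendor ID": lambda: None not in vendor_ids,
})

if failed:
    raise ValueError(f"Audit Failed: {failed}")
print("Audit Passed!")
```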

In [None]:
# We expect exactly three different vendor IDs (1, 2, 6): if we see only these three IDs the audit passes, otherwise it fails.
# A second expectation is the number of distinct values in the vendorID column: if it is not 3, the audit fails.
to_audit_df = spark.read \
    .option("branch", branch_name) \
    .format("iceberg") \
    .load(table_full_name) \
    .select("vendorID") \
    .distinct() \
    .toLocalIterator()

vendor_ids = {row[0] for row in to_audit_df}

# our expectations 
expected_values = {"1" , "2" , "6"}
number_of_different_vendor_ids = 3

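# Audit: fail if the number of distinct IDs is wrong, or if any unexpected ID is present
# (an unexpected ID makes the union larger than the observed set).
if (len(vendor_ids) != number_of_different_vendor_ids
        or len(vendor_ids) != len(expected_values | vendor_ids)):
    raise ValueError("Audit Failed!")
else:
    print("Audit Passed!")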

<h4>Publish Section</h4>
<h5>In the publish section we merge our branch into the main branch. For Spark, the Iceberg documentation recommends using the <strong>fast_forward</strong> procedure to do so.</h5>

In [None]:
# Publish the etl_job_v_1 branch by fast-forwarding the main branch to it
publish_query = f"CALL {catalog_name}.system.fast_forward('{table_full_name}', '{main_branch_name}', '{branch_name}')"
spark.sql(publish_query)
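
<h5>After publishing, you may also want to drop the audit branch and stop routing writes to it. The sketch below only builds the SQL strings, assuming the branch should be removed once main has been fast-forwarded; <code>publish_statements</code> is a hypothetical helper, while <code>fast_forward</code> and <code>ALTER TABLE ... DROP BRANCH</code> are the Iceberg procedure and DDL it relies on.</h5>

```python
def publish_statements(catalog, table, main_branch, audit_branch, drop_after=True):
    """Return the SQL statements that publish `audit_branch` to `main_branch`.

    Hypothetical helper: it only formats strings, so it can be inspected
    before anything is executed against the catalog.
    """
    statements = [
        f"CALL {catalog}.system.fast_forward('{table}', '{main_branch}', '{audit_branch}')"
    ]
    if drop_after:
        # Optional clean-up: remove the audit branch once main points at its snapshot
        statements.append(f"ALTER TABLE {table} DROP BRANCH {audit_branch}")
    return statements


statements = publish_statements("demo", "nyc.yellow_taxi_trips", "main", "etl_job_v_1")
for statement in statements:
    print(statement)

# Usage, assuming the `spark` session from this notebook:
# for statement in statements:
#     spark.sql(statement)
# spark.conf.unset("spark.wap.branch")  # later writes go to main again
```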