# Write Audit Publish

Write-Audit-Publish (WAP) is one of several key design patterns in data engineering that emphasize data quality and integrity, especially within data orchestration workflows.


Writing data to a stage layer before the data hits the production layer . 

Auditing the data to identify and rectify data quality issues . 

Publishing Data to production 


## Using Iceberg To Achieve WAP 

Before the inception of Project Nessie (https://projectnessie.org/), the implementation of Write-Ahead Protocol (WAP) typically necessitated two stages of data copies, thereby significantly increasing the data footprint and associated costs. However, following the integration of the Iceberg team with Project Nessie and the introduction of Git-like functionalities at the data layer starting from version 1.2.0, WAP can now be implemented without the need for any data replication. 

This demo will cover how can we use Iceberg to implement WAP based logic's without data replication . 

In [1]:
%%configure -f
{
"conf":{
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    },
    "jars":[
        "s3://mxtdw-audit-gamma/jars/iceberg-spark3-runtime.jar"
    ]
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1720710304377_0003,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1720710304377_0001,pyspark,idle,Link,Link,,
2,application_1720710304377_0003,pyspark,idle,Link,Link,,✔


In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1720710304377_0001,pyspark,idle,Link,Link,,
2,application_1720710304377_0003,pyspark,idle,Link,Link,,✔


In [3]:
from pyspark.sql import SparkSession
catalog_name = "glue_catalog"
bucket_name = "mxtdw-audit-gamma"
bucket_prefix = "mxtdw_audit_table/"
database_name = "playground"
warehouse_path = f"s3://{bucket_name}/{bucket_prefix}"
spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", f"{warehouse_path}") \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
%%sql
drop table  if exists glue_catalog.playground.wap_demo2;

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [6]:
%%sql
create table glue_catalog.playground.wap_demo3(
    name string,
    phone long,
    email string,
    postalZip long,
    number int,
    country string,
    ingestion_date date
)
using iceberg 
PARTITIONED BY (country,year(ingestion_date))
Location 's3://mxtdw-audit-gamma/demo_iceberg5/'
TBLPROPERTIES ( 
    'write.metadata.metrics.column.postalZip'='full',
    'write.metadata.metrics.column.country'='full',
    'write.metadata.metrics.column.ingestion_date'='full'
);

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

# This will allow to create branches for the table 

In [8]:
%%sql
ALTER TABLE glue_catalog.playground.wap_demo3 SET TBLPROPERTIES (
    'write.wap.enabled'='true'
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

To create a branch and merge it into your mainline branch, Iceberg requires the mainline branch to be created first. Without it, even if you create a custom branch, you won't be able to merge the custom branch into the mainline. Currently, Iceberg does not allow the creation of a mainline without the initial insert statement. Therefore, to overcome this, you need to create a force a mainline branch creation using a fake branch . This is to be done for the 1st time . 

In [9]:
%%sql 
ALTER TABLE glue_catalog.playground.wap_demo3
CREATE BRANCH 20221201WAP

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [11]:
spark.sql("select * from glue_catalog.playground.wap_demo3.refs").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------+-------------------+-----------------------+---------------------+----------------------+
|       name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+-----------+------+-------------------+-----------------------+---------------------+----------------------+
|20221201WAP|BRANCH|2994098468836002628|                   null|                 null|                  null|
+-----------+------+-------------------+-----------------------+---------------------+----------------------+

In [12]:
%%sql 
call glue_catalog.system.cherrypick_snapshot('glue_catalog.playground.wap_demo3',2994098468836002628)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [14]:
spark.sql("select * from glue_catalog.playground.wap_demo3.refs").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------+-------------------+-----------------------+---------------------+----------------------+
|       name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+-----------+------+-------------------+-----------------------+---------------------+----------------------+
|20221201WAP|BRANCH|2994098468836002628|                   null|                 null|                  null|
|       main|BRANCH|2994098468836002628|                   null|                 null|                  null|
+-----------+------+-------------------+-----------------------+---------------------+----------------------+

## Lets implement WAP using Iceberg 

In [15]:
%%sql 
ALTER TABLE glue_catalog.playground.wap_demo3
CREATE BRANCH 2024WAP RETAIN 7 DAYS

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [30]:
spark.sql("select * from glue_catalog.playground.wap_demo.refs").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------+-------------------+-----------------------+---------------------+----------------------+
|       name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
+-----------+------+-------------------+-----------------------+---------------------+----------------------+
|    2022WAP|BRANCH|1840343740250005065|                   null|                 null|                  null|
|       main|BRANCH|1840343740250005065|                   null|                 null|                  null|
|20221201WAP|BRANCH|1840343740250005065|                   null|                 null|                  null|
|    2024WAP|BRANCH|1840343740250005065|              604800000|                 null|                  null|
+-----------+------+-------------------+-----------------------+---------------------+----------------------+

lets write the data to the pre commit branch and run some data quality checks and if all the DQ checks passes 
then automatically the data will get merged to mainline


In [16]:
#Reading the incoming data 
import pyspark.sql.functions as f
df = spark.read.csv("s3://mxtdw-audit-gamma/demo_data/wap_data.csv",header=True,inferSchema=True)
df = df.withColumn("ingestion_date",f.current_date())
df.createOrReplaceTempView("incomingData")
df.printSchema()
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- name: string (nullable = true)
 |-- phone: long (nullable = true)
 |-- email: string (nullable = true)
 |-- postalZip: integer (nullable = true)
 |-- number: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- ingestion_date: date (nullable = false)

+---------------+----------+--------------------+---------+------+------------------+--------------+
|           name|     phone|               email|postalZip|number|           country|ingestion_date|
+---------------+----------+--------------------+---------+------+------------------+--------------+
|   Price Savage|4245624115|varius.ultrices@g...|   586986|     7|             India|    2024-07-11|
|   Odette Logan|5627869778|ultricies.ligula@...|     1061|     8|Russian Federation|    2024-07-11|
|Katell Buchanan|9354459637|magna@protonmail.net|    55526|     9|           Nigeria|    2024-07-11|
|    Leigh Young|1808725898|ultrices.posuere....|     4906|    10|            Poland|    2024-07-11|
|  Bianca Sext

In [17]:
def merge_data(table_name:str,branch_name:str):
    merge_ = """
        merge into glue_catalog.playground.wap_demo3.branch_{branchName} t 
        using {incomingData} s
        on t.number = s.number 
        when matched then update set * 
        when not matched then insert *
    """.format(incomingData=table_name,branchName=branch_name)
    print(merge_)
    spark.sql(merge_)
    

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
merge_data("incomingData","2024WAP")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


        merge into glue_catalog.playground.wap_demo3.branch_2024WAP t 
        using incomingData s
        on t.number = s.number 
        when matched then update set * 
        when not matched then insert *

Lets Query the Branch data and the mainline table 

In [19]:
#Branch data has record
spark.sql("select * from glue_catalog.playground.wap_demo3.branch_2024WAP").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+----------+--------------------+---------+------+------------------+--------------+
|           name|     phone|               email|postalZip|number|           country|ingestion_date|
+---------------+----------+--------------------+---------+------+------------------+--------------+
|   Price Savage|4245624115|varius.ultrices@g...|   586986|     7|             India|    2024-07-11|
|Katell Buchanan|9354459637|magna@protonmail.net|    55526|     9|           Nigeria|    2024-07-11|
|    Leigh Young|1808725898|ultrices.posuere....|     4906|    10|            Poland|    2024-07-11|
|   Odette Logan|5627869778|ultricies.ligula@...|     1061|     8|Russian Federation|    2024-07-11|
|  Bianca Sexton|1808725891|risus.at@outlook.net|   115771|    11|           Vietnam|    2024-07-11|
+---------------+----------+--------------------+---------+------+------------------+--------------+

In [20]:
# as this is the first commit so mainline branch should not have any data 
spark.sql("select * from glue_catalog.playground.wap_demo3").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----+-----+---------+------+-------+--------------+
|name|phone|email|postalZip|number|country|ingestion_date|
+----+-----+-----+---------+------+-------+--------------+
+----+-----+-----+---------+------+-------+--------------+

In [21]:
# Let's run some run automated data quality checks on the branch before it moves to prod 
import pyspark.sql.functions as f
def run_data_quality_checks(branch_name):
    """
    Phone number length should be == 10 
    :return: 
    """
    df = spark.read.table(f"glue_catalog.playground.wap_demo3.branch_{branch_name}")
    where_condition_checks = {
        "phone_number_check":(f.col("phone").isNull()) | (f.length(f.col("phone"))<10) | (f.length(f.col("phone"))>10)
    }
    for key,value in where_condition_checks.items():
        print(f"running data quality check for {key}")
        if not df.where(value).rdd.isEmpty():
            print(f"{key} data quality failed")
            return False
    return True

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
if run_data_quality_checks("2024WAP"):
    spark.sql("""
                CALL glue_catalog.system.fast_forward('playground.wap_demo3','main', '2024WAP')     
              """)
else:
    print("Flagging off the branch as the data quality failed")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

running data quality check for phone_number_check
DataFrame[branch_updated: string, previous_ref: bigint, updated_ref: bigint]

Lets Query the Mainline Branch and check if the data has been merged . 

In [24]:
spark.sql("select * from glue_catalog.playground.wap_demo3").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+----------+--------------------+---------+------+------------------+--------------+
|           name|     phone|               email|postalZip|number|           country|ingestion_date|
+---------------+----------+--------------------+---------+------+------------------+--------------+
|   Price Savage|4245624115|varius.ultrices@g...|   586986|     7|             India|    2024-07-11|
|Katell Buchanan|9354459637|magna@protonmail.net|    55526|     9|           Nigeria|    2024-07-11|
|    Leigh Young|1808725898|ultrices.posuere....|     4906|    10|            Poland|    2024-07-11|
|   Odette Logan|5627869778|ultricies.ligula@...|     1061|     8|Russian Federation|    2024-07-11|
|  Bianca Sexton|1808725891|risus.at@outlook.net|   115771|    11|           Vietnam|    2024-07-11|
+---------------+----------+--------------------+---------+------+------------------+--------------+

## Lets run another load for bad data 

In [49]:
#Reading the incoming data 
import pyspark.sql.functions as f
df = spark.read.csv("s3://mxtdw-audit-gamma/demo_data/bad_data_quality.csv",header=True,inferSchema=True)
df = df.withColumn("ingestion_date",f.current_date())
df.createOrReplaceTempView("incomingData")
df.show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----------+------------------------+---------+------+-------+--------------+
|name        |phone      |email                   |postalZip|number|country|ingestion_date|
+------------+-----------+------------------------+---------+------+-------+--------------+
|Price Savage|4245624115 |varius.ultrices@live.com|586986   |7     |India  |2024-06-24    |
|Random Guy  |15627869778|null                    |1061     |10    |USA    |2024-06-24    |
+------------+-----------+------------------------+---------+------+-------+--------------+

In [44]:
%%sql 
ALTER TABLE glue_catalog.playground.wap_demo
CREATE BRANCH 2025WAP RETAIN 7 DAYS

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [45]:
merge_data("incomingData","2025WAP")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


        merge into glue_catalog.playground.wap_demo.branch_2025WAP t 
        using incomingData s
        on t.number = s.number 
        when matched then update set * 
        when not matched then insert *

In [48]:
spark.sql("select * from glue_catalog.playground.wap_demo.branch_2025WAP").show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+-----------+---------------------------+---------+------+------------------+--------------+
|name           |phone      |email                      |postalZip|number|country           |ingestion_date|
+---------------+-----------+---------------------------+---------+------+------------------+--------------+
|Katell Buchanan|9354459637 |magna@protonmail.net       |55526    |9     |Nigeria           |2024-06-24    |
|Odette Logan   |5627869778 |ultricies.ligula@icloud.net|1061     |8     |Russian Federation|2024-06-24    |
|Bianca Sexton  |1808725891 |risus.at@outlook.net       |115771   |11    |Vietnam           |2024-06-24    |
|Price Savage   |4245624115 |varius.ultrices@live.com   |586986   |7     |India             |2024-06-24    |
|Random Guy     |15627869778|null                       |1061     |10    |USA               |2024-06-24    |
+---------------+-----------+---------------------------+---------+------+------------------+--------------+

In [50]:
if run_data_quality_checks():
    spark.sql("""
                CALL glue_catalog.system.fast_forward('playground.wap_demo','main', '2025WAP')     
              """)
else:
    print("Flagging off the branch as the data quality failed")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

running data quality check for phone_number_check
DataFrame[branch_updated: string, previous_ref: bigint, updated_ref: bigint]