In [None]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the Spark session used by every cell below.
# `spark.jars.packages` takes Maven coordinates (groupId:artifactId:version),
# not a jar file name, so MySQL Connector/J is requested by its coordinates.
# To load a jar that is already on disk, use `spark.jars` with its path instead.
spark = (
    SparkSession.builder
    .appName("Jupyter")
    .config("spark.jars.packages", "com.mysql:mysql-connector-j:8.0.31")
    .getOrCreate()
)

In [None]:
# Display the session object as the cell output to confirm it was created.
spark

#### Read the initial data from MySQL and load it into Iceberg

In [None]:
# Read `dims.accounts` from the MySQL instance through Spark's JDBC source.
# NOTE(review): the connection credentials are hardcoded here; acceptable for a
# local demo only.
accounts_reader = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql:3306")
    .option("user", "root")
    .option("password", "example")
    .option("query", "SELECT * FROM dims.accounts")
)
df = accounts_reader.load()

In [None]:
# Print the loaded rows without truncating long column values.
df.show(truncate=False)

In [None]:
%%sql
-- Make sure the `dims` database exists before the table is written into it.
CREATE DATABASE IF NOT EXISTS dims

In [None]:
%%sql
-- Remove any table left over from a previous run so the load below starts clean.
DROP TABLE IF EXISTS dims.accounts

In [None]:
# Persist the MySQL snapshot as the table `dims.accounts`.
# NOTE(review): presumably the session's default catalog is Iceberg, so this
# creates an Iceberg table — confirm against the Spark catalog configuration.
df.write.saveAsTable("dims.accounts")

In [None]:
# Read the freshly written table back and print every row untruncated.
accounts_snapshot = spark.sql("SELECT * FROM dims.accounts")
accounts_snapshot.show(truncate=False)

By default, tables are not configured to allow integrated audits. The first step is therefore to enable them by setting the `write.wap.enabled` table property to `true`.

In [None]:
%%sql

-- Enable write-audit-publish (WAP) on the table via its `write.wap.enabled` property.
ALTER TABLE dims.accounts
SET TBLPROPERTIES ('write.wap.enabled' = 'true')

In [None]:
%%sql

-- Create the `daily_load` branch that will receive the changes to be audited.
ALTER TABLE dims.accounts CREATE BRANCH daily_load

In [None]:
# Point this session's WAP branch setting at `daily_load`.
spark.conf.set('spark.wap.branch', 'daily_load')

##### Note: let's assume that, at this point in time, the `status` of one account has been changed in the MySQL `accounts` table.

# Loading the changes from MySQL

In [None]:
# Re-read `dims.accounts` from MySQL through the JDBC source to pick up the
# change made upstream; `df` is rebound to this new read.
# NOTE(review): the connection credentials are hardcoded here; acceptable for a
# local demo only.
changed_accounts_reader = (
    spark.read.format("jdbc")
    .option("url", "jdbc:mysql://mysql:3306")
    .option("user", "root")
    .option("password", "example")
    .option("query", "SELECT * FROM dims.accounts")
)
df = changed_accounts_reader.load()

In [None]:
# Expose the MySQL rows to SQL under the name `raw_accounts` (used as the MERGE source).
df.createOrReplaceTempView("raw_accounts")

#### Let's check how the data looks in the branch:

In [None]:
%%sql

-- Read the table as of the `daily_load` branch (state before the MERGE).
SELECT *
FROM dims.accounts VERSION AS OF 'daily_load'

#### Merging the data into the WAP branch

In [None]:
# Upsert the `raw_accounts` rows into `dims.accounts`, matching on `id`:
# matched rows get name/status/owner refreshed, unmatched rows are inserted.
# With `spark.wap.branch` set earlier in the notebook, this write is intended
# to land on the audit branch rather than on main.
merge_accounts_sql = """
    MERGE INTO dims.accounts t
    USING raw_accounts s
        ON t.id = s.id
    WHEN MATCHED
        THEN UPDATE SET 
            name = s.name,
            status = s.status,
            owner = s.owner
    WHEN NOT MATCHED
        THEN INSERT (id, name, status, owner, created_at) VALUES (s.id, s.name, s.status, s.owner, s.created_at)"""

spark.sql(merge_accounts_sql)

#### Peeking at the changes after MERGE

In [None]:
%%sql
-- Read the table as of the `daily_load` branch to inspect the merged rows.
SELECT * FROM dims.accounts VERSION AS OF 'daily_load'

### Auditing the results

In [None]:
# Gather the distinct `status` values present on the `daily_load` branch.
branch_accounts = (
    spark.read
    .format("iceberg")
    .option("branch", "daily_load")
    .load("dims.accounts")
)
distinct_statuses = branch_accounts.select("status").distinct().toLocalIterator()
# Each row holds exactly one column (`status`), so unpack it straight into the set.
statues = {status for (status,) in distinct_statuses}

In [None]:
# Audit rule: every status found on the branch must be one of the allowed values.
allowed_statues = ['active', 'in_review', 'declined', 'submitted']
# `statues` is a set, so compare it as a set. Writing `statues not in allowed_statues`
# would ask whether the whole set is an element of the list of strings, which it
# never is, so the audit would fail on every run.
unexpected_statues = set(statues) - set(allowed_statues)
if unexpected_statues:
    raise ValueError(f"Audit failed, status set does not match expected values: {statues} != {allowed_statues}")
else:
    print("Audit has passed 🙌🏻")

### Merging the audit branch into main

In [None]:
# Publish: fast-forward `main` to the audited `daily_load` branch.
publish_result = spark.sql("CALL system.fast_forward('dims.accounts', 'main', 'daily_load')")
# The WAP cycle is over: clear the session's WAP branch setting so that later
# reads and writes of dims.accounts in this session are not redirected to the
# audit branch.
spark.conf.unset('spark.wap.branch')
# Keep the procedure's result as the cell output.
publish_result

In [None]:
## Peeking at the final result

In [None]:
%%sql

-- Show the table contents after the publish step.
SELECT *
FROM dims.accounts