In [44]:
import pyspark
from delta import *
from delta.tables import DeltaTable

# Build a local Spark session with Delta Lake enabled.
#
# `spark.sql.extensions` must hold extension *class names*; it is set exactly once
# here so the Delta extension is not overwritten. The Elasticsearch connector is a
# Maven artifact, so it is passed through `extra_packages`:
# `configure_spark_with_delta_pip` sets `spark.jars.packages` itself (the Delta
# artifact matching the installed delta-spark version plus any extra packages),
# which is why `spark.jars.packages` is not configured by hand on the builder.
#
# NOTE: package settings only apply when the JVM starts; restart the kernel after
# changing them, otherwise `getOrCreate()` returns the already-running session.
builder = (
    pyspark.sql.SparkSession.builder.appName("LocalDelta")
    .master("local")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(
    builder,
    extra_packages=["org.elasticsearch:elasticsearch-spark-30_2.12:8.15.1"],
).getOrCreate()

In [45]:
# Register each bronze Delta table as a temp view so later SQL can refer to it by name.
for bronze_path, view_name in (
    ("local_lake/bronze/address", "addr"),
    ("local_lake/bronze/organisation", "org"),
    ("local_lake/bronze/org_addr", "org_addr"),
):
    bronze_df = spark.read.format("delta").load(bronze_path)
    bronze_df.createOrReplaceTempView(view_name)


In [43]:
# Delta table that is read as a stream; each micro-batch supplies the org_id keys to refresh.
DRIVER_TABLE_PATH = "local_lake/silver/driver_table"
# Delta table that gold_write creates on first use and merges into afterwards.
# The streaming checkpoint is stored next to it, at this path plus "_checkpoint".
GOLD_TABLE_PATH = "local_lake/gold/final_table"

def gold_write(batch_df, batch_id):
    """Upsert the gold rows for the organisations listed in one micro-batch.

    Intended as a ``foreachBatch`` sink. The batch is exposed as the temp view
    ``keys``; the ``org``, ``org_addr`` and ``addr`` views are joined and
    restricted to the ``org_id`` values present in ``keys``. The result is
    written to ``GOLD_TABLE_PATH``: a plain overwrite when no Delta table
    exists there yet, otherwise a merge on ``org_id`` (update all columns on
    match, insert when not matched).

    Args:
        batch_df: Micro-batch DataFrame; must expose an ``org_id`` column.
        batch_id: Micro-batch identifier supplied by Structured Streaming (unused).
    """
    # Use the session that owns the batch for every operation (SQL and Delta),
    # instead of mixing it with the notebook-global `spark`.
    session = batch_df.sparkSession
    batch_df.createOrReplaceTempView("keys")

    gold_df = session.sql("""
        SELECT
            org.org_id as org_id,
            org.email as org_email,
            addr.city as city
        FROM org
        JOIN org_addr ON org.org_id = org_addr.org_id
        JOIN addr ON org_addr.addr_id = addr.addr_id
        where org.org_id in (select org_id from keys)
    """)

    if not DeltaTable.isDeltaTable(session, GOLD_TABLE_PATH):
        # First run: nothing to merge into yet, so create the table.
        gold_df.write.format("delta").mode("overwrite").save(GOLD_TABLE_PATH)
        return

    # NOTE(review): the merge key is org_id alone. If an organisation can be linked
    # to several addresses, gold_df holds several rows per org_id and Delta rejects
    # the merge (multiple source rows matching one target row) -- confirm the
    # org_addr cardinality.
    (
        DeltaTable.forPath(session, GOLD_TABLE_PATH)
        .alias("existing")
        .merge(gold_df.alias("updates"), "existing.org_id = updates.org_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [42]:
# Stream the driver table; every row carries an org_id whose gold record must be refreshed.
org_ids = (
    spark.readStream
    .format("delta")
    .load(DRIVER_TABLE_PATH)
)

# Process everything currently available in the driver table, then stop.
# `availableNow=True` replaces the deprecated `once=True` trigger; it may split the
# backlog into several micro-batches, which is fine because gold_write upserts by key.
# The checkpoint records how far the driver table has been consumed, so a re-run
# only picks up rows added since the previous run.
(
    org_ids.writeStream
    .foreachBatch(gold_write)
    .option("checkpointLocation", f"{GOLD_TABLE_PATH}_checkpoint")
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)

In [46]:
# Read the gold table back and print its first rows as a quick check of the write.
gold_table_df = spark.read.format("delta").load(GOLD_TABLE_PATH)
gold_table_df.show()

+------+--------------------+-----------------+
|org_id|           org_email|             city|
+------+--------------------+-----------------+
|    29|  tlucas@example.com|      Timothytown|
|    26|michaelroberts@ex...|        East Todd|
|    19|harriskelly@examp...|        Shahmouth|
|    22| jason33@example.org| East Reginahaven|
|    32|amandarose@exampl...|      New Bethany|
|    31|  jadams@example.net|      South Susan|
|    25|nicolethomas@exam...|      Kramermouth|
|    27|alexandersoto@exa...|         Leehaven|
|    17|jennifermullins@e...|        Port Lori|
|    28|amanda04@example.net|       Josephberg|
|    18|griffinmichelle@e...|Lake Elizabethton|
|    21|ebartlett@example...| West Veronicaton|
|    30|timothy97@example...|         Popetown|
|    23|clarkhector@examp...|      Lake Monica|
|    20|spencerkevin@exam...|     Jacksonmouth|
|    24|  ryan28@example.com|     West Whitney|
|    10|dustinclark@examp...| Port Danielville|
|    11|karencastillo@exa...|       Shaw