Incrementally load Parquet files from S3 (generated by AWS DMS with CDC) using Databricks Auto Loader, and merge them into a Delta table in the bronze layer.

In [0]:
from datetime import datetime

from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number
from pyspark.sql.types import *

In [0]:
%sql
-- Target table holding the stock level of every item, mirroring the
-- operational source table (an RDBMS replicated by DMS).
-- Created in Unity Catalog and addressed by its three-level name
-- catalog.schema.table.
-- NOTE: CREATE OR REPLACE recreates the table (discarding existing rows)
-- every time this cell runs.
CREATE OR REPLACE TABLE demo_catalog.demo_schema.stock_levels (
  id                BIGINT,
  item_id           BIGINT,
  stock_level       INT,
  mca_cost          DECIMAL(10, 5),
  discontinued_date TIMESTAMP,
  discontinued_flag INT,
  is_new            INT,
  unit_cost         DECIMAL(10, 5),
  created_at        TIMESTAMP,
  updated_at        TIMESTAMP,
  deleted_at        TIMESTAMP
);

In [0]:
# Schema of the Parquet files written by DMS in CDC mode: the DMS bookkeeping
# columns (Op, operation_timestamp) followed by the source table's columns.
# Every column of demo_catalog.demo_schema.stock_levels must be listed here;
# a column missing from this schema is not read from the files and therefore
# never reaches the bronze table.
tschema = StructType([
    # DMS change type; the upsert treats "D" as a delete.
    # NOTE(review): presumably NULL for full-load files - confirm DMS settings.
    StructField("Op", StringType()),
    # Change time; the upsert uses it to keep the latest change per id.
    StructField("operation_timestamp", TimestampType()),
    StructField("id", LongType()),
    StructField("item_id", LongType()),
    StructField("stock_level", IntegerType()),
    StructField("mca_cost", DecimalType(10, 5)),
    StructField("discontinued_date", TimestampType()),
    StructField("discontinued_flag", IntegerType()),
    # is_new and unit_cost exist in the target table and must be read too.
    StructField("is_new", IntegerType()),
    StructField("unit_cost", DecimalType(10, 5)),
    StructField("deleted_at", TimestampType()),
    StructField("updated_at", TimestampType()),
    StructField("created_at", TimestampType()),
])

In [0]:
# Streaming DataFrame over the DMS output prefix using Auto Loader
# ("cloudFiles" source) in Parquet mode. The explicit tschema is applied
# instead of relying on schema inference; cloudFiles.schemaLocation is where
# Auto Loader keeps its schema tracking. Both S3 paths are placeholders.
df = (
    spark.readStream
    .format("cloudFiles")
    .schema(tschema)
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "s3://path")
    .load("s3://path")
)

In [0]:
# Preview the incoming DMS change rows in the notebook (Databricks display).
df.display()

id,item_id,stock_level,mca_cost,discontinued_date,discontinued_flag,is_new,unit_cost,created_at,updated_at,deleted_at
1,1,100,,,0,1,15.0,2023-11-16T10:17:09Z,2023-11-16T10:17:09Z,
2,12,100,,,0,1,0.15888,2023-11-16T10:39:00Z,2023-11-16T10:39:00Z,
3,14,100,,,0,1,4.15888,2023-11-16T10:40:24Z,2023-11-16T10:40:24Z,
4,3,25,,,0,1,34.0,2023-11-16T10:51:46Z,2023-11-16T10:51:46Z,
5,34,25,,,0,1,34.0,2023-11-16T10:51:53Z,2023-11-16T10:51:53Z,


In [0]:
# Preview only the first 20 incoming change rows.
df.limit(20).display()

Option 1: use this when you need only the most recent state of each row, i.e. the same data as the source table. <br>
Upsert the incoming rows from DMS into the Delta table as shown below.

In [0]:
# Reference to the target Delta table, looked up by its Unity Catalog name.
deltaTable = DeltaTable.forName(spark, "demo_catalog.demo_schema.stock_levels")

# DMS bookkeeping columns present in the change rows but not in the target.
_CDC_META_COLUMNS = ("Op", "operation_timestamp")


def _latest_change_per_id(changes_df):
    """Return only the most recent change row for each ``id``.

    ``sort()`` followed by ``dropDuplicates()`` does not guarantee which
    duplicate survives, so rows are ranked explicitly with a window.
    Ordering is by ``operation_timestamp`` descending; Spark's descending sort
    puts NULLs last, so rows carrying a timestamp win over rows without one.
    MERGE also requires at most one source row per target row, which this
    guarantees.
    """
    newest_first = Window.partitionBy("id").orderBy(
        col("operation_timestamp").desc()
    )
    return (
        changes_df.withColumn("_rn", row_number().over(newest_first))
        .filter(col("_rn") == 1)
        .drop("_rn")
    )


def upsertToUC(microBatchOutputDF, batchId):
    """Apply one micro-batch of DMS CDC rows to the target Delta table.

    Intended as the ``foreachBatch`` callback of the streaming write. For each
    ``id`` only the latest change in the batch is applied:

    * ``Op == 'D'``: the matching target row is deleted (no-op if absent);
    * any other ``Op``, including NULL: the row is updated or inserted.

    Deletes are decided per id from the latest change only. An id that is
    deleted and then re-inserted within the same batch therefore ends up
    present, matching the source.

    Args:
        microBatchOutputDF: micro-batch DataFrame of the Auto Loader stream,
            carrying the ``tschema`` columns (``Op``, ``operation_timestamp``,
            ``id`` and the table columns).
        batchId: micro-batch id supplied by Structured Streaming (unused).
    """
    latest = _latest_change_per_id(microBatchOutputDF)

    # Map the table columns explicitly so the CDC metadata columns are never
    # written to the target (UPDATE SET * / INSERT * would try to).
    data_columns = [c for c in latest.columns if c not in _CDC_META_COLUMNS]
    assignments = {c: f"s.{c}" for c in data_columns}

    (
        deltaTable.alias("t")
        .merge(latest.alias("s"), "s.id = t.id")
        # Matched clauses are evaluated in order: delete first, else update.
        .whenMatchedDelete(condition="s.Op = 'D'")
        .whenMatchedUpdate(set=assignments)
        # NULL Op must still be inserted, hence the explicit IS NULL test
        # ('NULL <> 'D'' evaluates to NULL, i.e. false).
        .whenNotMatchedInsert(
            condition="s.Op IS NULL OR s.Op <> 'D'",
            values=assignments,
        )
        .execute()
    )
    

In [0]:
# Option 1 sink: hand every micro-batch to upsertToUC, which merges it into
# the bronze table. The checkpoint records which source files were already
# processed, so each run only picks up new data. availableNow=True processes
# everything available and then stops, so the cell runs like a batch job.
# NOTE(review): writer options such as "mergeSchema" are probably not applied
# to the MERGE performed inside a foreachBatch callback - confirm.
# NOTE(review): "dbfs_path" is a placeholder; each streaming query needs its
# own checkpoint directory.
(
    df.writeStream
    .foreachBatch(upsertToUC)
    .trigger(availableNow=True)
    .option("checkpointLocation", "dbfs_path")
    .option("mergeSchema", "true")
    .start()
    .awaitTermination()
)

In [0]:
%sql
-- Check the table contents after the Option 1 upsert.
SELECT * FROM demo_catalog.demo_schema.stock_levels

Option 2: use this when you want to keep a record of every change in the Delta table. <br>
This is straightforward: append all the rows generated by DMS to the table.

In [0]:
# Option 2 sink: append every DMS change row (including the Op and
# operation_timestamp columns) to the table as a change history.
# NOTE(review): "dbfs_path" is a placeholder and must NOT be the checkpoint of
# the Option 1 query - a checkpoint stores one query's progress, so sharing it
# would make this query resume from Option 1's offsets.
(
    df.writeStream.option("checkpointLocation", "dbfs_path")
    # Allow Delta to add the CDC columns (Op, operation_timestamp) to the table.
    .option("mergeSchema", "true")
    # DataStreamWriter has no .mode(); streaming writes use .outputMode().
    .outputMode("append")
    .trigger(availableNow=True)
    # toTable() names the sink and starts the query; the original bare
    # .start() had no target table or path to write to.
    .toTable("demo_catalog.demo_schema.stock_levels")
    .awaitTermination()
)

In [0]:
%sql
-- Check the table contents after the Option 2 append (one row per change).
SELECT * FROM demo_catalog.demo_schema.stock_levels

Once you have captured all the changes for each row in the table, you can build SCD (slowly changing dimension) tables in the next step. <br>
The SCD table design depends on your analytics requirements.

In [0]:
%sql
-- Inspect the captured change history before designing SCD tables.
SELECT * FROM demo_catalog.demo_schema.stock_levels

In [0]:
%sql
-- 

In [0]:
#

This code may or may not work, depending on the Databricks cluster and Spark configuration you use.