# Lakehouse - incremental data load POC using Spark streaming
#### We use Spark Structured Streaming to implement this notebook for Dynamics customer data. Structured Streaming handles incremental data with minimal setup: all new changes are read incrementally and merged into a target table.

In [47]:
# Loading the necessary libraries
from pyspark.sql import SparkSession
from datetime import datetime
from dateutil import parser,relativedelta
import pyspark.sql.functions as f
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, hour, to_date, col, quarter, explode, sequence, expr,current_timestamp,lit
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType, StringType, FloatType, ArrayType, LongType
from delta.tables import DeltaTable
from notebookutils import mssparkutils
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite","CORRECTED")



## Define variables
##### The `delta_load` flag defines whether the run is a full load or an incremental load from the source.
##### To run an initial full load, or to truncate and reload a table, set `delta_load` to 0 and set `target_table` to the name of the target table (for testing, define a new target table). When `delta_load` is 0, the notebook checks for the checkpoint directory and removes it, if it exists, before loading the data.
##### To run an incremental load, set `delta_load` to 1.
##### Define the data lake, lakehouse, container and target table.
##### <u>Make sure to create the lakehouse before this notebook is run.</u>

In [48]:
# 0 for full load and 1 for incremental load
delta_load=1 
#delta_load=0

# set path to source table - in our case storage account linked with synapse link
path_to_source_table = "abfss://yourcontainer@yourdatalake.dfs.core.windows.net/deltalake/vendtable_partitioned/"

#define lakehouse variables
datalakename = "yourdatalake"
container = "yourcontainer"
lakehouse = "lakehouse_1" # target lakehouse - make sure you manually create it first
target_table = "vendtable_silver" # target table

# set path to target table
path_to_target_table = f'abfss://{container}@{datalakename}.dfs.core.windows.net/{lakehouse}/{target_table}/'

# set temp table and its path. This table holds incremental data before merging into target table
temp_table = "vendtable_temp"
lakehouse_temptable = lakehouse + "." + temp_table
path_to_temp_table = f'abfss://{container}@{datalakename}.dfs.core.windows.net/{lakehouse}/{temp_table}/'



## Checkpoints
##### The core mechanism this pipeline uses to capture incremental data is Spark Structured Streaming's checkpoint feature. We define a checkpoint folder location, and each run commits a checkpoint. The next run reads that checkpoint, much like a watermark, and picks up only the newer delta files for loading.
##### Checkpoints were originally designed for recovery after a failure or an intentional shutdown, in support of streaming queries that run 24/7: they let a query recover the progress and state of its previous run and continue where it left off. We configure a query with a checkpoint location, and the query saves all the progress information (i.e. the range of offsets processed in each trigger) and any running aggregates to that location. The checkpoint location has to be a path in an HDFS-compatible file system (or Azure Data Lake, Amazon S3, etc.) and is set as an option on the DataStreamWriter when starting a query. More info: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
##### This gives us an easy, automated form of watermarking, a delta loading technique that loads the data changed between an old watermark and a new watermark. With checkpoints, we don't have to manage any control tables, version numbers or timestamps; Spark's checkpoint folders do that for us. More on watermarks: https://learn.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-overview#delta-data-loading-from-database-by-using-a-watermark
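##### To make the idea concrete, here is a minimal plain-Python sketch of checkpoint-style incremental reading. It is only a toy model: the function `read_new_files`, the JSON file layout and the `part-N` file names are all hypothetical, and a real Spark checkpoint directory stores offsets, commits and state in its own format.

```python
import json
import tempfile
from pathlib import Path


def read_new_files(source_files, checkpoint_path):
    """Return the files not yet processed and record progress in the checkpoint file."""
    checkpoint = Path(checkpoint_path)
    # Offset = number of source files already processed by earlier runs
    offset = json.loads(checkpoint.read_text())["offset"] if checkpoint.exists() else 0
    new_files = source_files[offset:]
    # Commit the new offset so the next run starts where this one stopped
    checkpoint.write_text(json.dumps({"offset": len(source_files)}))
    return new_files


with tempfile.TemporaryDirectory() as tmp:
    checkpoint_file = Path(tmp) / "_checkpoint.json"
    # First run: no checkpoint yet, so everything is read
    first_run = read_new_files(["part-0", "part-1"], checkpoint_file)
    # Second run: only the file added since the last checkpoint is read
    second_run = read_new_files(["part-0", "part-1", "part-2"], checkpoint_file)
    # Removing the checkpoint forces a full reload, like delta_load = 0 in this notebook
    checkpoint_file.unlink()
    full_reload = read_new_files(["part-0", "part-1", "part-2"], checkpoint_file)

print(first_run, second_run, full_reload)
```

##### No control table or timestamp column is involved: the only state carried between runs is the checkpoint itself.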

In [49]:
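# Define checkpoint location inside temp table folder
# path_to_temp_table already ends with "/", so strip it to avoid a double slash in the path
checkpoint_dir = f"{path_to_temp_table.rstrip('/')}/_checkpoint"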


##### Data is loaded from the storage account associated with your Synapse link service. This is where your Dynamics data is exported. We load the data into the dataframe `table_df` as a stream.
##### Structured Streaming expects its input to be append-only and throws an exception if any modifications occur on the table being used as a source. With the option `ignoreChanges`, the stream keeps running when source files are rewritten by updates or deletes: the rewritten files are re-emitted, so updated rows arrive alongside inserts, possibly together with unchanged rows from the same files, and downstream logic must handle duplicates. In the case of Dataverse, deleted records arrive as new records with the same Id and IsDelete=true. We handle duplicates before calling the SQL merge. For more: https://docs.delta.io/0.4.0/delta-streaming.html#ignoring-updates-and-deletes

In [50]:
# For efficiency, filter on PartitionId to read only a small subset of the data, e.g.
# table_df = spark.readStream.format("delta").option("ignoreChanges",True).load(path_to_source_table).where("PartitionId=2023")

if delta_load == 0:
    # Full load: remove the checkpoint directory, if it exists, so the source is read from the start
    if mssparkutils.fs.exists(checkpoint_dir):
        mssparkutils.fs.rm(checkpoint_dir, True)

# The read itself is identical for full and incremental loads
table_df = spark.readStream.format("delta").option("ignoreChanges",True).load(path_to_source_table)



##### We add the current timestamp in a column `lake_loadedtimestamp` to record when each row was loaded.

In [51]:
table_df = table_df.withColumn("lake_loadedtimestamp", current_timestamp())


##### Define the function `writeToTempTable_inc`, which writes the current batch of data from its dataframe into the temp table in delta format.
##### Note that we use overwrite mode, so data from previous runs is discarded and the temp table retains only the new batch of incremental data.

In [52]:
def writeToTempTable_inc (microbatchdf, batchid):
    microbatchdf.write.format("delta").option("overwriteSchema", "true").saveAsTable(lakehouse_temptable, mode="overwrite")


## Writing incremental data in a temp table 
##### We use Spark's `foreachBatch` to store the incremental data captured in the current run of the script in a temp table. `foreachBatch(...)` lets you specify a function that is executed on the output data of every micro-batch of a streaming query. That function takes two parameters: a dataframe holding the output data of a micro-batch and the unique ID of the micro-batch. Note that we pass `writeToTempTable_inc` to `foreachBatch` by name, without calling it ourselves; Spark calls it for each micro-batch and supplies both arguments. More info: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch
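##### The callback pattern can be illustrated without Spark. In this plain-Python sketch, `run_micro_batches` is a hypothetical stand-in for the streaming engine and `write_to_temp` mirrors the two-parameter shape of `writeToTempTable_inc`; neither is a Spark API.

```python
def run_micro_batches(batches, batch_function):
    """Hypothetical stand-in for the streaming engine: it invokes batch_function itself."""
    for batch_id, batch in enumerate(batches):
        # The engine, not the notebook author, supplies both arguments
        batch_function(batch, batch_id)


received = []


def write_to_temp(micro_batch, batch_id):
    # Same two-parameter shape as writeToTempTable_inc in this notebook
    received.append((batch_id, list(micro_batch)))


# The function object is passed without parentheses, so it is not called here
run_micro_batches([["row-a", "row-b"], ["row-c"]], write_to_temp)
print(received)
```

##### This is why the function appears without arguments in the streaming query while still declaring two parameters.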

##### Next, we define the trigger settings of our streaming query. The trigger controls the timing of streaming data processing: whether the query is executed as a micro-batch query with a fixed batch interval or as a continuous processing query. For our use case, we use a one-time micro-batch, which processes all the available data and then stops on its own. More info: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

##### Finally, we set `checkpointLocation`, start the query and await termination of the stream before proceeding to the next step.

In [53]:
# Define the streaming write operation with foreachBatch, checkpoint location and trigger once
inc_Query = table_df.writeStream.format("delta")\
    .foreachBatch(writeToTempTable_inc)\
    .trigger(once=True)\
    .option("checkpointLocation", checkpoint_dir)\
    .start()

# Await for the termination of the stream
inc_Query.awaitTermination()



##### At this point, the temp table holds the incremental data. Let's analyse it.

In [54]:
df = spark.sql(f"SELECT max(SinkModifiedOn), min(SinkModifiedOn) FROM {lakehouse_temptable}")
display(df)


In [55]:
df = spark.sql(f"SELECT count(*) FROM {lakehouse_temptable}")
display(df)


## De-duplication
##### Let's remove any duplicates and expose the result as a new view. We use the SQL `ROW_NUMBER` function to rank the rows for each Id by `SinkModifiedOn`, newest first, and keep only the most recently updated row.
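##### The same ranking pattern can be tried on a tiny in-memory table. This sketch uses Python's built-in `sqlite3` module rather than Spark (it assumes a SQLite build with window-function support, 3.25 or later); the table `vend_temp`, the `Name` column and the sample rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE vend_temp (Id TEXT, Name TEXT, SinkModifiedOn TEXT)")
conn.executemany(
    "INSERT INTO vend_temp VALUES (?, ?, ?)",
    [
        ("A", "old name", "2023-01-01T10:00:00"),
        ("A", "new name", "2023-01-02T10:00:00"),  # later version of the same Id
        ("B", "only row", "2023-01-01T09:00:00"),
    ],
)

# Rank the rows of each Id by modification time, newest first, and keep rank 1
deduped = conn.execute(
    """
    SELECT Id, Name, SinkModifiedOn FROM (
        SELECT ROW_NUMBER() OVER (PARTITION BY Id ORDER BY SinkModifiedOn DESC) AS row_id, *
        FROM vend_temp
    ) AS ranked
    WHERE ranked.row_id = 1
    ORDER BY Id
    """
).fetchall()
conn.close()

print(deduped)
```

##### Only the latest row for Id `A` survives, which is what the merge step needs: at most one source row per key.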

In [56]:
spark.sql(f""" CREATE OR REPLACE TEMP VIEW temptable_remduplicates 
AS 
SELECT * FROM 

( 
    SELECT ROW_NUMBER() OVER(PARTITION BY Id ORDER BY SinkModifiedOn DESC) as row_id
    , * 
    FROM {lakehouse_temptable} 

) as table 

WHERE table.row_id = 1""")


In [57]:
%%sql
select count(*) from temptable_remduplicates 


## Quality Checks
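##### Here we validate that certain columns in the dataframe are unique and not null. The `validate` function checks all the expectations we've set up and returns the results. The cell below is disabled: the code is wrapped in a string literal, so nothing is validated when the notebook runs. Note also that `table_df` is a streaming dataframe; to run the checks, point them at a batch dataframe, such as one built from the de-duplicated view.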
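##### As a rough illustration of what the two kinds of expectation verify, here is a plain-Python sketch. The helpers `check_not_null` and `check_unique` and the sample rows are hypothetical and are not part of Great Expectations.

```python
def check_not_null(rows, column):
    """Return True when no row has None in the given column."""
    return all(row.get(column) is not None for row in rows)


def check_unique(rows, column):
    """Return True when every value in the given column occurs only once."""
    values = [row.get(column) for row in rows]
    return len(values) == len(set(values))


# Made-up sample: one missing createdOn and one repeated accountNum
sample_rows = [
    {"Id": "1", "accountNum": "V-001", "createdOn": "2023-01-01"},
    {"Id": "2", "accountNum": "V-002", "createdOn": None},
    {"Id": "3", "accountNum": "V-002", "createdOn": "2023-01-03"},
]

results = {
    "createdOn_not_null": check_not_null(sample_rows, "createdOn"),
    "Id_not_null": check_not_null(sample_rows, "Id"),
    "accountNum_unique": check_unique(sample_rows, "accountNum"),
}
print(results)
```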

In [58]:
'''
import great_expectations as ge
import great_expectations.dataset.sparkdf_dataset as sd

# Convert Spark DataFrame to Great Expectations Dataset
ge_df = ge.dataset.SparkDFDataset(table_df)

# Define Expectations

# Not null 
ge_df.expect_column_values_to_not_be_null('createdOn')
#ge_df.expect_column_values_to_not_be_null('creditMax')
#ge_df.expect_column_values_to_not_be_null('contactperson')


# Unique
ge_df.expect_column_values_to_not_be_null('Id')
ge_df.expect_column_values_to_be_unique('accountNum')


# Validate Expectations
results = ge_df.validate()

print(results)
'''


## Enrich your silver
##### At this stage, you have de-duplicated incremental data. You can enrich it further by joining it with other tables, or by selecting the columns you are interested in and discarding the rest. Next, we merge it into the target silver table. The same process of picking out incremental changes using checkpoint folders can be applied to your gold tables.

## Merge
##### Make a new dataframe based on the view that holds deduped table records. This dataframe will be used in the merge process.

In [59]:
# You can remove row_id column before merging
removedDupsTable = "temptable_remduplicates" 
table_temp_df = spark.sql(f"SELECT * FROM {removedDupsTable}")
table_temp_df = table_temp_df.drop("row_id")



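##### We use the Delta Lake merge function to either update or insert. Note that the merge condition in the cell below matches on `Id` only. A deleted record arrives with the same Id and IsDelete set to true, so it updates the existing row in the target rather than being inserted as a new one, and the row stays in the target table flagged as deleted. You need to handle the deletes later. More info: https://docs.delta.io/latest/delta-update.html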
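##### To see how an upsert keyed on `Id` alone behaves, as in the `merge_condition` of the next cell, here is a plain-Python sketch on lists of dictionaries. The helper `merge_on_id` and the sample rows are hypothetical; it only mimics the update-all / insert-all semantics and is not the Delta Lake API.

```python
def merge_on_id(target_rows, source_rows):
    """Upsert source rows into target rows, matching on the Id key only."""
    merged = {row["Id"]: dict(row) for row in target_rows}
    for row in source_rows:
        # Matched Id: every column is overwritten; unmatched Id: the row is inserted
        merged[row["Id"]] = dict(row)
    return sorted(merged.values(), key=lambda r: r["Id"])


target = [
    {"Id": "A", "Name": "Contoso", "IsDelete": None},
    {"Id": "B", "Name": "Fabrikam", "IsDelete": None},
]
source = [
    {"Id": "B", "Name": "Fabrikam", "IsDelete": True},  # delete marker for an existing row
    {"Id": "C", "Name": "Northwind", "IsDelete": None},  # brand-new row
]

merged_rows = merge_on_id(target, source)
print(merged_rows)
```

##### The row for `B` is updated in place and now carries the delete flag, while `C` is inserted; removing flagged rows from the target is a separate step.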

In [60]:
# Check if Delta table exists
if mssparkutils.fs.exists(path_to_target_table):
    targetTable = DeltaTable.forPath(spark, path_to_target_table)
else:
    # Create Delta table
    table_temp_df.write.format("delta").mode("overwrite").saveAsTable(lakehouse + "." + target_table)
    targetTable = DeltaTable.forPath(spark, path_to_target_table)


# Define merge condition
merge_condition = (
    "source.Id = target.Id"
)

# Execute merge
(
 targetTable.alias("target")
 .merge(
    table_temp_df.alias("source"),
    merge_condition

 )
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute()
)


## Cleanup with Vacuum command
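##### Remove old data from the temp table with the `VACUUM` command, which physically removes files from storage that are no longer referenced by the table and are older than the retention period. This saves storage costs. Because we retain 0 hours here, which is below Delta Lake's default retention threshold of 7 days, the retention duration check is disabled first. You can consider using vacuum for the source delta lake folders (Synapse link folders) as well once the data is read. More info: https://delta.io/blog/remove-files-delta-lake-vacuum-command/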

In [61]:
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql(f"VACUUM {lakehouse_temptable} RETAIN 0 HOURS").show(truncate=False)



+----------------------------------------------------------------------------------------------------+
|path                                                                                                |
+----------------------------------------------------------------------------------------------------+
|abfss://synapselink-lakehouses@salabcommercedatalake.dfs.core.windows.net/lakehouse_1/vendtable_temp|
+----------------------------------------------------------------------------------------------------+



## Conclusion
##### The notebook demonstrates a POC of an incremental data loading pipeline at scale using checkpoints in Spark Structured Streaming. Checkpoints offer an efficient, easy-to-manage mechanism for loading data incrementally.