# Batch or continuous ingestion of incremental data using Spark Structured Streaming

Scenario: You wish to maintain a "copy" of source system tables in your lakehouse. An incremental feed of changes is published as parquet files to OneLake. You need to merge these changes (inserts, updates and deletes) into the target tables incrementally, i.e. only processing the new changes/files that have arrived since the last run. The process needs to keep the target tables in sync, either in near real-time or at a regular interval (hourly, daily), to support downstream ETL pipelines.

Session outcomes: Demonstrate how to merge incremental data feeds from OneLake into target Lakehouse tables using Spark Structured Streaming.

![Overview](https://github.com/hurtn/Fabric-Spark-/blob/main/Overview.png?raw=true)

What is Spark Structured Streaming: Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data, and the Spark SQL engine takes care of running it incrementally and continuously, updating the final result as streaming/incremental data continues to arrive. For more information on Structured Streaming please refer to the [Apache Spark documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).
The engine uses a mechanism called checkpointing to keep track of which files have already been processed. This means that even in the event of failure, Structured Streaming can ensure end-to-end exactly-once semantics, whereby each file is processed only once. The main notebook can therefore be run in either batch (scheduled) or continuous (micro-batch) mode, depending on the [streaming trigger](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) configuration, and in both cases uses the checkpoints to process only new data.
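A minimal sketch of how a per-table loader could combine a file-source stream, a checkpoint and a trigger, assuming the folder layout used in this demo (`Files/AutoMerger/incrementalfeed/<table>` for the feed, `Files/AutoMerger/checkpoints/<table>` for checkpoints) and that `SaleKey` uniquely identifies a row. The helpers `trigger_options`, `merge_condition` and `start_table_stream`, the mode strings and the 1-minute interval are hypothetical, not the 03 - TableLoader's actual code. `availableNow=True` (Spark 3.3+) processes whatever has arrived since the last checkpoint and then stops, which suits scheduled runs; `processingTime` keeps the query running and starts a micro-batch at each interval. Deletes are not handled, since the simulated feed in this demo only contains inserts and updates.

```python
def trigger_options(mode, interval="1 minute"):
    """Keyword arguments for DataStreamWriter.trigger() (hypothetical helper).

    "batch":      process every file that arrived since the last checkpoint, then stop
    "continuous": keep running, starting a micro-batch every `interval`
                  (a processing-time trigger, not Spark's experimental continuous mode)
    """
    if mode == "batch":
        return {"availableNow": True}  # requires Spark 3.3+
    if mode == "continuous":
        return {"processingTime": interval}
    raise ValueError(f"Unknown mode: {mode!r}")


def merge_condition(keys, target="t", source="s"):
    """Join condition for MERGE, e.g. 't.SaleKey = s.SaleKey'."""
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in keys)


def start_table_stream(spark, table_name, mode="batch", keys=("SaleKey",),
                       base="Files/AutoMerger/"):
    """Stream new parquet files for one table and upsert each micro-batch (sketch)."""
    # Imported here so the pure-Python helpers above work without Spark installed
    from delta.tables import DeltaTable
    from pyspark.sql.functions import current_timestamp, input_file_name

    source = base + "incrementalfeed/" + table_name

    def merge_batch(batch_df, batch_id):
        # Add the same audit columns the setup notebook adds to the target table
        updates = (batch_df.withColumn("changeTimestamp", current_timestamp())
                           .withColumn("inputFile", input_file_name()))
        (DeltaTable.forPath(spark, "Tables/" + table_name).alias("t")
            .merge(updates.alias("s"), merge_condition(keys))
            .whenMatchedUpdateAll()      # existing key -> update
            .whenNotMatchedInsertAll()   # new key -> insert
            .execute())

    return (spark.readStream
            .schema(spark.read.parquet(source).schema)  # file sources need an explicit schema
            .parquet(source)
            .writeStream
            .foreachBatch(merge_batch)
            # The checkpoint records which files were committed, so a rerun skips them
            .option("checkpointLocation", base + "checkpoints/" + table_name)
            .trigger(**trigger_options(mode))
            .start())
```

In batch mode, `start_table_stream(spark, "table1").awaitTermination()` would return once the files available at start-up have been merged. If one micro-batch picks up several files containing the same key (for example after running Step 3b more than once between batch runs), Delta's MERGE fails because multiple source rows match the same target row, so the batch would first need reducing to the latest row per key.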

To get started: Ensure you have imported the following three notebooks (including this one) into your workspace. A summary of the notebooks is as follows:

01 - Setup: This notebook will guide you through the setup by creating Lakehouse tables based on WWI sample data as part of the [lakehouse tutorial](https://learn.microsoft.com/en-us/fabric/data-engineering/tutorial-lakehouse-data-ingestion). It will also allow you to simulate the arrival of new incremental data and verify that data has been merged into the target tables.

02 - Orchestrator: This notebook orchestrates the ingestion process. A definition of the tables and related metadata (a DAG) is dynamically generated and submitted as a set of instructions to the [runMultiple Spark utility](https://learn.microsoft.com/en-us/fabric/data-engineering/microsoft-spark-utilities#reference-run-multiple-notebooks-in-parallel). The DAG runs the referenced notebook (03 - TableLoader) for each table, detecting new data to merge into the Lakehouse table.

03 - TableLoader: An instance of this notebook runs for each table (concurrently) merging new incremental data/files with the associated target Lakehouse table. Depending on the trigger configuration, it will run either once for all tables per schedule before handing back control to the Orchestrator notebook, or it will run continuously until terminated or an error occurs.
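The Orchestrator's DAG is a plain Python dictionary. A minimal sketch of generating one TableLoader activity per table, assuming the DAG fields described in the Microsoft Spark Utilities documentation for `runMultiple` (`activities`, `name`, `path`, `args`, `timeoutPerCellInSeconds`, `timeoutInSeconds`, `concurrency`). The helper `build_dag`, the argument names `table_name` and `trigger_mode`, and the timeout values are hypothetical, not taken from the 02 - Orchestrator notebook.

```python
def build_dag(numtables, notebook="03 - TableLoader", trigger_mode="batch",
              concurrency=10):
    """Build a runMultiple DAG with one TableLoader activity per table (hypothetical)."""
    activities = []
    for i in range(1, numtables + 1):
        table_name = "table" + str(i)
        activities.append({
            "name": "load_" + table_name,     # activity names must be unique
            "path": notebook,                 # notebook run by this activity
            "timeoutPerCellInSeconds": 3600,  # raise if a loader cell blocks on a long-running stream
            "args": {"table_name": table_name, "trigger_mode": trigger_mode},
            # No "dependencies" key, so all activities may run in parallel
        })
    return {
        "activities": activities,
        "timeoutInSeconds": 43200,            # upper bound for the whole DAG
        "concurrency": concurrency,           # max notebooks running at once
    }

dag = build_dag(3)
print([a["name"] for a in dag["activities"]])  # ['load_table1', 'load_table2', 'load_table3']
```

In a Fabric notebook the result would then be submitted with `mssparkutils.notebook.runMultiple(dag)`.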

 <font size="2" color="red" face="sans-serif" bold> 

<b> <i> <u>Open each notebook and make sure that a default lakehouse has been pinned to every notebook.</u></i></b>
</font>




##### Step 1 - Initialise
Set the desired number of tables to run this demonstration against. Note: if using the default starter pool, the maximum is approximately 50 tables; 10 is ideal for demonstration purposes.

You may need to re-run the cell below if your session is disconnected, as later cells rely on the numtables variable being set.

# Set the number of tables required
numtables=5


Run the cell below to set up the tables (existing tables with the same names will be dropped) and populate the sample data in the relative base location. This takes around 5 minutes, depending on the number of tables defined.


# Do not change the relative base - this is where change feed data and checkpoints will be stored
relbaselocation = "Files/AutoMerger/"

from pyspark.sql.functions import col,lit,current_timestamp, input_file_name

def create_tables(table_name):
  
  print("Dropping table "+table_name+" if exists")
  spark.sql("drop table if exists "+table_name+";")
  print("Creating table "+table_name + " from initial load file")
  # Read the initial sample file (part-00000) to simulate the initial loading process.
  # Using withColumn, add the current timestamp and the input file name as columns to the table.
  df = spark.read.format("parquet").load(relbaselocation+'/basetable/part-00000-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet') \
  .withColumn("changeTimestamp",current_timestamp()).withColumn("inputFile",input_file_name())
  df.write.mode("overwrite").format("delta").save("Tables/" + table_name)

print("Resetting checkpoint and incremental directories if they exist")
for folder in ["checkpoints/", "incrementalfeed/"]:
  # Only remove the folder if it is present, rather than silently swallowing every error
  if mssparkutils.fs.exists(relbaselocation+folder):
    mssparkutils.fs.rm(relbaselocation+folder,True)


# Copy the sample data files (part-00000 to part-00009) from the public WWI sample storage if not already present
sourcelocation = "https://azuresynapsestorage.blob.core.windows.net/sampledata/WideWorldImportersDW/parquet/incremental/fact_sale_1y_incremental/"
for i in range(0,10):
  filename = "part-0000"+str(i)+"-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet"
  if not mssparkutils.fs.exists(relbaselocation+"/basetable/"+filename):
    print("Copying sample data file: "+relbaselocation+"/basetable/"+filename)
    mssparkutils.fs.cp(sourcelocation+filename,relbaselocation+"/basetable/"+filename)

# Create the tables. Note these will appear in the Tables section named "table" suffixed with a number (table1, table2, ...)
for i in range(numtables):
  table_name = 'table'+str(i+1)
  create_tables(table_name)
  
  # Copy the first sample of data to the incremental feed folder   
  print("Copying first incremental file (part-00001) to be processed...")
  mssparkutils.fs.cp(relbaselocation+"/basetable/part-00001-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet",relbaselocation+"/incrementalfeed/"+table_name+"/part-00001-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet")

print("Complete")



Resetting checkpoint and incremental directories if they exist
Copying sample data file: Files/AutoMerger//basetable/part-00000-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet
Copying sample data file: Files/AutoMerger//basetable/part-00001-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet
Copying sample data file: Files/AutoMerger//basetable/part-00002-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet
Copying sample data file: Files/AutoMerger//basetable/part-00003-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet
Copying sample data file: Files/AutoMerger//basetable/part-00004-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet
Copying sample data file: Files/AutoMerger//basetable/part-00005-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet
Copying sample data file: Files/AutoMerger//basetable/part-00006-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet
Copying sample data file: Files/AutoMerger//basetable/part-00007-5eaa21a0-54da-459d

##### Step 2 - Run the incremental loading notebook

Inspect the incrementalfeed folder (under the base location) for table1: you should see at least one file, the new incremental file (part-00001), waiting to be processed.

Now navigate to the 02 - Orchestrator notebook, read the instructions and ensure the first cell has run successfully (if using batch mode) or is running (if using streaming trigger mode) before returning to this notebook.


##### Step 3 - Verify the new incremental file was loaded

Run the cell below to verify that table1 was updated. Two entries should appear in the history: the first is the initial load (WRITE operation) and the second (most recent) is the MERGE operation. Notice that all the rows were inserted, i.e. no updates or deletes occurred.

#set the table_name variable  
table_name = "table1"

df = spark.sql("describe history "+table_name)
display(df.select("timestamp","operation","operationMetrics.numFiles","operationMetrics.numOutputRows","operationMetrics.numTargetRowsInserted","operationMetrics.numTargetRowsMatchedUpdated").orderBy("timestamp",ascending=False))



Another way to validate that data was loaded is to count the rows per changeTimestamp value:

%%sql
select changeTimestamp,count(*) from table1 group by changeTimestamp order by 1



##### Step 3a - Configure additional incremental loads
The cells below allow you to simulate incoming incremental files to be merged with the target tables. Each incremental file arrives in a subfolder matching the associated table name.

Instructions:
1. Once the stream is running in the Orchestrator notebook (or has completed, if using batch mode), set the variables below to the number of inserts and updates to simulate. Leave the filepos variable set to 2.
2. Run the cell below only once, as it sets the filepos variable, which determines the starting position within the incremental sample files (there are 9 in total).
3. Then run the cell in Step 3b to add a new file to the incremental folder for each table. This increments the filepos variable so that the next run fetches the next incremental file.
4. Run the Orchestrator notebook again to load the new incremental data from step 3 (if using batch mode; otherwise it should already be running continuously).
5. Monitor the changes to the target table using the table history command in Step 4 to examine the number of rows affected. This should match the number of inserts and updates specified below.
6. Repeat steps 3 to 5.
7. Terminate the running cell in the Orchestrator notebook if using streaming mode.
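The file-position bookkeeping behind these steps can be written out as a small pure-Python helper. This is an illustrative sketch only: `incremental_file_name` is hypothetical, is not used by the notebooks, and mirrors the `filepos<10` check in Step 3b. It shows why Step 3b can be run eight times before the sample data runs out.

```python
# Suffix shared by all WWI sample files used in this demo
SAMPLE_SUFFIX = "-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet"

def incremental_file_name(filepos):
    """Sample file read by Step 3b for a given filepos, or None once the samples run out."""
    # part-00000 is the initial load and part-00001 is copied during setup,
    # so Step 3b only ever consumes positions 2 to 9
    if 2 <= filepos <= 9:
        return "part-0000" + str(filepos) + SAMPLE_SUFFIX
    return None

remaining = [p for p in range(2, 12) if incremental_file_name(p)]
print(len(remaining))  # 8
```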



# Set the number of updates and inserts to generate in each incremental file
numupdates = 500
numinserts=500
relbaselocation = "Files/AutoMerger/"

# Do not change this starting file position: 2 is the position of the first incremental file to be added.
# This value will be incremented each time the cell below is run to add incremental files
filepos = 2



##### Step 3b - Load incremental data into the incremental feed per table
The cell below generates the specified number of inserts (new rows from the next sample file) and updates (existing keys from the initial file) to simulate an incoming incremental file to be merged with the target tables. Each incremental file is written to the subfolder matching the associated table name.


from pyspark.sql.functions import col,lit,current_timestamp

if filepos<10:
  print("Adding incremental file #" + str(filepos) + " to the incremental feed folder for each associated table.")
  print("Please wait until complete...")
  # Obtain a set of already existing records so that these can be updated to form part of the next incremental set of updates
  dfupdates = spark.read.format("parquet") \
  .load(relbaselocation+"/basetable/part-00000-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet") \
  .orderBy('salekey',ascending=True).limit(numupdates)
  dfupdates.write.mode("overwrite").format("delta").save("Tables/temptable")
  spark.sql("update temptable set Description = 'Test update"+str(filepos)+"'")
  dfupdates = spark.sql("select * from temptable")

  dfinserts = spark.read.format("parquet") \
  .load(relbaselocation+"/basetable/part-0000" + str(filepos) + "-5eaa21a0-54da-459d-b098-307a78d5d41e-c000.snappy.parquet") \
  .orderBy('salekey',ascending=False).limit(numinserts)
  df03 = dfupdates.union(dfinserts)
  for p in range(1,numtables+1):
    df03.coalesce(1).write.mode("append").format("parquet").save(relbaselocation+"/incrementalfeed/table"+str(p))  

  filepos = filepos+1
  print('Complete.')
else:
  print("No further sample incremental data is available. Please reset the demo by restarting this notebook.")


Adding incremental file #4 to the incremental feed folder for each associated table.
Please wait until complete...
Complete.
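The counts that Step 4 should report follow directly from MERGE semantics: a change row whose key already exists in the target is an update, and any other row is an insert. The toy model below is a pure-Python illustration, not how Delta executes a MERGE; a dictionary keyed by `SaleKey` stands in for the target table, and the helper `upsert`, the keys and the descriptions are made up for the example.

```python
def upsert(target, changes, key="SaleKey"):
    """Apply change rows to `target` (dict of key -> row) and return (inserted, updated)."""
    inserted = updated = 0
    for row in changes:
        k = row[key]
        if k in target:
            updated += 1      # key already exists: WHEN MATCHED THEN UPDATE
        else:
            inserted += 1     # new key: WHEN NOT MATCHED THEN INSERT
        target[k] = dict(row)
    return inserted, updated

# Existing table: keys 1-1000 from the initial load
table = {k: {"SaleKey": k, "Description": "initial"} for k in range(1, 1001)}
# Incremental file: 500 updates to the lowest existing keys plus 500 rows with new keys
changes = ([{"SaleKey": k, "Description": "Test update2"} for k in range(1, 501)]
           + [{"SaleKey": k, "Description": "new"} for k in range(5001, 5501)])
print(upsert(table, changes))  # (500, 500)
```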


##### Step 4 - Verify data has been merged into the target table

You can verify the number of inserts and updates, as well as the timestamp at which they occurred (and hence the latency).

#set the table_name variable  
table_name = "table1"

df = spark.sql("describe history "+table_name)
display(df.select("timestamp","operation","operationMetrics.numTargetRowsInserted","operationMetrics.numTargetRowsMatchedUpdated").orderBy("timestamp",ascending=False))



##### Optional: Obtain process wide statistics from all tables

# Collect the recent inserts and updates recorded in the Delta history of every demo table
from functools import reduce
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

# The notebook session already provides the spark object, so no new SparkSession is created here
histories = []
for p in range(1,numtables+1):
    table_name = "table"+str(p)
    # history() returns the table's transaction log as a DataFrame
    df = DeltaTable.forPath(spark, "Tables/" + table_name).history()
    dftemp = df.withColumn("Table",lit(table_name))
    histories.append(dftemp.select("Table","timestamp","operationMetrics.numTargetRowsInserted","operationMetrics.numTargetRowsMatchedUpdated"))
dfhist = reduce(lambda a, b: a.union(b), histories)
# Only MERGE operations report numTargetRowsInserted, so the initial WRITE is filtered out
display(dfhist.filter("numTargetRowsInserted is not null").orderBy(["Table","timestamp"],ascending=[True,False]))



##### House-keeping

1. Terminate the running cell in the Orchestrator notebook if using streaming mode.
2. If you ran the Orchestrator notebook in streaming mode, ensure no active queries remain (see Troubleshooting below).
3. Run the cell below to remove all demo files and tables.

def drop_tables(tablenum):
  table_name = 'table'+str(tablenum+1)
  print("Dropping table "+table_name+" if exists")
  spark.sql("drop table if exists "+table_name+";")

print("Cleaning up...")
for i in range(numtables):
  drop_tables(i)
# Also drop the staging table created in Step 3b
spark.sql("drop table if exists temptable;")

print("Removing base location files...")
if mssparkutils.fs.exists(relbaselocation):
  mssparkutils.fs.rm(relbaselocation,True)
print("Done")


##### Troubleshooting

If you encounter an error suggesting that the stream for a particular table is still active, copy the cell below into the Orchestrator notebook and run it to terminate any active queries.

import time
# Helper method to stop a streaming query
def stop_stream_query(query, wait_time):
    """Stop a running streaming query"""
    while query.isActive:
        msg = query.status['message']
        data_avail = query.status['isDataAvailable']
        trigger_active = query.status['isTriggerActive']
        if not data_avail and not trigger_active and msg != "Initializing sources":
            print('Stopping query...')
            query.stop()
        time.sleep(0.5)

    # Wait for the stop to complete (the awaitTermination timeout is in seconds)
    print('Awaiting termination...')
    query.awaitTermination(wait_time)

sqm = spark.streams
for q in sqm.active:
  print(str(q.name) + " query is still active, terminating...")
  stop_stream_query(q,2000)
