### **Purpose of this Notebook**:

- Extract the maximum date (`maxdate`) and the inserted/updated row counts for the incremental-load part of the pipeline
- Merge the new rows with the existing tables in the Lakehouse

In [1]:
from delta.tables import *
from pyspark.sql.functions import *
import pandas as pd

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 3, Finished, Available)

<mark> In this cell we will be setting the parameters to align with the pipeline parameters. </mark>

In [2]:
# Default parameter values for this notebook. The accompanying markdown says these align with
# the pipeline parameters — presumably the pipeline overrides them at run time; TODO confirm.
# Root abfss:// URI from which the Tables/ and Files/ paths are built.
lakehousePath = "abfss://61921ad2-87ec-40db-9f44-830a64b2937c@onelake.dfs.fabric.microsoft.com/c129ce55-1e7d-4900-b376-a67010776747"
# Table name; used for both the Delta table folder and the incremental parquet file name.
tableName = "Product"
# Column used to match source rows to target rows in the merge condition.
tableKey = "ProductID"
# Optional second match column; None means the merge condition uses tableKey alone.
tableKey2 = None
# Column whose maximum value is reported back as maxdate.
dateColumn = "LastEdited"

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 4, Finished, Available)

<mark>We will then build the Delta table path from the lakehousePath and tableName parameters</mark>

In [3]:
# Location of the target Delta table: <lakehousePath>/Tables/<tableName>.
deltaTablePath = f"{lakehousePath}/Tables/{tableName}" 
# Echo the resolved path so it is visible in the cell output.
print(deltaTablePath)

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 5, Finished, Available)

abfss://61921ad2-87ec-40db-9f44-830a64b2937c@onelake.dfs.fabric.microsoft.com/c129ce55-1e7d-4900-b376-a67010776747/Tables/Product


<mark>Set the parquetFilePath</mark>

In [4]:
# Location of the incremental extract: <lakehousePath>/Files/incremental/<tableName>/<tableName>.parquet.
parquetFilePath = f"{lakehousePath}/Files/incremental/{tableName}/{tableName}.parquet"
# Echo the resolved path so it is visible in the cell output.
print(parquetFilePath)

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 6, Finished, Available)

abfss://61921ad2-87ec-40db-9f44-830a64b2937c@onelake.dfs.fabric.microsoft.com/c129ce55-1e7d-4900-b376-a67010776747/Files/incremental/Product/Product.parquet


<mark>Read the parquetFile i.e. incremental loaded data as df2</mark>

In [13]:
# Load the incremental rows from the parquet file; df2 is the merge source in a later cell.
df2 = spark.read.parquet(parquetFilePath)
# Uncomment the display call below to preview the incoming rows while debugging.
#display(df2)

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, da2a9c32-053e-49f3-afcf-842cd1d6c256)

<mark>Build the merge join condition: match on tableKey, and also on tableKey2 when a second key is provided</mark>

In [9]:
# Match on tableKey alone, or on both keys when a second key is configured.
# "t" is the target (Delta table) alias and "s" the source (incremental rows) alias.
keyColumns = [tableKey] if tableKey2 is None else [tableKey, tableKey2]
mergeKeyExpr = " AND ".join(f"t.{keyColumn} = s.{keyColumn}" for keyColumn in keyColumns)

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 11, Finished, Available)

<mark>In this cell we will be extracting details like TargetRowsInserted and TargetRowsUpdated and these metrics will be used to update the Metadata Table</mark>

In [10]:
# Upsert the incremental rows (df2) into the target Delta table, or create the table on the
# first load, then read the row counts from the most recent commit in the table history.
if DeltaTable.isDeltaTable(spark, deltaTablePath):
    deltaTable = DeltaTable.forPath(spark, deltaTablePath)
    # Rows matching mergeKeyExpr are overwritten with the source values; the rest are inserted.
    deltaTable.alias("t").merge(
        df2.alias("s"),
        mergeKeyExpr
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    # history(1) returns only the latest commit, i.e. the merge that just ran.
    history = deltaTable.history(1).select("operationMetrics")
    operationMetrics = history.collect()[0]["operationMetrics"]
    numInserted = operationMetrics["numTargetRowsInserted"]
    numUpdated = operationMetrics["numTargetRowsUpdated"]
else:
    # First load: no Delta table exists at this path yet, so create it from the incremental rows.
    df2.write.format("delta").save(deltaTablePath)
    deltaTable = DeltaTable.forPath(spark, deltaTablePath)
    # Fetch the history of the write that just created the table. Without this line the
    # branch raised NameError on a fresh kernel (or silently reused a stale `history`
    # left over from an earlier run of the merge branch).
    history = deltaTable.history(1).select("operationMetrics")
    operationMetrics = history.collect()[0]["operationMetrics"]
    # Every row written by the initial load counts as an insert; nothing can be updated.
    numInserted = operationMetrics["numOutputRows"]
    numUpdated = 0

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 12, Finished, Available)

<mark>In the cell below we will read the merged Delta table from Lakehouse tables and extract the latest value of dateColumn (maxdate), formatted as a string.</mark>

In [11]:
# Read the Delta table back and compute the highest dateColumn value, formatted as a string.
deltaTablePath = f"{lakehousePath}/Tables/{tableName}"
df3 = spark.read.format("delta").load(deltaTablePath)
# `max` here is the Spark aggregate brought in by `from pyspark.sql.functions import *`,
# not the Python builtin.
maxdate = df3.agg(max(dateColumn)).collect()[0][0]
# The aggregate yields None when the table has no rows or the column holds only nulls.
# Fail with an explicit message instead of an AttributeError on None.strftime.
if maxdate is None:
    raise ValueError(
        f"No non-null values found in column '{dateColumn}' of {deltaTablePath}; cannot compute maxdate."
    )
maxdate_str = maxdate.strftime("%Y-%m-%d %H:%M:%S")

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 13, Finished, Available)

<mark>The cell below assembles maxdate, numInserted and numUpdated into a single pipe-delimited string and returns it as the notebook exit value.</mark>

In [12]:
# Assemble the exit value as pipe-delimited key=value pairs, e.g.
# maxdate=2024-05-13 09:42:35|numInserted=2|numUpdated=3
exitParts = [
    "maxdate=", maxdate_str,
    "|numInserted=", str(numInserted),
    "|numUpdated=", str(numUpdated),
]
result = "".join(exitParts)
# Return the string to the caller as this notebook's exit value.
mssparkutils.notebook.exit(result)

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 14, Finished, Available)

ExitValue: maxdate=2024-05-13 09:42:35|numInserted=2|numUpdated=3

In [17]:
# Append-only load of the incremental parquet file into the Lakehouse table.
# NOTE(review): this appends every incoming row. Running it in addition to the merge cell
# earlier in the notebook would duplicate those rows — use one or the other.
input_table_path = f"{lakehousePath}/Files/incremental/{tableName}/{tableName}.parquet"
output_table_path = f"{lakehousePath}/Tables/{tableName}"

input_table_df = spark.read.parquet(input_table_path)
# The folder under Tables/ is created and read as a Delta table elsewhere in this notebook.
# Writing plain parquet files into it would bypass the Delta transaction log, so the new
# rows would never become part of the table. Append through the Delta writer instead.
input_table_df.write.format("delta").mode("append").save(output_table_path)

StatementMeta(, a31d36c2-21c7-45c5-827f-1f762beca59b, 19, Finished, Available)