In [None]:
# Notebook arguments. The values below are defaults (presumably overridden by
# the caller at run time -- TODO confirm how parameters are injected).

# Key column(s) of the source data and the load-mode flag.
PrimaryKeys = "PersonID"
IsIncremental = False

# Source side: workspace / lakehouse identifiers and the file to pick up.
SourceWorkspace = ""
SourceLakehouse = ""
SourceLakehouseName = "LH_Data_Landingzone"
source_file_path = "WideWorldImporters"
source_file_name = "ApplicationPeople.parquet"

# Target side: workspace / lakehouse identifiers and the table to write.
TargetWorkspace = ""
TargetLakehouse = ""
TargetLakehouseName = "LH_Bronze_Layer"
target_schema = "Application"
target_name = "People"


StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 27, Finished, Available)

## Load Libraries

In [None]:
import re
import datetime
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *
from notebookutils import mssparkutils
import uuid

StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 28, Finished, Available)

## Define start time

In [None]:
# Wall-clock start of this run; later cells subtract it from the current time
# to report "Total Runtime" in the notebook exit value.
start_audit_time = datetime.datetime.now()

StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 29, Finished, Available)

## Set Configuration

In [None]:
# Enable V-Order for Parquet writes in this Spark session.
# The configuration key must start with "spark."; a misspelled key is accepted
# by spark.conf.set without error but has no effect on the writer.
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")

StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 30, Finished, Available)

In [None]:
# Unique id for this notebook run.
NotebookExecutionId = str(uuid.uuid4())

# Stamp the run id into the userMetadata of Delta commits made by this session;
# later cells filter the table history on userMetadata == NotebookExecutionId
# to find the metrics belonging to this run.
spark.conf.set("spark.databricks.delta.commitInfo.userMetadata", NotebookExecutionId)

StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 31, Finished, Available)

## Set your loading paths

In [None]:
# Location of the incoming Parquet file in the Files area of the source lakehouse.
source_changes_data_path = (
    f"abfss://{SourceWorkspace}@onelake.dfs.fabric.microsoft.com/{SourceLakehouse}/Files/{source_file_path}/{source_file_name}"
)
print(source_changes_data_path)

# Location of the target table in the Tables area of the target lakehouse.
# Beware: target_schema and target_name are concatenated WITHOUT a separator,
# so schema "Application" + name "People" yields the table "ApplicationPeople".
target_data_path = (
    f"abfss://{TargetWorkspace}@onelake.dfs.fabric.microsoft.com/{TargetLakehouse}/Tables/{target_schema}{target_name}"
)
print(target_data_path)


StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 32, Finished, Available)

abfss://3a561f99-edc7-4d6c-a866-f3bf70bc7235@onelake.dfs.fabric.microsoft.com/942b0ceb-8b93-4e01-9224-7c3621dfb2e0/Files/WideWorldImporters/ApplicationPeople.parquet
abfss://3a561f99-edc7-4d6c-a866-f3bf70bc7235@onelake.dfs.fabric.microsoft.com/1e10a8ff-69b8-46d5-8cb5-5fb5a3c88bbc/Tables/ApplicationPeople


## Load new data from Data Landingzone

In [None]:
# Load the incoming changes (Parquet file) from the landing zone into a DataFrame.
dfDataChanged = spark.read.parquet(source_changes_data_path)

StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 33, Finished, Available)

## DQ Checks

In [None]:
# Normalise the PrimaryKeys argument into a clean list of column names.
# A string may hold one name or several separated by ',', ';' or ':'.
# A list/tuple is accepted as-is so that re-running this cell (after
# PrimaryKeys has already been converted to a list) does not corrupt the keys.
if isinstance(PrimaryKeys, (list, tuple)):
    raw_key_names = [str(name) for name in PrimaryKeys]
else:
    # Split on the separators only -- not on spaces -- so that column names
    # containing a space survive; surrounding whitespace is stripped below.
    raw_key_names = re.split(r'[,;:]', str(PrimaryKeys))

# Strip first, then drop entries that are empty (e.g. from "a,,b" or "a, ,b").
PrimaryKeys = [name.strip() for name in raw_key_names if name.strip()]

key_columns = PrimaryKeys
if not key_columns:
    # Without keys every row would receive the same key hash and the later
    # duplicate check would fail with a misleading message.
    raise ValueError("No primary key columns were supplied in PrimaryKeys.")
print(f"Primary key columns: {', '.join(key_columns)}")

# Check that every PK column exists in the source.
source_columns = dfDataChanged.columns
for pk_column in key_columns:
    if pk_column not in source_columns:
        raise ValueError(f"PK: {pk_column} doesn't exist in the source.")

# Key columns in the order they appear in the source DataFrame.
read_key_columns = [column for column in source_columns if column in key_columns]

# Add a single hashed key column; this keeps the merge condition simple when
# the key consists of multiple columns.
# NOTE(review): concat_ws is documented to skip NULL inputs -- confirm that
# key columns can never be NULL, otherwise different keys may hash equally.
dfDataChanged = dfDataChanged.withColumn(
    "HashedPKColumn", sha2(concat_ws("||", *read_key_columns), 256)
)


StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 34, Finished, Available)

## Check for Duplicates

In [None]:
# Fail fast when the source holds more than one row for the same key:
# compare the number of distinct key hashes with the total number of rows.
hashed_keys = dfDataChanged.select('HashedPKColumn')
distinct_key_count = hashed_keys.distinct().count()
total_key_count = hashed_keys.count()

if distinct_key_count != total_key_count:
    raise ValueError(f'Source file contains duplicated rows for PK: {", ".join(key_columns)}')

StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 35, Finished, Available)

## Add Hash

In [None]:
# Columns produced by this cell itself. They are excluded from the hash input
# so that re-running the cell yields the same hash instead of hashing the
# previous hash and load timestamp. On a first run these columns do not exist
# yet, so the computed hash is unchanged.
generated_columns = ("HashedNonKeyColumns", "RecordLoadDate")

# Everything that is not a key column (this includes HashedPKColumn when it was
# added by the earlier DQ cell).
non_key_columns = [
    column
    for column in dfDataChanged.columns
    if column not in key_columns and column not in generated_columns
]

# Add a hashed column over the non-key values to detect changed rows.
dfDataChanged = (dfDataChanged
                .withColumn("HashedNonKeyColumns", md5(concat_ws("||", *non_key_columns))))

# Add RecordLoadDate to see when the record arrived.
dfDataChanged = dfDataChanged.withColumn('RecordLoadDate', current_timestamp())


StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 36, Finished, Available)

In [None]:
#display(dfDataChanged)

StatementMeta(, e22e022b-74d1-4da0-84df-0e3b546cd694, 37, Finished, Available)

## Read the original table if it exists

In [None]:
#Check if Target exist, if exists read the original data if not create table and exit
if DeltaTable.isDeltaTable(spark, target_data_path):
    # Read original/current data
    dfDataOriginal = (spark
                        .read.format("delta")
                        .load(target_data_path)
                        )

else:
    # Use first load when no data exists yet and then exit 
    dfDataChanged.write.format("delta").mode("overwrite").save(target_data_path)
    TotalRuntime = str((datetime.datetime.now() - start_audit_time)) 

    deltaTable = DeltaTable.forPath(spark, f'{target_data_path}')
    
    dfAudit = deltaTable.history()

    dfAudit = dfAudit.select('operationMetrics', 'userMetadata')\
        .withColumn('Deleted', col('operationMetrics.numTargetRowsDeleted'))\
        .withColumn('Inserted', col('operationMetrics.numTargetRowsInserted'))\
        .withColumn('Read', col('operationMetrics.numSourceRows'))\
        .withColumn('Updated', col('operationMetrics.numTargetRowsUpdated'))\
        .withColumn('Copied', col('operationMetrics.numTargetRowsCopied'))\
        .withColumn('Output', col('operationMetrics.numOutputRows'))\
        .drop('operationMetrics')\
        .groupBy('userMetadata')\
        .agg(
            sum('Deleted').cast('int').alias('Deleted'),
            sum('Inserted').cast('int').alias('Inserted'),
            sum('Updated').cast('int').alias('Updated'),
            sum('Output').cast('int').alias('Output'),
            sum('Copied').cast('int').alias('Copied'),
            sum('Read').cast('int').alias('Read')
        ).na.fill(0)
    #display(dfAudit)
    dfAuditFiltered = dfAudit.filter(dfAudit.userMetadata == NotebookExecutionId)



    try:
        RowsInserted = dfAuditFiltered.select('Inserted').collect()[0][0]
        RowsUpdated = dfAuditFiltered.select('Updated').collect()[0][0]
        RowsDeleted = dfAuditFiltered.select('Deleted').collect()[0][0]
        RowsCopied = dfAuditFiltered.select('Copied').collect()[0][0]
        RowsRead = dfAuditFiltered.select('Read').collect()[0][0]
        RowsOutput = dfAuditFiltered.select('Output').collect()[0][0]

    except:
        RowsInserted = 0
        RowsUpdated = 0
        RowsDeleted = 0
        RowsCopied = 0
        RowsRead = 0
        RowsOutput = 0

    TotalRuntime = str((datetime.datetime.now() - start_audit_time)) 

    # Your data
    result_data = {
        "CopyOutput":{
            "Total Runtime": TotalRuntime,
            "TargetSchema": target_schema,
            "TargetName" : target_name,
            "RowsRead": RowsOutput if RowsRead - RowsInserted == 0 else RowsRead - RowsInserted,
            "RowsNew": RowsOutput if RowsRead - RowsInserted == 0 else RowsInserted,
            "RowsUpdated": RowsUpdated,
            "RowsUnchanged": RowsCopied,
            "RowsDeleted": RowsDeleted
        }
    }
        
    mssparkutils.notebook.exit(result_data)

StatementMeta(, , , Waiting, )

## Merge table

In [None]:
#merge table 
deltaTable = DeltaTable.forPath(spark, f'{target_data_path}')
if IsIncremental in [False, 'false', 'False']:
    print(' - Incremental Loading is not enabled, deletes are allowed')
    merge = deltaTable.alias('original') \
        .merge(dfDataChanged.alias('updates'), 'original.HashedPKColumn == updates.HashedPKColumn') \
        .whenNotMatchedInsertAll() \
        .whenMatchedUpdateAll('original.HashedNonKeyColumns != updates.HashedNonKeyColumns') \
        .whenNotMatchedBySourceDelete() \
        .execute()
elif IsIncremental not in [False, 'false', 'False']:
    print(' - Incremental Loading is enabled, deletes are not allowed')
    merge = deltaTable.alias('original') \
        .merge(dfDataChanged.alias('updates'), 'original.HashedPKColumn == updates.HashedPKColumn') \
        .whenNotMatchedInsertAll() \
        .whenMatchedUpdateAll('original.HashedNonKeyColumns != updates.HashedNonKeyColumns') \
        .execute()

StatementMeta(, , , Waiting, )

## Exit notebook

In [None]:
# Collect the audit metrics of this run from the Delta table history and
# return them as the notebook exit value.

# Audit column name -> key inside the operationMetrics map of the history.
audit_metrics = {
    'Deleted': 'numTargetRowsDeleted',
    'Inserted': 'numTargetRowsInserted',
    'Updated': 'numTargetRowsUpdated',
    'Output': 'numOutputRows',
    'Copied': 'numTargetRowsCopied',
    'Read': 'numSourceRows',
}

# Sum the metrics per userMetadata value (the run id set on the session).
# `sum` is the Spark aggregate brought in by the star import, not the builtin.
dfAudit = (
    deltaTable.history()
    .select(
        'userMetadata',
        *[col(f'operationMetrics.{metric}').alias(name) for name, metric in audit_metrics.items()]
    )
    .groupBy('userMetadata')
    .agg(*[sum(name).cast('int').alias(name) for name in audit_metrics])
    .na.fill(0)
)
#display(dfAudit)
dfAuditFiltered = dfAudit.filter(dfAudit.userMetadata == NotebookExecutionId)

# Fetch the single audit row once instead of running one Spark job per metric.
# Reporting stays best effort: on failure the counts fall back to zero, but the
# cause is printed and interrupts are no longer swallowed.
try:
    audit_rows = dfAuditFiltered.collect()
except Exception as audit_error:
    print(f'Could not read the Delta history metrics, reporting zeros: {audit_error}')
    audit_rows = []

# No row for this run id -> every count is reported as 0.
audit_counts = audit_rows[0].asDict() if audit_rows else {}
RowsInserted = audit_counts.get('Inserted', 0)
RowsUpdated = audit_counts.get('Updated', 0)
RowsDeleted = audit_counts.get('Deleted', 0)
RowsCopied = audit_counts.get('Copied', 0)
RowsRead = audit_counts.get('Read', 0)
RowsOutput = audit_counts.get('Output', 0)

TotalRuntime = str((datetime.datetime.now() - start_audit_time))

# Exit value handed back to the caller of this notebook.
result_data = {
    "CopyOutput":{
        "Total Runtime": TotalRuntime,
        "TargetSchema": target_schema,
        "TargetName" : target_name,
        "RowsRead": RowsOutput if RowsRead - RowsInserted == 0 else RowsRead - RowsInserted,
        "RowsNew": RowsOutput if RowsRead - RowsInserted == 0 else RowsInserted,
        "RowsUpdated": RowsUpdated,
        "RowsUnchanged": RowsCopied,
        "RowsDeleted": RowsDeleted
    }
}

mssparkutils.notebook.exit(result_data)