In [None]:
# parameter cell
# Default values for an interactive run; presumably overridden by the calling pipeline — confirm.
# ListOfTables: comma-separated dict literals WITHOUT enclosing brackets; the runtime cell wraps
# them in [...] and parses them with ast.literal_eval. Keys per entry:
#   name (table folder), partitionkey (may be ''), uuid (extraction-log GUID), PK_COLS (may be '').
ListOfTables = "{'name':'productmodelproductdescription','partitionkey':'ProductModelID','uuid':'F7943D78-ECFD-4246-81A6-C8221F43BDBB','PK_COLS':'ProductDescriptionID'},{'name':'salesorderdetail','partitionkey':'','uuid':'5DF6AB3A-B306-45D7-B63F-A3BC3B92466B','PK_COLS':''},{'name':'salesorderheader','partitionkey':'','uuid':'BB74FEA8-C9D0-4B71-8F6B-E1185585C511','PK_COLS':''},{'name':'vgetallcategories','partitionkey':'','uuid':'2B84F980-33E8-4FDA-A560-1B0C4741ECEC','PK_COLS':''}"
# spark.read format used for the bronze source files.
bronze_format = 'parquet'
# Not referenced by the code in this notebook.
datasource = 'wwi'
# Date-slice sub-folder under each table's bronze folder, formatted "YYYY/MM/DD/hh/".
bronze_date_slize = "2023/07/17/16/"
# Not referenced by the code in this notebook (the bronze root is hard-coded as 'Files/bronze/').
bronze_container_name = "bronze"
# Bronze batch folder, relative to 'Files/bronze/'.
bronze_relative_path = "salesdata/saleslt" 
# Silver batch folder; '/' is replaced with '_' at runtime and the result is only printed.
silver_relative_path = "salesdata/saleslt"
# Load mode: 'RL' reload, 'II' incremental insert (append), 'MI' merge insert, 'US' upsert.
target_mode ='RL'


# Parameter Documentation #

## *ListOfTables:*
The list of tables to be processed by the main `for` loop. It is a comma-separated sequence of dictionaries (without enclosing brackets — the notebook adds them before parsing), each constructed as:
{'name':'***Table Name***','partitionkey':'***Column that the table is partitioned by***','uuid':'***This is the GUID for the source extraction log***','PK_COLS':'***This is the primary key from source***'}
## *bronze_format* 
Controls the `spark.read` format used when loading the source data; it is assumed to be ***parquet***.
## *datasource*
Not used by this notebook.
## *bronze_date_slize*
The date-slice sub-folder inside each table's bronze folder, e.g. ***"YYYY/MM/DD/hh/"***.
## *bronze_container_name*
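The name of the bronze storage container. Note: the code currently hard-codes the bronze root as `Files/bronze/` and does not read this value.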
## *silver_container_name*
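The name of the silver storage container. Note: this parameter is not declared in the parameter cell above and is not referenced by the code.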
## *bronze_relative_path*
The base folder path of the source data batch, relative to `Files/bronze/`, e.g. "***folder/sub_folder***".
## *silver_relative_path* 
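The base folder path of the target table batch, e.g. "***folder/sub_folder***". At runtime `/` is replaced with `_` in this value; it is currently only printed and is not used to build the target table paths.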
## *target_mode*
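Defines how each table's batch is applied to its silver Delta table:
- ***RL*** (reload table): overwrites the table with the current bronze slice.
- ***II*** (append / incremental insert): appends the current bronze slice to the table.
- ***MI*** (merge insert): inserts only rows whose `PK_COLS` key is not already present in the table.
- ***US*** (upsert): updates rows whose `PK_COLS` key matches and inserts the rest.

For ***MI*** and ***US***, a run where the table does not yet exist simply creates it from the bronze slice; `PK_COLS` is only used once the table exists. Any other value loads nothing.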


In [None]:
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

# Session-level write settings for the Delta tables produced below.
# V-Order write optimization for Parquet files (the key was previously misspelled "sprk.",
# so the setting was never applied).
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
# Optimize Write: bin small output partitions into larger files before writing.
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
# Optimize Write target bin size: 1073741824 bytes = 1 GiB.
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")

In [None]:
# Runtime variables derived from the parameter cell.
import ast

# Folder holding this batch's bronze data, e.g. 'Files/bronze/salesdata/saleslt/'.
bronze_layer = 'Files/bronze/' + bronze_relative_path + '/'
# Root folder under which the silver Delta tables are written.
silver_layer = 'Tables/'
silver_relative_path = silver_relative_path.replace('/', '_')
print(bronze_layer)
print(silver_layer)

# ListOfTables holds comma-separated dict literals. Wrap them in brackets (unless the caller
# already passed a bracketed list) and parse them into a list of dicts. The parameter itself is
# left untouched so re-running this cell cannot wrap it twice and produce a nested list.
table_list_literal = ListOfTables.strip()
if not table_list_literal.startswith('['):
    table_list_literal = '[' + table_list_literal + ']'
mylist = ast.literal_eval(table_list_literal)
assert all(isinstance(entry, dict) for entry in mylist), "ListOfTables must contain only dict literals"
print(silver_relative_path)
tables = mylist

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from delta.tables import *


# Loader for target_mode 'RL' (full reload), called from the dispatch loop below.
def loadFullDataFromSource(table_name,partition_by_clause,log_uuid,businesskey):
    """Overwrite the silver Delta table for ``table_name`` with the current bronze slice.

    Parameters
    ----------
    table_name : str
        Used both as the bronze sub-folder name and as the silver table folder name.
    partition_by_clause : str
        Partition column name(s), comma-separated. Empty or None writes an unpartitioned table.
    log_uuid : str
        Extraction-log GUID stamped into every row's ``trans_log_id`` column.
    businesskey : str
        Not used by a full reload; accepted so all loaders share one signature.
    """
    source_path = bronze_layer + table_name + '/' + bronze_date_slize
    # Strip any trailing '/' from silver_layer so the join yields a single separator.
    target_path = silver_layer.rstrip('/') + '/' + table_name

    df = (
        spark.read.format(bronze_format).load(source_path)
        .withColumn('trans_log_id', lit(log_uuid))
        # current_timestamp() already returns a Column, so it needs no lit() wrapper.
        .withColumn("trans_utc_timestamp", current_timestamp())
    )
    print(table_name)

    # Truthiness check rather than len() > 1, so one-character column names and None are handled.
    partition_columns = [c.strip() for c in (partition_by_clause or '').split(',') if c.strip()]
    # A reload replaces the whole table, so allow schema/partitioning changes in both the
    # partitioned and the unpartitioned case.
    writer = df.write.mode("overwrite").format("delta").option("overwriteSchema", "true")
    if partition_columns:
        writer = writer.partitionBy(*partition_columns)
    writer.save(target_path)
        
# Loader for target_mode 'II' (append / incremental insert), called from the dispatch loop below.
def loadIncrementalInsertDataFromSource(table_name,partition_by_clause,log_uuid,businesskey):
    """Append the current bronze slice for ``table_name`` to its silver Delta table.

    Parameters
    ----------
    table_name : str
        Used both as the bronze sub-folder name and as the silver table folder name.
    partition_by_clause : str
        Partition column name(s), comma-separated. Empty or None writes without partitionBy.
    log_uuid : str
        Extraction-log GUID stamped into every row's ``trans_log_id`` column.
    businesskey : str
        Not used by an append; accepted so all loaders share one signature.
    """
    source_path = bronze_layer + table_name + '/' + bronze_date_slize
    # Strip any trailing '/' from silver_layer so the join yields a single separator.
    target_path = silver_layer.rstrip('/') + '/' + table_name

    df = (
        spark.read.format(bronze_format).load(source_path)
        .withColumn('trans_log_id', lit(log_uuid))
        # current_timestamp() already returns a Column, so it needs no lit() wrapper.
        .withColumn("trans_utc_timestamp", current_timestamp())
    )
    print(table_name)

    # Truthiness check rather than len() > 1, so one-character column names and None are handled.
    partition_columns = [c.strip() for c in (partition_by_clause or '').split(',') if c.strip()]
    # overwriteSchema is only honoured by overwrite-mode Delta writes, so it is not set on append.
    writer = df.write.mode("append").format("delta")
    if partition_columns:
        writer = writer.partitionBy(*partition_columns)
    writer.save(target_path)

# Loader for target_mode 'MI' (merge insert), called from the dispatch loop below.
def loadMergeInsertDataFromSource(table_name,partition_by_clause,log_uuid,businesskey):
    """Insert bronze rows whose business key is not yet in the silver Delta table.

    If the silver table does not exist yet, it is created from the whole bronze slice.

    Parameters
    ----------
    table_name : str
        Used both as the bronze sub-folder name and as the silver table folder name.
    partition_by_clause : str
        Partition column name(s), comma-separated; only used when the table is first created.
    log_uuid : str
        Extraction-log GUID stamped into every row's ``trans_log_id`` column.
    businesskey : str
        Key column name(s), comma-separated, used to match source and target rows.

    Raises
    ------
    ValueError
        If the table already exists and ``businesskey`` names no column.
    """
    source_path = bronze_layer + table_name + '/' + bronze_date_slize
    # Strip any trailing '/' from silver_layer so the join yields a single separator.
    target_path = silver_layer.rstrip('/') + '/' + table_name

    df = (
        spark.read.format(bronze_format).load(source_path)
        .withColumn('trans_log_id', lit(log_uuid))
        # current_timestamp() already returns a Column, so it needs no lit() wrapper.
        .withColumn("trans_utc_timestamp", current_timestamp())
    )
    print(table_name)

    if DeltaTable.isDeltaTable(spark, target_path):
        print("The delta table exists!")
        key_columns = [c.strip() for c in (businesskey or '').split(',') if c.strip()]
        if not key_columns:
            # Without a key the merge condition would be the unparsable "t. = s.".
            raise ValueError(
                "Table '{}': PK_COLS must name at least one column for a merge load.".format(table_name)
            )
        # Composite keys 'a,b' become 't.a = s.a AND t.b = s.b'.
        merge_condition = ' AND '.join("t.{0} = s.{0}".format(c) for c in key_columns)
        (
            DeltaTable.forPath(spark, target_path).alias("t")
            .merge(df.alias("s"), merge_condition)
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        print("The delta table does not exist.")
        # Truthiness check rather than len() > 1, so one-character column names and None are handled.
        partition_columns = [c.strip() for c in (partition_by_clause or '').split(',') if c.strip()]
        writer = df.write.mode("overwrite").format("delta").option("overwriteSchema", "true")
        if partition_columns:
            writer = writer.partitionBy(*partition_columns)
        writer.save(target_path)
    
    
    

# Loader for target_mode 'US' (upsert), called from the dispatch loop below.
def loadUpsertDataFromSource(table_name,partition_by_clause,log_uuid,businesskey):
    """Upsert the bronze slice into the silver Delta table.

    Rows whose business key matches are updated and all others are inserted. If the silver
    table does not exist yet, it is created from the whole bronze slice.

    Parameters
    ----------
    table_name : str
        Used both as the bronze sub-folder name and as the silver table folder name.
    partition_by_clause : str
        Partition column name(s), comma-separated; only used when the table is first created.
    log_uuid : str
        Extraction-log GUID stamped into every row's ``trans_log_id`` column.
    businesskey : str
        Key column name(s), comma-separated, used to match source and target rows.

    Raises
    ------
    ValueError
        If the table already exists and ``businesskey`` names no column.
    """
    source_path = bronze_layer + table_name + '/' + bronze_date_slize
    # Strip any trailing '/' from silver_layer so the join yields a single separator.
    target_path = silver_layer.rstrip('/') + '/' + table_name

    df = (
        spark.read.format(bronze_format).load(source_path)
        .withColumn('trans_log_id', lit(log_uuid))
        # current_timestamp() already returns a Column, so it needs no lit() wrapper.
        .withColumn("trans_utc_timestamp", current_timestamp())
    )
    print(table_name)

    if DeltaTable.isDeltaTable(spark, target_path):
        print("The delta table exists!")
        key_columns = [c.strip() for c in (businesskey or '').split(',') if c.strip()]
        if not key_columns:
            # Without a key the merge condition would be the unparsable "t. = s.".
            raise ValueError(
                "Table '{}': PK_COLS must name at least one column for a merge load.".format(table_name)
            )
        # Composite keys 'a,b' become 't.a = s.a AND t.b = s.b'.
        merge_condition = ' AND '.join("t.{0} = s.{0}".format(c) for c in key_columns)
        (
            DeltaTable.forPath(spark, target_path).alias("t")
            .merge(df.alias("s"), merge_condition)
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        print("The delta table does not exist.")
        # Truthiness check rather than len() > 1, so one-character column names and None are handled.
        partition_columns = [c.strip() for c in (partition_by_clause or '').split(',') if c.strip()]
        writer = df.write.mode("overwrite").format("delta").option("overwriteSchema", "true")
        if partition_columns:
            writer = writer.partitionBy(*partition_columns)
        writer.save(target_path)




# Map each supported target_mode to its loader; any other mode loads nothing.
loaders_by_mode = {
    'RL': loadFullDataFromSource,
    'II': loadIncrementalInsertDataFromSource,
    'MI': loadMergeInsertDataFromSource,
    'US': loadUpsertDataFromSource,
}
# Tolerate casing/whitespace differences in the pipeline-supplied mode.
selected_loader = loaders_by_mode.get(str(target_mode).strip().upper())

full_tables = tables
for item in tables:
    # Read fields by key so the order of keys within each ListOfTables entry does not matter.
    name = item['name']
    partitionkey = item.get('partitionkey', '')
    uuid = item['uuid']
    PK_COLS = item.get('PK_COLS', '')
    if selected_loader is None:
        print("Nothing to do...")
        continue
    selected_loader(name, partitionkey, uuid, PK_COLS)