In [None]:
from delta import tables
import pyspark.sql.functions as F

In [None]:
# Notebook parameters supplied by ADF. Each one is declared as a text widget
# with an empty-string default and read back right away, so a widget and the
# constant it feeds sit next to each other.
dbutils.widgets.text("type", "")
DATA_TYPE = dbutils.widgets.get("type")

dbutils.widgets.text("factory", "")
FACTORY = dbutils.widgets.get("factory")

dbutils.widgets.text("folder", "")
FOLDER = dbutils.widgets.get("folder")

In [None]:
# Base locations; a suffix that depends on DATA_TYPE is appended below.
delta_table = "Path_to_delta_table" #fake path
tags_path = "Path_to_tags" #fake path

if DATA_TYPE == "metadata":
    delta_table = delta_table + DATA_TYPE
    tags_path = tags_path + "metadata/"
elif DATA_TYPE == "tags":
    # An empty factory would leave delta_table pointing at the bare base path.
    if not FACTORY:
        raise ValueError('The "factory" parameter is required when type is "tags"')
    delta_table = delta_table + FACTORY
    tags_path = tags_path + f"{FACTORY}/{FOLDER}"
else:
    # Fail fast: continuing would list, load and finally delete the base
    # landing path, because tags_path was never narrowed to a sub-folder.
    raise ValueError(f"This data type is not supported: {DATA_TYPE!r}")

# Get list of filenames in the folder
file_list = dbutils.fs.ls(tags_path)

In [None]:
def upsert_metadata(tag_df, delta_table):
    """Upsert metadata rows into the Delta table stored at ``delta_table``.

    When the path already holds a Delta table, ``tag_df`` is merged into it on
    the ``FQN`` column: rows whose FQN is already present are updated with all
    incoming columns, and rows with a new FQN are inserted. When the path is
    not a Delta table yet, ``tag_df`` is written there in Delta format instead.

    Args:
        tag_df: Spark DataFrame with the new metadata; must have an ``FQN`` column
            for the merge branch.
        delta_table: Path of the target Delta table.
    """
    if tables.DeltaTable.isDeltaTable(spark, delta_table):
        # Table exists: merge the incoming rows into it, keyed on FQN.
        existing = tables.DeltaTable.forPath(spark, delta_table).alias("existingData")
        incoming = tag_df.alias("newData")
        merge_builder = existing.merge(
            source=incoming,
            condition="existingData.FQN = newData.FQN",
        )
        merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        return

    # No table yet: the incoming rows become the initial table contents.
    writer = tag_df.write.format('delta').mode("overwrite")
    writer.save(delta_table)

In [None]:
def upsert_sensordata(tag_df, delta_table):
    """Upsert sensor readings into the Delta table stored at ``delta_table``.

    When the path already holds a Delta table, ``tag_df`` is merged into it on
    the (``FQN``, ``ObsTimeStamp``) pair: matching rows are updated with all
    incoming columns, and non-matching rows are inserted. When the path is not
    a Delta table yet, ``tag_df`` is written there in Delta format, partitioned
    by ``Year``, ``Month``, ``Day`` and ``FQN``.

    Args:
        tag_df: Spark DataFrame with the new readings; the create branch needs
            the ``Year``, ``Month``, ``Day`` and ``FQN`` columns, the merge
            branch needs ``FQN`` and ``ObsTimeStamp``.
        delta_table: Path of the target Delta table.
    """
    if tables.DeltaTable.isDeltaTable(spark, delta_table):
        # Table exists: merge the incoming rows, keyed on FQN + timestamp.
        existing = tables.DeltaTable.forPath(spark, delta_table).alias("existingData")
        incoming = tag_df.alias("newData")
        merge_builder = existing.merge(
            source=incoming,
            condition="existingData.FQN = newData.FQN AND existingData.ObsTimeStamp = newData.ObsTimeStamp",
        )
        merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        return

    # No table yet: create it from the incoming rows with the partition layout.
    writer = tag_df.write.format('delta').mode("overwrite")
    writer = writer.partitionBy("Year", "Month", "Day", "FQN")
    writer.save(delta_table)

### Upsert landing to curated

In [None]:
# Files at or below this size (in bytes) are skipped when collecting sensor data.
# NOTE(review): 31 presumably corresponds to a header-only CSV - confirm.
MAX_EMPTY_FILE_SIZE = 31

if not file_list:
    # Nothing was listed in the folder, so there is no first file to index.
    print(f"No files found in {tags_path}; nothing to upsert")
elif DATA_TYPE == "metadata":
    # Metadata is taken from the first file in the folder only.
    tags_df = spark.read.format("csv").load(file_list[0].path, header=True)
    upsert_metadata(tags_df, delta_table)
else:
    # Combine every file that carries data into one spark dataframe. The size
    # filter applies to all files, including the first: a zero-byte first file
    # would otherwise yield a frame without the ObsTimeStamp column.
    tags_df = None
    for file in file_list:
        if file.size > MAX_EMPTY_FILE_SIZE:
            df = spark.read.format("csv").load(file.path, header=True)
            tags_df = df if tags_df is None else tags_df.union(df)

    if tags_df is None:
        print(f"No files with data found in {tags_path}; nothing to upsert")
    else:
        # Add year, month and day columns for partitioning
        tags_df = (
            tags_df
            .withColumn("Year", F.year(F.col("ObsTimeStamp")))
            .withColumn("Month", F.month(F.col("ObsTimeStamp")))
            .withColumn("Day", F.dayofmonth(F.col("ObsTimeStamp")))
        )
        # upsert sensor data into delta table
        upsert_sensordata(tags_df, delta_table)

In [None]:
# Remove the processed landing folder, including everything inside it.
# Guard first: for any other DATA_TYPE, tags_path was never narrowed to a
# sub-folder, and a recursive delete would wipe the base landing path.
if DATA_TYPE not in ("metadata", "tags"):
    raise ValueError(f"Refusing to delete {tags_path!r}: unsupported data type {DATA_TYPE!r}")
dbutils.fs.rm(tags_path, recurse=True)