In [2]:
# Root folder of the silver-layer tables; individual table paths are built from it further down.
silver_tables_path = "abfss://3201caea-2820-458c-8915-9fe2f28ba2a7@onelake.dfs.fabric.microsoft.com/4e18f678-60fe-4988-82c1-2d51453db35e/Tables"

# Initial load: tab-delimited bronze CSV whose first row holds the column names.
df_salesperson = (
    spark.read
    .format("csv")
    .option("delimiter", "\t")
    .option("header", "True")
    .load("Files/Bronze_AdventureWorks/SalesPerson_19_04_2024.csv")
)

StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 4, Finished, Available)

## specifying datatypes

In [3]:
from pyspark.sql.functions import col, regexp_replace, when, substring, to_date, date_format, regexp_extract
# Use Spark's legacy datetime parser for the rest of this session.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Cast both employee identifier columns to integers.
df_salesperson = (
    df_salesperson
    .withColumn("EmployeeKey", col("EmployeeKey").cast("int"))
    .withColumn("EmployeeID", col("EmployeeID").cast("int"))
)

StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 5, Finished, Available)

## Changing the column names

In [4]:
# Rename the source columns to the names used in the silver table.
df_salesperson = (
    df_salesperson
    .withColumnRenamed("UPN", "Email")
    .withColumnRenamed("Salesperson", "SalespersonName")
)

StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 6, Finished, Available)

## slowly changing dimensions

In [5]:
# adding columns for SCD
from pyspark.sql.functions import lit, current_date

# SCD bookkeeping: every row starts out active, valid from today until the
# open-ended sentinel date 9999-12-31.
df_salesperson = (
    df_salesperson
    .withColumn("isActive", lit(True))
    .withColumn("startDate", date_format(current_date(), "yyyy-MM-dd").cast("date"))
    .withColumn("endDate", lit("9999-12-31").cast("date"))
)

StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 7, Finished, Available)

## Loading the new data

In [6]:

# Read the newer extract. Unlike the initial file it is comma-delimited.
df_salesperson_new = (
    spark.read
    .format("csv")
    .option("delimiter", ",")
    .option("header", "True")
    .load("Files/Bronze_AdventureWorks/Salesperson_new_29_04_2024.csv")
)

# Apply the same preparation as for the initial load: integer keys, renamed
# columns, and the SCD bookkeeping columns (active flag plus validity dates).
df_salesperson_new = (
    df_salesperson_new
    .withColumn("EmployeeKey", col("EmployeeKey").cast("int"))
    .withColumn("EmployeeID", col("EmployeeID").cast("int"))
    .withColumnRenamed("UPN", "Email")
    .withColumnRenamed("Salesperson", "SalespersonName")
    .withColumn("isActive", lit(True))
    .withColumn("startDate", date_format(current_date(), "yyyy-MM-dd").cast("date"))
    .withColumn("endDate", lit("9999-12-31").cast("date"))
)

display(df_salesperson_new)

StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 8, Finished, Available)

SynapseWidget(Synapse.DataFrame, 9af126d0-43d5-4260-9496-1fab973d93c7)

## Computing the hashdiff and the merge key

In [7]:
from pyspark.sql.functions import sha2, concat_ws

# Columns that feed the change hash: everything except the employee key, the SCD
# validity dates and the hash column itself. Excluding "hashdiff" keeps this cell
# idempotent: on a re-run df_salesperson already carries a hashdiff column, which
# would otherwise be folded into the new hash and change every value.
columns_excluded_from_hash = {"EmployeeKey", "startDate", "endDate", "hashdiff"}
columns_except_employee_key = [
    col_name for col_name in df_salesperson.columns
    if col_name not in columns_excluded_from_hash
]

# The same column list (taken from df_salesperson) is applied to both frames, so
# both must expose these columns under identical names.
# NOTE(review): concat_ws skips NULL inputs, so two rows that differ only in
# which column is NULL can produce the same hash - confirm this is acceptable.
concatenated_columns = concat_ws("||", *columns_except_employee_key)
df_salesperson = df_salesperson.withColumn("hashdiff", sha2(concatenated_columns, 256))
df_salesperson_new = df_salesperson_new.withColumn("hashdiff", sha2(concatenated_columns, 256))

# Creation of the merge key: initially a copy of the employee key.
df_salesperson_new = df_salesperson_new.withColumn("MergeKey", df_salesperson_new["EmployeeKey"])

display(df_salesperson)
display(df_salesperson_new)

StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, ca5dfe5f-06a8-4734-b7a1-d6d23d8e6412)

SynapseWidget(Synapse.DataFrame, 7d925f7e-fe2f-460c-806f-e605b53aac2a)

## Check if the silver table already exists, otherwise initialize it

In [8]:
# Initialise the silver table from the initial load only when it does not exist yet.
# An explicit existence check is used instead of a catch-all `except Exception`:
# with the catch-all, any failure while reading an existing table would fall
# through to the overwrite below and replace the table with the initial load.
from delta.tables import DeltaTable

salesperson_silver_path = f"{silver_tables_path}/Salesperson"

if not DeltaTable.isDeltaTable(spark, salesperson_silver_path):
    print("Silver table not found, initialising it at:", salesperson_silver_path)
    df_salesperson.write.format("delta").mode("overwrite").save(salesperson_silver_path)

df_salesperson_silver = spark.read.format("delta").load(salesperson_silver_path)


StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 10, Finished, Available)

## Rename columns of df_salesperson_silver

In [9]:
# Prefix every silver column with "silver_" so the names cannot clash with the
# columns of the incoming data once the two frames are joined.
prefixed_silver_columns = [
    col(name).alias("silver_" + name)
    for name in df_salesperson_silver.columns
]
df_salesperson_silver = df_salesperson_silver.select(prefixed_silver_columns)
display(df_salesperson_silver)

StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 11, Finished, Available)

SynapseWidget(Synapse.DataFrame, 26c1c048-e9f5-496d-9dbc-c11aa8d02238)

## SCD Step 1: Left join to get the upserts

In [10]:
from pyspark.sql.functions import expr

# Both joins pair incoming rows with silver rows on the employee key.
employee_key_match = df_salesperson_new['EmployeeKey'] == df_salesperson_silver['silver_EmployeeKey']

# Changed rows: incoming rows whose active silver version has a different hashdiff.
changed_rows = (
    df_salesperson_new
    .join(df_salesperson_silver, employee_key_match, 'left')
    .where(expr("silver_isActive = True AND silver_hashdiff != hashdiff"))
)

# Brand-new rows: a left anti join keeps the incoming rows that have no silver row at all.
# https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-join.html#:~:text=An%20anti%20join%20returns%20values,as%20a%20left%20anti%20join.
# Their MergeKey is NULL so the merge never matches them and inserts them instead.
df_salesperson_insert = (
    df_salesperson_new
    .join(df_salesperson_silver, employee_key_match, 'left_anti')
    .withColumn("MergeKey", lit(None))
)

# Each changed row is staged twice:
#   - with a NULL MergeKey, so the merge inserts it as a new row;
#   - with MergeKey = EmployeeKey, so the merge matches and updates the existing silver row.
df_salesperson_joined = changed_rows.withColumn("MergeKey", lit(None))
df_salesperson_upsert = df_salesperson_joined.withColumn("MergeKey", df_salesperson_joined["EmployeeKey"])



StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 12, Finished, Available)

In [11]:
from pyspark.sql.functions import col, lit, when, concat_ws, sha2

# The joined frames still carry the "silver_"-prefixed columns picked up in the
# join; remove them from both frames before the union below.
silver_columns = [name for name in df_salesperson_joined.columns if name.startswith("silver_")]
staged_new_versions = df_salesperson_joined.drop(*silver_columns)
df_salesperson_upsert = df_salesperson_upsert.drop(*silver_columns)

# Stack everything the merge has to process: the changed rows staged for insert,
# the same rows staged for update, and the brand-new rows.
# union() matches columns by position, not by name.
df_salesperson_joined = (
    staged_new_versions
    .union(df_salesperson_upsert)
    .union(df_salesperson_insert)
)

display(df_salesperson_joined)

StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, 0350e41a-cc8b-4f07-babe-2bfcb50d4ac6)

## SCD Step 2: Merge the staged rows into the silver table

In [12]:
from delta.tables import *
from pyspark.sql import functions as F

silver_delta_table = DeltaTable.forPath(spark, f"{silver_tables_path}/Salesperson")

# Apply the staged rows to the silver table in a single merge:
#   - staged rows with a NULL MergeKey never match and are inserted;
#   - staged rows with MergeKey = EmployeeKey match the existing silver rows of
#     that employee and close the active one.
# The update is restricted to the active silver row. Without that restriction,
# once a key has several versions, every already-closed version with a different
# hashdiff would match as well and have its endDate overwritten with today's date.
silver_delta_table.alias('silver') \
    .merge(
        df_salesperson_joined.alias('insert'),
        'silver.EmployeeKey = insert.MergeKey'
    ) \
    .whenNotMatchedInsert(values={
        "EmployeeKey": "insert.EmployeeKey",
        "EmployeeID": "insert.EmployeeID",
        "SalespersonName": "insert.SalespersonName",
        "Title": "insert.Title",
        "Email": "insert.Email",
        "isActive": "insert.isActive",
        "startDate": "insert.startDate",
        "endDate": "insert.endDate",
        "hashdiff": "insert.hashdiff"
    }) \
    .whenMatchedUpdate(
        condition="silver.isActive = true AND silver.hashdiff != insert.hashdiff",
        set={
            "isActive": F.lit(False),
            "endDate": F.current_date()
        }
    ) \
    .execute()

# Reload the Delta table to verify the changes
df_salesperson_silver = spark.read.format("delta").load(f"{silver_tables_path}/Salesperson")
display(df_salesperson_silver)


StatementMeta(, 304c2258-b4ba-4514-928b-28e5b738b79a, 14, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3c627d5c-0c7a-4ca7-a3fe-91b8fa49f84b)