In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Job parameter from the notebook widget (a string): 1 = initial load (no
# existing dimension is read), 0 = incremental load against the gold table.
init_load_flag = int(dbutils.widgets.get("init_load_flag"))

# Later cells branch on `== 0` in one place and `== 1` in another, so any other
# value would silently take inconsistent branches — fail fast instead.
if init_load_flag not in (0, 1):
    raise ValueError(f"init_load_flag must be 0 or 1, got {init_load_flag!r}")

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# **Data Consumption**

In [0]:
# Incoming data: every column of the cleansed silver products table.
silver_query = "select * from azuredb_catalog.silver.products_silver"
df = spark.sql(silver_query)
df.display()




# **Deduplication**

In [0]:
# Keep exactly one incoming row per natural key (product_id).
# Rows with a NULL product_id are excluded first: the later left join on
# df['product_id'] == df_old['product_id'] can never match NULL, so such a row
# would be treated as a brand-new product, and get another "current" version,
# on every incremental run.
# NOTE(review): dropDuplicates keeps an arbitrary row per product_id; if silver
# can hold several versions of one product, keep the latest by a load/update
# timestamp instead — TODO confirm silver's grain.
df = df.filter(col("product_id").isNotNull()).dropDuplicates(subset=["product_id"])

### **Load Current Dimension Records (empty placeholder on initial load)**

In [0]:
# Pick the query for the existing (current) dimension versions that incoming
# rows are compared against, then run it once.
if init_load_flag == 0:
    # Incremental run: read only the active version of each product.
    old_versions_sql = """
        select 
            DimProductKey,
            product_id,
            product_name,
            category,
            brand,
            price,
            discounted_price,
            effective_start_date,
            effective_end_date,
            is_current
        from azuredb_catalog.gold.DimProducts
        where is_current = true
    """
else:
    # Initial load: the gold table is not read. `where 1 = 0` yields an EMPTY
    # frame with the same column names, so every incoming product counts as new.
    # NOTE(review): the placeholder literals ('' / 0 / 0.0) fix this frame's
    # column types, which still take part in the later unionByName.
    old_versions_sql = """
        select 
            0 DimProductKey,
            '' product_id,
            '' product_name,
            '' category,
            '' brand,
            0.0 price,
            0.0 discounted_price,
            current_timestamp() effective_start_date,
            current_timestamp() effective_end_date,
            false is_current
        from azuredb_catalog.silver.products_silver
        where 1 = 0
    """

df_old = spark.sql(old_versions_sql)


In [0]:
# Preview the existing current versions (empty when init_load_flag != 0).
df_old.display()


### **Renaming df_old Columns**

In [0]:
# Prefix every df_old column except the natural key with "old_" so that, after
# the join, each attribute appears twice (incoming value vs. current dimension
# value) under distinct names. product_id is deliberately not renamed; the join
# and the later drop refer to it through the df_old["product_id"] reference.
old_column_names = {
    "DimProductKey": "old_DimProductKey",
    "product_name": "old_product_name",
    "category": "old_category",
    "brand": "old_brand",
    "price": "old_price",
    "discounted_price": "old_discounted_price",
    "effective_start_date": "old_effective_start_date",
    "effective_end_date": "old_effective_end_date",
    "is_current": "old_is_current",
}
for existing_name, prefixed_name in old_column_names.items():
    df_old = df_old.withColumnRenamed(existing_name, prefixed_name)


# **Historical Record Join for Change Detection**

In [0]:
# Left join keeps every incoming product. Products without a current dimension
# row get NULL in all old_* columns, which is how new products are recognised.
join_condition = df["product_id"] == df_old["product_id"]
df_join = df.join(df_old, on=join_condition, how="left")
df_join.display()


# **Keep New and Changed Records (drop unchanged)**

In [0]:
# Attributes tracked for SCD Type-2 history: a change in any of them closes the
# current version and opens a new one.
scd_tracked_cols = ["product_name", "category", "brand", "price", "discounted_price"]

# A product with no current dimension row is new.
change_condition = col("old_DimProductKey").isNull()
for tracked_col in scd_tracked_cols:
    # Null-safe comparison: `a != b` is NULL when either side is NULL and
    # filter() drops NULL, so a value changing to/from NULL went undetected.
    # `~eqNullSafe` is true exactly when the values differ (NULL vs non-NULL
    # counts as different, NULL vs NULL as equal).
    change_condition = change_condition | ~col(tracked_col).eqNullSafe(col(f"old_{tracked_col}"))

# Keep new + changed products; unchanged ones drop out here.
df_changes = df_join.filter(change_condition)


# **Preparing Expired Records (Old Versions)**

In [0]:
# After the join there are two product_id columns (one per side). Drop the
# df_old copy by column reference so col("product_id") below is unambiguous.
df_changes_clean = df_changes.drop(df_old["product_id"])

# Re-emit the current dimension version of each changed product under its
# existing surrogate key and start date, but closed as of now.
expired_projection = [
    col("old_DimProductKey").alias("DimProductKey"),
    col("product_id").alias("product_id"),
    col("old_product_name").alias("product_name"),
    col("old_category").alias("category"),
    col("old_brand").alias("brand"),
    col("old_price").alias("price"),
    col("old_discounted_price").alias("discounted_price"),
    col("old_effective_start_date").alias("effective_start_date"),
    current_timestamp().alias("effective_end_date"),
    lit(False).alias("is_current"),
]

# Only products that already had a current row can be expired; brand-new
# products carry a NULL old key.
has_existing_version = col("old_DimProductKey").isNotNull()
df_expired = df_changes_clean.where(has_existing_version).select(*expired_projection)

In [0]:
# Preview the old versions being closed out (is_current = false) in this run.
df_expired.display()


# **Preparing New Current Records**

In [0]:
# New current versions for every new or changed product.
# df_changes_clean is already restricted to new + changed rows by the
# change-detection cell, so that predicate is not repeated here: a second copy
# could drift out of sync (e.g. in NULL handling) and leave expired rows
# without a replacement current version.
df_new = df_changes_clean.select(
    col("product_id"),
    col("product_name"),
    col("category"),
    col("brand"),
    col("price"),
    col("discounted_price"),
    current_timestamp().alias("effective_start_date"),
    # Open-ended end date for the latest version.
    lit("9999-12-31").cast("timestamp").alias("effective_end_date"),
    lit(True).alias("is_current"),
)


In [0]:
# Preview the new current versions (surrogate keys are assigned in a later cell).
df_new.display()


# **Surrogate Key Generation**

In [0]:
from pyspark.sql import Window

# Assign a dense 1..n key sequence within this batch; the next cells shift it
# past the dimension's current max key.
# monotonically_increasing_id() is unique but NOT consecutive (the partition id
# occupies the upper bits, so keys jump by ~2^33 per partition), and its values
# depend on partitioning, which can differ when df_new is recomputed by later
# actions (display, MERGE). row_number() ordered by product_id is dense and
# deterministic, assuming one row per product_id in this batch.
# With no partitionBy, Spark moves all rows to one partition; that is
# acceptable for a per-run dimension delta.
df_new = df_new.withColumn(
    "DimProductKey",
    # Cast to long to keep the type monotonically_increasing_id() produced.
    row_number().over(Window.orderBy("product_id")).cast("long"),
)


## **Offset New Keys by the Current Max Surrogate Key**


In [0]:
# Highest surrogate key already issued, used to offset this batch's new keys.
# This depends only on whether the table exists, not on init_load_flag. The
# MERGE below matches on DimProductKey, so restarting keys at 1 while the table
# already holds rows would overwrite existing versions.
if not spark.catalog.tableExists("azuredb_catalog.gold.DimProducts"):
    max_surrogate_key = 0
else:
    df_maxsur = spark.sql("""
        SELECT MAX(DimProductKey) AS max_surrogate_key
        FROM azuredb_catalog.gold.DimProducts
    """)
    # MAX over an empty table is NULL (None); treat that as "no keys issued"
    # rather than letting lit(None) null out every new key downstream.
    max_surrogate_key = df_maxsur.collect()[0]["max_surrogate_key"] or 0


In [0]:
# Shift this batch's locally generated keys past the highest key already
# issued, so new rows never reuse an existing DimProductKey.
key_offset = lit(max_surrogate_key)
df_new = df_new.withColumn("DimProductKey", key_offset + col("DimProductKey"))


# **Union Expired + New Records**

In [0]:
# One MERGE source: closed-out versions followed by new current versions.
# Columns are aligned by name; DimProductKey comes first in df_expired but
# last in df_new, where withColumn appended it.
df_final = df_expired.unionByName(df_new, allowMissingColumns=False)
df_final.display()


# **SCD TYPE-2 MERGE (Delta)**

In [0]:
from delta.tables import DeltaTable

# Single definition of the gold dimension's catalog name and storage location.
dim_products_table = "azuredb_catalog.gold.DimProducts"
dim_products_path = "abfss://gold@dbxdev.dfs.core.windows.net/DimProducts"

if spark.catalog.tableExists(dim_products_table):
    # Resolve the merge target by the same catalog name used for the existence
    # check (not a separately hard-coded path), so the two cannot drift apart.
    dlt_obj = DeltaTable.forName(spark, dim_products_table)

    # SCD Type-2 upsert keyed on the surrogate key:
    #  - expired rows keep their existing DimProductKey -> matched -> updated in
    #    place (is_current = false, effective_end_date = now);
    #  - new current versions carry freshly issued keys (offset past the
    #    current max) -> not matched -> inserted.
    (
        dlt_obj.alias("target")
        .merge(
            df_final.alias("source"),
            "source.DimProductKey = target.DimProductKey",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

else:
    # First run: create the dimension as a Delta table stored at the gold path.
    (
        df_final.write.mode("overwrite")
        .format("delta")
        .option("path", dim_products_path)
        .saveAsTable(dim_products_table)
    )


In [0]:
%sql
-- Inspect the gold dimension: every version of each product, earliest first.
select *
  from azuredb_catalog.gold.dimproducts
 order by product_id, effective_start_date;
