In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, monotonically_increasing_id, row_number, max as max_
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Source table: read with spark.read.table() as the input of this dimension load.
silver_table = "databricks_cata.silver.products_silver"
# Target table name: checked with spark.catalog.tableExists() and written with saveAsTable().
gold_table = "databricks_cata.gold.DimProducts"
# Storage location passed as the "path" option whenever the target table is written.
gold_path = "abfss://gold@ayaneteprojstor.dfs.core.windows.net/DimProducts"

### **Data Reading, Cleaning and Processing-Time Stamping**

In [0]:
# Read the silver product rows that feed the dimension.
source_df = spark.read.table(silver_table)

# Keep only rows that carry both the business key and the tracked attribute;
# rows with a null product_id or product_name are excluded from the load.
clean_source_df = source_df.filter(
    col("product_id").isNotNull() & col("product_name").isNotNull()
)

# current_timestamp() is lazy and is re-evaluated for every Spark query, so the
# downstream merge (which writes __END_AT) and append (which writes __START_AT)
# would each see a different "processing_time". Capture one timestamp for the
# whole run and attach it as a literal so every action uses the same value.
run_timestamp = spark.range(1).select(current_timestamp().alias("ts")).first()["ts"]

updates_df = clean_source_df.withColumn(
    "processing_time", lit(run_timestamp).cast("timestamp")
)

### **SCD Type 2 Merge Operation**

In [0]:
# SCD Type 2 load of the product dimension.
#
# First run: create the gold table from the cleaned source rows.
# Later runs: close the current version of every product whose name changed,
# then append a new current version for changed and brand-new products.

table_exists = spark.catalog.tableExists(gold_table)

# Single definition of the dimension's column layout, shared by the initial
# load and the incremental append so both write the same set of columns.
dim_attribute_cols = [
    "product_id",
    "product_name",
    "category",
    "brand",
    "price",
    "discounted_price",
]
scd_cols = ["__CURRENT_FLAG", "__START_AT", "__END_AT"]
dim_output_cols = ["DimProductKey"] + dim_attribute_cols + scd_cols

if not table_exists:
    print(f"Table {gold_table} does not exist. Creating it for the first time.")

    # monotonically_increasing_id() yields unique (not consecutive) long keys.
    initial_df = updates_df.withColumn("DimProductKey", monotonically_increasing_id()) \
                           .withColumn("__CURRENT_FLAG", lit(True)) \
                           .withColumn("__START_AT", col("processing_time")) \
                           .withColumn("__END_AT", lit(None).cast("timestamp"))

    initial_df.select(*dim_output_cols) \
        .write.format("delta").option("path", gold_path).saveAsTable(gold_table)

else:
    print(f"Table {gold_table} exists. Applying changes.")

    target_table = DeltaTable.forName(spark, gold_table)
    target_df = target_table.toDF()

    # max() over an empty table returns NULL/None; fall back to 0 so the new
    # surrogate keys below are never computed as NULL.
    max_key = target_df.select(max_("DimProductKey")).collect()[0][0] or 0

    # Step 1: expire the current version of products whose name changed.
    target_table.alias("target").merge(
        source=updates_df.alias("source"),
        condition="target.product_id = source.product_id"
    ) \
    .whenMatchedUpdate(
        condition="target.__CURRENT_FLAG = true AND target.product_name <> source.product_name",
        set={
            "__CURRENT_FLAG": "false",
            "__END_AT": "source.processing_time"
        }
    ) \
    .execute()

    # Step 2: source rows with no current version in the target are either
    # brand-new products or products whose current version was just expired.
    # NOTE(review): this relies on target_df reflecting the merge above when it
    # is evaluated here - confirm for the runtime in use.
    new_and_updated_records_df = updates_df.join(
        target_df.where("__CURRENT_FLAG = true"),
        updates_df.product_id == target_df.product_id,
        "left_anti"
    )

    if not new_and_updated_records_df.isEmpty():
        # A global (unpartitioned) window numbers the rows 1..n so the new keys
        # continue after the current maximum key.
        window = Window.orderBy(col("product_id"))

        # Write the full attribute set: the previous narrower select dropped
        # category, brand, price and discounted_price for every appended row.
        # Cast the key to long to match the type produced by the initial load.
        records_to_insert = new_and_updated_records_df \
            .withColumn("rn", row_number().over(window)) \
            .withColumn("DimProductKey", (col("rn") + lit(max_key)).cast("long")) \
            .withColumn("__CURRENT_FLAG", lit(True)) \
            .withColumn("__START_AT", col("processing_time")) \
            .withColumn("__END_AT", lit(None).cast("timestamp")) \
            .select(*dim_output_cols)

        records_to_insert.write.format("delta").mode("append").option("path", gold_path).saveAsTable(gold_table)
