In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp, row_number, col
from pyspark.sql.window import Window

# Bronze -> Silver customers load.
# Deduplicates the bronze table to one row per customer_id, then either
# creates the silver table (first run) or merges the rows into it.
SILVER_TABLE = "silver_db.customers"

# Read Bronze Customers
bronze_df = spark.table("bronze_db.customers")

# Deduplicate Bronze: per customer_id keep the row with the latest ingestion_ts.
# NOTE: rows that tie on ingestion_ts are ranked in an unspecified order, so
# which of them survives is not guaranteed.
window_spec = Window.partitionBy("customer_id").orderBy(col("ingestion_ts").desc())

dedup_df = (
    bronze_df
    .withColumn("rn", row_number().over(window_spec))
    .filter(col("rn") == 1)
    .drop("rn")
)

# Initial load or incremental merge
if not spark.catalog.tableExists(SILVER_TABLE):
    # First run: write every deduplicated bronze row as-is (all bronze
    # columns, including bronze's own ingestion_ts) plus the two audit
    # timestamps.
    silver_df = (
        dedup_df
        .withColumn("created_ts", current_timestamp())
        .withColumn("modified_ts", current_timestamp())
    )

    silver_df.write.format("delta").saveAsTable(SILVER_TABLE)

else:
    target = DeltaTable.forName(spark, SILVER_TABLE)

    # Single ETL timestamp column reused for every audit field set below.
    staged_df = dedup_df.withColumn("etl_ts", current_timestamp())

    (
        target.alias("t")
        .merge(
            staged_df.alias("s"),
            # Null-safe key match: with plain `=`, a NULL customer_id never
            # matches the row previously written for it, so that row would
            # be inserted again on every run.
            "t.customer_id <=> s.customer_id"
        )
        # Update only when at least one tracked attribute differs; `<=>`
        # treats NULL on both sides as equal, so unchanged rows keep their
        # modified_ts.
        .whenMatchedUpdate(
            condition="""
                NOT (
                    t.name  <=> s.name  AND
                    t.email <=> s.email AND
                    t.city  <=> s.city
                )
            """,
            set={
                "name": "s.name",
                "email": "s.email",
                "city": "s.city",
                # NOTE(review): ingestion_ts is overwritten with the ETL time
                # here, whereas the initial load keeps bronze's ingestion_ts.
                # Confirm which meaning is intended.
                "ingestion_ts": "s.etl_ts",
                "modified_ts": "s.etl_ts"
            }
        )
        # New customers: only the columns listed here are populated.
        .whenNotMatchedInsert(
            values={
                "customer_id": "s.customer_id",
                "name": "s.name",
                "email": "s.email",
                "city": "s.city",
                # NOTE(review): same ingestion_ts question as in the update
                # branch above.
                "ingestion_ts": "s.etl_ts",
                "created_ts": "s.etl_ts",
                "modified_ts": "s.etl_ts"
            }
        )
        .execute()
    )
