In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def apply_cast(df, casts: dict):
    """Cast columns of ``df`` according to ``casts``.

    Args:
        df: Spark DataFrame to transform.
        casts: mapping of column name -> target type. ``"date"`` is parsed with
            ``F.to_date``; ``"timestamp"`` (or its shorthand ``"ts"``) with
            ``F.to_timestamp``; any other value is passed to ``Column.cast``
            (e.g. ``"long"``, ``"string"``).

    Returns:
        A new DataFrame where each listed column is replaced by its cast value.
        An empty mapping returns ``df`` unchanged.
    """
    for col_name, target in casts.items():
        if target == "date":
            expr = F.to_date(F.col(col_name))
        elif target in ("timestamp", "ts"):
            # "ts" used to fall through to Column.cast("ts"), which is not a
            # Spark type name; treat it as an alias of "timestamp".
            expr = F.to_timestamp(F.col(col_name))
        else:
            expr = F.col(col_name).cast(target)
        df = df.withColumn(col_name, expr)
    return df

def add_event_key(df, key_cols, existing_key_col=None, sep="||"):
    """Add (or replace) an ``event_key`` string column.

    The fallback key is ``sha2(concat_ws(sep, key_cols...), 256)``. Each key
    column is cast to string and NULL is replaced by ``""``, so NULL and an
    empty string hash identically.

    Args:
        df: Spark DataFrame.
        key_cols: column names hashed to build the fallback key.
        existing_key_col: optional column that may already hold a key. Where
            its string form is non-NULL and non-empty, that value is used;
            otherwise the hash is used. Falsy (None / "") means "always hash".
        sep: separator passed to ``concat_ws`` when building the hash input.

    Returns:
        ``df`` with an ``event_key`` column.
    """
    hashed_key = F.sha2(
        F.concat_ws(sep, *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in key_cols]),
        256,
    )
    if not existing_key_col:
        return df.withColumn("event_key", hashed_key)

    # Compare the *string* form. Comparing a non-string column (e.g. long)
    # with "" makes Spark coerce "" to that type, which yields NULL (or an
    # error under ANSI mode). The existing key would then never be chosen.
    existing_key = F.col(existing_key_col).cast("string")
    return df.withColumn(
        "event_key",
        F.when(existing_key.isNotNull() & (existing_key != ""), existing_key).otherwise(hashed_key),
    )

def dedupe_lateste(df, key_col="event_key", order_col="ROW_INSERT_DATE", tiebreak_cols=None):
    """Keep one row per ``key_col`` value: the one with the greatest ``order_col``.

    Args:
        df: Spark DataFrame.
        key_col: partition column name, or a list of names (PySpark's
            ``Window.partitionBy`` accepts a single list argument).
        order_col: column ordered descending with NULLs last; the first row wins.
        tiebreak_cols: optional extra columns, ordered descending with NULLs
            last after ``order_col``. Without them, rows that tie on
            ``order_col`` are ranked arbitrarily by ``row_number``, so the
            surviving row can differ between runs. This is likely with a
            date-typed ``order_col`` such as ROW_INSERT_DATE.

    Returns:
        DataFrame with at most one row per key. The helper ``rn`` column is
        dropped (an input column named ``rn`` would be overwritten and dropped).
    """
    ordering = [F.col(order_col).desc_nulls_last()]
    ordering += [F.col(c).desc_nulls_last() for c in (tiebreak_cols or [])]
    w = Window.partitionBy(key_col).orderBy(*ordering)
    return df.withColumn("rn", F.row_number().over(w)).filter("rn=1").drop("rn")

def ensure_table_schema(df, silver_tbl, key_col="event_key", order_col="ROW_INSERT_DATE"):
    """Create ``silver_tbl`` as an empty Delta table with ``df``'s schema, if missing.

    Does nothing when the table already exists; an existing schema is not
    compared with or reconciled against ``df``.

    Args:
        df: DataFrame whose schema defines the new table.
        silver_tbl: target table name.
        key_col: unused; kept so existing calls keep working.
        order_col: unused; kept so existing calls keep working. The previous
            version ran a row_number dedupe on both before ``limit(0)``. That
            could not change the empty result or its columns, but it required
            both columns to exist.
    """
    if spark.catalog.tableExists(silver_tbl):
        return
    df.limit(0).write.format("delta").mode("overwrite").saveAsTable(silver_tbl)

def get_max_dt(silver_tbl, order_col):
    """Return the largest ``order_col`` value stored in ``silver_tbl``.

    Returns ``None`` when the table does not exist. An existing but empty
    table also gives ``None``, because SQL ``max`` over no rows is NULL.
    """
    if spark.catalog.tableExists(silver_tbl):
        query = f"SELECT max({order_col}) AS max_dt FROM {silver_tbl}"
        first_row = spark.sql(query).collect()[0]
        return first_row['max_dt']
    return None

def apply_lookback(df, max_dt, order_col, lookback_days=2):
    """Keep rows whose ``order_col`` is on or after ``max_dt - lookback_days`` days.

    With no watermark (``max_dt is None``), ``df`` is returned as-is (full load).
    Once a watermark exists, rows with a NULL ``order_col`` fail the ``>=``
    comparison and are filtered out.
    """
    if max_dt is not None:
        cutoff = F.date_sub(F.lit(max_dt), lookback_days)
        return df.filter(F.col(order_col) >= cutoff)
    return df

def merge_delta(silver_tbl, staging_df, key_col="event_key", order_col="ROW_INSERT_DATE"):
    """Type-1 upsert of ``staging_df`` into ``silver_tbl`` keyed on ``key_col``.

    A matched target row is overwritten (``UPDATE SET *``) only when the
    source ``order_col`` is strictly greater; NULLs compare as 1900-01-01.
    Unmatched source rows are inserted (``INSERT *``). The staging data is
    exposed to SQL through the temp view ``stg_upsert``, replacing any
    existing view of that name.

    NOTE(review): staging_df should hold at most one row per key_col. Delta
    MERGE rejects several source rows updating the same target row.
    """
    view_name = "stg_upsert"
    staging_df.createOrReplaceTempView(view_name)
    floor = "to_date('1900-01-01')"
    merge_sql = f"""
    MERGE INTO {silver_tbl} t
    USING {view_name} s
    ON s.{key_col}=t.{key_col}
    WHEN MATCHED AND coalesce(s.{order_col},{floor})>coalesce(t.{order_col},{floor})
    THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    """
    spark.sql(merge_sql)

def add_scd2_cols(df, start_col, track_cols, open_end="9999-12-31"):
    """Attach SCD2 bookkeeping columns to ``df``.

    Adds, in this order:
      * ``row_hash``: sha2-256 over ``track_cols``. Each column is cast to
        string, NULL becomes "", and values are joined with "||".
      * ``effective_start_date``: ``start_col`` converted with ``to_date``.
        NOTE(review): two versions of a key on the same day get equal start
        dates.
      * ``effective_end_date``: the constant ``open_end`` as a date.
      * ``is_current``: constant True.
    """
    row_hash = F.sha2(
        F.concat_ws("||", *(F.coalesce(F.col(c).cast("string"), F.lit("")) for c in track_cols)),
        256,
    )
    scd2_columns = [
        ("row_hash", row_hash),
        ("effective_start_date", F.to_date(F.col(start_col))),
        ("effective_end_date", F.to_date(F.lit(open_end))),
        ("is_current", F.lit(True)),
    ]
    for name, expr in scd2_columns:
        df = df.withColumn(name, expr)
    return df

def merge_data_SCD2(
    silver_tbl: str,
    staging_df,
    natural_keys: list = None,
    track_cols: list = None,  # preference columns to track (exclude ROW_* audit cols)
    start_col: str = "CONSENT_UPDATE_TIMESTAMP",
    open_end: str = "9999-12-31"
):
    """SCD Type-2 upsert of ``staging_df`` into ``silver_tbl`` with one MERGE.

    Per natural key (after keeping the latest staged version of each key):
      * key has no current row            -> staged row is inserted;
      * current row's ``row_hash`` differs -> current row is closed
        (``effective_end_date = staged start - 1 day``, ``is_current = false``)
        and the staged row is inserted as the new version;
      * same ``row_hash``                  -> nothing happens.

    The staged rows are sent twice. The "close" copy carries the real keys in
    ``_mk_*`` merge-key columns. The "insert" copy, built only for changed
    keys, carries NULL merge keys so it can never match and always inserts.
    (Previously both copies used the real keys, so the new version of a
    changed key matched the current row and was never inserted.)

    Args:
        silver_tbl: existing Delta table with ``row_hash``,
            ``effective_start_date``, ``effective_end_date`` and ``is_current``.
        staging_df: incoming rows. If it has no ``row_hash`` column, SCD2
            columns are added via ``add_scd2_cols(..., start_col, track_cols,
            open_end)``.
        natural_keys: business-key columns identifying an entity (required).
        track_cols: columns hashed into ``row_hash``; only needed when
            ``staging_df`` lacks ``row_hash``.
        start_col: orders versions of one key inside the batch (latest wins).
            Falls back to ``effective_start_date`` if the column is absent.
        open_end: open end date, used only when SCD2 columns are added here.

    Raises:
        ValueError: ``natural_keys`` missing/empty, or ``track_cols`` missing
            while ``row_hash`` has to be computed.

    NOTE(review): ``effective_start_date`` is a date. A second change of a
    key on the same day closes the old row with an end date before its start
    date. A late-arriving older event also supersedes a newer current row.
    Confirm both are acceptable.
    """
    if not natural_keys:
        raise ValueError("natural_keys must be a non-empty list of column names")

    # 1) Ensure SCD2 columns (row_hash + effective dates + current flag) exist.
    stg = staging_df
    if "row_hash" not in stg.columns:
        if not track_cols:
            raise ValueError("track_cols is required when staging_df has no row_hash column")
        stg = add_scd2_cols(stg, start_col=start_col, track_cols=track_cols, open_end=open_end)

    # 2) One staged version per natural key. Otherwise several source rows
    #    would try to close the same target row, which MERGE rejects.
    version_col = start_col if start_col in stg.columns else "effective_start_date"
    latest_w = (Window.partitionBy(*natural_keys)
                .orderBy(F.col(version_col).desc_nulls_last(), F.col("row_hash").desc()))
    latest = stg.withColumn("_rn", F.row_number().over(latest_w)).filter("_rn = 1").drop("_rn")

    # 3) Keys whose current target version differs: they need the extra
    #    insert-only copy (their "close" copy is consumed by WHEN MATCHED).
    target = spark.table(silver_tbl)
    current = (target
               .filter(F.col("is_current"))
               .select(*natural_keys, F.col("row_hash").alias("_cur_row_hash")))
    changed = (latest.join(current, on=list(natural_keys), how="inner")
               .filter(F.col("row_hash") != F.col("_cur_row_hash"))
               .drop("_cur_row_hash"))

    # 4) Build the staged source with merge-key columns.
    mk = {k: f"_mk_{k}" for k in natural_keys}
    close_copy = latest.select(*[F.col(k).alias(mk[k]) for k in natural_keys], "*")
    insert_copy = changed.select(
        *[F.lit(None).cast(latest.schema[k].dataType).alias(mk[k]) for k in natural_keys], "*"
    )
    close_copy.unionByName(insert_copy).createOrReplaceTempView("stg_scd2")

    # 5) Single MERGE. Insert an explicit target column list so the helper
    #    _mk_* columns never reach the table.
    on_keys = " AND ".join(f"t.`{k}` = s.`{mk[k]}`" for k in natural_keys)
    target_cols = target.columns
    insert_cols = ", ".join(f"`{c}`" for c in target_cols)
    insert_vals = ", ".join(f"s.`{c}`" for c in target_cols)

    spark.sql(f"""
        MERGE INTO {silver_tbl} AS t
        USING stg_scd2 AS s
        ON ({on_keys}) AND t.is_current = true
        WHEN MATCHED AND t.row_hash <> s.row_hash THEN
          UPDATE SET
            t.effective_end_date = date_sub(s.effective_start_date, 1),
            t.is_current = false
        WHEN NOT MATCHED THEN
          INSERT ({insert_cols}) VALUES ({insert_vals})
    """)



In [0]:
# Pipeline configuration: every table and column name the cells below read.
CFG = {
    "bronze_tbl": "bronze_consent",
    "silver_tbl": "silver_consent",
    "natural_keys": ["CLIENT_ID", "CONSENT_SOURCE"],  # SCD2 natural key
    "start_col": "CONSENT_UPDATE_TIMESTAMP",  # prefer consent update time
    "order_col": "ROW_INSERT_DATE",  # incremental watermark column (ingestion time)
    "lookback_days": 2,
    # Cast types (add/remove columns to match your file)
    "casts": {
        "CLIENT_ID": "long",
        "CONSENT_UPDATE_FIELD": "string",
        "CONSENT_UPDATE_TIMESTAMP": "timestamp",
        "CONSENT_SOURCE": "string",
        "ROW_INSERT_DATE": "date",
    },
    # Columns hashed into the deterministic event_key
    "key_cols": ["CLIENT_ID", "CONSENT_UPDATE_FIELD", "CONSENT_UPDATE_TIMESTAMP"],
    # Columns that define a "business change" (DO NOT include ROW_* audit cols).
    # NOTE(review): CONSENT_UPDATE_TIMESTAMP is included, so every new event
    # produces a new row_hash and thus a new SCD2 version.
    "track_cols": [
        "CLIENT_ID",
        "CONSENT_UPDATE_TIMESTAMP",
        "CONSENT_SOURCE",
        # add your opt-in flags here if you have them, e.g. "EMAIL_OPT_IN", "SMS_OPT_IN", "PUSH_OPT_IN"
    ],
}

# 1) Read the bronze source table named in CFG
b = spark.read.table(CFG["bronze_tbl"])
display(b)



In [0]:

# 2) Cast / clean: apply the column types declared in CFG["casts"]
s = apply_cast(b, casts=CFG["casts"])
display(s)



In [0]:
# 3) Add deterministic event_key: sha2 of CFG["key_cols"] (no pre-existing key column)
s = add_event_key(s, key_cols=CFG["key_cols"], existing_key_col=None, sep="||")
display(s)

In [0]:
# 3b) Attach SCD2 columns (row_hash, effective dates, is_current).
# Reads start_col from CFG instead of repeating the literal, so the two stay in sync.
s_scd2 = add_scd2_cols(s, start_col=CFG["start_col"], track_cols=CFG["track_cols"])
display(s_scd2)

In [0]:

# 4) Ensure silver table exists with correct schema (incl. event_key)
ensure_table_schema(s_scd2, CFG["silver_tbl"])
# Use the configured table name rather than a hardcoded 'silver_consent'.
display(spark.table(CFG["silver_tbl"]))


In [0]:
# 5) Incremental lookback window: the watermark is the latest order_col already in silver
max_dt = get_max_dt(CFG["silver_tbl"], CFG["order_col"])
inc = apply_lookback(s_scd2, max_dt, order_col=CFG["order_col"], lookback_days=CFG["lookback_days"])

# 6) Deduplicate within this batch: one row per event_key, newest order_col wins
upsert_df = dedupe_lateste(inc, key_col="event_key", order_col=CFG["order_col"])
display(upsert_df)




In [0]:
# NOTE(review): dead cell. This is a commented-out experiment deduping per
# natural key by CONSENT_UPDATE_TIMESTAMP; delete it or restore it before sharing.
#s_inc=dedupe_lateste(upsert_df,key_col=CFG['natural_keys'],order_col="CONSENT_UPDATE_TIMESTAMP")
#s_inc.display()

In [0]:

# 7) SCD2 upsert of the deduplicated batch into silver, in a single MERGE
merge_data_SCD2(
    silver_tbl=CFG["silver_tbl"],
    staging_df=upsert_df,
    natural_keys=CFG["natural_keys"],
    track_cols=CFG["track_cols"],
    start_col=CFG["start_col"],
)

# 8) Quick check: silver history ordered by client, then by version start date
display(
    spark.table(CFG["silver_tbl"]).orderBy(
        F.col("CLIENT_ID"),
        F.col("effective_start_date"),
    )
)