In [1]:
# Preview up to 1000 rows of the silver-to-gold merge-rule configuration table.
df = spark.sql(
    "SELECT * FROM lh_config.merge_rules_silver2gold LIMIT 1000"
)
display(df)

StatementMeta(, e5e9a859-973e-48ad-ad1a-8dfd74bd09d8, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 21c615dc-c9c5-4da3-a599-abc430f7ed10)

In [2]:
from pyspark.sql.functions import expr, current_timestamp

def merge_into_delta(source_df, target_table, merge_keys, update_columns, transform_exprs=None):
    """Upsert ``source_df`` into the Delta table ``target_table``.

    If the target table cannot be resolved it is created from the (transformed)
    source. Otherwise a ``MERGE INTO`` is issued: rows matching on
    ``merge_keys`` get ``update_columns`` plus ``update_date`` overwritten, and
    non-matching rows are inserted with ``INSERT *``.

    Args:
        source_df: Spark DataFrame holding the rows to merge.
        target_table: Name of the target table, as accepted by ``spark.table``.
        merge_keys: Iterable of column names used to build the ``ON`` condition.
        update_columns: Iterable of column names copied from source to target
            on a match. May be empty; ``update_date`` is always refreshed.
        transform_exprs: Optional mapping ``{column_name: sql_expression}``;
            each expression is evaluated with ``expr`` and stored under
            ``column_name`` before writing/merging.

    Raises:
        ValueError: If ``merge_keys`` contains no usable column name.
    """
    import uuid

    # Drop empty names so they cannot produce clauses such as "t. = src.".
    merge_keys = [key for key in (merge_keys or []) if key]
    update_columns = [col for col in (update_columns or []) if col]
    if not merge_keys:
        # An empty ON clause would only surface later as an opaque SQL parse error.
        raise ValueError(f"merge_keys must contain at least one column for {target_table}")

    # Imported locally (after validation) so the function stays self-contained.
    from pyspark.sql.utils import AnalysisException

    # Apply the transforms once, identically for the create and the merge path.
    # The "src" alias lets transform expressions reference columns as src.<col>.
    df_transformed = source_df.alias("src")
    for col_name, expr_sql in (transform_exprs or {}).items():
        df_transformed = df_transformed.withColumn(col_name, expr(expr_sql))

    # Both audit columns are always present so that a table created by this
    # function can be merged into on the next run (the merge sets update_date).
    df_transformed = (
        df_transformed
        .withColumn("insert_date", current_timestamp())
        .withColumn("update_date", current_timestamp())
    )

    try:
        spark.table(target_table)
    except AnalysisException:
        # Only a catalog/analysis failure is treated as "table missing"; any
        # other error propagates instead of triggering an overwrite.
        (
            df_transformed.write.format("delta")
            .mode("overwrite")
            .saveAsTable(target_table)
        )
        print(f"Target table {target_table} not found → Created new table with insert_date & update_date.")
        return

    # A unique temp view name avoids clobbering an existing session view.
    view_name = f"merge_src_{uuid.uuid4().hex}"
    df_transformed.createOrReplaceTempView(view_name)

    merge_condition = " AND ".join(f"t.{k} = src.{k}" for k in merge_keys)
    set_clause = ", ".join(
        [f"t.{c} = src.{c}" for c in update_columns] + ["t.update_date = src.update_date"]
    )

    merge_sql = f"""
    MERGE INTO {target_table} AS t
    USING (SELECT * FROM {view_name}) AS src
    ON {merge_condition}
    WHEN MATCHED THEN UPDATE SET {set_clause}
    WHEN NOT MATCHED THEN INSERT *
    """

    try:
        spark.sql(merge_sql)
    finally:
        # Always release the temporary view, even when the merge fails.
        spark.catalog.dropTempView(view_name)

    print(f"Merge completed for {target_table} (with insert_date & update_date).")


StatementMeta(, c9163870-8f9d-41c0-8540-4279651c88ef, 4, Finished, Available, Finished)

In [3]:
import json

def _split_csv(value):
    """Split a comma-separated config value into stripped, non-empty tokens.

    ``None`` and empty strings yield an empty list; blank tokens (for example
    from a trailing comma) are dropped.
    """
    return [token.strip() for token in (value or "").split(",") if token.strip()]


# Load every active merge rule from the configuration table.
configs = [
    row.asDict()
    for row in spark.table("lh_config.merge_rules_silver2gold").filter("active_flag=1").collect()
]

for cfg in configs:
    table_name = cfg["table_name"]
    target_table = cfg["target_table"]

    merge_keys = _split_csv(cfg["merge_keys"])
    update_columns = _split_csv(cfg["update_columns"])
    transform_exprs = json.loads(cfg["transform_exprs"]) if cfg["transform_exprs"] else {}

    if not merge_keys:
        # Without keys the MERGE has no ON condition; fail with the offending rule named.
        raise ValueError(f"No merge_keys configured for {table_name} -> {target_table}")

    print(f"Merge {table_name} -> into {target_table}")

    df_source = spark.table(table_name)

    merge_into_delta(
        source_df=df_source,
        target_table=target_table,
        merge_keys=merge_keys,
        update_columns=update_columns,
        transform_exprs=transform_exprs
    )


StatementMeta(, c9163870-8f9d-41c0-8540-4279651c88ef, 5, Finished, Available, Finished)

Processing lh_silver.sales -> lh_gold.sales
Merge completed for lh_gold.sales (with insert_date & update_date).
Processing lh_silver.payments -> lh_gold.payments
Merge completed for lh_gold.payments (with insert_date & update_date).
Processing lh_silver.invoices -> lh_gold.invoices
Merge completed for lh_gold.invoices (with insert_date & update_date).


StatementMeta(, c9163870-8f9d-41c0-8540-4279651c88ef, -1, Finished, Available, Finished)

In [5]:
# Preview up to 1000 rows of lh_gold.invoices.
df = spark.sql(
    "SELECT * FROM lh_gold.invoices LIMIT 1000"
)
display(df)

StatementMeta(, c9163870-8f9d-41c0-8540-4279651c88ef, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3ea42008-af26-4a25-8dec-802c5edd8b57)