In [None]:
def _quote_identifier(name: str) -> str:
    """Return *name* wrapped in backticks for use as a Spark SQL identifier."""
    # Tolerate names that arrive already quoted, and escape embedded backticks.
    bare = name.strip().strip("`")
    return "`" + bare.replace("`", "``") + "`"


def _change_predicate(action: str, is_update: bool, alias: str = "") -> str:
    """Build the SQL predicate selecting one kind of change row.

    The change-tracking columns contain ``$`` and therefore must be
    backtick-quoted; *alias* optionally qualifies them (e.g. ``stream_stage``).
    """
    prefix = f"{alias}." if alias else ""
    flag = "TRUE" if is_update else "FALSE"
    return (
        f"{prefix}`METADATA$ACTION` = '{action}' "
        f"AND {prefix}`METADATA$ISUPDATE` = {flag}"
    )


def dynamic_merge(TableName: str, Destination_Table: str, primary_key_list: str) -> None:
    """Deduplicate a change-tracking staging table and MERGE it into a target.

    The staging table must expose ``METADATA$ACTION`` and ``METADATA$ISUPDATE``
    columns. One row per primary key is kept (deletes win over updates, which
    win over inserts) and then applied to the destination with a single
    ``MERGE INTO`` statement that deletes, updates or inserts accordingly.

    Side effects: (re)creates the temp views ``deduplicated_source``,
    ``to_delete``, ``to_update`` and ``to_insert`` in the ``spark`` session,
    shows the deduplicated rows, and prints progress messages and row counts.

    Args:
        TableName: Name of the source (staging) table or view.
        Destination_Table: Name of the table that is merged into.
        primary_key_list: Comma-separated primary-key column names used both to
            deduplicate the source and to join source and destination.

    Raises:
        ValueError: If ``primary_key_list`` contains no column name, or if the
            source and destination tables share no column names.
    """
    # Parse the key list up front; surrounding whitespace ("a, b") is ignored.
    key_columns = [key.strip() for key in primary_key_list.split(",") if key.strip()]
    if not key_columns:
        raise ValueError("primary_key_list must contain at least one column name.")
    quoted_keys = [_quote_identifier(key) for key in key_columns]

    # Read the column names from the table schemas. DESCRIBE output can carry
    # extra non-column rows (e.g. "# Partition Information") for partitioned
    # tables, which would otherwise leak into the generated SQL.
    source_columns = spark.table(TableName).columns
    target_columns = set(spark.table(Destination_Table).columns)

    # Keep only source columns that also exist in the destination (source order).
    matching_columns = [col for col in source_columns if col in target_columns]

    if not matching_columns:
        raise ValueError("No matching columns found between source and destination tables.")

    delete_predicate = _change_predicate("DELETE", False)
    update_predicate = _change_predicate("INSERT", True)
    insert_predicate = _change_predicate("INSERT", False)

    # Deduplicate the source table: one row per primary key.
    # The ELSE branch ranks every other row (notably the DELETE half of an
    # update pair, which no MERGE clause acts on) last, so it can never win
    # over the row that actually carries the change. Without it such rows
    # either tie with the update row or sort first as NULL.
    # NOTE(review): `current_timestamp` is constant within one query, so it
    # does not break ties between rows of equal priority - confirm whether a
    # real event-time column should be used here instead.
    deduplicated_view = "deduplicated_source"
    partition_by = ", ".join(quoted_keys)
    dedup_query = f"""
    CREATE OR REPLACE TEMP VIEW {deduplicated_view} AS
    SELECT *
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (
                PARTITION BY {partition_by}
                ORDER BY
                    CASE
                        WHEN {delete_predicate} THEN 1 -- High priority
                        WHEN {update_predicate} THEN 2  -- Medium priority
                        WHEN {insert_predicate} THEN 3 -- Low priority
                        ELSE 4 -- Rows no MERGE clause acts on
                    END, current_timestamp DESC
            ) AS row_num
        FROM {TableName}
    ) t
    WHERE row_num = 1
    """
    spark.sql(dedup_query)
    print("Source table deduplicated successfully.")

    # Display the deduplicated source table
    deduplicated_data = spark.sql(f"SELECT * FROM {deduplicated_view}")
    deduplicated_data.show(truncate=False)

    # Create temp views for counting purposes
    spark.sql(f"CREATE OR REPLACE TEMP VIEW to_delete AS SELECT * FROM {deduplicated_view} WHERE {delete_predicate}")
    spark.sql(f"CREATE OR REPLACE TEMP VIEW to_update AS SELECT * FROM {deduplicated_view} WHERE {update_predicate}")
    spark.sql(f"CREATE OR REPLACE TEMP VIEW to_insert AS SELECT * FROM {deduplicated_view} WHERE {insert_predicate}")

    # Get counts before MERGE. These count the staged change rows of each kind,
    # not the rows the MERGE statement ends up modifying in the destination.
    deleted_count = spark.sql("SELECT COUNT(*) AS count FROM to_delete").collect()[0]["count"]
    updated_count = spark.sql("SELECT COUNT(*) AS count FROM to_update").collect()[0]["count"]
    inserted_count = spark.sql("SELECT COUNT(*) AS count FROM to_insert").collect()[0]["count"]

    # Construct the MERGE query dynamically; column names are backtick-quoted so
    # names containing special characters stay valid SQL.
    quoted_columns = [_quote_identifier(col) for col in matching_columns]
    update_clause = ", ".join(f"target.{col} = stream_stage.{col}" for col in quoted_columns)
    insert_columns = ", ".join(quoted_columns)
    insert_values = ", ".join(f"stream_stage.{col}" for col in quoted_columns)
    join_condition = " AND ".join(f"target.{key} = stream_stage.{key}" for key in quoted_keys)

    merge_sql = f"""
    MERGE INTO {Destination_Table} AS target
    USING {deduplicated_view} AS stream_stage
    ON {join_condition}
    WHEN MATCHED AND {_change_predicate("DELETE", False, "stream_stage")} THEN DELETE
    WHEN MATCHED AND {_change_predicate("INSERT", True, "stream_stage")} THEN
        UPDATE SET {update_clause}
    WHEN NOT MATCHED AND {_change_predicate("INSERT", False, "stream_stage")} THEN
        INSERT ({insert_columns})
        VALUES ({insert_values})
    """

    # Execute the merge statement
    spark.sql(merge_sql)
    print("MERGE operation completed successfully.")

    # Print counts
    print(f"Deleted rows: {deleted_count}")
    print(f"Updated rows: {updated_count}")
    print(f"Inserted rows: {inserted_count}")

In [None]:
# Run the merge using the notebook-level TableName, Destination_Table and
# primary_key_list variables (they are defined outside this excerpt).
dynamic_merge(TableName, Destination_Table, primary_key_list)