# Parameters Section

In [None]:
# Parameters
# Source: the table whose rows are merged into the destination.
# NOTE(review): the workspace and schema names are declared here but are not
# referenced when the full table names are built from lakehouse + table name.
source_workspace_name = "std-001-datamovement"
source_lakehouse_name = "lh_staging"
source_schema_name = "dbo"
source_table_name = "Person_Person"

# Destination: the table that receives the merged rows.
destination_workspace_name = "std-001-datamovement"
destination_lakehouse_name = "lh_operations"
destination_schema_name = "dbo"
destination_table_name = "Person"

# Key column used to match source rows to destination rows during the merge.
merge_ID_column = "BusinessEntityID"


In [None]:
# Build the backtick-quoted `lakehouse`.`table` identifiers handed to the merge functions.
source_full_table_name = "`{}`.`{}`".format(source_lakehouse_name, source_table_name)
destination_full_table_name = "`{}`.`{}`".format(destination_lakehouse_name, destination_table_name)

# Functions to Merge Data Dynamically (SCD Type 1 and SCD Type 2)

In [None]:
def merge_data_scdtype1(source_full_table_name, destination_full_table_name, id_column):
    """Upsert the source table into the destination table (SCD Type 1).

    Rows whose ``id_column`` value already exists in the destination are
    overwritten in place; rows that do not exist are inserted. Only columns
    present in both tables are written, and the SCD Type 2 bookkeeping columns
    (DWIsCurrent, DWStartDate, DWEndDate) are never touched.

    Args:
        source_full_table_name: Identifier of the table to read from, as
            accepted by Spark SQL (e.g. "`lakehouse`.`table`").
        destination_full_table_name: Identifier of the table to merge into.
        id_column: Name of the key column used to match source and
            destination rows. Must exist in both tables.

    Returns:
        A ``(status, description)`` tuple. ``status`` is "Succeed" or "Error".
        Exceptions are not propagated; they are reported through the tuple.
    """
    def quote(name):
        # Backtick-quote an identifier so names containing spaces or special
        # characters are valid in SQL; embedded backticks are doubled.
        return "`" + name.replace("`", "``") + "`"

    try:
        # Take column names from the table schema instead of DESCRIBE output:
        # for partitioned tables DESCRIBE also returns blank / "# ..." header
        # rows and repeats the partition columns.
        source_columns = spark.table(source_full_table_name).columns
        destination_columns = set(spark.table(destination_full_table_name).columns)

        # Use only shared columns, exclude SCD2 fields if present
        scd2_cols = {'DWIsCurrent', 'DWStartDate', 'DWEndDate'}
        columns_to_use = [
            name for name in source_columns
            if name in destination_columns and name not in scd2_cols
        ]

        if id_column not in columns_to_use:
            raise ValueError(
                f"Merge key column '{id_column}' must exist in both "
                f"{source_full_table_name} and {destination_full_table_name}."
            )

        # Generate dynamic clauses
        key = quote(id_column)
        set_clause = ", ".join(
            f"target.{quote(name)} = source.{quote(name)}"
            for name in columns_to_use
            if name != id_column
        )
        insert_columns = ", ".join(quote(name) for name in columns_to_use)
        insert_clause = ", ".join(f"source.{quote(name)}" for name in columns_to_use)

        # If the key is the only shared column there is nothing to update, and
        # an empty "UPDATE SET" would be invalid SQL, so omit the matched branch.
        matched_clause = (
            f"WHEN MATCHED THEN\n            UPDATE SET {set_clause}" if set_clause else ""
        )

        # Construct and run the MERGE query
        merge_query = f"""
        MERGE INTO {destination_full_table_name} AS target
        USING {source_full_table_name} AS source
        ON target.{key} = source.{key}
        {matched_clause}
        WHEN NOT MATCHED THEN
            INSERT ({insert_columns})
            VALUES ({insert_clause})
        """
        spark.sql(merge_query)
        print(f"✅ SCD Type 1 merge complete for table {destination_full_table_name}.")

        return "Succeed", f"SCD Type 1 merge completed for {destination_full_table_name}."

    except Exception as e:
        # Report the failure through the return value; the caller decides
        # whether to abort.
        print(f"❌ Error during SCD Type 1 merge: {str(e)}")
        return "Error", f"An error occurred during the SCD Type 1 merge: {str(e)}"


In [None]:
def merge_data_scdtype2(source_full_table_name, destination_full_table_name, id_column):
    """Load the source table into the destination as SCD Type 2 history.

    New keys are inserted as current versions. For keys whose non-key shared
    columns differ from the destination's current version, the current version
    is closed (DWIsCurrent = false, DWEndDate set) and a new current version
    is inserted. If the destination is empty, every source row is inserted as
    an initial load.

    Only columns present in both tables are written; the destination must
    also contain the DWIsCurrent, DWStartDate and DWEndDate columns.

    Args:
        source_full_table_name: Identifier of the table to read from, as
            accepted by Spark SQL (e.g. "`lakehouse`.`table`").
        destination_full_table_name: Identifier of the history table.
        id_column: Name of the key column used to match source and
            destination rows. Must exist in both tables.

    Returns:
        A ``(status, description)`` tuple. ``status`` is "Succeed" or "Error".
        Exceptions are not propagated; they are reported through the tuple.
    """
    temp_view = "scd2_changes"
    audit_cols = ["DWIsCurrent", "DWStartDate", "DWEndDate"]

    def quote(name):
        # Backtick-quote an identifier so names containing spaces or special
        # characters are valid in SQL; embedded backticks are doubled.
        return "`" + name.replace("`", "``") + "`"

    try:
        # Take column names from the table schema instead of DESCRIBE output:
        # for partitioned tables DESCRIBE also returns blank / "# ..." header
        # rows and repeats the partition columns.
        source_cols = spark.table(source_full_table_name).columns
        dest_cols = set(spark.table(destination_full_table_name).columns)

        # Business columns written to the destination: shared by both tables,
        # never the audit columns (those are generated below).
        shared_cols = [
            name for name in source_cols
            if name in dest_cols and name not in audit_cols
        ]
        if id_column not in shared_cols:
            raise ValueError(
                f"Merge key column '{id_column}' must exist in both "
                f"{source_full_table_name} and {destination_full_table_name}."
            )

        # Columns to compare for change detection (everything except the key)
        compare_cols = [name for name in shared_cols if name != id_column]

        key = quote(id_column)
        quoted_shared = [quote(name) for name in shared_cols]
        insert_targets = ", ".join(quoted_shared + [quote(name) for name in audit_cols])
        insert_values = ", ".join(quoted_shared + ["true", "current_timestamp()", "NULL"])

        # Check if destination is empty
        dest_count = spark.table(destination_full_table_name).count()
        if dest_count == 0:
            print("📭 Initial load: destination is empty.")
            spark.sql(f"""
                INSERT INTO {destination_full_table_name} ({insert_targets})
                SELECT {insert_values}
                FROM {source_full_table_name}
            """)
            inserted = spark.table(source_full_table_name).count()
            return "Succeed", f"{inserted} row(s) inserted (initial load)."

        # Build change detection condition. IS DISTINCT FROM is NULL-safe, so
        # a NULL <-> value transition counts as a change.
        change_condition = " OR ".join(
            f"(source.{quote(name)} IS DISTINCT FROM target.{quote(name)})"
            for name in compare_cols
        )
        # A row qualifies when its key has no current version in the
        # destination, or when any compared column differs. With no compared
        # columns an empty "OR ()" would be invalid SQL, so leave it out.
        where_clause = f"target.{key} IS NULL"
        if change_condition:
            where_clause += f" OR ({change_condition})"

        # NOTE: a temp view is not materialized; each statement below that
        # reads it re-runs this query against the tables' state at that time.
        spark.sql(f"""
            CREATE OR REPLACE TEMP VIEW {temp_view} AS
            SELECT source.*
            FROM {source_full_table_name} AS source
            LEFT JOIN (
                SELECT * FROM {destination_full_table_name} WHERE DWIsCurrent = true
            ) AS target
            ON source.{key} = target.{key}
            WHERE {where_clause}
        """)

        # Count rows to be inserted
        changed_count = spark.sql(f"SELECT COUNT(*) AS c FROM {temp_view}").collect()[0]["c"]

        if changed_count == 0:
            return "Succeed", "0 rows inserted. No changes detected."

        # Close previous versions. Keys that are new to the destination simply
        # match nothing here.
        spark.sql(f"""
            MERGE INTO {destination_full_table_name} AS target
            USING {temp_view} AS source
            ON target.{key} = source.{key} AND target.DWIsCurrent = true
            WHEN MATCHED THEN
              UPDATE SET target.DWIsCurrent = false,
                         target.DWEndDate = current_timestamp()
        """)

        # Insert new versions. This is a separate statement from the MERGE
        # above, so its current_timestamp() (DWStartDate) is evaluated
        # independently of the DWEndDate written there.
        spark.sql(f"""
            INSERT INTO {destination_full_table_name} ({insert_targets})
            SELECT {insert_values}
            FROM {temp_view}
        """)

        return "Succeed", f"{changed_count} row(s) inserted and old versions closed."

    except Exception as e:
        return "Error", f"❌ Merge failed: {str(e)}"

    finally:
        # Best-effort cleanup on every exit path; a failure to drop the view
        # must not mask the real outcome of the merge.
        try:
            spark.sql(f"DROP VIEW IF EXISTS {temp_view}")
        except Exception:
            pass


# Main Script Execution

In [None]:
from pyspark.sql import SparkSession
import json

# Merge the source table into the destination table as SCD Type 2 history
status, description = merge_data_scdtype2(source_full_table_name, destination_full_table_name, merge_ID_column)

# Prepare the result as a JSON string so the caller (e.g. a pipeline reading
# the notebook exit value) can parse it; ensure_ascii=False keeps the emoji
# in the description readable.
result = {
    "status": status,
    "description": description
}
result_json = json.dumps(result, ensure_ascii=False)

# Check the merge status and fail the notebook run if the merge failed
if status == "Error":
    print(f"Error: {description}")
    raise Exception(result_json)

# If the merge succeeded, exit normally and hand the result to the caller
mssparkutils.notebook.exit(result_json)