##### Create SCD Type 1 Tables

In [11]:
#------Imports-----------------
import com.microsoft.spark.fabric

StatementMeta(, a2c4f750-6efe-49b0-8e5c-278adaa3d543, 17, Finished, Available, Finished)

In [6]:
%%sql


-- SCD Type 1 target tables, one per source table loaded by scd_type1_logic.
-- Each table holds the business columns of its source followed by the same
-- five housekeeping columns that scd_type1_logic writes during the merge:
--   CreatedDate / CreatedBy  - set when the row is first inserted
--   UpdatedDate / UpdatedBy  - overwritten every time the row is updated
--   HashKey                  - crc32 over the source row, used to detect changes
-- IF NOT EXISTS makes the cell safe to re-run - existing tables are left untouched.

-- Target for the "transactions" source table (merge key: transaction_id).
-- NOTE(review): amounts are FLOAT here but accounts_scd1.balance is DOUBLE - confirm the intended precision.
CREATE TABLE IF NOT EXISTS  transactions_scd1 (
    transaction_id INT,
    account_id INT,
    transaction_date DATE,
    transaction_amount FLOAT,
    transaction_type STRING,
    CreatedDate TIMESTAMP,
    UpdatedDate TIMESTAMP,
    CreatedBy STRING,
    UpdatedBy STRING,
    HashKey BIGINT
);
-- Target for the "loan_payments" source table (merge key: payment_id).
CREATE TABLE IF NOT EXISTS loan_payments_scd1 (
    payment_id INT,
    loan_id INT,
    payment_date DATE,
    payment_amount FLOAT,
    CreatedDate TIMESTAMP,
    UpdatedDate TIMESTAMP,
    CreatedBy STRING,
    UpdatedBy STRING,
    HashKey BIGINT
);
-- Target for the "loans" source table (merge key: loan_id).
CREATE TABLE IF NOT EXISTS loans_scd1 (
    loan_id INT,
    customer_id INT,
    loan_amount FLOAT,
    interest_rate FLOAT,
    loan_term INT,
    CreatedDate TIMESTAMP,
    UpdatedDate TIMESTAMP,
    CreatedBy STRING,
    UpdatedBy STRING,
    HashKey BIGINT
);
-- Target for the "customers" source table (merge key: customer_id).
CREATE TABLE IF NOT EXISTS customers_scd1 (
    customer_id INT,    
    first_name STRING,
    last_name STRING,
    address STRING,
    city STRING,
    zip STRING,
    CreatedDate TIMESTAMP,
    UpdatedDate TIMESTAMP,
    CreatedBy STRING,
    UpdatedBy STRING,
    HashKey BIGINT
);
-- Target for the "accounts" source table (merge key: account_id).
CREATE TABLE IF NOT EXISTS accounts_scd1 (
    account_id INT,
    customer_id INT,
    account_type STRING,
    balance DOUBLE,
    CreatedDate TIMESTAMP,
    UpdatedDate TIMESTAMP,
    CreatedBy STRING,
    UpdatedBy STRING,
    HashKey BIGINT
);

StatementMeta(, a2c4f750-6efe-49b0-8e5c-278adaa3d543, 11, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

##### Function for SCD Type 1

In [9]:
def scd_type1_logic(
    warehouse_table: str,
    lakehouse_table_name: str,
    join_key: str,
    update_columns: list,
    insert_columns: list,
    created_by: str = "fabric",
    updated_by: str = "fabric-updated"
):
    """Apply a Slowly Changing Dimension Type 1 merge from a Warehouse table
    into a Lakehouse Delta table.

    The source table is read through the Fabric Spark connector, a change-
    detection hash is computed over all of its columns, and only rows that are
    new or whose hash differs from the target are merged: matched rows are
    overwritten in place, unmatched rows are inserted.

    Args:
        warehouse_table: Source table name passed to ``spark.read.synapsesql``
            (the caller uses ``<warehouse>.<schema>.<table>``).
        lakehouse_table_name: Name of the existing Delta table to merge into.
        join_key: Column present in both source and target used to match rows.
        update_columns: Source columns copied onto a matched target row.
        insert_columns: Source columns copied into a newly inserted row.
        created_by: Value written to ``CreatedBy`` (and the initial
            ``UpdatedBy``) of inserted rows.
        updated_by: Value written to ``UpdatedBy`` of updated rows.

    Returns:
        None. Progress is reported with ``print``.

    Notes:
        * Relies on the notebook-global ``spark`` session.
        * Any exception is caught and printed rather than raised, so a caller
          looping over several tables continues with the next one.
        * NULL source values are hashed as an explicit placeholder. Rows that
          contain NULLs therefore get a different HashKey than the one stored
          by earlier versions of this function and are refreshed once on the
          next run; rows without NULLs hash exactly as before.
    """
    try:
        from delta.tables import DeltaTable
        from pyspark.sql.functions import (
            coalesce, col, concat_ws, crc32, current_timestamp, lit
        )

        # Placeholder hashed in place of NULL so that column positions are
        # preserved (see the hash computation below).
        null_token = "__NULL__"

        # Read from SQL Warehouse
        df_warehouse = spark.read.synapsesql(warehouse_table)

        # concat_ws silently drops NULL arguments, so (NULL, 'a') and
        # ('a', NULL) would otherwise concatenate to the same string and
        # produce the same hash, hiding a real change. Cast every column to
        # string (the same cast concat_ws applies implicitly) and substitute
        # the placeholder for NULLs.
        hash_inputs = [
            coalesce(col(column_name).cast("string"), lit(null_token))
            for column_name in df_warehouse.columns
        ]

        # Add hash and audit columns
        df_hashed = (
            df_warehouse
            .withColumn("HashKey", crc32(concat_ws("|", *hash_inputs)))
            .withColumn("CreatedDate", current_timestamp())
            .withColumn("UpdatedDate", current_timestamp())
            .withColumn("CreatedBy", lit(created_by))
            .withColumn("UpdatedBy", lit(created_by))
        )

        # Read the existing Lakehouse table by name
        target_table = DeltaTable.forName(spark, lakehouse_table_name)

        # Find new/changed records. The hash comparison is NULL-safe: a plain
        # `!=` evaluates to NULL when the stored HashKey is NULL, which would
        # drop that row from the filter and leave it stale forever.
        df_src = (
            df_hashed.alias("src")
            .join(
                target_table.toDF().alias("tgt"),
                col(f"src.{join_key}") == col(f"tgt.{join_key}"),
                "left"
            )
            .filter(
                col(f"tgt.{join_key}").isNull() |
                ~col("src.HashKey").eqNullSafe(col("tgt.HashKey"))
            )
            .select("src.*")
        )

        # take(1) stops at the first changed row instead of counting them all.
        if df_src.take(1):
            update_set = {colname: f"source.{colname}" for colname in update_columns}
            update_set.update({
                "UpdatedDate": current_timestamp(),
                "UpdatedBy": lit(updated_by),
                "HashKey": "source.HashKey"
            })

            insert_values = {colname: f"source.{colname}" for colname in insert_columns}
            insert_values.update({
                "CreatedDate": "source.CreatedDate",
                "UpdatedDate": current_timestamp(),
                "CreatedBy": "source.CreatedBy",
                "UpdatedBy": lit(created_by),
                "HashKey": "source.HashKey"
            })

            # Type 1: overwrite matched rows in place, insert the rest.
            # CreatedDate/CreatedBy are intentionally absent from update_set
            # so the original creation audit values survive updates.
            target_table.alias("target").merge(
                df_src.alias("source"),
                f"target.{join_key} = source.{join_key}"
            ).whenMatchedUpdate(set=update_set).whenNotMatchedInsert(values=insert_values).execute()

            print(f"Merge completed for: {lakehouse_table_name}")
        else:
            print(f"No changes found for: {lakehouse_table_name}")

    except Exception as e:
        # Best-effort by design: report and return so the caller can continue.
        print(f"Error in SCD Type 1 process: {str(e)}")


StatementMeta(, a2c4f750-6efe-49b0-8e5c-278adaa3d543, 15, Finished, Available, Finished)

##### Calling the function

In [12]:
# Source warehouse location and Lakehouse settings.
warehouse_name = "warehouse_hv"
schema = "dbo"
lakehouse_base_path = "/lakehouse/default/tables/"

# Source tables to process, in this order.
table_names = ["accounts", "customers", "loans", "loan_payments", "transactions"]

# Per-table merge settings: source table name -> (join key, business columns).
# The same column list is used for both the update and the insert side.
table_specs = {
    "accounts": (
        "account_id",
        ["account_id", "customer_id", "account_type", "balance"],
    ),
    "customers": (
        "customer_id",
        ["customer_id", "first_name", "last_name", "address", "city", "zip"],
    ),
    "loans": (
        "loan_id",
        ["loan_id", "customer_id", "loan_amount", "interest_rate", "loan_term"],
    ),
    "loan_payments": (
        "payment_id",
        ["payment_id", "loan_id", "payment_date", "payment_amount"],
    ),
    "transactions": (
        "transaction_id",
        ["transaction_id", "account_id", "transaction_date", "transaction_amount", "transaction_type"],
    ),
}

for name in table_names:
    spec = table_specs.get(name)
    if spec is None:
        # No settings registered for this table: report it and move on.
        print(f"Unknown table: {name}")
        continue

    join_key, insert_cols = spec
    update_cols = insert_cols

    # Fully qualified warehouse source and its Lakehouse SCD1 counterpart.
    warehouse_table = f"{warehouse_name}.{schema}.{name}"
    lakehouse_table_name = f"{name}_scd1"

    print(f"Processing: {name}")
    scd_type1_logic(
        warehouse_table=warehouse_table,
        lakehouse_table_name=lakehouse_table_name,
        join_key=join_key,
        update_columns=update_cols,
        insert_columns=insert_cols
    )


StatementMeta(, a2c4f750-6efe-49b0-8e5c-278adaa3d543, 18, Finished, Available, Finished)

Processing: accounts
Merge completed for: accounts_scd1
Processing: customers
Merge completed for: customers_scd1
Processing: loans
Merge completed for: loans_scd1
Processing: loan_payments
Merge completed for: loan_payments_scd1
Processing: transactions
Merge completed for: transactions_scd1
