In [25]:
# Source datasets handled by this notebook, identified by their CSV filenames.
# Later cells strip the ".csv" suffix to derive warehouse and Delta table names.

tables = [
    f"{dataset}.csv"
    for dataset in ("accounts", "customers", "loan_payments", "loans", "transactions")
]

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 39, Finished, Available, Finished)

In [26]:
# Read every source table from the warehouse (via the synapsesql reader) into
# all_data_df, one Spark DataFrame per entry of `tables`, in the same order.

# NOTE(review): Constants is not referenced below; the import is kept because
# importing the connector package may be what makes spark.read.synapsesql
# available - confirm before removing.
from com.microsoft.spark.fabric.Constants import Constants

all_data_df = [
    spark.read.synapsesql("singh_warehouse.dbo." + csv_name.replace(".csv", ""))
    for csv_name in tables
]

print(len(all_data_df))

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 40, Finished, Available, Finished)

5


In [27]:
%%sql

-- Target tables for the SCD1 load. IF NOT EXISTS makes this cell safe to re-run.
-- Every table ends with the same bookkeeping columns:
--   CreatedBy / CreatedDate / UpdatedBy / UpdatedDate are filled by the merge mappings defined later in this notebook
--   Hashkey receives the source row hash that the notebook computes with crc32
-- NOTE(review): Balance and PaymentAmount use DECIMAL(18,2) while LoanAmount and TransactionAmount use DECIMAL(10,2) - confirm the difference is intended.

-- Create Delta Table: Accounts Table (one row per AccountId)
CREATE TABLE IF NOT EXISTS accounts_scd1 (
  AccountId INT,
  CustomerId INT,
  AccountType STRING,
  Balance DECIMAL(18,2),
  CreatedBy STRING,
  CreatedDate TIMESTAMP,
  UpdatedBy STRING,
  UpdatedDate TIMESTAMP,
  Hashkey BIGINT
);

-- Create Delta Table: Customers Table (one row per CustomerId)
CREATE TABLE IF NOT EXISTS customers_scd1 (
  CustomerId INT,
  FirstName STRING,
  LastName STRING,
  Address STRING,
  City STRING,
  State STRING,
  Zip STRING,
  CreatedBy STRING,
  CreatedDate TIMESTAMP,
  UpdatedBy STRING,
  UpdatedDate TIMESTAMP,
  Hashkey BIGINT
);

-- Create Delta Table: Loan Payments Table (one row per PaymentId)
CREATE TABLE IF NOT EXISTS loan_payments_scd1 (
  PaymentId INT,
  LoanId INT,
  PaymentDate DATE,
  PaymentAmount DECIMAL(18,2),
  CreatedBy STRING,
  CreatedDate TIMESTAMP,
  UpdatedBy STRING,
  UpdatedDate TIMESTAMP,
  Hashkey BIGINT
);

-- Create Delta Table: Loans Table (one row per LoanId)
CREATE TABLE IF NOT EXISTS loans_scd1 (
  LoanId INT,
  CustomerId INT,
  LoanAmount DECIMAL(10,2),
  InterestRate DECIMAL(5,2),
  LoanTerm INT,
  CreatedBy STRING,
  CreatedDate TIMESTAMP,
  UpdatedBy STRING,
  UpdatedDate TIMESTAMP,
  Hashkey BIGINT
);

-- Create Delta Table: Transactions Table (one row per TransactionId)
CREATE TABLE IF NOT EXISTS transactions_scd1 (
  TransactionId INT,
  AccountId INT,
  TransactionDate DATE,
  TransactionAmount DECIMAL(10,2),
  TransactionType STRING,
  CreatedBy STRING,
  CreatedDate TIMESTAMP,
  UpdatedBy STRING,
  UpdatedDate TIMESTAMP,
  Hashkey BIGINT
);


StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 45, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [28]:
# Add a Hashkey column to each DataFrame: CRC32 over all source columns.
#
# Every column is cast to string, NULLs are replaced by an explicit marker, and
# the pieces are joined with a separator before hashing. A bare concat() returns
# NULL as soon as one input is NULL, which leaves Hashkey NULL; such a row can
# never match on Hashkey and is therefore re-updated on every run. Without a
# separator, ("ab", "c") and ("a", "bc") would also produce the same hash.
#
# NOTE(review): this changes the hash produced for a given row, so rows already
# stored with a hash from the previous formula are refreshed once by the next merge.

from pyspark.sql.functions import coalesce, col, concat, concat_ws, crc32, lit

HASH_COLUMN = "Hashkey"
HASH_SEPARATOR = "||"
HASH_NULL_MARKER = "__NULL__"

for i in range(0, len(tables)):
    # Leave out an existing Hashkey column so that re-running this cell does not
    # fold the previous hash into the new one (keeps the cell idempotent).
    hash_inputs = [
        coalesce(col(name).cast("string"), lit(HASH_NULL_MARKER))
        for name in all_data_df[i].columns
        if name.lower() != HASH_COLUMN.lower()
    ]
    all_data_df[i] = all_data_df[i].withColumn(
        HASH_COLUMN, crc32(concat_ws(HASH_SEPARATOR, *hash_inputs))
    )

display(all_data_df[4])

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 46, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3c732a1e-13ba-4f61-90e1-680556f16610)

In [29]:
# Open each "<name>_scd1" Delta table by name. delta_tables keeps the DeltaTable
# handles (used for the merge) and delta_table_dfs keeps their DataFrame views,
# both in the same order as `tables`.

from delta.tables import DeltaTable

delta_tables = []
delta_table_dfs = []

for csv_name in tables:
    base_name = csv_name.replace(".csv", "")
    target = DeltaTable.forName(spark, f"{base_name}_scd1")
    delta_tables.append(target)
    delta_table_dfs.append(target.toDF())

# Quick sanity check: show the last target table and the size of both lists.
delta_table_dfs[4].show()
print(len(delta_tables))
print(len(delta_table_dfs))

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 47, Finished, Available, Finished)

+-------------+---------+---------------+-----------------+---------------+---------+-----------+---------+-----------+-------+
|TransactionId|AccountId|TransactionDate|TransactionAmount|TransactionType|CreatedBy|CreatedDate|UpdatedBy|UpdatedDate|Hashkey|
+-------------+---------+---------------+-----------------+---------------+---------+-----------+---------+-----------+-------+
+-------------+---------+---------------+-----------------+---------------+---------+-----------+---------+-----------+-------+

5
5


In [30]:
# Define SCD1 merge logic for each table with corresponding update and insert field mappings.
#
# The per-table part (target column -> source column) is declared once in
# _SCD1_COLUMN_MAPS; the audit/hash entries that are identical for every table
# are added by _scd1_mappings. The resulting `values` dict has the same keys,
# the same entries and the same entry order as a hand-written mapping.

from pyspark.sql.functions import col, lit, current_timestamp

# (target column, source column) pairs per dataset.
# The FIRST pair of each list must be the business key: a later cell takes the
# first entry of "set_values" as the merge/join key.
_SCD1_COLUMN_MAPS = {
    "accounts.csv": [
        ("AccountId", "account_id"),
        ("CustomerId", "customer_id"),
        ("AccountType", "account_type"),
        ("Balance", "balance"),
    ],
    "customers.csv": [
        ("CustomerId", "customer_id"),
        ("FirstName", "first_name"),
        ("LastName", "last_name"),
        ("Address", "address"),
        ("City", "city"),
        ("State", "state"),
        ("Zip", "zip"),
    ],
    "loan_payments.csv": [
        ("PaymentId", "payment_id"),
        ("LoanId", "loan_id"),
        ("PaymentDate", "payment_date"),
        ("PaymentAmount", "payment_amount"),
    ],
    "loans.csv": [
        ("LoanId", "loan_id"),
        ("CustomerId", "customer_id"),
        ("LoanAmount", "loan_amount"),
        ("InterestRate", "interest_rate"),
        ("LoanTerm", "loan_term"),
    ],
    "transactions.csv": [
        ("TransactionId", "transaction_id"),
        ("AccountId", "account_id"),
        ("TransactionDate", "transaction_date"),
        ("TransactionAmount", "transaction_amount"),
        ("TransactionType", "transaction_type"),
    ],
}


def _scd1_mappings(column_pairs):
    """Build the update ("set_values") and insert ("insert_values") mappings for one table.

    Args:
        column_pairs: ordered (target_column, source_column) pairs, business key first.

    Returns:
        dict with two keys:
          - "set_values": data columns, then UpdatedBy/UpdatedDate, then Hashkey.
          - "insert_values": data columns, then CreatedBy/CreatedDate,
            UpdatedBy/UpdatedDate, then Hashkey.
        Keys are "tgt.<column>"; values are "src.<column>" strings or Column expressions.
    """
    data_columns = {f"tgt.{tgt}": f"src.{src}" for tgt, src in column_pairs}

    set_values = dict(data_columns)
    set_values["tgt.UpdatedBy"] = lit("Fabric-Notebook-Update")
    set_values["tgt.UpdatedDate"] = current_timestamp()
    set_values["tgt.Hashkey"] = "src.hashkey"

    insert_values = dict(data_columns)
    insert_values["tgt.CreatedBy"] = lit("Fabric-Notebook")
    insert_values["tgt.CreatedDate"] = current_timestamp()
    insert_values["tgt.UpdatedBy"] = lit("Fabric-Notebook")
    insert_values["tgt.UpdatedDate"] = current_timestamp()
    insert_values["tgt.Hashkey"] = "src.hashkey"

    return {"set_values": set_values, "insert_values": insert_values}


values = {
    dataset: _scd1_mappings(column_pairs)
    for dataset, column_pairs in _SCD1_COLUMN_MAPS.items()
}


StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 48, Finished, Available, Finished)

In [31]:
# Derive the key columns used for matching: for each table, the first entry of
# its "set_values" mapping gives the target key (tgt_ids) and the value mapped
# to it gives the source key (src_ids).

tgt_ids = [next(iter(values[name]["set_values"])) for name in tables]
src_ids = [
    values[name]["set_values"][tgt_key]
    for name, tgt_key in zip(tables, tgt_ids)
]

print(tgt_ids)
print(src_ids)

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 49, Finished, Available, Finished)

['tgt.AccountId', 'tgt.CustomerId', 'tgt.PaymentId', 'tgt.LoanId', 'tgt.TransactionId']
['src.account_id', 'src.customer_id', 'src.payment_id', 'src.loan_id', 'src.transaction_id']


In [32]:
# Keep only the source rows that have no target row with the same ID and the
# same Hashkey (left anti join), i.e. rows that are new or whose hash differs.

src1_dfs = []

for idx, _ in enumerate(tables):
    same_id = col(src_ids[idx]) == col(tgt_ids[idx])
    same_hash = col("src.Hashkey") == col("tgt.Hashkey")

    source_side = all_data_df[idx].alias("src")
    target_side = delta_table_dfs[idx].alias("tgt")

    pending_rows = source_side.join(target_side, same_id & same_hash, "anti").select("src.*")
    src1_dfs.append(pending_rows)

print(len(src1_dfs))

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 50, Finished, Available, Finished)

5


In [33]:
# SCD1 merge per table: rows matched on the key column are updated with the
# "set_values" mapping, unmatched rows are inserted with "insert_values".

for idx, table_name in enumerate(tables):
    key_match = col(tgt_ids[idx]) == col(src_ids[idx])
    mappings = values[table_name]

    (
        delta_tables[idx].alias("tgt")
        .merge(src1_dfs[idx].alias("src"), key_match)
        .whenMatchedUpdate(set=mappings["set_values"])
        .whenNotMatchedInsert(values=mappings["insert_values"])
        .execute()
    )

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 51, Finished, Available, Finished)

In [34]:
# Preview the accounts SCD1 table (at most 1000 rows).
# NOTE(review): distinct() runs after the LIMIT, so it only de-duplicates the
# rows that were fetched; the other preview cells do not apply it - confirm intent.
preview_query = "SELECT * FROM singh_lakehouse.accounts_scd1 LIMIT 1000"
df = spark.sql(preview_query).distinct()
display(df)

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 52, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 42461bd0-3b99-46b9-87f3-117122c296c6)

In [35]:
# Preview the customers SCD1 table (at most 1000 rows).
preview_query = "SELECT * FROM singh_lakehouse.customers_scd1 LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 53, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 54372c98-5e55-482d-864b-fb5a67bc0d20)

In [36]:
# Preview the loans SCD1 table (at most 1000 rows).
preview_query = "SELECT * FROM singh_lakehouse.loans_scd1 LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 54, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 27946478-07a8-44c5-822c-31e96359eae8)

In [37]:
# Preview the loan payments SCD1 table (at most 1000 rows).
preview_query = "SELECT * FROM singh_lakehouse.loan_payments_scd1 LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 55, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7408963e-31a5-45c4-aa69-570bc2af0ee4)

In [38]:
# Preview the transactions SCD1 table (at most 1000 rows).
preview_query = "SELECT * FROM singh_lakehouse.transactions_scd1 LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, 40a0061e-8894-4953-a573-d019fd9f4bd5, 56, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 908c1f0e-1129-4d3a-a7a9-6be862b16dfb)