## Silver to Gold Layer
------------------------------------------------------------

In [0]:
%sql
-- Ensure the database that holds the gold tables exists before any table DDL runs.
CREATE DATABASE IF NOT EXISTS hive_metastore.bankdb

In [0]:
%sql
-- Gold table for transactions, stored as Delta at a fixed mount location.
CREATE TABLE IF NOT EXISTS hive_metastore.bankdb.transactions (
    -- Business columns
    transaction_id      INT,
    account_id          INT,
    transaction_date    DATE,
    -- NOTE(review): amount is FLOAT here while accounts.balance is DOUBLE; confirm the precision is intended.
    transaction_amount  FLOAT,
    transaction_type    STRING,
    -- Audit columns (filled by scd_type1_logic during the merge)
    CreatedDate         TIMESTAMP,
    UpdatedDate         TIMESTAMP,
    CreatedBy           STRING,
    UpdatedBy           STRING,
    -- CRC32-based row hash used for change detection
    HashKey             BIGINT
)
USING DELTA
LOCATION '/mnt/project2/gold/transactions';

In [0]:
%sql
-- Gold table for loan payments, stored as Delta at a fixed mount location.
CREATE TABLE IF NOT EXISTS hive_metastore.bankdb.loan_payments (
    -- Business columns
    payment_id      INT,
    loan_id         INT,
    payment_date    DATE,
    -- NOTE(review): amount is FLOAT here while accounts.balance is DOUBLE; confirm the precision is intended.
    payment_amount  FLOAT,
    -- Audit columns (filled by scd_type1_logic during the merge)
    CreatedDate     TIMESTAMP,
    UpdatedDate     TIMESTAMP,
    CreatedBy       STRING,
    UpdatedBy       STRING,
    -- CRC32-based row hash used for change detection
    HashKey         BIGINT
)
USING DELTA
LOCATION '/mnt/project2/gold/loan_payments';

In [0]:
%sql
-- Gold table for loans, stored as Delta at a fixed mount location.
CREATE TABLE IF NOT EXISTS hive_metastore.bankdb.loans (
    -- Business columns
    loan_id        INT,
    customer_id    INT,
    -- NOTE(review): amount and rate are FLOAT while accounts.balance is DOUBLE; confirm the precision is intended.
    loan_amount    FLOAT,
    interest_rate  FLOAT,
    loan_term      INT,
    -- Audit columns (filled by scd_type1_logic during the merge)
    CreatedDate    TIMESTAMP,
    UpdatedDate    TIMESTAMP,
    CreatedBy      STRING,
    UpdatedBy      STRING,
    -- CRC32-based row hash used for change detection
    HashKey        BIGINT
)
USING DELTA
LOCATION '/mnt/project2/gold/loans';

In [0]:
%sql
-- Gold table for customers, stored as Delta at a fixed mount location.
CREATE TABLE IF NOT EXISTS hive_metastore.bankdb.customers (
    -- Business columns
    customer_id  INT,
    first_name   STRING,
    last_name    STRING,
    address      STRING,
    city         STRING,
    zip          STRING,
    -- Audit columns (filled by scd_type1_logic during the merge)
    CreatedDate  TIMESTAMP,
    UpdatedDate  TIMESTAMP,
    CreatedBy    STRING,
    UpdatedBy    STRING,
    -- CRC32-based row hash used for change detection
    HashKey      BIGINT
)
USING DELTA
LOCATION '/mnt/project2/gold/customers';

In [0]:
%sql
-- Gold table for accounts, stored as Delta at a fixed mount location.
CREATE TABLE IF NOT EXISTS hive_metastore.bankdb.accounts (
    -- Business columns
    account_id    INT,
    customer_id   INT,
    account_type  STRING,
    balance       DOUBLE,
    -- Audit columns (filled by scd_type1_logic during the merge)
    CreatedDate   TIMESTAMP,
    UpdatedDate   TIMESTAMP,
    CreatedBy     STRING,
    UpdatedBy     STRING,
    -- CRC32-based row hash used for change detection
    HashKey       BIGINT
)
USING DELTA
LOCATION '/mnt/project2/gold/accounts'

##### Check for new data and load it into the tables:

In [0]:
def scd_type1_logic(
    silver_path: str,
    gold_path: str,
    join_key: str,
    update_columns: list,
    insert_columns: list,
    created_by: str = "databricks",
    updated_by: str = "databricks-updated"
):
    """Merge one Silver parquet dataset into its Gold Delta table (SCD Type 1).

    Every Silver row receives a CRC32 ``HashKey`` computed over all of its
    columns, plus the audit columns. Rows whose ``join_key`` is absent from
    Gold, or whose hash differs from the stored one, are merged: matching keys
    are overwritten in place and new keys are inserted.

    Relies on the notebook-global ``spark`` session.

    NOTE(review): a later cell in this notebook defines ``scd_type1_logic``
    again; the later definition is the one in effect at call time.

    Args:
        silver_path: Location of the Silver data, read as parquet.
        gold_path: Location of an existing Gold Delta table.
        join_key: Column used both for change detection and as the merge key.
        update_columns: Columns copied from the source row when the key matches.
        insert_columns: Columns copied from the source row when the key is new.
        created_by: Value written to ``CreatedBy`` (and to ``UpdatedBy`` on insert).
        updated_by: Value written to ``UpdatedBy`` when an existing row is updated.

    Returns:
        None. Progress and failures are reported with ``print``; exceptions are
        caught here and are not re-raised.
    """
    try:
        # Import every dependency locally so the helper works on a fresh kernel
        # instead of relying on names leaked from other cells or notebooks.
        from delta.tables import DeltaTable
        from pyspark.sql.functions import col, concat_ws, crc32, current_timestamp, lit

        # Load Silver layer and add hash + audit columns
        df_silver = spark.read.format("parquet").load(silver_path)
        df_hashed = (
            df_silver
            # NOTE(review): concat_ws skips NULL values, so rows that differ only
            # in which column is NULL can produce the same hash; confirm acceptable.
            .withColumn("HashKey", crc32(concat_ws("|", *df_silver.columns)))
            .withColumn("CreatedDate", current_timestamp())
            .withColumn("UpdatedDate", current_timestamp())
            .withColumn("CreatedBy", lit(created_by))
            .withColumn("UpdatedBy", lit(created_by))
        )

        # Load existing Gold table
        target_table = DeltaTable.forPath(spark, gold_path)

        # Left join against Gold and keep only new or changed rows.
        is_new_record = col(f"tgt.{join_key}").isNull()
        # Null-safe comparison: a Gold row whose HashKey is NULL must count as
        # changed; a plain != would evaluate to NULL and drop the row.
        is_changed_record = ~col("src.HashKey").eqNullSafe(col("tgt.HashKey"))
        df_src = (
            df_hashed.alias("src")
            .join(
                target_table.toDF().alias("tgt"),
                col(f"src.{join_key}") == col(f"tgt.{join_key}"),
                "left"
            )
            .filter(is_new_record | is_changed_record)
            .select("src.*")
        )

        # head(1) can stop at the first row; count() would scan the whole join.
        if len(df_src.head(1)) > 0:
            # Update dictionary
            update_set = {colname: f"source.{colname}" for colname in update_columns}
            update_set["UpdatedDate"] = current_timestamp()
            update_set["UpdatedBy"] = lit(updated_by)
            update_set["HashKey"] = "source.HashKey"

            # Insert dictionary
            insert_values = {colname: f"source.{colname}" for colname in insert_columns}
            insert_values.update({
                "CreatedDate": "source.CreatedDate",
                "UpdatedDate": current_timestamp(),
                "CreatedBy": "source.CreatedBy",
                "UpdatedBy": lit(created_by),
                "HashKey": "source.HashKey"
            })

            # Perform merge
            target_table.alias("target").merge(
                df_src.alias("source"),
                f"target.{join_key} = source.{join_key}"
            ).whenMatchedUpdate(
                set = update_set
            ).whenNotMatchedInsert(
                values = insert_values
            ).execute()

            print(f" --> Merge completed for: {gold_path}")
        else:
            print(f"No changes found for: {gold_path}")

    except Exception as e:
        # NOTE(review): failures are only printed, so callers cannot tell that a
        # table was skipped; kept as-is to preserve the best-effort behaviour.
        print(f"Error in SCD Type 1 process: {str(e)}")


In [0]:
def scd_type1_logic(
    silver_path: str,
    gold_path: str,
    join_key: str,
    update_columns: list,
    insert_columns: list,
    created_by: str = "databricks",
    updated_by: str = "databricks-updated"
):
    """Merge one Silver parquet dataset into its Gold Delta table (SCD Type 1).

    Every Silver row receives a CRC32 ``HashKey`` computed over all of its
    columns, plus the audit columns. Rows whose ``join_key`` is absent from
    Gold, or whose hash differs from the stored one, are merged: matching keys
    are overwritten in place and new keys are inserted.

    Relies on the notebook-global ``spark`` session.

    NOTE(review): an earlier cell in this notebook already defines
    ``scd_type1_logic``; this later definition shadows it and is the one in
    effect at call time. Consider keeping only one of the two cells.

    Args:
        silver_path: Location of the Silver data, read as parquet.
        gold_path: Location of an existing Gold Delta table.
        join_key: Column used both for change detection and as the merge key.
        update_columns: Columns copied from the source row when the key matches.
        insert_columns: Columns copied from the source row when the key is new.
        created_by: Value written to ``CreatedBy`` (and to ``UpdatedBy`` on insert).
        updated_by: Value written to ``UpdatedBy`` when an existing row is updated.

    Returns:
        None. Progress and failures are reported with ``print``; exceptions are
        caught here and are not re-raised.
    """
    try:
        # Import every dependency locally so the helper works on a fresh kernel
        # instead of relying on names leaked from other cells or notebooks.
        from delta.tables import DeltaTable
        from pyspark.sql.functions import col, concat_ws, crc32, current_timestamp, lit

        # Load Silver layer and add hash + audit columns
        df_silver = spark.read.format("parquet").load(silver_path)
        df_hashed = (
            df_silver
            # NOTE(review): concat_ws skips NULL values, so rows that differ only
            # in which column is NULL can produce the same hash; confirm acceptable.
            .withColumn("HashKey", crc32(concat_ws("|", *df_silver.columns)))
            .withColumn("CreatedDate", current_timestamp())
            .withColumn("UpdatedDate", current_timestamp())
            .withColumn("CreatedBy", lit(created_by))
            .withColumn("UpdatedBy", lit(created_by))
        )

        # Load existing Gold table
        target_table = DeltaTable.forPath(spark, gold_path)

        # Left join against Gold and keep only new or changed rows.
        is_new_record = col(f"tgt.{join_key}").isNull()
        # Null-safe comparison: a Gold row whose HashKey is NULL must count as
        # changed; a plain != would evaluate to NULL and drop the row.
        is_changed_record = ~col("src.HashKey").eqNullSafe(col("tgt.HashKey"))
        df_src = (
            df_hashed.alias("src")
            .join(
                target_table.toDF().alias("tgt"),
                col(f"src.{join_key}") == col(f"tgt.{join_key}"),
                "left"
            )
            .filter(is_new_record | is_changed_record)
            .select("src.*")
        )

        # head(1) can stop at the first row; count() would scan the whole join.
        if len(df_src.head(1)) > 0:
            # Update dictionary
            update_set = {colname: f"source.{colname}" for colname in update_columns}
            update_set["UpdatedDate"] = current_timestamp()
            update_set["UpdatedBy"] = lit(updated_by)
            update_set["HashKey"] = "source.HashKey"

            # Insert dictionary
            insert_values = {colname: f"source.{colname}" for colname in insert_columns}
            insert_values.update({
                "CreatedDate": "source.CreatedDate",
                "UpdatedDate": current_timestamp(),
                "CreatedBy": "source.CreatedBy",
                "UpdatedBy": lit(created_by),
                "HashKey": "source.HashKey"
            })

            # Perform merge
            target_table.alias("target").merge(
                df_src.alias("source"),
                f"target.{join_key} = source.{join_key}"
            ).whenMatchedUpdate(
                set = update_set
            ).whenNotMatchedInsert(
                values = insert_values
            ).execute()

            print(f" --> Merge completed for: {gold_path}")
        else:
            print(f"No changes found for: {gold_path}")

    except Exception as e:
        # NOTE(review): failures are only printed, so callers cannot tell that a
        # table was skipped; kept as-is to preserve the best-effort behaviour.
        print(f"Error in SCD Type 1 process: {str(e)}")


In [0]:
# Merge settings per dataset name: (merge key, business columns).
# The same column list drives both the update and the insert side of the merge.
table_specs = {
    "accounts": (
        "account_id",
        ["account_id", "customer_id", "account_type", "balance"],
    ),
    "customers": (
        "customer_id",
        ["customer_id", "first_name", "last_name", "address", "city", "zip"],
    ),
    "loans": (
        "loan_id",
        ["loan_id", "customer_id", "loan_amount", "interest_rate", "loan_term"],
    ),
    "loan_payments": (
        "payment_id",
        ["payment_id", "loan_id", "payment_date", "payment_amount"],
    ),
    "transactions": (
        "transaction_id",
        ["transaction_id", "account_id", "transaction_date", "transaction_amount", "transaction_type"],
    ),
}

# The CSV files listed under the bronze path decide which datasets are processed.
file_infos = dbutils.fs.ls(bronze_base_path)
file_names = [info.name for info in file_infos if info.name.endswith(".csv")]

for file in file_names:
    name = file.replace(".csv", "")

    # Look up the merge settings; anything not in the table is reported and skipped.
    spec = table_specs.get(name)
    if spec is None:
        print(f"Unknown file: {file}")
        continue

    join_key, columns = spec
    update_cols = insert_cols = list(columns)

    # Silver input is "<name>.parquet"; the Gold target is a folder named after the dataset.
    silver_path = f"{silver_base_path}{name}.parquet"
    gold_path = f"{gold_base_path}{name}"

    print(f"Processing: {name}")
    scd_type1_logic(
        silver_path=silver_path,
        gold_path=gold_path,
        join_key=join_key,
        update_columns=update_cols,
        insert_columns=insert_cols,
    )


In [0]:
# Read the accounts gold Delta table back and render it for a visual check.
# The path is built from gold_base_path, the same base the merge step writes to,
# instead of a hard-coded mount path, so this cell always shows the table that was merged.
# NOTE(review): "header" is a CSV reader option; presumably it has no effect on a Delta read, so confirm and drop.
df_gold_accounts = (
    spark.read
    .format("delta")
    .option("header", True)
    .load(f"{gold_base_path}accounts")
)
print("----------------------------------accounts.csv file Data---------------------------------")
display(df_gold_accounts)

In [0]:
# Read the customers gold Delta table back and render it for a visual check.
# NOTE(review): "header" is a CSV reader option; presumably it has no effect on a Delta read, so confirm and drop.
df_gold_customers = (
    spark.read
    .format("delta")
    .option("header", True)
    .load(f"{gold_base_path}customers")
)
print("---------------------------------customers.csv file Data--------------------------------")
display(df_gold_customers)

In [0]:
# Read the loans gold Delta table back and render it for a visual check.
# NOTE(review): "header" is a CSV reader option; presumably it has no effect on a Delta read, so confirm and drop.
df_gold_loans = (
    spark.read
    .format("delta")
    .option("header", True)
    .load(f"{gold_base_path}loans")
)
print("--------------------------------loans.csv file Data---------------------------------------")
display(df_gold_loans)

In [0]:
# Read the loan_payments gold Delta table back and render it for a visual check.
# NOTE(review): "header" is a CSV reader option; presumably it has no effect on a Delta read, so confirm and drop.
df_gold_loan_payments = (
    spark.read
    .format("delta")
    .option("header", True)
    .load(f"{gold_base_path}loan_payments")
)
print("-----------------------------loan_payments.csv file Data---------------------------------")
display(df_gold_loan_payments)

In [0]:
# Read the transactions gold Delta table back and render it for a visual check.
# NOTE(review): "header" is a CSV reader option; presumably it has no effect on a Delta read, so confirm and drop.
df_gold_transactions = (
    spark.read
    .format("delta")
    .option("header", True)
    .load(f"{gold_base_path}transactions")
)
print("-------------------------------transactions.csv file Data---------------------------------")
display(df_gold_transactions)