In [2]:
# from pyspark.sql.types import *

# accounts_schema = StructType([
#     StructField("account_id", IntegerType(), True),
#     StructField("customer_id", IntegerType(), True),
#     StructField("account_type", StringType(), True),
#     StructField("balance", FloatType(), True),
#     StructField("hashkey", LongType(), True),
#     StructField("createdby", StringType(), True),
#     StructField("createddate", TimestampType(), True),
#     StructField("updatedby", StringType(), True),
#     StructField("updateddate", TimestampType(), True),
# ])

# spark.createDataFrame([], accounts_schema).write.format("delta").saveAsTable("lh_project4.accounts_scd1")


# customers_schema = StructType([
#     StructField("customer_id", IntegerType(), True),
#     StructField("first_name", StringType(), True),
#     StructField("last_name", StringType(), True),
#     StructField("address", StringType(), True),
#     StructField("city", StringType(), True),
#     StructField("state", StringType(), True),
#     StructField("zip", StringType(), True),
#     StructField("hashkey", LongType(), True),
#     StructField("createdby", StringType(), True),
#     StructField("createddate", TimestampType(), True),
#     StructField("updatedby", StringType(), True),
#     StructField("updateddate", TimestampType(), True)
# ])

# spark.createDataFrame([], customers_schema).write.format("delta").saveAsTable("lh_project4.customers_scd1")


# loan_payments_schema = StructType([
#     StructField("payment_id", IntegerType(), True),
#     StructField("loan_id", IntegerType(), True),
#     StructField("payment_date", DateType(), True),
#     StructField("payment_amount", FloatType(), True),
#     StructField("hashkey", LongType(), True),
#     StructField("createdby", StringType(), True),
#     StructField("createddate", TimestampType(), True),
#     StructField("updatedby", StringType(), True),
#     StructField("updateddate", TimestampType(), True)
# ])

# spark.createDataFrame([], loan_payments_schema).write.format("delta").saveAsTable("lh_project4.loan_payments_scd1")


# loans_schema = StructType([
#     StructField("loan_id", IntegerType(), True),
#     StructField("customer_id", IntegerType(), True),
#     StructField("loan_amount", FloatType(), True),
#     StructField("interest_rate", FloatType(), True),
#     StructField("loan_term", IntegerType(), True),
#     StructField("hashkey", LongType(), True),
#     StructField("createdby", StringType(), True),
#     StructField("createddate", TimestampType(), True),
#     StructField("updatedby", StringType(), True),
#     StructField("updateddate", TimestampType(), True)
# ])

# spark.createDataFrame([], loans_schema).write.format("delta").saveAsTable("lh_project4.loans_scd1")

# transactions_schema = StructType([
#     StructField("transaction_id", IntegerType(), True),
#     StructField("account_id", IntegerType(), True),
#     StructField("transaction_date", DateType(), True),
#     StructField("transaction_amount", FloatType(), True),
#     StructField("transaction_type", StringType(), True),
#     StructField("hashkey", LongType(), True),
#     StructField("createdby", StringType(), True),
#     StructField("createddate", TimestampType(), True),
#     StructField("updatedby", StringType(), True),
#     StructField("updateddate", TimestampType(), True)
# ])

# spark.createDataFrame([], transactions_schema).write.format("delta").saveAsTable("lh_project4.transactions_scd1")




StatementMeta(, 54d6c523-c3e5-45f4-8ab2-edba49e2f928, 4, Finished, Available, Finished)

In [1]:
from pyspark.sql import Row
from pyspark.sql.functions import col,current_timestamp,lit,crc32,concat
from com.microsoft.spark.fabric import Constants


StatementMeta(, ef118f81-d938-4c08-b81a-76705d410c49, 3, Finished, Available, Finished)

In [2]:
def read_table(name):
    df = spark.read.synapsesql(f'wh_project4.dbo.{name}')
    return df
        

StatementMeta(, 38dee4ff-74c5-4309-baa7-fc68c49aaa83, 4, Finished, Available, Finished)

In [3]:
def generate_hashkey(df):
    df_hashkey=df.withColumn('hashkey',crc32(concat(*df.columns)))
    return df_hashkey

StatementMeta(, 38dee4ff-74c5-4309-baa7-fc68c49aaa83, 5, Finished, Available, Finished)

In [1]:
from delta.tables import DeltaTable

    delta_tbl = DeltaTable.forName(spark,f'lh_project4.{name}_scd1')
    tgt_df=delta_tbl.toDF()


def merge_new_data(name, df_hashkey):
    
    join_keys = {
        'accounts': 'account_id',
        'customers': 'customer_id',
        'loan_payments': 'payment_id',
        'loans': 'loan_id',
        'transactions': 'transaction_id'
    }
    
    join_key = join_keys[name]

    src = df_hashkey.alias('src')
    tgt = tgt_df.alias('tgt')

    join_condition = col(f'src.{join_key}') == col(f'tgt.{join_key}')
    filter_condition = (col(f'tgt.{join_key}').isNull()) | (col('src.hashkey') != col('tgt.hashkey'))

    filtered_src = src.join(tgt, join_condition, 'left').filter(filter_condition).select('src.*')

    # Add columns in src list
    src_cols = []  
    for col_name in filtered_src.columns:
        src_cols.append(col_name)  # Add each column name

    # Update expression 
    update_expr = {}  # Start with an empty dictionary
    for col_name in src_cols:
        if col_name not in ['createdby', 'createddate']:     #exclude createdby and createddate
            update_expr[col_name] = f"src.{col_name}"         #colname:src_colname

    # Overwrite values for updatedby and updateddate
    update_expr['updatedby'] = lit('Databricks-updated')
    update_expr['updateddate'] = current_timestamp()

    # Insert expression 
    insert_expr = {}  # Start with an empty dictionary
    for col_name in src_cols:
        insert_expr[col_name] = f"src.{col_name}"

    # Override audit fields for insert
    insert_expr['createdby'] = lit('Databricks')
    insert_expr['createddate'] = current_timestamp()
    insert_expr['updatedby'] = lit('Databricks')
    insert_expr['updateddate'] = current_timestamp()

    # Merge into Gold Delta table
    delta_tbl.alias('tgt').merge(
        filtered_src.alias('src'),
        join_condition
    ).whenMatchedUpdate(set=update_expr
    ).whenNotMatchedInsert(values=insert_expr
    ).execute()

    print(f"Merged data into table: {name}")


StatementMeta(, f32d72ef-64b1-4d9d-977c-c263459c796a, 3, Finished, Available, Finished)

IndentationError: unexpected indent (3174284674.py, line 4)