Read Source Delta Table


Register it as a temporary view (source_view) for optional SQL querying.


In [1]:
df_source = spark.read.format("delta").load("Tables/dbo/accounts")
df_source.createOrReplaceTempView("source_view")
src_path="Tables/dbo/accounts"

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 3, Finished, Available, Finished)

Create Target SCD Table (If Not Exists)

In [2]:
create_table_query = """
CREATE TABLE Account_SCD (
    account_id int,
    customer_id int,
    account_type string,
    balance float,
    hash_key BIGINT,
    created_by STRING,
    created_date TIMESTAMP,
    updated_by STRING,
    updated_date TIMESTAMP
)
USING DELTA
LOCATION 'Tables/Gold_layer/Account_SCD'
"""
# Execute the query to create the table
spark.sql(create_table_query)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 4, Finished, Available, Finished)

DataFrame[]

 Load the Target Delta Table

In [3]:

from delta.tables import DeltaTable

target_path = "Tables/Gold_layer/Account_SCD"  
delta_target = DeltaTable.forPath(spark, target_path)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 5, Finished, Available, Finished)

Generate Hash Key for Source Data

In [4]:
from pyspark.sql.functions import *
df_src1= df_source.withColumn("hash_key",crc32(concat(*df_source.columns)))

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 6, Finished, Available, Finished)

 Find Changed or New Records

In [5]:
df_src1=df_src1.alias("src").join(delta_target.toDF().alias("tgt"),
((col("src.account_id")==col("tgt.account_id"))&
(col("src.hash_key")==col("tgt.hash_key"))),"anti").select(col("src.*"))


StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 7, Finished, Available, Finished)

Merge for SCD Type 1 (Upsert Logic)

In [6]:

from pyspark.sql.functions import col

delta_target.alias("tgt").merge(df_src1.alias("src"),
"tgt.account_id = src.account_id")\
    .whenMatchedUpdate(set={"tgt.account_id":"src.account_id",
    "tgt.customer_id":"src.customer_id","tgt.account_type":"src.account_type",
    "tgt.balance":"src.balance","tgt.hash_key":"src.hash_key",
    "tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks_Updated")})\
        .whenNotMatchedInsert(values={"tgt.account_id":"src.account_id",
        "tgt.customer_id":"src.customer_id","tgt.account_type":"src.account_type",
        "tgt.balance":"src.balance","tgt.hash_key":"src.hash_key",
        "tgt.created_date":current_timestamp(),
        "tgt.created_by":lit("databricks"),"tgt.updated_date":current_timestamp(),
        "tgt.updated_by":lit("databricks")}).execute()
display(spark.read.format("delta").option("header","true").load(target_path))

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 9ad384cf-f625-4a2d-89fd-7f472c609918)

In [8]:
create_table_query = """
CREATE TABLE Customer_SCD1 (
    customer_id int,
    first_name string,
    last_name string,
    address string,
    city string,
    state string,
    zip string,
    hash_key BIGINT,
    created_by STRING,
    created_date TIMESTAMP,
    updated_by STRING,
    updated_date TIMESTAMP
)
USING DELTA
LOCATION 'Tables/Gold_layer/Customer_SCD1'
"""
# Execute the query to create the table
spark.sql(create_table_query)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 10, Finished, Available, Finished)

DataFrame[]

In [29]:
from pyspark.sql.functions import *
df_source = spark.read.format("delta").load("Tables/dbo/customers")
df_source.createOrReplaceTempView("source_view")
src_path="Tables/dbo/customers"
print(src_path)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 31, Finished, Available, Finished)

Tables/dbo/customers


In [30]:

df_src1=df_source.withColumn("hash_key",crc32(concat(*df_source.columns)))


StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 32, Finished, Available, Finished)

In [31]:

from delta.tables import DeltaTable
tgt_path = "Tables/Gold_layer/Customer_SCD1"  
dbtable = DeltaTable.forPath(spark, tgt_path)
dbtable.toDF().show()
     

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 33, Finished, Available, Finished)

+-----------+----------+---------+-------+----+-----+---+--------+----------+------------+----------+------------+
|customer_id|first_name|last_name|address|city|state|zip|hash_key|created_by|created_date|updated_by|updated_date|
+-----------+----------+---------+-------+----+-----+---+--------+----------+------------+----------+------------+
+-----------+----------+---------+-------+----+-----+---+--------+----------+------------+----------+------------+



In [33]:

df_src1=df_src1.alias("src").join(dbtable.toDF().alias("tgt"), 
((col("src.customer_id") == col("tgt.customer_id")) & (col("src.hash_key") == col("tgt.hash_key"))), 
"anti").select(col("src.*"))


StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 35, Finished, Available, Finished)

In [34]:
dbtable.alias("tgt").merge(df_src1.alias("src"),"tgt.customer_id = src.customer_id")\
    .whenMatchedUpdate(set={"tgt.customer_id":"src.customer_id","tgt.first_name":"src.first_name","tgt.last_name":"src.last_name","tgt.address":"src.address","tgt.city":"src.city","tgt.state":"src.state","tgt.zip":"src.zip","tgt.hash_key":"src.hash_key","tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks_Updated")})\
        .whenNotMatchedInsert(values={"tgt.customer_id":"src.customer_id","tgt.first_name":"src.first_name","tgt.last_name":"src.last_name","tgt.address":"src.address","tgt.city":"src.city","tgt.state":"src.state","tgt.zip":"src.zip","tgt.hash_key":"src.hash_key","tgt.created_date":current_timestamp(),"tgt.created_by":lit("databricks"),"tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks")}).execute()
     


StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 36, Finished, Available, Finished)

In [35]:
display(spark.read.format("delta").option("header","true").load(tgt_path))

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 37, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 37878011-df42-4dff-a2e5-ee8f96d5a27e)

In [36]:


create_table_query = """
CREATE TABLE Loan_Payment (
  payment_id int,
  loan_id int,
  payment_date timestamp,
  payment_amount float,
  hash_key bigint,
  created_by string,
  created_date timestamp,
  updated_by string,
  updated_date timestamp
)
USING DELTA
LOCATION 'Tables/Gold_layer/Loan_Payment'
"""
# Execute the query to create the table
spark.sql(create_table_query)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 38, Finished, Available, Finished)

DataFrame[]

In [37]:
from pyspark.sql.functions import *
df_source = spark.read.format("delta").load("Tables/dbo/loan_payments")
df_source.createOrReplaceTempView("source_view")
src_path="Tables/dbo/loan_payments"
print(src_path)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 39, Finished, Available, Finished)

Tables/dbo/loan_payments


In [38]:
df_src1=df_source.withColumn("hash_key",crc32(concat(*df_source.columns)))
display(df_src1)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 40, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5dbc95d4-c7bf-4475-9c5f-3b3f12733d02)

In [39]:

from delta.tables import DeltaTable
tgt_path = "Tables/Gold_layer/Loan_Payment"  
dbtable = DeltaTable.forPath(spark, tgt_path)
dbtable.toDF().show()

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 41, Finished, Available, Finished)

+----------+-------+------------+--------------+--------+----------+------------+----------+------------+
|payment_id|loan_id|payment_date|payment_amount|hash_key|created_by|created_date|updated_by|updated_date|
+----------+-------+------------+--------------+--------+----------+------------+----------+------------+
+----------+-------+------------+--------------+--------+----------+------------+----------+------------+



In [40]:
df_src1=df_src1.alias("src").join(dbtable.toDF().alias("tgt"), ((col("src.payment_id") == col("tgt.payment_id")) & (col("src.hash_key") == col("tgt.hash_key"))), "anti").select(col("src.*"))
df_src1.show()

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 42, Finished, Available, Finished)

+----------+-------+------------+--------------+----------+
|payment_id|loan_id|payment_date|payment_amount|  hash_key|
+----------+-------+------------+--------------+----------+
|         1|     45|  2024-01-01|           100|3466784654|
|        99|      1|  2024-04-08|          5000|1369522191|
|         2|     23|  2024-01-02|           150|1385209747|
|        91|      2|  2024-03-31|          4600| 247445288|
|        91|      2|  2024-03-31|          4600| 247445288|
|         3|     67|  2024-01-03|           200|2829437187|
|         4|     89|  2024-01-04|           250|2467606655|
|        33|     64|  2024-02-02|          1700|4018524585|
|        33|     64|  2024-02-02|          1700|4018524585|
|         5|     12|  2024-01-05|           300|3225416827|
|         6|     34|  2024-01-06|           350|3597787397|
|        82|      3|  2024-03-22|          4150|3991820484|
|         7|     56|  2024-01-07|           400|2064246126|
|        73|      4|  2024-03-13|       

In [41]:
dbtable.alias("tgt").merge(df_src1.alias("src"),"tgt.payment_id = src.payment_id")\
    .whenMatchedUpdate(set={"tgt.payment_id":"src.payment_id","tgt.loan_id":"src.loan_id","tgt.payment_date":"src.payment_date","tgt.payment_amount":"src.payment_amount","tgt.hash_key":"src.hash_key","tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks_Updated")})\
        .whenNotMatchedInsert(values={"tgt.payment_id":"src.payment_id","tgt.loan_id":"src.loan_id","tgt.payment_date":"src.payment_date","tgt.payment_amount":"src.payment_amount","tgt.hash_key":"src.hash_key","tgt.created_date":current_timestamp(),"tgt.created_by":lit("databricks"),"tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks")}).execute()
    

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 43, Finished, Available, Finished)

In [42]:

display(spark.read.format("delta").option("header","true").load(tgt_path))

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 44, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f3ed813c-92c9-442c-858c-9152cd5620e9)

In [43]:
create_table_query = """
CREATE TABLE Loans1 (
  loan_id int,
  customer_id int,
  interest_rate float,
  loan_term int,
  loan_amount float,
  hash_key bigint,
  created_by string,
  created_date timestamp,
  updated_by string,
  updated_date timestamp
)
USING DELTA
LOCATION 'Tables/Gold_layer/Loans1'
"""
# Execute the query to create the table
spark.sql(create_table_query)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 45, Finished, Available, Finished)

DataFrame[]

In [44]:
from pyspark.sql.functions import *
df_source = spark.read.format("delta").load("Tables/dbo/loans")
df_source.createOrReplaceTempView("source_view")
src_path="Tables/dbo/loans"
print(src_path)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 46, Finished, Available, Finished)

Tables/dbo/loans


In [45]:

df_src1=df_source.withColumn("hash_key",crc32(concat(*df_source.columns)))
display(df_src1)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 47, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5dc4181a-0b35-4673-aa19-ec523bdae32d)

In [46]:

from delta.tables import DeltaTable
tgt_path = "Tables/Gold_layer/Loans1"  
dbtable = DeltaTable.forPath(spark, tgt_path)
dbtable.toDF().show()

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 48, Finished, Available, Finished)

+-------+-----------+-------------+---------+-----------+--------+----------+------------+----------+------------+
|loan_id|customer_id|interest_rate|loan_term|loan_amount|hash_key|created_by|created_date|updated_by|updated_date|
+-------+-----------+-------------+---------+-----------+--------+----------+------------+----------+------------+
+-------+-----------+-------------+---------+-----------+--------+----------+------------+----------+------------+



In [47]:
df_ln1=df_src1.alias("src").join(dbtable.toDF().alias("tgt"), ((col("src.loan_id") == col("tgt.loan_id")) & (col("src.hash_key") == col("tgt.hash_key"))), "anti").select(col("src.*"))
df_ln1.show()
     

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 49, Finished, Available, Finished)

+-------+-----------+-----------+-------------+---------+----------+
|loan_id|customer_id|loan_amount|interest_rate|loan_term|  hash_key|
+-------+-----------+-----------+-------------+---------+----------+
|      1|         45|   10000.50|          5.5|       36|1485553409|
|      2|         12|   20000.75|          4.5|       48|3355992476|
|      2|         12|   20000.75|          4.5|       48|3355992476|
|     64|         12|   30000.00|          3.0|       24|2213555697|
|     64|         12|   30000.00|          3.0|       24|2213555697|
|      3|         78|   15000.00|          6.0|       60|1716007356|
|      4|         34|   30000.25|          3.5|       24|2847880128|
|     82|          2|   20000.50|          4.5|       48|3935580499|
|      5|         56|   25000.00|          5.0|       36| 951778087|
|      6|         23|   17500.50|          4.0|       48|3923234150|
|     11|          3|   10000.75|          6.0|       60|2179146733|
|      7|         89|   22500.75| 

In [48]:
dbtable.alias("tgt").merge(df_ln1.alias("src"),"tgt.loan_id = src.loan_id")\
    .whenMatchedUpdate(set={"tgt.loan_id":"src.loan_id","tgt.customer_id":"src.customer_id","tgt.loan_amount":"src.loan_amount","tgt.interest_rate":"src.interest_rate","tgt.loan_term":"src.loan_term","tgt.hash_key":"src.hash_key","tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks_Updated")})\
        .whenNotMatchedInsert(values={"tgt.loan_id":"src.loan_id","tgt.customer_id":"src.customer_id","tgt.loan_amount":"src.loan_amount","tgt.interest_rate":"src.interest_rate","tgt.loan_term":"src.loan_term","tgt.hash_key":"src.hash_key","tgt.created_date":current_timestamp(),"tgt.created_by":lit("databricks"),"tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks")}).execute()
 

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 50, Finished, Available, Finished)

In [49]:
display(spark.read.format("delta").option("header","true").load(tgt_path))

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 51, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fb6cfe06-a7f7-4fee-a2e7-ae640d37cef4)

In [50]:
create_table_query = """
CREATE TABLE Transactions1 (
  transaction_id int,
  account_id int,
  transaction_date timestamp,
  transaction_amount float,
  transaction_type string,
  hash_key bigint,
  created_by string,
  created_date timestamp,
  updated_by string,
  updated_date timestamp
)
USING DELTA
LOCATION 'Tables/Gold_layer/Transactions1'
"""
# Execute the query to create the table
spark.sql(create_table_query)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 52, Finished, Available, Finished)

DataFrame[]

In [51]:
from pyspark.sql.functions import *
df_source = spark.read.format("delta").load("Tables/dbo/transactions")
df_source.createOrReplaceTempView("source_view")
src_path="Tables/dbo/transactions"
print(src_path)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 53, Finished, Available, Finished)

Tables/dbo/transactions


In [52]:
df_src1=df_source.withColumn("hash_key",crc32(concat(*df_source.columns)))
display(df_src1)

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 54, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3afcce21-4dc4-4574-8fe0-f439661c9b4c)

In [53]:

from delta.tables import DeltaTable
tgt_path = "Tables/Gold_layer/Transactions1"  
dbtable = DeltaTable.forPath(spark, tgt_path)
dbtable.toDF().show()

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 55, Finished, Available, Finished)

+--------------+----------+----------------+------------------+----------------+--------+----------+------------+----------+------------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|hash_key|created_by|created_date|updated_by|updated_date|
+--------------+----------+----------------+------------------+----------------+--------+----------+------------+----------+------------+
+--------------+----------+----------------+------------------+----------------+--------+----------+------------+----------+------------+



In [54]:
df_ts1=df_src1.alias("src").join(dbtable.toDF().alias("tgt"), ((col("src.transaction_id") == col("tgt.transaction_id")) & (col("src.hash_key") == col("tgt.hash_key"))), "anti").select(col("src.*"))
df_ts1.show()

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 56, Finished, Available, Finished)

+--------------+----------+----------------+------------------+----------------+----------+
|transaction_id|account_id|transaction_date|transaction_amount|transaction_type|  hash_key|
+--------------+----------+----------------+------------------+----------------+----------+
|             1|        45|      2024-01-01|            100.50|         Deposit|4252403073|
|             2|        12|      2024-01-02|            200.75|      Withdrawal| 463284897|
|            64|        12|      2024-03-04|            300.25|      Withdrawal|3270191256|
|            88|         1|      2024-03-28|            275.75|      Withdrawal| 777307740|
|             3|        78|      2024-01-03|            150.00|         Deposit|3567602057|
|             6|        23|      2024-01-06|            175.00|      Withdrawal|1611457425|
|             4|        34|      2024-01-04|            300.25|      Withdrawal|2754677836|
|            82|         2|      2024-03-22|            200.75|      Withdrawal|

In [55]:
dbtable.alias("tgt").merge(df_ts1.alias("src"),"tgt.transaction_id = src.transaction_id")\
    .whenMatchedUpdate(set={"tgt.transaction_id":"src.transaction_id","tgt.account_id":"src.account_id","tgt.transaction_date":"src.transaction_date","tgt.transaction_amount":"src.transaction_amount","tgt.transaction_type":"src.transaction_type","tgt.hash_key":"src.hash_key","tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks_Updated")})\
        .whenNotMatchedInsert(values={"tgt.transaction_id":"src.transaction_id","tgt.account_id":"src.account_id","tgt.transaction_date":"src.transaction_date","tgt.transaction_amount":"src.transaction_amount","tgt.transaction_type":"src.transaction_type","tgt.hash_key":"src.hash_key","tgt.created_date":current_timestamp(),"tgt.created_by":lit("databricks"),"tgt.updated_date":current_timestamp(),"tgt.updated_by":lit("databricks")}).execute()
     


StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 57, Finished, Available, Finished)

In [56]:
display(spark.read.format("delta").option("header","true").load(tgt_path))

StatementMeta(, 77387143-483f-4e98-9c7d-83fc125ca512, 58, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, cc67f60b-5213-4d01-aea7-10cfb721a021)