#### Table in the Databricks Hive metastore already exists

#### Read tables from SQL through JDBC Connection

In [0]:

# JDBC endpoint of the Azure SQL database that holds the Employee table and its CDC tables.
jdbc_url = "jdbc:sqlserver://faizan-sql.database.windows.net:1433;databaseName=faizan-sql"

# The password must never live in the notebook source: it is read from a Databricks
# secret scope at run time. Adjust the scope/key names to match your workspace.
# NOTE(review): the password that used to be hardcoded here is in the file history
# and should be rotated.
SQL_SECRET_SCOPE = "faizan-sql"
SQL_PASSWORD_KEY = "sql-password"

connection_properties = {
    "user": "faizan-sql",
    "password": dbutils.secrets.get(scope=SQL_SECRET_SCOPE, key=SQL_PASSWORD_KEY),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Spark's JDBC reader expects a table expression, so the metadata query is passed
# as an aliased subquery that lists every base table in the database.
query = "(SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE') AS table_list"

tables_df = spark.read.jdbc(url=jdbc_url, table=query, properties=connection_properties)

# truncate=False prints the full table names instead of cutting them at 20 characters.
tables_df.show(truncate=False)


+----------------+
|TABLE_NAME      |
+----------------+
|systranschemas  |
|change_tables   |
|ddl_history     |
|lsn_time_mapping|
|captured_columns|
|index_columns   |
|cdc_jobs        |
|Employee        |
|dbo_Employee_CT |
+----------------+



#### Check for CDC data in the sql table

#### Approach: the table has a `created_On` column that defaults to the current timestamp, so every INSERT 
#### fills it in automatically. 
#### A column default is not re-applied on UPDATE, so every UPDATE statement must set `created_On` explicitly. 
#### Example:
#### UPDATE Employee_CT
#### SET salary = 65000, created_On = CURRENT_TIMESTAMP
#### WHERE EmployeeID = 1;

#### Get the recent changes done on the table by using created_On

In [0]:
# Query executed on SQL Server: rank the change rows by created_On (newest first)
# and keep every row that shares the most recent created_On value.
cdc_query = """
SELECT *
FROM (
    SELECT *, DENSE_RANK() OVER (ORDER BY created_On DESC) AS RW_NUMBER
    FROM [cdc].[dbo_Employee_CT]
) AS ranked_data
WHERE RW_NUMBER = 1
"""

# The JDBC reader takes a table expression, so the query is wrapped as an aliased subquery.
cdc_subquery = f"({cdc_query}) AS cdc_data"
cdc_df = spark.read.jdbc(
    url=jdbc_url,
    table=cdc_subquery,
    properties=connection_properties,
)

# Render the latest batch of change rows.
display(cdc_df)

__$start_lsn,__$end_lsn,__$seqval,__$operation,__$update_mask,EmployeeID,FirstName,LastName,Department,Salary,HireDate,created_On,__$command_id,RW_NUMBER
AAAAPwAABDgAsw==,,AAAAPwAABDgABQ==,4,UA==,11,Liam,Miller,Sales,76560.0,2021-09-12,2024-10-20T13:56:16.713+0000,1,1
AAAAPwAABDgAsw==,,AAAAPwAABDgABg==,4,UA==,17,Isabella,Harris,Sales,80520.0,2023-04-15,2024-10-20T13:56:16.713+0000,2,1
AAAAPwAABDgAsw==,,AAAAPwAABDgABw==,4,UA==,23,Charlotte,King,Sales,79200.0,2021-01-27,2024-10-20T13:56:16.713+0000,3,1
AAAAPwAABDgAsw==,,AAAAPwAABDgACA==,4,UA==,29,Logan,Carter,Sales,77880.0,2023-05-17,2024-10-20T13:56:16.713+0000,4,1
AAAAPwAABDgAsw==,,AAAAPwAABDgACQ==,4,UA==,35,Ella,Cook,Sales,79200.0,2021-12-20,2024-10-20T13:56:16.713+0000,5,1
AAAAPwAABDgAsw==,,AAAAPwAABDgACg==,4,UA==,41,Avery,Torres,Sales,81840.0,2023-06-11,2024-10-20T13:56:16.713+0000,6,1
AAAAPwAABDgAsw==,,AAAAPwAABDgACw==,4,UA==,47,Scarlett,Turner,Sales,77880.0,2021-06-14,2024-10-20T13:56:16.713+0000,7,1
AAAAPwAABDgAsw==,,AAAAPwAABDgADA==,4,UA==,53,Lily,Stewart,Sales,83160.0,2022-11-20,2024-10-20T13:56:16.713+0000,8,1
AAAAPwAABDgAsw==,,AAAAPwAABDgADw==,4,UA==,59,Ellie,Morgan,Sales,84480.0,2023-04-17,2024-10-20T13:56:16.713+0000,9,1
AAAAPwAABDgAsw==,,AAAAPwAABDgAEA==,4,UA==,66,Eli,Henderson,Sales,81840.0,2023-02-08,2024-10-20T13:56:16.713+0000,10,1


#### Split the records by operation type: deleted, inserted, and updated

#### Operations 
#### __$operation:
	•	This column indicates the type of operation performed:
	
	•	1: Delete
	
	•	2: Insert
	
	•	3: Update (before update) — old row values
	
	•	4: Update (after update) — new row values

In [0]:
from pyspark.sql.functions import * 

# Split the change rows by their __$operation code (1 = delete, 2 = insert, 4 = update after-image).
operation_code = col("__$operation")
inserts_df = cdc_df.where(operation_code == 2)
updates_df = cdc_df.where(operation_code == 4)
deletes_df = cdc_df.where(operation_code == 1)

# Print a heading and render each subset, in the order inserts, updates, deletes.
for heading, frame in (
    ("Inserts:", inserts_df),
    ("Updates:", updates_df),
    ("Deletes:", deletes_df),
):
    print(heading)
    display(frame)


Inserts:


__$start_lsn,__$end_lsn,__$seqval,__$operation,__$update_mask,EmployeeID,FirstName,LastName,Department,Salary,HireDate,created_On,__$command_id,RW_NUMBER


Updates:


__$start_lsn,__$end_lsn,__$seqval,__$operation,__$update_mask,EmployeeID,FirstName,LastName,Department,Salary,HireDate,created_On,__$command_id,RW_NUMBER
AAAAPwAABDgAsw==,,AAAAPwAABDgABQ==,4,UA==,11,Liam,Miller,Sales,76560.0,2021-09-12,2024-10-20T13:56:16.713+0000,1,1
AAAAPwAABDgAsw==,,AAAAPwAABDgABg==,4,UA==,17,Isabella,Harris,Sales,80520.0,2023-04-15,2024-10-20T13:56:16.713+0000,2,1
AAAAPwAABDgAsw==,,AAAAPwAABDgABw==,4,UA==,23,Charlotte,King,Sales,79200.0,2021-01-27,2024-10-20T13:56:16.713+0000,3,1
AAAAPwAABDgAsw==,,AAAAPwAABDgACA==,4,UA==,29,Logan,Carter,Sales,77880.0,2023-05-17,2024-10-20T13:56:16.713+0000,4,1
AAAAPwAABDgAsw==,,AAAAPwAABDgACQ==,4,UA==,35,Ella,Cook,Sales,79200.0,2021-12-20,2024-10-20T13:56:16.713+0000,5,1
AAAAPwAABDgAsw==,,AAAAPwAABDgACg==,4,UA==,41,Avery,Torres,Sales,81840.0,2023-06-11,2024-10-20T13:56:16.713+0000,6,1
AAAAPwAABDgAsw==,,AAAAPwAABDgACw==,4,UA==,47,Scarlett,Turner,Sales,77880.0,2021-06-14,2024-10-20T13:56:16.713+0000,7,1
AAAAPwAABDgAsw==,,AAAAPwAABDgADA==,4,UA==,53,Lily,Stewart,Sales,83160.0,2022-11-20,2024-10-20T13:56:16.713+0000,8,1
AAAAPwAABDgAsw==,,AAAAPwAABDgADw==,4,UA==,59,Ellie,Morgan,Sales,84480.0,2023-04-17,2024-10-20T13:56:16.713+0000,9,1
AAAAPwAABDgAsw==,,AAAAPwAABDgAEA==,4,UA==,66,Eli,Henderson,Sales,81840.0,2023-02-08,2024-10-20T13:56:16.713+0000,10,1


Deletes:


__$start_lsn,__$end_lsn,__$seqval,__$operation,__$update_mask,EmployeeID,FirstName,LastName,Department,Salary,HireDate,created_On,__$command_id,RW_NUMBER


#### Delta Merge Logic for merging the CDC data into Databricks Delta table

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import *

# Target Delta table that receives the CDC changes.
deltaTable = DeltaTable.forPath(spark, "/mnt/EMPLOYEE/Employeee_BRONZE_TABLE.delta")

# Non-key columns copied from a change row into the target table.
employee_columns = ["FirstName", "LastName", "Department", "Salary", "HireDate"]

# Reduce the batch to ONE row per EmployeeID: the most recent change.
# - Operation 3 (update before-image) is dropped; only 1 (delete), 2 (insert) and
#   4 (update after-image) describe the resulting state of a row.
# - __$start_lsn orders transactions and __$seqval orders changes inside a transaction.
# Without this step, several source rows for the same key make the MERGE fail
# ("multiple source rows matched") or leave stale/duplicate rows in the target.
latest_change_first = Window.partitionBy("EmployeeID").orderBy(
    col("__$start_lsn").desc(),
    col("__$seqval").desc(),
    col("__$operation").desc(),
)
latest_changes_df = (
    cdc_df
    .filter(col("__$operation").isin(1, 2, 4))
    .withColumn("change_rank", row_number().over(latest_change_first))
    .filter(col("change_rank") == 1)
    # Alias the operation column: "$" is not valid in an unquoted SQL identifier,
    # and the merge conditions below are SQL expressions.
    .select("EmployeeID", *employee_columns, col("__$operation").alias("cdc_operation"))
)

# Inserts and update after-images are both applied as an upsert, so replaying a
# batch never creates a second row for an EmployeeID that already exists.
upsert_condition = "source.cdc_operation IN (2, 4)"

# A single MERGE applies deletes, updates and inserts atomically, replacing the
# previous three separate steps and the collect()-built "OR" delete predicate.
(
    deltaTable.alias("target")
    .merge(
        latest_changes_df.alias("source"),
        "target.EmployeeID = source.EmployeeID"  # Condition for matching records
    )
    .whenMatchedDelete(condition="source.cdc_operation = 1")
    .whenMatchedUpdate(
        condition=upsert_condition,
        set={name: col(f"source.{name}") for name in employee_columns},
    )
    .whenNotMatchedInsert(
        condition=upsert_condition,
        values={name: col(f"source.{name}") for name in ["EmployeeID", *employee_columns]},
    )
    .execute()
)

#### Operations Performed against the WORK_OUT.Employeee Table
#### MERGE - insert + Update (Updated post image)
#### Delete - Delete + Update (Updated Pre image)

#### Final View of The table

In [0]:
%sql
-- Show every row currently stored in the Delta table at this path.
SELECT * FROM delta.`/mnt/EMPLOYEE/Employeee_BRONZE_TABLE.delta`

EmployeeID,FirstName,LastName,Department,Salary,HireDate
11,Liam,Miller,Sales,76560.0,2021-09-12
17,Isabella,Harris,Sales,80520.0,2023-04-15
23,Charlotte,King,Sales,79200.0,2021-01-27
29,Logan,Carter,Sales,77880.0,2023-05-17
35,Ella,Cook,Sales,79200.0,2021-12-20
41,Avery,Torres,Sales,81840.0,2023-06-11
47,Scarlett,Turner,Sales,77880.0,2021-06-14
53,Lily,Stewart,Sales,83160.0,2022-11-20
59,Ellie,Morgan,Sales,84480.0,2023-04-17
66,Eli,Henderson,Sales,81840.0,2023-02-08


In [0]:
%sql
-- Count rows per EmployeeID, highest count first; a count above 1 means the key is duplicated.
SELECT
  EmployeeID,
  COUNT(*) AS count_as
FROM delta.`/mnt/EMPLOYEE/Employeee_BRONZE_TABLE.delta`
GROUP BY EmployeeID
ORDER BY count_as DESC;

EmployeeID,count_as
6,2
4,2
7,2
5,2
148,1
463,1
540,1
392,1
737,1
623,1
