In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([StructField("emp_id", IntegerType(), True),
                    StructField("name", StringType(), True),
                    StructField("city", StringType(), True),
                    StructField("country", StringType(), True),
                    StructField("contact_no", IntegerType(), True)])

data = [(1000, "Michael", "Columbus", "USA", 689546323)]

In [0]:
df = spark.createDataFrame(data, schema)
df.display()

emp_id,name,city,country,contact_no
1000,Michael,Columbus,USA,689546323


In [0]:
%sql
CREATE OR REPLACE TABLE dim_employee (
  emp_id INT,
  name STRING,
  city STRING,
  country STRING,
  contact_no INT
)
USING DELTA
LOCATION '/FileStore/tables/delta_merge'

In [0]:
%sql
DROP TABLE IF EXISTS dim_employee;
CREATE TABLE dim_employee (
  emp_id INT,
  name STRING,
  city STRING,
  country STRING,
  contact_no INT
)
USING DELTA
LOCATION '/FileStore/tables/delta_merge'

In [0]:
%sql
SELECT * FROM dim_employee

emp_id,name,city,country,contact_no


### Method 1 - Spark SQL

In [0]:
spark.catalog.dropTempView("source_view")

Out[36]: False

In [0]:
df.createOrReplaceTempView("source_view")

In [0]:
%sql
SELECT * FROM source_view

emp_id,name,city,country,contact_no
1000,Michael,Columbus,USA,689546323


In [0]:
%sql
MERGE INTO dim_employee AS target
USING source_view AS source
ON target.emp_id = source.emp_id
WHEN MATCHED 
THEN UPDATE SET
target.name = source.name,
target.city = source.city,
target.country = target.country,
target.contact_no = target.contact_no
WHEN NOT MATCHED THEN 
INSERT (emp_id, name, city, country, contact_no) VALUES (emp_id, name, city, country, contact_no)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,0,0,1


In [0]:
%sql
SELECT * FROM dim_employee

emp_id,name,city,country,contact_no
1000,Michael,Columbus,USA,689546323


In [0]:

data = [(1000, "Michael", "Chicago", "USA", 689546323), (20000, "Nancy", "New York", "USA", 76345902)]
df = spark.createDataFrame(data, schema)
df.display()

emp_id,name,city,country,contact_no
1000,Michael,Chicago,USA,689546323
20000,Nancy,New York,USA,76345902


In [0]:
df.createOrReplaceTempView("source_view")

In [0]:
%sql
SELECT * FROM source_view

emp_id,name,city,country,contact_no
1000,Michael,Chicago,USA,689546323
20000,Nancy,New York,USA,76345902


In [0]:
%sql
SELECT * FROM dim_employee

emp_id,name,city,country,contact_no
1000,Michael,Columbus,USA,689546323


In [0]:
%sql
MERGE INTO dim_employee AS target
USING source_view AS source
ON target.emp_id = source.emp_id
WHEN MATCHED 
THEN UPDATE SET
target.name = source.name,
target.city = source.city,
target.country = target.country,
target.contact_no = target.contact_no
WHEN NOT MATCHED THEN 
INSERT (emp_id, name, city, country, contact_no) VALUES (emp_id, name, city, country, contact_no)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


In [0]:
%sql
SELECT * FROM dim_employee

emp_id,name,city,country,contact_no
1000,Michael,Chicago,USA,689546323
20000,Nancy,New York,USA,76345902


### Method 2: PySpark

In [0]:
data = [(20000, "Sarah", "London", "UK", 789456123), (3000, "John", "Berlin", "Germany", 987654321)]
df = spark.createDataFrame(data, schema)
df.display()

emp_id,name,city,country,contact_no
20000,Sarah,London,UK,789456123
3000,John,Berlin,Germany,987654321


In [0]:
from delta.tables import *
delta_df = DeltaTable.forPath(spark, "dbfs:/FileStore/tables/delta_merge")

In [0]:
delta_df.alias("target").merge(
    source=df.alias("source"),
    condition="target.emp_id = source.emp_id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "city": "source.city",
    "country": "target.country",
    "contact_no": "target.contact_no"
}).whenNotMatchedInsert(values={
    "emp_id": "source.emp_id",
    "name": "source.name",
    "city": "source.city",
    "country": "source.country",
    "contact_no": "source.contact_no"
}).execute()

In [0]:
%sql
SELECT * FROM dim_employee

emp_id,name,city,country,contact_no
1000,Michael,Chicago,USA,689546323
3000,John,Berlin,Germany,987654321
20000,Sarah,London,USA,76345902


### Audit Log

In [0]:
%sql
CREATE TABLE audit_log( operation STRING, 
                        updated_time TIMESTAMP,
                        user_name STRING,
                        notebook_name STRING, 
                        numTargetRowsUpdated INT,
                        numTargetRowsInserted INT,
                        numTargetRowsDeleted INT)

In [0]:
%sql
SELECT * FROM audit_log

operation,updated_time,user_name,notebook_name,numTargetRowsUpdated,numTargetRowsInserted,numTargetRowsDeleted


### Create DataFrame with Last Operation in Delta Table

In [0]:
lastOperationDF = delta_df.history(2) # get the last operation
lastOperationDF.display()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2025-11-23T12:00:54.000+0000,6333491925723577,srishtishetty53@gmail.com,MERGE,"Map(predicate -> [""(emp_id#9067 = emp_id#9077)""], matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(2423641351571634),1123-105155-zdhymzm8,3,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 3125, numTargetBytesRemoved -> 1566, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 4690, materializeSourceTimeMs -> 533, numTargetRowsInserted -> 1, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 1732, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 2174)",,Databricks-Runtime/12.2.x-scala2.12
3,2025-11-23T11:56:43.000+0000,6333491925723577,srishtishetty53@gmail.com,MERGE,"Map(predicate -> [""(emp_id#7877 = emp_id#7657)""], matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(2423641351571634),1123-105155-zdhymzm8,2,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 3139, numTargetBytesRemoved -> 1580, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 4089, materializeSourceTimeMs -> 400, numTargetRowsInserted -> 1, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 1580, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1874)",,Databricks-Runtime/12.2.x-scala2.12


### Explode Operation Metrics Column

In [0]:
explodeDF = lastOperationDF.select(lastOperationDF.operation, explode(lastOperationDF.operationMetrics))
explodeDF.display(5)

operation,key,value
MERGE,numTargetRowsCopied,0
MERGE,numTargetRowsDeleted,0
MERGE,numTargetFilesAdded,2
MERGE,numTargetBytesAdded,3125
MERGE,numTargetBytesRemoved,1566
MERGE,numTargetDeletionVectorsAdded,0
MERGE,numTargetRowsMatchedUpdated,1
MERGE,executionTimeMs,4690
MERGE,materializeSourceTimeMs,533
MERGE,numTargetRowsInserted,1


In [0]:
explodeDF_select = explodeDF.select(explodeDF.operation, explodeDF.key, explodeDF.value.cast('INT'))
explodeDF_select.display()

operation,key,value
MERGE,numTargetRowsCopied,0
MERGE,numTargetRowsDeleted,0
MERGE,numTargetFilesAdded,2
MERGE,numTargetBytesAdded,3125
MERGE,numTargetBytesRemoved,1566
MERGE,numTargetDeletionVectorsAdded,0
MERGE,numTargetRowsMatchedUpdated,1
MERGE,executionTimeMs,4690
MERGE,materializeSourceTimeMs,533
MERGE,numTargetRowsInserted,1


### Pivot Operation to Convert Rows to Columns

In [0]:
PivotDF = explodeDF_select.groupBy("operation").pivot("key").sum("value")
PivotDF.display()

operation,executionTimeMs,materializeSourceTimeMs,numOutputRows,numSourceRows,numTargetBytesAdded,numTargetBytesRemoved,numTargetChangeFilesAdded,numTargetDeletionVectorsAdded,numTargetDeletionVectorsRemoved,numTargetFilesAdded,numTargetFilesRemoved,numTargetRowsCopied,numTargetRowsDeleted,numTargetRowsInserted,numTargetRowsMatchedDeleted,numTargetRowsMatchedUpdated,numTargetRowsNotMatchedBySourceDeleted,numTargetRowsNotMatchedBySourceUpdated,numTargetRowsUpdated,rewriteTimeMs,scanTimeMs
MERGE,8779,933,4,4,6264,3146,0,0,0,4,2,0,0,2,0,2,0,0,2,4048,3312


### Select ONLY columns needed for Audit Log Table

In [0]:
PivotDF_select = PivotDF.select(PivotDF.operation, PivotDF.numTargetRowsUpdated, PivotDF.numTargetRowsInserted, PivotDF.numTargetRowsDeleted)
PivotDF_select.display()

operation,numTargetRowsUpdated,numTargetRowsInserted,numTargetRowsDeleted
MERGE,2,2,0


### Add Notebook Parameters such as Username, NotebookPath, etc

In [0]:
auditDF = PivotDF_select\
    .withColumn("user_name", lit(dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()))\
    .withColumn("notebook_name", lit(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()))\
    .withColumn("updated_time", lit(current_timestamp())) 

auditDF.display() # you can get this from history() also


operation,numTargetRowsUpdated,numTargetRowsInserted,numTargetRowsDeleted,user_name,notebook_name,updated_time
MERGE,2,2,0,srishtishetty53@gmail.com,/Users/srishtishetty53@gmail.com/PySpark Practical/36_PySpark_Merge_Statement (SCD TYpe 1),2025-11-23T13:24:03.164+0000


### Rearanging Columns in Dataframe to match with the Audit Log Table

In [0]:
auditDF_select = auditDF.select(auditDF.operation, auditDF.updated_time, auditDF.user_name, auditDF.notebook_name, auditDF.numTargetRowsUpdated, auditDF.numTargetRowsInserted, auditDF.numTargetRowsDeleted)
auditDF_select.display()

operation,updated_time,user_name,notebook_name,numTargetRowsUpdated,numTargetRowsInserted,numTargetRowsDeleted
MERGE,2025-11-23T13:44:22.554+0000,srishtishetty53@gmail.com,/Users/srishtishetty53@gmail.com/PySpark Practical/36_PySpark_Merge_Statement (SCD TYpe 1),2,2,0


In [0]:
auditDF_select.createOrReplaceTempView("Audit")

In [0]:
%sql
SELECT * FROM Audit

operation,updated_time,user_name,notebook_name,numTargetRowsUpdated,numTargetRowsInserted,numTargetRowsDeleted
MERGE,2025-11-23T13:45:13.905+0000,srishtishetty53@gmail.com,/Users/srishtishetty53@gmail.com/PySpark Practical/36_PySpark_Merge_Statement (SCD TYpe 1),2,2,0


In [0]:
%sql
SELECT * FROM audit_log

operation,updated_time,user_name,notebook_name,numTargetRowsUpdated,numTargetRowsInserted,numTargetRowsDeleted


### INSERT Audit data into Audit Log table

In [0]:
%sql
INSERT INTO audit_log
SELECT * FROM Audit

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
SELECT * FROM audit_log

operation,updated_time,user_name,notebook_name,numTargetRowsUpdated,numTargetRowsInserted,numTargetRowsDeleted
MERGE,2025-11-23T13:46:39.551+0000,srishtishetty53@gmail.com,/Users/srishtishetty53@gmail.com/PySpark Practical/36_PySpark_Merge_Statement (SCD TYpe 1),2,2,0
