**Clear the current catalog (Spark warehouse) path if it exists**

In [1]:
# Remove any leftover local Spark warehouse directory from a previous run so the
# notebook starts from an empty catalog. Assumes the default warehouse location
# under /content (Colab) -- adjust if spark.sql.warehouse.dir is configured elsewhere.
!rm -rf /content/spark-warehouse

**Install required modules**

In [2]:
!pip install delta-spark==2.2.0
!pip install pyspark

Collecting delta-spark==2.2.0
  Downloading delta_spark-2.2.0-py3-none-any.whl.metadata (1.9 kB)
Collecting pyspark<3.4.0,>=3.3.0 (from delta-spark==2.2.0)
  Downloading pyspark-3.3.4.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark<3.4.0,>=3.3.0->delta-spark==2.2.0)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading delta_spark-2.2.0-py3-none-any.whl (20 kB)
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.4-py2.py3-none-any.whl size=281945742 sha256=6cfe7067b81b31bb66e18c596dbbbaaac6

**Set up Spark & Delta Lake**

In [3]:
import pyspark
from delta import *

# Spark settings that register the Delta Lake SQL extension and make the default
# session catalog a DeltaCatalog, so `USING Delta`, MERGE, RESTORE, etc. work in SQL.
delta_session_conf = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

builder = pyspark.sql.SparkSession.builder.appName("MyApp")
for conf_key, conf_value in delta_session_conf.items():
    builder = builder.config(conf_key, conf_value)

# Let the delta-spark helper finish configuring the builder, then start (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [4]:
from pyspark.sql.functions import *

**Create the Database & tables**

In [5]:
# Database (namespace) that holds both demo tables.
db = "demo_delta_cdf"
create_db_sql = f"CREATE DATABASE IF NOT EXISTS {db}"
spark.sql(create_db_sql)

DataFrame[]

In [6]:
# Fully qualified <database>.<table> names: the CDF-enabled source and the merge target.
source_table, target_table = f"{db}.product_sales_src", f"{db}.product_sales_tgt"

In [7]:
# Drop both demo tables so the rest of the notebook can be re-run from a clean state.
for table_to_drop in (source_table, target_table):
    drop_result = spark.sql(f"DROP TABLE IF EXISTS {table_to_drop}")
drop_result

DataFrame[]

In [8]:
# Source table with the change data feed enabled, so every later INSERT / UPDATE /
# DELETE on it can be read back with readChangeFeed.
create_source_sql = f"""
CREATE or replace TABLE {source_table} (
  product_id   STRING,
  sales_date   DATE,
  quantity     INT,
  revenue      DOUBLE
) using Delta TBLPROPERTIES (delta.enableChangeDataFeed = true)
-- CDF is enabled on this table so that any inserts, updates, or deletes can be captured
"""
spark.sql(create_source_sql)

DataFrame[]

In [9]:
# Seed the source with four rows in a single INSERT. The change feed read below
# reports all of them as `insert` rows from the same commit.
insert_source_sql = f"""
INSERT INTO {source_table} VALUES
('P001','2025-08-01',10,100.0),
('P001','2025-08-02',20,200.0),
('P002','2025-08-01',5,  50.0),
('P003','2025-08-01',7,  70.0)
"""
spark.sql(insert_source_sql)

DataFrame[]

In [10]:
# Target table: same schema as the source but without a change data feed. It is
# only ever written by the generated MERGE statements.
create_target_sql = f"""
CREATE or replace TABLE {target_table} (
  product_id   STRING,
  sales_date   DATE,
  quantity     INT,
  revenue      DOUBLE
) using Delta
"""
spark.sql(create_target_sql)

DataFrame[]

In [11]:
# Current contents of the source table.
source_snapshot = spark.sql(f"""select * from {source_table}""")
source_snapshot.show(truncate=False)

+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|20      |200.0  |
|P002      |2025-08-01|5       |50.0   |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+



**Inspect the change data after the insert**

In [12]:
# Read the whole change feed of the source table. The epoch start timestamp lies
# before every commit, so all changes recorded so far are returned.
cdc_df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", "1970-01-01 00:00:00")
    .table(source_table)
)
cdc_df.show(truncate=False)

+----------+----------+--------+-------+------------+---------------+-----------------------+
|product_id|sales_date|quantity|revenue|_change_type|_commit_version|_commit_timestamp      |
+----------+----------+--------+-------+------------+---------------+-----------------------+
|P001      |2025-08-01|10      |100.0  |insert      |1              |2025-08-18 16:28:09.733|
|P001      |2025-08-02|20      |200.0  |insert      |1              |2025-08-18 16:28:09.733|
|P002      |2025-08-01|5       |50.0   |insert      |1              |2025-08-18 16:28:09.733|
|P003      |2025-08-01|7       |70.0   |insert      |1              |2025-08-18 16:28:09.733|
+----------+----------+--------+-------+------------+---------------+-----------------------+



**We will build the SQL MERGE statement dynamically. The CDC DataFrame is not available as a table, so we register it as a temp view that can act as the source of the MERGE.**

In [13]:
# Derive a single unqualified identifier for the temp view by turning every '.'
# of "<db>.<table>_view" into '_'.
source_temp_view = "_".join(f"{source_table}_view".split("."))
print("The temp view which will act as source for our Merge Statement : ", source_temp_view)

cdc_df.createOrReplaceTempView(source_temp_view)
view_preview = spark.sql(f"select * from {source_temp_view}")
view_preview.show(truncate=False)

The temp view which will act as source for our Merge Statement :  demo_delta_cdf_product_sales_src_view
+----------+----------+--------+-------+------------+---------------+-----------------------+
|product_id|sales_date|quantity|revenue|_change_type|_commit_version|_commit_timestamp      |
+----------+----------+--------+-------+------------+---------------+-----------------------+
|P001      |2025-08-01|10      |100.0  |insert      |1              |2025-08-18 16:28:09.733|
|P001      |2025-08-02|20      |200.0  |insert      |1              |2025-08-18 16:28:09.733|
|P002      |2025-08-01|5       |50.0   |insert      |1              |2025-08-18 16:28:09.733|
|P003      |2025-08-01|7       |70.0   |insert      |1              |2025-08-18 16:28:09.733|
+----------+----------+--------+-------+------------+---------------+-----------------------+



**Store the data columns & primary key columns in variables**

In [14]:
# Bookkeeping columns that the change feed appends to every row. They are not part
# of the business data and must not be copied into the target.
meta_cols = {"_change_type", "_commit_version", "_commit_timestamp"}
data_columns = list(filter(lambda column_name: column_name not in meta_cols, cdc_df.columns))
print(f"The Data Columns in this case: {data_columns}")

The Data Columns in this case: ['product_id', 'sales_date', 'quantity', 'revenue']


In [15]:
# Composite business key that identifies a row in both source and target.
primary_key_columns = list(("product_id", "sales_date"))

**Handle the INSERT section of the MERGE statement**

In [16]:
def dynamic_merge_statement_generator(source_temp_view_name, target_delta_table_name, data_columns, primary_key_columns):
    """Build a Delta ``MERGE INTO`` statement that only inserts unmatched source rows.

    Args:
        source_temp_view_name: Table/view used as the MERGE source (aliased ``source``).
        target_delta_table_name: Delta table being merged into (aliased ``target``).
        data_columns: Columns copied from ``source`` into ``target`` on insert.
            Any iterable of column names is accepted.
        primary_key_columns: Columns whose pairwise equality defines a match
            between ``target`` and ``source``. Any iterable of column names is accepted.

    Returns:
        The MERGE statement as a single-line SQL string.

    Raises:
        ValueError: If ``primary_key_columns`` or ``data_columns`` is empty. Either
            would otherwise yield syntactically invalid SQL (an empty ``ON`` condition
            or an empty ``INSERT ()`` list).
    """
    # Materialize once: data_columns is used twice (column list and VALUES list),
    # which would silently break for one-shot iterables such as generators.
    primary_key_columns = list(primary_key_columns)
    data_columns = list(data_columns)
    if not primary_key_columns:
        raise ValueError("primary_key_columns must contain at least one column")
    if not data_columns:
        raise ValueError("data_columns must contain at least one column")

    # target.pk1 = source.pk1 AND target.pk2 = source.pk2 ...
    join_expression = " AND ".join(f"target.{pk} = source.{pk}" for pk in primary_key_columns)
    insert_columns = ", ".join(data_columns)
    insert_values = ", ".join(f"source.{c}" for c in data_columns)

    return (
        f"MERGE INTO {target_delta_table_name} target USING {source_temp_view_name} source ON "
        f"{join_expression}"
        f" WHEN NOT MATCHED THEN INSERT ({insert_columns}) Values ({insert_values})"
    )


In [17]:
# Render the insert-only MERGE for the current source view and target table, then show it.
merge_statement = dynamic_merge_statement_generator(
    source_temp_view_name=source_temp_view,
    target_delta_table_name=target_table,
    data_columns=data_columns,
    primary_key_columns=primary_key_columns,
)
print(merge_statement)

MERGE INTO demo_delta_cdf.product_sales_tgt target USING demo_delta_cdf_product_sales_src_view source ON target.product_id = source.product_id AND target.sales_date = source.sales_date WHEN NOT MATCHED THEN INSERT (product_id, sales_date, quantity, revenue) Values (source.product_id, source.sales_date, source.quantity, source.revenue)


In [18]:
# Show the target, apply the MERGE, and show the target again to compare.
target_query = f"""select * from {target_table}"""

print("Target Table before First Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

spark.sql(merge_statement)

print("Target Table after First Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

Target Table before First Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
+----------+----------+--------+-------+

Target Table after First Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|20      |200.0  |
|P002      |2025-08-01|5       |50.0   |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+



**Handle the DELETE case in the source table**

In [19]:
# Remember "now" before touching the source, so the change-feed read below only
# returns commits made after this point (incremental processing).
# Named cdf_start_ts rather than current_timestamp so it does not shadow
# pyspark.sql.functions.current_timestamp brought in by `from pyspark.sql.functions import *`.
cdf_start_ts = spark.sql("select current_timestamp").collect()[0][0]

# DELETE sales record (product completely withdrawn that day)
spark.sql(f"DELETE FROM {source_table} WHERE product_id='P002' AND sales_date='2025-08-01'")

# Change feed since cdf_start_ts: expect a single `delete` row for P002.
cdc_df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", cdf_start_ts)
    .table(f"{source_table}")
)
cdc_df.createOrReplaceTempView(source_temp_view)
spark.sql(f"select * from {source_temp_view}").show(truncate=False)

+----------+----------+--------+-------+------------+---------------+-----------------------+
|product_id|sales_date|quantity|revenue|_change_type|_commit_version|_commit_timestamp      |
+----------+----------+--------+-------+------------+---------------+-----------------------+
|P002      |2025-08-01|5       |50.0   |delete      |2              |2025-08-18 16:28:57.969|
+----------+----------+--------+-------+------------+---------------+-----------------------+



In [20]:
def dynamic_merge_statement_generator(source_temp_view_name, target_delta_table_name, data_columns, primary_key_columns):
    """Build a Delta ``MERGE INTO`` statement that applies CDF deletes and inserts.

    Generated clauses:
      * ``WHEN MATCHED AND source._change_type = 'delete'``: delete the target row.
        Matched rows with any other change type are left unchanged.
      * ``WHEN NOT MATCHED``: insert ``data_columns`` from the source row. This
        clause has no ``_change_type`` condition, so an unmatched source row of
        *any* change type (including ``'delete'``) is inserted.

    Args:
        source_temp_view_name: Table/view used as the MERGE source (aliased ``source``).
            It must expose a ``_change_type`` column.
        target_delta_table_name: Delta table being merged into (aliased ``target``).
        data_columns: Columns copied from ``source`` into ``target`` on insert.
            Any iterable of column names is accepted.
        primary_key_columns: Columns whose pairwise equality defines a match.
            Any iterable of column names is accepted.

    Returns:
        The MERGE statement as a single-line SQL string.

    Raises:
        ValueError: If ``primary_key_columns`` or ``data_columns`` is empty, since
            either would otherwise yield syntactically invalid SQL.
    """
    # Materialize once: data_columns is used twice, which would silently break
    # for one-shot iterables such as generators.
    primary_key_columns = list(primary_key_columns)
    data_columns = list(data_columns)
    if not primary_key_columns:
        raise ValueError("primary_key_columns must contain at least one column")
    if not data_columns:
        raise ValueError("data_columns must contain at least one column")

    # target.pk1 = source.pk1 AND target.pk2 = source.pk2 ...
    join_expression = " AND ".join(f"target.{pk} = source.{pk}" for pk in primary_key_columns)
    insert_columns = ", ".join(data_columns)
    insert_values = ", ".join(f"source.{c}" for c in data_columns)

    return (
        f"MERGE INTO {target_delta_table_name} target USING {source_temp_view_name} source ON "
        f"{join_expression}"
        " WHEN MATCHED AND source._change_type = 'delete' THEN DELETE "
        f" WHEN NOT MATCHED THEN INSERT ({insert_columns}) Values ({insert_values})"
    )

In [21]:
# Render the delete-aware MERGE for the current source view and target table, then show it.
merge_statement = dynamic_merge_statement_generator(
    source_temp_view_name=source_temp_view,
    target_delta_table_name=target_table,
    data_columns=data_columns,
    primary_key_columns=primary_key_columns,
)
print(merge_statement)

MERGE INTO demo_delta_cdf.product_sales_tgt target USING demo_delta_cdf_product_sales_src_view source ON target.product_id = source.product_id AND target.sales_date = source.sales_date WHEN MATCHED AND source._change_type = 'delete' THEN DELETE  WHEN NOT MATCHED THEN INSERT (product_id, sales_date, quantity, revenue) Values (source.product_id, source.sales_date, source.quantity, source.revenue)


In [22]:
# Show the target, apply the MERGE, and show the target again to compare.
target_query = f"""select * from {target_table}"""

print("Target Table before Second Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

spark.sql(merge_statement)

print("Target Table after Second Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

Target Table before Second Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|20      |200.0  |
|P002      |2025-08-01|5       |50.0   |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+

Target Table after Second Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|20      |200.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+



**Handle the UPDATE case in the source table**

In [23]:
# Capture the current timestamp so that the CDC query will only fetch changes
# that happen after this point (mimicking incremental processing).
# Named cdf_start_ts rather than current_timestamp so it does not shadow
# pyspark.sql.functions.current_timestamp brought in by `from pyspark.sql.functions import *`.
cdf_start_ts = spark.sql("SELECT current_timestamp").collect()[0][0]

# UPDATE sales for a product on a given day
spark.sql(f"UPDATE {source_table} SET quantity = 25, revenue = 250.0 WHERE product_id='P001' AND sales_date='2025-08-02'")

# Read the Change Data Feed (CDF) from the captured timestamp, so only the UPDATE
# commit is returned. An UPDATE is recorded as an `update_preimage` (old values) and
# an `update_postimage` (new values) row. Only the post-image is needed for the merge.
cdc_df = (
    spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingTimestamp", cdf_start_ts)
        .table(f"{source_table}")
).filter(col('_change_type') != 'update_preimage')

# Register the CDF dataframe as a temporary view so it can be used in merge operations.
cdc_df.createOrReplaceTempView(source_temp_view)

# Display the captured CDC records: expect a single `update_postimage` row for P001.
spark.sql(f"SELECT * FROM {source_temp_view}").show(truncate=False)

+----------+----------+--------+-------+----------------+---------------+----------------------+
|product_id|sales_date|quantity|revenue|_change_type    |_commit_version|_commit_timestamp     |
+----------+----------+--------+-------+----------------+---------------+----------------------+
|P001      |2025-08-02|25      |250.0  |update_postimage|3              |2025-08-18 16:29:16.36|
+----------+----------+--------+-------+----------------+---------------+----------------------+



In [24]:
def dynamic_merge_statement_generator(source_temp_view_name, target_delta_table_name, data_columns, primary_key_columns):
    """Build a Delta ``MERGE INTO`` statement that applies CDF updates, deletes and inserts.

    Generated clauses, in order:
      * ``WHEN MATCHED AND source._change_type = 'update_postimage'``: overwrite every
        column in ``data_columns`` with the source value.
      * ``WHEN MATCHED AND source._change_type = 'delete'``: delete the target row.
        Matched rows with any other change type are left unchanged.
      * ``WHEN NOT MATCHED``: insert ``data_columns`` from the source row. This clause
        has no ``_change_type`` condition, so an unmatched source row of *any* change
        type (including ``'delete'``) is inserted.

    Args:
        source_temp_view_name: Table/view used as the MERGE source (aliased ``source``).
            It must expose a ``_change_type`` column.
        target_delta_table_name: Delta table being merged into (aliased ``target``).
        data_columns: Columns written on update and insert. Any iterable of column
            names is accepted.
        primary_key_columns: Columns whose pairwise equality defines a match.
            Any iterable of column names is accepted.

    Returns:
        The MERGE statement as a single-line SQL string.

    Raises:
        ValueError: If ``primary_key_columns`` or ``data_columns`` is empty, since
            either would otherwise yield syntactically invalid SQL.
    """
    # Materialize once: data_columns is used three times (SET list, column list,
    # VALUES list), which would silently break for one-shot iterables.
    primary_key_columns = list(primary_key_columns)
    data_columns = list(data_columns)
    if not primary_key_columns:
        raise ValueError("primary_key_columns must contain at least one column")
    if not data_columns:
        raise ValueError("data_columns must contain at least one column")

    # target.pk1 = source.pk1 AND target.pk2 = source.pk2 ...
    join_expression = " AND ".join(f"target.{pk} = source.{pk}" for pk in primary_key_columns)
    # Unqualified left-hand names in UPDATE SET refer to the target's columns.
    update_assignments = ", ".join(f"{c} = source.{c}" for c in data_columns)
    insert_columns = ", ".join(data_columns)
    insert_values = ", ".join(f"source.{c}" for c in data_columns)

    return (
        f"MERGE INTO {target_delta_table_name} target USING {source_temp_view_name} source ON "
        f"{join_expression}"
        " WHEN MATCHED AND source._change_type = 'update_postimage' THEN UPDATE SET "
        f"{update_assignments}"
        " WHEN MATCHED AND source._change_type = 'delete' THEN DELETE "
        f" WHEN NOT MATCHED THEN INSERT ({insert_columns}) Values ({insert_values})"
    )

In [25]:
# Render the update/delete/insert MERGE for the current source view and target table, then show it.
merge_statement = dynamic_merge_statement_generator(
    source_temp_view_name=source_temp_view,
    target_delta_table_name=target_table,
    data_columns=data_columns,
    primary_key_columns=primary_key_columns,
)
print(merge_statement)

MERGE INTO demo_delta_cdf.product_sales_tgt target USING demo_delta_cdf_product_sales_src_view source ON target.product_id = source.product_id AND target.sales_date = source.sales_date WHEN MATCHED AND source._change_type = 'update_postimage' THEN UPDATE SET product_id = source.product_id, sales_date = source.sales_date, quantity = source.quantity, revenue = source.revenue WHEN MATCHED AND source._change_type = 'delete' THEN DELETE  WHEN NOT MATCHED THEN INSERT (product_id, sales_date, quantity, revenue) Values (source.product_id, source.sales_date, source.quantity, source.revenue)


In [26]:
# Show the target, apply the MERGE, and show the target again to compare.
target_query = f"""select * from {target_table}"""

print("Target Table before Third Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

spark.sql(merge_statement)

print("Target Table after Third Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

Target Table before Third Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|20      |200.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+

Target Table after Third Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+



# **Handling Corner Cases (Insert + Update in Same Batch)**

In [27]:
# Capture the current timestamp so that the CDC query will only fetch changes
# that happen after this point (mimicking incremental processing).
# Named cdf_start_ts rather than current_timestamp so it does not shadow
# pyspark.sql.functions.current_timestamp brought in by `from pyspark.sql.functions import *`.
cdf_start_ts = spark.sql("SELECT current_timestamp").collect()[0][0]

# INSERT a new product/day, then UPDATE that same row in a second commit, so one
# incremental batch contains two changes for the same key.
spark.sql(f"""INSERT INTO {source_table} VALUES ('P009','2025-08-03',10,100.0)""")
spark.sql(f"UPDATE {source_table} SET quantity = 50, revenue = 250.0 WHERE product_id='P009' AND sales_date='2025-08-03'")

# Read the Change Data Feed (CDF) from the captured timestamp: both the insert and
# the update commits for P009 are returned (update pre-images are filtered out).
cdc_df = (
    spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingTimestamp", cdf_start_ts)
        .table(f"{source_table}")
).filter(col('_change_type') != 'update_preimage')

# Register the CDF dataframe as a temporary view so it can be used in merge operations.
cdc_df.createOrReplaceTempView(source_temp_view)

# Display the captured CDC records: expect an `insert` row and an `update_postimage`
# row for the same key. This is the corner case the following cells deal with.
spark.sql(f"SELECT * FROM {source_temp_view}").show(truncate=False)

+----------+----------+--------+-------+----------------+---------------+-----------------------+
|product_id|sales_date|quantity|revenue|_change_type    |_commit_version|_commit_timestamp      |
+----------+----------+--------+-------+----------------+---------------+-----------------------+
|P009      |2025-08-03|50      |250.0  |update_postimage|5              |2025-08-18 16:29:36.511|
|P009      |2025-08-03|10      |100.0  |insert          |4              |2025-08-18 16:29:30.842|
+----------+----------+--------+-------+----------------+---------------+-----------------------+



In [28]:
# Show the target, apply the MERGE, and show the target again. The source view holds
# two rows for the same key, neither matching the target, so both get inserted
# (a duplicate key) -- the problem addressed next.
target_query = f"""select * from {target_table}"""

print("Target Table before Fourth Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

spark.sql(merge_statement)

print("Target Table after Fourth Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

Target Table before Fourth Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+

Target Table after Fourth Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P009      |2025-08-03|50      |250.0  |
|P009      |2025-08-03|10      |100.0  |
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+



**Roll back the target table to its previous version, since the previous case was handled incorrectly (the same key was inserted twice)**

In [29]:
# DESCRIBE HISTORY lists commits newest first, so row 1 is the version just before
# the faulty merge. Restore the target to it.
history_rows = spark.sql(f"""describe history {target_table}""").collect()
second_last_version = history_rows[1]['version']
print("The second last version for the target table is : ", second_last_version)

restore_result = spark.sql(f"""RESTORE TABLE {target_table} TO VERSION AS OF {second_last_version}""")
restore_result.show(truncate=False)

The second last version for the target table is :  3
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|2506                    |2                         |1                |0                 |1308              |0                  |
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+



In [30]:
# Target contents after the restore.
target_snapshot = spark.sql(f"""select * from {target_table}""")
target_snapshot.show(truncate=False)

+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+



**Keep only the latest state of each record in cdc_df**

In [31]:
from pyspark.sql.window import Window

print("Initial CDC df : ")
cdc_df.show(truncate=False)

# One window per composite key (product_id + sales_date), newest commit first, so
# row_number() == 1 marks the latest change recorded for that key in this batch.
w = Window.partitionBy(*map(col, primary_key_columns)).orderBy(
    col("_commit_version").desc(), col("_commit_timestamp").desc()
)
ranked_changes = cdc_df.withColumn("_rn", row_number().over(w))
cdc_df = ranked_changes.where(col("_rn") == 1).drop("_rn")

cdc_df.createOrReplaceTempView(source_temp_view)
print("After taking only latest rows : ")
spark.sql(f"""select * from {source_temp_view}""").show(truncate=False)

Initial CDC df : 
+----------+----------+--------+-------+----------------+---------------+-----------------------+
|product_id|sales_date|quantity|revenue|_change_type    |_commit_version|_commit_timestamp      |
+----------+----------+--------+-------+----------------+---------------+-----------------------+
|P009      |2025-08-03|50      |250.0  |update_postimage|5              |2025-08-18 16:29:36.511|
|P009      |2025-08-03|10      |100.0  |insert          |4              |2025-08-18 16:29:30.842|
+----------+----------+--------+-------+----------------+---------------+-----------------------+

After taking only latest rows : 
+----------+----------+--------+-------+----------------+---------------+-----------------------+
|product_id|sales_date|quantity|revenue|_change_type    |_commit_version|_commit_timestamp      |
+----------+----------+--------+-------+----------------+---------------+-----------------------+
|P009      |2025-08-03|50      |250.0  |update_postimage|5        

In [32]:
# Compare the source with the target before and after re-running the MERGE built in
# In [25], now against the deduplicated source view.
source_query = f"""select * from {source_table}"""
target_query = f"""select * from {target_table}"""

print("Current Source Data -- ")
spark.sql(source_query).show(truncate=False)
print("Target Table before Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

spark.sql(merge_statement)

print("Target Table after Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

Current Source Data -- 
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P009      |2025-08-03|50      |250.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+

Target Table before Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+

Target Table after Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P003      |2025-08-01|7       |70.0   |
|P009      |2025

# **Handling Corner Cases (Insert + Update + Delete in Same Batch)**

In [33]:
# Capture the current timestamp so that the CDC query will only fetch changes
# that happen after this point (mimicking incremental processing).
# Named cdf_start_ts rather than current_timestamp so it does not shadow
# pyspark.sql.functions.current_timestamp brought in by `from pyspark.sql.functions import *`.
cdf_start_ts = spark.sql("SELECT current_timestamp").collect()[0][0]

# Insert, update and then delete the same row, each in its own commit.
spark.sql(f"""INSERT INTO {source_table} VALUES ('P010','2025-08-04',80,120.0)""")
spark.sql(f"UPDATE {source_table} SET quantity = 50, revenue = 210.0 WHERE product_id='P010' AND sales_date='2025-08-04'")
spark.sql(f"""Delete from {source_table} WHERE product_id='P010' AND sales_date='2025-08-04'""")

# Read the Change Data Feed (CDF) from the captured timestamp: the insert, update and
# delete commits for P010 are all returned (update pre-images are filtered out).
cdc_df = (
    spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingTimestamp", cdf_start_ts)
        .table(f"{source_table}")
).filter(col('_change_type') != 'update_preimage')

# Register the CDF dataframe as a temporary view so it can be used in merge operations.
cdc_df.createOrReplaceTempView(source_temp_view)

# Display the captured CDC records.
# This should show all the insert, update and delete events that happened in the source table.
spark.sql(f"SELECT * FROM {source_temp_view}").show(truncate=False)

# Deduplicate by composite key (product_id + sales_date): keep only the row from the
# newest commit, i.e. the key's final state in this batch (here, the `delete`).
w = Window.partitionBy(*[col(c) for c in primary_key_columns]) \
          .orderBy(col("_commit_version").desc(), col("_commit_timestamp").desc())

cdc_df = (
    cdc_df
      .withColumn("_rn", row_number().over(w))
      .filter(col("_rn") == 1)
      .drop("_rn")
)

cdc_df.createOrReplaceTempView(source_temp_view)
print("After taking only latest rows : ")
spark.sql(f"""select * from {source_temp_view}""").show(truncate=False)

+----------+----------+--------+-------+----------------+---------------+-----------------------+
|product_id|sales_date|quantity|revenue|_change_type    |_commit_version|_commit_timestamp      |
+----------+----------+--------+-------+----------------+---------------+-----------------------+
|P010      |2025-08-04|50      |210.0  |update_postimage|7              |2025-08-18 16:30:48.371|
|P010      |2025-08-04|50      |210.0  |delete          |8              |2025-08-18 16:30:52.92 |
|P010      |2025-08-04|80      |120.0  |insert          |6              |2025-08-18 16:30:42.471|
+----------+----------+--------+-------+----------------+---------------+-----------------------+

After taking only latest rows : 
+----------+----------+--------+-------+------------+---------------+----------------------+
|product_id|sales_date|quantity|revenue|_change_type|_commit_version|_commit_timestamp     |
+----------+----------+--------+-------+------------+---------------+----------------------+
|

In [34]:
# Compare the source with the target before and after re-running the MERGE built in
# In [25]. Its NOT MATCHED clause has no _change_type condition, so a `delete` row
# for a key absent from the target still gets inserted -- fixed in the next cell.
source_query = f"""select * from {source_table}"""
target_query = f"""select * from {target_table}"""

print("Current Source Data -- ")
spark.sql(source_query).show(truncate=False)
print("Target Table before Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

spark.sql(merge_statement)

print("Target Table after Merge Statement execution...")
spark.sql(target_query).show(truncate=False)

Current Source Data -- 
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P009      |2025-08-03|50      |250.0  |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+

Target Table before Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P003      |2025-08-01|7       |70.0   |
|P009      |2025-08-03|50      |250.0  |
+----------+----------+--------+-------+

Target Table after Merge Statement execution...
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|25      |250.0  |
|P010      |2025

In [35]:
def dynamic_merge_statement_generator(source_temp_view_name, target_delta_table_name, data_columns, primary_key_columns):
    """Build a Delta MERGE statement that applies CDF change rows to a target table.

    The source view must expose every column in ``data_columns`` plus the CDF
    ``_change_type`` column. Rows are handled as follows:

    * matched + ``update_postimage`` -> UPDATE every data column from the source
    * matched + ``delete``           -> DELETE the target row
    * not matched + ``insert`` or ``update_postimage`` -> INSERT the source row
      (an update for a key the target does not have yet is treated as an insert)

    Args:
        source_temp_view_name: Name of the view holding the change rows.
        target_delta_table_name: Name of the Delta table to merge into.
        data_columns: Columns to update/insert.
        primary_key_columns: Columns that identify a row; ANDed in the ON clause.

    Returns:
        The MERGE INTO SQL text.

    Raises:
        ValueError: if ``data_columns`` or ``primary_key_columns`` is empty,
            because either would produce syntactically invalid SQL.
    """
    # Materialize once so generators/tuples can be iterated for several clauses.
    data_columns = list(data_columns)
    primary_key_columns = list(primary_key_columns)
    if not primary_key_columns:
        raise ValueError("primary_key_columns must contain at least one column")
    if not data_columns:
        raise ValueError("data_columns must contain at least one column")

    def quote(name):
        # Backtick-quote identifiers so reserved words / special characters in
        # column names cannot break the SQL; embedded backticks are doubled.
        return "`" + str(name).replace("`", "``") + "`"

    on_clause = " AND ".join(f"target.{quote(pk)} = source.{quote(pk)}" for pk in primary_key_columns)
    set_clause = ", ".join(f"{quote(c)} = source.{quote(c)}" for c in data_columns)
    insert_cols = ", ".join(quote(c) for c in data_columns)
    insert_vals = ", ".join(f"source.{quote(c)}" for c in data_columns)

    return (
        f"MERGE INTO {target_delta_table_name} target USING {source_temp_view_name} source ON {on_clause}"
        # update_postimage carries the new values of an updated source row.
        f" WHEN MATCHED AND source._change_type = 'update_postimage' THEN UPDATE SET {set_clause}"
        " WHEN MATCHED AND source._change_type = 'delete' THEN DELETE "
        # Inserts, plus updates whose key is missing from the target (corner case).
        f" WHEN NOT MATCHED AND source._change_type IN ('insert','update_postimage') THEN INSERT ({insert_cols}) Values ({insert_vals})"
    )

In [36]:
# ---------------------------------------------------------
# Step 1: Rollback the Target Table to its previous version
# ---------------------------------------------------------
# Fetch the two most recent versions, ordered explicitly by version number so the
# result does not depend on the row order DESCRIBE HISTORY happens to return.
recent_versions = (
    spark.sql(f"DESCRIBE HISTORY {target_table}")
    .select("version")
    .orderBy(col("version").desc())
    .limit(2)
    .collect()
)
# A table with a single version has nothing to roll back to; fail with a clear
# message instead of an opaque IndexError.
if len(recent_versions) < 2:
    raise ValueError(
        f"{target_table} has {len(recent_versions)} version(s) in its history; "
        "at least two are needed to roll back."
    )
second_last_version = recent_versions[1]["version"]
print("The second last version for the target table is :", second_last_version)

# Restore the target table to the second last version
spark.sql(f"RESTORE TABLE {target_table} TO VERSION AS OF {second_last_version}").show(truncate=False)

# ---------------------------------------------------------
# Step 2: Generate the dynamic MERGE SQL using our function
# ---------------------------------------------------------
merge_statement = dynamic_merge_statement_generator(
    source_temp_view,
    target_table,
    data_columns,
    primary_key_columns
)

print("Generated Merge Statement:\n", merge_statement)

# ---------------------------------------------------------
# Step 3: Check current state of source and target before merge
# ---------------------------------------------------------
print("Current Source Data -- ")
spark.sql(f"SELECT * FROM {source_table}").show(truncate=False)

print("Current CDC Data -- ")
spark.sql(f"SELECT * FROM {source_temp_view}").show(truncate=False)

print("Target Table BEFORE Merge Statement execution...")
spark.sql(f"SELECT * FROM {target_table}").show(truncate=False)

# ---------------------------------------------------------
# Step 4: Execute the dynamic MERGE statement
# ---------------------------------------------------------
spark.sql(merge_statement)

# ---------------------------------------------------------
# Step 5: Validate results after merge
# ---------------------------------------------------------
print("Target Table AFTER Merge Statement execution...")
spark.sql(f"SELECT * FROM {target_table}").show(truncate=False)


The second last version for the target table is : 6
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|3734                    |3                         |1                |0                 |1228              |0                  |
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+

Generated Merge Statement:
 MERGE INTO demo_delta_cdf.product_sales_tgt target USING demo_delta_cdf_product_sales_src_view source ON target.product_id = source.product_id AND target.sales_date = source.sales_date WHEN MATCHED AND source._change_type = 'update_postimage' THEN UPDATE SET product_id

# **Complete Flow**

In [37]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import time

# Recreate the demo database from scratch so every run starts from an empty state.
db = "demo_delta_cdf"
spark.sql(f"Drop DATABASE if exists {db} CASCADE ")
spark.sql(f"CREATE DATABASE  {db}")

# Fully qualified names: CDF-enabled source and plain Delta target.
source_table, target_table = (f"{db}.{suffix}" for suffix in ("product_sales_src", "product_sales_tgt"))

for _tbl in (source_table, target_table):
    spark.sql(f"DROP TABLE IF EXISTS {_tbl}")

# Source: Change Data Feed turned on so inserts/updates/deletes are recorded.
spark.sql(f"""
CREATE TABLE {source_table} (
  product_id   STRING,
  sales_date   DATE,
  quantity     INT,
  revenue      DOUBLE
) using Delta TBLPROPERTIES (delta.enableChangeDataFeed = true)
-- CDF is enabled on this table so that any inserts, updates, or deletes can be captured
""")

# Target: same schema, no CDF; it is kept in sync via MERGE.
spark.sql(f"""
CREATE TABLE {target_table} (
  product_id   STRING,
  sales_date   DATE,
  quantity     INT,
  revenue      DOUBLE
) using Delta
""")

# Every column copied by the MERGE, and the composite key it joins on.
data_columns = ["product_id", "sales_date", "quantity", "revenue"]
primary_key_columns = ["product_id", "sales_date"]

# Helper: derive the temp-view name for a table ('.' -> '_', plus a '_view' suffix)
def to_temp_view_name(table_name: str) -> str:
    """Return the temp view name used for ``table_name``'s change rows.

    Example: ``"db.tbl"`` -> ``"db_tbl_view"``.
    """
    flattened = table_name.replace(".", "_")
    return flattened + "_view"

# Helper: multiset symmetric-diff validation (returns True if identical)
def validate_target_equals_source(cols, src_table, tgt_table):
    """Return True when ``src_table`` and ``tgt_table`` contain exactly the same rows.

    Rows are compared over ``cols`` with multiset semantics (EXCEPT ALL /
    UNION ALL). Therefore a row that appears a different number of times in the
    two tables counts as a mismatch. Plain EXCEPT/UNION deduplicate and would
    accept, e.g., a target that holds an extra copy of a source row.

    Args:
        cols: Column names to compare.
        src_table: Name of the source table.
        tgt_table: Name of the target table.

    Raises:
        ValueError: if ``cols`` is empty (the generated SQL would be invalid).
    """
    cols = list(cols)
    if not cols:
        raise ValueError("cols must contain at least one column")
    cols_list = ", ".join(cols)
    diff_sql = f"""
    (SELECT {cols_list} FROM {src_table}
     EXCEPT ALL
     SELECT {cols_list} FROM {tgt_table})
    UNION ALL
    (SELECT {cols_list} FROM {tgt_table}
     EXCEPT ALL
     SELECT {cols_list} FROM {src_table})
    """
    # Only emptiness matters, so stop after the first differing row.
    return spark.sql(diff_sql).limit(1).count() == 0

# Helper: read CDF between start_ts and now, drop preimage rows, keep latest change per key
def read_cdf_and_dedupe(source_table, start_ts, pk_cols, temp_view_name):
    """Read ``source_table``'s Change Data Feed since ``start_ts`` and keep one row per key.

    The raw feed is printed. ``update_preimage`` rows are dropped, and for each
    primary key only the change with the highest ``_commit_version`` (then
    ``_commit_timestamp``) is kept. The result is registered as the temp view
    ``temp_view_name`` so a SQL MERGE can read it. It may be empty.

    Args:
        source_table: Delta table with Change Data Feed enabled.
        start_ts: Starting timestamp. Its ``str()`` is passed as ``startingTimestamp``.
        pk_cols: Primary-key columns that partition the deduplication window.
        temp_view_name: Temp view to create or replace.

    Returns:
        The deduplicated DataFrame backing ``temp_view_name``.

    Raises:
        ValueError: if ``pk_cols`` is empty (the window would collapse the whole
            feed into a single row).
    """
    pk_cols = list(pk_cols)
    if not pk_cols:
        raise ValueError("pk_cols must contain at least one column")

    # NOTE(review): Delta may reject a startingTimestamp later than the latest
    # commit; callers here always capture it before making changes.
    cdc_df = (
        spark.read.format("delta")
            .option("readChangeFeed", "true")
            .option("startingTimestamp", str(start_ts))
            .table(source_table)
    )
    print("Current CDC Data : ")
    cdc_df.show(truncate=False)

    # The pre-image holds the old values; the post-image row carries the new state.
    changes = cdc_df.filter(col("_change_type") != "update_preimage")

    # Latest change per composite key wins. An empty feed simply produces an empty
    # result with the same schema, so no special case (and no extra count job) is needed.
    latest_first = (
        Window.partitionBy(*[col(c) for c in pk_cols])
              .orderBy(col("_commit_version").desc(), col("_commit_timestamp").desc())
    )
    cdc_df_latest = (
        changes.withColumn("_rn", row_number().over(latest_first))
               .filter(col("_rn") == 1)
               .drop("_rn")
    )

    # Register temp view for SQL merge
    cdc_df_latest.createOrReplaceTempView(temp_view_name)
    return cdc_df_latest

# Helper: get current timestamp string from Spark
def current_ts_str():
    """Return Spark's current timestamp as a string (e.g. for CDF ``startingTimestamp``).

    The value is cast to STRING inside Spark, so it is rendered in the Spark
    session time zone. Collecting the raw TIMESTAMP would instead give a Python
    ``datetime`` converted to the driver process's local zone. That zone can
    differ from the session zone used to interpret timestamp strings
    (presumably including ``startingTimestamp``; confirm against Delta docs).
    """
    return spark.sql("SELECT CAST(current_timestamp() AS STRING) AS ts").collect()[0]["ts"]

def dynamic_merge_statement_generator(source_temp_view_name, target_delta_table_name, data_columns, primary_key_columns):
    """Build a Delta MERGE statement that applies CDF change rows to a target table.

    The source view must expose every column in ``data_columns`` plus the CDF
    ``_change_type`` column. Rows are handled as follows:

    * matched + ``update_postimage`` -> UPDATE every data column from the source
    * matched + ``delete``           -> DELETE the target row
    * not matched + ``insert`` or ``update_postimage`` -> INSERT the source row
      (an update for a key the target does not have yet is treated as an insert)

    Args:
        source_temp_view_name: Name of the view holding the change rows.
        target_delta_table_name: Name of the Delta table to merge into.
        data_columns: Columns to update/insert.
        primary_key_columns: Columns that identify a row; ANDed in the ON clause.

    Returns:
        The MERGE INTO SQL text.

    Raises:
        ValueError: if ``data_columns`` or ``primary_key_columns`` is empty,
            because either would produce syntactically invalid SQL.
    """
    # Materialize once so generators/tuples can be iterated for several clauses.
    data_columns = list(data_columns)
    primary_key_columns = list(primary_key_columns)
    if not primary_key_columns:
        raise ValueError("primary_key_columns must contain at least one column")
    if not data_columns:
        raise ValueError("data_columns must contain at least one column")

    def quote(name):
        # Backtick-quote identifiers so reserved words / special characters in
        # column names cannot break the SQL; embedded backticks are doubled.
        return "`" + str(name).replace("`", "``") + "`"

    on_clause = " AND ".join(f"target.{quote(pk)} = source.{quote(pk)}" for pk in primary_key_columns)
    set_clause = ", ".join(f"{quote(c)} = source.{quote(c)}" for c in data_columns)
    insert_cols = ", ".join(quote(c) for c in data_columns)
    insert_vals = ", ".join(f"source.{quote(c)}" for c in data_columns)

    return (
        f"MERGE INTO {target_delta_table_name} target USING {source_temp_view_name} source ON {on_clause}"
        # update_postimage carries the new values of an updated source row.
        f" WHEN MATCHED AND source._change_type = 'update_postimage' THEN UPDATE SET {set_clause}"
        " WHEN MATCHED AND source._change_type = 'delete' THEN DELETE "
        # Inserts, plus updates whose key is missing from the target (corner case).
        f" WHEN NOT MATCHED AND source._change_type IN ('insert','update_postimage') THEN INSERT ({insert_cols}) Values ({insert_vals})"
    )

def run_cdc_sync(start_ts, label, merge_sql):
    """Apply the source's CDF changes since ``start_ts`` to the target, then validate.

    Shows the source, the deduplicated CDC view and the target before and after
    running ``merge_sql``. Then asserts that the target holds the same rows as
    the source.

    Args:
        start_ts: CDF starting timestamp passed to ``read_cdf_and_dedupe``.
        label: Name of the batch; used in the validation and error messages.
        merge_sql: MERGE statement that reads from ``source_temp_view``.

    Returns:
        Whatever ``read_cdf_and_dedupe`` returns for this batch.
    """
    print("Current Source Data -- ")
    spark.sql(f"SELECT * FROM {source_table}").show(truncate=False)

    batch_cdc_df = read_cdf_and_dedupe(source_table, start_ts, primary_key_columns, source_temp_view)
    print("CDC Data after taking latest state only-- ")
    spark.sql(f"SELECT * FROM {source_temp_view}").show(truncate=False)

    print("Target Table BEFORE Merge Statement execution...")
    spark.sql(f"SELECT * FROM {target_table}").show(truncate=False)

    spark.sql(merge_sql)

    print("Target Table AFTER Merge Statement execution...")
    spark.sql(f"SELECT * FROM {target_table}").show(truncate=False)

    # The label makes a failure point at the batch that actually broke.
    assert validate_target_equals_source(data_columns, source_table, target_table), f"{label} mismatch!"
    print(f"{label} validated: Target equals Source")
    print('*'*100)
    return batch_cdc_df


print("Seeding initial data into source...")
spark.sql(f"""
INSERT INTO {source_table} VALUES
('P001','2025-08-01',10,100.0),
('P001','2025-08-02',20,200.0),
('P002','2025-08-01',5,50.0),
('P003','2025-08-01',7,70.0)
""")

source_temp_view = to_temp_view_name(source_table)
# Every batch merges from the same view, so the statement is generated once.
merge_statement = dynamic_merge_statement_generator(source_temp_view, target_table, data_columns, primary_key_columns)

# Initial full CDF read from epoch (first load)
print('*'*100)
cdc_df = run_cdc_sync("1970-01-01 00:00:00", "Initial load", merge_statement)

# Capture checkpoint timestamp BEFORE changes
batch_a_start_ts = current_ts_str()
time.sleep(0.5)  # small wait to ensure different commit timestamp

print("\n--- Running Batch A (mixed operations, some inserts, some insert->update, some insert->update->delete) ---")
# Batch A operations: some inserts, some insert->update, some insert->update->delete
#  - P004: insert only
#  - P005: insert then update
#  - P006: insert then update then delete
spark.sql(f"INSERT INTO {source_table} VALUES ('P004','2025-08-01',12,120.0)")                # insert only
spark.sql(f"INSERT INTO {source_table} VALUES ('P005','2025-08-01',8,80.0)")                  # insert
spark.sql(f"UPDATE {source_table} SET quantity = 18, revenue = 180.0 WHERE product_id='P005' AND sales_date='2025-08-01'")  # update
spark.sql(f"INSERT INTO {source_table} VALUES ('P006','2025-08-01',3,30.0)")  # insert
spark.sql(f"UPDATE {source_table} SET quantity = 4, revenue = 40.0 WHERE product_id='P006' AND sales_date='2025-08-01'")  # update
spark.sql(f"DELETE FROM {source_table} WHERE product_id='P006' AND sales_date='2025-08-01'")  # delete

cdc_df = run_cdc_sync(batch_a_start_ts, "Batch A", merge_statement)

# Batch B ops:
# - Update existing P001 on 2025-08-01
# - Delete existing P002 on 2025-08-01
# - Insert new P007, P008
print("\n--- Running Batch B (update existing, delete existing, new inserts) ---")
batch_b_start_ts = current_ts_str()
time.sleep(0.5)
spark.sql(f"UPDATE {source_table} SET quantity = 15, revenue = 150.0 WHERE product_id='P001' AND sales_date='2025-08-01'")
spark.sql(f"DELETE FROM {source_table} WHERE product_id='P002' AND sales_date='2025-08-01'")
spark.sql(f"INSERT INTO {source_table} VALUES ('P007','2025-08-01',9,90.0)")
spark.sql(f"INSERT INTO {source_table} VALUES ('P008','2025-08-02',2,20.0)")

cdc_df = run_cdc_sync(batch_b_start_ts, "Batch B", merge_statement)


Seeding initial data into source...
****************************************************************************************************
Current Source Data -- 
+----------+----------+--------+-------+
|product_id|sales_date|quantity|revenue|
+----------+----------+--------+-------+
|P001      |2025-08-01|10      |100.0  |
|P001      |2025-08-02|20      |200.0  |
|P002      |2025-08-01|5       |50.0   |
|P003      |2025-08-01|7       |70.0   |
+----------+----------+--------+-------+

Current CDC Data : 
+----------+----------+--------+-------+------------+---------------+-----------------------+
|product_id|sales_date|quantity|revenue|_change_type|_commit_version|_commit_timestamp      |
+----------+----------+--------+-------+------------+---------------+-----------------------+
|P001      |2025-08-01|10      |100.0  |insert      |1              |2025-08-18 16:31:56.629|
|P001      |2025-08-02|20      |200.0  |insert      |1              |2025-08-18 16:31:56.629|
|P002      |2025-08-