###  1. Enable CDF on Delta Table

In [0]:
# Drop the table if it exists
spark.sql("DROP TABLE IF EXISTS customer_txn")

# Remove the files from the location
dbutils.fs.rm("dbfs:/user/hive/warehouse/customer_txn", True)

# Now create the Delta table
spark.sql("""
CREATE TABLE customer_txn (
    cust_id INT,
    txn_amount DOUBLE,
    txn_type STRING
)
USING DELTA
TBLPROPERTIES (
  delta.enableChangeDataFeed = true
)
""")


Out[3]: DataFrame[]

### 2. Insert Initial Data (Version 0)[](url)

In [0]:
%sql
INSERT INTO customer_txn VALUES
(1, 100.0, 'credit'),
(2, 150.0, 'debit'),
(3, 200.0, 'credit');


num_affected_rows,num_inserted_rows
3,3


### 3. Update Data (Version 1)

In [0]:
%sql
UPDATE customer_txn
SET txn_amount = 180.0
WHERE cust_id = 2;

num_affected_rows
1


### 4. Delete a Record (Version 2)

In [0]:
%sql
DELETE FROM customer_txn
WHERE cust_id = 3;


num_affected_rows
1


### 5. View Table History (All Versions)

In [0]:
%sql
DESCRIBE HISTORY customer_txn;


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2025-08-07T18:28:52.000+0000,5662471924481153,alkeshlajurkar@gmail.com,DELETE,"Map(predicate -> [""(cust_id#1990 = 3)""])",,List(1958953019200964),0807-181537-4xi0oq8m,2.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 1383, numCopiedRows -> 2, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 3718, numDeletedRows -> 1, scanTimeMs -> 2017, numAddedFiles -> 1, numAddedBytes -> 1339, rewriteTimeMs -> 1700)",,Databricks-Runtime/12.2.x-scala2.12
2,2025-08-07T18:25:52.000+0000,5662471924481153,alkeshlajurkar@gmail.com,UPDATE,"Map(predicate -> [""(cust_id#1357 = 2)""])",,List(1958953019200964),0807-181537-4xi0oq8m,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 1166, numCopiedRows -> 2, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 9596, scanTimeMs -> 7338, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 1383, rewriteTimeMs -> 2241)",,Databricks-Runtime/12.2.x-scala2.12
1,2025-08-07T18:25:02.000+0000,5662471924481153,alkeshlajurkar@gmail.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1958953019200964),0807-181537-4xi0oq8m,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 3, numOutputBytes -> 1166)",,Databricks-Runtime/12.2.x-scala2.12
0,2025-08-07T18:24:43.000+0000,5662471924481153,alkeshlajurkar@gmail.com,CREATE TABLE,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {""delta.enableChangeDataFeed"":""true""})",,List(1958953019200964),0807-181537-4xi0oq8m,,WriteSerializable,True,Map(),,Databricks-Runtime/12.2.x-scala2.12


### 6. Fetch Changes Between Versions
### 

In [0]:
%sql
-- Check changes between version 0 and version 2
SELECT *
FROM table_changes('customer_txn', 0, 2);


cust_id,txn_amount,txn_type,_change_type,_commit_version,_commit_timestamp
2,150.0,debit,update_preimage,2,2025-08-07T18:25:52.000+0000
2,180.0,debit,update_postimage,2,2025-08-07T18:25:52.000+0000
1,100.0,credit,insert,1,2025-08-07T18:25:02.000+0000
2,150.0,debit,insert,1,2025-08-07T18:25:02.000+0000
3,200.0,credit,insert,1,2025-08-07T18:25:02.000+0000


### 7. Get Latest Change for a Specific Record


In [0]:
%sql
-- Track all changes made to cust_id = 2 from version 0 onwards
SELECT *
FROM table_changes('customer_txn', 0)
WHERE cust_id = 2
ORDER BY _commit_version DESC
LIMIT 1;


cust_id,txn_amount,txn_type,_change_type,_commit_version,_commit_timestamp
2,150.0,debit,update_preimage,2,2025-08-07T18:25:52.000+0000


### 8. PySpark Version to Capture Changes Programmatically

In [0]:
df_changes = spark.read.format("delta") \
  .option("readChangeData", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 2) \
  .table("customer_txn")

df_changes.filter("cust_id = 2") \
  .orderBy("_commit_version", ascending=False) \
  .show(1)


+-------+----------+--------+---------------+---------------+-------------------+
|cust_id|txn_amount|txn_type|   _change_type|_commit_version|  _commit_timestamp|
+-------+----------+--------+---------------+---------------+-------------------+
|      2|     150.0|   debit|update_preimage|              2|2025-08-07 18:25:52|
+-------+----------+--------+---------------+---------------+-------------------+
only showing top 1 row



### 9. Check if a Record Was Deleted

In [0]:
%sql
SELECT *
FROM table_changes('customer_txn', 0, 5)
WHERE _change_type = 'delete'


cust_id,txn_amount,txn_type,_change_type,_commit_version,_commit_timestamp
3,200.0,credit,delete,3,2025-08-07T18:28:52.000+0000


### 10. Find All Updated Records in a Version
### 

In [0]:
%sql
-- All updates that occurred between version 1 and 2
SELECT *
FROM table_changes('customer_txn', 1, 2)
WHERE _change_type LIKE 'update%';


cust_id,txn_amount,txn_type,_change_type,_commit_version,_commit_timestamp
2,150.0,debit,update_preimage,2,2025-08-07T18:25:52.000+0000
2,180.0,debit,update_postimage,2,2025-08-07T18:25:52.000+0000


### 11. Track Inserted Records Only


In [0]:
%sql
SELECT *
FROM table_changes('customer_txn', 0, 5)
WHERE _change_type = 'insert'

cust_id,txn_amount,txn_type,_change_type,_commit_version,_commit_timestamp
1,100.0,credit,insert,1,2025-08-07T18:25:02.000+0000
2,150.0,debit,insert,1,2025-08-07T18:25:02.000+0000
3,200.0,credit,insert,1,2025-08-07T18:25:02.000+0000


### 12. How to Revert Changes in a Delta Table


In [0]:
%sql
RESTORE TABLE customer_txn TO VERSION AS OF 1;


table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
1166,1,0,1,0,1166


In [0]:
%sql
select * from customer_txn

cust_id,txn_amount,txn_type
1,100.0,credit
2,150.0,debit
3,200.0,credit
