# Change Data Feed

The Change Data Feed (CDF) feature lets a Delta table track row-level changes between its versions. When CDF is enabled on a table, the runtime records change events for all data written to the table: the row data plus metadata indicating whether each row was inserted, deleted, or updated.
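
As a rough illustration of what those change events look like, the sketch below models a few change rows as plain Python dictionaries. The metadata columns `_change_type` and `_commit_version` are among the columns Delta Lake adds to change feed output (alongside `_commit_timestamp`); the sample values and the two helper functions are made up for illustration, and no Spark session is needed.

```python
from collections import Counter

# Hypothetical change rows, shaped like the output of a change feed read.
# An update is reported as two rows: the value before and the value after.
change_rows = [
    {"id": 5, "_change_type": "insert", "_commit_version": 2},
    {"id": 6, "_change_type": "insert", "_commit_version": 2},
    {"id": 4, "_change_type": "update_preimage", "_commit_version": 3},
    {"id": 40, "_change_type": "update_postimage", "_commit_version": 3},
    {"id": 2, "_change_type": "delete", "_commit_version": 4},
]


def count_by_change_type(rows):
    """Count change rows per _change_type value."""
    return dict(Counter(row["_change_type"] for row in rows))


def rows_after_change(rows):
    """Keep rows describing the state after a change (drop pre-images and deletes)."""
    return [r for r in rows if r["_change_type"] in ("insert", "update_postimage")]


print(count_by_change_type(change_rows))
print([r["id"] for r in rows_after_change(change_rows)])
```

Dropping the `update_preimage` rows is a common first step when only the new values matter, for example when forwarding changes to a downstream table.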

# Enable change data feed

You must explicitly enable the change data feed option using one of the following methods:

## New table

Set the table property `delta.enableChangeDataFeed = true` in the `CREATE TABLE` command.

In [None]:
spark.sql("DROP TABLE IF EXISTS demo.cdf_demo")

# Create data DataFrame
data = spark.range(0, 5)
data.write.mode("overwrite").format("delta").option("delta.enableChangeDataFeed", "true").save("Tables/cdf_demo")

In [None]:
%%sql
DROP TABLE IF EXISTS demo.cdf_demo;
CREATE TABLE demo.cdf_demo (id int) 
USING DELTA 
TBLPROPERTIES (delta.enableChangeDataFeed = true);

In [None]:
%%sql

SHOW TBLPROPERTIES demo.cdf_demo

## Existing table

Set the table property delta.enableChangeDataFeed = true in the ALTER TABLE command.

In [None]:
spark.sql("DROP TABLE IF EXISTS demo.cdf_demo")

# Create data DataFrame
data = spark.range(0, 5)

# Write the data DataFrame to the OneLake location (CDF is not enabled yet)
data.write.format("delta").save("Tables/cdf_demo")

In [None]:
%%sql

ALTER TABLE demo.cdf_demo SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

In [None]:
display(spark.sql("SHOW TBLPROPERTIES demo.cdf_demo"))

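## All new tables

Set the Spark configuration `spark.databricks.delta.properties.defaults.enableChangeDataFeed` to `true` to enable the change data feed for all new tables created in the session. The second cell below sets it back to `false`.
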
In [None]:
spark.conf.set('spark.databricks.delta.properties.defaults.enableChangeDataFeed', "true")

In [None]:
spark.conf.set('spark.databricks.delta.properties.defaults.enableChangeDataFeed', "false")

> Checking data storage

In [None]:
mssparkutils.fs.ls("Tables/cdf_demo/")

# What is versioning in CDF?

Every operation committed to a Delta table (insert, append, update, delete, merge) creates a new table version. CDF reports changes per version, so each change row can be traced back to the commit that produced it.
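
To make the version numbering concrete, here is a toy model in plain Python (not Delta Lake's actual implementation) in which every committed operation gets the next version number, starting at 0. The class name `ToyCommitLog` and the operation labels are illustrative assumptions; the sequence mirrors the existing-table path of this notebook and assumes one commit per operation.

```python
class ToyCommitLog:
    """Toy stand-in for a table's commit history; not Delta Lake's implementation."""

    def __init__(self):
        # The list index of each entry is the table version it created.
        self.operations = []

    def commit(self, operation):
        """Record one operation and return the version it created."""
        self.operations.append(operation)
        return len(self.operations) - 1

    def latest_version(self):
        """Return the most recent version, or -1 if nothing was committed."""
        return len(self.operations) - 1


log = ToyCommitLog()
for op in ["WRITE", "SET TBLPROPERTIES", "WRITE", "UPDATE", "DELETE", "MERGE"]:
    version = log.commit(op)
    print(version, op)
```

In this model the property change that enables CDF is itself a commit, which is why change data only exists from that version onward.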

# Data manipulation

Let's run some operations and see how CDF works!

## Adding data

In [None]:
# Create data DataFrame
data = spark.range(5, 7)
# Append the data DataFrame to the OneLake location
data.write.mode("append").format("delta").save("Tables/cdf_demo")


> Insert-only operations and full partition deletes do not generate data in the `_change_data` directory; Delta Lake computes those changes directly from the transaction log.

In [None]:
mssparkutils.fs.ls("Tables/cdf_demo/")

## Updating data

In [None]:
import delta
from pyspark.sql.functions import col

delta_info = delta.DeltaTable.forName(spark, "demo.cdf_demo")

In [None]:
# Multiply id by 10 for the row where id is 4 (4 becomes 40)
delta_info.update(
    condition=col("id") == 4,
    set={"id": "id * 10"}
)

> OR

In [None]:
%%sql
-- Same update expressed in SQL
UPDATE demo.cdf_demo
    SET id = 40
WHERE id = 4

In [None]:
display(spark.read.table("demo.cdf_demo"))

> OR

In [None]:
%%sql

SELECT * FROM demo.cdf_demo

> Checking data storage again: there is now a new folder, **_change_data**

In [None]:
mssparkutils.fs.ls("Tables/cdf_demo/")

## Deleting data

In [None]:
delta_info.delete("id = 2")

In [None]:
display(spark.read.table("demo.cdf_demo"))

## Merging data

In [None]:
# Create new data with ids 0 to 4
# ids 2 and 4 are no longer in the table (2 was deleted, 4 was updated to 40), so the merge inserts them

data = spark.range(0, 5)
data.show()

In [None]:
( delta_info.alias("original") 
    .merge(data.alias("new_data"), "original.id = new_data.id") 
    .whenMatchedUpdateAll() 
    .whenNotMatchedInsertAll()
    .execute()
)

In [None]:
display(spark.read.table("demo.cdf_demo"))

In [None]:
display(delta_info.history())

# Change data storage

Delta Lake records change data for UPDATE, DELETE, and MERGE operations in the _**_change_data**_ folder under the Delta table directory.

These records may be skipped when Delta Lake detects it can efficiently compute the change data feed directly from the transaction log. In particular, insert-only operations and full partition deletes will not generate data in the _**_change_data**_ directory.

The files in the _**_change_data**_ folder follow the retention policy of the table. Therefore, if you run the **VACUUM** command, change data feed data is also deleted.

In [None]:
mssparkutils.fs.ls("Tables/cdf_demo/_change_data")

# Reading data changes

You can provide either version or timestamp for the start and end. The start and end versions and timestamps are inclusive in the queries. To read the changes from a particular start version to the latest version of the table, specify only the starting version or timestamp.

If you provide a version or timestamp that predates the point at which the change data feed was enabled, an error is thrown indicating that the change data feed was not enabled.
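
The start and end rules above can be sketched as a small helper that builds the reader options for a change feed read. The function `cdf_read_options` is hypothetical (it is not part of Delta Lake), and its validation rules are this sketch's own conservative choices; only the option names `readChangeFeed`, `startingVersion`, `endingVersion`, `startingTimestamp`, and `endingTimestamp` come from the reader API used in this notebook.

```python
def cdf_read_options(starting_version=None, ending_version=None,
                     starting_timestamp=None, ending_timestamp=None):
    """Build reader options for a change feed read (hypothetical helper)."""
    if (starting_version is None) == (starting_timestamp is None):
        raise ValueError("Provide exactly one of starting_version or starting_timestamp")
    if ending_version is not None and ending_timestamp is not None:
        raise ValueError("Provide at most one of ending_version or ending_timestamp")

    options = {"readChangeFeed": "true"}
    if starting_version is not None:
        options["startingVersion"] = str(starting_version)
    else:
        options["startingTimestamp"] = str(starting_timestamp)
    # Omitting both end bounds reads up to the latest version of the table.
    if ending_version is not None:
        options["endingVersion"] = str(ending_version)
    elif ending_timestamp is not None:
        options["endingTimestamp"] = str(ending_timestamp)
    return options


opts = cdf_read_options(starting_version=1, ending_version=2)
print(opts)
# In a notebook with a Spark session, the options could be applied like this:
# changes = spark.read.format("delta").options(**opts).table("demo.cdf_demo")
```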

In [None]:
display(delta_info.history())

## Read by version 

If you try to read version 0, you will get an error:

<mark>_AnalysisException: Error getting change data for range [0 , 5] as change data was not recorded for version [0]. If you've enabled change data feed on this table, use `DESCRIBE HISTORY` to see when it was first enabled_</mark>
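
As the message suggests, the table history shows when the feed was first enabled. The sketch below scans history rows for the first commit that set `delta.enableChangeDataFeed` to true. It is an assumption-heavy illustration: the helper `first_cdf_version` is hypothetical, and it assumes each row is a dictionary whose `operationParameters` map carries a JSON string under `properties`, which is how property changes appeared in the histories this sketch was modeled on. Check the output of `DESCRIBE HISTORY` on your own table before relying on it.

```python
import json


def first_cdf_version(history_rows):
    """Return the lowest version whose commit enabled CDF, or None (hypothetical helper)."""
    enabled_versions = []
    for row in history_rows:
        params = row.get("operationParameters") or {}
        raw_properties = params.get("properties")
        if not raw_properties:
            continue
        try:
            # Assumption: table properties are stored as a JSON object string.
            properties = json.loads(raw_properties)
        except json.JSONDecodeError:
            continue
        if str(properties.get("delta.enableChangeDataFeed", "")).lower() == "true":
            enabled_versions.append(row["version"])
    return min(enabled_versions) if enabled_versions else None


# Made-up history rows, newest first, shaped like the assumption described above.
history_rows = [
    {"version": 2, "operation": "WRITE", "operationParameters": {"mode": "Append"}},
    {"version": 1, "operation": "SET TBLPROPERTIES",
     "operationParameters": {"properties": '{"delta.enableChangeDataFeed":"true"}'}},
    {"version": 0, "operation": "WRITE", "operationParameters": {"mode": "ErrorIfExists"}},
]
print(first_cdf_version(history_rows))
# With a Spark session, rows could come from:
# history_rows = [r.asDict() for r in delta_info.history().collect()]
```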

In [None]:
# Expected to fail when change data was not recorded for version 0
changes = spark \
  .read \
  .format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "0") \
  .table("demo.cdf_demo")

display(changes)

In [None]:
changes = spark \
  .read \
  .format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "1") \
  .table("demo.cdf_demo")

display(changes)

In [None]:
changes = spark \
  .read \
  .format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", "1") \
  .option("endingVersion", "2") \
  .table("demo.cdf_demo")

display(changes)

> OR

In [None]:
%%sql

-- Versions are passed as ints or longs, e.g. changes from version 2 to 3
SELECT * FROM table_changes('demo.cdf_demo', 2, 3)

## Read by timestamp

In [None]:
display(delta_info.history())

> Get the **timestamp for version 1** from the history above and update the command below

In [None]:
changes = spark \
  .read \
  .format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2024-02-17 12:15:16.665") \
  .table("demo.cdf_demo")
display(changes)

> Get the **timestamps for version 1 and version 2** from the history above and update the commands below

In [None]:
changes = spark \
  .read \
  .format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", "2024-02-17 12:15:16.665") \
  .option("endingTimestamp",   "2024-02-17 12:15:16.665") \
  .table("demo.cdf_demo")
display(changes)

> OR

In [None]:
%%sql
-- Put the database/schema name inside the table-name string; use backticks to escape dots and special characters
SELECT * FROM table_changes('demo.cdf_demo', '2024-02-17 12:15:16.665' , '2024-02-17 12:15:16.665')
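
When the bounds come from variables, the `table_changes` query can be assembled in Python. The helper `table_changes_sql` below is a hypothetical convenience function, not part of Delta Lake: it renders integer bounds as version numbers and string bounds as quoted timestamps, and it rejects values containing single quotes rather than trying to escape them.

```python
def table_changes_sql(table_name, start, end=None):
    """Build a table_changes query for a version or timestamp range (hypothetical helper)."""

    def literal(bound):
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(bound, bool):
            raise TypeError("Bounds must be int versions or timestamp strings")
        if isinstance(bound, int):
            return str(bound)
        text = str(bound)
        if "'" in text:
            raise ValueError("Single quotes are not allowed in bounds")
        return f"'{text}'"

    if "'" in table_name:
        raise ValueError("Single quotes are not allowed in the table name")
    args = [f"'{table_name}'", literal(start)]
    # Leaving out the end bound reads up to the latest version.
    if end is not None:
        args.append(literal(end))
    return f"SELECT * FROM table_changes({', '.join(args)})"


print(table_changes_sql("demo.cdf_demo", 2, 3))
# With a Spark session: display(spark.sql(table_changes_sql("demo.cdf_demo", 2, 3)))
```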

# Clean up

In [None]:
spark.sql("DROP TABLE demo.cdf_demo")