<img src = "https://docs.delta.io/latest/_static/delta-lake-logo.png" width=600>

</br>
 ### Change Data Feed

The Delta change data feed represents row-level changes between versions of a Delta table. When enabled on a Delta table, the runtime records “change events” for all the data written into the table. This includes the row data along with metadata indicating whether the specified row was inserted, deleted, or updated.
</br>

<img src = "https://databricks.com/wp-content/uploads/2021/06/How-to-Simplify-CDC-with-Delta-Lakes-Change-Data-Feed-blog-image6.jpg" width=1000>

### Create a silver table that contains a list of addresses

We will use this table to simulate the append and update commands that are common in transactional workloads.

In [0]:
%sql
-- Start from a clean slate: CASCADE removes any tables left in the schema by a previous run.
DROP SCHEMA IF EXISTS cdf CASCADE;
CREATE SCHEMA IF NOT EXISTS cdf;
USE cdf;

In [0]:
%sql
DROP TABLE IF EXISTS cdf.silverTable;

-- Address history table: `current` flags the live row for a primaryKey, and
-- effectiveDate/endDate are kept as plain strings (e.g. '2021-10-27').
CREATE TABLE cdf.silverTable (
  primaryKey    INT,
  address       STRING,
  current       BOOLEAN,
  effectiveDate STRING,
  endDate       STRING
)
USING DELTA

### Enable Change Data Feed on the Silver Table

This allows us to capture the row-level changes that are made directly to the table.

In [0]:
%sql
-- From the next commit on, Delta records row-level change events for this table.
ALTER TABLE cdf.silverTable
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')

In [0]:
%sql
-- Confirm the change data feed property is now present on the table.
SHOW TBLPROPERTIES cdf.silverTable

key,value
Type,MANAGED
delta.minReaderVersion,1
delta.minWriterVersion,4
delta.enableChangeDataFeed,true


### Insert Data into Silver Table

In [0]:
%sql
-- Seed three current addresses. Values are positional:
-- (primaryKey, address, current, effectiveDate, endDate).
-- A VALUES list avoids the sort/dedup that chaining SELECTs with UNION would add.
INSERT INTO cdf.silverTable VALUES
  (11, 'A new customer address',      TRUE, '2021-10-27', NULL),
  (12, 'A different address',         TRUE, '2021-10-27', NULL),
  (13, 'A another different address', TRUE, '2021-10-27', NULL);

-- ORDER BY makes the displayed result deterministic.
SELECT * FROM cdf.silverTable ORDER BY primaryKey

primaryKey,address,current,effectiveDate,endDate
11,A new customer address,True,2021-10-27,
13,A another different address,True,2021-10-27,
12,A different address,True,2021-10-27,


### Upsert new Data into Silver Table

This includes an update to an existing address and a brand-new address.

In [0]:
%sql
-- Incoming batch: a changed address for existing key 11 and a brand-new key 99.
-- UNION ALL: the rows are distinct by construction, so UNION's dedup step is wasted work.
-- endDate is cast to STRING (a bare NULL literal is untyped) so it lines up with
-- cdf.silverTable.endDate when the MERGE below inserts these rows.
CREATE OR REPLACE TEMPORARY VIEW updates AS
SELECT 11 AS primaryKey, 'A updated address' AS address, TRUE AS current, '2021-10-27' AS effectiveDate, CAST(NULL AS STRING) AS endDate
UNION ALL
SELECT 99 AS primaryKey, 'A completely new address' AS address, TRUE AS current, '2021-10-27' AS effectiveDate, CAST(NULL AS STRING) AS endDate;

SELECT * FROM updates ORDER BY primaryKey;

primaryKey,address,current,effectiveDate,endDate
11,A updated address,True,2021-10-27,
99,A completely new address,True,2021-10-27,


We want to merge the view into the silver table. Specifically, if a record for the primary key already exists, we want to set the `endDate` of the old record to the `effectiveDate` of the new address record and change its `current` flag to `false`. We then want to append the updated address as a brand-new row. Completely new addresses should simply be inserted as new rows.

In [0]:
%sql
-- History-preserving upsert of `updates` into the silver table.
-- The USING source stages each incoming row up to twice:
--   * a keyed copy (merge = primaryKey): for an existing key it matches the current
--     silver row, which is retired (current = false, endDate = new effectiveDate);
--   * an unkeyed copy (merge = NULL), produced only for keys that already have a current
--     row: a NULL key never matches, so it is inserted as the new current row.
-- A brand-new key only has the keyed copy; it matches nothing and is inserted as-is.
MERGE INTO cdf.silverTable AS original
USING (
  SELECT
    updates.primaryKey AS merge,
    updates.*
  FROM updates

  UNION ALL

  SELECT
    NULL AS merge,
    updates.*
  FROM updates
  INNER JOIN cdf.silverTable AS existing
    ON updates.primaryKey = existing.primaryKey
  WHERE existing.current = TRUE
) AS mergedUpdates
  ON original.primaryKey = mergedUpdates.merge
WHEN MATCHED AND original.current = TRUE THEN
  UPDATE SET
    current = FALSE,
    endDate = mergedUpdates.effectiveDate
WHEN NOT MATCHED THEN
  INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
3,1,0,2


In [0]:
%sql
-- After the MERGE, key 11 should show a retired row (current = false) plus a new current row.
SELECT * FROM cdf.silverTable

primaryKey,address,current,effectiveDate,endDate
11,A updated address,True,2021-10-27,
11,A new customer address,False,2021-10-27,2021-10-27
12,A different address,True,2021-10-27,
13,A another different address,True,2021-10-27,
99,A completely new address,True,2021-10-27,


In [0]:
%sql
-- One row per commit: version, operation, its parameters and metrics.
DESCRIBE HISTORY cdf.silverTable

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
3,2022-03-26T00:16:51.000+0000,428915142038362,guanjie.shen@databricks.com,MERGE,"Map(predicate -> (original.`primaryKey` = mergedupdates.`merge`), matchedPredicates -> [{""predicate"":""(original.`current` = true)"",""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(4233158071160993),0320-235007-rnm6r0lz,2.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 2, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, executionTimeMs -> 4613, numTargetRowsInserted -> 2, scanTimeMs -> 1653, numTargetRowsUpdated -> 1, numOutputRows -> 5, numTargetChangeFilesAdded -> 1, numSourceRows -> 3, numTargetFilesRemoved -> 1, rewriteTimeMs -> 2872)",
2,2022-03-26T00:16:43.000+0000,428915142038362,guanjie.shen@databricks.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(4233158071160993),0320-235007-rnm6r0lz,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputBytes -> 1564, numOutputRows -> 3)",
1,2022-03-26T00:16:39.000+0000,428915142038362,guanjie.shen@databricks.com,SET TBLPROPERTIES,"Map(properties -> {""delta.enableChangeDataFeed"":""true""})",,List(4233158071160993),0320-235007-rnm6r0lz,0.0,SnapshotIsolation,True,Map(),
0,2022-03-26T00:16:36.000+0000,428915142038362,guanjie.shen@databricks.com,CREATE TABLE,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(4233158071160993),0320-235007-rnm6r0lz,,SnapshotIsolation,True,Map(),


In [0]:
%sql
-- Row-level change events for commit versions 2 through 3, newest commit first.
SELECT *
FROM table_changes('cdf.silverTable', 2, 3)
ORDER BY _commit_timestamp DESC

primaryKey,address,current,effectiveDate,endDate,_change_type,_commit_version,_commit_timestamp
11,A updated address,True,2021-10-27,,insert,3,2022-03-26T00:16:51.000+0000
11,A new customer address,True,2021-10-27,,update_preimage,3,2022-03-26T00:16:51.000+0000
11,A new customer address,False,2021-10-27,2021-10-27,update_postimage,3,2022-03-26T00:16:51.000+0000
99,A completely new address,True,2021-10-27,,insert,3,2022-03-26T00:16:51.000+0000
11,A new customer address,True,2021-10-27,,insert,2,2022-03-26T00:16:43.000+0000
13,A another different address,True,2021-10-27,,insert,2,2022-03-26T00:16:43.000+0000
12,A different address,True,2021-10-27,,insert,2,2022-03-26T00:16:43.000+0000


In [0]:
%python
changes_df = spark.read.format("delta").option("readChangeData", True).option("startingVersion", 2).option("endingversion", 3).table('cdf.silverTable')
display(changes_df)

primaryKey,address,current,effectiveDate,endDate,_change_type,_commit_version,_commit_timestamp
11,A updated address,True,2021-10-27,,insert,3,2022-03-26T00:16:51.000+0000
11,A new customer address,True,2021-10-27,,update_preimage,3,2022-03-26T00:16:51.000+0000
11,A new customer address,False,2021-10-27,2021-10-27,update_postimage,3,2022-03-26T00:16:51.000+0000
99,A completely new address,True,2021-10-27,,insert,3,2022-03-26T00:16:51.000+0000
11,A new customer address,True,2021-10-27,,insert,2,2022-03-26T00:16:43.000+0000
13,A another different address,True,2021-10-27,,insert,2,2022-03-26T00:16:43.000+0000
12,A different address,True,2021-10-27,,insert,2,2022-03-26T00:16:43.000+0000


### Generate Gold table and propagate changes

In some cases we may not want to show the data at the transaction level, and instead want to present users with a high-level aggregate. In this case we can use CDF to make sure that changes are propagated efficiently, without having to merge large amounts of data.

In [0]:
%sql
DROP TABLE IF EXISTS cdf.goldTable;

-- Downstream table holding a single address per primaryKey (no history columns).
CREATE TABLE cdf.goldTable (
  primaryKey INT,
  address    STRING
)
USING DELTA;

In [0]:
%sql
-- Collect only the latest inserted version of each address from the silver change feed.
-- ROW_NUMBER (not RANK) guarantees exactly one row per primaryKey: RANK gives every tied
-- row rank 1 (e.g. two inserts of the same key in one commit), and the gold MERGE fails
-- when several source rows match the same target row. Ties within a single commit are
-- broken arbitrarily.
-- The output column keeps the name `rank` so the view's schema is unchanged.
CREATE OR REPLACE TEMPORARY VIEW silverTable_latest_version AS
SELECT *
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY primaryKey ORDER BY _commit_version DESC) AS rank
  FROM table_changes('cdf.silverTable', 2, 3)
  WHERE _change_type = 'insert'
)
WHERE rank = 1;

SELECT * FROM silverTable_latest_version ORDER BY primaryKey

primaryKey,address,current,effectiveDate,endDate,_change_type,_commit_version,_commit_timestamp,rank
11,A updated address,True,2021-10-27,,insert,3,2022-03-26T00:16:51.000+0000,1
12,A different address,True,2021-10-27,,insert,2,2022-03-26T00:16:43.000+0000,1
13,A another different address,True,2021-10-27,,insert,2,2022-03-26T00:16:43.000+0000,1
99,A completely new address,True,2021-10-27,,insert,3,2022-03-26T00:16:51.000+0000,1


In [0]:
%sql
-- Propagate the collapsed silver changes: keys already in gold get their address
-- overwritten, unseen keys are inserted.
MERGE INTO cdf.goldTable AS gold
USING silverTable_latest_version AS latest
  ON latest.primaryKey = gold.primaryKey
WHEN MATCHED THEN
  UPDATE SET address = latest.address
WHEN NOT MATCHED THEN
  INSERT (primaryKey, address) VALUES (latest.primaryKey, latest.address)

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
4,0,0,4


In [0]:
%sql
-- Gold now holds one address per key, reflecting the latest silver insert.
SELECT * FROM cdf.goldTable

primaryKey,address
11,A updated address
12,A different address
13,A another different address
99,A completely new address


### Example that Combines Snapshots with Change Data Feed

#### Create an initial dataset and save it as a Delta table.

This will be the source table we'll use to propagate changes downstream.

In [0]:
%sql
-- Remove any copy of the source table left over from an earlier run.
DROP TABLE IF EXISTS cdf.example_source;

In [0]:
# Initial snapshot of the source table: one row per country.
countries = [("USA", 10000, 20000), ("India", 1000, 1500), ("UK", 7000, 10000), ("Canada", 500, 700)]
columns = ["Country", "NumVaccinated", "AvailableDoses"]

# Overwrite (re)creates cdf.example_source. The userMetadata option labels this commit
# so it can be identified in DESCRIBE HISTORY.
# Parenthesized chain instead of backslash continuations: the original left a dangling
# "\" after saveAsTable(...) that only worked because the next line happened to be blank.
(
    spark.createDataFrame(data=countries, schema=columns)
    .write.format("delta")
    .mode("overwrite")
    .option("userMetadata", "Snapshot Example 1")
    .saveAsTable("cdf.example_source")
)

# NOTE: despite its name, this is a plain batch read of cdf.example_source, not a stream.
streaming_silverTable_df = spark.read.format("delta").table("cdf.example_source")
streaming_silverTable_df.show()

In [0]:
%sql
-- Clear the session's commit label so this property change is not tagged as a snapshot,
-- then turn on the change data feed for the source table.
SET spark.databricks.delta.commitInfo.userMetadata =;
ALTER TABLE cdf.example_source
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')

In [0]:
%sql
SET spark.databricks.delta.commitInfo.userMetadata =;
-- Two separate statements, so each update is its own commit in the table history.
UPDATE cdf.example_source
SET NumVaccinated = 1000, AvailableDoses = 200
WHERE Country = 'Canada';

UPDATE cdf.example_source
SET NumVaccinated = 2000, AvailableDoses = 500
WHERE Country = 'India';

SELECT * FROM cdf.example_source

In [0]:
%sql
-- Commits so far; the labelled snapshot shows its tag in the userMetadata column.
DESCRIBE HISTORY cdf.example_source

Let's do a few more operations...

In [0]:
%sql
-- Remove the UK row; the change data feed records this as a delete event.
DELETE FROM cdf.example_source
WHERE Country = 'UK';

SELECT * FROM cdf.example_source;

In [0]:
%sql
SET spark.databricks.delta.commitInfo.userMetadata =;
-- Add a new country (INSERT ... SELECT is positional; aliases match the target columns for readability).
INSERT INTO cdf.example_source
SELECT 'France' AS Country, 7500 AS NumVaccinated, 5000 AS AvailableDoses;
-- The stored value is 'Canada'. String comparison is case-sensitive, so an upper-case
-- literal such as 'CANADA' would match no rows and the UPDATE would silently do nothing.
UPDATE cdf.example_source SET NumVaccinated = 1200, AvailableDoses = 0 WHERE Country = 'Canada';

SELECT * FROM cdf.example_source

In [0]:
%sql
-- Label the next commit as the second snapshot. NOTE: this SET persists for the rest of the
-- session, so later commits are tagged too unless the property is cleared again.
SET spark.databricks.delta.commitInfo.userMetadata =Snapshot Example 2;
-- INSERT ... SELECT is positional; aliases match the target column names for readability.
INSERT INTO cdf.example_source
SELECT 'Mexico' AS Country, 2000 AS NumVaccinated, 1000 AS AvailableDoses;

SELECT * FROM cdf.example_source

#### Let's set up what the workflow might look like for a consumer.
This will first retrieve a point-in-time snapshot of the source table, then start subscribing to incremental updates using Spark Structured Streaming and CDF.

# Cleanup

In [0]:
%sql
-- Tear down the tables used by the snapshot + change data feed example.
DROP TABLE IF EXISTS cdf.example_sink;
DROP TABLE IF EXISTS cdf.example_source;