
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning">
</div>


# Processing Records from Delta Change Data Feed

In this notebook, we'll walk through an end-to-end example of how you can easily propagate changes through a Lakehouse with Delta Lake Change Data Feed (CDF).

For this demo, we'll work with a slightly different dataset representing patient information for medical records. Descriptions of the data at various stages follow.

### Bronze Table
Here we store all records as consumed. A row represents:
1. A new patient providing data for the first time
1. An existing patient confirming that their information is still correct
1. An existing patient updating some of their information

The type of action a row represents is not captured.

### Silver Table
This is the validated view of our data. Each patient will appear only once in this table. An upsert statement will be used to identify rows that have changed.

### Gold Table
For this example, we'll create a simple gold table that captures patients that have a new address.

## Learning Objectives
By the end of this lesson, students will be able to:
- Enable Change Data Feed on a cluster or for a particular table
- Describe how changes are recorded
- Read CDF output with Spark SQL or PySpark
- Refactor ELT code to process CDF output



### Setup

The following code defines some paths, a demo database, and clears out previous runs of the demo.

It also defines another data factory that we'll use to land raw data in our source directory, allowing us to process new records as if they were arriving in production.

In [0]:
%run ./Includes/Classroom-Setup-03.4



Resetting the learning environment (cdf_demo):
| No action taken

Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v04"

Validating the locally installed datasets:
| listing local files...(6 seconds)
| validation completed...(6 seconds total)

Creating & using the schema "labuser9308469_1740138925_77gg_da_adewd_cdf_demo" in the catalog "hive_metastore"...(0 seconds)

Predefined tables in "labuser9308469_1740138925_77gg_da_adewd_cdf_demo":
| -none-

Predefined paths variables:
| DA.paths.working_dir:   dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo
| DA.paths.user_db:       dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db
| DA.paths.datasets:      dbfs:/mnt/dbacademy-datasets/data-engineer-learning-path/v04
| DA.paths.checkpoints:   dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum






Enabling CDF with a Spark conf setting in a notebook or on a cluster ensures it is used on all newly created Delta tables in that scope.

In [0]:
spark.conf.set("spark.databricks.delta.properties.defaults.enableChangeDataFeed", True)


## Ingest Data with Auto Loader

Here we'll use Auto Loader to ingest data as it arrives.

The steps below include:
* Declaring the target table
* Creating & starting the stream
* Loading some data into our source directory



Create the bronze table.

In [0]:
%sql
CREATE TABLE IF NOT EXISTS bronze
  (mrn BIGINT, dob DATE, sex STRING, gender STRING, first_name STRING, last_name STRING, street_address STRING, zip BIGINT, city STRING, state STRING, updated timestamp) 
LOCATION '${DA.paths.working_dir}/bronze'



Create and start the stream.

For this example, we will:
* Use a **`processingTime`** trigger, which keeps the stream running and processes new data in micro-batches, as opposed to trigger-once or trigger-available-now
* Specify the schema as opposed to inferring it

In [0]:
schema = "mrn BIGINT, dob DATE, sex STRING, gender STRING, first_name STRING, last_name STRING, street_address STRING, zip BIGINT, city STRING, state STRING, updated TIMESTAMP"

bronze_query = (spark.readStream
                     .format("cloudFiles")
                     .option("cloudFiles.format", "json")
                     .schema(schema)
                     .load(DA.paths.cdc_stream)
                     .writeStream
                     .format("delta")
                     .outputMode("append")
                     #.trigger(availableNow=True)
                     .trigger(processingTime='5 seconds')
                     .option("checkpointLocation", f"{DA.paths.checkpoints}/bronze")
                     .table("bronze"))

DA.block_until_stream_is_ready(bronze_query)

Processed 0 of 2 batches...
Processed 1 of 2 batches...
Processed 1 of 2 batches...
Processed 2 of 2 batches...
The stream is now active with 2 batches having been processed.




Expand the stream monitor above to see the progress of your stream. 

Use the cell below to land a batch of data; you should see these records processed as a single batch.

In [0]:
DA.cdc_stream.load()

Loading batch #1 to the stream...Loaded 829 records



## Create a Target Table

Here we use **`DEEP CLONE`** to copy read-only data from PROD into our DEV environment (where we have full write/delete access).

In [0]:
%sql
CREATE TABLE silver
DEEP CLONE delta.`${DA.paths.silver_source}`
LOCATION '${DA.paths.user_db}/silver'

source_table_size,source_num_of_files,num_removed_files,num_copied_files,removed_files_size,copied_files_size
117357,1,0,1,0,117357





Tables that were not created with CDF enabled do not capture changes by default, but they can be altered to do so with the following syntax.

Note that changing table properties creates a new table version.

Note: change data is **NOT** captured during **`CLONE`** operations.

In [0]:
%sql
ALTER TABLE silver 
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

In [0]:
%sql
DESCRIBE TABLE EXTENDED silver

col_name,data_type,comment
mrn,bigint,
dob,date,
sex,string,
gender,string,
first_name,string,
last_name,string,
street_address,string,
zip,bigint,
city,string,
state,string,





## Upsert Data with Delta Lake

Here we define upsert logic into the silver table using a streaming read against the bronze table, matching on our unique identifier **`mrn`**.

We specify an additional condition so that a matched record is only updated when at least one of its fields has changed; records with no match are inserted.

In [0]:
def upsert_to_delta(microBatchDF, batchId):
    # Register the micro-batch so the MERGE statement can refer to it as "updates"
    microBatchDF.createOrReplaceTempView("updates")
    # Run the MERGE in the micro-batch's own Spark session
    microBatchDF.sparkSession.sql("""
        MERGE INTO silver s
        USING updates u
        ON s.mrn = u.mrn
        -- Update only when at least one field differs from the stored record
        WHEN MATCHED AND (s.dob <> u.dob OR
                          s.sex <> u.sex OR
                          s.gender <> u.gender OR
                          s.first_name <> u.first_name OR
                          s.last_name <> u.last_name OR
                          s.street_address <> u.street_address OR
                          s.zip <> u.zip OR
                          s.city <> u.city OR
                          s.state <> u.state OR
                          s.updated <> u.updated)
            THEN UPDATE SET *
        -- Patients seen for the first time are inserted
        WHEN NOT MATCHED
            THEN INSERT *
    """)

In [0]:
query = (spark.readStream
              .table("bronze")
              .writeStream
              .foreachBatch(upsert_to_delta)
              .trigger(availableNow=True)
            #   .trigger(processingTime='5 seconds')
              .start())

DA.block_until_stream_is_ready(query)

Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 1 of 2 batches...
The query is no longer active...
The stream is now active with 1 batches having been processed.



We have an additional metadata directory nested in our table directory, *_change_data*, where Databricks records change data for **`UPDATE`**, **`DELETE`**, and **`MERGE`** operations. Some operations, such as insert-only operations and full partition deletes, do not generate data in this directory because Databricks can efficiently compute the change data feed directly from the transaction log.

If this directory is not present, it's likely because none of the applicable operations have been performed on the table yet; wait a moment and try again.

In [0]:
files = dbutils.fs.ls(f"{DA.paths.user_db}/silver")
display(files)

path,name,size,modificationTime
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/_change_data/,_change_data/,0,1740145593782
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/_delta_log/,_delta_log/,0,1740145593782
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/part-00000-35e161b1-d9f9-4c97-b9ae-62979fce820f.c000.snappy.parquet,part-00000-35e161b1-d9f9-4c97-b9ae-62979fce820f.c000.snappy.parquet,21526,1740145587000
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/part-00000-957a4ac4-037b-4932-a3ba-8f8c42dddf5b-c000.snappy.parquet,part-00000-957a4ac4-037b-4932-a3ba-8f8c42dddf5b-c000.snappy.parquet,93636,1740145586000
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/part-00000-aa5a5e6a-162e-480a-9933-6e0ca0ba6070-c000.snappy.parquet,part-00000-aa5a5e6a-162e-480a-9933-6e0ca0ba6070-c000.snappy.parquet,117357,1740145562000
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/part-00001-779130ac-1ef0-49c9-9148-46674cad7e09.c000.snappy.parquet,part-00001-779130ac-1ef0-49c9-9148-46674cad7e09.c000.snappy.parquet,18122,1740145587000
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/part-00002-c2a28a91-4ac0-4398-818d-3a6d1f7720be.c000.snappy.parquet,part-00002-c2a28a91-4ac0-4398-818d-3a6d1f7720be.c000.snappy.parquet,16275,1740145587000
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/part-00003-3356864f-86d9-4263-a3e0-c84a8acf4893.c000.snappy.parquet,part-00003-3356864f-86d9-4263-a3e0-c84a8acf4893.c000.snappy.parquet,10870,1740145587000



This directory also contains Parquet files.

**NOTE:** If this operation fails, it's likely because none of the applicable operations that populate the *_change_data* directory have been performed on the table yet; wait a moment and try again.

In [0]:
files = dbutils.fs.ls(f"{DA.paths.user_db}/silver/_change_data")
display(files)

path,name,size,modificationTime
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/_change_data/cdc-00000-ccbd1255-64b6-4e60-8bc3-8b0243b0336f.c000.snappy.parquet,cdc-00000-ccbd1255-64b6-4e60-8bc3-8b0243b0336f.c000.snappy.parquet,26472,1740145587000
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/_change_data/cdc-00001-3044917a-2c23-49d5-9b15-5d0dff702d4f.c000.snappy.parquet,cdc-00001-3044917a-2c23-49d5-9b15-5d0dff702d4f.c000.snappy.parquet,21949,1740145587000
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/_change_data/cdc-00002-99ebe887-574c-4c66-afa6-8f1e57b1a31b.c000.snappy.parquet,cdc-00002-99ebe887-574c-4c66-afa6-8f1e57b1a31b.c000.snappy.parquet,19684,1740145587000
dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo/database.db/silver/_change_data/cdc-00003-4aaa4cb2-981d-4b84-9170-a542803e965c.c000.snappy.parquet,cdc-00003-4aaa4cb2-981d-4b84-9170-a542803e965c.c000.snappy.parquet,13309,1740145587000




## Read the Change Data Feed

To pick up the recorded CDC data, we add two options:
- **`readChangeData`** (current Delta Lake documentation names this option **`readChangeFeed`**; both names are accepted)
- **`startingVersion`** (can use **`startingTimestamp`** instead)

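Here we'll do a streaming display of just those patients in LA. Note that an updated patient typically has two records: an **`update_preimage`** with the old values and an **`update_postimage`** with the new ones. Because the city filter is applied to each change row individually, a patient who moved into or out of Los Angeles shows only one of the pair.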

In [0]:
cdc_df = (spark.readStream
               .format("delta")
               .option("readChangeData", True)
               .option("startingVersion", 1)
               .table("silver"))

cdc_la_df = cdc_df.filter("city = 'Los Angeles'")

display(cdc_la_df, streamName = "display_la")
DA.block_until_stream_is_ready("display_la")

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated,_change_type,_commit_version,_commit_timestamp
53225168,1936-12-09,F,F,Mary,Perry,735 Allen Spring,90048,Los Angeles,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-02-21T13:46:28Z
53225168,1936-12-09,F,F,Mary,Perry,735 Allen Spring,90048,Los Angeles,CA,2019-12-20T22:47:47.062Z,update_postimage,2,2025-02-21T13:46:28Z
25883635,1984-04-01,F,F,Madison,Merritt,84140 Adams Fort Suite 660,90045,Los Angeles,CA,2019-12-20T18:39:54.276Z,update_postimage,2,2025-02-21T13:46:28Z
84106259,1943-05-19,M,M,David,Owens,96701 Buchanan Port Suite 728,90067,Los Angeles,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-02-21T13:46:28Z
84106259,1943-05-19,M,M,David,Owens,96701 Buchanan Port Suite 728,90067,Los Angeles,CA,2019-12-20T16:41:17.198Z,update_postimage,2,2025-02-21T13:46:28Z
65575001,1965-07-07,M,M,Gregory,Adkins,0164 Hartman Camp Suite 432,90061,Los Angeles,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-02-21T13:46:28Z
65575001,1965-07-07,M,M,Gregory,Adkins,0164 Hartman Camp Suite 432,90061,Los Angeles,CA,2019-12-20T16:35:55.876Z,update_postimage,2,2025-02-21T13:46:28Z
35919894,1930-05-01,F,F,Melinda,Phillips,0966 Price Alley,90095,Los Angeles,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-02-21T13:46:28Z
35919894,1930-05-01,F,F,Melinda,Phillips,0966 Price Alley,90095,Los Angeles,CA,2019-12-20T10:37:25.996Z,update_postimage,2,2025-02-21T13:46:28Z
84458294,1918-09-28,M,M,Joshua,Cardenas,108 Bryan Springs,90049,Los Angeles,CA,2010-01-01T00:00:00Z,update_preimage,2,2025-02-21T13:46:28Z
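The two read-side behaviors above can be sketched in plain Python. **`startingVersion`** is inclusive, so version 1 and later are returned. The city filter is evaluated on each change row on its own, which is why patient 25883635 above appears only as an **`update_postimage`**. The helpers `changes_since` and `in_city` and the toy `feed` rows are hypothetical illustrations, not Delta Lake APIs.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def changes_since(change_rows: List[Row], starting_version: int) -> List[Row]:
    """Mimic startingVersion: keep change rows from that commit version onward (inclusive)."""
    return [r for r in change_rows if r["_commit_version"] >= starting_version]


def in_city(change_rows: List[Row], city: str) -> List[Row]:
    """Mimic .filter("city = '...'"): each change row is tested on its own values."""
    return [r for r in change_rows if r["city"] == city]


# Toy change feed (hypothetical data)
feed = [
    {"mrn": 1, "city": "Los Angeles", "_change_type": "update_preimage", "_commit_version": 2},
    {"mrn": 1, "city": "Los Angeles", "_change_type": "update_postimage", "_commit_version": 2},
    # Patient 2 moved into LA: only the postimage satisfies the filter
    {"mrn": 2, "city": "Carson", "_change_type": "update_preimage", "_commit_version": 2},
    {"mrn": 2, "city": "Los Angeles", "_change_type": "update_postimage", "_commit_version": 2},
    # Committed before the starting version, so it is not read
    {"mrn": 3, "city": "Los Angeles", "_change_type": "insert", "_commit_version": 0},
]

la_changes = in_city(changes_since(feed, 1), "Los Angeles")
```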




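If we land another file in our source directory and wait a few seconds, we'll see that we have now captured CDC changes for multiple **`_commit_version`** values (sort the display above by the **`_commit_version`** column to see this). This requires the silver upsert stream to keep running, i.e., to use the **`processingTime`** trigger rather than **`availableNow`**, which stops once the currently available data has been processed.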

In [0]:
DA.cdc_stream.load()

Loading batch #2 to the stream...Loaded 956 records






## Gold Table
Our gold table will capture all patients that have a new address, recording this information alongside two timestamps: the time at which the change was made in our source system (currently labeled **`updated`**) and the time it was processed into our silver table (captured by the CDC-generated **`_commit_timestamp`** field).

Within silver table CDC records:
- check for max **`_commit_version`** for each record
- if new version and address change, insert to gold table
- record **`updated_timestamp`** and **`processed_timestamp`**

#### Gold Table Schema
| field | type |
| --- | --- |
| mrn | long |
| new_street_address | string |
| new_zip | long |
| new_city | string |
| new_state | string |
| old_street_address | string |
| old_zip | long |
| old_city | string |
| old_state | string |
| updated_timestamp | timestamp |
| processed_timestamp | timestamp |

In [0]:
%sql
CREATE TABLE gold (mrn BIGINT,
                   new_street_address STRING,
                   new_zip BIGINT,
                   new_city STRING,
                   new_state STRING,
                   old_street_address STRING,
                   old_zip BIGINT,
                   old_city STRING,
                   old_state STRING,
                   updated_timestamp TIMESTAMP,
                   processed_timestamp TIMESTAMP)
USING DELTA
LOCATION '${DA.paths.working_dir}/gold'


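Note that we are using a table that has updates written to it as a streaming source!

This is a **huge** value add. A plain Delta streaming read fails when it reaches a commit that updates or deletes existing rows (unless an option such as **`skipChangeCommits`** tells it to skip those commits), so historically this has required extensive workarounds to process correctly.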

In [0]:
silver_stream_df = (spark.readStream
                         .format("delta")
                         .option("readChangeData", True)
                         .option("startingVersion", 1)
                         .table("silver"))


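Our **`_change_type`** field lets us easily distinguish rows that represent the current state of a record from rows that represent a superseded or removed state.

Rows reflecting the current state have the **`update_postimage`** or **`insert`** label.
Rows reflecting a superseded or removed state have the **`update_preimage`** or **`delete`** label.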

(**NOTE**: We'll demonstrate logic for propagating deletes a little later)

In the cell below, we'll define two queries against our streaming source to perform a stream-stream join on our data.

In [0]:
from pyspark.sql import functions as F

new_df = (silver_stream_df
         .filter(F.col("_change_type").isin(["update_postimage", "insert"]))
         .selectExpr("mrn",
                     "street_address AS new_street_address",
                     "zip AS new_zip",
                     "city AS new_city",
                     "state AS new_state",
                     "updated AS updated_timestamp",
                     "_commit_timestamp AS processed_timestamp"))
                                                                                         
old_df = (silver_stream_df
         .filter(F.col("_change_type").isin(["update_preimage"]))
         .selectExpr("mrn",
                     "street_address AS old_street_address",
                     "zip AS old_zip",
                     "city AS old_city",
                     "state AS old_state",
                     "_commit_timestamp AS processed_timestamp"))



Assuming that we have properly deduplicated our data to ensure that only a single record for our **`mrn`** can be processed to our silver table, **`mrn`** and **`_commit_timestamp`** (aliased to **`processed_timestamp`** here) serve as a unique composite key.

Our join will allow us to match up the current and previous states of our data to track all changes.

This table could drive further downstream processes, such as triggering confirmation emails or automatic mailings for patients with updated addresses.

Our CDC data arrives as a stream, so only newly changed data at the silver level will be processed. Therefore, we can write to our gold table in append mode and maintain the grain of our data.
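Ignoring timing and watermarks, the join and filter below reduce to the following batch logic. This is a minimal pure-Python sketch under a few assumptions. It uses the hypothetical helper `address_changes`. It treats **`(mrn, processed_timestamp)`** as unique, as argued above. It also assumes new street addresses are never NULL, because SQL's `<>` and Python's `!=` only disagree when a NULL is involved. In the streaming query, the watermark additionally tells Spark how long to keep unmatched rows in state before emitting them with NULL **`old_*`** columns.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]
OLD_COLS = ["old_street_address", "old_zip", "old_city", "old_state"]


def address_changes(new_rows: List[Row], old_rows: List[Row]) -> List[Row]:
    """Hypothetical batch analogue of the left join + filter feeding the gold table."""
    # (mrn, processed_timestamp) is assumed unique, so there is one preimage per key
    old_by_key: Dict[Tuple[Any, Any], Row] = {
        (o["mrn"], o["processed_timestamp"]): o for o in old_rows
    }
    out: List[Row] = []
    for n in new_rows:
        match: Optional[Row] = old_by_key.get((n["mrn"], n["processed_timestamp"]))
        # Left join: an unmatched new row (e.g. an insert) gets None for old_* columns
        joined = {**n, **{c: (match[c] if match else None) for c in OLD_COLS}}
        # Keep brand-new patients and patients whose street address changed
        if (joined["old_street_address"] is None
                or joined["new_street_address"] != joined["old_street_address"]):
            out.append(joined)
    return out
```

Because each commit's changes are joined and emitted only once, appending the result to gold keeps one row per address change.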

In [0]:
query = (new_df.withWatermark("processed_timestamp", "3 minutes")
               .join(old_df, ["mrn", "processed_timestamp"], "left")
               .filter("new_street_address <> old_street_address OR old_street_address IS NULL")
               .writeStream
               .outputMode("append")
               #.trigger(availableNow=True)
               .trigger(processingTime="5 seconds")
               .option("checkpointLocation", f"{DA.paths.checkpoints}/gold")
               .table("gold"))

DA.block_until_stream_is_ready(query)

Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 0 of 2 batches...
Processed 1 of 2 batches...
Processed 1 of 2 batches...
Processed 2 of 2 batches...
The stream is now active with 2 batches having been processed.




Note the number of rows in our gold table.

In [0]:
%sql
SELECT * FROM gold

mrn,new_street_address,new_zip,new_city,new_state,old_street_address,old_zip,old_city,old_state,updated_timestamp,processed_timestamp
93394828,417 Griffin Avenue Apt. 481,91340,San Fernando,CA,00704 Garner Road,90639,La Mirada,CA,2019-12-31T15:54:55.317Z,2025-02-21T13:46:28Z
71749426,987 Wolf Village,90095,Los Angeles,CA,1875 Jill Roads Suite 352,91030,South Pasadena,CA,2019-12-27T07:21:57.263Z,2025-02-21T13:46:28Z
45837216,1338 Navarro Glen Apt. 460,90755,Signal Hill,CA,48036 Benjamin Stravenue Apt. 275,93552,Palmdale,CA,2019-12-05T16:49:03.256Z,2025-02-21T13:46:28Z
98161270,578 Teresa Vista Apt. 068,91740,Glendora,CA,63553 Rosales Parks,90019,Los Angeles,CA,2019-12-10T16:28:37.009Z,2025-02-21T13:46:28Z
18018581,0057 Michelle Forest Apt. 352,91012,La Canada Flintridge,CA,576 Booth Street,91208,Glendale,CA,2019-12-01T01:37:13.706Z,2025-02-21T13:46:28Z
20085918,67672 Erin Burgs Suite 172,90746,Carson,CA,73573 Derrick Drives,91380,Santa Clarita,CA,2019-12-20T17:19:21.59Z,2025-02-21T13:46:28Z
71616880,858 Nicholas Alley Suite 628,90038,Los Angeles,CA,5184 Robinson Highway Apt. 821,91343,North Hills,CA,2019-12-12T16:08:30.92Z,2025-02-21T13:46:28Z
14125426,527 Matthew Rapids,91732,El Monte,CA,295 Ricky Garden Suite 364,90270,Maywood,CA,2019-12-11T19:07:30.303Z,2025-02-21T13:46:28Z
53201317,383 Sean Dam,90094,Playa Vista,CA,72179 Christian Well Apt. 990,91201,Glendale,CA,2019-12-26T00:30:48.962Z,2025-02-21T13:46:28Z
23674125,8809 Liu Glen Suite 523,90091,Los Angeles,CA,836 Caldwell Walk,90013,Los Angeles,CA,2019-12-30T06:46:04.089Z,2025-02-21T13:46:28Z





If we land a new raw file and wait a few seconds, we can see that all of our changes have propagated through our pipeline.

(This assumes both the silver and gold streams use **`processingTime`** rather than trigger-once or trigger-available-now processing. Scroll up to the gold table streaming write and wait for a new peak in the processing rate to know your data has arrived.)

In [0]:
DA.cdc_stream.load()

Loading batch #3 to the stream...Loaded 918 records





You should be able to see a jump in the number of records in your gold table.

In [0]:
%sql
SELECT * FROM gold

mrn,new_street_address,new_zip,new_city,new_state,old_street_address,old_zip,old_city,old_state,updated_timestamp,processed_timestamp
93394828,417 Griffin Avenue Apt. 481,91340,San Fernando,CA,00704 Garner Road,90639,La Mirada,CA,2019-12-31T15:54:55.317Z,2025-02-21T13:46:28Z
71749426,987 Wolf Village,90095,Los Angeles,CA,1875 Jill Roads Suite 352,91030,South Pasadena,CA,2019-12-27T07:21:57.263Z,2025-02-21T13:46:28Z
45837216,1338 Navarro Glen Apt. 460,90755,Signal Hill,CA,48036 Benjamin Stravenue Apt. 275,93552,Palmdale,CA,2019-12-05T16:49:03.256Z,2025-02-21T13:46:28Z
98161270,578 Teresa Vista Apt. 068,91740,Glendora,CA,63553 Rosales Parks,90019,Los Angeles,CA,2019-12-10T16:28:37.009Z,2025-02-21T13:46:28Z
18018581,0057 Michelle Forest Apt. 352,91012,La Canada Flintridge,CA,576 Booth Street,91208,Glendale,CA,2019-12-01T01:37:13.706Z,2025-02-21T13:46:28Z
20085918,67672 Erin Burgs Suite 172,90746,Carson,CA,73573 Derrick Drives,91380,Santa Clarita,CA,2019-12-20T17:19:21.59Z,2025-02-21T13:46:28Z
71616880,858 Nicholas Alley Suite 628,90038,Los Angeles,CA,5184 Robinson Highway Apt. 821,91343,North Hills,CA,2019-12-12T16:08:30.92Z,2025-02-21T13:46:28Z
14125426,527 Matthew Rapids,91732,El Monte,CA,295 Ricky Garden Suite 364,90270,Maywood,CA,2019-12-11T19:07:30.303Z,2025-02-21T13:46:28Z
53201317,383 Sean Dam,90094,Playa Vista,CA,72179 Christian Well Apt. 990,91201,Glendale,CA,2019-12-26T00:30:48.962Z,2025-02-21T13:46:28Z
23674125,8809 Liu Glen Suite 523,90091,Los Angeles,CA,836 Caldwell Walk,90013,Los Angeles,CA,2019-12-30T06:46:04.089Z,2025-02-21T13:46:28Z




Make sure to run the following cell to stop all active streams.

In [0]:
for stream in spark.streams.active:
    stream.stop()
    stream.awaitTermination()




## Propagating Deletes

While some use cases may require processing deletes alongside updates and inserts, the most important delete requests are those that allow companies to maintain compliance with privacy regulations such as GDPR and CCPA. Most companies have stated SLAs around how long these requests will take to process, but for various reasons, these are often handled in pipelines separate from their core ETL.

Here, we should see a single user being deleted from our **`silver`** table.

In [0]:
%sql
DELETE FROM silver WHERE mrn = 14125426

num_affected_rows
1




As expected, when we try to locate this user in our **`silver`** table, we'll get no result.

In [0]:
%sql
SELECT * FROM silver WHERE mrn = 14125426

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated



This change has been captured in our Change Data Feed.

In [0]:
%sql
SELECT * 
FROM table_changes("silver", 3)
WHERE mrn = 14125426
ORDER BY _commit_version

mrn,dob,sex,gender,first_name,last_name,street_address,zip,city,state,updated,_change_type,_commit_version,_commit_timestamp
14125426,1955-10-24,F,F,Miranda,Cordova,527 Matthew Rapids,91732,El Monte,CA,2019-12-11T19:07:30.303Z,delete,3,2025-02-21T13:48:13Z



Because we have a record of this delete action, we can define logic that propagates deletes to our **`gold`** table.

In [0]:
%sql
WITH deletes AS (
  SELECT mrn
  FROM table_changes("silver", 3)
  WHERE _change_type='delete'
)

MERGE INTO gold g
USING deletes d
ON d.mrn=g.mrn
WHEN MATCHED
  THEN DELETE

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,0,1,0
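The delete propagation above amounts to a simple set-based filter. Here is a pure-Python sketch of it; `propagate_deletes` and the list-of-dicts tables are hypothetical stand-ins for the **`deletes`** CTE and the **`MERGE ... WHEN MATCHED THEN DELETE`** statement. Every gold row whose **`mrn`** matches a deleted patient is removed, including patients with several address changes recorded in gold.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def propagate_deletes(gold_rows: List[Row], change_rows: List[Row]) -> List[Row]:
    """Hypothetical analogue of the MERGE ... WHEN MATCHED THEN DELETE above."""
    # Same logic as the CTE: collect the mrn of every "delete" change row
    deleted = {r["mrn"] for r in change_rows if r["_change_type"] == "delete"}
    # Drop every gold row whose mrn was deleted upstream
    return [g for g in gold_rows if g["mrn"] not in deleted]
```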




This drastically simplifies deleting user data, and allows the keys and logic used in your ETL to also be used for propagating delete requests.

In [0]:
%sql
SELECT * FROM gold WHERE mrn = 14125426

mrn,new_street_address,new_zip,new_city,new_state,old_street_address,old_zip,old_city,old_state,updated_timestamp,processed_timestamp



Run the following cell to delete the tables and files associated with this lesson.

In [0]:
DA.cleanup()

Resetting the learning environment (cdf_demo):
| dropping the schema "labuser9308469_1740138925_77gg_da_adewd_cdf_demo"...(2 seconds)
| removing the working directory "dbfs:/mnt/dbacademy-users/labuser9308469_1740138925@vocareum.com/advanced-data-engineering-with-databricks/cdf_demo"...(3 seconds)

Validating the locally installed datasets:
| listing local files...(6 seconds)
| validation completed...(6 seconds total)



&copy; 2024 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the 
<a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/><a href="https://databricks.com/privacy-policy">Privacy Policy</a> | 
<a href="https://databricks.com/terms-of-use">Terms of Use</a> | 
<a href="https://help.databricks.com/">Support</a>