
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning">
</div>





# Propagating Changes with CDF Lab

We'll be using [Change Data Feed](https://docs.databricks.com/en/delta/delta-change-data-feed.html) to propagate changes to many tables from a single source.

For this lab, we'll work with the fitness tracker datasets to propagate changes through a Lakehouse with Delta Lake Change Data Feed (CDF).

Because the **user_lookup** table links identifying information between different pipelines, we'll make this the point where changes propagate from.


## Objectives
By the end of this lab, you should be able to:
- Enable Change Data Feed on a particular table
- Read CDF output with Spark SQL or PySpark
- Refactor ELT code to process CDF output

## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default.

Follow these steps to select the classic compute cluster:

1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

1. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

  - In the drop-down, select **More**.

  - In the **Attach to an existing compute resource** pop-up, select the first drop-down. You will see a unique cluster name in that drop-down. Please select that cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

1. Find the triangle icon to the right of your compute cluster name and click it.

1. Wait a few minutes for the cluster to start.

1. Once the cluster is running, complete the steps above to select your cluster.

## A. Classroom Setup

Run the following cell to configure your working environment for this course. It will also set your default catalog to your unique catalog name and the schema to your specific schema name shown below using the `USE` statements.
<br></br>


```
USE CATALOG your-catalog;
USE SCHEMA your-catalog.pii_data;
```

**NOTE:** The `DA` object is only used in Databricks Academy courses and is not available outside of these courses. It will dynamically reference the information needed to run the course.

In [0]:
%run ./Includes/Classroom-Setup-1.4L

Check your default catalog and schema. Notice the lab uses a new schema named **cdf_lab**.

In [0]:
SELECT current_catalog(), current_schema()

## B. Show available tables

We'll focus on the **user_lookup** table with the intent to delete a user, check the CDF changes and then propagate such changes into the tables **users** and **user_bins**.

Run the `SHOW TABLES` command and confirm the following tables are available:

- **user_lookup**: Table containing user lookup information.
- **users**: Table containing detailed user information.
- **user_bins**: Table containing user bin assignments.

In [0]:
SHOW TABLES;


## C. Review the user_lookup table data

Run the following query to view the **user_lookup** table. Notice that the table contains an **alt_id** column, as well as **device_id**, **mac_address**, and **user_id**.


In [0]:
SELECT *
FROM user_lookup
LIMIT 10;


## D. Enable CDF for the user_lookup table

Enable change data feed on the **user_lookup** table.

**HINT:** Use the `ALTER TABLE` and `SET TBLPROPERTIES` to activate CDF. View the [Enable change data feed](https://docs.databricks.com/en/delta/delta-change-data-feed.html#enable-change-data-feed) documentation for more information.


In [0]:
ALTER TABLE user_lookup                              -- Table Name
SET TBLPROPERTIES (delta.enableChangeDataFeed=True)  -- property = True


## E. Confirm CDF is enabled on user_lookup table

**HINT**: Use the `DESCRIBE TABLE EXTENDED` statement to view information about the **user_lookup** table. Scroll to the bottom of the results and view the *Table Properties* row. Confirm that this table has `delta.enableChangeDataFeed` activated.



In [0]:
DESCRIBE TABLE EXTENDED user_lookup;


## F. View user_lookup table History

Enabling CDF generates a new entry in the table's history.


View the history of the **user_lookup** table. Confirm the entry is in the history of the **user_lookup** table as version `1` and that the **operation** column has updated the *SET TBLPROPERTIES*. 


In [0]:
DESCRIBE HISTORY user_lookup;


## G. Delete a record from user_lookup table



### G1. Query and pick a record from the User Lookup table

Run the query below and pick a **user_id** value to use in the next section. Notice that there are 100 records in the **user_lookup** table.

In [0]:
SELECT * 
FROM user_lookup

### G2. Delete a record from User Lookup table

To delete record from the table pick a value from the **device_id** column from above to delete. Then complete the following:

- Place the picked **user_id** value after the `DEFAULT` keyword in the `DECLARE OR REPLACE VARIABLE` statement below. This will save the value into a variable named **user_id** that will be used across this lab.
- Use `DELETE` statement with column_name of table
- Run the cell below to proceed with the record's deletion from the table. 
- Confirm a row was deleted.

In [0]:

-- -- Define a variable for user_id to delete
DECLARE OR REPLACE VARIABLE user_id INT DEFAULT 11745; -- Place your user_id here

-- -- Delete user with user_id equals to variable
DELETE FROM user_lookup 
WHERE user_id = session.user_id;

### G3. Confirm the record is deleted from the User Lookup table

Run the query below to confirm the selected **user_id** was deleted from the **user_lookup** table. Confirm no rows are returned.

In [0]:
SELECT * 
FROM user_lookup
WHERE user_id=session.user_id;

Run the query below and notice that the table now holds *99* records.

In [0]:
SELECT count(*)
FROM user_lookup;

## H. Review user_looup table History

Now that our record is deleted, a new entry in the **user_lookup** table history is added corresponding to this event. Let's get a view to this table's history. 

  Run the query and confirm version **2** and that the **operation** column is *DELETE*.

In [0]:
DESCRIBE HISTORY user_lookup;


## I. Read the user_lookup CDF output (starting from version 1)

This section we'll proceed to read CDF data from a the **user_lookup** table, well apply two approaches:
- Using SQL via `table_changes` function
- Using Python

### I1. Using SQL to View Row Level Changes

To read the CDF data:

- Using [table_changes](https://docs.databricks.com/en/sql/language-manual/functions/table_changes.html) function
- Select all changes from version **1** of the **user_lookup** table

Confirm that one row is returned in the output. The row should be the row you deleted in the earlier step.

**HINT:** [Use Delta Lake change data feed on Databricks](https://docs.databricks.com/en/delta/delta-change-data-feed.html#use-delta-lake-change-data-feed-on-databricks) documentation.

In [0]:
-- -- USING SQL
SELECT * 
FROM table_changes("user_lookup", 1);

### I2. (OPTIONAL) Using Python to View Row Level Changes 

To read the CDF data:
- Read the **user_lookup** table
- Configure the stream to enable reading change data with the `readChangeData` option
- Configure the stream to start reading from version **1** with `startingVersion` option

Confirm that one row is returned in the output. The row should be the row you deleted in the earlier step.

**HINT:** [Use Delta Lake change data feed on Databricks](https://docs.databricks.com/en/delta/delta-change-data-feed.html#use-delta-lake-change-data-feed-on-databricks) documentation.

In [0]:
%python
user_lookup_df = (spark
                  .read
                  .format("delta")
                  .option("readChangeData", True) # Reading Change Data
                  .option("startingVersion", 1)  # Start Reading Version
                  .table("user_lookup")
)

display(user_lookup_df)

## J. Propagate Deletes To Multiple Tables

### J1. Create a Temporary View of the user_lookup Table CDF Changes

Let's create a temporary view to hold the CDF changes of **user_lookup** table.

**HINTS**:
- Use `CREATE OR REPLACE TEMPORARY VIEW` name it **user_lookup_deletes_vw**
- Use `table_changes` and starting version of **2**
- Select all records in view where **_change_type** is *delete*
- Then display the results of the view. 
- Confirm one row is returned.

In [0]:
CREATE OR REPLACE TEMPORARY VIEW user_lookup_deletes_vw AS
SELECT * 
FROM table_changes('user_lookup',2)
WHERE _change_type ='delete';


-- -- Display the view
SELECT *
FROM user_lookup_deletes_vw;

### J2. Merge the Temporary View into **Users** Table

Merge using **user_lookup_deletes_vw** as source into **users** table and **DELETE** when **alt_id** gets matched.

The `MERGE` operation should return a value of *1* for **num_affected_rows** and **num_deleted_rows**.

**HINTS**:
- Use the [MERGE INTO](https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html) Syntax:
```
MERGE [ WITH SCHEMA EVOLUTION ] INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   { WHEN MATCHED [ AND matched_condition ] THEN matched_action |
     WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action |
     WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...]

matched_action
 { DELETE |
   UPDATE SET * |
   UPDATE SET { column = { expr | DEFAULT } } [, ...] }

not_matched_action
 { INSERT * |
   INSERT (column1 [, ...] ) VALUES ( expr | DEFAULT ] [, ...] )

not_matched_by_source_action
 { DELETE |
   UPDATE SET { column = { expr | DEFAULT } } [, ...] }

```

In [0]:
MERGE INTO users u
USING user_lookup_deletes_vw ul
ON u.alt_id = ul.alt_id
WHEN MATCHED
  THEN DELETE;

Run the query below to check the record count in the **users** table, notice this reflects record deleted propagated via the `MERGE_INTO` executed in the cell above.

In [0]:
SELECT count(*)
FROM users

### J3. Merge the Temporary View into the users_bin Table

Similarly, merge using **user_lookup_deletes_vw** as source into **user_bins** table and `DELETE` when **user_id** gets matched.

The `MERGE` operation should return a value of *1* for **num_affected_rows** and **num_deleted_rows**.

In [0]:
MERGE INTO users_bin ub
USING user_lookup_deletes_vw ul
ON ub.user_id = ul.user_id
WHEN MATCHED
  THEN DELETE;

Run the query below to check for the deleted record in the **users_bin** table by filtering and using the selected **user_id** saved in a variable. 

Notice there are no results confirming the delete as correctly propagated via the `MERGE INTO`.

In [0]:
SELECT *
FROM users_bin
WHERE user_id = session.user_id


&copy; 2025 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the 
<a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/><a href="https://databricks.com/privacy-policy">Privacy Policy</a> | 
<a href="https://databricks.com/terms-of-use">Terms of Use</a> | 
<a href="https://help.databricks.com/">Support</a>