
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Processing Records from Delta Change Data Feed

In this notebook, we'll demonstrate end to end how you can propagate changes through a Lakehouse with Delta Lake Change Data Feed (CDF).

For this demo, we'll work with a slightly different dataset representing patient information for medical records. Descriptions of the data at various stages follow.

### Bronze Table
Here we store all records as consumed. A row represents:
1. A new patient providing data for the first time
1. An existing patient confirming that their information is still correct
1. An existing patient updating some of their information

The type of action a row represents is not captured.

### Silver Table
This is the validated view of our data. Each patient will appear only once in this table. An upsert statement will be used to identify rows that have changed.

### Gold Table
For this example, we'll create a simple gold table that captures patients that have a new address.

## Learning Objectives
By the end of this lesson, students will be able to:
- Enable Change Data Feed on a cluster or for a particular table
- Describe how changes are recorded
- Read CDF output with Spark SQL or PySpark
- Refactor ELT code to process CDF output

### Setup

The following code defines some paths and a demo database, and clears out previous runs of the demo.

It also defines a variable `Raw` that we'll use to land raw data in our source directory, allowing us to process new records as if they were arriving in production.

In [0]:
%run ../Includes/cdc-setup $mode="reset"

Enabling CDF with a Spark conf setting in a notebook or on a cluster ensures that it's used for all newly created Delta tables in that scope.

In [0]:
spark.conf.set('spark.databricks.delta.properties.defaults.enableChangeDataFeed',True)

Confirm source directory is empty.

In [0]:
try:
    dbutils.fs.ls(Raw.userdir)
except Exception as e:
    print(e)

## Ingest Data with Auto Loader

Here we'll use Auto Loader to ingest data as it arrives.

The logic below is set up to either use trigger once to process all records loaded so far, or to continuously process records as they arrive.

We'll turn on continuous processing.
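
The two modes described above correspond to keyword arguments of `DataStreamWriter.trigger`. The following is a minimal sketch of switching between them; the helper name `trigger_options` and its `keep_running` flag are assumptions made for illustration and are not part of the course setup.

```python
def trigger_options(keep_running: bool, interval: str = "5 seconds") -> dict:
    """Return keyword arguments for DataStreamWriter.trigger()."""
    if keep_running:
        # Start a new micro-batch on a fixed interval as records arrive.
        return {"processingTime": interval}
    # Process everything loaded so far in a single batch, then stop.
    return {"once": True}


# Intended usage (not executed here): writer.trigger(**trigger_options(True))
print(trigger_options(True))
print(trigger_options(False))
```

Keeping the choice in one place means the bronze, silver, and gold writes can all be switched together instead of commenting lines in and out.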

In [0]:
schema = "mrn BIGINT, dob DATE, sex STRING, gender STRING, first_name STRING, last_name STRING, street_address STRING, zip BIGINT, city STRING, state STRING, updated timestamp"

bronzePath = f"{userhome}/bronze"

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS bronze
    (mrn BIGINT, dob DATE, sex STRING, gender STRING, first_name STRING, last_name STRING, street_address STRING, zip BIGINT, city STRING, state STRING, updated timestamp) 
    LOCATION '{bronzePath}'
""")

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load(Raw.userdir)
    .writeStream
    .format("delta")
    .outputMode("append")
#     .trigger(once=True)
    .trigger(processingTime='5 seconds')
    .option("checkpointLocation", userhome + "/_bronze_checkpoint")
    .table("bronze"))

Expand the stream monitor above to see the progress of your stream. No files should have been ingested yet.

Use the cell below to land a batch of data and list files in the source; you should see these records processed as a batch.

In [0]:
Raw.arrival()
dbutils.fs.ls(Raw.userdir)

## Create a Target Table

Here we use `DEEP CLONE` to move read-only data from PROD to our DEV environment (where we have full write/delete access).

In [0]:
silverPath = userhome + "/silver"

spark.sql(f"""
    CREATE TABLE silver
    DEEP CLONE delta.`{URI}/pii/silver`
    LOCATION '{silverPath}'
""")

Tables that were not created with CDF enabled will not have it turned on by default, but can be altered to capture changes with the following syntax.

Note that editing properties will version a table. Also note that no change data is captured during the `CLONE` operation above.

In [0]:
%sql
ALTER TABLE silver SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
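
If several existing tables need CDF turned on, the same statement can be generated per table. This is only a sketch: the helper `enable_cdf_statement` and the table name `silver_archive` are hypothetical, and each resulting string would still need to be passed to `spark.sql`.

```python
CDF_PROPERTY = "delta.enableChangeDataFeed"


def enable_cdf_statement(table_name: str) -> str:
    """Build the ALTER TABLE statement that enables CDF on one table."""
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES ({CDF_PROPERTY} = true)"


# `silver_archive` is a made-up table name used only for illustration.
statements = [enable_cdf_statement(t) for t in ["silver", "silver_archive"]]
for statement in statements:
    print(statement)  # in a notebook: spark.sql(statement)
```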

## Upsert Data with Delta Lake

Here we define upsert logic into the silver table using a streaming read against the bronze table, matching on our unique identifier `mrn`.

We specify an additional conditional check to ensure that a field in the data has changed before updating the existing record.

In [0]:
def upsertToDelta(microBatchDF, batchId):
    microBatchDF.createOrReplaceTempView("updates")
    microBatchDF._jdf.sparkSession().sql("""
        MERGE INTO silver s
        USING updates u
        ON s.mrn = u.mrn
        WHEN MATCHED AND (s.dob <> u.dob OR
                          s.sex <> u.sex OR
                          s.gender <> u.gender OR
                          s.first_name <> u.first_name OR
                          s.last_name <> u.last_name OR
                          s.street_address <> u.street_address OR
                          s.zip <> u.zip OR
                          s.city <> u.city OR
                          s.state <> u.state OR
                          s.updated <> u.updated)
            THEN UPDATE SET *
        WHEN NOT MATCHED
            THEN INSERT *
    """)
    
(spark.readStream
    .table("bronze")
    .writeStream
    .foreachBatch(upsertToDelta)
    .outputMode("update")
#     .trigger(once=True)
    .trigger(processingTime='5 seconds')
    .start())
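
One caveat about the change check in the `MERGE` above: `<>` evaluates to NULL when either side is NULL, so a field that changes to or from NULL would not trigger the update. If the columns can contain NULLs, one option is Spark SQL's null-safe equality operator `<=>`. The sketch below only builds the SQL text; the helper `change_condition` is an assumption for illustration, not part of the original notebook.

```python
def change_condition(columns, target="s", source="u"):
    """Build a null-safe 'any column differs' predicate for a MERGE clause."""
    # `a <=> b` is true when both sides are NULL and false when only one is,
    # so NOT (a <=> b) detects changes to and from NULL as well.
    checks = [f"NOT ({target}.{c} <=> {source}.{c})" for c in columns]
    return "(" + " OR ".join(checks) + ")"


tracked = ["dob", "sex", "gender", "first_name", "last_name",
           "street_address", "zip", "city", "state", "updated"]

merge_sql = f"""
    MERGE INTO silver s
    USING updates u
    ON s.mrn = u.mrn
    WHEN MATCHED AND {change_condition(tracked)}
        THEN UPDATE SET *
    WHEN NOT MATCHED
        THEN INSERT *
"""
print(merge_sql)
```

Generating the predicate from a column list also keeps the condition in step with the schema if fields are added later.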

Note that we have an additional metadata directory nested in our table directory, `_change_data`.

In [0]:
dbutils.fs.ls(silverPath)

We can see this directory also contains parquet files.

In [0]:
dbutils.fs.ls(silverPath + "/_change_data")

## Read the Change Data Feed

To pick up the recorded CDC data, we add two options:
- `readChangeData`
- `startingVersion` (can use `startingTimestamp` instead)

Here we'll do a streaming display of just those patients in Los Angeles. Note that patients with changes have two records present.

In [0]:
cdcDF = spark.readStream.format("delta").option("readChangeData", True).option("startingVersion", 0).table("silver")
display(cdcDF.filter("city = 'Los Angeles'"))
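
To make the "two records" point concrete, here is a plain-Python model of what the feed records for a single updated row. It is a simplified sketch: the patient values are invented, and the real feed also carries a `_commit_timestamp` column that is omitted here.

```python
def change_rows_for_update(before: dict, after: dict, commit_version: int) -> list:
    """Model the pair of change-feed rows recorded for one updated row."""
    return [
        {**before, "_change_type": "update_preimage", "_commit_version": commit_version},
        {**after, "_change_type": "update_postimage", "_commit_version": commit_version},
    ]


# Invented example values, not taken from the course dataset.
before = {"mrn": 1, "city": "Los Angeles", "street_address": "1 Old St"}
after = {"mrn": 1, "city": "Los Angeles", "street_address": "2 New Ave"}

for row in change_rows_for_update(before, after, commit_version=3):
    print(row)
```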

If we land another file in our source directory and wait a few seconds, we'll see that we have now captured changes for multiple `_commit_version` values (change the sort order of the `_commit_version` column in the display above to see this).

In [0]:
Raw.arrival()

## Gold Table
Our gold table will capture all of those patients that have a new address, and record this information alongside 2 timestamps: the time at which this change was made in our source system (currently labeled `updated`) and the time this was processed into our silver table (captured by the `_commit_timestamp` generated CDC field).

Within silver table CDC records:
- check for max `_commit_version` for each record
- if new version and address change, insert to gold table
- record `updated_timestamp` and `processed_timestamp`
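
The first step in the list above, keeping the highest `_commit_version` per patient, can be sketched in plain Python as follows. The helper name and the sample rows are assumptions for illustration; the notebook itself relies on the streaming read to see only new changes.

```python
def latest_change_per_key(rows, key="mrn"):
    """Keep, for each key, the row with the highest _commit_version."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row["_commit_version"] > current["_commit_version"]:
            latest[row[key]] = row
    return latest


# Invented change rows: patient 1 changed twice, patient 2 once.
rows = [
    {"mrn": 1, "_commit_version": 2, "street_address": "1 Old St"},
    {"mrn": 1, "_commit_version": 5, "street_address": "2 New Ave"},
    {"mrn": 2, "_commit_version": 3, "street_address": "9 Same Rd"},
]
latest = latest_change_per_key(rows)
print(latest[1]["street_address"])
```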

#### Gold Table Schema
| field | type |
| --- | --- |
| mrn | long |
| new_street_address | string |
| new_zip | long |
| new_city | string |
| new_state | string |
| old_street_address | string |
| old_zip | long |
| old_city | string |
| old_state | string |
| updated_timestamp | timestamp |
| processed_timestamp | timestamp |

In [0]:
goldPath = userhome + "/gold"

spark.sql(f"""
    CREATE TABLE gold
        (mrn BIGINT,
        new_street_address STRING,
        new_zip BIGINT,
        new_city STRING,
        new_state STRING,
        old_street_address STRING,
        old_zip BIGINT,
        old_city STRING,
        old_state STRING,
        updated_timestamp TIMESTAMP,
        processed_timestamp TIMESTAMP)
    USING DELTA
    LOCATION '{goldPath}'
""")

Note that we are using a table that has updates written to it as a streaming source! This is a **huge** value add, and something that historically has required extensive workarounds to process correctly.

In [0]:
silverStreamDF = spark.readStream.format("delta").option("readChangeData", True).option("startingVersion", 0).table("silver")

Our `_change_type` field lets us easily distinguish valid and invalid records.

New valid rows will have the `update_postimage` or `insert` label.
New invalid rows will have the `update_preimage` or `delete` label. 

(**NOTE**: We'll demonstrate logic for propagating deletes a little later)
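
The valid/invalid split described above can be written as a small lookup. This is a sketch only; the names `CURRENT_TYPES`, `SUPERSEDED_TYPES`, and `is_current` are illustrative assumptions.

```python
CURRENT_TYPES = {"insert", "update_postimage"}    # rows that reflect the new state
SUPERSEDED_TYPES = {"update_preimage", "delete"}  # rows that reflect a removed state


def is_current(change_type: str) -> bool:
    """Return True when a change row represents data that is now valid."""
    if change_type in CURRENT_TYPES:
        return True
    if change_type in SUPERSEDED_TYPES:
        return False
    raise ValueError(f"unexpected _change_type: {change_type!r}")


print(is_current("update_postimage"))
print(is_current("delete"))
```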

In the cell below, we'll define two queries against our streaming source to perform a stream-stream join on our data.

In [0]:
from pyspark.sql import functions as F  # provides F.col for the filters below

newDF = (silverStreamDF.filter(F.col("_change_type").isin(["update_postimage", "insert"]))
             .selectExpr("mrn",
                 "street_address new_street_address",
                 "zip new_zip",
                 "city new_city",
                 "state new_state",
                 "updated updated_timestamp",
                 "_commit_timestamp processed_timestamp"))

                                                                                         
oldDF = (silverStreamDF.filter(F.col("_change_type").isin(["update_preimage"]))
             .selectExpr("mrn",
                 "street_address old_street_address",
                 "zip old_zip",
                 "city old_city",
                 "state old_state",
                 "_commit_timestamp processed_timestamp"))

Assuming that we have properly deduplicated our data to ensure that only a single record for our `mrn` can be processed to our silver table, `mrn` and `_commit_timestamp` (aliased to `processed_timestamp` here) serve as a unique composite key.

Our join will allow us to match up the current and previous states of our data to track all changes.

This table could drive further downstream processes, such as triggering confirmation emails or automatic mailings for patients with updated addresses.

Our CDC data arrives as a stream, so only newly changed data at the silver level will be processed. Therefore, we can write to our gold table in append mode and maintain the grain of our data.

In [0]:
(newDF.withWatermark("processed_timestamp", "3 minutes")
    .join(oldDF, ["mrn", "processed_timestamp"], "left")
    .filter("new_street_address <> old_street_address OR old_street_address IS NULL")
    .writeStream
    .outputMode("append")
#     .trigger(once=True)
    .trigger(processingTime="5 seconds")
    .option("checkpointLocation", userhome + "/_gold_checkpoint")
    .table("gold"))
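
The join and filter above can be traced on a few rows in plain Python. This is a simplified model with invented values and no watermarking; the helper `address_changes` is an assumption for illustration. It shows why inserts (no pre-image) and real address changes reach gold while unchanged addresses are dropped.

```python
def address_changes(new_rows, old_rows):
    """Left-join post-images to pre-images and keep new or changed addresses."""
    old_by_key = {(r["mrn"], r["processed_timestamp"]): r for r in old_rows}
    result = []
    for new in new_rows:
        old = old_by_key.get((new["mrn"], new["processed_timestamp"]))
        old_address = old["old_street_address"] if old else None
        new_address = new["new_street_address"]
        # Mirrors: new_street_address <> old_street_address OR old_street_address IS NULL
        changed = old_address is None or (
            new_address is not None and new_address != old_address
        )
        if changed:
            result.append({**new, "old_street_address": old_address})
    return result


# Invented rows: "t1" and "t2" stand in for commit timestamps.
new_rows = [
    {"mrn": 1, "new_street_address": "2 New Ave", "processed_timestamp": "t1"},   # changed
    {"mrn": 2, "new_street_address": "9 Same Rd", "processed_timestamp": "t1"},   # unchanged
    {"mrn": 3, "new_street_address": "5 First St", "processed_timestamp": "t2"},  # insert
]
old_rows = [
    {"mrn": 1, "old_street_address": "1 Old St", "processed_timestamp": "t1"},
    {"mrn": 2, "old_street_address": "9 Same Rd", "processed_timestamp": "t1"},
]

changes = address_changes(new_rows, old_rows)
print([row["mrn"] for row in changes])
```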

Note the number of rows in our gold table.

In [0]:
%sql
SELECT * FROM gold

If we land a new raw file and wait a few seconds, we can see that all of our changes have propagated through our pipeline.

(This assumes you're using `processingTime` instead of trigger once processing. Scroll up to the gold table streaming write to wait for a new peak in the processing rate to know your data has arrived.)

In [0]:
Raw.arrival()

You should be able to see a jump in the number of records in your gold table.

In [0]:
%sql
SELECT * FROM gold

Make sure to run the following cell to stop all active streams.

In [0]:
for stream in spark.streams.active:
    stream.stop()

## Propagating Deletes

While some use cases may require processing deletes alongside updates and inserts, the most important delete requests are those that allow companies to maintain compliance with privacy regulations such as GDPR and CCPA. Most companies have stated SLAs around how long these requests will take to process, but for various reasons, these are often handled in pipelines separate from their core ETL.

Here, we show a single user being deleted from our `silver` table.

In [0]:
%sql
DELETE FROM silver WHERE mrn = 14125426

As expected, when we try to locate this user in our `silver` table, we'll get no result.

In [0]:
%sql
SELECT * FROM silver WHERE mrn = 14125426

This change has been captured in our Change Data Feed.

In [0]:
%sql
SELECT * 
FROM table_changes("silver", 0)
WHERE mrn = 14125426

Because we have a record of this delete action, we can define logic that propagates deletes to our `gold` table.

In [0]:
%sql
WITH deletes AS (
  SELECT mrn
  FROM table_changes("silver", 0)
  WHERE _change_type='delete'
)

MERGE INTO gold g
USING deletes d
ON d.mrn=g.mrn
WHEN MATCHED
  THEN DELETE

This drastically simplifies deleting user data, and allows the keys and logic used in your ETL to also be used for propagating delete requests.

In [0]:
%sql
SELECT * FROM gold WHERE mrn = 14125426
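
The delete propagation above can be modeled in plain Python to show the matching logic. This is a sketch with invented rows (only the `mrn` value comes from the cells above), and `propagate_deletes` is a hypothetical helper. Like the `MERGE`, it removes every gold row whose `mrn` has a `delete` record anywhere in the scanned change feed.

```python
def propagate_deletes(gold_rows, change_rows):
    """Drop gold rows whose mrn has a 'delete' record in the change feed."""
    deleted = {r["mrn"] for r in change_rows if r["_change_type"] == "delete"}
    return [r for r in gold_rows if r["mrn"] not in deleted]


# Invented rows for illustration.
gold_rows = [
    {"mrn": 14125426, "new_city": "Los Angeles"},
    {"mrn": 2, "new_city": "Fresno"},
]
change_rows = [
    {"mrn": 14125426, "_change_type": "insert"},
    {"mrn": 14125426, "_change_type": "delete"},
    {"mrn": 2, "_change_type": "insert"},
]

remaining = propagate_deletes(gold_rows, change_rows)
print([row["mrn"] for row in remaining])
```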

&copy; 2021 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>