In [1]:
# Notebook display tweak: keep wide DataFrame output on one line instead of wrapping.
from IPython.display import HTML, display
display(HTML("<style>pre { white-space: pre !important; }</style>"))

Moving to an Incremental Pipeline in Delta Lake: Change Tracking
================================

This post shows how to use change tracking, which Delta Lake calls the [change data feed](https://docs.delta.io/2.0.0/delta-change-data-feed.html), in Delta Lake 2.0 to convert a batch pipeline into an incremental update pipeline. We'll cover two parts:

1. Capturing changes with change tracking in a Delta Lake merge job.
1. Converting a series of `join` operations into `merge` operations, producing a cheaper pipeline built from incremental operations.

Setting Up a Scenario: 3 Tables
--------------------------------------

I've set up three tables:

1. Invoice
2. InvoiceItem
3. Product

The ground truth for these tables lives in a production system. Each day it is dumped to the data lake and merged into a Delta Lake table. Day 0 below just loads the first dump; the daily merge logic starts on Day 1.

In [2]:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("DeltaChangeFeedExample") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")  # change data feed on for every new table

# Note: `sc` holds a SparkSession here, not a SparkContext.
sc = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
# Day 0: Read the data and merge. This is just to get our tables set up. See Day 1 for a "normal" day.
products = sc.read.format("csv") \
                .option("header","true") \
                .load("./data/products/updates/day=0/") \
                .drop('_c0')

products.write.format("delta").save("./outputs/products")

invoices = sc.read.format("csv") \
                .option("header","true") \
                .load("./data/invoice/updates/day=0/") \
                .drop('_c0')

invoices.write.format("delta").save("./outputs/invoices")

invoiceitems = sc.read.format("csv") \
                .option("header","true") \
                .load("./data/invoiceitems/updates/day=0/") \
                .drop('_c0')

invoiceitems.write.format("delta").save("./outputs/invoiceitems")

Now, assume there is a job that joins all three tables into a single wide table with one row per invoice item. The code calls it the normalized view, although strictly speaking joining tables like this denormalizes them. We can build it with a couple of joins. Occasionally we see "hiccups" where an invoice and invoice item exist in our data lake but the product has not been downloaded yet. This kind of delay can happen when the joined tables come from different production systems. So we left join products, and the product columns will occasionally be null. Bad things happen in complicated systems.
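To make the left-join behavior concrete, here is a tiny pure-Python model in which plain dicts stand in for DataFrames. The `left_join` helper and the rows are hypothetical, not part of the pipeline. It shows that an invoice item whose product hasn't arrived still appears once, with `None` where the product columns would be.

```python
def left_join(left_rows, right_rows, left_key, right_key):
    """Model of a SQL LEFT JOIN for a key that is unique in `right_rows`.

    Every left row appears exactly once; right-side columns are None when
    there is no match.
    """
    # Column names of the right side, in first-seen order.
    right_columns = list(dict.fromkeys(col for row in right_rows for col in row))
    right_index = {row[right_key]: row for row in right_rows}
    joined = []
    for row in left_rows:
        match = right_index.get(row[left_key])
        if match is None:
            match = {col: None for col in right_columns}
        joined.append({**row, **match})
    return joined


# Hypothetical rows for illustration.
invoice_items = [
    {"invoice_item_id": "ii_1", "product": "p_1"},
    {"invoice_item_id": "ii_2", "product": "p_2"},  # p_2 has not been downloaded yet
]
products = [{"product_id": "p_1", "price": 44.45}]

normalized = left_join(invoice_items, products, "product", "product_id")
print(normalized[1]["product_id"], normalized[1]["price"])  # None None
```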

In [4]:
# Build normalized view on day 0
# build normalized join
product_base = DeltaTable.forPath(sc, "./outputs/products").toDF()
invoice_base = DeltaTable.forPath(sc, "./outputs/invoices").toDF()
invoiceitem_base = DeltaTable.forPath(sc, "./outputs/invoiceitems").toDF()

# Left join with invoice item as the root. This isn't important for invoices and invoice items, but it is
# critical for products in this example: products may be pulled on a different cadence and, thus,
# may not exist yet.
normalized_view = invoiceitem_base.join(invoice_base, invoiceitem_base.invoice == invoice_base.invoice_id, how="left")
normalized_view = normalized_view.join(product_base, normalized_view.product == product_base.product_id, how="left")

normalized_view.write.format("delta").save("./outputs/normalized")

normalized_view.write.format("delta").save("./outputs/normalized_copy")  # we'll use this later.

Note the prices for these invoice items; we'll revisit them after day 1.

In [5]:
# This is a helpful debug command: check a few rows for one product.
from pyspark.sql.functions import col

sample_product = 'p_16'
print(f"Product {sample_product} on day=0 had price {products.filter(col('product_id') == sample_product).collect()[0].asDict()['price']}")

print("Prices in the combined field are all:")
normalized_view.filter(col('product') == sample_product) \
    .select('invoice_item_id', 'invoice_modified', 'price') \
    .orderBy("invoice_item_id") \
    .limit(5).toPandas()

Product p_16 on day=0 had price 51.18
Prices in the combined field are all:


|   | invoice_item_id | invoice_modified | price |
|---|---|---|---|
| 0 | 0f69b5dd-b7e8-445a-8268-f317e4edc2bd | day0 | 51.18 |
| 1 | 1dd96153-630b-447c-b438-9047586e25fe | day0 | 51.18 |
| 2 | 1f962845-105a-46c5-b835-0457eba2f996 | day0 | 51.18 |
| 3 | 27f5a1f6-83c7-4b59-82de-70108b5b3d27 | day0 | 51.18 |
| 4 | 2e9a0213-1a2a-4a40-88de-79e07becbcb7 | day0 | 51.18 |


In [6]:
# Day 1: process both updates and deletes, which come in separate files
def read_data(table_location, day, has_deletes):
    updates = sc.read.format("csv") \
                .option("header","true") \
                .load(f"./data/{table_location}/updates/day={day}/") \
                .drop('_c0')
        
    if has_deletes:
        deletes = sc.read.format("csv") \
                .option("header", "true") \
                .load(f"./data/{table_location}/deletes/day={day}/") \
                .drop("_c0")
    else:
        deletes = None

    return updates, deletes

product_updates, _ = read_data("products", day=1, has_deletes=False)
product_base = DeltaTable.forPath(sc, "./outputs/products")
print(f"Updating {product_updates.count()} products and deleting 0 products.")

invoice_updates, invoice_deletes = read_data("invoice", day=1, has_deletes=True)
invoice_base = DeltaTable.forPath(sc, "./outputs/invoices")
print(f"Updating {invoice_updates.count()} invoices and deleting {invoice_deletes.count()} invoices.")

invoiceitem_updates, invoiceitem_deletes = read_data("invoiceitems", day=1, has_deletes=True)
invoiceitem_base = DeltaTable.forPath(sc, "./outputs/invoiceitems")
print(f"Updating {invoiceitem_updates.count()} invoiceitems and deleting {invoiceitem_deletes.count()} invoiceitems.")

Updating 50 products and deleting 0 products.
Updating 582 invoices and deleting 4 invoices.
Updating 1506 invoiceitems and deleting 19 invoiceitems.


In [7]:
# Day 1 continued: merge tables
from pyspark.sql.functions import lit

# Products only have updates: upsert them.
product_base.alias("oldData") \
  .merge(
    product_updates.alias("newData"),
    "oldData.product_id = newData.product_id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

# Invoices: tag each incoming row with its operation so one merge handles updates and deletes.
invoice_updates = invoice_updates.withColumn('operation', lit('update'))
invoice_deletes = invoice_deletes.withColumn('operation', lit('delete'))
invoices_full_updates = invoice_updates.unionAll(invoice_deletes)

# Only insert non-delete rows, so a delete for a row we never loaded isn't inserted as a live row.
invoice_base.alias("oldData") \
  .merge(
    invoices_full_updates.alias("newData"),
    "oldData.invoice_id = newData.invoice_id") \
  .whenMatchedUpdateAll(condition='newData.operation != "delete"') \
  .whenMatchedDelete(condition='newData.operation = "delete"') \
  .whenNotMatchedInsertAll(condition='newData.operation != "delete"') \
  .execute()

# Invoice items: same pattern as invoices.
invoiceitem_updates = invoiceitem_updates.withColumn('operation', lit('update'))
invoiceitem_deletes = invoiceitem_deletes.withColumn('operation', lit('delete'))
invoiceitem_full_updates = invoiceitem_updates.unionAll(invoiceitem_deletes)

invoiceitem_base.alias("oldData") \
  .merge(
    invoiceitem_full_updates.alias("newData"),
    "oldData.invoice_item_id = newData.invoice_item_id") \
  .whenMatchedUpdateAll(condition='newData.operation != "delete"') \
  .whenMatchedDelete(condition='newData.operation = "delete"') \
  .whenNotMatchedInsertAll(condition='newData.operation != "delete"') \
  .execute()

Basically, every day we merge in a new set of data from a production system. This can create, update, or delete rows in any table. (GDPR compliance is one example where hard deletes, as opposed to soft deletes, might happen.) So every day we get an updated Delta Lake table for each source table. We apply these changes with `merge` commands, which upsert and delete rows in place instead of rewriting each table from scratch.
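The effect of that daily merge on one table can be modeled in a few lines of plain Python. This is a hypothetical sketch with dicts keyed by ID and made-up rows, not the Delta Lake implementation. In this model, deletes for keys missing from the base table are simply ignored; in a real merge that only holds if the insert clause skips delete records.

```python
def apply_daily_merge(base, updates, deletes, key):
    """Model of one day's merge: upsert `updates`, then remove `deletes`.

    `base` maps key -> row and is not modified; a new dict is returned.
    """
    merged = dict(base)
    for row in updates:
        merged[row[key]] = row          # matched -> update, not matched -> insert
    for row in deletes:
        merged.pop(row[key], None)      # matched -> delete, unknown key -> ignored
    return merged


# Hypothetical invoice rows.
day0 = {
    "inv_1": {"invoice_id": "inv_1", "status": "sent"},
    "inv_2": {"invoice_id": "inv_2", "status": "sent"},
}
day1 = apply_daily_merge(
    day0,
    updates=[{"invoice_id": "inv_1", "status": "paid"},
             {"invoice_id": "inv_3", "status": "sent"}],
    deletes=[{"invoice_id": "inv_2"}, {"invoice_id": "inv_9"}],
    key="invoice_id",
)
print(sorted(day1))  # ['inv_1', 'inv_3']
```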

Every day we also need to rebuild our normalized table. This code is a copy of the Day 0 join, except that it overwrites the existing output:

In [8]:
# build normalized join
product_base = DeltaTable.forPath(sc, "./outputs/products").toDF()
invoice_base = DeltaTable.forPath(sc, "./outputs/invoices").toDF()
invoiceitem_base = DeltaTable.forPath(sc, "./outputs/invoiceitems").toDF()

# Left join with invoice item as the root. This isn't important for invoices and invoice items, but it is
# critical for products in this example: products may be pulled on a different cadence and, thus,
# may not exist yet.
normalized_view = invoiceitem_base.join(invoice_base, invoiceitem_base.invoice == invoice_base.invoice_id, how="left")
normalized_view = normalized_view.join(product_base, normalized_view.product == product_base.product_id, how="left")

normalized_view.write.format("delta").mode("overwrite").save("./outputs/normalized")

There are two things I hate about this join. First, we have to load all three tables in full every day to produce our join. If we tried to load, say, only the data that changed on `day=1`, we would miss matches between rows that changed on day 1 and rows that didn't. For instance, say that a product is created on day 0 and used in an invoice on day 0. Mistimed data copies from the Products service and the Invoices service could result in the invoice copying over but the product not. On day 1, the new product is copied to the lake. If we rerun the join over all the data, the invoice from day 0 picks up its product, but only because we reloaded day 0 invoices that didn't change!
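A small hypothetical model of that scenario, in plain Python with made-up IDs and a made-up price, shows the difference between joining only the rows that changed on day 1 and rebuilding from the full tables. Only the full rebuild attaches the late-arriving product to the day 0 invoice item.

```python
def join_prices(items, products):
    """Left join items to products on product id; the price is None when missing."""
    prices = {p["product_id"]: p["price"] for p in products}
    return {i["invoice_item_id"]: prices.get(i["product"]) for i in items}


# Day 0: the invoice item arrived, but its product did not (mistimed copies).
items_day0 = [{"invoice_item_id": "ii_1", "product": "p_7"}]
products_day0 = []

# Day 1: the product finally lands; the invoice item itself did not change.
items_changed_day1 = []
products_changed_day1 = [{"product_id": "p_7", "price": 20.0}]

normalized_day0 = join_prices(items_day0, products_day0)

# Option 1: join only the day 1 changes and overlay them on yesterday's result.
incremental = {**normalized_day0, **join_prices(items_changed_day1, products_changed_day1)}

# Option 2: rebuild from the full tables, which re-reads unchanged day 0 items.
full_rebuild = join_prices(items_day0 + items_changed_day1,
                           products_day0 + products_changed_day1)

print(incremental)   # {'ii_1': None}
print(full_rebuild)  # {'ii_1': 20.0}
```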

Second, the normalized data pulls the most recent value for any product, not the value that was active when an invoice item was created. If we change the price in our product table, for instance, then the next day's normalized data will show that new price on every previous invoice item. This can be misleading!


In [9]:
from pyspark.sql.functions import col, row_number

sample_product = 'p_16'
print(f"Product {sample_product} on day=0 had price {products.filter(col('product_id') == sample_product).collect()[0].asDict()['price']}")
print(f"Product {sample_product} on day=1 had price {product_updates.filter(col('product_id') == sample_product).collect()[0].asDict()['price']}")

print("Prices in the combined field are all:")
normalized_view.filter(col('product') == sample_product) \
    .select('invoice_item_id', 'invoice_modified', 'invoice_item_modified', 'price') \
    .orderBy("invoice_item_id") \
    .limit(10).toPandas()


Product p_16 on day=0 had price 51.18
Product p_16 on day=1 had price 13.72
Prices in the combined field are all:


|   | invoice_item_id | invoice_modified | invoice_item_modified | price |
|---|---|---|---|---|
| 0 | 01bd1f56-dfdc-47af-8062-bfc833d60b0b | day1 | day1 | 13.72 |
| 1 | 0ec6f0c1-5efa-497b-9514-2fde40cd0077 | day1 | day1 | 13.72 |
| 2 | 0f69b5dd-b7e8-445a-8268-f317e4edc2bd | day0 | day0 | 13.72 |
| 3 | 11c4720e-a2b8-46e4-ba38-0be21b7a6931 | day1 | day1 | 13.72 |
| 4 | 199dca1d-40b3-40e7-8ae0-df10b4d68e07 | day1 | day1 | 13.72 |
| 5 | 1ca8b663-c6c4-4f3a-948b-e43ca25d2e17 | day1 | day1 | 13.72 |
| 6 | 1dd96153-630b-447c-b438-9047586e25fe | day0 | day0 | 13.72 |
| 7 | 1f962845-105a-46c5-b835-0457eba2f996 | day0 | day0 | 13.72 |
| 8 | 27f5a1f6-83c7-4b59-82de-70108b5b3d27 | day0 | day0 | 13.72 |
| 9 | 2e9a0213-1a2a-4a40-88de-79e07becbcb7 | day0 | day0 | 13.72 |


Although several of the invoice items above were created and last modified on day 0, when the price was 51.18, the normalized view on day 1 shows them at the new price of 13.72. This can be fixed, but it requires more work.
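One possible fix, sketched here as a hypothetical pure-Python model rather than anything this post implements, is to keep every version of a product's price and pick the one in effect when the invoice item was created. Days are plain integers in this sketch; the notebook's `day0`/`day1` labels would need converting first. The two prices are the ones shown above for `p_16`.

```python
from bisect import bisect_right


def price_as_of(history, product_id, day):
    """Return the price of `product_id` in effect on `day`, or None.

    `history` maps product_id -> list of (effective_day, price) tuples,
    sorted by effective_day.
    """
    versions = history.get(product_id, [])
    days = [effective_day for effective_day, _ in versions]
    position = bisect_right(days, day)   # number of versions with effective_day <= day
    if position == 0:
        return None                      # the product did not exist yet
    return versions[position - 1][1]


price_history = {"p_16": [(0, 51.18), (1, 13.72)]}
print(price_as_of(price_history, "p_16", 0))  # 51.18
print(price_as_of(price_history, "p_16", 1))  # 13.72
```

The trade-off is that the product table now has to retain its history instead of only the latest row.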

Enabling Change Tracking and Converting to Incremental Jobs
---------------------------------------------------

What we really want is to track the changes that we introduce when day 1 data merges into each of our three base tables `product`, `invoice`, and `invoice_item`. It turns out Delta Lake supports change tracking as of version 2.0.0, where the feature is called the [change data feed](https://docs.delta.io/2.0.0/delta-change-data-feed.html). We enabled it in the Spark setup cell at the top of this notebook when we added this setting to our Spark session builder. It makes `delta.enableChangeDataFeed = true` the default table property for every new table the session creates:

`.config("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")`
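That session default only applies to tables created after it is set. For a table that already exists, the Delta Lake documentation describes setting the `delta.enableChangeDataFeed` table property directly, and only changes committed after that are recorded. The sketch below assumes a path-based table and a SparkSession with Delta configured; both helper names are hypothetical. Running `ALTER TABLE` through SQL generally also needs `spark.sql.catalog.spark_catalog` set to `org.apache.spark.sql.delta.catalog.DeltaCatalog`, which the session at the top of this notebook does not set.

```python
def enable_cdf_statement(table_path: str) -> str:
    """Build SQL that turns on the change data feed for an existing path-based Delta table."""
    return (
        f"ALTER TABLE delta.`{table_path}` "
        "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    )


def enable_cdf(spark, table_path: str) -> None:
    """Run the statement; only commits made after this are captured in the feed."""
    spark.sql(enable_cdf_statement(table_path))


print(enable_cdf_statement("./outputs/invoices"))
```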

When you write data (creates, updates, or deletes) to a Delta table with the change data feed enabled, Delta Lake records which rows were inserted, updated, or deleted in each transaction. For updates, deletes, and merges it writes additional Parquet files to the table's `_change_data` directory for this purpose. You can read the change records by setting `.option('readChangeFeed', 'true')` on a read, together with a starting version or timestamp. Below we look at a few change records for the `invoice` and `product` tables.


In [10]:
invoice_change_data = sc.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", '0') \
  .load("./outputs/invoices")

product_change_data = sc.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", '0') \
  .load("./outputs/products")

The change feed contains every row-level change committed to the table from `startingVersion` through the latest version.
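If you want a bounded range instead of everything up to the latest version, Delta Lake also accepts an `endingVersion` option, and both bounds are inclusive. A minimal sketch, assuming an existing SparkSession with Delta configured; the helper names are hypothetical, and the option map is built separately so it can be checked without a cluster:

```python
def change_feed_options(starting_version, ending_version=None):
    """Reader options for a change data feed read over versions [starting, ending]."""
    options = {"readChangeFeed": "true", "startingVersion": str(starting_version)}
    if ending_version is not None:
        options["endingVersion"] = str(ending_version)
    return options


def read_change_feed(spark, path, starting_version, ending_version=None):
    """Load the change rows for the Delta table at `path`."""
    options = change_feed_options(starting_version, ending_version)
    return spark.read.format("delta").options(**options).load(path)


# Only the day 1 commit (version 1) of a table:
print(change_feed_options(1, 1))
```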

In [11]:
from pyspark.sql.window import Window

sample_window = Window.partitionBy("_change_type").orderBy('invoice_id')
invoice_change_data.withColumn('sample', row_number().over(sample_window)).filter(col('sample') < 3).toPandas()

|   | invoice_id | customer | status | invoice_modified | invoice_created | _change_type | _commit_version | _commit_timestamp | sample |
|---|---|---|---|---|---|---|---|---|---|
| 0 | inv_134 | a42c9381-3989-4660-ba88-fe4ceffa48ba | sent | day0 | day0 | delete | 1 | 2022-11-13 18:12:28.195 | 1 |
| 1 | inv_587 | 6024b982-5c4e-4055-8488-3e9d1c7393ba | sent | day0 | day0 | delete | 1 | 2022-11-13 18:12:28.195 | 2 |
| 2 | inv_0 | 22cb24c1-689f-4861-8614-f026649652c9 | sent | day0 | day0 | insert | 0 | 2022-11-13 18:11:45.348 | 1 |
| 3 | inv_1 | 61f0507e-37d3-4433-8368-e3ffc9f47a4a | sent | day0 | day0 | insert | 0 | 2022-11-13 18:11:45.348 | 2 |
| 4 | inv_1 | 61f0507e-37d3-4433-8368-e3ffc9f47a4a | sent | day1 | day0 | update_postimage | 1 | 2022-11-13 18:12:28.195 | 1 |
| 5 | inv_103 | 6f92d2f8-c5ae-409d-844c-e144698c4057 | sent | day1 | day0 | update_postimage | 1 | 2022-11-13 18:12:28.195 | 2 |
| 6 | inv_1 | 61f0507e-37d3-4433-8368-e3ffc9f47a4a | sent | day0 | day0 | update_preimage | 1 | 2022-11-13 18:12:28.195 | 1 |
| 7 | inv_103 | 6f92d2f8-c5ae-409d-844c-e144698c4057 | sent | day0 | day0 | update_preimage | 1 | 2022-11-13 18:12:28.195 | 2 |


In [12]:
sample_window = Window.partitionBy("_change_type").orderBy('product_id')
product_change_data.withColumn('sample', row_number().over(sample_window)).filter(col('sample') < 3).orderBy('product_id').toPandas()

|   | product_id | product_name | price | product_modified | product_created | _change_type | _commit_version | _commit_timestamp | sample |
|---|---|---|---|---|---|---|---|---|---|
| 0 | p_0 | widget0 | 15.66 | day0 | day0 | insert | 0 | 2022-11-13 18:11:32.094 | 1 |
| 1 | p_1 | sproket1 | 44.45 | day0 | day0 | insert | 0 | 2022-11-13 18:11:32.094 | 2 |
| 2 | p_103 | sproket103 | 62.81 | day1 | day0 | update_postimage | 1 | 2022-11-13 18:12:20.921 | 1 |
| 3 | p_103 | sproket103 | 8.45 | day0 | day0 | update_preimage | 1 | 2022-11-13 18:12:20.921 | 1 |
| 4 | p_106 | sproket106 | 46.47 | day1 | day0 | update_postimage | 1 | 2022-11-13 18:12:20.921 | 2 |
| 5 | p_106 | sproket106 | 10.93 | day0 | day0 | update_preimage | 1 | 2022-11-13 18:12:20.921 | 2 |


The `_change_type` column takes four values:

* `insert`: a newly created row.
* `delete`: a deleted row.
* `update_preimage`: the row's values before an update.
* `update_postimage`: the row's values after an update.

The `update_preimage` and `update_postimage` rows for the day 1 changes both belong to commit 1 (`_commit_version = 1`), so you can see each product's price before and after the update.
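The four change types can be illustrated with a small hypothetical model: diffing two snapshots of a table keyed by ID yields the kind of rows a change feed reports for one commit. This only illustrates the semantics. Delta Lake derives these rows from its transaction log and change-data files rather than by diffing snapshots, and a real merge that rewrites a matched row may report it as an update even if no values changed, which this model does not. The `p_103` prices come from the table above; `p_200` and `p_300` are made up.

```python
def change_rows(before, after, version):
    """Diff two {key: row} snapshots into change-feed-style rows for one commit."""
    changes = []
    for key in sorted(before.keys() | after.keys()):
        old, new = before.get(key), after.get(key)
        if old is None:
            changes.append({**new, "_change_type": "insert", "_commit_version": version})
        elif new is None:
            changes.append({**old, "_change_type": "delete", "_commit_version": version})
        elif old != new:
            changes.append({**old, "_change_type": "update_preimage", "_commit_version": version})
            changes.append({**new, "_change_type": "update_postimage", "_commit_version": version})
    return changes


products_v0 = {"p_103": {"product_id": "p_103", "price": 8.45},
               "p_200": {"product_id": "p_200", "price": 1.00}}
products_v1 = {"p_103": {"product_id": "p_103", "price": 62.81},
               "p_300": {"product_id": "p_300", "price": 2.00}}

for row in change_rows(products_v0, products_v1, version=1):
    print(row["_change_type"], row["product_id"], row["price"])
```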

The really cool thing about the change data feed is that we can build our day 1 normalized view without re-reading the full base tables or rebuilding the normalized table from scratch. 
We accomplish this with a `merge` whose source is the change data. To check our starting point, I'm going to use time travel to read `normalized_view` as it existed on day 0 (version 0); the merges themselves run against `normalized_copy`, the copy we saved on day 0.
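The core of that idea, shown for the product table only, can be modeled in plain Python. This is a hypothetical sketch: dicts stand in for Delta tables, only two product columns are kept, and it leaves out the `invoice_item_modified >= product_modified` condition the notebook adds. It also applies change rows one by one, whereas a real Delta merge fails if two source rows match the same target row.

```python
PRODUCT_COLUMNS = ("product_id", "price")  # a subset of the real product columns


def merge_product_changes(normalized, product_changes):
    """Apply product change-feed rows to a {invoice_item_id: row} table, in place.

    Mirrors merging on `left.product = right.product_id`:
    * update_preimage rows are skipped (they hold old values);
    * insert / update_postimage rows overwrite the product columns of matching items;
    * delete rows reset those columns to None;
    * there is no insert branch, so changes for unused products are ignored.
    """
    for change in product_changes:
        if change["_change_type"] == "update_preimage":
            continue
        for row in normalized.values():
            if row["product"] != change["product_id"]:
                continue
            if change["_change_type"] == "delete":
                row.update({col: None for col in PRODUCT_COLUMNS})
            else:
                row.update({col: change[col] for col in PRODUCT_COLUMNS})
    return normalized


normalized = {
    "ii_1": {"invoice_item_id": "ii_1", "product": "p_103", "product_id": "p_103", "price": 8.45},
    "ii_2": {"invoice_item_id": "ii_2", "product": "p_7", "product_id": None, "price": None},
    "ii_3": {"invoice_item_id": "ii_3", "product": "p_200", "product_id": "p_200", "price": 1.00},
}
changes = [
    {"product_id": "p_103", "price": 8.45, "_change_type": "update_preimage"},
    {"product_id": "p_103", "price": 62.81, "_change_type": "update_postimage"},
    {"product_id": "p_7", "price": 20.00, "_change_type": "insert"},
    {"product_id": "p_200", "price": 1.00, "_change_type": "delete"},
    {"product_id": "p_999", "price": 5.00, "_change_type": "insert"},  # no invoice item uses it
]
merge_product_changes(normalized, changes)
for item_id, row in normalized.items():
    print(item_id, row["product_id"], row["price"])
```

Note that the late-arriving `p_7` is attached to an existing invoice item using only the day's change rows; no unchanged product rows had to be read.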

In [13]:
normalized_view_day_0 = sc.read.format("delta") \
  .option("versionAsOf", '0') \
  .load("./outputs/normalized")

In [14]:
# prove we have no day 1 data in our time travel:
import pyspark.sql.functions as F

normalized_view_day_0.agg(
    F.max(col('invoice_modified')), 
    F.max(col('invoice_item_modified')), 
    F.max(col('product_modified'))).toPandas()

|   | max(invoice_modified) | max(invoice_item_modified) | max(product_modified) |
|---|---|---|---|
| 0 | day0 | day0 | day0 |


In [15]:
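# Read the change feed since day 0 (commit version 1 onward) for each base table.
# Drop update_preimage rows: they hold the old values, and a merge can fail if
# more than one source row matches the same target row.
invoice_change_data = sc.read.format("delta") \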
  .option("readChangeFeed", "true") \
  .option("startingVersion", '1') \
  .load("./outputs/invoices")

invoice_change_data = invoice_change_data.filter(col('_change_type') != 'update_preimage')

invoice_items_change_data = sc.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", '1') \
  .load("./outputs/invoiceitems")

invoice_items_change_data = invoice_items_change_data.filter(col('_change_type') != 'update_preimage')

products_change_data = sc.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", '1') \
  .load("./outputs/products")

products_change_data = products_change_data.filter(col('_change_type') != 'update_preimage')

In [16]:
normalized_view_day_0_copy = DeltaTable.forPath(sc, "./outputs/normalized_copy")

normalized_view_day_0_copy.alias('left').merge(
        invoice_items_change_data.alias('right'),
        'left.invoice_item_id = right.invoice_item_id'
    ).whenMatchedUpdate(set={
        'invoice': 'right.invoice',
        'count': 'right.count',
        'invoice_item_modified': 'right.invoice_item_modified',
        'invoice_item_created': 'right.invoice_item_created',
        'product': 'right.product'
    }, condition = 'right._change_type != "delete"') \
    .whenMatchedDelete(condition = 'right._change_type = "delete"') \
    .whenNotMatchedInsert(values={
        'invoice_item_id': 'right.invoice_item_id',
        'invoice': 'right.invoice',
        'count': 'right.count',
        'invoice_item_modified': 'right.invoice_item_modified',
        'invoice_item_created': 'right.invoice_item_created',
        'product': 'right.product'
    }).execute()

normalized_view_day_0_copy.alias('left').merge(
        invoice_change_data.alias('right'),
        'left.invoice = right.invoice_id'           # NOTE THIS! We'll discuss below.
    ).whenMatchedUpdate(set={
        'invoice_id': 'right.invoice_id',
        'customer': 'right.customer',
        'invoice_modified': 'right.invoice_modified',
        'invoice_created': 'right.invoice_created',
        'status': 'right.status',
    }, condition = 'right._change_type != "delete"') \
    .whenMatchedUpdate(                             # NOTE THIS! We'll discuss below.
        condition ='right._change_type = "delete"',
        set={                                       # keep `invoice`, the item's own foreign key, as a left join would
            'invoice_id': 'NULL',
            'customer': 'NULL',
            'invoice_modified': 'NULL',
            'invoice_created': 'NULL',
            'status': 'NULL'
        }).execute()                                # NOTE No whenNotMatched.


normalized_view_day_0_copy.alias('left').merge(
        products_change_data.alias('right'),
        'left.product = right.product_id'
    ).whenMatchedUpdate(set={
        'product_id': 'right.product_id',
        'product_name': 'right.product_name',
        'price': 'right.price',
        'product_modified': 'right.product_modified',
        'product_created': 'right.product_created'
    }, condition ='right._change_type != "delete" and left.invoice_item_modified >= right.product_modified') \
    .whenMatchedUpdate(
        condition ='right._change_type = "delete"',
        set={                                       # a deleted product blanks only the product columns
            'product_id': 'NULL',
            'product_name': 'NULL',
            'price': 'NULL',
            'product_modified': 'NULL',
            'product_created': 'NULL'
        }).execute()

The merges above nearly reproduce the left joins that we used to create normalized_view. However, there are a few key differences.

First, product columns (including price) are only updated for invoice items that were modified on or after the day the product changed. This logic should be improved. 
One could choose, for instance, never to update prices on invoice items that were already created with a price set. 
We can see below that invoice items from days 0 and 1 now have different prices. To reproduce the left join with its price resets, you would drop the second part of the condition `right._change_type != "delete" and left.invoice_item_modified >= right.product_modified`.
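The product update condition boils down to a per-row check. One caution: the notebook compares the `day0`/`day1` strings directly, which orders correctly only while day numbers have a single digit, because as strings `"day10" < "day9"`. This hypothetical sketch compares parsed integers instead; `parse_day` is an assumed helper for the `dayN` label format.

```python
def parse_day(label: str) -> int:
    """Turn a 'dayN' label into the integer N, e.g. 'day12' -> 12."""
    if not label.startswith("day"):
        raise ValueError(f"unexpected day label: {label!r}")
    return int(label[len("day"):])


def should_take_new_price(change_type, invoice_item_modified, product_modified):
    """Mirror the product merge condition: apply non-delete changes only to
    invoice items modified on or after the day the product changed."""
    return (change_type != "delete"
            and parse_day(invoice_item_modified) >= parse_day(product_modified))


print("day10" >= "day9")                                          # False: string comparison
print(should_take_new_price("update_postimage", "day10", "day9"))  # True
print(should_take_new_price("update_postimage", "day0", "day1"))   # False
```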

In [17]:
sample_window = Window.partitionBy("invoice_modified").orderBy('invoice_item_id')
normalized_view_day_0_copy.toDF() \
    .filter(col('product') == sample_product) \
    .withColumn('sample', row_number().over(sample_window)) \
    .filter(col('sample') < 3) \
    .select('invoice_item_id', 'invoice_modified', 'invoice_item_modified', 'price').toPandas()

|   | invoice_item_id | invoice_modified | invoice_item_modified | price |
|---|---|---|---|---|
| 0 | 0f69b5dd-b7e8-445a-8268-f317e4edc2bd | day0 | day0 | 51.18 |
| 1 | 1dd96153-630b-447c-b438-9047586e25fe | day0 | day0 | 51.18 |
| 2 | 01bd1f56-dfdc-47af-8062-bfc833d60b0b | day1 | day1 | 13.72 |
| 3 | 0ec6f0c1-5efa-497b-9514-2fde40cd0077 | day1 | day1 | 13.72 |


There are a few things to pay special attention to:

1) We're using the same approach of left joining against InvoiceItem. This means that the merge condition should mimic the fields used 
in InvoiceItem left join Invoice. Do not accidentally use `invoice_id` from Invoice as the left column: it is null for any invoice item whose invoice hasn't arrived yet, so those rows would never match.

2) Be very careful about deletes in merge. We truly delete a row from normalized_view only if it is deleted from InvoiceItem; again, this mimics
the left join above. For Invoice and for Product, a "delete" merely resets the corresponding columns to NULL.

3) There is no whenNotMatched on invoices or products. Again, we omit these to mimic a left join. This post is intended to show how to mimic a left join, but the existing approach has a major race condition problem. If a product is written on day N, but an invoice item using it doesn't appear until day N+1, that invoice item's product columns will never be filled in, because the product is not in day N+1's change feed. The same applies to invoices. In a production system, you would need to deal with this somehow. For instance, you could always use the full products table rather than the product updates to control this race condition.
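A hypothetical sketch of that last suggestion in plain Python: for invoice items that are new in this batch, look up the full product table rather than only the day's product changes, so products loaded on an earlier day still get attached. Names and rows are made up. In Spark this would roughly correspond to joining the new invoice items against the full products table before merging; the sketch only models the lookup.

```python
def fill_new_items(normalized, new_item_ids, full_products):
    """For newly inserted invoice items, copy product columns from the full product table."""
    products_by_id = {p["product_id"]: p for p in full_products}
    for item_id in new_item_ids:
        row = normalized[item_id]
        product = products_by_id.get(row["product"])
        if product is not None:  # stays None if the product truly hasn't arrived
            row.update(product_id=product["product_id"], price=product["price"])
    return normalized


normalized = {
    # Inserted on day N+1 by the invoice item merge; product p_5 was loaded on day N.
    "ii_9": {"invoice_item_id": "ii_9", "product": "p_5", "product_id": None, "price": None},
    "ii_10": {"invoice_item_id": "ii_10", "product": "p_6", "product_id": None, "price": None},
}
full_products = [{"product_id": "p_5", "price": 9.99}]  # p_6 has not arrived yet
fill_new_items(normalized, ["ii_9", "ii_10"], full_products)
print(normalized["ii_9"]["price"])   # 9.99
print(normalized["ii_10"]["price"])  # None
```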


Conclusion
----------

Switching your Delta-based pipeline to use the change data feed needs a little thought, but it can produce a more efficient pipeline that processes only each day's changes. One caveat: make sure you've upgraded to Delta Lake 2.0 or above!