# Delta Lake Change Data Feed

In [1]:
import pyspark
from delta import *
from pyspark.sql import functions as F

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Users/matthew.powers/opt/miniconda3/envs/pyspark-332-delta-230/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/matthew.powers/.ivy2/cache
The jars for the packages stored in: /Users/matthew.powers/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d4c18e40-a7d8-431c-9f6f-e7c4a04053a8;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
:: resolution report :: resolve 95ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   

23/06/04 17:25:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Create Delta table with change data feed enabled

In [11]:
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "false")

In [12]:
spark.sql(
    "CREATE TABLE students (id LONG, name STRING, age LONG) USING delta TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

DataFrame[]

### Append data

In [4]:
df = spark.createDataFrame([(0, "Bob", 23), (1, "Sue", 25), (2, "Jim", 27)]).toDF(
    "id", "name", "age"
)

In [5]:
df.repartition(1).write.mode("append").format("delta").saveAsTable("students")

                                                                                

In [6]:
spark.sql("SELECT * FROM students").show()

+---+----+---+
| id|name|age|
+---+----+---+
|  0| Bob| 23|
|  1| Sue| 25|
|  2| Jim| 27|
+---+----+---+



In [7]:
spark.sql("SELECT * FROM table_changes('students', 0)").show(truncate=False)

+---+----+---+------------+---------------+-----------------------+
|id |name|age|_change_type|_commit_version|_commit_timestamp      |
+---+----+---+------------+---------------+-----------------------+
|0  |Bob |23 |insert      |1              |2023-05-28 07:50:45.469|
|1  |Sue |25 |insert      |1              |2023-05-28 07:50:45.469|
|2  |Jim |27 |insert      |1              |2023-05-28 07:50:45.469|
+---+----+---+------------+---------------+-----------------------+



### Append more data

In [8]:
df = spark.createDataFrame([(5, "Jack", 18), (6, "Nora", 19), (7, "Clare", 20)]).toDF(
    "id", "name", "age"
)

In [9]:
df.repartition(1).write.mode("append").format("delta").saveAsTable("students")

In [10]:
spark.sql("SELECT * FROM students").show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  5| Jack| 18|
|  6| Nora| 19|
|  7|Clare| 20|
|  0|  Bob| 23|
|  1|  Sue| 25|
|  2|  Jim| 27|
+---+-----+---+



In [11]:
spark.sql("SELECT * FROM table_changes('students', 0)").show(truncate=False)

+---+-----+---+------------+---------------+-----------------------+
|id |name |age|_change_type|_commit_version|_commit_timestamp      |
+---+-----+---+------------+---------------+-----------------------+
|5  |Jack |18 |insert      |2              |2023-05-28 07:50:53.737|
|6  |Nora |19 |insert      |2              |2023-05-28 07:50:53.737|
|7  |Clare|20 |insert      |2              |2023-05-28 07:50:53.737|
|0  |Bob  |23 |insert      |1              |2023-05-28 07:50:45.469|
|1  |Sue  |25 |insert      |1              |2023-05-28 07:50:45.469|
|2  |Jim  |27 |insert      |1              |2023-05-28 07:50:45.469|
+---+-----+---+------------+---------------+-----------------------+



### Delete rows of data

In [12]:
delta_table = DeltaTable.forName(spark, "students")

In [13]:
delta_table.delete(F.col("age") > 20)

In [14]:
spark.sql("SELECT * FROM students").show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  5| Jack| 18|
|  6| Nora| 19|
|  7|Clare| 20|
+---+-----+---+



In [15]:
spark.sql("SELECT * FROM table_changes('students', 0)").show(truncate=False)

+---+-----+---+------------+---------------+-----------------------+
|id |name |age|_change_type|_commit_version|_commit_timestamp      |
+---+-----+---+------------+---------------+-----------------------+
|0  |Bob  |23 |delete      |3              |2023-05-28 07:50:59.026|
|1  |Sue  |25 |delete      |3              |2023-05-28 07:50:59.026|
|2  |Jim  |27 |delete      |3              |2023-05-28 07:50:59.026|
|5  |Jack |18 |insert      |2              |2023-05-28 07:50:53.737|
|6  |Nora |19 |insert      |2              |2023-05-28 07:50:53.737|
|7  |Clare|20 |insert      |2              |2023-05-28 07:50:53.737|
|0  |Bob  |23 |insert      |1              |2023-05-28 07:50:45.469|
|1  |Sue  |25 |insert      |1              |2023-05-28 07:50:45.469|
|2  |Jim  |27 |insert      |1              |2023-05-28 07:50:45.469|
+---+-----+---+------------+---------------+-----------------------+



## Query change data feed

In [16]:
spark.sql("SELECT * FROM table_changes('students', 0)").show(truncate=False)

+---+-----+---+------------+---------------+-----------------------+
|id |name |age|_change_type|_commit_version|_commit_timestamp      |
+---+-----+---+------------+---------------+-----------------------+
|0  |Bob  |23 |delete      |3              |2023-05-28 07:50:59.026|
|1  |Sue  |25 |delete      |3              |2023-05-28 07:50:59.026|
|2  |Jim  |27 |delete      |3              |2023-05-28 07:50:59.026|
|5  |Jack |18 |insert      |2              |2023-05-28 07:50:53.737|
|6  |Nora |19 |insert      |2              |2023-05-28 07:50:53.737|
|7  |Clare|20 |insert      |2              |2023-05-28 07:50:53.737|
|0  |Bob  |23 |insert      |1              |2023-05-28 07:50:45.469|
|1  |Sue  |25 |insert      |1              |2023-05-28 07:50:45.469|
|2  |Jim  |27 |insert      |1              |2023-05-28 07:50:45.469|
+---+-----+---+------------+---------------+-----------------------+



In [17]:
spark.sql("SELECT * FROM table_changes('students', 2, 3)").show(truncate=False)

+---+-----+---+------------+---------------+-----------------------+
|id |name |age|_change_type|_commit_version|_commit_timestamp      |
+---+-----+---+------------+---------------+-----------------------+
|0  |Bob  |23 |delete      |3              |2023-05-28 07:50:59.026|
|1  |Sue  |25 |delete      |3              |2023-05-28 07:50:59.026|
|2  |Jim  |27 |delete      |3              |2023-05-28 07:50:59.026|
|5  |Jack |18 |insert      |2              |2023-05-28 07:50:53.737|
|6  |Nora |19 |insert      |2              |2023-05-28 07:50:53.737|
|7  |Clare|20 |insert      |2              |2023-05-28 07:50:53.737|
+---+-----+---+------------+---------------+-----------------------+



## Cleanup

In [18]:
spark.sql("DROP TABLE IF EXISTS students")

DataFrame[]

## Create a minimal example

In [19]:
spark.sql(
    "CREATE TABLE people (first_name STRING, age LONG) USING delta TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

DataFrame[]

In [20]:
df = spark.createDataFrame([("Bob", 23), ("Sue", 25), ("Jim", 27)]).toDF(
    "first_name", "age"
)

In [21]:
df.show()

+----------+---+
|first_name|age|
+----------+---+
|       Bob| 23|
|       Sue| 25|
|       Jim| 27|
+----------+---+



In [22]:
df.repartition(1).write.mode("append").format("delta").saveAsTable("people")

In [23]:
delta_table = DeltaTable.forName(spark, "people")

In [24]:
delta_table.delete(F.col("first_name") == "Sue")

In [25]:
(
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("people")
    .show(truncate=False)
)

+----------+---+------------+---------------+-----------------------+
|first_name|age|_change_type|_commit_version|_commit_timestamp      |
+----------+---+------------+---------------+-----------------------+
|Sue       |25 |delete      |2              |2023-05-28 07:51:36.929|
|Bob       |23 |insert      |1              |2023-05-28 07:51:31.661|
|Sue       |25 |insert      |1              |2023-05-28 07:51:31.661|
|Jim       |27 |insert      |1              |2023-05-28 07:51:31.661|
+----------+---+------------+---------------+-----------------------+



In [26]:
!tree spark-warehouse/people

[01;34mspark-warehouse/people[0m
├── [01;34m_change_data[0m
│   └── [00mcdc-00000-1fedcc32-6734-48c2-ab4e-97c5ba65f2f4.c000.snappy.parquet[0m
├── [01;34m_delta_log[0m
│   ├── [00m00000000000000000000.json[0m
│   ├── [00m00000000000000000001.json[0m
│   └── [00m00000000000000000002.json[0m
├── [00mpart-00000-a90e51ff-c595-47d2-a2b3-c1c161102e8e-c000.snappy.parquet[0m
└── [00mpart-00000-edd0d32f-3a48-416a-8f3b-bcce9eb5aa25.c000.snappy.parquet[0m

2 directories, 6 files


In [28]:
!jq . spark-warehouse/people/_delta_log/00000000000000000000.json

[1;39m{
  [0m[34;1m"commitInfo"[0m[1;39m: [0m[1;39m{
    [0m[34;1m"timestamp"[0m[1;39m: [0m[0;39m1685274681713[0m[1;39m,
    [0m[34;1m"operation"[0m[1;39m: [0m[0;32m"CREATE TABLE"[0m[1;39m,
    [0m[34;1m"operationParameters"[0m[1;39m: [0m[1;39m{
      [0m[34;1m"isManaged"[0m[1;39m: [0m[0;32m"true"[0m[1;39m,
      [0m[34;1m"description"[0m[1;39m: [0m[1;30mnull[0m[1;39m,
      [0m[34;1m"partitionBy"[0m[1;39m: [0m[0;32m"[]"[0m[1;39m,
      [0m[34;1m"properties"[0m[1;39m: [0m[0;32m"{\"delta.enableChangeDataFeed\":\"true\"}"[0m[1;39m
    [1;39m}[0m[1;39m,
    [0m[34;1m"isolationLevel"[0m[1;39m: [0m[0;32m"Serializable"[0m[1;39m,
    [0m[34;1m"isBlindAppend"[0m[1;39m: [0m[0;39mtrue[0m[1;39m,
    [0m[34;1m"operationMetrics"[0m[1;39m: [0m[1;39m{}[0m[1;39m,
    [0m[34;1m"engineInfo"[0m[1;39m: [0m[0;32m"Apache-Spark/3.3.2 Delta-Lake/2.3.0"[0m[1;39m,
    [0m[34;1m"txnId"[0m[1;39m: [0m[0;32m"084e0e

In [29]:
!jq . spark-warehouse/people/_delta_log/00000000000000000001.json

[1;39m{
  [0m[34;1m"commitInfo"[0m[1;39m: [0m[1;39m{
    [0m[34;1m"timestamp"[0m[1;39m: [0m[0;39m1685274691639[0m[1;39m,
    [0m[34;1m"operation"[0m[1;39m: [0m[0;32m"WRITE"[0m[1;39m,
    [0m[34;1m"operationParameters"[0m[1;39m: [0m[1;39m{
      [0m[34;1m"mode"[0m[1;39m: [0m[0;32m"Append"[0m[1;39m,
      [0m[34;1m"partitionBy"[0m[1;39m: [0m[0;32m"[]"[0m[1;39m
    [1;39m}[0m[1;39m,
    [0m[34;1m"readVersion"[0m[1;39m: [0m[0;39m0[0m[1;39m,
    [0m[34;1m"isolationLevel"[0m[1;39m: [0m[0;32m"Serializable"[0m[1;39m,
    [0m[34;1m"isBlindAppend"[0m[1;39m: [0m[0;39mtrue[0m[1;39m,
    [0m[34;1m"operationMetrics"[0m[1;39m: [0m[1;39m{
      [0m[34;1m"numFiles"[0m[1;39m: [0m[0;32m"1"[0m[1;39m,
      [0m[34;1m"numOutputRows"[0m[1;39m: [0m[0;32m"3"[0m[1;39m,
      [0m[34;1m"numOutputBytes"[0m[1;39m: [0m[0;32m"743"[0m[1;39m
    [1;39m}[0m[1;39m,
    [0m[34;1m"engineInfo"[0m[1;39m: [0m[0;32m"A

In [27]:
!jq . spark-warehouse/people/_delta_log/00000000000000000002.json

[1;39m{
  [0m[34;1m"commitInfo"[0m[1;39m: [0m[1;39m{
    [0m[34;1m"timestamp"[0m[1;39m: [0m[0;39m1685274696904[0m[1;39m,
    [0m[34;1m"operation"[0m[1;39m: [0m[0;32m"DELETE"[0m[1;39m,
    [0m[34;1m"operationParameters"[0m[1;39m: [0m[1;39m{
      [0m[34;1m"predicate"[0m[1;39m: [0m[0;32m"[\"(spark_catalog.default.people.first_name = 'Sue')\"]"[0m[1;39m
    [1;39m}[0m[1;39m,
    [0m[34;1m"readVersion"[0m[1;39m: [0m[0;39m1[0m[1;39m,
    [0m[34;1m"isolationLevel"[0m[1;39m: [0m[0;32m"Serializable"[0m[1;39m,
    [0m[34;1m"isBlindAppend"[0m[1;39m: [0m[0;39mfalse[0m[1;39m,
    [0m[34;1m"operationMetrics"[0m[1;39m: [0m[1;39m{
      [0m[34;1m"numRemovedFiles"[0m[1;39m: [0m[0;32m"1"[0m[1;39m,
      [0m[34;1m"numRemovedBytes"[0m[1;39m: [0m[0;32m"743"[0m[1;39m,
      [0m[34;1m"numCopiedRows"[0m[1;39m: [0m[0;32m"2"[0m[1;39m,
      [0m[34;1m"numAddedChangeFiles"[0m[1;39m: [0m[0;32m"1"[0m[1;39m,
      

In [31]:
spark.read.parquet(
    "spark-warehouse/people/_change_data/cdc-00000-1fedcc32-6734-48c2-ab4e-97c5ba65f2f4.c000.snappy.parquet"
).show()

+----------+---+------------+
|first_name|age|_change_type|
+----------+---+------------+
|       Sue| 25|      delete|
+----------+---+------------+



## Intelligent incremental updates with the Change Data Feed

In [2]:
import datetime

In [25]:
spark.sql(
    """
CREATE TABLE IF NOT EXISTS customer_purchases (customer_id LONG, transaction_date DATE, price DOUBLE) 
USING delta 
TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""
)

DataFrame[]

In [26]:
df = spark.createDataFrame(
    [
        (1, datetime.date(2023, 1, 1), 2.1),
        (2, datetime.date(2023, 1, 5), 3.2),
        (3, datetime.date(2023, 1, 8), 4.4),
        (1, datetime.date(2023, 1, 8), 5.5),
    ]
).toDF("customer_id", "transaction_date", "price")

In [27]:
df.write.mode("append").format("delta").saveAsTable("customer_purchases")

In [28]:
spark.table("customer_purchases").show()

+-----------+----------------+-----+
|customer_id|transaction_date|price|
+-----------+----------------+-----+
|          1|      2023-01-01|  2.1|
|          3|      2023-01-08|  4.4|
|          2|      2023-01-05|  3.2|
|          1|      2023-01-08|  5.5|
+-----------+----------------+-----+



In [29]:
spark.sql(
    """
CREATE TABLE IF NOT EXISTS cumulative_purchases (customer_id LONG, last_transaction DATE, purchases DOUBLE) 
USING delta 
"""
)

DataFrame[]

In [8]:
# initially populate the cumulative_purchases table

In [9]:
def agg_customer_purchases(df):
    return df.groupBy("customer_id").agg(
        F.max("transaction_date").alias("last_transaction"),
        F.sum("price").alias("purchases"),
    )

In [42]:
spark.table("customer_purchases").show()

+-----------+----------------+-----+
|customer_id|transaction_date|price|
+-----------+----------------+-----+
|          1|      2023-01-01|  2.1|
|          3|      2023-01-08|  4.4|
|          2|      2023-01-05|  3.2|
|          1|      2023-01-08|  5.5|
+-----------+----------------+-----+



In [43]:
spark.sql("select * from cumulative_purchases").show()

+-----------+----------------+---------+
|customer_id|last_transaction|purchases|
+-----------+----------------+---------+
+-----------+----------------+---------+



In [45]:
# initial write to cumulative_purchases
spark.table("customer_purchases").transform(agg_customer_purchases).write.format(
    "delta"
).mode("append").saveAsTable("cumulative_purchases")

In [46]:
spark.sql("select * from cumulative_purchases").show()

+-----------+----------------+---------+
|customer_id|last_transaction|purchases|
+-----------+----------------+---------+
|          1|      2023-01-08|      7.6|
|          3|      2023-01-08|      4.4|
|          2|      2023-01-05|      3.2|
+-----------+----------------+---------+



In [53]:
# another set of purchases
df = spark.createDataFrame(
    [
        (1, datetime.date(2023, 1, 1), 2.1),  # duplicate transaction from earlier
        (1, datetime.date(2023, 1, 12), 10.1),
        (1, datetime.date(2023, 1, 15), 12.2),
        (3, datetime.date(2023, 1, 22), 14.4),
    ]
).toDF("customer_id", "transaction_date", "price")

In [54]:
df.show()

+-----------+----------------+-----+
|customer_id|transaction_date|price|
+-----------+----------------+-----+
|          1|      2023-01-01|  2.1|
|          1|      2023-01-12| 10.1|
|          1|      2023-01-15| 12.2|
|          3|      2023-01-22| 14.4|
+-----------+----------------+-----+



In [55]:
# minimally update cumulative_purchases with the change data feed
cdf = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("customer_purchases")
)

In [56]:
cdf.show()

+-----------+----------------+-----+------------+---------------+--------------------+
|customer_id|transaction_date|price|_change_type|_commit_version|   _commit_timestamp|
+-----------+----------------+-----+------------+---------------+--------------------+
|          1|      2023-01-01|  2.1|      insert|              1|2023-06-04 17:35:...|
|          2|      2023-01-05|  3.2|      insert|              1|2023-06-04 17:35:...|
|          3|      2023-01-08|  4.4|      insert|              1|2023-06-04 17:35:...|
|          1|      2023-01-08|  5.5|      insert|              1|2023-06-04 17:35:...|
+-----------+----------------+-----+------------+---------------+--------------------+



In [57]:
new_transactions = df.join(
    cdf, ["customer_id", "transaction_date", "price"], "leftanti"
)

In [58]:
new_transactions.show()

+-----------+----------------+-----+
|customer_id|transaction_date|price|
+-----------+----------------+-----+
|          1|      2023-01-12| 10.1|
|          1|      2023-01-15| 12.2|
|          3|      2023-01-22| 14.4|
+-----------+----------------+-----+



In [59]:
new_df = new_transactions.transform(agg_customer_purchases)

In [60]:
new_df.show()

+-----------+----------------+------------------+
|customer_id|last_transaction|         purchases|
+-----------+----------------+------------------+
|          1|      2023-01-15|22.299999999999997|
|          3|      2023-01-22|              14.4|
+-----------+----------------+------------------+



In [61]:
spark.table("cumulative_purchases").show()

+-----------+----------------+---------+
|customer_id|last_transaction|purchases|
+-----------+----------------+---------+
|          1|      2023-01-08|      7.6|
|          3|      2023-01-08|      4.4|
|          2|      2023-01-05|      3.2|
+-----------+----------------+---------+



In [62]:
cumulative_purchases_table = DeltaTable.forName(spark, "cumulative_purchases")

In [63]:
cumulative_purchases_table.alias("target").merge(
    new_df.alias("source"), "target.customer_id = source.customer_id"
).whenMatchedUpdate(
    set={"purchases": "source.purchases + target.purchases"}
).whenNotMatchedInsertAll().execute()

In [64]:
cumulative_purchases_table.toDF().show()

+-----------+----------------+---------+
|customer_id|last_transaction|purchases|
+-----------+----------------+---------+
|          1|      2023-01-08|     29.9|
|          2|      2023-01-05|      3.2|
|          3|      2023-01-08|     18.8|
+-----------+----------------+---------+



In [24]:
# Cleanup
spark.sql("DROP TABLE IF EXISTS customer_purchases")
spark.sql("DROP TABLE IF EXISTS cumulative_purchases")

DataFrame[]