# Chapter 7: Streaming in and out of your Delta Lake

draft: 3

# Notebook Intro and Setup

This notebook should run end to end with the "run all" mode.

We encourage exploration and breaking stuff to get the most from it.

In [2]:
# Create some resources
import os
import shutil
import subprocess  # no longer used in this cell; kept in case other cells rely on it

# Set working directory. os.chdir raises FileNotFoundError on its own when the
# directory is missing, so no try/except wrapper is needed to surface that.
os.chdir("/opt/spark/work-dir/ch9/")

# Remove any old checkpoints. Best effort, like the `rm -rf` it replaces:
# a missing directory is not an error.
# NOTE(review): the streams in this notebook checkpoint under ./delta/ckpt/,
# not ./ckpt/ — confirm which directory is meant to be cleared here.
shutil.rmtree("./ckpt/", ignore_errors=True)

# Set some Spark configuration options
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
sc.setLogLevel("ERROR")

# set COVID dataset path
covid_data_path = '/opt/spark/work-dir/rs/data/COVID-19_NYT'

# Kill any other active streams
for stream in spark.streams.active:
    stream.stop()

# Delta `readStream` example 

In [3]:
# Streaming source over the Delta table at `covid_data_path`; the reader is
# created with the `ignoreDeletes` option set to "true".
streamingDeltaDf = (
    spark.readStream
    .format("delta")
    .option("ignoreDeletes", "true")
    .load(covid_data_path)
)

# Delta `writeStream` example

In [4]:
# Start a streaming append of `streamingDeltaDf` into ./delta/covid/, with its
# own checkpoint directory. The trailing `;` keeps the returned query object
# out of the cell output.
(
    streamingDeltaDf.writeStream
    .option("checkpointLocation", "./delta/ckpt/ws1/")
    .start("./delta/covid/", format="delta", outputMode="append")
);

In [5]:
from time import sleep
# Pause for 15 seconds — presumably so the stream started above has written
# data to ./delta/covid/ before later cells read it.
sleep(15)

                                                                                

# Delta chained `readStream` to `writeStream`

In [6]:
# Batch-read the stream's target table and show how many rows it holds
spark.read.load("./delta/covid/", format="delta").count()

1111930

In [7]:
from pyspark.sql.functions import col

# Chain a streaming read directly into a streaming write: rows of
# ./delta/covid/ whose `deaths` value is greater than zero are appended to
# ./delta/covid_deaths/.
(
    spark.readStream
    .load("./delta/covid/", format="delta")
    # the book shows '<other transformation logic>' at this point
    .filter(col("deaths") > 0)
    .writeStream
    .option("checkpointLocation", "./delta/ckpt/ws2/")
    .start("./delta/covid_deaths/", format="delta", outputMode="append")
);

In [8]:
from time import sleep
# Pause for 10 seconds — presumably so the stream started above has created
# ./delta/covid_deaths/ before the following cells open it.
sleep(10)

# Setting `ignoreDeletes`

In [9]:
# Streaming read of ./delta/covid_deaths/ with `ignoreDeletes` set to "true".
# The result is not assigned; the trailing `;` keeps it out of the cell output.
(
    spark.readStream
    .format("delta")
    .option("ignoreDeletes", "true")
    .load("./delta/covid_deaths/")
);

# Setting `ignoreChanges`

In [10]:
# Streaming read of ./delta/covid_deaths/ with `ignoreChanges` set to "true"
streamingDeltaDf = (
    spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")
    .load("./delta/covid_deaths/")
)

# Specify the `startingVersion`

In [11]:
# Streaming read of ./delta/covid_deaths/ that begins at table version 5
streamingDeltaDf = (
    spark.readStream
    .format("delta")
    .option("startingVersion", "5")
    .load("./delta/covid_deaths/")
)

# Specify the `startingTimestamp`

In [12]:
# Streaming read of ./delta/covid_deaths/ that begins at the given timestamp
streamingDeltaDf = (
    spark.readStream
    .format("delta")
    .option("startingTimestamp", "2023-04-18")
    .load("./delta/covid_deaths/")
)

# Setting `withEventTimeOrder` with a watermark

In [13]:
from pyspark.sql.functions import col, current_timestamp

# Build a table that carries an `event_time` column: stream ./delta/covid/,
# stamp each row with current_timestamp(), and append the result to
# ./delta/covid_deaths_with_ts/.
(
    spark.readStream
    .load("./delta/covid/", format="delta")
    .withColumn("event_time", current_timestamp())
    .writeStream
    .option("checkpointLocation", "./delta/ckpt/ws3/")
    .start("./delta/covid_deaths_with_ts/", format="delta", outputMode="append")
);

In [14]:
from time import sleep
# Pause for 10 seconds — presumably so the stream started above has created
# ./delta/covid_deaths_with_ts/ before the next cell reads from it.
sleep(10)

                                                                                

In [15]:
# Read the timestamped table with `withEventTimeOrder` enabled, apply a
# 10-second watermark on `event_time`, and append to a second table.
# Note: because the chain ends in .start(), `streamingDeltaDf` holds the
# started streaming query here, not a DataFrame.
streamingDeltaDf = (
    spark.readStream
    .format("delta")
    .option("withEventTimeOrder", "true")
    .load("./delta/covid_deaths_with_ts/")
    .withWatermark("event_time", "10 seconds")
    .writeStream
    .option("checkpointLocation", "./delta/ckpt/ws4/")
    .start("./delta/covid_deaths_with_ts2/", format="delta", outputMode="append")
)

# Idempotent Fanout

Example of a `foreachBatch` function that uses the `txnAppId` and `txnVersion` write options for idempotency

In [16]:
from datetime import datetime, timedelta

# Create a new table from ./delta/covid/ (overwriting any existing one), then
# enable the change data feed on it so later writes produce change records.
(
    spark.read
    .load("./delta/covid/", format="delta")
    .write
    .saveAsTable("covid_table", format="delta", mode="overwrite")
)
spark.sql("ALTER TABLE covid_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Timestamp (second precision) taken just before the appends begin
start_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# Ten appends to covid_table, each a new 5% sample of the source data.
# `sample` is called inside the loop so every iteration draws its own sample.
# `end_ts` is captured right after the eighth append (i == 7).
for i in range(10):
    (
        spark.read
        .load(covid_data_path, format="delta")
        .sample(0.05)
        .write
        .saveAsTable("covid_table", format="delta", mode="append")
    )
    if i == 7:
        end_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# We'll need a second table too: a 1% sample of ./delta/covid/, de-duplicated
# on (fips, date, county, state), saved as covid_table2.
(
    spark.read
    .load("./delta/covid/", format="delta")
    .sample(0.01)
    .drop_duplicates(["fips", "date", "county", "state"])
    .write
    .saveAsTable("covid_table2", format="delta", mode="overwrite")
)

                                                                                

In [17]:
from time import sleep
# Pause for 10 seconds before continuing.
# NOTE(review): no stream is started in the previous cell, so the purpose of
# this wait is not evident from the notebook — confirm it is still needed.
sleep(10)

In [18]:
app_id = "delta_idempotency_example" # A unique string used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
    """Fan one micro-batch out to two Delta locations using idempotent writes.

    Each write is tagged with `txnAppId` (the module-level `app_id`) and
    `txnVersion` (the micro-batch id). Delta Lake uses that pair to recognise
    and skip a write it has already committed, so a batch that is retried
    after a partial failure is not written twice to the same location.

    Args:
        batch_df: the micro-batch DataFrame handed over by `foreachBatch`.
        batch_id: the micro-batch id handed over by `foreachBatch`; used as
            the transaction version.
    """
    target_paths = (
        "./delta/idempotent_location_1/",  # location 1
        "./delta/idempotent_location_2/",  # location 2
    )
    for target_path in target_paths:
        (
            batch_df.write
            .format("delta")
            # Without an explicit mode the writer defaults to "errorifexists",
            # which fails on every batch after the one that creates the table.
            .mode("append")
            .option("txnVersion", batch_id)
            .option("txnAppId", app_id)
            .save(target_path)
        )

    
# Streaming DataFrame of change data feed (CDC) records read from
# `covid_table`, beginning at table version 1
changesStream = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .table("covid_table")
)

# Hand every micro-batch of the change stream to the idempotent fan-out
# function (there is no aggregation in this query).
# `format` is deliberately kept ahead of `foreachBatch` in the chain.
# NOTE(review): no checkpointLocation is set on this query — confirm that is
# intended, since the batch ids double as `txnVersion` in the fan-out writes.
(
    changesStream.writeStream
    .format("delta")
    .foreachBatch(writeToDeltaLakeTableIdempotent)
    .start(queryName="A Pipeline", outputMode="update")
);

# Streaming Upsert

Use the Delta Lake `DeltaMergeBuilder` (obtained from `DeltaTable.merge`) to write an upsert function for `foreachBatch`, then apply it to a `writeStream`

In [19]:
# Print the schema of the change feed stream; besides the table's own columns
# it lists _change_type, _commit_version and _commit_timestamp.
# Alternative, for the table itself: spark.table("covid_table").printSchema()
changesStream.printSchema()

root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: integer (nullable = true)
 |-- cases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- _change_type: string (nullable = true)
 |-- _commit_version: long (nullable = true)
 |-- _commit_timestamp: timestamp (nullable = true)



In [20]:
from delta.tables import *

# Streaming DataFrame of change data feed (CDC) records read from
# `covid_table`, beginning at table version 1
changesStream = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .table("covid_table")
)

# Function to upsert microBatchDf into Delta table using merge
def upsertToDelta(microBatchDf, batchId):
    """Merge one micro-batch of change feed rows into `covid_table2`.

    Rows are matched on (fips, date, county, state). Matched rows are deleted
    when the change is a delete and otherwise updated with the new `cases` and
    `deaths`; unmatched rows are inserted unless the change is a delete.

    Args:
        microBatchDf: micro-batch DataFrame of change feed rows; it must carry
            `_change_type` plus the fips/date/county/state/cases/deaths columns.
        batchId: micro-batch id supplied by `foreachBatch` (not used).

    Note:
        When a batch holds several changes for the same key, `drop_duplicates`
        keeps an arbitrary one of them; it does not pick the latest commit.
    """
    key_cols = ["fips", "date", "county", "state"]
    deltaTable = DeltaTable.forName(spark, "covid_table2") # Target table

    changes = (
        microBatchDf
        # update_preimage rows hold the values from *before* an update. If one
        # survived the de-duplication it would revert the change, so drop them.
        .filter("_change_type != 'update_preimage'")
        .drop_duplicates(key_cols)
        # Alias last so the `sdf.` qualifier applies to the final source frame.
        .alias("sdf")
    )

    # The key columns are nullable (see the printed schema), and `NULL = NULL`
    # is never true, so use null-safe equality to let rows with a NULL key
    # match instead of being re-inserted on every batch.
    match_condition = " and ".join(f"sdf.{c} <=> dt.{c}" for c in key_cols)

    (
        deltaTable.alias("dt")
        .merge(source = changes, condition = match_condition)
        # Change feed change types are lowercase ('delete', not 'DELETE').
        .whenMatchedDelete(condition = "sdf._change_type = 'delete'")
        # Key columns already agree on a match, so only the measures are set.
        .whenMatchedUpdate(set = {
            "cases": "sdf.cases",
            "deaths": "sdf.deaths"
            })
        # Insert the full key (including county and state) so later batches can
        # match the row, and never insert a row whose change is a delete.
        .whenNotMatchedInsert(
            condition = "sdf._change_type != 'delete'",
            values = {
                "fips": "sdf.fips",
                "date": "sdf.date",
                "county": "sdf.county",
                "state": "sdf.state",
                "cases": "sdf.cases",
                "deaths": "sdf.deaths"
                })
        .execute()
    )

# Apply the merge-based upsert to every micro-batch of the change stream
# (there is no aggregation in this query). `format` is deliberately kept ahead
# of `foreachBatch` in the chain. The cell ends without `;`, so the started
# query object is shown as the cell output.
(
    changesStream.writeStream
    .format("delta")
    .foreachBatch(upsertToDelta)
    .start(queryName="New Pipeline", outputMode="update")
)

<pyspark.sql.streaming.StreamingQuery at 0x7f3a78138cd0>

# Delta Live Tables (DLT)

A syntactical example only; to run something like this, you'll have to try it out on Databricks.

```
import dlt

@dlt.table
def autoloader_dlt_bronze():
    return (
        spark
        .readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("<data path>")
    )

@dlt.table
def delta_dlt_silver():
    return (
        dlt
        .read_stream("autoloader_dlt_bronze")
        …
        <transformation logic>
        …
    )

@dlt.table
def live_delta_gold():
    return (
        dlt
        .read("delta_dlt_silver")
        …
        <aggregation logic>
        …
    )
```

# Defining Change Data Feed read boundaries in a batch process

In [21]:
# Show the two timestamps captured while the change records were appended
print(start_ts, end_ts)

# Specify the version as int or long
(
    spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .option("endingVersion", 10)
    .table("covid_table")
)

# Specify timestamps as formatted timestamp
(
    spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", start_ts)
    .option("endingTimestamp", end_ts)
    .table("covid_table")
)

# Providing only the startingVersion/timestamp
(
    spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", start_ts)
    .table("covid_table")
)

# Specifying similarly with a file location.
# This is the last expression of the cell, so its DataFrame repr is displayed.
(
    spark.read
    .option("readChangeFeed", "true")
    .option("startingTimestamp", start_ts)
    .load("/opt/spark/work-dir/spark-warehouse/covid_table/", format="delta")
)

2023-07-26T22:49:03 2023-07-26T22:49:10


DataFrame[date: string, county: string, state: string, fips: int, cases: int, deaths: int, _change_type: string, _commit_version: bigint, _commit_timestamp: timestamp]

# Defining Change Data Feed read boundaries in a streaming process

In [22]:
# Specifying a starting version
(
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 1)
    .load("/opt/spark/work-dir/spark-warehouse/covid_table/")
)

# Specifying a starting timestamp
(
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", start_ts)
    .load("/opt/spark/work-dir/spark-warehouse/covid_table/")
)

# Not providing either option, i.e., process from beginning.
# This is the last expression of the cell, so its DataFrame repr is displayed.
(
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .load("/opt/spark/work-dir/spark-warehouse/covid_table/")
)

DataFrame[date: string, county: string, state: string, fips: int, cases: int, deaths: int, _change_type: string, _commit_version: bigint, _commit_timestamp: timestamp]

# Example for viewing changes

In [23]:
# Stop every streaming query that is still running
for active_query in spark.streams.active:
    active_query.stop()

23/07/26 22:49:24 ERROR TorrentBroadcast: Store broadcast broadcast_204 fail, remove all pieces of the broadcast
23/07/26 22:49:24 ERROR TorrentBroadcast: Store broadcast broadcast_205 fail, remove all pieces of the broadcast


In [24]:
from pyspark.sql.functions import col

# Overwrite cases/deaths for one (date, fips) pair in covid_table so that an
# update shows up in the change feed read in the next cell.
update_one_row_sql = """
    update covid_table
        SET
            cases = 100,
            deaths = 1
        WHERE
            date='2020-07-08'
            and fips=21025;
    """
spark.sql(update_one_row_sql)


DataFrame[num_affected_rows: bigint]

In [25]:
# Read the change feed of covid_table from version 12 onwards and display the
# data columns next to the change type and commit version.
# NOTE(review): 12 presumably corresponds to the UPDATE above when the notebook
# is run once from a clean state — confirm if the table already existed.
(
    spark.read
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 12)
    .table("covid_table")
    .select("date", "fips", "cases", "deaths", "_change_type", "_commit_version")
    .show()
)

+----------+-----+-----+------+----------------+---------------+
|      date| fips|cases|deaths|    _change_type|_commit_version|
+----------+-----+-----+------+----------------+---------------+
|2020-07-08|21025|   12|     0| update_preimage|             12|
|2020-07-08|21025|  100|     1|update_postimage|             12|
+----------+-----+-----+------+----------------+---------------+

