### Spark Structured Streaming

In [0]:
%sql
create schema cat.streaming;

In [0]:
%sql
create volume cat.streaming.datavol;

In [0]:
dbutils.fs.mkdirs("/Volumes/cat/streaming/datavol/sink4/checkpoint")

True

#### Append OutputMode with Source Archiving

{
  "order_id": "ORD1002",
  "timestamp": "2025-06-01T10:30:00Z",
  "customer": {
    "customer_id": 502,
    "name": "Alice Smith",
    "email": "alice@example.com",
    "address": {
      "city": "Vancouver",
      "postal_code": "V5K 0A1",
      "country": "Canada"
    }
  },
  "items": [
    {
      "item_id": "I102",
      "product_name": "Bluetooth Keyboard",
      "quantity": 1,
      "price": 45.00
    }
  ],
  "payment": {
    "method": "PayPal",
    "transaction_id": "TXN7891"
  },
  "metadata": [
    {"key": "campaign", "value": "cyber_monday"},
    {"key": "channel", "value": "affiliate"}
  ]
}

In [0]:
from pyspark.sql.functions import *

In [0]:
schema = """
    order_id string,
    timestamp string,
    customer struct<
        customer_id int,
        name string,
        email string,
        address struct<
            city string,
            postal_code string,
            country string
        >
    >,
    items array<struct<
        item_id int,
        product_name string,
        quantity int,
        price double
    >>,
    payment struct<
        method string,
        transaction_id string
    >,
    metadata array<struct<
        key string,
        value string
    >>
"""

In [0]:
df = spark.readStream.format("json")\
            .schema(schema)\
            .option("multiLine", "true")\
            .option("cleanSource", "archive")\
            .option("sourceArchiveDir", "/Volumes/cat/streaming/datavol/source/archive")\
            .load("/Volumes/cat/streaming/datavol/source/data")

df_transform = df.withColumn("items", explode_outer(col("items")))
df_transform = df_transform.select(
    "order_id", 
    "timestamp", 
    "customer.customer_id", 
    "customer.name", 
    "customer.email",
    "customer.address.city",
    "customer.address.postal_code",
    "customer.address.country", 
    "items.item_id", 
    "items.product_name", 
    "items.quantity", 
    "items.price",
    "payment.method", 
    "payment.transaction_id",
    "metadata"
)

df_transform.writeStream\
    .outputMode("append")\
    .format("delta")\
    .option("checkpointLocation", "/Volumes/cat/streaming/datavol/sink/checkpoint")\
    .option("path", "/Volumes/cat/streaming/datavol/sink/data")\
    .trigger(once=True)\
    .start()


<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7fa9bdce7320>

In [0]:
%sql
select * from delta.`/Volumes/cat/streaming/datavol/sink/data`;

order_id,timestamp,customer_id,name,email,city,postal_code,country,item_id,product_name,quantity,price,method,transaction_id,metadata
ORD1001,2025-06-01T10:15:00Z,501,John Doe,john@example.com,Toronto,M5H 2N2,Canada,,Wireless Mouse,2,25.99,Credit Card,TXN7890,"List(List(campaign, back_to_school), List(channel, email))"
ORD1001,2025-06-01T10:15:00Z,501,John Doe,john@example.com,Toronto,M5H 2N2,Canada,,USB-C Adapter,1,15.49,Credit Card,TXN7890,"List(List(campaign, back_to_school), List(channel, email))"
ORD1002,2025-06-01T10:30:00Z,502,Alice Smith,alice@example.com,Vancouver,V5K 0A1,Canada,,Bluetooth Keyboard,1,45.0,PayPal,TXN7891,"List(List(campaign, cyber_monday), List(channel, affiliate))"


#### Complete OutputMode

In [0]:
from delta.tables import DeltaTable

DeltaTable.createOrReplace(spark)\
    .tableName("cat.streaming.source")\
    .addColumn("color", "string")\
    .execute()

<delta.connect.tables.DeltaTable at 0x7fa9bbc78c80>

In [0]:
%sql
insert into cat.streaming.source 
values 
('red'), ('green'), ('red');

num_affected_rows,num_inserted_rows
3,3


In [0]:
df = spark.readStream.table("cat.streaming.source")

df_transform = df.groupBy("color").count().alias("count")

df_transform.writeStream\
    .outputMode("complete")\
    .format("delta")\
    .option("checkpointLocation", "/Volumes/cat/streaming/datavol/sink2/checkpoint")\
    .option("path", "/Volumes/cat/streaming/datavol/sink2/data")\
    .trigger(once=True)\
    .start()


<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7fa9bdcc2c00>

In [0]:
%sql
select * from delta.`/Volumes/cat/streaming/datavol/sink2/data`;

color,count
yellow,1
green,2
blue,1
red,4


#### For Each Batch

In [0]:
def write_fn(df, batch_id):
  df = df.groupBy("color").count().alias("count")

  df.write.format("delta")\
      .mode("append")\
      .option("path", "/Volumes/cat/streaming/datavol/sink3/data1")\
      .save()

  df.write.format("delta")\
      .mode("append")\
      .option("path", "/Volumes/cat/streaming/datavol/sink3/data2")\
      .save()

In [0]:
df = spark.readStream.table("cat.streaming.source")

df.writeStream\
    .foreachBatch(write_fn)\
    .outputMode("append")\
    .option("checkpointLocation", "/Volumes/cat/streaming/datavol/sink3/checkpoint")\
    .trigger(availableNow=True)\
    .start()

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7fa9be8466f0>

#### Tumbling Windows

In [0]:
DeltaTable.createOrReplace(spark)\
    .tableName("cat.streaming.winsrc")\
    .addColumn("color", "string")\
    .addColumn("event_time", "timestamp")\
    .execute()

<delta.connect.tables.DeltaTable at 0x7fa9bdc9dca0>

In [0]:
%sql
insert into cat.streaming.winsrc
values
('red', '2025-08-27 09:07:00')

num_affected_rows,num_inserted_rows
1,1


In [0]:
df = spark.readStream.table("cat.streaming.winsrc")

df = df.groupBy("color", window("event_time", "10 minutes")).count().alias("count")

df.writeStream\
    .outputMode("complete")\
    .format("delta")\
    .option("path", "/Volumes/cat/streaming/datavol/sink4/data")\
    .option("checkpointLocation", "/Volumes/cat/streaming/datavol/sink4/checkpoint")\
    .trigger(once=True)\
    .start()

<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7fa9b1310aa0>

In [0]:
%sql
select * from delta.`/Volumes/cat/streaming/datavol/sink4/data`;

color,window,count
green,"List(2025-08-27T09:00:00.000Z, 2025-08-27T09:10:00.000Z)",1
red,"List(2025-08-27T09:10:00.000Z, 2025-08-27T09:20:00.000Z)",1
red,"List(2025-08-27T09:00:00.000Z, 2025-08-27T09:10:00.000Z)",2
