-sandbox

<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Streaming Query

##### Objectives
1. Build streaming DataFrames
1. Display streaming query results
1. Write streaming query results
1. Monitor streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html#pyspark.sql.streaming.StreamingQuery" target="_blank">StreamingQuery</a>

In [0]:
%run ./Includes/Classroom-Setup

### Build streaming DataFrames

Obtain an initial streaming DataFrame from a Parquet-format file source.

In [0]:
# DDL-formatted schema for the events data, written one top-level field per line.
schema = (
    "device STRING, "
    "ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, "
    "event_name STRING, "
    "event_previous_timestamp BIGINT, "
    "event_timestamp BIGINT, "
    "geo STRUCT<city: STRING, state: STRING>, "
    "items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, "
    "traffic_source STRING, "
    "user_first_touch_timestamp BIGINT, "
    "user_id STRING"
)

# Open a streaming read over the Parquet files at eventsPath, limited to one file per trigger.
df = (
    spark.readStream
    .format("parquet")
    .option("maxFilesPerTrigger", 1)
    .schema(schema)
    .load(eventsPath)
)
# Last expression of the cell: reports whether df is a streaming DataFrame.
df.isStreaming

Out[4]: True

In [0]:
# Echo the events source path (not defined in this notebook; presumably set by Classroom-Setup).
eventsPath

Out[5]: '/mnt/training/ecommerce/events/events.parquet'

In [0]:
# Render the streaming events DataFrame in the notebook.
display(df)

device,ecommerce,event_name,event_previous_timestamp,event_timestamp,geo,items,traffic_source,user_first_touch_timestamp,user_id
Windows,"List(null, null, null)",mattresses,,1593878451670406,"List(Rancho Cucamonga, CA)",List(),google,1593878451670406,UA000000107375516
Chrome OS,"List(null, null, null)",original,1593877120958836.0,1593877122961062,"List(Darien, IL)",List(),google,1593877120958836,UA000000107363395
macOS,"List(1995.0, 1, 1)",finalize,1593878927824405.0,1593878975223311,"List(Portland, OR)","List(List(null, M_PREM_K, Premium King Mattress, 1995.0, 1995.0, 1))",google,1593876594233488,UA000000107358786
Linux,"List(null, null, null)",main,,1593879040510110,"List(Minneapolis, MN)",List(),youtube,1593879040510110,UA000000107380793
Android,"List(null, null, null)",warranty,1593877045689659.0,1593878419031778,"List(Burbank, CA)",List(),google,1593877045689659,UA000000107362753
macOS,"List(null, null, null)",press,1593878002467366.0,1593878030394957,"List(Houston, TX)",List(),google,1593878002467366,UA000000107371441
macOS,"List(null, null, null)",main,,1593876576250697,"List(Leominster, MA)",List(),google,1593876576250697,UA000000107358624
Windows,"List(null, null, null)",warranty,1593878046761243.0,1593878217999965,"List(Rock Hill, MO)",List(),facebook,1593877573835861,UA000000107367618
macOS,"List(null, null, null)",main,,1593877402934268,"List(Houston, TX)",List(),instagram,1593877402934268,UA000000107365982
macOS,"List(null, null, null)",pillows,,1593877792014796,"List(Raleigh, NC)",List(),instagram,1593877792014796,UA000000107369599


Apply some transformations, producing new streaming DataFrames.

In [0]:
from pyspark.sql.functions import col, approx_count_distinct, count

# Keep only events whose traffic source is email, flag rows whose device is iOS or Android
# as mobile, and project the three columns used downstream.
emailTrafficDF = (
    df.where(col("traffic_source") == "email")
    .withColumn("mobile", col("device").isin("iOS", "Android"))
    .select("user_id", "event_timestamp", "mobile")
)
# Transformations on a streaming DataFrame yield another streaming DataFrame.
emailTrafficDF.isStreaming

Out[7]: True

In [0]:
# Render the filtered email-traffic stream (user_id, event_timestamp, mobile).
display(emailTrafficDF)

user_id,event_timestamp,mobile
UA000000107364589,1593877247383050,True
UA000000107380291,1593878989920603,True
UA000000107379662,1593879019178474,False
UA000000107363204,1593877145629072,False
UA000000107377755,1593878702328335,True
UA000000107381578,1593879126075931,True
UA000000107360621,1593876808432395,True
UA000000107359302,1593877215376860,True
UA000000107372741,1593878901873861,False
UA000000107374342,1593878316344881,True


### Write streaming query results

Take the final streaming DataFrame (our result table) and write it to a file sink in "append" mode.

In [0]:
# Checkpoint and sink locations for the email-traffic query, both under userhome.
checkpointPath = f"{userhome}/email_traffic/checkpoint"
outputPath = f"{userhome}/email_traffic/output"

# Start a query named "email_traffic" that appends Parquet files to outputPath,
# triggering every second and tracking progress at checkpointPath.
devicesQuery = (
    emailTrafficDF.writeStream
    .queryName("email_traffic")
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", checkpointPath)
    .trigger(processingTime="1 second")
    .start(outputPath)
)

In [0]:
# Display the streaming query handle returned by start().
display(devicesQuery)

### Monitor streaming query

Use the streaming query "handle" to monitor and control it.

In [0]:
# Identifier of the query (a UUID string, as the output shows).
devicesQuery.id

Out[10]: '26897ba2-cc81-48a7-8984-f506cb698cf0'

In [0]:
# Current status: a message plus the isDataAvailable / isTriggerActive flags.
devicesQuery.status

Out[11]: {'message': 'Waiting for next trigger',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [0]:
# Most recent progress report: batch id, row counts and rates, durations, source and sink details.
devicesQuery.lastProgress

Out[12]: {'id': '26897ba2-cc81-48a7-8984-f506cb698cf0',
 'runId': '9e678388-3eb3-4d26-960e-fba0c69521b0',
 'name': 'email_traffic',
 'timestamp': '2023-07-15T16:55:07.000Z',
 'batchId': 4,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 26, 'triggerExecution': 27},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[dbfs:/mnt/training/ecommerce/events/events.parquet]',
   'startOffset': {'logOffset': 3},
   'endOffset': {'logOffset': 3},
   'latestOffset': None,
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'FileSink[dbfs:/user/hamed.vaheb@pwc.lu/email_traffic/output]',
  'numOutputRows': -1}}

In [0]:
# Wait up to 5 seconds for the query to terminate; the False result below means it was still running.
devicesQuery.awaitTermination(5)

Out[13]: False

In [0]:
# Stop the email_traffic query.
devicesQuery.stop()

# Coupon Sales Lab
Process and append streaming data on transactions using coupons.
1. Read data stream
2. Filter for transactions with coupon codes
3. Write streaming query results to Parquet
4. Monitor streaming query
5. Stop streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html#pyspark.sql.streaming.StreamingQuery" target="_blank">StreamingQuery</a>

### 1. Read data stream
- Use the schema stored in **`schema`**
- Configure the stream to process 1 file per trigger
- Read from Parquet files in the source directory specified by **`salesPath`**

Assign the resulting DataFrame to **`df`**.

In [0]:
# DDL-formatted schema for the sales data; adjacent literals concatenate into a single string.
schema = (
    "order_id BIGINT, "
    "email STRING, "
    "transaction_timestamp BIGINT, "
    "total_item_quantity BIGINT, "
    "purchase_revenue_in_usd DOUBLE, "
    "unique_items BIGINT, "
    "items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, "
    "item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>"
)

In [0]:
# Stream the sales Parquet files at salesPath with the sales schema, one file per trigger.
df = (
    spark.readStream
    .format("parquet")
    .option("maxFilesPerTrigger", 1)
    .schema(schema)
    .load(salesPath)
)
# Last expression of the cell: reports whether df is a streaming DataFrame.
df.isStreaming

Out[18]: True

In [0]:
# Render the streaming sales DataFrame; items is still an array of item structs at this point.
display(df)

order_id,email,transaction_timestamp,total_item_quantity,purchase_revenue_in_usd,unique_items,items
257437,kmunoz@powell-duran.com,1592194221828900,1,1995.0,1,"List(List(null, M_PREM_K, Premium King Mattress, 1995.0, 1995.0, 1))"
282611,bmurillo@hotmail.com,1592504237604072,1,940.5,1,"List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))"
257448,bradley74@gmail.com,1592200438030141,1,945.0,1,"List(List(null, M_STAN_F, Standard Full Mattress, 945.0, 945.0, 1))"
257440,jameshardin@campbell-morris.biz,1592197217716495,1,1045.0,1,"List(List(null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1))"
283949,whardin@hotmail.com,1592510720760323,1,535.5,1,"List(List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1))"
257444,emily88@cobb.com,1592199040703476,1,1045.0,1,"List(List(null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1))"
257449,craig61@luna-oliver.com,1592200459769596,1,1195.0,1,"List(List(null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1))"
257441,johnsonashley@mcclain.com,1592197729873798,1,945.0,1,"List(List(null, M_STAN_F, Standard Full Mattress, 945.0, 945.0, 1))"
264191,maxwelltara@edwards.com,1592306255847870,2,993.6,2,"List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1), List(NEWBED10, P_FOAM_S, Standard Foam Pillow, 53.1, 59.0, 1))"
286727,rojasjorge@yahoo.com,1592533048926949,1,535.5,1,"List(List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1))"


**CHECK YOUR WORK**

In [0]:
# The lab DataFrame must be streaming and expose exactly these columns, in this order.
assert df.isStreaming
assert df.columns == [
    "order_id",
    "email",
    "transaction_timestamp",
    "total_item_quantity",
    "purchase_revenue_in_usd",
    "unique_items",
    "items",
]

### 2. Filter for transactions with coupon codes
- Explode the **`items`** field in **`df`** with the results replacing the existing **`items`** field
- Filter for records where **`items.coupon`** is not null

Assign the resulting DataFrame to **`couponSalesDF`**.

In [0]:
from pyspark.sql.functions import explode

In [0]:
# Replace the items array with one row per item, then keep only the items
# that carry a coupon code.
couponSalesDF = (
    df.withColumn("items", explode(col("items")))
    .where(col("items.coupon").isNotNull())
)

In [0]:
# Render the exploded stream: each row now holds a single item struct with a non-null coupon.
display(couponSalesDF)

order_id,email,transaction_timestamp,total_item_quantity,purchase_revenue_in_usd,unique_items,items
282611,bmurillo@hotmail.com,1592504237604072,1,940.5,1,"List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1)"
283949,whardin@hotmail.com,1592510720760323,1,535.5,1,"List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1)"
264191,maxwelltara@edwards.com,1592306255847870,2,993.6,2,"List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1)"
264191,maxwelltara@edwards.com,1592306255847870,2,993.6,2,"List(NEWBED10, P_FOAM_S, Standard Foam Pillow, 53.1, 59.0, 1)"
286727,rojasjorge@yahoo.com,1592533048926949,1,535.5,1,"List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1)"
287573,marmstrong46@hotmail.com,1592547727220317,1,53.1,1,"List(NEWBED10, P_FOAM_S, Standard Foam Pillow, 53.1, 59.0, 1)"
271597,johnsonderrick@yahoo.com,1592399808941985,5,2846.7,5,"List(NEWBED10, M_STAN_K, Standard King Mattress, 1075.5, 1195.0, 1)"
271597,johnsonderrick@yahoo.com,1592399808941985,5,2846.7,5,"List(NEWBED10, P_DOWN_K, King Down Pillow, 143.1, 159.0, 1)"
271597,johnsonderrick@yahoo.com,1592399808941985,5,2846.7,5,"List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1)"
271597,johnsonderrick@yahoo.com,1592399808941985,5,2846.7,5,"List(NEWBED10, M_PREM_T, Premium Twin Mattress, 985.5, 1095.0, 1)"


In [0]:
# Print the schema as a tree; items appears as a struct after the explode.
couponSalesDF.printSchema()

root
 |-- order_id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- transaction_timestamp: long (nullable = true)
 |-- total_item_quantity: long (nullable = true)
 |-- purchase_revenue_in_usd: double (nullable = true)
 |-- unique_items: long (nullable = true)
 |-- items: struct (nullable = true)
 |    |-- coupon: string (nullable = true)
 |    |-- item_id: string (nullable = true)
 |    |-- item_name: string (nullable = true)
 |    |-- item_revenue_in_usd: double (nullable = true)
 |    |-- price_in_usd: double (nullable = true)
 |    |-- quantity: long (nullable = true)



In [0]:
# Schema of the source stream, where items is an ArrayType of structs.
df.schema

Out[46]: StructType([StructField('order_id', LongType(), True), StructField('email', StringType(), True), StructField('transaction_timestamp', LongType(), True), StructField('total_item_quantity', LongType(), True), StructField('purchase_revenue_in_usd', DoubleType(), True), StructField('unique_items', LongType(), True), StructField('items', ArrayType(StructType([StructField('coupon', StringType(), True), StructField('item_id', StringType(), True), StructField('item_name', StringType(), True), StructField('item_revenue_in_usd', DoubleType(), True), StructField('price_in_usd', DoubleType(), True), StructField('quantity', LongType(), True)]), True), True)])

In [0]:
# Schema after the explode, where items is a single StructType rather than an array.
couponSalesDF.schema

Out[45]: StructType([StructField('order_id', LongType(), True), StructField('email', StringType(), True), StructField('transaction_timestamp', LongType(), True), StructField('total_item_quantity', LongType(), True), StructField('purchase_revenue_in_usd', DoubleType(), True), StructField('unique_items', LongType(), True), StructField('items', StructType([StructField('coupon', StringType(), True), StructField('item_id', StringType(), True), StructField('item_name', StringType(), True), StructField('item_revenue_in_usd', DoubleType(), True), StructField('price_in_usd', DoubleType(), True), StructField('quantity', LongType(), True)]), True)])

**CHECK YOUR WORK**

In [0]:
# Check the string form of the schema: after the explode, the items field is
# immediately a StructType whose first field is coupon (no ArrayType wrapper).
# NOTE(review): this depends on the exact text of the schema's string representation.
schemaStr = str(couponSalesDF.schema)
assert "StructField('items', StructType([StructField('coupon'" in schemaStr, "items column was not exploded"

### 3. Write streaming query results to Parquet
- Configure the streaming query to write Parquet format files in "append" mode
- Set the query name to "coupon_sales"
- Set a trigger interval of 1 second
- Set the checkpoint location to **`couponsCheckpointPath`**
- Set the output path to **`couponsOutputPath`**

Start the streaming query and assign the resulting handle to **`couponSalesQuery`**.

In [0]:
# Checkpoint and sink locations under the lab working directory.
# workingDir ends with a slash (the sink description reported by lastProgress reads
# ".../asp_4_1_streaming_query//coupon-sales/output"), so strip trailing slashes
# before joining to avoid a doubled path separator.
couponsCheckpointPath = workingDir.rstrip("/") + "/coupon-sales/checkpoint"
couponsOutputPath = workingDir.rstrip("/") + "/coupon-sales/output"

# Start a query named "coupon_sales" that appends Parquet files to couponsOutputPath,
# triggering every second and tracking progress at couponsCheckpointPath.
couponSalesQuery = (
    couponSalesDF.writeStream
    .outputMode("append")
    .format("parquet")
    .queryName("coupon_sales")
    .trigger(processingTime="1 second")
    .option("checkpointLocation", couponsCheckpointPath)
    .start(couponsOutputPath)
)

**CHECK YOUR WORK**

In [0]:
# Wait for the named stream before checking it (untilStreamIsReady is not defined in this
# notebook; presumably provided by Classroom-Setup).
untilStreamIsReady("coupon_sales")
assert couponSalesQuery.isActive
# Both the sink directory and the checkpoint directory must contain at least one entry.
assert len(dbutils.fs.ls(couponsOutputPath)) > 0
assert len(dbutils.fs.ls(couponsCheckpointPath)) > 0
# Substring match against the query name reported in the latest progress update.
assert "coupon_sales" in couponSalesQuery.lastProgress["name"]

The stream coupon_sales is active and ready.


### 4. Monitor streaming query
- Get the ID of the streaming query and store it in **`queryID`**
- Get the status of the streaming query and store it in **`queryStatus`**

In [0]:
# TODO
queryID = couponSalesQuery.id

In [0]:
# Echo the captured ID (a UUID string).
queryID

Out[54]: '7c2f41e5-1036-46d6-aa3c-764c28fde7f3'

In [0]:
# TODO
queryStatus = couponSalesQuery.status

In [0]:
# Echo the captured status dictionary (message, isDataAvailable, isTriggerActive).
queryStatus

Out[58]: {'message': 'Waiting for next trigger',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [0]:
# Most recent progress report for coupon_sales, including the source and sink descriptions.
couponSalesQuery.lastProgress

Out[57]: {'id': '7c2f41e5-1036-46d6-aa3c-764c28fde7f3',
 'runId': 'fdb8d170-6387-4cb8-b249-381da87f5cb5',
 'name': 'coupon_sales',
 'timestamp': '2023-07-15T17:25:19.000Z',
 'batchId': 4,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 29, 'triggerExecution': 29},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[dbfs:/mnt/training/ecommerce/sales/sales.parquet]',
   'startOffset': {'logOffset': 3},
   'endOffset': {'logOffset': 3},
   'latestOffset': None,
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'FileSink[dbfs:/user/hamed.vaheb@pwc.lu/dbacademy/spark_programming/asp_4_1_streaming_query//coupon-sales/output]',
  'numOutputRows': -1}}

**CHECK YOUR WORK**

In [0]:
# Validate the values captured in step 4.
# isinstance is the idiomatic type check, and comparing key sets avoids depending on
# the order in which the status dictionary lists its keys.
assert isinstance(queryID, str), "queryID should be the streaming query's ID string"
assert set(queryStatus.keys()) == {"message", "isDataAvailable", "isTriggerActive"}, "queryStatus should be the streaming query's status dictionary"

### 5. Stop streaming query
- Stop the streaming query

In [0]:
# TODO
couponSalesQuery.stop()

**CHECK YOUR WORK**

In [0]:
# After stop(), the query handle must no longer report itself as active.
assert not couponSalesQuery.isActive

### 6. Verify the records were written in Parquet format

In [0]:
# TODO
display(dbutils.fs.ls(couponsOutputPath))

path,name,size,modificationTime
dbfs:/user/hamed.vaheb@pwc.lu/dbacademy/spark_programming/asp_4_1_streaming_query/coupon-sales/output/_spark_metadata/,_spark_metadata/,0,1689441860000
dbfs:/user/hamed.vaheb@pwc.lu/dbacademy/spark_programming/asp_4_1_streaming_query/coupon-sales/output/part-00000-3ccbb2b1-a590-4c28-8953-34507168b86a-c000.snappy.parquet,part-00000-3ccbb2b1-a590-4c28-8953-34507168b86a-c000.snappy.parquet,654276,1689441862000
dbfs:/user/hamed.vaheb@pwc.lu/dbacademy/spark_programming/asp_4_1_streaming_query/coupon-sales/output/part-00000-3e88aa75-fdc5-4e23-8dd7-d7c728bb5824-c000.snappy.parquet,part-00000-3e88aa75-fdc5-4e23-8dd7-d7c728bb5824-c000.snappy.parquet,630784,1689441864000
dbfs:/user/hamed.vaheb@pwc.lu/dbacademy/spark_programming/asp_4_1_streaming_query/coupon-sales/output/part-00000-7331d3c7-adfc-41cf-b67b-42e38b1ca3f2-c000.snappy.parquet,part-00000-7331d3c7-adfc-41cf-b67b-42e38b1ca3f2-c000.snappy.parquet,645723,1689441863000
dbfs:/user/hamed.vaheb@pwc.lu/dbacademy/spark_programming/asp_4_1_streaming_query/coupon-sales/output/part-00000-f3676af2-9008-4e6f-9f2c-023e9593d7b3-c000.snappy.parquet,part-00000-f3676af2-9008-4e6f-9f2c-023e9593d7b3-c000.snappy.parquet,633519,1689441865000


In [0]:
# Top-level fields of the last progress report (still readable after the query was stopped).
couponSalesQuery.lastProgress.keys()

Out[64]: dict_keys(['id', 'runId', 'name', 'timestamp', 'batchId', 'numInputRows', 'inputRowsPerSecond', 'processedRowsPerSecond', 'durationMs', 'stateOperators', 'sources', 'sink'])

### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
%run ./Includes/Classroom-Cleanup

Stopping 4 streams


-sandbox
&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>