# Streaming Query

##### Objectives
1. Build streaming DataFrames
1. Display streaming query results
1. Write streaming query results
1. Monitor streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html#pyspark.sql.streaming.StreamingQuery" target="_blank">StreamingQuery</a>

In [0]:
%run ./Includes/Classroom-Setup

### Build streaming DataFrames

Obtain an initial streaming DataFrame from a Parquet-format file source.

In [0]:
schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

df = (spark
      .readStream
      .schema(schema)
      .option("maxFilesPerTrigger", 1)
      .parquet(eventsPath)
     )
df.isStreaming

Out[4]: True
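Setting `maxFilesPerTrigger` to 1 means each micro-batch reads at most one new file, which keeps this demo stream flowing slowly enough to watch. The sketch below is a simplified plain-Python model of that rate limit. It is not Spark's source code. It assumes files are picked up oldest first, which is Spark's default ordering, and the file names and the `plan_micro_batches` helper are hypothetical:

```python
# Simplified model, not Spark's implementation: how maxFilesPerTrigger caps
# the number of new files a file source picks up per micro-batch.
def plan_micro_batches(files, max_files_per_trigger):
    """Split (path, modification_time) pairs into micro-batches.

    Files are taken oldest first, and each batch holds at most
    `max_files_per_trigger` paths.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    ordered = [path for path, _mtime in sorted(files, key=lambda f: f[1])]
    return [ordered[i:i + max_files_per_trigger]
            for i in range(0, len(ordered), max_files_per_trigger)]


# Hypothetical file listing: (path, modification time in seconds)
files = [("part-2.parquet", 30.0), ("part-0.parquet", 10.0), ("part-1.parquet", 20.0)]
batches = plan_micro_batches(files, max_files_per_trigger=1)
print(batches)  # [['part-0.parquet'], ['part-1.parquet'], ['part-2.parquet']]
```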


Apply some transformations, producing new streaming DataFrames.

In [0]:
from pyspark.sql.functions import col, approx_count_distinct, count

In [0]:
emailTrafficDF = (df.filter(col("traffic_source") == "email")
                    .withColumn("mobile", col("device").isin(['iOS', 'Android']))
                    .select("user_id", "event_timestamp", "mobile")
                 )

In [0]:
emailTrafficDF.isStreaming

Out[14]: True
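These transformations are stateless. Each output row depends on a single input row, which is why this result can be written in "append" mode. The plain-Python sketch below restates the per-row logic. It illustrates the semantics rather than how Spark executes it, and the sample rows and the `email_traffic` helper are hypothetical:

```python
# Plain-Python model of the per-row logic behind emailTrafficDF. Each micro-batch
# of new rows goes through the same filter -> withColumn -> select steps.
def email_traffic(rows):
    result = []
    for row in rows:
        # filter(col("traffic_source") == "email"); a null source is dropped, as in Spark
        if row.get("traffic_source") != "email":
            continue
        result.append({
            "user_id": row["user_id"],
            "event_timestamp": row["event_timestamp"],
            # isin(['iOS', 'Android']); note Spark returns null (not False) for a null device
            "mobile": row.get("device") in ("iOS", "Android"),
        })
    return result


# Hypothetical sample events
sample = [
    {"user_id": "u1", "event_timestamp": 1, "device": "iOS", "traffic_source": "email"},
    {"user_id": "u2", "event_timestamp": 2, "device": "macOS", "traffic_source": "email"},
    {"user_id": "u3", "event_timestamp": 3, "device": "Android", "traffic_source": "google"},
]
print(email_traffic(sample))
```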

### Write streaming query results

Take the final streaming DataFrame (our result table) and write it to a file sink in "append" mode.
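In append mode, only rows added to the result table since the last trigger are written. The checkpoint location records which batches have completed, so a restarted query can resume instead of starting over. The toy class below models those two ideas. It is not Spark's sink code, and the `AppendSinkModel` name is hypothetical:

```python
# Toy model, not Spark internals: an append-mode file sink writes each
# micro-batch's new rows exactly once. The set of committed batch IDs stands
# in for the commit/metadata logs that let a restarted query skip a batch it
# has already written.
class AppendSinkModel:
    def __init__(self):
        self.rows = []          # stands in for the files under the output path
        self.committed = set()  # stands in for the logged, committed batch IDs

    def add_batch(self, batch_id, new_rows):
        """Append new_rows for batch_id unless that batch was already committed."""
        if batch_id in self.committed:
            return False        # replayed batch: skip it to avoid duplicate output
        self.rows.extend(new_rows)
        self.committed.add(batch_id)
        return True


sink = AppendSinkModel()
sink.add_batch(0, [{"user_id": "u1"}])
sink.add_batch(1, [{"user_id": "u2"}])
sink.add_batch(1, [{"user_id": "u2"}])  # batch 1 replayed after a simulated restart
print(len(sink.rows))  # 2
```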

In [0]:
checkpointPath = userhome + "/email_traffic/checkpoint"
outputPath = userhome + "/email_traffic/output"

# start() launches the query and returns a StreamingQuery handle.
# Calling start() again with the same query name and checkpoint while this
# query is still active raises an error, so stop it first before restarting.
devicesQuery = (emailTrafficDF
                .writeStream
                .outputMode("append")                          # write only new result rows
                .format("delta")                               # file-based sink; "parquet" also works
                .queryName("email_traffic")
                .trigger(processingTime="100 milliseconds")    # micro-batch interval
                .option("checkpointLocation", checkpointPath)  # progress tracking for recovery
                .start(outputPath)
               )

The object returned by `start()` is a `StreamingQuery` handle, not a DataFrame, so DataFrame actions such as `take()` are not available on it:

In [0]:
devicesQuery.take(5)

AttributeError                            Traceback (most recent call last)
----> 1 devicesQuery.take(5)

AttributeError: 'StreamingQuery' object has no attribute 'take'

### Monitor streaming query

Use the streaming query "handle" to monitor and control it.

In [0]:
devicesQuery.id

Out[18]: '1a4d9de4-44f1-46d2-9969-bc2b4b6bcf9d'

In [0]:
devicesQuery.status

Out[19]: {'message': 'Getting offsets from FileStreamSource[dbfs:/mnt/training/ecommerce/events/events.parquet]',
 'isDataAvailable': False,
 'isTriggerActive': True}

In [0]:
devicesQuery.lastProgress

Out[20]: {'id': '1a4d9de4-44f1-46d2-9969-bc2b4b6bcf9d',
 'runId': 'e792a2ec-f56f-46ed-98c6-b07b69e74660',
 'name': 'email_traffic',
 'timestamp': '2023-02-25T17:01:49.700Z',
 'batchId': 4,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 126, 'triggerExecution': 127},
 'stateOperators': [],
 'sources': [{'description': 'FileStreamSource[dbfs:/mnt/training/ecommerce/events/events.parquet]',
   'startOffset': {'logOffset': 3},
   'endOffset': {'logOffset': 3},
   'latestOffset': None,
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'DeltaSink[dbfs:/user/Admin.Databricks@novigosolutions.com/email_traffic/output]',
  'numOutputRows': -1}}
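The `lastProgress` output is verbose. A small helper can pull out the fields you typically watch, namely batch ID, input rows, and rates. The sketch below treats `lastProgress` as the dict shown above and uses only field names that appear in that output. The helper name `summarize_progress` is hypothetical. `lastProgress` is `None` until the first progress update, so the helper handles that case too:

```python
# Summarize a StreamingQuery.lastProgress dict (structure as in the output above).
def summarize_progress(progress):
    if progress is None:  # no progress update has been reported yet
        return "no progress reported yet"
    sources = progress.get("sources", [])
    return (f"batch {progress['batchId']}: "
            f"{progress['numInputRows']} input rows, "
            f"{progress['inputRowsPerSecond']:.1f} in/s, "
            f"{progress['processedRowsPerSecond']:.1f} processed/s, "
            f"{len(sources)} source(s), "
            f"sink={progress['sink']['description']}")


# Usage in the notebook (not run here):
# print(summarize_progress(devicesQuery.lastProgress))
```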

In [0]:
devicesQuery.awaitTermination(5)

Out[21]: False
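`awaitTermination(5)` blocks for up to 5 seconds. It returns `True` if the query terminates within the timeout. Here it returns `False` because the query was still running. If you instead want to wait until the query has made some progress, you can poll the handle. The sketch below is one way to do that. The helper name `wait_for_batches` is hypothetical and assumes only the `isActive` and `lastProgress` attributes used in this lesson:

```python
import time


def wait_for_batches(query, min_batch_id, timeout_s=60.0, poll_s=1.0):
    """Poll a StreamingQuery until lastProgress reports batchId >= min_batch_id.

    Returns True on success and False on timeout; raises if the query stops first.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if not query.isActive:
            raise RuntimeError("query stopped before reaching the requested batch")
        progress = query.lastProgress
        if progress is not None and progress["batchId"] >= min_batch_id:
            return True
        time.sleep(poll_s)
    return False


# Usage in the notebook (not run here):
# wait_for_batches(devicesQuery, min_batch_id=2, timeout_s=30)
```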

In [0]:
devicesQuery.stop()
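`stop()` acts on a single handle. When several queries have been started in a notebook, a query can be left running with no variable pointing at it. `spark.streams.active` lists every active query on the session, so a cleanup helper can stop them all. Here is a minimal sketch with a hypothetical helper name:

```python
def stop_all_streams(spark):
    """Stop every streaming query active on this SparkSession; return their names."""
    stopped = []
    for query in spark.streams.active:  # list of active StreamingQuery handles
        stopped.append(query.name)
        query.stop()
    return stopped


# Usage in the notebook (not run here):
# stop_all_streams(spark)
```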

# Coupon Sales Lab
Process and append streaming data on transactions using coupons.
1. Read data stream
2. Filter for transactions with coupon codes
3. Write streaming query results to Parquet
4. Monitor streaming query
5. Stop streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamReader.html#pyspark.sql.streaming.DataStreamReader" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.html#pyspark.sql.streaming.DataStreamWriter" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html#pyspark.sql.streaming.StreamingQuery" target="_blank">StreamingQuery</a>

### 1. Read data stream
- Use the schema stored in **`schema`**
- Set to process 1 file per trigger
- Read from Parquet files in the source directory specified by **`salesPath`**

Assign the resulting DataFrame to **`df`**.

In [0]:
schema = "order_id BIGINT, email STRING, transaction_timestamp BIGINT, total_item_quantity BIGINT, purchase_revenue_in_usd DOUBLE, unique_items BIGINT, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>"

In [0]:
# TODO
df = (spark.readStream
           .schema(schema)
           .option("maxFilesPerTrigger", 1)
           .parquet(salesPath)
)

**CHECK YOUR WORK**

In [0]:
assert df.isStreaming
assert df.columns == ["order_id", "email", "transaction_timestamp", "total_item_quantity", "purchase_revenue_in_usd", "unique_items", "items"]
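The column list checked above comes straight from the top-level fields of the DDL string in `schema`. Nested `STRUCT`/`ARRAY` fields such as `items` count as one column. The hypothetical helper below extracts those top-level names by splitting only on commas outside `<...>`. It is a sketch that assumes unquoted column names and does not handle backtick-quoted identifiers:

```python
def top_level_columns(ddl):
    """Return top-level column names from a DDL schema string.

    Commas nested inside <...> (STRUCT/ARRAY/MAP type parameters) are ignored.
    """
    names, depth, start = [], 0, 0
    for i, ch in enumerate(ddl + ","):  # trailing comma flushes the last field
        if ch == "<":
            depth += 1
        elif ch == ">":
            depth -= 1
        elif ch == "," and depth == 0:
            field = ddl[start:i].strip()
            if field:
                names.append(field.split()[0])  # name is the first token
            start = i + 1
    return names


print(top_level_columns("order_id BIGINT, items ARRAY<STRUCT<coupon: STRING, quantity: BIGINT>>"))
# ['order_id', 'items']
```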

### 2. Filter for transactions with coupon codes
- Explode the **`items`** field in **`df`** with the results replacing the existing **`items`** field
- Filter for records where **`items.coupon`** is not null

Assign the resulting DataFrame to **`couponSalesDF`**.

In [0]:
# TODO
from pyspark.sql.functions import col, explode

couponSalesDF = (df.withColumn("items", explode(col("items")))
                   .filter(col("items.coupon").isNotNull())
)
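`explode` emits one row per array element, and the struct replaces the original array in `items`. An order with two couponed items therefore becomes two rows, as with order 264191 in the step 6 output. Rows whose array is null or empty produce no output; `explode_outer` would keep them. The plain-Python model below restates the two steps. The orders and the `explode_coupon_items` name are hypothetical:

```python
def explode_coupon_items(orders):
    """Model of withColumn("items", explode(col("items"))) followed by
    filter(col("items.coupon").isNotNull()), on plain dicts."""
    result = []
    for order in orders:
        # explode: one row per array element; a null or empty array yields no rows
        for item in order.get("items") or []:
            if item.get("coupon") is not None:
                result.append({**order, "items": item})  # the struct replaces the array
    return result


# Hypothetical orders shaped like the lab's sales records (other columns omitted)
orders = [
    {"order_id": 1, "items": [{"coupon": "NEWBED10", "item_id": "M_STAN_Q"},
                              {"coupon": None, "item_id": "P_FOAM_S"}]},
    {"order_id": 2, "items": []},
    {"order_id": 3, "items": None},
]
print(explode_coupon_items(orders))
```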

**CHECK YOUR WORK**

In [0]:
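from pyspark.sql.types import StructType

# Inspect the schema object directly rather than its string form, which differs across Spark versions
itemsType = couponSalesDF.schema["items"].dataType
assert isinstance(itemsType, StructType) and "coupon" in itemsType.fieldNames(), "items column was not exploded"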

### 3. Write streaming query results to Parquet
- Configure the streaming query to write Parquet format files in "append" mode
- Set the query name to "coupon_sales"
- Set a trigger interval of 1 second
- Set the checkpoint location to **`couponsCheckpointPath`**
- Set the output path to **`couponsOutputPath`**

Start the streaming query and assign the resulting handle to **`couponSalesQuery`**.

In [0]:
# TODO
couponsCheckpointPath = workingDir + "/coupon-sales/checkpoint"
couponsOutputPath = workingDir + "/coupon-sales/output"

couponSalesQuery = (couponSalesDF.writeStream
                                 .outputMode("append")
                                 .format("parquet")
                                 .queryName("coupon_sales")
                                 .trigger(processingTime="1 second")
                                 .option("checkpointLocation", couponsCheckpointPath)
                                 .start(couponsOutputPath)
                   )

**CHECK YOUR WORK**

In [0]:
untilStreamIsReady("coupon_sales")
assert couponSalesQuery.isActive
assert len(dbutils.fs.ls(couponsOutputPath)) > 0
assert len(dbutils.fs.ls(couponsCheckpointPath)) > 0
assert "coupon_sales" in couponSalesQuery.lastProgress["name"]

The stream coupon_sales is active and ready.
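The lesson and this lab repeat the same writer configuration with different formats, names, and paths. If you find yourself doing that, a small wrapper keeps the settings in one place. This is a sketch with a hypothetical function name that chains only the `DataStreamWriter` calls already used above:

```python
def start_file_sink_query(df, fmt, name, checkpoint_path, output_path, trigger="1 second"):
    """Start an append-mode streaming write to a file sink; return the StreamingQuery."""
    return (df.writeStream
              .outputMode("append")
              .format(fmt)                                    # e.g. "parquet" or "delta"
              .queryName(name)
              .trigger(processingTime=trigger)                # micro-batch interval
              .option("checkpointLocation", checkpoint_path)  # one checkpoint per query
              .start(output_path))


# Usage in the lab notebook (not run here):
# couponSalesQuery = start_file_sink_query(couponSalesDF, "parquet", "coupon_sales",
#                                          couponsCheckpointPath, couponsOutputPath)
```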


### 4. Monitor streaming query
- Get the ID of the streaming query and store it in **`queryID`**
- Get the status of the streaming query and store it in **`queryStatus`**

In [0]:
# TODO
queryID = couponSalesQuery.id

In [0]:
# TODO
queryStatus = couponSalesQuery.status

**CHECK YOUR WORK**

In [0]:
assert type(queryID) == str
assert list(queryStatus.keys()) == ["message", "isDataAvailable", "isTriggerActive"]

### 5. Stop streaming query
- Stop the streaming query

In [0]:
# TODO
couponSalesQuery.stop()

**CHECK YOUR WORK**

In [0]:
assert not couponSalesQuery.isActive
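If a cell fails between `start()` and `stop()`, the query keeps running in the background. A context manager built on the standard `contextlib.contextmanager` decorator guarantees the stop. This is a sketch; the `running_query` name is hypothetical:

```python
from contextlib import contextmanager


@contextmanager
def running_query(start):
    """Start a query with the zero-argument callable `start` and stop it on exit.

    The query is stopped even if the body of the `with` block raises.
    """
    query = start()
    try:
        yield query
    finally:
        if query.isActive:
            query.stop()


# Usage in the lab notebook (not run here):
# with running_query(lambda: couponSalesDF.writeStream
#                                         .format("parquet")
#                                         .option("checkpointLocation", couponsCheckpointPath)
#                                         .start(couponsOutputPath)) as q:
#     untilStreamIsReady("coupon_sales")
#     print(q.status)
```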

### 6. Verify the records were written in Parquet format

In [0]:
# TODO
display(spark.read.parquet(couponsOutputPath).limit(5))

| order_id | email | transaction_timestamp | total_item_quantity | purchase_revenue_in_usd | unique_items | items |
|---|---|---|---|---|---|---|
| 282611 | bmurillo@hotmail.com | 1592504237604072 | 1 | 940.5 | 1 | List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1) |
| 283949 | whardin@hotmail.com | 1592510720760323 | 1 | 535.5 | 1 | List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1) |
| 264191 | maxwelltara@edwards.com | 1592306255847870 | 2 | 993.6 | 2 | List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1) |
| 264191 | maxwelltara@edwards.com | 1592306255847870 | 2 | 993.6 | 2 | List(NEWBED10, P_FOAM_S, Standard Foam Pillow, 53.1, 59.0, 1) |
| 286727 | rojasjorge@yahoo.com | 1592533048926949 | 1 | 535.5 | 1 | List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1) |


### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
%run ./Includes/Classroom-Cleanup

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>