
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img
    src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png"
    alt="Databricks Learning"
  >
</div>



# Streaming Query Lab
### Coupon Sales

Process streaming transaction data and append the transactions that used coupons to a Delta table.

##### Objectives
1. Read data stream
2. Filter for transactions with coupon codes
3. Write streaming query results to Delta
4. Monitor streaming query
5. Stop streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.html" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.html" target="_blank">StreamingQuery</a>

## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default.

Follow these steps to select the classic compute cluster:

1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

1. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

    - In the drop-down, select **More**.

    - In the **Attach to an existing compute resource** pop-up, select the first drop-down. You will see a unique cluster name in that drop-down. Please select that cluster.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

1. Find the triangle icon to the right of your compute cluster name and click it.

1. Wait a few minutes for the cluster to start.

1. Once the cluster is running, complete the steps above to select your cluster.

In [0]:
%run ./Includes/Classroom-Setup-02L

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.



### 1. Read data stream

Assign the resulting DataFrame to **`df`**:
- Read the Delta files in the source directory **`/Volumes/dbacademy_ecommerce/v01/delta/sales_hist`**
- Process 1 file per trigger
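A rough way to picture `maxFilesPerTrigger` is as a cap on how many not-yet-processed source files each micro-batch may take, so a backlog is drained over several triggers. The plain-Python sketch below models only that batching idea; `plan_micro_batches` and the file names are hypothetical, and this is not how Spark or Delta Lake actually schedule work.

```python
from typing import List


def plan_micro_batches(files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Split a backlog of source files into micro-batches of at most
    max_files_per_trigger files each, preserving their order.

    Hypothetical helper: a simplified model of the maxFilesPerTrigger option,
    not Spark's or Delta Lake's actual scheduling code.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]


# Hypothetical backlog of three data files in the source table
backlog = ["part-0000.parquet", "part-0001.parquet", "part-0002.parquet"]
for batch_id, batch in enumerate(plan_micro_batches(backlog, 1)):
    print(batch_id, batch)
```

With a value of 1, as in this lab, each file becomes its own micro-batch, which makes the stream's progress easy to observe.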



In [0]:
df = (spark
      .readStream
      .option("maxFilesPerTrigger", 1)
      .format("delta")
      .load('/Volumes/dbacademy_ecommerce/v01/delta/sales_hist')
     )



**1.1: CHECK YOUR WORK**

In [0]:
# Define the list of required columns
sales_required_columns = [
    'order_id', 'email', 'transaction_timestamp',
    'total_item_quantity', 'purchase_revenue_in_usd',
    'unique_items', 'items'
]

In [0]:
DA.validate_dataframe(df, sales_required_columns)

All validations passed!


### 2. Filter for transactions with coupon codes
- Explode the **`items`** array in **`df`**, replacing the existing **`items`** field with the exploded values
- Filter for records where **`items.coupon`** is not null

Assign the resulting DataFrame to **`coupon_sales_df`**.
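To see what the two transformations do to each record, here is a plain-Python model that operates on dicts rather than a Spark DataFrame. The helper names and the sample orders are hypothetical. The behavior it mimics is that `explode` emits one row per array element (dropping rows whose array is null or empty, unlike `explode_outer`), and that the filter keeps only elements whose `coupon` is not null.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def explode_items(rows: List[Row]) -> List[Row]:
    """Mimic withColumn("items", explode(col("items"))): one output row per
    array element; rows whose array is null or empty produce no output."""
    out = []
    for row in rows:
        for item in row.get("items") or []:
            out.append({**row, "items": item})
    return out


def filter_coupons(rows: List[Row]) -> List[Row]:
    """Mimic filter(col("items.coupon").isNotNull()); a null item struct
    also yields a null coupon, so it is dropped too."""
    return [r for r in rows if (r["items"] or {}).get("coupon") is not None]


# Hypothetical sample orders (not taken from the lab dataset)
orders = [
    {"order_id": 1, "items": [{"item_id": "A", "coupon": "SAVE10"},
                              {"item_id": "B", "coupon": None}]},
    {"order_id": 2, "items": []},
]
print(filter_coupons(explode_items(orders)))
```

This also explains why the expected schema in the check below lists **`items`** as a `StructType`: after exploding, each row holds a single item struct instead of an array.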

In [0]:
from pyspark.sql.functions import col, explode

coupon_sales_df = (df
                   .withColumn("items", explode(col("items")))
                   .filter(col("items.coupon").isNotNull())
                  )



**2.1: CHECK YOUR WORK**

In [0]:
# Expected schema fields and types

expected_fields = {
    "order_id": "LongType",
    "email": "StringType",
    "transaction_timestamp": "LongType",
    "total_item_quantity": "LongType",
    "purchase_revenue_in_usd": "DoubleType",
    "unique_items": "LongType",
    "items": "StructType"
}

In [0]:
DA.validate_schema(coupon_sales_df.schema, expected_fields)




### 3. Write streaming query results to Delta
- Configure the streaming query to write Delta format files in "append" mode
- Set the query name to "coupon_sales"
- Set a trigger interval of 1 second
- Set the checkpoint location to **`coupons_checkpoint_path`**
- Set the output path to **`coupons_output_path`**

Start the streaming query and assign the resulting handle to **`coupon_sales_query`**.
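The checkpoint location is what lets a restarted query resume where it left off instead of reprocessing the whole source. The sketch below is a deliberately simplified, plain-Python model of that idea, using a set of already-committed source files; Spark's real checkpoint stores offset and commit logs per micro-batch. `run_trigger` and the in-memory `checkpoint` and `sink` objects are hypothetical stand-ins for **`coupons_checkpoint_path`** and **`coupons_output_path`**.

```python
from typing import Dict, List, Set


def run_trigger(source_files: List[str], checkpoint: Dict[str, Set[str]],
                sink: List[str], max_files: int = 1) -> List[str]:
    """Run one simplified micro-batch: take up to max_files source files not yet
    recorded in the checkpoint, append them to the sink, then record them."""
    committed = checkpoint.setdefault("committed", set())
    batch = [f for f in source_files if f not in committed][:max_files]
    sink.extend(batch)       # append mode: new output is only ever added
    committed.update(batch)  # record progress only after the sink write
    return batch


files = ["f0", "f1", "f2"]            # hypothetical source files
checkpoint: Dict[str, Set[str]] = {}  # stands in for the checkpoint directory
sink: List[str] = []                  # stands in for the Delta output table

run_trigger(files, checkpoint, sink)  # the first run processes f0, then "stops"
# A restarted query that reuses the same checkpoint resumes at f1
while run_trigger(files, checkpoint, sink):
    pass
print(sink)
```

Pointing a restarted query at a new, empty checkpoint would, in this model, append every file again, which is why each streaming query should keep its own stable checkpoint location.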

In [0]:
coupons_checkpoint_path = f"{DA.paths.working_dir}/coupon-sales"
coupons_output_path = f"{DA.paths.working_dir}/coupon-sales/output"

coupon_sales_query = (coupon_sales_df.<FILL_IN>
)



In [0]:
%skip

coupons_checkpoint_path = f"{DA.paths.working_dir}/coupon-sales"
coupons_output_path = f"{DA.paths.working_dir}/coupon-sales/output"

coupon_sales_query = (coupon_sales_df
                      .writeStream
                      .outputMode("append")
                      .format("delta")
                      .queryName("coupon_sales")
                      .trigger(processingTime="1 second")
                      .option("checkpointLocation", coupons_checkpoint_path)
                      .start(coupons_output_path))


**3.1: CHECK YOUR WORK**

Note: Wait for the stream to start before running the validation cell.
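Instead of waiting by eye, you could poll the query until it reports its first completed micro-batch. The helper below is a hypothetical sketch, not part of the `DA` classroom utilities; it relies only on the `StreamingQuery` members `isActive`, `recentProgress` (a list of progress dicts, empty until a batch finishes), and `exception()`.

```python
import time


def wait_for_stream_start(query, timeout_s: float = 60.0, poll_s: float = 1.0) -> bool:
    """Poll a StreamingQuery until query.recentProgress reports at least one
    completed micro-batch. Returns True once progress appears, False on
    timeout, and raises if the query stops (for example, fails) first."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if not query.isActive:
            raise RuntimeError(f"Query stopped before starting: {query.exception()}")
        if query.recentProgress:
            return True
        time.sleep(poll_s)
    return False
```

In this notebook you would call `wait_for_stream_start(coupon_sales_query)` just before the validation cell.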

In [0]:
DA.validate_coupon_sales_query(coupon_sales_query, coupons_checkpoint_path, coupons_output_path)



### 4. Monitor streaming query
- Get the ID of the streaming query and store it in **`query_id`**
- Get the status of the streaming query and store it in **`query_status`**
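The query ID is a UUID string that stays the same when a query restarts from the same checkpoint (the separate `runId` changes on every start), while `status` is a plain Python dict with the keys `"message"`, `"isDataAvailable"`, and `"isTriggerActive"`. The hypothetical `describe_status` helper below turns that dict into a one-line summary; the sample value is illustrative, not captured output.

```python
from typing import Any, Dict


def describe_status(status: Dict[str, Any]) -> str:
    """Summarize the dict returned by StreamingQuery.status, which carries
    the keys "message", "isDataAvailable", and "isTriggerActive"."""
    state = "processing a trigger" if status["isTriggerActive"] else "idle between triggers"
    data = "new data available" if status["isDataAvailable"] else "no new data"
    return f"{status['message']} ({state}, {data})"


# Hypothetical status value, shaped like the dict StreamingQuery.status returns
sample = {"message": "Waiting for next trigger", "isDataAvailable": True, "isTriggerActive": False}
print(describe_status(sample))
```

Against the live query, the equivalent call would be `describe_status(coupon_sales_query.status)`.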

In [0]:
query_id = coupon_sales_query.<FILL_IN>



In [0]:
%skip
query_id = coupon_sales_query.id
print(query_id)

In [0]:
query_status = coupon_sales_query.<FILL_IN>



In [0]:
%skip
query_status = coupon_sales_query.status
print(query_status)


**4.1: CHECK YOUR WORK**

In [0]:
DA.validate_query_status(query_status)




### 5. Stop streaming query
- Stop the streaming query and wait for it to terminate
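If a lab session leaves more than one stream running, a small helper such as the hypothetical `stop_all_streams` below can stop all of them. It relies on `spark.streams.active`, which lists the session's active `StreamingQuery` objects, and calls `stop()` followed by `awaitTermination()` on each, the same pattern as the solution cell. Note that `awaitTermination()` re-raises the error if a query had terminated with an exception.

```python
def stop_all_streams(spark) -> int:
    """Stop every active streaming query on this SparkSession and block until
    each has terminated. Returns how many queries were stopped."""
    queries = list(spark.streams.active)  # snapshot before stopping anything
    for q in queries:
        q.stop()
        q.awaitTermination()
    return len(queries)
```

Calling `stop_all_streams(spark)` at the end of a lab keeps idle streams from holding cluster resources.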

In [0]:
coupon_sales_query.<FILL_IN>



In [0]:
%skip
coupon_sales_query.stop()
coupon_sales_query.awaitTermination()

**5.1: CHECK YOUR WORK**

In [0]:
DA.validate_query_state(coupon_sales_query)



### 6. Verify that the records were written in Delta format
- Read **`coupons_output_path`** in Delta format and display the results
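Beyond eyeballing the displayed rows, two quick checks can confirm the output. The hypothetical helpers below count rows that slipped through without a coupon (expected to be 0) and count the commits the stream made, which Delta Lake records in the table history with the operation name `STREAMING UPDATE`. Both take Spark objects as arguments and use only `DataFrame.where`, `DataFrame.count`, and `spark.sql`.

```python
def count_rows_missing_coupon(df) -> int:
    """Count output rows whose items.coupon is null. Because the stream kept
    only coupon transactions, the expected answer here is 0."""
    return df.where("items.coupon IS NULL").count()


def count_streaming_commits(spark, path: str) -> int:
    """Count commits written by a streaming query, which appear in the
    Delta table history with the operation name 'STREAMING UPDATE'."""
    history = spark.sql(f"DESCRIBE HISTORY delta.`{path}`")
    return history.where("operation = 'STREAMING UPDATE'").count()
```

In this notebook the calls would be `count_rows_missing_coupon(spark.read.format("delta").load(coupons_output_path))` and `count_streaming_commits(spark, coupons_output_path)`.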

In [0]:
display(<FILL_IN>)



In [0]:
%skip
display(spark.read.format("delta").load(coupons_output_path))

&copy; 2026 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a>.<br/><br/><a href="https://databricks.com/privacy-policy" target="_blank">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use" target="_blank">Terms of Use</a> | <a href="https://help.databricks.com/" target="_blank">Support</a>