
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>



# Coupon Sales Lab
Process and append streaming data on transactions using coupons.
1. Read data stream
2. Filter for transactions with coupon codes
3. Write streaming query results to Delta
4. Monitor streaming query
5. Stop streaming query

##### Classes
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamReader.html" target="_blank">DataStreamReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.DataStreamWriter.html" target="_blank">DataStreamWriter</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQuery.html" target="_blank">StreamingQuery</a>

In [0]:
%run ../Includes/Classroom-Setup-5.1a

Resetting the learning environment:
| dropping the schema "anacadriano20_6ryf_da_asp"...(1 seconds)
| removing the working directory "dbfs:/mnt/dbacademy-users/anacadriano20@gmail.com/apache-spark-programming-with-databricks"...(2 seconds)

Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03"

Validating the locally installed datasets:
| listing local files...(3 seconds)
| validation completed...(3 seconds total)

Creating & using the schema "anacadriano20_6ryf_da_asp" in the catalog "spark_catalog"...(0 seconds)

Predefined tables in "anacadriano20_6ryf_da_asp":
| -none-

Predefined paths variables:
| DA.paths.working_dir: dbfs:/mnt/dbacademy-users/anacadriano20@gmail.com/apache-spark-programming-with-databricks
| DA.paths.user_db:     dbfs:/mnt/dbacademy-users/anacadriano20@gmail.com/apache-spark-programming-with-databricks/database.db
| DA.paths.datasets:    dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-dat



### 1. Read data stream
- Set to process 1 file per trigger
- Read from Delta files in the source directory specified by **`DA.paths.sales`**

Assign the resulting DataFrame to **`df`**.

In [0]:
# Import only the functions this lab uses; importing `filter` would shadow Python's built-in.
from pyspark.sql.functions import col, explode

In [0]:
sale_path = DA.paths.sales

In [0]:
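# Stream the Delta source, limiting each micro-batch to one file.
df = (spark.readStream
      .format("delta")
      .option("maxFilesPerTrigger", 1)
      .load(sale_path)
)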




**1.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_1_1(df)

Points,Test,Result
1,The query is streaming,
1,DataFrame contains all 7 columns,




### 2. Filter for transactions with coupon codes
- Explode the **`items`** field in **`df`** with the results replacing the existing **`items`** field
- Filter for records where **`items.coupon`** is not null

Assign the resulting DataFrame to **`coupon_sales_df`**.
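
To make the two steps concrete, here is a plain-Python sketch of what exploding **`items`** and filtering on **`items.coupon`** does to each record. It does not use Spark; the sample orders and the `explode_items` helper are hypothetical and exist only for illustration.

```python
# Plain-Python illustration of explode + filter (no Spark required).
# The sample orders below are hypothetical, not taken from the lab dataset.
orders = [
    {"order_id": 1, "items": [{"coupon": "NEWBED10", "item_id": "M_STAN_Q"}]},
    {"order_id": 2, "items": [{"coupon": None, "item_id": "M_PREM_K"}]},
    {"order_id": 3, "items": [{"coupon": "NEWBED10", "item_id": "M_STAN_F"},
                              {"coupon": None, "item_id": "P_DOWN_S"}]},
]


def explode_items(records):
    """Yield one record per element of `items`, replacing the list with that element."""
    for record in records:
        for item in record["items"]:
            yield {**record, "items": item}


# Step 1: one row per item, with `items` now a single struct-like dict.
exploded = list(explode_items(orders))

# Step 2: keep only the rows whose item carries a coupon code.
coupon_sales = [row for row in exploded if row["items"]["coupon"] is not None]
```

As with Spark's `explode`, an order whose `items` list is empty produces no output rows, and an order with several items can contribute more than one row.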

In [0]:
# Preview the exploded items next to order_id (one row per item).
df_explodido = df.select("order_id", explode("items").alias("items"))

display(df_explodido)


In [0]:
# Explode `items` in place (one row per item), then keep only items bought with a coupon.
# This avoids joining the stream back to itself and leaves `df` unchanged.
coupon_sales_df = (df
                   .withColumn("items", explode(col("items")))
                   .filter(col("items.coupon").isNotNull())
)
display(coupon_sales_df)


order_id,email,transaction_timestamp,total_item_quantity,purchase_revenue_in_usd,unique_items,items
278205,christianlee@chambers.com,1592466913204896,1,940.5,1,"List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1)"
270871,ahuynh@gmail.com,1592387393915138,1,1615.5,1,"List(NEWBED10, M_PREM_Q, Premium Queen Mattress, 1615.5, 1795.0, 1)"
287904,melissa02@yahoo.com,1592554089814096,1,1795.5,1,"List(NEWBED10, M_PREM_K, Premium King Mattress, 1795.5, 1995.0, 1)"
308305,christopherflores@hotmail.com,1592685784726468,1,535.5,1,"List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1)"
324317,chenderson@beard.com,1592798192402749,2,957.6,2,"List(NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1)"
324317,chenderson@beard.com,1592798192402749,2,957.6,2,"List(NEWBED10, P_DOWN_S, Standard Down Pillow, 107.10000000000001, 119.0, 1)"
303906,cindygonzales@yahoo.com,1592669747826062,1,850.5,1,"List(NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1)"
301190,johnsonmarvin@hotmail.com,1592657875847204,1,940.5,1,"List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1)"
339214,garciajeremy@humphrey-taylor.com,1592921716053610,1,940.5,1,"List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1)"
301498,gonzalezstephanie@harding.com,1592659690235783,1,850.5,1,"List(NEWBED10, M_STAN_F, Standard Full Mattress, 850.5, 945.0, 1)"





**2.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_2_1(coupon_sales_df.schema)

Points,Test,Result
1,Schema is of type StructType,
1,Schema contains seven fields,
1,"Schema contains ""order_id"" of type LongType",
1,"Schema contains ""email"" of type StringType",
1,"Schema contains ""transaction_timestamp"" of type LongType",
1,"Schema contains ""total_item_quantity"" of type LongType",
1,"Schema contains ""purchase_revenue_in_usd"" of type DoubleType",
1,"Schema contains ""unique_items"" of type LongType",
1,"Schema contains ""items"" of type StructType",




### 3. Write streaming query results to Delta
- Configure the streaming query to write Delta format files in "append" mode
- Set the query name to "coupon_sales"
- Set a trigger interval of 1 second
- Set the checkpoint location to **`coupons_checkpoint_path`**
- Set the output path to **`coupons_output_path`**

Start the streaming query and assign the resulting handle to **`coupon_sales_query`**.



​

In [0]:
# TODO
coupons_checkpoint_path = f"{DA.paths.checkpoints}/coupon-sales"
coupons_output_path = f"{DA.paths.working_dir}/coupon-sales/output"

coupon_sales_query = (coupon_sales_df.writeStream
                 .outputMode("append")
                 .format("delta")
                 .queryName("coupon_sales")
                 .trigger(processingTime="1 seconds")
                 .option("checkpointLocation", coupons_checkpoint_path)
                 .start(coupons_output_path))





**3.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_3_1(coupon_sales_query)

Points,Test,Result
1,The query is active,
1,"The query name is ""coupon_sales"".",
1,Found at least one file in .../coupon-sales/output,
1,Found at least one file in .../coupon-sales,




### 4. Monitor streaming query
- Get the ID of the streaming query and store it in **`query_id`**
- Get the status of the streaming query and store it in **`query_status`**
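
Beyond the ID and status, a streaming query handle exposes a few other monitoring properties. The sketch below gathers some of them into one dictionary. `summarize_query` is a hypothetical helper, and `fake_query` is a stand-in object so the example runs without a cluster; with a real handle you would pass `coupon_sales_query` instead.

```python
from types import SimpleNamespace


def summarize_query(query):
    """Collect a few monitoring fields from a StreamingQuery-like object."""
    return {
        "id": query.id,                      # stable across restarts from the same checkpoint
        "name": query.name,                  # the name set with queryName(...)
        "is_active": query.isActive,         # False once the query has stopped
        "message": query.status["message"],  # human-readable state of the query
    }


# Hypothetical stand-in that mimics the properties used above.
fake_query = SimpleNamespace(
    id="00000000-0000-0000-0000-000000000000",
    name="coupon_sales",
    isActive=True,
    status={
        "message": "Waiting for next trigger",
        "isDataAvailable": False,
        "isTriggerActive": False,
    },
)

summary = summarize_query(fake_query)
```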


In [0]:
# TODO
query_id = coupon_sales_query.id
query_id

Out[84]: '87560679-732e-4f9a-86b0-20deab37e287'

In [0]:
# TODO
query_status = coupon_sales_query.status
query_status

Out[85]: {'message': 'Waiting for next trigger',
 'isDataAvailable': False,
 'isTriggerActive': False}




**4.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_4_1(query_id, query_status)

Points,Test,Result
1,Valid status value.,
1,Valid query_id value.,




### 5. Stop streaming query
- Stop the streaming query
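
If you want to confirm that the query has actually terminated before moving on, one option is to follow `stop()` with `awaitTermination()`. The sketch below wraps both calls. `stop_query` and the `FakeQuery` stand-in are hypothetical names used so the example runs without a cluster, and the 30-second timeout is an arbitrary choice.

```python
def stop_query(query, timeout_s=30):
    """Stop a streaming query, wait for termination, and report whether it is stopped."""
    if query.isActive:
        query.stop()
    # Waits up to `timeout_s` seconds for the query to terminate.
    query.awaitTermination(timeout_s)
    return not query.isActive


class FakeQuery:
    """Hypothetical stand-in mimicking the parts of StreamingQuery used above."""

    def __init__(self):
        self.isActive = True
        self.stop_calls = 0

    def stop(self):
        self.stop_calls += 1
        self.isActive = False

    def awaitTermination(self, timeout=None):
        return not self.isActive


query = FakeQuery()
stopped = stop_query(query)
stopped_again = stop_query(query)  # safe to call on an already-stopped query
```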



In [0]:
# TODO
coupon_sales_query.stop()




**5.1: CHECK YOUR WORK**

In [0]:
DA.tests.validate_5_1(coupon_sales_query)

Points,Test,Result
1,The query is not active,




### 6. Verify the records were written in Delta format

In [0]:
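# Read the output directory back as a batch Delta table to confirm the records were written.
display(spark.read.format("delta").load(coupons_output_path))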






### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
DA.cleanup()

Resetting the learning environment:
| stopping the stream "display_query_7"...(2 seconds)
| dropping the schema "anacadriano20_6ryf_da_asp"...(0 seconds)
| removing the working directory "dbfs:/mnt/dbacademy-users/anacadriano20@gmail.com/apache-spark-programming-with-databricks"...(2 seconds)

Validating the locally installed datasets:
| listing local files...(3 seconds)
| validation completed...(3 seconds total)


&copy; 2023 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>