
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

## Hourly Activity by Traffic Lab
Process streaming data to display the total active users by traffic source in 1-hour windows.
1. Cast to timestamp and add watermark for 2 hours
2. Aggregate active users by traffic source for 1-hour windows
3. Execute the query with **`display`** and plot the results
4. Use the query name to stop the streaming query

### Setup
Run the cells below to generate hourly JSON files of event data for July 3, 2020.

In [0]:
%run ../Includes/Classroom-Setup

Deleted the working directory dbfs:/user/si@elisaandgeeks.com/dbacademy/aspwd/asp_5_1bl_hourly_activity_by_traffic_lab


Your working directory is
dbfs:/user/si@elisaandgeeks.com/dbacademy/aspwd

The source for this dataset is
wasbs://courseware@dbacademy.blob.core.windows.net/apache-spark-programming-with-databricks/v02/

Skipping install of existing dataset to
dbfs:/user/si@elisaandgeeks.com/dbacademy/aspwd/datasets


Out[27]: DataFrame[key: string, value: string]

In [0]:
schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

# Directory of hourly events logged from the BedBricks website on July 3, 2020
hourly_events_path = f"{datasets_dir}/events/events-2020-07-03.json"

df = (spark
      .readStream
      .schema(schema)
      .option("maxFilesPerTrigger", 1)
      .json(hourly_events_path)
     )
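Setting **`maxFilesPerTrigger`** to 1 caps each micro-batch at one new file, so the hourly files are consumed one trigger at a time instead of all at once. The plain-Python sketch below models only that batching rule. The file names are hypothetical, and the sketch ignores how Spark's file source discovers and tracks files.

```python
from typing import Iterator, List


def micro_batches(files: List[str], max_files_per_trigger: int) -> Iterator[List[str]]:
    """Yield successive micro-batches holding at most max_files_per_trigger files."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    pending = sorted(files)  # illustration only: take files in name order
    for start in range(0, len(pending), max_files_per_trigger):
        yield pending[start:start + max_files_per_trigger]


# Hypothetical hourly file names standing in for the files under hourly_events_path
hourly_files = [f"2020-07-03-{h:02d}.json" for h in range(24)]
batches = list(micro_batches(hourly_files, max_files_per_trigger=1))
print(len(batches))  # 24 micro-batches, one file each
print(batches[0])    # ['2020-07-03-00.json']
```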

### 1. Cast to timestamp and add watermark for 2 hours
- Add a **`createdAt`** column by dividing **`event_timestamp`** (microseconds since the epoch) by 1,000,000 and casting the result to timestamp
- Set a watermark of 2 hours on the **`createdAt`** column

Assign the resulting DataFrame to **`events_df`**.
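Because **`event_timestamp`** holds microseconds since the Unix epoch, dividing by 1,000,000 yields seconds, and casting a numeric value to timestamp interprets it as seconds since the epoch. The sketch below reproduces that arithmetic in plain Python for a single hypothetical value. It uses UTC, whereas Spark renders timestamps in the session time zone.

```python
from datetime import datetime, timezone


def micros_to_timestamp(event_timestamp: int) -> datetime:
    """Convert microseconds since the Unix epoch to a UTC datetime.

    Mirrors (col("event_timestamp") / 1e6).cast("timestamp"): dividing by
    1,000,000 gives seconds since the epoch.
    """
    return datetime.fromtimestamp(event_timestamp / 1e6, tz=timezone.utc)


ts = micros_to_timestamp(1593734400000000)  # hypothetical event_timestamp value
print(ts.isoformat())  # 2020-07-03T00:00:00+00:00
```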

In [0]:
# TODO
from pyspark.sql.functions import col

events_df = (df
             .withColumn("createdAt", (col("event_timestamp")/1e6).cast("timestamp"))
             .withWatermark("createdAt", "2 hours")
            )
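**`withWatermark("createdAt", "2 hours")`** tells Spark how long to wait for late events: roughly, the watermark trails the latest **`createdAt`** seen so far by 2 hours, events older than the watermark may be dropped, and state for windows that end before it can be discarded. The function below is a simplified, hypothetical model of that rule. It recomputes the watermark once per micro-batch and always drops late events, whereas Spark only guarantees to keep events that fall within the threshold.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple


def apply_watermark(
    batches: Iterable[List[datetime]], delay: timedelta
) -> Tuple[List[datetime], List[datetime]]:
    """Simplified model of withWatermark; returns (accepted, dropped) event times.

    The watermark applied to a micro-batch is the latest event time seen in
    earlier batches minus `delay`; events older than it count as too late.
    """
    max_seen: Optional[datetime] = None
    accepted: List[datetime] = []
    dropped: List[datetime] = []
    for batch in batches:
        watermark = max_seen - delay if max_seen is not None else None
        for event_time in batch:
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)
            else:
                accepted.append(event_time)
        if batch:
            # Advance the latest event time seen; the watermark never moves backwards.
            batch_max = max(batch)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
    return accepted, dropped


day = datetime(2020, 7, 3)
batches = [
    [day.replace(hour=0), day.replace(hour=3)],                # latest event: 03:00
    [day.replace(hour=1, minute=30), day.replace(minute=30)],  # watermark now 01:00
]
accepted, dropped = apply_watermark(batches, timedelta(hours=2))
print(dropped)  # [datetime.datetime(2020, 7, 3, 0, 30)]
```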

**1.1: CHECK YOUR WORK**

In [0]:
assert "StructField(createdAt,TimestampType,true" in str(events_df.schema)
print("All tests passed")

All tests passed


### 2. Aggregate active users by traffic source for 1-hour windows

- Set the number of shuffle partitions (**`spark.sql.shuffle.partitions`**) to the number of cores on your cluster
- Group by **`traffic_source`** with 1-hour tumbling windows based on the **`createdAt`** column
- Aggregate the approximate count of distinct users (**`user_id`**) per **`traffic_source`** and window, and alias the resulting column to **`active_users`**
- Select **`traffic_source`**, **`active_users`**, and the hour extracted from **`window.start`**, aliased as **`hour`**
- Sort by **`hour`** in ascending order

Assign the resulting DataFrame to **`traffic_df`**.
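A 1-hour tumbling window assigns each event to exactly one non-overlapping hour aligned to the epoch, so the window start is the event time rounded down to the hour. The plain-Python model below groups hypothetical **`(traffic_source, user_id, event_timestamp)`** tuples that way and counts distinct **`user_id`** values per group. It uses an exact set where Spark uses **`approx_count_distinct`**, extracts the hour in UTC, and breaks ties within an hour by source name so the output order is deterministic.

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Set, Tuple

WINDOW_SECONDS = 3600  # 1-hour tumbling windows


def hourly_active_users(
    events: Iterable[Tuple[str, str, int]]
) -> List[Tuple[str, int, int]]:
    """Model of the step-2 aggregation.

    events: (traffic_source, user_id, event_timestamp in microseconds)
    returns: (traffic_source, active_users, hour) rows sorted by hour, then source
    """
    users: Dict[Tuple[str, int], Set[str]] = defaultdict(set)
    for source, user_id, ts_micros in events:
        seconds = ts_micros // 1_000_000
        window_start = seconds - seconds % WINDOW_SECONDS  # floor to the hour
        users[(source, window_start)].add(user_id)
    rows = [
        (source, len(ids), datetime.fromtimestamp(start, tz=timezone.utc).hour)
        for (source, start), ids in users.items()
    ]
    return sorted(rows, key=lambda row: (row[2], row[0]))


base = 1593734400000000  # 2020-07-03 00:00:00 UTC, in microseconds
minute = 60 * 1_000_000
events = [
    ("google", "u1", base),
    ("google", "u1", base + 10 * minute),  # same user, same window: counted once
    ("google", "u2", base + 20 * minute),
    ("email", "u3", base + 61 * minute),   # lands in the 01:00 window
]
print(hourly_active_users(events))  # [('google', 2, 0), ('email', 1, 1)]
```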

In [0]:
# TODO
from pyspark.sql.functions import approx_count_distinct, hour, window

spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)

traffic_df = (events_df
              .groupBy("traffic_source", window(col("createdAt"), "1 hour"))
              .agg(approx_count_distinct("user_id").alias("active_users"))  # distinct users, not distinct sources
              .select("traffic_source", "active_users", hour(col("window.start")).alias("hour"))
              .sort("hour")
)
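**`approx_count_distinct`** returns an estimate rather than an exact count; Spark implements it with HyperLogLog++, and its **`rsd`** argument sets the maximum relative standard deviation (0.05 by default). The toy class below is a simplified classic HyperLogLog, not Spark's HyperLogLog++ implementation, but it shows the key properties: memory stays fixed, re-adding an item never changes the estimate, and the result counts distinct values of whatever column you feed it. Counting distinct **`traffic_source`** values inside a group keyed by **`traffic_source`** can therefore only ever return 1.

```python
import hashlib
import math


class HyperLogLog:
    """Toy classic HyperLogLog: 2**p registers, each keeping the highest rank seen."""

    def __init__(self, p: int = 10) -> None:
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)  # bias constant, valid for m >= 128

    def add(self, item: str) -> None:
        # 64-bit hash: the top p bits choose a register, the remaining bits give the rank
        h = int.from_bytes(hashlib.sha1(item.encode()).digest()[:8], "big")
        index = h >> (64 - self.p)
        rest = h & ((1 << (64 - self.p)) - 1)
        rank = (64 - self.p) - rest.bit_length() + 1  # 1-based position of the leftmost 1-bit
        self.registers[index] = max(self.registers[index], rank)

    def count(self) -> int:
        raw = self.alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros:
            return round(self.m * math.log(self.m / zeros))  # linear counting for small sets
        return round(raw)


hll = HyperLogLog()
for i in range(1000):
    hll.add(f"user_{i}")
first = hll.count()
for i in range(1000):
    hll.add(f"user_{i}")  # re-adding the same users leaves every register unchanged
print(first == hll.count())  # True
```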

**2.1: CHECK YOUR WORK**

In [0]:
assert str(traffic_df.schema) == "StructType(List(StructField(traffic_source,StringType,true),StructField(active_users,LongType,false),StructField(hour,IntegerType,true)))"
print("All tests passed")

All tests passed


### 3. Execute query with display() and plot results
- Use **`display`** to start **`traffic_df`** as a streaming query and display the resulting memory sink
  - Assign "hourly_traffic" as the name of the query by setting the **`streamName`** parameter of **`display`**
- Plot the streaming query results as a bar graph
- Configure the following plot options:
  - Keys: **`hour`**
  - Series groupings: **`traffic_source`**
  - Values: **`active_users`**
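The plot options amount to a pivot: **`hour`** becomes the x-axis key, each **`traffic_source`** becomes a series of bars, and **`active_users`** sets each bar's height. The sketch below shows that arrangement in plain Python, assuming one row per (hour, traffic source). It is an illustration, not how Databricks builds its charts, and the counts are made up.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def pivot_for_bar_chart(
    rows: Iterable[Tuple[str, int, int]]
) -> Dict[int, Dict[str, int]]:
    """Arrange (traffic_source, active_users, hour) rows as {hour: {traffic_source: active_users}}.

    Outer keys are x-axis values (hours), inner keys are series (one bar per
    traffic source), and inner values are bar heights.
    """
    grid: Dict[int, Dict[str, int]] = defaultdict(dict)
    for source, active_users, hour in rows:
        grid[hour][source] = active_users
    return dict(sorted(grid.items()))  # order x-axis keys by hour


# Hypothetical counts, for illustration only
rows = [("google", 120, 0), ("email", 45, 0), ("google", 98, 1)]
print(pivot_for_bar_chart(rows))  # {0: {'google': 120, 'email': 45}, 1: {'google': 98}}
```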

In [0]:
# TODO
display(traffic_df, streamName="hourly_traffic")


**3.1: CHECK YOUR WORK**

- The bar chart should plot **`hour`** on the x-axis and **`active_users`** on the y-axis
- Each hour should show six bars, one per traffic source
- The chart should stop at hour 23
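If you have the result rows as **`(traffic_source, active_users, hour)`** tuples, the expectations above can also be checked programmatically. The helper below is a hypothetical sketch: it requires every hour from 0 to 23 to appear with all six traffic sources, so a snapshot taken before the stream has processed every hourly file fails the check.

```python
from typing import Dict, Iterable, Set, Tuple


def check_hourly_chart(
    rows: Iterable[Tuple[str, int, int]], expected_sources: int = 6
) -> None:
    """Raise ValueError unless hours 0-23 each have `expected_sources` traffic sources."""
    sources_by_hour: Dict[int, Set[str]] = {}
    for source, _active_users, hour in rows:
        sources_by_hour.setdefault(hour, set()).add(source)
    if sorted(sources_by_hour) != list(range(24)):
        raise ValueError(f"expected hours 0-23, got {sorted(sources_by_hour)}")
    for hour, sources in sorted(sources_by_hour.items()):
        if len(sources) != expected_sources:
            raise ValueError(f"hour {hour}: {len(sources)} sources, expected {expected_sources}")


# The six traffic sources in this dataset; the active_users values are placeholders
SOURCES = ["direct", "email", "facebook", "google", "instagram", "youtube"]
complete = [(source, 1, hour) for hour in range(24) for source in SOURCES]
check_hourly_chart(complete)  # passes silently
```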

### 4. Manage streaming query
- Iterate over the SparkSession's list of active streams (**`spark.streams.active`**) to find the one named "hourly_traffic"
- Stop the streaming query

In [0]:
# TODO
until_stream_is_ready("hourly_traffic")

for s in spark.streams.active:
    if s.name == "hourly_traffic":
        s.stop()
        s.awaitTermination()

The stream hourly_traffic is active and ready.
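The stop-by-name loop can be wrapped in a small helper that also reports whether a matching stream was found. The sketch below relies only on the **`name`**, **`stop()`**, and **`awaitTermination()`** members that PySpark's **`StreamingQuery`** exposes. **`stop_stream_by_name`** and **`FakeQuery`** are hypothetical names, and **`FakeQuery`** exists only so the helper can be exercised without a cluster.

```python
from typing import Iterable, Optional


class FakeQuery:
    """Hypothetical stand-in for a StreamingQuery, for running the helper without Spark."""

    def __init__(self, name: Optional[str]) -> None:
        self.name = name
        self.stopped = False

    def stop(self) -> None:
        self.stopped = True

    def awaitTermination(self, timeout: Optional[int] = None) -> None:
        return None


def stop_stream_by_name(active: Iterable, name: str) -> bool:
    """Stop the first active stream called `name`; return True if one was found.

    On a cluster, pass spark.streams.active as `active`.
    """
    for query in active:
        if query.name == name:
            query.stop()
            query.awaitTermination()  # block until the query has fully terminated
            return True
    return False


queries = [FakeQuery("other_stream"), FakeQuery("hourly_traffic")]
print(stop_stream_by_name(queries, "hourly_traffic"))  # True
```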


**4.1: CHECK YOUR WORK**
Print the names of all active streams to confirm that "hourly_traffic" is no longer listed

In [0]:
for s in spark.streams.active:
    print(s.name)

### Classroom Cleanup
Run the cell below to clean up resources.

In [0]:
classroom_cleanup()

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>