
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Exercise #5 - Streaming Orders

With our four historical datasets properly loaded, we can now begin to process the "current" orders.

In this case, the new "system" is landing one JSON file per order into cloud storage.

We can process these JSON files as a stream of orders under the assumption that new orders are continually added to this dataset.

In order to keep this project simple, we have reduced the "stream" of orders to just the first few hours of 2020 and will be throttling that stream to only one file per iteration.

This exercise is broken up into 3 steps:
* Exercise 5.A - Use Database
* Exercise 5.B - Stream-Append Orders
* Exercise 5.C - Stream-Append Line Items

## Some Friendly Advice...

Each record is a JSON object with roughly the following structure:

* **`customerID`**
* **`orderId`**
* **`products`**
  * array
    * **`productId`**
    * **`quantity`**
    * **`soldPrice`**
* **`salesRepId`**
* **`shippingAddress`**
  * **`address`**
  * **`attention`**
  * **`city`**
  * **`state`**
  * **`zip`**
* **`submittedAt`**

As you ingest this data, it will need to be transformed to match the existing **`orders`** table's schema and the **`line_items`** table's schema.

Before attempting to ingest the data as a stream, we highly recommend that you start with a static **`DataFrame`** so that you can iron out the various kinks:
* Renaming and flattening columns
* Exploding the products array
* Parsing the **`submittedAt`** column into a **`timestamp`**
* Conforming to the **`orders`** and **`line_items`** schemas - because these are Delta tables, appending to them will fail if the schemas are not correct
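
As a rough illustration of the first three items, the sketch below flattens a single order in plain Python rather than Spark, so the target row shape is easy to inspect. The sample record, its values, the `flatten_order` helper, and the text format assumed for `submittedAt` are all hypothetical; inspect a real file (for example with `dbutils.fs.head`) before relying on any of them.

```python
from datetime import datetime

# Hypothetical sample record. Field names follow the structure listed above
# (with customerId spelled as in the schema string used in Exercise 5.B);
# the values and the submittedAt text format are assumptions.
sample = {
    "customerId": "c-1",
    "orderId": "o-1",
    "products": [{"productId": "p-1", "quantity": "2", "soldPrice": "9.50"}],
    "salesRepId": "r-1",
    "shippingAddress": {
        "address": "1 Main St", "attention": "A. Buyer",
        "city": "Springfield", "state": "IL", "zip": "62701",
    },
    "submittedAt": "2020-01-01 15:00:00",
}

def flatten_order(rec):
    """Return one flat, snake_case row shaped like the orders columns."""
    addr = rec["shippingAddress"]
    submitted_at = datetime.strptime(rec["submittedAt"], "%Y-%m-%d %H:%M:%S")
    return {
        "submitted_at": submitted_at,
        "submitted_yyyy_mm": submitted_at.strftime("%Y-%m"),
        "order_id": rec["orderId"],
        "customer_id": rec["customerId"],
        "sales_rep_id": rec["salesRepId"],
        "shipping_address_attention": addr["attention"],
        "shipping_address_address": addr["address"],
        "shipping_address_city": addr["city"],
        "shipping_address_state": addr["state"],
        "shipping_address_zip": int(addr["zip"]),
    }

row = flatten_order(sample)
print(row["submitted_yyyy_mm"], row["shipping_address_zip"])  # 2020-01 62701
```

Note that Python's `"%Y-%m"` and Spark's `"yyyy-MM"` describe the same year-month text; only the pattern syntax differs.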

Furthermore, creating a stream from JSON files will first require you to specify the schema - you can "cheat" and infer that schema from some of the JSON files before starting the stream.
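
In Spark itself, a static read such as `spark.read.json(stream_path).schema` performs this inference for you, and the result can be passed to `readStream.schema(...)`. To show what the inference amounts to, here is a minimal plain-Python sketch that walks one parsed JSON object and emits a DDL-style schema string. The helpers `ddl_type` and `ddl_schema` and the sample JSON are hypothetical, and the sketch assumes arrays are non-empty and homogeneous.

```python
import json

def ddl_type(value):
    """Map a parsed JSON value to a Spark DDL type name."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, dict):
        fields = ", ".join(f"{k}: {ddl_type(v)}" for k, v in value.items())
        return f"STRUCT<{fields}>"
    if isinstance(value, list):
        # Assumption: the first element is representative of the whole array.
        return f"ARRAY<{ddl_type(value[0])}>" if value else "ARRAY<STRING>"
    return "STRING"  # strings, and nulls as a fallback

def ddl_schema(record):
    """Build the top-level 'name TYPE, name TYPE' schema string."""
    return ", ".join(f"{k} {ddl_type(v)}" for k, v in record.items())

# Hypothetical, shortened sample of one order file.
sample_json = (
    '{"orderId": "o-1", '
    '"products": [{"productId": "p-1", "quantity": "2"}], '
    '"shippingAddress": {"city": "Springfield", "zip": "62701"}}'
)
inferred = ddl_schema(json.loads(sample_json))
print(inferred)
```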

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Setup Exercise #5</h2>

To get started, we first need to configure your Registration ID and then run the setup notebook.

### Setup - Registration ID

In the next command, please update the variable **`registration_id`** with the Registration ID you received when you signed up for this project.

For more information, see [Registration ID]($./Registration ID)

In [0]:
registration_id = "3203488"

### Setup - Run the exercise setup

Run the following cell to set up this exercise, declaring exercise-specific variables and functions.

In [0]:
%run ./_includes/Setup-Exercise-05

In [0]:
dbutils.fs.rm(orders_checkpoint_path, recurse=True)
dbutils.fs.rm(line_items_checkpoint_path, recurse=True)

Variable/Function,Description
username,andrew.barry@infinitive.com
,This is the email address that you signed into Databricks with
working_dir,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone
,This is the directory in which all work should be conducted
user_db,dbacademy_andrew_barry_infinitive_com_db
,The name of the database you will use for this project.
orders_table,orders
,The name of the orders table.
products_table,products
,The name of the products table.


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.A - Use Database</h2>

Each notebook uses a different Spark session and will initially use the **`default`** database.

As in the previous exercise, we can avoid contention over commonly named tables by using our user-specific database.

**In this step you will need to:**
* Use the database identified by the variable **`user_db`** so that any tables created in this notebook are **NOT** added to the **`default`** database

### Implement Exercise #5.A

Implement your solution in the following cell:

In [0]:
# Switch this session to the user-specific database so that new tables stay out of `default`
spark.sql("USE {}".format(user_db))

### Reality Check #5.A
Run the following command to ensure that you are on track:

In [0]:
reality_check_05_a()

Points,Test,Result
1,"Using DBR 7.3 LTS, with 8 cores",
1,Valid Registration ID,
1,The current database is dbacademy_andrew_barry_infinitive_com_db,
1,"Expected 195,698 orders",
1,"Expected 1,175,870 line-items",
1,Expected 12 products,


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.B - Stream-Append Orders</h2>

Every JSON file ingested by our stream represents one order and the enumerated list of products purchased in that order.

Our goal is simple: ingest the data, transform it as required by the **`orders`** table's schema, and append these new records to our existing table.

**In this step you will need to:**

* Ingest the stream of JSON files:
  * Start a stream from the path identified by **`stream_path`**.
  * Using the **`maxFilesPerTrigger`** option, throttle the stream to process only one file per iteration.
  * Add the ingest metadata (same as with our other datasets):
    * **`ingested_at`**:**`timestamp`**
    * **`ingest_file_name`**:**`string`**
  * Properly parse **`submitted_at`** as a valid **`timestamp`**
  * Add the column **`submitted_yyyy_mm`** using the format "**yyyy-MM**"
  * Make any other changes required to the column names and data types so that they conform to the **`orders`** table's schema

* Write the stream to a Delta **table**:
  * The table's format should be "**delta**"
  * Partition the data by the column **`submitted_yyyy_mm`**
  * Records must be appended to the table identified by the variable **`orders_table`**
  * The query must be named the same as the table, identified by the variable **`orders_table`**
  * The query must use the checkpoint location identified by the variable **`orders_checkpoint_path`**
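
To make the throttling requirement concrete, here is a toy model in plain Python (not Spark) of how a per-trigger file limit turns a backlog of files into micro-batches. The `plan_micro_batches` helper and the file names are hypothetical; in a real stream, Spark decides which file goes into which trigger.

```python
def plan_micro_batches(files, max_files_per_trigger):
    """Split pending files into consecutive batches of at most N files."""
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]

# Hypothetical backlog of 20 single-order files.
pending = [f"order_{n:02d}.json" for n in range(20)]

batches = plan_micro_batches(pending, 1)
print(len(batches), batches[0])  # 20 ['order_00.json']
```

With a limit of one file and one order per file, 20 new orders take at least 20 triggers, which is what the reality check for this step looks for.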

### Implement Exercise #5.B

Implement your solution in the following cell:

In [0]:
dbutils.fs.head("dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json")

In [0]:
from pyspark.sql.functions import col, current_timestamp, date_format, explode, input_file_name

# Schema of one order file; every leaf is read as a string and cast where needed below
schema = "submittedAt STRING, orderId STRING, customerId STRING, salesRepId STRING, products ARRAY<STRUCT<productId: STRING, quantity: STRING, soldPrice: STRING>>, shippingAddress STRUCT<attention: STRING, address: STRING, state: STRING, city: STRING, zip: STRING>"

prod_df = (spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)  # throttle the stream to one file per trigger
  .json(stream_path)
  # Rename, flatten, and cast in one pass; products is dropped because it belongs to line_items
  .select(
    col("submittedAt").cast("timestamp").alias("submitted_at"),
    col("orderId").alias("order_id"),
    col("customerId").alias("customer_id"),
    col("salesRepId").alias("sales_rep_id"),
    col("shippingAddress.attention").alias("shipping_address_attention"),
    col("shippingAddress.address").alias("shipping_address_address"),
    col("shippingAddress.city").alias("shipping_address_city"),
    col("shippingAddress.state").alias("shipping_address_state"),
    col("shippingAddress.zip").cast("int").alias("shipping_address_zip"),
  )
  # Ingest metadata, same as with the other datasets
  .withColumn("ingest_file_name", input_file_name())
  .withColumn("ingested_at", current_timestamp())
  # Partition column; the built-in date_format replaces a Python UDF here
  .withColumn("submitted_yyyy_mm", date_format(col("submitted_at"), "yyyy-MM"))
)

In [0]:
display(prod_df)

submitted_at,order_id,customer_id,sales_rep_id,shipping_address_attention,shipping_address_address,shipping_address_city,shipping_address_state,shipping_address_zip,ingest_file_name,ingested_at,submitted_yyyy_mm
2020-01-01T15:00:00.000+0000,0612a18b-0cc7-43ea-9f5b-155aad967cb9,3bd96aa2-0000-4138-8191-b961618954a7,1e65eaee-012c-4987-8056-f37c19fc0a35,Khaleesi Frost,493 Augustine Drive N,Miramar,FL,33785,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2021-09-17T22:52:53.695+0000,2020-01
2020-01-01T18:00:00.000+0000,1d5609b3-b0e1-4201-8571-93c0c0e8e4da,418d1146-c085-4dd9-8538-a83ae73e720e,bc378481-c31f-4d56-8f20-023875cd5d09,Sofia Craig,173 W Burnsed Boulevard,Peoria,AZ,85613,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2021-09-17T22:52:58.734+0000,2020-01
2020-01-01T09:00:00.000+0000,1da78b30-5658-45cc-b7cd-1e798d965f79,e426c612-b8c2-466c-87e4-1e524a1bb641,d89b9c35-5adc-42af-b6a7-0b32d14f17a3,Fernando Reilly,983 Hoopfer Way N,San Diego,CA,95636,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_1da78b30-5658-45cc-b7cd-1e798d965f79_2020-01-01.json,2021-09-17T22:53:01.569+0000,2020-01
2020-01-01T08:00:00.000+0000,24638248-2a9b-45fe-9d72-17cc813e7c32,b4f9fa1f-f601-4829-b2cb-fc6083e2b4a2,8a97a6f1-3f60-422d-914a-3a4b13c6afb2,Aries Petty,195 Burroughs Road S,West Jordan,UT,84647,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_24638248-2a9b-45fe-9d72-17cc813e7c32_2020-01-01.json,2021-09-17T22:53:04.165+0000,2020-01
2020-01-01T18:00:00.000+0000,3f6814fd-a291-4fff-9706-9d247c676aa0,fcbabbb9-f502-4253-ad9e-35762e873914,9617aeb0-46f5-44fd-a85c-a7991da17167,Patricia Cunningham,952 W Ruby Road,Salt Lake City,UT,84305,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_3f6814fd-a291-4fff-9706-9d247c676aa0_2020-01-01.json,2021-09-17T22:53:06.579+0000,2020-01
2020-01-01T00:00:00.000+0000,466f239e-755c-4aa8-b377-c24a2a8d99c7,8e218aa6-8c4f-4f76-b53d-53acdd926a40,81de6091-d4eb-4640-b99c-5b8989fbf4b9,Ibrahim Rollins,274 Eagle Ridge Drive,Sacramento,CA,91899,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_466f239e-755c-4aa8-b377-c24a2a8d99c7_2020-01-01.json,2021-09-17T22:53:09.734+0000,2020-01
2020-01-01T14:00:00.000+0000,52fb8aa7-ff89-4343-952b-74829914acd8,55e87a67-428d-4cb2-adf6-f9de5f0e3a45,81de6091-d4eb-4640-b99c-5b8989fbf4b9,Zahir Duffy,655 W Castlehill Drive,Jurupa Valley,CA,90509,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_52fb8aa7-ff89-4343-952b-74829914acd8_2020-01-01.json,2021-09-17T22:53:12.055+0000,2020-01
2020-01-01T14:00:00.000+0000,533577a8-d504-4360-8b05-3f38502a0fc1,416b4456-d6e2-416d-88e6-1699b753d289,bc378481-c31f-4d56-8f20-023875cd5d09,Emery Joseph,337 Petoskey Place,Sterling Heights,MI,48889,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_533577a8-d504-4360-8b05-3f38502a0fc1_2020-01-01.json,2021-09-17T22:53:14.227+0000,2020-01
2020-01-01T03:00:00.000+0000,575aaee8-4a6e-4ea1-ad3d-e69c2d6b7c2a,91a1776e-6cf4-4814-bfae-0bd13137675c,bc378481-c31f-4d56-8f20-023875cd5d09,Evelyn Savage,659 S Hendry Drive,Kent,WA,98582,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_575aaee8-4a6e-4ea1-ad3d-e69c2d6b7c2a_2020-01-01.json,2021-09-17T22:53:16.820+0000,2020-01
2020-01-01T12:00:00.000+0000,6e79596e-6038-4a0d-af1b-a99e63d0e574,81293070-4b6b-4da8-bc0d-263433b9f673,989866ca-a52f-4847-919c-f324042223e8,Judah Warner,496 W Holly Lane,Joliet,IL,60469,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_6e79596e-6038-4a0d-af1b-a99e63d0e574_2020-01-01.json,2021-09-17T22:53:20.207+0000,2020-01


In [0]:
writer = (prod_df.writeStream
  .format("delta")
  .partitionBy("submitted_yyyy_mm")
  .outputMode("append")
  .queryName(orders_table)
  .option("checkpointLocation", orders_checkpoint_path)
  .table(orders_table)
)

In [0]:
df = spark.sql("select count(*) from orders")
display(df)

count(1)
195698


### Reality Check #5.B
Run the following command to ensure that you are on track.

**Caution**: In the cell above, you will be appending to a Delta table and the final record count will be validated below. Should you restart the stream, you will inevitably append duplicate records to these tables forcing the validation to fail. There are two things you will need to address in this scenario:
* Address the duplicate data issue by re-running **Exercise #3** which would presumably delete and/or overwrite the datasets, putting them back to their default state for this exercise.
* Address the stream's state issue (remembering which files were processed) by deleting the directory identified by **`orders_checkpoint_path`**
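
The two bullets go together because the table and the checkpoint are separate pieces of state. The toy model below, in plain Python rather than Spark, treats the checkpoint as a set of already-processed file names and the table as a list of appended rows. The `run_stream` helper and the file names are hypothetical simplifications of what Structured Streaming actually records.

```python
def run_stream(source_files, checkpoint, table):
    """Append every file the checkpoint has not seen, then record it."""
    for name in source_files:
        if name not in checkpoint:
            table.append(name)
            checkpoint.add(name)
    return table

files = ["order_a.json", "order_b.json"]  # hypothetical file names

# Checkpoint deleted, table not reset: the same orders are appended again.
table, checkpoint = [], set()
run_stream(files, checkpoint, table)
duplicated = run_stream(files, set(), table)

# Table reset, checkpoint kept: the stream believes it is done and appends nothing.
table, checkpoint = [], set()
run_stream(files, checkpoint, table)
stale = run_stream(files, checkpoint, [])

# Both reset: each order is appended exactly once.
clean = run_stream(files, set(), [])

print(len(duplicated), len(stale), len(clean))  # 4 0 2
```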

In [0]:
reality_check_05_b()

Points,Test,Result
1,Expected at least 20 triggers,
1,Expected less than 100 triggers,
1,Expected the first 20 triggers to processes 1 record per trigger,
1,Checkpoint directory exists,
1,"Expected 195,718 orders (20 new)",


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.C - Stream-Append Line Items</h2>

The same JSON file we processed in the previous stream also contains the line items which we now need to extract and append to the existing **`line_items`** table.

Just like before, our goal is simple: ingest the data, transform it as required by the **`line_items`** table's schema, and append these new records to our existing table.

Note: we are processing the same stream twice - there are other patterns to do this more efficiently, but for this exercise, we want to keep the design simple.<br/>
The good news here is that you can copy most of the code from the previous step to get you started here.

**In this step you will need to:**

* Ingest the stream of JSON files:
  * Start a stream from the path identified by **`stream_path`**.
  * Using the **`maxFilesPerTrigger`** option, throttle the stream to process only one file per iteration.
  * Add the ingest metadata (same as with our other datasets):
    * **`ingested_at`**:**`timestamp`**
    * **`ingest_file_name`**:**`string`**
  * Make any other changes required to the column names and data types so that they conform to the **`line_items`** table's schema
    * The most significant transformation will be to the **`products`** column.
    * The **`products`** column is an array of elements and needs to be exploded (see **`pyspark.sql.functions`**)
    * One solution would include:
      1. Select **`order_id`** and explode **`products`** while renaming it to **`product`**.
      2. Flatten the **`product`** column's nested values.
      3. Add the ingest metadata (**`ingest_file_name`** and **`ingested_at`**).
      4. Convert data types as required by the **`line_items`** table's schema.

* Write the stream to a Delta sink:
  * The sink's format should be "**delta**"
  * Records must be appended to the table identified by the variable **`line_items_table`**
  * The query must be named the same as the table, identified by the variable **`line_items_table`**
  * The query must use the checkpoint location identified by the variable **`line_items_checkpoint_path`**
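
The explode-and-flatten steps listed above can be pictured in plain Python before writing them in Spark: one order with N products becomes N flat rows that share the same `order_id`. The `explode_line_items` helper and the sample record are hypothetical, and the `int` and `Decimal` conversions merely stand in for whatever types the **`line_items`** table actually requires.

```python
from decimal import Decimal

def explode_line_items(order):
    """Yield one flat row per element of the order's products array."""
    for product in order["products"]:
        yield {
            "order_id": order["orderId"],
            "product_id": product["productId"],
            "product_quantity": int(product["quantity"]),
            "product_sold_price": Decimal(product["soldPrice"]),
        }

# Hypothetical order holding two products.
order = {
    "orderId": "o-1",
    "products": [
        {"productId": "p-1", "quantity": "500", "soldPrice": "104.85"},
        {"productId": "p-2", "quantity": "300", "soldPrice": "94.37"},
    ],
}

rows = list(explode_line_items(order))
print(len(rows), rows[1]["product_id"], rows[1]["product_sold_price"])  # 2 p-2 94.37
```

Because one file can yield several rows, the number of new line items is larger than the number of new orders even though both streams read the same files.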

### Implement Exercise #5.C

Implement your solution in the following cell:

In [0]:
from pyspark.sql.functions import col, current_timestamp, explode, input_file_name

# Same order-file schema as in Exercise 5.B; only orderId and products are used here
schema = "submittedAt STRING, orderId STRING, customerId STRING, salesRepId STRING, products ARRAY<STRUCT<productId: STRING, quantity: STRING, soldPrice: STRING>>, shippingAddress STRUCT<attention: STRING, address: STRING, state: STRING, city: STRING, zip: STRING>"

line_df = (spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)  # throttle the stream to one file per trigger
  .json(stream_path)
  # One output row per element of the products array
  .select(col("orderId").alias("order_id"), explode(col("products")).alias("product"))
  # Flatten the product struct and cast to the line_items column types
  .select(
    "order_id",
    col("product.productId").alias("product_id"),
    col("product.quantity").cast("int").alias("product_quantity"),
    col("product.soldPrice").cast("decimal(10,2)").alias("product_sold_price"),
  )
  # Ingest metadata, same as with the other datasets
  .withColumn("ingest_file_name", input_file_name())
  .withColumn("ingested_at", current_timestamp())
)

In [0]:
display(line_df)

order_id,product_id,product_quantity,product_sold_price,ingest_file_name,ingested_at
0612a18b-0cc7-43ea-9f5b-155aad967cb9,8d809e13-fdc5-4d15-9271-953750f6d592,500,104.85,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2021-09-17T22:55:30.149+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,ec15ba1d-53b6-44b0-8a22-1e498485f1b8,300,94.37,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2021-09-17T22:55:30.149+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,e672483e-57a8-434a-bc42-ecf827c8a8d4,1000,109.57,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2021-09-17T22:55:30.149+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,95cbadca-cf90-4b8a-a134-2976f6ba6df8,200,99.61,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2021-09-17T22:55:30.149+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,699fcfe8-ce60-42c9-9d0f-728df3e48d70,100,99.61,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2021-09-17T22:55:30.149+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,a990d79b-4957-42fc-8e42-20ceb1fd1259,100,115.34,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2021-09-17T22:55:30.149+0000
1d5609b3-b0e1-4201-8571-93c0c0e8e4da,e672483e-57a8-434a-bc42-ecf827c8a8d4,900,104.59,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2021-09-17T22:55:33.210+0000
1d5609b3-b0e1-4201-8571-93c0c0e8e4da,a990d79b-4957-42fc-8e42-20ceb1fd1259,600,110.09,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2021-09-17T22:55:33.210+0000
1d5609b3-b0e1-4201-8571-93c0c0e8e4da,8d809e13-fdc5-4d15-9271-953750f6d592,600,100.09,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2021-09-17T22:55:33.210+0000
1d5609b3-b0e1-4201-8571-93c0c0e8e4da,699fcfe8-ce60-42c9-9d0f-728df3e48d70,800,95.08,dbfs:/user/andrew.barry@infinitive.com/dbacademy/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2021-09-17T22:55:33.210+0000


In [0]:
lineStream = (line_df.writeStream
                .format("delta")
                .outputMode("append")
                .queryName(line_items_table)
                # maxFilesPerTrigger is a source option and is already set on readStream above
                .option("checkpointLocation", line_items_checkpoint_path)
                .table(line_items_table)
)

In [0]:
df = spark.sql("select count(*) from line_items")
display(df)

count(1)
1175870


In [0]:
reality_check_05_c()

Points,Test,Result
1,Expected at least 20 triggers,
1,Expected less than 100 triggers,
1,Expected the first 20 triggers to processes 1 record per trigger,
1,Checkpoint directory exists,
1,"Expected 1,175,967 records, (97 new)",


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5 - Final Check</h2>

Run the following command to make sure this exercise is complete:

In [0]:
reality_check_05_final()

Points,Test,Result
1,Reality Check 05.A passed,
1,Reality Check 05.B passed,
1,Reality Check 05.C passed,
1,"Expected 195,718 orders (20 new)",
1,"Expected 1,175,967 records, (97 new)",
1,Expected 12 products,
1,Non-null (properly parsed) submitted_at,
