
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Exercise #5 - Streaming Orders

With our four historical datasets properly loaded, we can now begin to process the "current" orders.

In this case, the new "system" is landing one JSON file per order into cloud storage.

We can process these JSON files as a stream of orders under the assumption that new orders are continually added to this dataset.

In order to keep this project simple, we have reduced the "stream" of orders to just the first few hours of 2020 and will be throttling that stream to only one file per iteration.

This exercise is broken up into 3 steps:
* Exercise 5.A - Use Database
* Exercise 5.B - Stream-Append Orders
* Exercise 5.C - Stream-Append Line Items

## Some Friendly Advice...

Each record is a JSON object with roughly the following structure:

* **`customerID`**
* **`orderId`**
* **`products`**
  * array
    * **`productId`**
    * **`quantity`**
    * **`soldPrice`**
* **`salesRepId`**
* **`shippingAddress`**
  * **`address`**
  * **`attention`**
  * **`city`**
  * **`state`**
  * **`zip`**
* **`submittedAt`**

As you ingest this data, it will need to be transformed to match the existing **`orders`** table's schema and the **`line_items`** table's schema.

Before attempting to ingest the data as a stream, we highly recommend that you start with a static **`DataFrame`** so that you can iron out the various kinks:
* Renaming and flattening columns
* Exploding the products array
* Parsing the **`submittedAt`** column into a **`timestamp`**
* Conforming to the **`orders`** and **`line_items`** schemas: because these are Delta tables, appending to them will fail if the schemas are not correct

Furthermore, a streaming read of JSON files requires you to specify the schema up front. You can "cheat" and infer that schema from some of the JSON files with a static read before starting the stream.

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Setup Exercise #5</h2>

To get started, we first need to configure your Registration ID and then run the setup notebook.

### Setup - Registration ID

In the next command, please update the variable **`registration_id`** with the Registration ID you received when you signed up for this project.

For more information, see [Registration ID]($./Registration ID)

In [0]:
registration_id = "3339094"

### Setup - Run the exercise setup

Run the following cell to set up this exercise, declaring exercise-specific variables and functions.

In [0]:
%run ./_includes/Setup-Exercise-05

| Variable/Function | Value | Description |
|---|---|---|
| `username` | dakota.murdock@wavicledata.com | This is the email address that you signed into Databricks with |
| `working_dir` | dbfs:/user/dakota.murdock@wavicledata.com/dbacademy/developer-foundations-capstone | This is the directory in which all work should be conducted |
| `user_db` | dbacademy_dakota_murdock_wavicledata_com_developer_foundations_capstone | The name of the database you will use for this project. |
| `orders_table` | orders | The name of the orders table. |
| `products_table` | products | The name of the products table. |


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.A - Use Database</h2>

Each notebook uses a different Spark session and will initially use the **`default`** database.

As in the previous exercise, we can avoid contention over commonly named tables by using our user-specific database.

**In this step you will need to:**
* Use the database identified by the variable **`user_db`** so that any tables created in this notebook are **NOT** added to the **`default`** database

### Implement Exercise #5.A

Implement your solution in the following cell:

In [0]:
spark.sql(f"USE {user_db};")

Out[13]: DataFrame[]

### Reality Check #5.A
Run the following command to ensure that you are on track:

In [0]:
reality_check_05_a()

| Points | Test | Result |
|---|---|---|
| 1 | Using DBR 9.1 & Proper Cluster Configuration | |
| 1 | Valid Registration ID | |
| 1 | The current database is dbacademy_dakota_murdock_wavicledata_com_developer_foundations_capstone | |
| 1 | Expected 195,698 orders | |
| 1 | Expected 1,175,870 line-items | |
| 1 | Expected 12 products | |


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.B - Stream-Append Orders</h2>

Every JSON file ingested by our stream represents one order and the enumerated list of products purchased in that order.

Our goal is simple: ingest the data, transform it as required by the **`orders`** table's schema, and append these new records to our existing table.

**In this step you will need to:**

* Ingest the stream of JSON files:
  * Start a stream from the path identified by **`stream_path`**.
  * Using the **`maxFilesPerTrigger`** option, throttle the stream to process only one file per iteration.
  * Add the ingest metadata (same as with our other datasets):
    * **`ingested_at`**:**`timestamp`**
    * **`ingest_file_name`**:**`string`**
  * Properly parse the **`submitted_at`** column as a valid **`timestamp`**
  * Add the column **`submitted_yyyy_mm`** using the format "**yyyy-MM**"
  * Make any other changes required to the column names and data types so that they conform to the **`orders`** table's schema

* Write the stream to a Delta **table**:
  * The table's format should be "**delta**"
  * Partition the data by the column **`submitted_yyyy_mm`**
  * Records must be appended to the table identified by the variable **`orders_table`**
  * The query must be named the same as the table, identified by the variable **`orders_table`**
  * The query must use the checkpoint location identified by the variable **`orders_checkpoint_path`**

### Implement Exercise #5.B

Implement your solution in the following cell:

In [0]:
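from pyspark.sql.types import ArrayType, DoubleType, LongType, StringType, StructField, StructType
# Import only the functions this cell uses. A wildcard import from pyspark.sql.functions
# replaces Python built-ins such as round(), min(), max() and sum() in the notebook's
# shared namespace, a likely cause of the "Invalid argument, not a string or column"
# errors reported by the reality checks.
from pyspark.sql.functions import col, current_timestamp, date_format, input_file_name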

dbutils.fs.rm(orders_checkpoint_path, True)

schema = StructType(
    [
        StructField("customerId",StringType(),True),
        StructField("orderId",StringType(),True),
        StructField("products",ArrayType(StructType([StructField("productId",StringType(),True),
                                                       StructField("quantity",LongType(),True),
                                                       StructField("soldPrice",DoubleType(),True)]),True),True),
        StructField("salesRepId",StringType(),True),
        StructField("shippingAddress",StructType([StructField("address",StringType(),True),
                                                    StructField("attention",StringType(),True),
                                                    StructField("city",StringType(),True),
                                                    StructField("state",StringType(),True),
                                                    StructField("zip",StringType(),True)]),True),
        StructField("submittedAt",StringType(),True)
    ]
)

df = (spark.readStream
      .format("json")
      .option("maxFilesPerTrigger", 1)
      .schema(schema)
      .load(stream_path)
     )

df_orders = (df.select("submittedAt", "orderId", "customerId", "salesRepId", "shippingAddress")
              .withColumnRenamed("submittedAt", "submitted_at")
              .withColumnRenamed("orderId", "order_id")
              .withColumnRenamed("customerId", "customer_id")
              .withColumnRenamed("salesRepId", "sales_rep_id")
              .withColumnRenamed("shippingAddress", "shipping_address")
              .withColumn("ingest_file_name", input_file_name())
              # Cast the string to a timestamp. This assumes submittedAt holds text Spark can
              # cast directly (e.g. ISO-8601); epoch seconds would need from_unixtime() first.
              .withColumn("submitted_at", col("submitted_at").cast("timestamp"))
              # Derive the yyyy-MM partition column from submitted_at
              .withColumn("submitted_yyyy_mm", date_format("submitted_at", "yyyy-MM"))
              .withColumn("ingested_at", current_timestamp())
              .withColumn("shipping_address_attention", col("shipping_address").attention)
              .withColumn("shipping_address_address", col("shipping_address").address)
              .withColumn("shipping_address_city", col("shipping_address").city)
              .withColumn("shipping_address_state", col("shipping_address").state)
              .withColumn("shipping_address_zip", col("shipping_address").zip.cast("integer"))
              .drop("shipping_address")
                    )

df_orders_query = (df_orders.writeStream
                   .partitionBy("submitted_yyyy_mm")
                   .outputMode("append")
                   .format("delta")
                   .queryName(orders_table)
                   .option("checkpointLocation", orders_checkpoint_path)
                   .table(orders_table)
                  )

### Reality Check #5.B
Run the following command to ensure that you are on track.

**Caution**: In the cell above, you will be appending to a Delta table, and the final record count will be validated below. Should you restart the stream, you will inevitably append duplicate records to these tables, causing the validation to fail. There are two things you will need to address in this scenario:
* Address the duplicate data issue by re-running **Exercise #3**, which would presumably delete and/or overwrite the datasets, putting them back to their default state for this exercise.
* Address the stream's state issue (remembering which files were processed) by deleting the directory identified by **`orders_checkpoint_path`**

In [0]:
reality_check_05_b()

The stream "orders" has started.
The stream hasn't processed any trigger yet...
The stream hasn't processed any trigger yet...
The stream has processed 1 triggers so far.
Processing trigger 2 of 20...
Processing trigger 3 of 20...
Processing trigger 4 of 20...
Processing trigger 4 of 20...
Processing trigger 5 of 20...
Processing trigger 6 of 20...
Processing trigger 7 of 20...
Processing trigger 8 of 20...
Processing trigger 9 of 20...
Processing trigger 9 of 20...
Processing trigger 10 of 20...
Processing trigger 11 of 20...
Processing trigger 11 of 20...
Processing trigger 12 of 20...
Processing trigger 13 of 20...
Processing trigger 13 of 20...
Processing trigger 14 of 20...
Processing trigger 15 of 20...
Processing trigger 16 of 20...
Processing trigger 16 of 20...
Processing trigger 17 of 20...
Processing trigger 18 of 20...
Processing trigger 19 of 20...
Processing trigger 19 of 20...
Processing trigger 20 of 20...
Processing results...
PYTHON ERROR Invalid argument, not a strin

| Points | Test | Result |
|---|---|---|
| 1 | Expected at least 20 triggers | |
| 1 | Expected less than 100 triggers | |
| 1 | Expected the first 20 triggers to processes 1 record per trigger | |
| 1 | Checkpoint directory exists | |
| 1 | Expected 195,718 orders (20 new) | |
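PySpark raises the message in the `PYTHON ERROR Invalid argument, not a string or column` lines when a plain Python number is passed to a function that expects a column. The large floats in those messages look like millisecond timestamps. One likely cause, inferred from the messages rather than confirmed from the reality-check source, is a `from pyspark.sql.functions import *` in a solution cell. Because `%run` and the solution cells share one Python namespace, that wildcard import replaces built-ins such as `round()`, `min()`, `max()` and `sum()` for the helper functions too. The plain-Python sketch below reproduces the effect without Spark. It uses a hypothetical stand-in module whose `round()` only accepts column names.

```python
import sys
import types

# Hypothetical stand-in for a module like pyspark.sql.functions: its round()
# expects a column name, not a Python number (mimicking PySpark's error text).
fake_functions = types.ModuleType("fake_functions")

def _column_round(col, scale=0):
    if not isinstance(col, str):
        raise TypeError(f"Invalid argument, not a string or column: {col} of type {type(col)}")
    return f"round({col}, {scale})"

fake_functions.round = _column_round
sys.modules["fake_functions"] = fake_functions

def run_helper_code(import_line):
    """Run an import, then code that expects the *built-in* round()."""
    namespace = {}
    exec(import_line, namespace)
    try:
        exec("elapsed_ms = round(1636992409637.249)", namespace)
        return namespace["elapsed_ms"]
    except TypeError as err:
        return f"ERROR: {err}"

wildcard = run_helper_code("from fake_functions import *")
aliased = run_helper_code("import fake_functions as F")
print(wildcard)  # the stand-in round() shadowed the built-in
print(aliased)   # the built-in round() still works
```

If this is indeed the cause, importing `pyspark.sql.functions as F` (or only the names a cell needs) avoids the collision. A name that is already shadowed stays shadowed until the notebook's Python state is reset.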


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.C - Stream-Append Line Items</h2>

The same JSON files we processed in the previous stream also contain the line items, which we now need to extract and append to the existing **`line_items`** table.

Just like before, our goal is simple: ingest the data, transform it as required by the **`line_items`** table's schema, and append these new records to our existing table.

Note: we are processing the same stream twice. There are other patterns to do this more efficiently, but for this exercise, we want to keep the design simple.<br/>
The good news is that you can copy most of the code from the previous step to get started.

**In this step you will need to:**

* Ingest the stream of JSON files:
  * Start a stream from the path identified by **`stream_path`**.
  * Using the **`maxFilesPerTrigger`** option, throttle the stream to process only one file per iteration.
  * Add the ingest metadata (same as with our other datasets):
    * **`ingested_at`**:**`timestamp`**
    * **`ingest_file_name`**:**`string`**
  * Make any other changes required to the column names and data types so that they conform to the **`line_items`** table's schema
    * The most significant transformation will be to the **`products`** column.
    * The **`products`** column is an array of elements and needs to be exploded (see **`pyspark.sql.functions`**)
    * One solution would include:
      1. Select **`order_id`** and explode **`products`** while renaming it to **`product`**.
      2. Flatten the **`product`** column's nested values.
      3. Add the ingest metadata (**`ingest_file_name`** and **`ingested_at`**).
      4. Convert data types as required by the **`line_items`** table's schema.

* Write the stream to a Delta sink:
  * The sink's format should be "**delta**"
  * Records must be appended to the table identified by the variable **`line_items_table`**
  * The query must be named the same as the table, identified by the variable **`line_items_table`**
  * The query must use the checkpoint location identified by the variable **`line_items_checkpoint_path`**

### Implement Exercise #5.C

Implement your solution in the following cell:

In [0]:
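from pyspark.sql.types import ArrayType, DecimalType, DoubleType, LongType, StringType, StructField, StructType
# Explicit imports: a wildcard import from pyspark.sql.functions would shadow Python
# built-ins such as round(), min(), max() and sum() in the shared notebook namespace.
from pyspark.sql.functions import col, current_timestamp, explode, input_file_name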

dbutils.fs.rm(line_items_checkpoint_path, True)

schema = StructType(
    [
        StructField("customerId",StringType(),True),
        StructField("orderId",StringType(),True),
        StructField("products",ArrayType(StructType([StructField("productId",StringType(),True),
                                                       StructField("quantity",LongType(),True),
                                                       StructField("soldPrice",DoubleType(),True)]),True),True),
        StructField("salesRepId",StringType(),True),
        StructField("shippingAddress",StructType([StructField("address",StringType(),True),
                                                    StructField("attention",StringType(),True),
                                                    StructField("city",StringType(),True),
                                                    StructField("state",StringType(),True),
                                                    StructField("zip",StringType(),True)]),True),
        StructField("submittedAt",StringType(),True)
    ]
)

df = (spark.readStream
      .format("json")
      .option("maxFilesPerTrigger", 1)
      .schema(schema)
      .load(stream_path)
     )

df_line_item = (df.select("orderId", "products")
              .withColumnRenamed("orderId", "order_id")
              .withColumn("product", explode("products"))
              .withColumn("product_id", col("product").productId.cast("String"))
              .withColumn("product_quantity", col("product").quantity.cast("Integer"))
              .withColumn("product_sold_price", col("product").soldPrice.cast(DecimalType(10, 2)))
              .withColumn("ingest_file_name", input_file_name())
              .withColumn("ingested_at", current_timestamp())
              .drop("products")
              .drop("product")
                    )

df_line_item_query = (df_line_item.writeStream
                   .outputMode("append")
                   .format("delta")
                   .queryName(line_items_table)
                   .option("checkpointLocation", line_items_checkpoint_path)
                   .table(line_items_table)
                  )

In [0]:
reality_check_05_c()

The stream "line_items" has started.
The stream hasn't processed any trigger yet...
The stream hasn't processed any trigger yet...
The stream has processed 1 triggers so far.
Processing trigger 2 of 20...
Processing trigger 3 of 20...
Processing trigger 4 of 20...
Processing trigger 5 of 20...
Processing trigger 6 of 20...
Processing trigger 7 of 20...
Processing trigger 8 of 20...
Processing trigger 9 of 20...
Processing trigger 10 of 20...
Processing trigger 11 of 20...
Processing trigger 11 of 20...
Processing trigger 12 of 20...
Processing trigger 13 of 20...
Processing trigger 14 of 20...
Processing trigger 15 of 20...
Processing trigger 16 of 20...
Processing trigger 17 of 20...
Processing trigger 18 of 20...
Processing trigger 18 of 20...
Processing trigger 19 of 20...
Processing trigger 20 of 20...
Processing results...
PYTHON ERROR Invalid argument, not a string or column: 1636992409637.249 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_m

| Points | Test | Result |
|---|---|---|
| 1 | Expected at least 20 triggers | |
| 1 | Expected less than 100 triggers | |
| 1 | Expected the first 20 triggers to processes 1 record per trigger | |
| 1 | Checkpoint directory exists | |
| 1 | Expected 1,175,967 records, (97 new) | |


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5 - Final Check</h2>

Run the following command to make sure this exercise is complete:

In [0]:
reality_check_05_final()

PYTHON ERROR Invalid argument, not a string or column: 1636992414132.0525 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
PYTHON ERROR Invalid argument, not a string or column: 1636992414149.7625 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
PYTHON ERROR Invalid argument, not a string or column: 1636992414163.6792 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
PYTHON ERROR Invalid argument, not a string or column: 1636992415087.5708 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
PYTHON ERROR Invalid argument, not a string or column: 1636992415711.6975 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
PYTHON ERROR Invalid argument, not a string or column: 1636992416348.2397 of type <class 'float'>. For column literals, use '

| Points | Test | Result |
|---|---|---|
| 1 | Reality Check 05.A passed | |
| 1 | Reality Check 05.B passed | |
| 1 | Reality Check 05.C passed | |
| 1 | Expected 195,718 orders (20 new) | |
| 1 | Expected 1,175,967 records, (97 new) | |
| 1 | Expected 12 products | |
| 1 | Non-null (properly parsed) submitted_at | |


&copy; 2021 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>