
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# Exercise #5 - Streaming Orders

With our four historical datasets properly loaded, we can now begin to process the "current" orders.

In this case, the new "system" is landing one JSON file per order into cloud storage.

We can process these JSON files as a stream of orders under the assumption that new orders are continually added to this dataset.

To keep this project simple, we have reduced the "stream" of orders to just the first few hours of 2020 and will be throttling that stream to only one file per iteration.

This exercise is broken up into 3 steps:
* Exercise 5.A - Use Database
* Exercise 5.B - Stream-Append Orders
* Exercise 5.C - Stream-Append Line Items

## Some Friendly Advice...

Each record is a JSON object with roughly the following structure:

* **`customerID`**
* **`orderId`**
* **`products`**
  * array
    * **`productId`**
    * **`quantity`**
    * **`soldPrice`**
* **`salesRepId`**
* **`shippingAddress`**
  * **`address`**
  * **`attention`**
  * **`city`**
  * **`state`**
  * **`zip`**
* **`submittedAt`**

As you ingest this data, it will need to be transformed to match the existing **`orders`** table's schema and the **`line_items`** table's schema.
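To make the target shape concrete, here is a minimal pure-Python sketch (outside Spark) of how one record with the structure above could map to a flat row. The sample values are invented, the helper names `to_snake_case` and `flatten_record` are hypothetical, and the resulting column names are only a guess at the target naming; check the real tables with `printSchema()` before relying on them.

```python
import re

def to_snake_case(name: str) -> str:
    """Convert a camelCase name such as 'salesRepId' to 'sales_rep_id'."""
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()

def flatten_record(record: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into one level, joining parent and child names with '_'."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}_{to_snake_case(key)}" if prefix else to_snake_case(key)
        if isinstance(value, dict):
            flat.update(flatten_record(value, name))
        else:
            flat[name] = value
    return flat

# Invented sample record following the structure listed above
sample = {
    "customerID": "c-001",
    "orderId": "o-001",
    "products": [{"productId": "p-001", "quantity": 1, "soldPrice": 9.99}],
    "salesRepId": "s-001",
    "shippingAddress": {
        "address": "1 Main St",
        "attention": "Pat Doe",
        "city": "Springfield",
        "state": "IL",
        "zip": "62701",
    },
    "submittedAt": "2020-01-01T10:00:00.000Z",
}

# The products array belongs to the line items, so it is left out of the order row
order_row = flatten_record({k: v for k, v in sample.items() if k != "products"})
# Assumption: the zip code arrives as a string and the target column is an integer
order_row["shipping_address_zip"] = int(order_row["shipping_address_zip"])
print(order_row)
```

Working through a single record like this first makes it easier to spot which renames and casts the Spark version will need.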

Before attempting to ingest the data as a stream, we highly recommend that you start with a static **`DataFrame`** so that you can iron out the various kinks:
* Renaming and flattening columns
* Exploding the products array
* Parsing the **`submittedAt`** column into a **`timestamp`**
* Conforming to the **`orders`** and **`line_items`** schemas - because these are Delta tables, appending to them will fail if the schemas are not correct
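As a small illustration of the timestamp step, the sketch below parses one value in plain Python and derives a year-month string from it. It assumes the source strings are ISO-8601 UTC values shaped like `2020-01-01T10:00:00.000Z`; inspect the actual files to confirm. The function name `parse_submitted_at` is hypothetical. In Spark, the corresponding functions are `to_timestamp` and `date_format`.

```python
from datetime import datetime, timezone

def parse_submitted_at(value: str) -> datetime:
    """Parse a string such as '2020-01-01T10:00:00.000Z' into an aware UTC datetime."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)

submitted_at = parse_submitted_at("2020-01-01T10:00:00.000Z")
# Year-month string, comparable to Spark's "yyyy-MM" pattern
submitted_yyyy_mm = submitted_at.strftime("%Y-%m")
print(submitted_at.isoformat(), submitted_yyyy_mm)
```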

Furthermore, creating a stream from JSON files will first require you to specify the schema - you can "cheat" and infer that schema from some of the JSON files before starting the stream.
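One way to "cheat" is sketched below: a batch read infers the schema, and the streaming read then reuses it. The function name `build_order_stream` is hypothetical, and `spark` is assumed to be the notebook's active `SparkSession`.

```python
def build_order_stream(spark, path, max_files_per_trigger=1):
    """Infer the schema with a batch read, then reuse it for a streaming read."""
    # Batch read: Spark scans the JSON files and infers a StructType
    inferred_schema = spark.read.json(path).schema
    # Streaming read: the schema is supplied up front instead of being inferred
    return (spark.readStream
        .schema(inferred_schema)
        .option("maxFilesPerTrigger", max_files_per_trigger)
        .json(path))

# Usage in a notebook (not executed here):
# stream_raw_df = build_order_stream(spark, stream_path)
```

Inferring from the full directory reads every file once; for a large dataset it may be cheaper to infer from a subset and confirm that the result covers all fields.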

<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Setup Exercise #5</h2>

To get started, run the following cell to set up this exercise, declaring exercise-specific variables and functions.

In [0]:
%run ./_includes/Setup-Exercise-05

Variable/Function,Value,Description
username,cenz.wong@ekimetrics.com,This is the email address that you signed into Databricks with
working_dir,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone,This is the directory in which all work should be conducted
user_db,dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone,The name of the database you will use for this project.
orders_table,orders,The name of the orders table.
products_table,products,The name of the products table.


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.A - Use Database</h2>

Each notebook uses a different Spark session and will initially use the **`default`** database.

As in the previous exercise, we can avoid contention over commonly named tables by using our user-specific database.

**In this step you will need to:**
* Use the database identified by the variable **`user_db`** so that any tables created in this notebook are **NOT** added to the **`default`** database

### Implement Exercise #5.A

Implement your solution in the following cell:

In [0]:
%sql
-- # CREATE DATABASE IF NOT EXISTS dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone;
-- # USE dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone;

use dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone;
show tables;

database,tableName,isTemporary
dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone,line_items,False
dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone,orders,False
dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone,products,False
dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone,sales_reps,False


In [0]:
# Spark Hive table operations
spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(user_db))
spark.sql("USE {}".format(user_db))

Out[12]: DataFrame[]

### Reality Check #5.A
Run the following command to ensure that you are on track:

In [0]:
reality_check_05_a()

Points,Test,Result
1,Using DBR 9.1 & Proper Cluster Configuration,
1,Valid Registration ID,
1,The current database is dbacademy_cenz_wong_ekimetrics_com_developer_foundations_capstone,
1,"Expected 195,698 orders",
1,"Expected 1,175,870 line-items",
1,Expected 12 products,


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.B - Stream-Append Orders</h2>

Every JSON file ingested by our stream represents one order and the enumerated list of products purchased in that order.

Our goal is simple: ingest the data, transform it as required by the **`orders`** table's schema, and append these new records to our existing table.

**In this step you will need to:**

* Ingest the stream of JSON files:
  * Start a stream from the path identified by **`stream_path`**.
  * Using the **`maxFilesPerTrigger`** option, throttle the stream to process only one file per iteration.
  * Add the ingest metadata (same as with our other datasets):
    * **`ingested_at`**:**`timestamp`**
    * **`ingest_file_name`**:**`string`**
  * Properly parse the **`submitted_at`** column as a valid **`timestamp`**
  * Add the column **`submitted_yyyy_mm`** using the format "**yyyy-MM**"
  * Make any other changes required to the column names and data types so that they conform to the **`orders`** table's schema

* Write the stream to a Delta **table**:
  * The table's format should be "**delta**"
  * Partition the data by the column **`submitted_yyyy_mm`**
  * Records must be appended to the table identified by the variable **`orders_table`**
  * The query must be named the same as the table, identified by the variable **`orders_table`**
  * The query must use the checkpoint location identified by the variable **`orders_checkpoint_path`**
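Once a query that meets the requirements above is running, its progress can be observed from the handle returned when the stream starts. The helper below is a hedged sketch: `wait_for_triggers` is a hypothetical name, the number of progress updates is used as an approximation of the number of triggers, and it is not necessarily how the reality checks in this course measure progress. It relies only on the `recentProgress` and `isActive` properties of a PySpark `StreamingQuery`.

```python
import time

def wait_for_triggers(query, min_triggers, timeout_seconds=60.0, poll_seconds=1.0):
    """Poll a StreamingQuery until it has reported at least `min_triggers` progress updates.

    Returns the number of progress updates seen when polling stopped.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        seen = len(query.recentProgress)
        # Stop early if the target is reached or the query is no longer running
        if seen >= min_triggers or not query.isActive:
            return seen
        time.sleep(poll_seconds)
    return len(query.recentProgress)

# Usage in a notebook (not executed here), where `query` is the started stream:
# wait_for_triggers(query, 20)
```

Note that Spark keeps only a bounded number of recent progress updates, so this approach suits short checks rather than long-running monitoring.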

### Implement Exercise #5.B

Implement your solution in the following cell:

In [0]:
# # https://stackoverflow.com/questions/54503014/how-to-get-the-schema-definition-from-a-dataframe-in-pyspark
# schema_json = spark.read.table(orders_table).schema.json()

# ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
# ddl

In [0]:
# udf is imported explicitly so the cells below do not depend on notebook-provided globals
from pyspark.sql.functions import (
    col, current_timestamp, date_format, input_file_name, to_timestamp, udf,
)
from pyspark.sql.types import IntegerType, StructType

In [0]:
# Return a list of all (possibly nested) fields to select within a given schema
def flatten(schema, prefix: str = ""):
    # Return the list of leaf columns to select for a single field
    def field_items(field):
        name = f'{prefix}.{field.name}' if prefix else field.name
        if isinstance(field.dataType, StructType):
            # Recurse into structs; a selected leaf keeps only its own name (shippingAddress.city -> city)
            return flatten(field.dataType, name)
        return [col(name)]
    return [item for field in schema.fields for item in field_items(field)]

In [0]:
orders_table_columns = list(spark.read.table(orders_table).columns)

In [0]:
# Streaming

# A batch JSON read always infers the schema, and .schema is already a StructType
stream_static_schema = spark.read.json(stream_path).schema

stream_raw_df = (spark.readStream
  .schema(stream_static_schema)
  .option("maxFilesPerTrigger", 1)
  .json(stream_path)
)

In [0]:
# Drop the products array; line items are handled separately in Exercise 5.C
stream_df = stream_raw_df.drop("products")

# Add the ingest metadata
stream_df = stream_df.withColumn("ingest_file_name", input_file_name())
stream_df = stream_df.withColumn("ingested_at", current_timestamp())

# Flatten the shippingAddress struct into top-level columns
flattened = flatten(stream_df.schema)
stream_df = stream_df.select(*flattened)

# Rename the columns to match the orders table's schema
stream_df = (stream_df
  .withColumnRenamed('customerId', 'customer_id')
  .withColumnRenamed('orderId', 'order_id')
  .withColumnRenamed('salesRepId', 'sales_rep_id')
  .withColumnRenamed('address', 'shipping_address_address')
  .withColumnRenamed('attention', 'shipping_address_attention')
  .withColumnRenamed('city', 'shipping_address_city')
  .withColumnRenamed('state', 'shipping_address_state')
  .withColumnRenamed('zip', 'shipping_address_zip')
  .withColumnRenamed('submittedAt', 'submitted_at')
)

# Cast the zip code from string to integer; a built-in cast avoids the overhead of a Python UDF
stream_df = stream_df.withColumn('shipping_address_zip', col('shipping_address_zip').cast(IntegerType()))

# Parse submitted_at (e.g. 2020-01-01T10:00:00.000Z) into a timestamp, then derive the partition column
# Pattern reference: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
stream_df = stream_df.withColumn('submitted_at', to_timestamp('submitted_at', "yyyy-MM-dd'T'HH:mm:ss.SSSX"))
stream_df = stream_df.withColumn('submitted_yyyy_mm', date_format('submitted_at', 'yyyy-MM'))

# Select the columns in the same order as the orders table
stream_orders_df = stream_df.select(*orders_table_columns)

In [0]:
stream_orders_df.printSchema()

root
 |-- submitted_at: timestamp (nullable = true)
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- sales_rep_id: string (nullable = true)
 |-- shipping_address_attention: string (nullable = true)
 |-- shipping_address_address: string (nullable = true)
 |-- shipping_address_city: string (nullable = true)
 |-- shipping_address_state: string (nullable = true)
 |-- shipping_address_zip: integer (nullable = true)
 |-- ingest_file_name: string (nullable = false)
 |-- ingested_at: timestamp (nullable = false)
 |-- submitted_yyyy_mm: string (nullable = true)



In [0]:

# devicesQuery = (stream_orders_df.writeStream
#   .outputMode("append")
#   .format("delta")
#   .queryName(orders_table)
#   .trigger(processingTime="1 second")
#   .option("checkpointLocation", orders_checkpoint_path)
#   .partitionBy("submitted_yyyy_mm")
#   .start(OutputPath)
# )

# Append to the orders table; the query is named after the table as required
orders_query = (stream_orders_df.writeStream
  .queryName(orders_table)
  .trigger(processingTime="1 second")
  .option("checkpointLocation", orders_checkpoint_path)
  .partitionBy("submitted_yyyy_mm")
  .format("delta")
  .outputMode("append")
  .toTable(orders_table)
)

In [0]:
# dbutils.fs.rm(orders_checkpoint_path, True)

### Reality Check #5.B
Run the following command to ensure that you are on track.

**Caution**: In the cell above, you are appending to a Delta table, and the final record count will be validated below. Should you restart the stream, you will inevitably append duplicate records to the table, forcing the validation to fail. There are two things you will need to address in this scenario:
* Address the duplicate data issue by re-running **Exercise #3**, which would presumably delete and/or overwrite the datasets, putting them back to their default state for this exercise.
* Address the stream's state issue (remembering which files were processed) by deleting the directory identified by **`orders_checkpoint_path`**
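The second point can be sketched as a small helper that stops the named query, if it is still running, and then removes its checkpoint directory. The name `reset_stream` is hypothetical; `spark` and `dbutils` are assumed to be the objects Databricks provides in a notebook. It does not undo the duplicate rows, which still require restoring the tables as described in the first point.

```python
def reset_stream(spark, dbutils, query_name, checkpoint_path):
    """Stop the named streaming query (if active) and delete its checkpoint directory."""
    for query in spark.streams.active:
        if query.name == query_name:
            query.stop()
    # The second argument enables recursive deletion of the directory
    return dbutils.fs.rm(checkpoint_path, True)

# Usage in a notebook (not executed here):
# reset_stream(spark, dbutils, orders_table, orders_checkpoint_path)
```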

In [0]:
reality_check_05_b()

The stream "orders" has started.
The stream hasn't processed any trigger yet...
The stream hasn't processed any trigger yet...
The stream has processed 1 triggers so far.
Processing trigger 2 of 20...
Processing trigger 3 of 20...
Processing trigger 4 of 20...
Processing trigger 4 of 20...
Processing trigger 5 of 20...
Processing trigger 6 of 20...
Processing trigger 7 of 20...
Processing trigger 8 of 20...
Processing trigger 9 of 20...
Processing trigger 9 of 20...
Processing trigger 10 of 20...
Processing trigger 11 of 20...
Processing trigger 12 of 20...
Processing trigger 12 of 20...
Processing trigger 13 of 20...
Processing trigger 14 of 20...
Processing trigger 15 of 20...
Processing trigger 15 of 20...
Processing trigger 16 of 20...
Processing trigger 17 of 20...
Processing trigger 18 of 20...
Processing trigger 18 of 20...
Processing trigger 19 of 20...
Processing trigger 20 of 20...
Processing results...
Stopping the stream...


Points,Test,Result
1,Expected at least 20 triggers,
1,Expected less than 100 triggers,
1,Expected the first 20 triggers to processes 1 record per trigger,
1,Checkpoint directory exists,
1,"Expected 195,718 orders (20 new)",


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5.C - Stream-Append Line Items</h2>

The same JSON file we processed in the previous stream also contains the line items which we now need to extract and append to the existing **`line_items`** table.

Just like before, our goal is simple: ingest the data, transform it as required by the **`line_items`** table's schema, and append these new records to our existing table.

Note: we are processing the same stream twice - there are other patterns to do this more efficiently, but for this exercise, we want to keep the design simple.<br/>
The good news here is that you can copy most of the code from the previous step to get you started here.

**In this step you will need to:**

* Ingest the stream of JSON files:
  * Start a stream from the path identified by **`stream_path`**.
  * Using the **`maxFilesPerTrigger`** option, throttle the stream to process only one file per iteration.
  * Add the ingest metadata (same as with our other datasets):
    * **`ingested_at`**:**`timestamp`**
    * **`ingest_file_name`**:**`string`**
  * Make any other changes required to the column names and data types so that they conform to the **`line_items`** table's schema
    * The most significant transformation will be to the **`products`** column.
    * The **`products`** column is an array of elements and needs to be exploded (see **`pyspark.sql.functions`**)
    * One solution would include:
      1. Select **`order_id`** and explode **`products`** while renaming it to **`product`**.
      2. Flatten the **`product`** column's nested values.
      3. Add the ingest metadata (**`ingest_file_name`** and **`ingested_at`**).
      4. Convert data types as required by the **`line_items`** table's schema.

* Write the stream to a Delta sink:
  * The sink's format should be "**delta**"
  * Records must be appended to the table identified by the variable **`line_items_table`**
  * The query must be named the same as the table, identified by the variable **`line_items_table`**
  * The query must use the checkpoint location identified by the variable **`line_items_checkpoint_path`**
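To show what exploding the **`products`** array means for a single order, here is a pure-Python sketch outside Spark. The sample values are invented, `explode_line_items` is a hypothetical helper, and the integer and two-decimal conversions are assumptions about the target types that should be checked against the **`line_items`** table's schema. As with Spark's `explode`, an order without products yields no rows.

```python
from decimal import Decimal

def explode_line_items(order: dict) -> list:
    """Return one flat row per element of the order's products array."""
    rows = []
    for product in order.get("products") or []:
        rows.append({
            "order_id": order["orderId"],
            "product_id": product["productId"],
            "product_quantity": int(product["quantity"]),
            # Going through str avoids binary floating-point noise in the decimal value
            "product_sold_price": Decimal(str(product["soldPrice"])).quantize(Decimal("0.01")),
        })
    return rows

# Invented sample order with two products
sample_order = {
    "orderId": "o-001",
    "products": [
        {"productId": "p-001", "quantity": 500, "soldPrice": 104.85},
        {"productId": "p-002", "quantity": 300, "soldPrice": 94.37},
    ],
}

line_items = explode_line_items(sample_order)
for row in line_items:
    print(row)
```

Each input order becomes as many rows as it has products, which is why the line-item count grows faster than the order count.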

### Implement Exercise #5.C

Implement your solution in the following cell:

In [0]:
# Streaming

# Infer the schema from a static read of the same JSON files
static_raw_df = spark.read.json(stream_path)
stream_static_schema = static_raw_df.schema

stream_raw_df = (spark.readStream
  .schema(stream_static_schema)
  .option("maxFilesPerTrigger", 1)
  .json(stream_path)
)

In [0]:

from pyspark.sql.functions import explode_outer
from pyspark.sql.types import DecimalType, IntegerType

# 1. Keep only the order id and explode the products array into one row per line item
stream_df = stream_raw_df.select(
  col("orderId").alias("order_id"),
  explode_outer(col("products")).alias("product"),
)

# 2. Flatten the product struct and rename its fields to match the line_items table
stream_df = stream_df.select(
  "order_id",
  col("product.productId").alias("product_id"),
  col("product.quantity").alias("product_quantity"),
  col("product.soldPrice").alias("product_sold_price"),
)

# 3. Add the ingest metadata
stream_df = (stream_df
  .withColumn("ingest_file_name", input_file_name())
  .withColumn("ingested_at", current_timestamp())
)

# 4. Convert data types as required by the line_items table's schema
stream_df = (stream_df
  .withColumn("product_quantity", col("product_quantity").cast(IntegerType()))
  .withColumn("product_sold_price", col("product_sold_price").cast(DecimalType(10, 2)))
)

# Select the columns in the same order as the line_items table
# (the variable name is kept from Exercise 5.B because the following cells use it)
stream_orders_df = stream_df.select(*spark.read.table(line_items_table).columns)

In [0]:
stream_orders_df.display()

order_id,product_id,product_quantity,product_sold_price,ingest_file_name,ingested_at
0612a18b-0cc7-43ea-9f5b-155aad967cb9,8d809e13-fdc5-4d15-9271-953750f6d592,500,104.85,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2022-01-06T03:30:41.576+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,ec15ba1d-53b6-44b0-8a22-1e498485f1b8,300,94.37,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2022-01-06T03:30:41.576+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,e672483e-57a8-434a-bc42-ecf827c8a8d4,1000,109.57,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2022-01-06T03:30:41.576+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,95cbadca-cf90-4b8a-a134-2976f6ba6df8,200,99.61,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2022-01-06T03:30:41.576+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,699fcfe8-ce60-42c9-9d0f-728df3e48d70,100,99.61,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2022-01-06T03:30:41.576+0000
0612a18b-0cc7-43ea-9f5b-155aad967cb9,a990d79b-4957-42fc-8e42-20ceb1fd1259,100,115.34,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_0612a18b-0cc7-43ea-9f5b-155aad967cb9_2020-01-01.json,2022-01-06T03:30:41.576+0000
1d5609b3-b0e1-4201-8571-93c0c0e8e4da,e672483e-57a8-434a-bc42-ecf827c8a8d4,900,104.59,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2022-01-06T03:30:45.321+0000
1d5609b3-b0e1-4201-8571-93c0c0e8e4da,a990d79b-4957-42fc-8e42-20ceb1fd1259,600,110.09,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2022-01-06T03:30:45.321+0000
1d5609b3-b0e1-4201-8571-93c0c0e8e4da,8d809e13-fdc5-4d15-9271-953750f6d592,600,100.09,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2022-01-06T03:30:45.321+0000
1d5609b3-b0e1-4201-8571-93c0c0e8e4da,699fcfe8-ce60-42c9-9d0f-728df3e48d70,800,95.08,dbfs:/dbacademy/cenz.wong@ekimetrics.com/developer-foundations-capstone/raw/orders/stream/order_1d5609b3-b0e1-4201-8571-93c0c0e8e4da_2020-01-01.json,2022-01-06T03:30:45.321+0000


In [0]:
dbutils.fs.rm(line_items_checkpoint_path, True)

Out[27]: False

In [0]:
# Append to the line_items table; the query is named after the table as required
line_items_query = (stream_orders_df.writeStream
  .queryName(line_items_table)
  .trigger(processingTime="1 second")
  .option("checkpointLocation", line_items_checkpoint_path)
  .format("delta")
  .outputMode("append")
  .toTable(line_items_table)
)

In [0]:
reality_check_05_c()

The stream "line_items" has started.
The stream hasn't processed any trigger yet...
The stream hasn't processed any trigger yet...
The stream has processed 1 triggers so far.
Processing trigger 2 of 20...
Processing trigger 3 of 20...
Processing trigger 4 of 20...
Processing trigger 5 of 20...
Processing trigger 6 of 20...
Processing trigger 7 of 20...
Processing trigger 8 of 20...
Processing trigger 9 of 20...
Processing trigger 9 of 20...
Processing trigger 10 of 20...
Processing trigger 11 of 20...
Processing trigger 12 of 20...
Processing trigger 13 of 20...
Processing trigger 14 of 20...
Processing trigger 15 of 20...
Processing trigger 16 of 20...
Processing trigger 16 of 20...
Processing trigger 17 of 20...
Processing trigger 18 of 20...
Processing trigger 19 of 20...
Processing trigger 20 of 20...
Processing trigger 20 of 20...
Processing results...
Stopping the stream...


Points,Test,Result
1,Expected at least 20 triggers,
1,Expected less than 100 triggers,
1,Expected the first 20 triggers to processes 1 record per trigger,
1,Checkpoint directory exists,
1,"Expected 1,175,967 records, (97 new)",


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Exercise #5 - Final Check</h2>

Run the following command to make sure this exercise is complete:

In [0]:
reality_check_05_final()

Wrote 17 bytes.


Points,Test,Result
1,Reality Check 05.A passed,
1,Reality Check 05.B passed,
1,Reality Check 05.C passed,
1,"Expected 195,718 orders (20 new)",
1,"Expected 1,175,967 records, (97 new)",
1,Expected 12 products,
1,Non-null (properly parsed) submitted_at,


&copy; 2021 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>