In [0]:
%run ./00_Setup

In [0]:
%sql
-- Work inside the strata_lab schema and expose the book store root path as a text widget
USE SCHEMA strata_lab;

CREATE WIDGET TEXT book_store_path
  DEFAULT '/Volumes/workspace/strata_lab/entrenamiento/book_store';

In [0]:
# Read the book store root path from the book_store_path widget.
# Surrounding whitespace and trailing slashes are stripped because every path in this
# notebook is built as f"{book_store_path}/...". An empty value would silently resolve
# to paths such as "/orders/", so fail fast instead.
book_store_path = dbutils.widgets.get("book_store_path").strip().rstrip("/")
if not book_store_path:
    raise ValueError("Widget 'book_store_path' is empty; set it to the book store root directory.")

In [0]:
# Show what is currently sitting in the orders landing directory
orders_dir_listing = dbutils.fs.ls(f"{book_store_path}/orders/")
display(orders_dir_listing)

In [0]:
# Ingest the raw Parquet order files into the 'orders_raw' table with Auto Loader (cloudFiles).
# trigger(availableNow=True) processes the data available when the query starts and then stops.
orders_raw_query = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", f"{book_store_path}/orders_architecture/")
    .load(f"{book_store_path}/orders/")
    .writeStream
    .option("checkpointLocation", f"{book_store_path}/orders_raw_checkpoint/")
    .trigger(availableNow=True)
    .table("orders_raw")
)
# .table() starts the query asynchronously. Block until it finishes so the following cells
# read a fully loaded orders_raw instead of racing the stream.
orders_raw_query.awaitTermination()

In [0]:
%sql
-- Staging view over orders_raw: every raw column plus two audit columns.
-- NOTE(review): current_timestamp() is evaluated each time the view is queried, so
-- arrival_time records when the view was read/written, not when the file landed.
-- NOTE(review): orders_raw is a Delta table, so _metadata.file_path here is presumably the
-- Delta data file backing it, not the original Parquet file under /orders/. Capture
-- _metadata.file_path in the Auto Loader ingest if the true source file is needed.
CREATE OR REPLACE TEMP VIEW orders_tmp AS
SELECT
  *,
  current_timestamp() AS arrival_time,
  _metadata.file_path AS source_file
FROM orders_raw

In [0]:
%sql
-- Peek at a small sample of the staged orders
SELECT *
FROM orders_tmp
LIMIT 10;

In [0]:
# Append orders_tmp to the bronze Delta table, skipping order_ids that bronze already holds.
# This is a batch write of the whole view (all of orders_raw), and nothing tracks progress
# between runs: a checkpointLocation option only applies to streaming writes. Appending the
# view as-is would therefore duplicate every previously loaded order.
orders_df = spark.table("orders_tmp")

if spark.catalog.tableExists("orders_bronze"):
    # Assumes order_id identifies an order uniquely across batches. TODO confirm.
    new_orders_df = orders_df.join(
        spark.table("orders_bronze").select("order_id"),
        on="order_id",
        how="left_anti",
    )
else:
    new_orders_df = orders_df

(
    new_orders_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("orders_bronze")
)

In [0]:
%sql
-- Row count of the bronze table after the first write
SELECT count(*)
FROM orders_bronze

In [0]:
# Simulate arrival of new order data.
# NOTE(review): load_data is not defined in this notebook; presumably it comes from
# ./00_Setup (run at the top) and writes new order files (50 rows here) into the orders
# directory. Confirm against 00_Setup.
load_data(row_count=50)

In [0]:
# Ingest the newly arrived order files into orders_raw. The shared checkpoint makes
# Auto Loader pick up only files it has not processed yet.
orders_raw_query = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", f"{book_store_path}/orders_architecture/")
    .load(f"{book_store_path}/orders/")
    .writeStream
    .option("checkpointLocation", f"{book_store_path}/orders_raw_checkpoint/")
    .trigger(availableNow=True)
    .table("orders_raw")
)
# Wait for the availableNow run to complete; otherwise the count and bronze cells
# below may run before the new rows are committed.
orders_raw_query.awaitTermination()

In [0]:
%sql
-- Row count visible through the staging view (reflects orders_raw after the new batch)
SELECT count(*)
FROM orders_tmp

In [0]:
# Write the new batch of orders to the bronze table.
# Re-read orders_tmp explicitly instead of relying on an orders_df left over from an earlier
# cell. Skip order_ids bronze already holds: this batch write covers the whole view, so
# appending it unfiltered would re-insert every order loaded previously.
orders_df = spark.table("orders_tmp")

if spark.catalog.tableExists("orders_bronze"):
    # Assumes order_id identifies an order uniquely across batches. TODO confirm.
    new_orders_df = orders_df.join(
        spark.table("orders_bronze").select("order_id"),
        on="order_id",
        how="left_anti",
    )
else:
    new_orders_df = orders_df

(
    new_orders_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("orders_bronze")
)

In [0]:
%sql
-- Row count of orders_bronze after the second write.
-- If this exceeds the orders_raw row count, rows were appended twice. The bronze cell is a
-- batch write of the whole orders_tmp view (all of orders_raw, not just the new files), so
-- unless it filters out order_ids already in bronze, it re-inserts every earlier row.
-- The recomputed arrival_time is not the cause; it only makes the duplicate copies differ.
SELECT COUNT(*) FROM orders_bronze

In [0]:
# Read the customer JSON files and register them as the customers_lookup temp view,
# which the order enrichment step joins against.
spark.read.json(f"{book_store_path}/customers").createOrReplaceTempView("customers_lookup")

In [0]:
%sql
-- Inspect the customer lookup data
SELECT *
FROM customers_lookup

In [0]:
# Expose the bronze table as the orders_bronze_tmp temp view.
# NOTE(review): no visible cell reads orders_bronze_tmp. The enriched view joins orders_tmp
# instead, so confirm which source the silver layer is meant to use.
spark.table("orders_bronze").createOrReplaceTempView("orders_bronze_tmp")

In [0]:
%sql
-- Enriched orders: orders with a positive quantity, joined to customer first/last names
-- parsed from the profile JSON. order_timestamp is converted to TIMESTAMP via
-- from_unixtime, which treats it as Unix seconds and keeps only whole-second precision.
-- NOTE(review): this reads orders_tmp (the staging view over orders_raw), not
-- orders_bronze_tmp, so silver is not actually derived from the bronze table. Confirm intent.
CREATE OR REPLACE TEMP VIEW orders_enriched_temp AS
SELECT
  o.order_id,
  o.quantity,
  o.customer_id,
  c.profile:first_name AS first_name,
  c.profile:last_name AS last_name,
  CAST(from_unixtime(o.order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AS order_timestamp,
  o.books
FROM orders_tmp AS o
JOIN customers_lookup AS c
  ON o.customer_id = c.customer_id
WHERE o.quantity > 0

In [0]:
%sql
-- Inspect the enriched orders before persisting them
SELECT *
FROM orders_enriched_temp

In [0]:
# Write enriched orders to the silver Delta table, skipping order_ids silver already holds.
# The enriched view covers every order, so an unfiltered batch append would duplicate
# previously loaded rows. checkpointLocation is dropped because it only applies to
# streaming writes.
orders_df = spark.table("orders_enriched_temp")

if spark.catalog.tableExists("orders_silver"):
    # Assumes order_id identifies an order uniquely across batches. TODO confirm.
    new_orders_df = orders_df.join(
        spark.table("orders_silver").select("order_id"),
        on="order_id",
        how="left_anti",
    )
else:
    new_orders_df = orders_df

(
    new_orders_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("orders_silver")
)

In [0]:
%sql
-- Inspect the silver table contents
SELECT *
FROM orders_silver

In [0]:
%sql
-- Row count of the silver table after its first write
SELECT COUNT(*)
FROM orders_silver

In [0]:
# Simulate arrival of another batch of new order data.
# NOTE(review): load_data is defined outside this notebook (presumably ./00_Setup). Called
# here with its default arguments; confirm how many rows that produces.
load_data()

In [0]:
# Ingest the latest batch of order files into orders_raw. The shared checkpoint makes
# Auto Loader process only files it has not seen yet.
orders_raw_query = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", f"{book_store_path}/orders_architecture/")
    .load(f"{book_store_path}/orders/")
    .writeStream
    .option("checkpointLocation", f"{book_store_path}/orders_raw_checkpoint/")
    .trigger(availableNow=True)
    .table("orders_raw")
)
# Block until the availableNow run commits, so the bronze/silver cells below see the new rows.
orders_raw_query.awaitTermination()

In [0]:
# Write the latest batch of orders to the bronze table.
# orders_tmp spans all of orders_raw, so filter out order_ids already in bronze before
# appending; otherwise every earlier order would be inserted again.
orders_df = spark.table("orders_tmp")

if spark.catalog.tableExists("orders_bronze"):
    # Assumes order_id identifies an order uniquely across batches. TODO confirm.
    new_orders_df = orders_df.join(
        spark.table("orders_bronze").select("order_id"),
        on="order_id",
        how="left_anti",
    )
else:
    new_orders_df = orders_df

(
    new_orders_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("orders_bronze")
)

In [0]:
# Propagate the latest enriched orders to the silver table.
# The enriched view covers every order, so append only order_ids silver does not hold yet;
# an unfiltered append would duplicate every previously loaded silver row.
orders_df = spark.table("orders_enriched_temp")

if spark.catalog.tableExists("orders_silver"):
    # Assumes order_id identifies an order uniquely across batches. TODO confirm.
    new_orders_df = orders_df.join(
        spark.table("orders_silver").select("order_id"),
        on="order_id",
        how="left_anti",
    )
else:
    new_orders_df = orders_df

(
    new_orders_df.write
    .format("delta")
    .mode("append")
    .saveAsTable("orders_silver")
)

In [0]:
%sql
-- Inspect silver after the latest batch
SELECT *
FROM orders_silver

In [0]:
%sql
-- List the Delta commit history (one row per write) of the silver table
DESCRIBE HISTORY orders_silver

In [0]:
%sql
-- Final row count of the silver table
SELECT count(*)
FROM orders_silver

In [0]:
%sql
-- Gold-level aggregate: total quantity of books each customer ordered per day
-- (order_timestamp truncated to the start of its day).
CREATE OR REPLACE TEMPORARY VIEW daily_customers_books_temp AS
SELECT
  customer_id,
  first_name,
  last_name,
  date_trunc('day', order_timestamp) AS order_date,
  SUM(quantity) AS book_counts
FROM orders_silver
GROUP BY
  customer_id,
  first_name,
  last_name,
  date_trunc('day', order_timestamp)

In [0]:
# Materialize the daily per-customer aggregate as the gold table.
# The view recomputes every (customer, day) group from the full silver table, so the
# result replaces the previous snapshot ("overwrite"). Appending would add a second copy
# of every group each time this cell runs. checkpointLocation is dropped because it only
# applies to streaming writes.
orders_df = spark.table("daily_customers_books_temp")

(
    orders_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("daily_customer_books")
)

In [0]:
%sql
-- Inspect the gold daily aggregate
SELECT *
FROM daily_customer_books

In [0]:
# Stop any streaming query that is still active, so an idle stream does not keep the
# cluster busy and incur unnecessary cost. availableNow queries stop on their own once
# caught up, so this is a safety net.
for query in spark.streams.active:
    print(f"Stopping stream {query.name}")
    query.stop()
    query.awaitTermination()