In [0]:
%run ../Includes/Copy-Datasets

In [0]:
# List the raw order files currently landed in the source directory.
raw_orders_path = f"{dataset_bookstore}/orders-raw"
files = dbutils.fs.ls(raw_orders_path)
display(files)

In [0]:
# Incrementally ingest the raw Parquet order files with Auto Loader (cloudFiles),
# tracking the inferred schema at the given schemaLocation, and expose the
# resulting streaming DataFrame to SQL as `orders_raw_temp`.
orders_raw_source = f"{dataset_bookstore}/orders-raw"

orders_raw_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo/checkpoints/orders_raw")
    .load(orders_raw_source)
)
orders_raw_stream.createOrReplaceTempView("orders_raw_temp")

# Bronze layer

In [0]:
%sql
-- Bronze: tag every raw order with the time it was processed and the file it came from.
-- NOTE(review): input_file_name() is deprecated on newer Databricks runtimes in favor of
-- _metadata.file_path; confirm the target runtime before switching.
CREATE OR REPLACE TEMP VIEW orders_tmp AS
SELECT
  *,
  current_timestamp() AS arrival_time,
  input_file_name()   AS source_file
FROM orders_raw_temp

In [0]:
%sql
-- Preview the enriched streaming view (this starts a streaming display query).
SELECT *
FROM orders_tmp;

In [0]:
# Persist the enriched raw orders into the bronze Delta table `orders_bronze`.
# No trigger is set, so this query keeps running until it is explicitly stopped.
(
    spark.table("orders_tmp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_bronze")
    .outputMode("append")
    # toTable() is the standard Spark 3.1+ API; .table() is a legacy Databricks-only alias.
    .toTable("orders_bronze")
)

In [0]:
%sql
-- Verify the bronze write by counting the rows persisted to the orders_bronze Delta table.
-- (Counting the streaming view orders_tmp would start another streaming query over the
-- source instead of checking what was actually written.)
SELECT COUNT(*) FROM orders_bronze;

In [0]:
# Land the next raw orders file (helper presumably provided by the %run Copy-Datasets include);
# the running bronze stream should pick it up on a subsequent micro-batch.
load_new_data()

# Silver layer

In [0]:
# Static (batch) read of the customers JSON data, exposed to SQL as `customers_lookup`
# for the stream-static join in the silver layer.
customers_json_path = f"{dataset_bookstore}/customers-json"
customers_df = spark.read.load(customers_json_path, format="json")
customers_df.createOrReplaceTempView("customers_lookup")

In [0]:
%sql
-- Preview the static customers lookup.
SELECT *
FROM customers_lookup;

In [0]:
# Read the bronze Delta table back as a stream and expose it to SQL as `orders_bronze_tmp`.
bronze_orders_stream = spark.readStream.table("orders_bronze")
bronze_orders_stream.createOrReplaceTempView("orders_bronze_tmp")

In [0]:
%sql
-- Silver: join each streaming bronze order with the customer's name from the static lookup,
-- convert the unix-seconds order_timestamp into a TIMESTAMP, and keep only positive quantities.
CREATE OR REPLACE TEMP VIEW orders_enriched_tmp AS
SELECT
  order_id,
  quantity,
  o.customer_id,
  c.profile:first_name AS f_name,
  c.profile:last_name  AS l_name,
  CAST(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AS order_timestamp,
  books
FROM orders_bronze_tmp AS o
JOIN customers_lookup AS c
  ON o.customer_id = c.customer_id
WHERE quantity > 0

In [0]:
# Persist the enriched orders into the silver Delta table `orders_silver`.
# No trigger is set, so this query keeps running until it is explicitly stopped.
(
    spark.table("orders_enriched_tmp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_silver")
    .outputMode("append")
    # toTable() is the standard Spark 3.1+ API; .table() is a legacy Databricks-only alias.
    .toTable("orders_silver")
)

In [0]:
%sql
-- Preview the silver table.
SELECT *
FROM orders_silver;

In [0]:
%sql
-- Row count of the silver Delta table.
SELECT COUNT(*)
FROM orders_silver;

In [0]:
# Land another new raw orders file. This call does not wait; give the bronze and silver
# streams time to propagate it before querying orders_silver again.
load_new_data()

# Gold layer

In [0]:
# Read the silver Delta table back as a stream and expose it to SQL as `orders_silver_tmp`.
silver_orders_stream = spark.readStream.table("orders_silver")
silver_orders_stream.createOrReplaceTempView("orders_silver_tmp")

In [0]:
%sql
-- Gold: sum of ordered quantity (book_counts) per customer per calendar day.
-- Single-quoted literals are standard SQL; a double-quoted "DD" is only a string under
-- Spark's default dialect and is parsed as an identifier when
-- spark.sql.ansi.doubleQuotedIdentifiers is enabled.
CREATE OR REPLACE TEMP VIEW daily_customer_books_tmp AS (
  SELECT customer_id,
         f_name,
         l_name,
         date_trunc('DD', order_timestamp) AS order_date,
         sum(quantity) AS book_counts
  FROM orders_silver_tmp
  GROUP BY customer_id, f_name, l_name, date_trunc('DD', order_timestamp)
)

In [0]:
# Write the per-customer daily aggregation into the gold Delta table `daily_customer_books`.
(
    spark.table("daily_customer_books_tmp")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/daily_customer_books")
    .outputMode("complete")  # rewrite the full, updated aggregation result on every run
    .trigger(availableNow=True)  # process all currently available data, then stop on its own
    # toTable() is the standard Spark 3.1+ API; .table() is a legacy Databricks-only alias.
    .toTable("daily_customer_books")
)

In [0]:
%sql
-- Preview the gold table.
SELECT *
FROM daily_customer_books;

In [0]:
# NOTE(review): `all=True` presumably lands all remaining raw files at once; confirm in ../Includes/Copy-Datasets.
# The gold query above uses trigger(availableNow=True) and stops on its own, so it must be
# re-run to fold any new data into daily_customer_books.
load_new_data(all=True)

In [0]:
# Stop every active streaming query, blocking until each one has fully shut down.
for query in spark.streams.active:
    print(f"Stopping stream: {query.id}")
    query.stop()
    query.awaitTermination()