In [0]:
%run /Workspace/Users/aminah.yussuf@slalom.com/Includes/Copy-Datasets

In [0]:
files = dbutils.fs.ls("dbfs:/mnt/demo-datasets/bookstore/customers-json/")
display(files)

In [0]:
#create an auto loader against source directory where we have the orders-raw folder 
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo-datasets/orders-raw_checkpoint/")
    .load("dbfs:/mnt/demo-datasets/bookstore/orders-raw/")
    .createOrReplaceTempView("orders_raw_temp"))

In [0]:

%sql
SELECT * FROM orders_raw_temp;

In [0]:
%sql
----temporary view defined on top of the orders_raw_temp stream defined above. It keeps every raw column and adds two audit columns:
----arrival_time (the processing timestamp from current_timestamp()) and source_file (the input file the row was read from).
CREATE OR REPLACE TEMPORARY VIEW orders_temp AS (
  SELECT
    *,
    current_timestamp() AS arrival_time,
    input_file_name()   AS source_file
  FROM orders_raw_temp
);

SELECT *
FROM orders_temp

In [0]:
#using spark writeStream, write the data from the most reformed temporary view into a delta lake table
#this is considered a bronze data  we can use it as a source for our silver layer
(spark.table("orders_temp")
       .writeStream
       .format("delta")
       .option("checkpointLocation", "dbfs:/mnt/demo-datasets/orders-bronze_checkpoint/")
       .outputMode("append")
       .table("order_bronze"))




In [0]:
%sql
SELECT * FROM order_bronze; 

In [0]:
#call the load_new_data function to add more data to the bronze table; afterwards, run a count on the order_bronze table and validate that the row count increases.
#NOTE(review): load_new_data is not defined in this notebook; presumably it comes from the Copy-Datasets include run at the top — confirm.
load_new_data()

In [0]:
(spark.read
       .format("json")
       .load("dbfs:/mnt/demo-datasets/bookstore/customers-json/")
       .createOrReplaceTempView("customers_lookup"))

In [0]:
%sql
SELECT * FROM customers_lookup;

In [0]:
%sql 
CREATE OR REPLACE TEMPORARY VIEW orders_enriched_tmp AS (
  SELECT order_id, quantity, o.customer_id, c.profile:first_name AS f_name,
         c.profile:last_name as l_name, 
         cast(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) order_timestamp, books
  FROM orders_temp o 
  INNER JOIN customers_lookup c 
  ON o.customer_id = c.customer_id
  WHERE quantity > 0
);

In [0]:
%sql
SELECT COUNT(*) FROM orders_enriched_tmp;

In [0]:
(spark.table("orders_enriched_tmp")
        .writeStream
        .format("delta")
        .option("checkpointLocation", "dbfs:/mnt/demo-datasets/orders-silver_checkpoint/")
        .outputMode("append")
        .table("orders_silver")
);

In [0]:
%sql
SELECT COUNT(*) FROM orders_silver;

In [0]:
#we read a stream of data from the silver table into a streaming temporary view. this streaming temporary view will be used as a source data for the gold level temporary view. 

(spark.readStream
    .table("orders_silver")
    .createOrReplaceTempView("orders_silver_temp")
);


In [0]:
%sql
SELECT * FROM orders_silver_temp;

In [0]:
%sql
----gold layer temporary view over the orders_silver_temp stream: for each customer, the total quantity of books ordered per day
----(order_timestamp truncated to the day).
CREATE OR REPLACE TEMP VIEW daily_customer_books_tmp AS (
    SELECT
        customer_id,
        f_name,
        l_name,
        date_trunc("DD", order_timestamp) AS order_date,
        sum(quantity)                     AS books_counts
    FROM orders_silver_temp
    GROUP BY
        customer_id,
        f_name,
        l_name,
        date_trunc("DD", order_timestamp)
);

In [0]:
%sql
SELECT *
FROM daily_customer_books_tmp

In [0]:
#previously we created a streaming temporary view as a source for the gold layer table, now we use writeStream to create the gold level table. the gold level table will be used for daily reporting of the customers book
#availableNow option is used to trigger the writeStream operation as soon as the data is available 
(spark.table("daily_customer_books_tmp")
        .writeStream
        .format("delta")
        .outputMode("complete")
        .trigger(availableNow=True)
        .option("checkpointLocation", "dbfs:/mnt/demo-datasets/daily-customer-books_checkpoint/")
        .table("daily_customer_books"));

In [0]:
%sql
select * from daily_customer_books;

In [0]:
#clean-up: stop every streaming query still active in this Spark session (e.g. the continuously running bronze and silver writers)
#and wait for each one to shut down.
def _stop_stream(query):
    # stop one streaming query and block until it has terminated
    print("Stopping stream: ", query.id)
    query.stop()
    query.awaitTermination()

for active_query in spark.streams.active:
    _stop_stream(active_query)