#### In this notebook we will create a Delta Lake multi-hop pipeline. (MEDALION ARCHITECTURE)

In [0]:
%python
# Databricks notebook source
def path_exists(path):
  try:
    dbutils.fs.ls(path)
    return True
  except Exception as e:
    if 'java.io.FileNotFoundException' in str(e):
      return False
    else:
      raise

# COMMAND ----------

def download_dataset(source, target):
    files = dbutils.fs.ls(source)

    for f in files:
        source_path = f"{source}/{f.name}"
        target_path = f"{target}/{f.name}"
        if not path_exists(target_path):
            print(f"Copying {f.name} ...")
            dbutils.fs.cp(source_path, target_path, True)

# COMMAND ----------

data_source_uri = "wasbs://course-resources@dalhussein.blob.core.windows.net/datasets/bookstore/v1/"
dataset_bookstore = 'dbfs:/mnt/demo-datasets/bookstore'
spark.conf.set(f"dataset.bookstore", dataset_bookstore)

# COMMAND ----------

def get_index(dir):
    files = dbutils.fs.ls(dir)
    index = 0
    if files:
        file = max(files).name
        index = int(file.rsplit('.', maxsplit=1)[0])
    return index+1

# COMMAND ----------

# Structured Streaming
streaming_dir = f"{dataset_bookstore}/orders-streaming"
raw_dir = f"{dataset_bookstore}/orders-raw"

def load_file(current_index):
    latest_file = f"{str(current_index).zfill(2)}.parquet"
    print(f"Loading {latest_file} file to the bookstore dataset")
    dbutils.fs.cp(f"{streaming_dir}/{latest_file}", f"{raw_dir}/{latest_file}")

    
def load_new_data(all=False):
    index = get_index(raw_dir)
    if index >= 10:
        print("No more data to load\n")

    elif all == True:
        while index <= 10:
            load_file(index)
            index += 1
    else:
        load_file(index)
        index += 1

# COMMAND ----------

# DLT
streaming_orders_dir = f"{dataset_bookstore}/orders-json-streaming"
streaming_books_dir = f"{dataset_bookstore}/books-streaming"

raw_orders_dir = f"{dataset_bookstore}/orders-json-raw"
raw_books_dir = f"{dataset_bookstore}/books-cdc"

def load_json_file(current_index):
    latest_file = f"{str(current_index).zfill(2)}.json"
    print(f"Loading {latest_file} orders file to the bookstore dataset")
    dbutils.fs.cp(f"{streaming_orders_dir}/{latest_file}", f"{raw_orders_dir}/{latest_file}")
    print(f"Loading {latest_file} books file to the bookstore dataset")
    dbutils.fs.cp(f"{streaming_books_dir}/{latest_file}", f"{raw_books_dir}/{latest_file}")

    
def load_new_json_data(all=False):
    index = get_index(raw_orders_dir)
    if index >= 10:
        print("No more data to load\n")

    elif all == True:
        while index <= 10:
            load_json_file(index)
            index += 1
    else:
        load_json_file(index)
        index += 1

# COMMAND ----------

download_dataset(data_source_uri, dataset_bookstore)

In [0]:
# we have 3 PARQUET FILES in out SOURCE DIRECTORY
files=dbutils.fs.ls(f"{dataset_bookstore}/orders-raw")
display(files)

path,name,size,modificationTime
dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet,01.parquet,18823,1716397834000
dbfs:/mnt/demo-datasets/bookstore/orders-raw/02.parquet,02.parquet,18814,1716723864000
dbfs:/mnt/demo-datasets/bookstore/orders-raw/03.parquet,03.parquet,18822,1716723877000


### ---------- BRONZE LAYER ----------

In [0]:
(spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format","parquet")
        .option("cloudFiles.schemaLocation","dbfs:/mnt/demo/checkpoints/orders_raw")
        .load(f"{dataset_bookstore}/orders-raw")
        .createOrReplaceTempView("orders_raw_temp")
        )
# Here, we are configuring a stream read on our Parquet source using Auto Loader with schema inference.
# And once configured, we immediately register a streaming temporary view to do data transformation in Spark SQL.
# Our temporary view is named "orders_raw_tmp"

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW orders_tmp AS (
  SELECT *, current_timestamp() arrival_time, input_file_name() source_file
  FROM orders_raw_temp
)
--Next, we will enrich our raw data with additional metadata describing the source file and the time it was ingested. 
--Such information is useful for troubleshooting errors if corrupted data is encountered.

In [0]:
%sql
SELECT * FROM orders_tmp LIMIT 10;

order_id,order_timestamp,customer_id,quantity,total,books,_rescued_data,arrival_time,source_file
6341,1657520256,C00788,1,41,"List(List(B08, 1, 41))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6342,1657520256,C00788,1,41,"List(List(B08, 1, 41))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6343,1657531717,C00654,1,28,"List(List(B02, 1, 28))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6344,1657531717,C00654,1,28,"List(List(B02, 1, 28))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6345,1657543676,C00762,1,49,"List(List(B01, 1, 49))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6346,1657543676,C00762,1,49,"List(List(B01, 1, 49))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6347,1657546079,C01014,1,28,"List(List(B02, 1, 28))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6348,1657546658,C00633,1,24,"List(List(B09, 1, 24))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6349,1657546658,C00633,1,24,"List(List(B09, 1, 24))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet
6350,1657547177,C00638,1,35,"List(List(B03, 1, 35))",,2024-05-27T16:40:58.021+0000,dbfs:/mnt/demo-datasets/bookstore/orders-raw/01.parquet


In [0]:
# Now, we are going to pass this enriched data back to PySpark API to process an incremental write to a Delta Lake table called "new_orders_bronze"
(spark.table("orders_tmp")
        .writeStream
        .format("delta")
        .option("checkpointLocation","dbfs:/mnt/demo/checkpoints/new_orders_bronze")
        .outputMode("append")
        .table("new_orders_bronze"))
# Whenever you new data is detected by the STREAMING QUERY the graph would spike up indicating loading the new data 

Out[18]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f0eebe79760>

In [0]:
%sql
SELECT COUNT(*) FROM new_orders_bronze;

count(1)
3000


In [0]:
load_new_data()
# triggering another file arrival

Loading 04.parquet file to the bookstore dataset


In [0]:
%sql
SELECT COUNT(*) FROM new_orders_bronze;

count(1)
4000


### ---------- SILVER LAYER ----------

In [0]:
# we need a static lookup table to joining it with our bronze table.
# Here we are creating a customers static temporary view from JSON files.
# Here we are doing it with PySpark API, but we can also do it with Spark SQL.

(spark.read 
        .format("json")
        .load(f"{dataset_bookstore}/customers-json")
        .createOrReplaceTempView("customers_lookup")
        )

In [0]:
%sql
SELECT * FROM customers_lookup LIMIT 10;

customer_id,email,profile,updated
C00301,thomas.lane@gmail.com,"{""first_name"":""Thomas"",""last_name"":""Lane"",""gender"":""Male"",""address"":{""street"":""06 Boulevard Victor Hugo"",""city"":""Paris"",""country"":""France""}}",2021-12-14T23:15:43.375Z
C00302,ocolegatele@blogger.com,"{""first_name"":""Odilia"",""last_name"":""Colegate"",""gender"":""Female"",""address"":{""street"":""07 Sommers Parkway"",""city"":""Lyon"",""country"":""France""}}",2021-12-14T23:15:43.375Z
C00303,acolledged2@nbcnews.com,"{""first_name"":""Andros"",""last_name"":""Colledge"",""gender"":""Male"",""address"":{""street"":""342 Katie Center"",""city"":""Gort"",""country"":""Ireland""}}",2021-12-14T23:15:43.375Z
C00304,,"{""first_name"":""Iver"",""last_name"":""Collet"",""gender"":""Male"",""address"":{""street"":""12126 Union Point"",""city"":""Iguape"",""country"":""Brazil""}}",2021-12-14T23:15:43.375Z
C00305,pcollier5r@cmu.edu,"{""first_name"":""Page"",""last_name"":""Collier"",""gender"":""Male"",""address"":{""street"":""3 Farragut Lane"",""city"":""Berlin"",""country"":""Germany""}}",2021-12-14T23:15:43.375Z
C00306,,"{""first_name"":""Tally"",""last_name"":""Collins"",""gender"":""Male"",""address"":{""street"":""4 Hovde Park"",""city"":""Cairo"",""country"":""Egypt""}}",2021-12-14T23:15:43.375Z
C00307,lcollocottcm@t-online.de,"{""first_name"":""Leupold"",""last_name"":""Collocott"",""gender"":""Male"",""address"":{""street"":""917 Stephen Circle"",""city"":""Dzerzhinskiy"",""country"":""Russia""}}",2021-12-14T23:15:43.375Z
C00308,icolloughfa@prweb.com,"{""first_name"":""Inesita"",""last_name"":""Collough"",""gender"":""Female"",""address"":{""street"":""7910 Delladonna Street"",""city"":""Osoyoos"",""country"":""Canada""}}",2021-12-14T23:15:43.375Z
C00309,jcollymore4n@pcworld.com,"{""first_name"":""Joelle"",""last_name"":""Collymore"",""gender"":""Female"",""address"":{""street"":""19 Dayton Court"",""city"":""Yidu"",""country"":""China""}}",2021-12-14T23:15:43.375Z
C00310,gcolnetef@japanpost.jp,"{""first_name"":""Goldi"",""last_name"":""Colnet"",""gender"":""Female"",""address"":{""street"":""710 Knutson Place"",""city"":""Suso"",""country"":""Philippines""}}",2021-12-14T23:15:43.375Z


In [0]:
# To work on our bronze data in the silver layer
# we will start by creating a streaming temporary view against our bronze table.
(spark.readStream
        .table("new_orders_bronze")
        .createOrReplaceTempView("orders_bronze_tmp"))

In [0]:
%sql
-- In this silver level, we are doing several enrichments and checks.
-- First, we join the order data with the customers information to add customers names.
-- And we parse the order timestamp from Unix timestamp into human readable format. 
-- And lastly, we exclude any order with no items, if any.

CREATE OR REPLACE TEMPORARY VIEW orders_enriched_tmp AS (
  SELECT order_id, quantity, o.customer_id, c.profile:first_name AS f_name , c.profile:last_name AS l_name,
        CAST(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) order_timestamp, books
  FROM orders_bronze_tmp o
  INNER JOIN customers_lookup c 
  ON o.customer_id = c.customer_id
  WHERE quantity > 0)


In [0]:
# Let us do a STREAM WRITE for this "orders_enriched_tmp" data into a SILVER TABLE
(spark.table("orders_enriched_tmp")
        .writeStream
        .format("delta")
        .option("checkpointLocation","dbfs:/mnt/demo/checkpoints/orders_silver")
        .outputMode("append")
        .table("orders_silver"))
# The data has been processed with the Stream

Out[31]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f0eebe799d0>

In [0]:
%sql
-- Lets query our SILVER TABLE
SELECT * FROM orders_silver LIMIT 10;

order_id,quantity,customer_id,f_name,l_name,order_timestamp,books
9397,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
9396,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
7397,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
7396,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
8397,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
8396,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
6397,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
6396,1,C00494,Sherlocke,Fairbard,2022-07-12T17:13:57.000+0000,"List(List(B08, 1, 41))"
9406,1,C00495,Kerrie,Falcus,2022-07-12T19:17:02.000+0000,"List(List(B09, 1, 24))"
9405,1,C00495,Kerrie,Falcus,2022-07-12T19:17:02.000+0000,"List(List(B09, 1, 24))"


In [0]:
%sql
SELECT COUNT(*) FROM orders_silver;
-- Great! we have successfully processed all the 4000 Records

count(1)
4000


In [0]:
load_new_data()
#Lets load another new file , and watch how it is propagated through the STREAM , from BRONZE to SILVER LAYER

Loading 05.parquet file to the bookstore dataset


In [0]:
%sql
SELECT COUNT(*) FROM orders_silver;
-- Great! now we have 5000 records

count(1)
5000


### ---------- GOLD LAYER ----------

In [0]:
# We create a STREAMING TEMP VIEW "orders_silver_tmp" from the SILVER TABLE
(spark.readStream
        .table("orders_silver")
        .createOrReplaceTempView("orders_silver_tmp"))

In [0]:
%sql
-- Now, we write another STREAM to create an AGGREGATE GOLD TABLE for the daily number of books for each customer
CREATE OR REPLACE TEMP VIEW daily_customer_books_tmp AS (
  SELECT customer_id,f_name,l_name,date_trunc("DD",order_timestamp) order_date , SUM(quantity) books_counts
  FROM orders_silver_tmp
  GROUP BY customer_id,f_name,l_name, order_date

)

In [0]:
# Let us write this aggregated data into a GOLD TABLE called "daily_customer_books"
(spark.table("daily_customer_books_tmp")
        .writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation","dbfs:/mnt/demo/checkpoints/daily_customer_books")
        .trigger(availableNow=True) # The STREAM will process all available data in microbatches and stop on its own due to the type of trigger chosen
        .table("daily_customer_books")
        )
     
# We can combine STREAMING and BATCH workloads in the same PIPELINE

Out[38]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f0eeb5900d0>

* Once the table **"daily_customer_books"** is updated or overwritten it is no longer valid for STREAMING
* So,in our case here we cannot read a stream from this **GOLD TABLE** .


In [0]:
%sql
-- Let use query the GOLD TABLE
SELECT * FROM daily_customer_books LIMIT 10;

customer_id,f_name,l_name,order_date,books_counts
C00873,Pauly,Lattka,2022-07-26T00:00:00.000+0000,5
C00804,Kenneth,Kiehl,2022-07-29T00:00:00.000+0000,5
C00803,Zollie,Kiddy,2022-07-14T00:00:00.000+0000,5
C01150,Natale,O'Luby,2022-07-26T00:00:00.000+0000,5
C00779,Samson,Josephy,2022-07-27T00:00:00.000+0000,10
C01120,Carrissa,Nairn,2022-07-30T00:00:00.000+0000,5
C01011,Nickey,McBeith,2022-07-16T00:00:00.000+0000,5
C01121,Deanne,Nani,2022-07-13T00:00:00.000+0000,5
C00807,Irwinn,Kike,2022-07-23T00:00:00.000+0000,5
C00755,Melessa,Jans,2022-07-29T00:00:00.000+0000,10


In [0]:
# Let us now Land all the remaining data files in our sources directory
load_new_data(all=True)

Loading 06.parquet file to the bookstore dataset
Loading 07.parquet file to the bookstore dataset
Loading 08.parquet file to the bookstore dataset
Loading 09.parquet file to the bookstore dataset
Loading 10.parquet file to the bookstore dataset


* The new data will be propagated from **BRONZE** to **SILVER** layer.
* For the **GOLD** layer , we rerun our final query to update the GOLD TABLE, since it is configured as a **BATCH JOB** using the *.trigger(availableNow=True)* command 

In [0]:
(spark.table("daily_customer_books_tmp")
        .writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation","dbfs:/mnt/demo/checkpoints/daily_customer_books")
        .trigger(availableNow=True) # The STREAM will process all available data in microbatches and stop on its own due to the type of trigger chosen
        .table("daily_customer_books")
        )

Out[42]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f0eeb5900a0>

In [0]:
%sql
SELECT * FROM daily_customer_books LIMIT 20;

customer_id,f_name,l_name,order_date,books_counts
C00873,Pauly,Lattka,2022-07-26T00:00:00.000+0000,5
C00804,Kenneth,Kiehl,2022-07-29T00:00:00.000+0000,5
C00803,Zollie,Kiddy,2022-07-14T00:00:00.000+0000,5
C01150,Natale,O'Luby,2022-07-26T00:00:00.000+0000,5
C00779,Samson,Josephy,2022-07-27T00:00:00.000+0000,10
C01120,Carrissa,Nairn,2022-07-30T00:00:00.000+0000,5
C01011,Nickey,McBeith,2022-07-16T00:00:00.000+0000,5
C01121,Deanne,Nani,2022-07-13T00:00:00.000+0000,5
C00807,Irwinn,Kike,2022-07-23T00:00:00.000+0000,5
C00755,Melessa,Jans,2022-07-29T00:00:00.000+0000,10


In [0]:
# Let us stop all the active streams
for s in spark.streams.active:
    print("Stopping Stream: "+ s.id)
    s.stop()
    s.awaitTermination()

Stopping Stream: 319c733a-312e-40a6-97a0-68447fd68526
Stopping Stream: 477d6c7c-fd88-4110-b32f-24581f0e76ae
Stopping Stream: 3b40ce9c-6d96-45cc-ade1-732c8dfcb97f
