
<div  style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Associate/main/Includes/images/bookstore_schema.png" alt="Databricks Learning" style="width: 600px">
</div>

In [0]:
%run ../Includes/Copy-Datasets


## 🗂️ Exploring the Source Directory

In [0]:
files = dbutils.fs.ls(f"{dataset_bookstore}/orders-raw")
display(files)


## 🔄 Auto Loader

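Auto Loader (the `cloudFiles` source) monitors the source directory and ingests new files incrementally as they arrive.  
It infers the schema on the first run, stores it at the path given by `cloudFiles.schemaLocation`, and processes only files it has not seen before, so the dataset stays current without manual refreshes. This makes it well suited to streaming workloads. Here the raw files are Parquet, and the stream is registered as the temporary view `orders_raw_temp` so it can be queried with SQL.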
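The core idea behind incremental ingestion can be modeled in plain Python. This sketch is not Auto Loader's implementation or API: it assumes files are identified by name and that a set of already-processed names stands in for the state Auto Loader keeps in the stream checkpoint. The helper `ingest_new_files` is hypothetical; each call plays the role of one trigger and returns only files it has not seen before.

```python
from __future__ import annotations

import os
import tempfile


def ingest_new_files(directory: str, processed: set[str]) -> list[str]:
    """Return files in `directory` not yet in `processed`, then mark them processed.

    `processed` is a stand-in for the checkpoint state of a real stream.
    """
    new_files = sorted(
        name
        for name in os.listdir(directory)
        if name not in processed and os.path.isfile(os.path.join(directory, name))
    )
    processed.update(new_files)
    return new_files


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as src:
        seen: set[str] = set()
        for name in ("01.parquet", "02.parquet"):
            open(os.path.join(src, name), "w").close()
        print(ingest_new_files(src, seen))  # first trigger: both files
        open(os.path.join(src, "03.parquet"), "w").close()
        print(ingest_new_files(src, seen))  # next trigger: only the new file
```

Re-listing the whole directory on every call is the simplest discovery strategy; it is used here only to keep the model short.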

In [0]:
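(spark.readStream
    .format("cloudFiles")                    # Auto Loader source
    .option("cloudFiles.format", "parquet")  # format of the incoming files
    # Where Auto Loader stores the inferred schema
    .option("cloudFiles.schemaLocation", "dbfs:/mnt/demo/checkpoints/orders_raw")
    .load(f"{dataset_bookstore}/orders-raw")
    .createOrReplaceTempView("orders_raw_temp"))  # expose the stream to SQL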


## ✨ Enriching Raw Data

Before writing to the Bronze table, the raw ingested data is enriched with metadata.  
Here two audit columns are added: `arrival_time`, the ingestion timestamp from `current_timestamp()`, and `source_file`, the path of the file each record came from, from `input_file_name()`.

Enriching raw data improves traceability, simplifies debugging, and prepares the dataset for downstream processing in the Silver and Gold layers.
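The enrichment in the next cell can be mimicked in plain Python to show what each record gains. In this sketch, records are dictionaries read from a single file, `arrival_time` is taken once per batch (Spark's `current_timestamp()` likewise returns the time at the start of query evaluation), and `source_file` is passed in by the caller. The helper name `enrich_batch` and the sample values are hypothetical.

```python
from __future__ import annotations

from collections.abc import Iterable
from datetime import datetime, timezone


def enrich_batch(records: Iterable[dict], source_file: str,
                 arrival_time: datetime | None = None) -> list[dict]:
    """Return copies of `records` with audit metadata columns appended."""
    # One timestamp for the whole batch, like current_timestamp() within a query.
    ts = arrival_time or datetime.now(timezone.utc)
    # Copy each record so the caller's input is left unmodified.
    return [{**r, "arrival_time": ts, "source_file": source_file} for r in records]


if __name__ == "__main__":
    rows = [{"order_id": "o1", "quantity": 2}]  # made-up sample record
    print(enrich_batch(rows, "dbfs:/example/orders-raw/01.parquet"))
```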

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW orders_tmp AS (
  SELECT *, current_timestamp() arrival_time, input_file_name() source_file
  FROM orders_raw_temp
)

In [0]:
%sql
SELECT * FROM orders_tmp

## 🔶 Creating Bronze Table

The enriched stream is then written to a Bronze Delta table in append mode.  
This table serves as the landing zone for raw data, preserving its original structure while capturing additional attributes for auditing and troubleshooting. The checkpoint location records which data has already been written, so a restarted query resumes without duplicating rows.
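The `checkpointLocation` option used in the next cell is what lets the Bronze stream restart without appending the same input twice. The plain-Python model below persists the set of committed source names to a JSON file, standing in for the checkpoint directory, and appends only rows from uncommitted sources. The file layout, the names, and the `append_new` helper are illustrative assumptions, not Delta's actual checkpoint format.

```python
from __future__ import annotations

import json
import os


def append_new(batches: dict[str, list[dict]], table: list[dict],
               checkpoint_path: str) -> int:
    """Append rows from sources not yet committed; return the number of rows appended."""
    committed: set[str] = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            committed = set(json.load(f))
    appended = 0
    for source, rows in sorted(batches.items()):
        if source in committed:
            continue  # already written before a restart: skip instead of duplicating
        table.extend(rows)  # append-only: existing rows are never modified
        committed.add(source)
        appended += len(rows)
    # Persist progress so a restarted run resumes where this one left off.
    with open(checkpoint_path, "w") as f:
        json.dump(sorted(committed), f)
    return appended
```

Calling `append_new` twice with the same input appends rows only the first time, which is the behavior the checkpoint provides for the real stream.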

In [0]:
(spark.table("orders_tmp")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_bronze")
      .outputMode("append")
      .table("orders_bronze"))

In [0]:
%sql
SELECT count(*) FROM orders_bronze

In [0]:
# Simulate new files arriving (helper defined in ../Includes/Copy-Datasets)
load_new_data()


## 📘 Creating Static Lookup Table

A static lookup view, `customers_lookup`, is created from the customers JSON files to support enrichment in downstream transformations.  
Unlike the order data, it is read in batch with `spark.read`, and it supplies reference data (here, customer profile information) that adds context when joined with the streaming orders.
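Conceptually, the lookup is a mapping from `customer_id` to customer attributes. The sketch below builds such a mapping from JSON-lines text in plain Python. It assumes, as the Silver query's `profile:first_name` syntax suggests, that `profile` holds a JSON-encoded string; the `build_lookup` helper and the sample record are made up for illustration.

```python
from __future__ import annotations

import json


def build_lookup(json_lines: str) -> dict[str, dict]:
    """Map customer_id -> record, decoding the JSON-encoded `profile` field."""
    lookup: dict[str, dict] = {}
    for line in json_lines.splitlines():
        if not line.strip():
            continue  # tolerate blank lines
        record = json.loads(line)
        # Assumption: `profile` is itself a JSON string, hence the second decode.
        record["profile"] = json.loads(record["profile"])
        lookup[record["customer_id"]] = record
    return lookup


if __name__ == "__main__":
    sample = json.dumps({"customer_id": "C1",
                         "profile": json.dumps({"first_name": "Ada", "last_name": "Lovelace"})})
    print(build_lookup(sample)["C1"]["profile"]["first_name"])
```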

In [0]:
(spark.read
      .format("json")
      .load(f"{dataset_bookstore}/customers-json")
      .createOrReplaceTempView("customers_lookup"))

In [0]:
%sql
SELECT * FROM customers_lookup

## 🔷 Creating Silver Table

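The Silver layer reads the Bronze table as a stream and joins it with the static customer lookup (a stream-static join).  
The query keeps only orders with a positive quantity, extracts the customer's first and last names from the JSON `profile` field, and converts the Unix `order_timestamp` into a proper timestamp. The result is a refined dataset that is suitable for operational reporting and analytics.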

The Silver table represents a more structured and trusted version of the data, ready for aggregation or further processing.
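The row-level logic of the Silver query (inner join, `quantity > 0` filter, name extraction, and Unix-seconds-to-timestamp conversion) can be checked on a small batch in plain Python. This is only a model of the SQL below for one micro-batch: it assumes `order_timestamp` holds seconds since the epoch, as `from_unixtime` expects, and it uses UTC where Spark would use the session time zone. The `to_silver` name is hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timezone


def to_silver(orders: list[dict], customers: dict[str, dict]) -> list[dict]:
    """Inner-join orders to customers, drop non-positive quantities, fix types."""
    out = []
    for o in orders:
        c = customers.get(o["customer_id"])
        if c is None or o["quantity"] <= 0:
            continue  # INNER JOIN drops unmatched rows; WHERE drops quantity <= 0
        out.append({
            "order_id": o["order_id"],
            "quantity": o["quantity"],
            "customer_id": o["customer_id"],
            "f_name": c["profile"]["first_name"],
            "l_name": c["profile"]["last_name"],
            # Unix seconds -> timestamp (UTC here; Spark uses the session time zone).
            "order_timestamp": datetime.fromtimestamp(o["order_timestamp"], tz=timezone.utc),
            "books": o["books"],
        })
    return out
```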

In [0]:
(spark.readStream
  .table("orders_bronze")
  .createOrReplaceTempView("orders_bronze_tmp"))

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW orders_enriched_tmp AS (
  SELECT order_id, quantity, o.customer_id, c.profile:first_name as f_name, c.profile:last_name as l_name,
         cast(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) order_timestamp, books
  FROM orders_bronze_tmp o
  INNER JOIN customers_lookup c
  ON o.customer_id = c.customer_id
  WHERE quantity > 0)

In [0]:
(spark.table("orders_enriched_tmp")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/orders_silver")
      .outputMode("append")
      .table("orders_silver"))

In [0]:
%sql
SELECT * FROM orders_silver

In [0]:
%sql
SELECT COUNT(*) FROM orders_silver

In [0]:
load_new_data()

## 🟡 Creating Gold Table

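The Gold layer aggregates data from the Silver table to produce business-level metrics.  
This final output is optimized for consumption by dashboards, machine learning models, and reporting tools.

In this example, it computes the total number of books each customer orders per day. The result is written in `complete` output mode, so every trigger rewrites the whole aggregate table instead of appending rows. The `availableNow` trigger processes all data available when the query starts and then stops, so the cell must be rerun to include files that arrive afterwards.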
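For a streaming aggregation, Spark keeps running aggregation state across micro-batches and, in `complete` mode, emits the full result table on every trigger. The plain-Python sketch below models that with a dictionary of running totals keyed like the `GROUP BY`, truncating timestamps to the day as `date_trunc("DD", ...)` does. The `apply_batch` function and the in-memory state are stand-ins, not the Spark API.

```python
from __future__ import annotations

from datetime import datetime

Key = tuple[str, str, str, datetime]  # (customer_id, f_name, l_name, order_date)


def apply_batch(state: dict[Key, int], batch: list[dict]) -> list[tuple]:
    """Fold one micro-batch into running totals; return the full result table."""
    for row in batch:
        # date_trunc("DD", ts): keep the date, zero out the time of day.
        order_date = row["order_timestamp"].replace(hour=0, minute=0, second=0, microsecond=0)
        key = (row["customer_id"], row["f_name"], row["l_name"], order_date)
        state[key] = state.get(key, 0) + row["quantity"]
    # Complete mode: emit every group, not only the ones this batch touched.
    return sorted((*k, v) for k, v in state.items())
```

Even an empty batch returns the whole table, which is the defining property of `complete` mode compared with appending only new rows.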

In [0]:
(spark.readStream
  .table("orders_silver")
  .createOrReplaceTempView("orders_silver_tmp"))

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW daily_customer_books_tmp AS (
  SELECT customer_id, f_name, l_name, date_trunc("DD", order_timestamp) order_date, sum(quantity) books_counts
  FROM orders_silver_tmp
  GROUP BY customer_id, f_name, l_name, date_trunc("DD", order_timestamp)
  )

In [0]:
(spark.table("daily_customer_books_tmp")
      .writeStream
      .format("delta")
      .outputMode("complete")       # rewrite the full aggregate on each trigger
      .option("checkpointLocation", "dbfs:/mnt/demo/checkpoints/daily_customer_books")
      .trigger(availableNow=True)   # process all available data, then stop
      .table("daily_customer_books"))

In [0]:
%sql
SELECT * FROM daily_customer_books

In [0]:
load_new_data()


## ⛔ Stopping Active Streams

The Bronze and Silver queries use the default trigger, so they keep running until they are stopped explicitly.  
At the end of the session, every active stream is stopped and awaited, so that no further data is processed and the resources held by the queries are released.
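As a rough analogy only, the stop-and-wait loop in the next cell can be illustrated with plain Python threads. In this sketch a `threading.Event` plays the stop signal and `Thread.join` plays `awaitTermination()`. The `FakeStream` class is hypothetical; it borrows the method names of Spark's `StreamingQuery` for readability but is unrelated to it.

```python
from __future__ import annotations

import threading
import time


class FakeStream:
    """Hypothetical stand-in for a long-running streaming query."""

    def __init__(self, name: str) -> None:
        self.id = name
        self.batches = 0
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Like a default-trigger query: keep processing until asked to stop.
        while not self._stop_event.is_set():
            self.batches += 1
            time.sleep(0.01)

    def stop(self) -> None:
        self._stop_event.set()

    def awaitTermination(self) -> None:  # camelCase mirrors Spark's naming
        self._thread.join()

    @property
    def isActive(self) -> bool:
        return self._thread.is_alive()


def stop_all(streams: list[FakeStream]) -> None:
    """Stop each stream, then block until its worker thread has exited."""
    for s in streams:
        print("Stopping stream: " + s.id)
        s.stop()
        s.awaitTermination()
```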

In [0]:
for s in spark.streams.active:
    print("Stopping stream: " + s.id)
    s.stop()
    s.awaitTermination()