# Delta Live Tables - Bookstore Pipeline

This notebook defines the Bronze, Silver, and Gold layers of our DLT pipeline for the Bookstore data.

## SETUP

In [0]:
-- SET Var_Catalog_Name = 'default';
-- SET Var_Schema_Name = 'default';
-- SET Var_Target_DLT_Schema = 'default';

In [0]:
-- Set the current catalog and schema for DLT output.
-- USE CATALOG ${Var_Catalog_Name};
-- USE SCHEMA ${Var_Target_DLT_Schema};

USE CATALOG default;
USE SCHEMA default;


## Bronze Layer

Raw data ingestion

In [0]:
CREATE OR REFRESH LIVE TABLE bronze_books
COMMENT "Raw books data ingested from Unity Catalog table."
AS SELECT * FROM default.default.books_raw;

In [0]:
CREATE OR REFRESH LIVE TABLE bronze_customers
COMMENT "Raw customers data ingested from Unity Catalog table."
AS SELECT * FROM default.default.customers_raw;

In [0]:
CREATE OR REFRESH LIVE TABLE bronze_orders
COMMENT "Raw orders data ingested from Unity Catalog table."
AS SELECT * FROM default.default.orders_raw;


## Silver Layer 

The cleaned books orders with only the orders with a valid id

In [0]:
CREATE OR REFRESH LIVE TABLE silver_orders_cleaned (
  CONSTRAINT valid_order_number EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
)
COMMENT "Cleaned orders data, with valid order_ids and enriched customer details."
AS
  SELECT
    o.order_id,
    o.timestamp, -- Original timestamp from raw orders
    o.customer_id,
    o.quantity AS order_total_quantity, -- Total quantity of all books in this specific order
    o.total AS order_total_price,       -- Total price of this specific order
    o.books, -- The array of structs: [{"book_id": x, "quantity": y}, ...]
    c.email, -- Adding customer email for Gold layer filtering
    c.profile:first_name AS f_name,
    c.profile:last_name AS l_name
  FROM LIVE.bronze_orders AS o
  INNER JOIN LIVE.bronze_customers AS c
    ON o.customer_id = c.customer_id
  WHERE
    o.order_id IS NOT NULL -- Ensure order_id is not null (redundant with expectation but good for clarity)
    ;


> Constraint violation
| **`ON VIOLATION`** | Behavior |
| --- | --- |
| **`DROP ROW`** | Discard records that violate constraints |
| **`FAIL UPDATE`** | Violated constraint causes the pipeline to fail  |
| Omitted | Records violating constraints will be kept, and reported in metrics |



## Gold Layer

"The daily number of books per customer having an email finishing with 'mail.org' "

In [0]:

CREATE OR REFRESH LIVE TABLE gold_daily_books_mail_org_customers
COMMENT "Daily number of books ordered by customers with email ending in 'mail.org'."
AS
  SELECT
    sc.customer_id,
    sc.email AS customer_email,
    CAST(sc.timestamp AS DATE) AS order_date, -- Extract date from order timestamp
    SUM(book_detail.quantity) AS total_books_ordered_daily -- Sum of individual book quantities from exploded array
  FROM LIVE.silver_orders_cleaned AS sc
  LATERAL VIEW EXPLODE(sc.books) exploded_books_view AS book_detail -- Flatten the 'books' array (each book item in the order becomes a new row)
  WHERE sc.email LIKE '%mail.org'
  GROUP BY
    sc.customer_id,
    sc.email,
    CAST(sc.timestamp AS DATE); -- Group by customer and order date to get daily totals


Now that we have the complete DLT pipeline code covering Bronze, Silver, and Gold layers!

The final step is to create and run the DLT pipeline in the Databricks UI.