
<div  style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Professional/main/Includes/images/books.png" width="60%">
</div>

This notebook creates the books silver table. We store it as a Type 2 slowly changing dimension (SCD) table so that the price history of each book is preserved.
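To make the Type 2 behavior concrete, here is a minimal plain-Python sketch of the idea, independent of Spark and Delta. The helper name `apply_type2_update`, the second price (45.0) and the simplified dates are assumptions made for illustration only; the notebook itself implements this logic with a `MERGE` statement.

```python
from datetime import datetime

def apply_type2_update(rows, update):
    """Apply one update to a list of SCD Type 2 rows (dicts), in place."""
    # Find the current version of this book, if there is one.
    current = next(
        (r for r in rows if r["book_id"] == update["book_id"] and r["current"]),
        None,
    )
    if current is not None:
        if current["price"] == update["price"]:
            return  # price unchanged: nothing to record
        # Close the current version instead of overwriting it.
        current["current"] = False
        current["end_date"] = update["updated"]
    # Insert the new version as the current row (this also covers new books).
    rows.append({
        "book_id": update["book_id"],
        "price": update["price"],
        "current": True,
        "effective_date": update["updated"],
        "end_date": None,
    })

history = []
# First load of B01, then a hypothetical price change a few days later.
apply_type2_update(history, {"book_id": "B01", "price": 49.0, "updated": datetime(2021, 11, 7)})
apply_type2_update(history, {"book_id": "B01", "price": 45.0, "updated": datetime(2021, 11, 12)})
```

After both calls, `history` holds two rows for B01: the old price, closed with an `end_date`, and the new price, flagged as current.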

In [0]:
%run ../Includes/Copy-Datasets

In [0]:
%sql
-- NOTE: This cell shows the SQL query for reference only. It fails if run directly because the `updates` view does not exist yet.
-- The same query is used below in the type2_upsert() function, which creates that view and is invoked through the foreachBatch call.

MERGE INTO books_silver
USING (
    SELECT updates.book_id as merge_key, updates.*
    FROM updates

    UNION ALL

    SELECT NULL as merge_key, updates.*
    FROM updates
    JOIN books_silver ON updates.book_id = books_silver.book_id
    WHERE books_silver.current = true AND updates.price <> books_silver.price
  ) staged_updates
ON books_silver.book_id = merge_key 
WHEN MATCHED AND books_silver.current = true AND books_silver.price <> staged_updates.price THEN
  UPDATE SET current = false, end_date = staged_updates.updated
WHEN NOT MATCHED THEN
  INSERT (book_id, title, author, price, current, effective_date, end_date)
  VALUES (staged_updates.book_id, staged_updates.title, staged_updates.author, staged_updates.price, true, staged_updates.updated, NULL)

In [0]:
def type2_upsert(microBatchDF, batch):
    # Expose the micro-batch as a temp view so the MERGE statement can reference it as "updates".
    microBatchDF.createOrReplaceTempView("updates")
    
    sql_query = """
        MERGE INTO books_silver
        USING (
            SELECT updates.book_id as merge_key, updates.*
            FROM updates

            UNION ALL

            SELECT NULL as merge_key, updates.*
            FROM updates
            JOIN books_silver ON updates.book_id = books_silver.book_id
            WHERE books_silver.current = true AND updates.price <> books_silver.price
          ) staged_updates
        ON books_silver.book_id = merge_key 
        WHEN MATCHED AND books_silver.current = true AND books_silver.price <> staged_updates.price THEN
          UPDATE SET current = false, end_date = staged_updates.updated
        WHEN NOT MATCHED THEN
          INSERT (book_id, title, author, price, current, effective_date, end_date)
          VALUES (staged_updates.book_id, staged_updates.title, staged_updates.author, staged_updates.price, true, staged_updates.updated, NULL)
    """
    
    # Run the MERGE in the micro-batch's own session, where the "updates" view is registered.
    microBatchDF.sparkSession.sql(sql_query)
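The `merge_key` trick in the `USING` subquery can be hard to read at first, so the following plain-Python sketch mimics how the staged rows are built. It is only an illustration under stated assumptions: the function `stage_updates`, the changed price (45.0) and the new book B11 are hypothetical, and it assumes at most one current row per book.

```python
def stage_updates(updates, table):
    """Mimic the USING subquery: return (merge_key, update) pairs."""
    # First branch: every update, keyed by its own book_id.
    staged = [(u["book_id"], u) for u in updates]
    # Second branch: updates whose price differs from the current row,
    # keyed by None (SQL NULL) so they never match and fall through to INSERT.
    current_price = {r["book_id"]: r["price"] for r in table if r["current"]}
    for u in updates:
        if u["book_id"] in current_price and u["price"] != current_price[u["book_id"]]:
            staged.append((None, u))
    return staged

table = [{"book_id": "B01", "price": 49.0, "current": True}]
updates = [
    {"book_id": "B01", "price": 45.0},  # hypothetical price change
    {"book_id": "B11", "price": 30.0},  # hypothetical new book
]
staged = stage_updates(updates, table)
```

Here `staged` contains three rows. The B01 row keyed by `"B01"` matches the current record and closes it, the B01 row keyed by `None` matches nothing and is inserted as the new version, and the B11 row matches nothing and is inserted as a new book.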

In [0]:
%sql
CREATE TABLE IF NOT EXISTS books_silver
(book_id STRING, title STRING, author STRING, price DOUBLE, current BOOLEAN, effective_date TIMESTAMP, end_date TIMESTAMP)

Execute the streaming query below to process the books data from the bronze table into books_silver.

In [0]:
from pyspark.sql import functions as F  # harmless if the %run include already imported it

def process_books():
    schema = "book_id STRING, title STRING, author STRING, price DOUBLE, updated TIMESTAMP"
 
    query = (spark.readStream
                    .table("bronze")
                    .filter("topic = 'books'")
                    .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                    .select("v.*")
                 .writeStream
                    .foreachBatch(type2_upsert)
                    .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/books_silver")
                    .trigger(availableNow=True)
                    .start()
            )
    
    query.awaitTermination()
    
process_books()

In [0]:
books_df = spark.read.table("books_silver").orderBy("book_id", "effective_date")
display(books_df)

book_id,title,author,price,current,effective_date,end_date
B01,The Soul of a New Machine,Tracy Kidder,49.0,True,2021-11-07T17:11:33.507Z,
B02,Learning JavaScript Design Patterns,Addy Osmani,28.0,True,2021-11-07T17:11:33.507Z,
B03,Make Your Own Neural Network,Tariq Rashid,35.0,True,2021-11-07T17:11:33.507Z,
B04,Robot Dynamics and Control,Mark W. Spong,20.0,True,2021-11-08T17:12:05.419Z,
B05,Fluent Python,Luciano Ramalho,47.0,True,2021-11-08T17:12:05.419Z,
B06,Deep Learning with Python,François Chollet,22.0,True,2021-11-08T17:12:05.419Z,
B07,The Hundred-Page Machine Learning,Andriy Burkov,33.0,True,2021-11-09T17:11:49.506Z,
B08,Quantum Computing for Everyone,Chris Bernhardt,41.0,True,2021-11-09T17:11:49.506Z,
B09,Advanced Data Structures,Peter Brass,24.0,True,2021-11-09T17:11:49.506Z,
B10,Beginning Database Design Solutions,Rod Stephens,44.0,True,2021-11-10T16:36:31.241Z,


Load data files containing book updates, propagate them to the bronze table, and then process them into books_silver.

In [0]:
bookstore.load_books_updates()
bookstore.process_bronze()
process_books()

Loading books-updates-streaming-03.json file to the bookstore dataset


In [0]:
books_df = spark.read.table("books_silver").orderBy("book_id", "effective_date")
display(books_df)

book_id,title,author,price,current,effective_date,end_date
B01,The Soul of a New Machine,Tracy Kidder,49.0,True,2021-11-07T17:11:33.507Z,
B02,Learning JavaScript Design Patterns,Addy Osmani,28.0,True,2021-11-07T17:11:33.507Z,
B03,Make Your Own Neural Network,Tariq Rashid,35.0,True,2021-11-07T17:11:33.507Z,
B04,Robot Dynamics and Control,Mark W. Spong,20.0,True,2021-11-08T17:12:05.419Z,
B05,Fluent Python,Luciano Ramalho,47.0,True,2021-11-08T17:12:05.419Z,
B06,Deep Learning with Python,François Chollet,22.0,True,2021-11-08T17:12:05.419Z,
B07,The Hundred-Page Machine Learning,Andriy Burkov,33.0,True,2021-11-09T17:11:49.506Z,
B08,Quantum Computing for Everyone,Chris Bernhardt,41.0,True,2021-11-09T17:11:49.506Z,
B09,Advanced Data Structures,Peter Brass,24.0,True,2021-11-09T17:11:49.506Z,
B10,Beginning Database Design Solutions,Rod Stephens,44.0,True,2021-11-10T16:36:31.241Z,


Next, we create another silver table that holds only the latest version of each book, by filtering books_silver on current IS TRUE.


<div  style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://raw.githubusercontent.com/derar-alhussein/Databricks-Certified-Data-Engineer-Professional/main/Includes/images/current_books.png" width="60%">
</div>

In [0]:
%sql
CREATE OR REPLACE TABLE current_books
AS (
   SELECT book_id, title, author, price
   FROM books_silver
   WHERE current IS TRUE
   )

num_affected_rows,num_inserted_rows


The current_books table contains only the current version of each book. Because it is created with CREATE OR REPLACE TABLE ... AS, it is a snapshot of books_silver and must be rebuilt whenever books_silver changes.
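The same filter can be sketched in plain Python to show what ends up in the table. The rows below are illustrative only; in particular, the second B01 price (45.0) is a hypothetical newer version, not data from this notebook.

```python
history = [
    {"book_id": "B01", "price": 49.0, "current": False},
    {"book_id": "B01", "price": 45.0, "current": True},  # hypothetical newer price
    {"book_id": "B02", "price": 28.0, "current": True},
]

# Keep only the current version of each book and drop the SCD bookkeeping columns.
current_books = sorted(
    ({"book_id": r["book_id"], "price": r["price"]} for r in history if r["current"]),
    key=lambda r: r["book_id"],
)
```

Each book appears exactly once in `current_books`, with its latest price, while the closed B01 row stays behind in the history.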

In [0]:
%sql
SELECT *
FROM current_books
ORDER BY book_id

book_id,title,author,price
B01,The Soul of a New Machine,Tracy Kidder,49.0
B02,Learning JavaScript Design Patterns,Addy Osmani,28.0
B03,Make Your Own Neural Network,Tariq Rashid,35.0
B04,Robot Dynamics and Control,Mark W. Spong,20.0
B05,Fluent Python,Luciano Ramalho,47.0
B06,Deep Learning with Python,François Chollet,22.0
B07,The Hundred-Page Machine Learning,Andriy Burkov,33.0
B08,Quantum Computing for Everyone,Chris Bernhardt,41.0
B09,Advanced Data Structures,Peter Brass,24.0
B10,Beginning Database Design Solutions,Rod Stephens,44.0
