# Structured Streaming

In [0]:
%run /Workspace/Users/guslagares.data@gmail.com/[learn]Databricks-Certified-Data-Engineer-Associate/Includes/Copy-Datasets

In [0]:
%sql
-- Make dae_learn_path the current catalog so unqualified table names below resolve inside it.
USE CATALOG dae_learn_path;

In [0]:
# Read the `books` table as a streaming source and register it as a temp view,
# so the streaming data can be queried from SQL cells.
(spark.readStream
      .table("books")
      .createOrReplaceTempView("books_streaming_tmp_vw"))

In [0]:
%sql
-- Inspect the columns and metadata of the streaming temp view.
DESCRIBE EXTENDED books_streaming_tmp_vw

In [0]:
%sql
-- Query the streaming temp view directly from SQL.
select *
from books_streaming_tmp_vw

In [0]:
import time  # used for the per-run checkpoint suffix below; makes the cell independent of the %run include

# Querying a streaming view yields a streaming DataFrame.
books_streaming_df = spark.sql("SELECT * FROM books_streaming_tmp_vw")
# A new checkpoint directory per run (time.time() suffix) so no progress from an earlier display is reused.
display(books_streaming_df, checkpointLocation = f"{checkpoints_bookstore}/tmp/books_streaming_{time.time()}")

In [0]:
import time  # used for the per-run checkpoint suffix below; makes the cell independent of the %run include

# Streaming aggregation: number of books per author over the streaming view.
df_author = spark.sql("""
                      SELECT author, count(book_id) AS total_books
                      FROM books_streaming_tmp_vw
                      GROUP BY author 
                    """)

# New checkpoint directory per run so the display starts without prior progress.
display(df_author, checkpointLocation = f"{checkpoints_bookstore}/tmp/author_streaming_{time.time()}")

In [0]:
%sql
-- SQL version of the per-author aggregation, saved as a temp view.
-- It reads a streaming view, so it is itself streaming (it is written with writeStream further down).
CREATE OR REPLACE TEMP VIEW author_counts_tmp_vw AS (
  SELECT author, count(book_id) AS total_books
  FROM books_streaming_tmp_vw
  GROUP BY author
)


In [0]:
import time  # used for the per-run checkpoint suffix below; makes the cell independent of the %run include

# Books per author over the streaming DataFrame created earlier.
# NOTE: the name is historical -- the result is grouped, not sorted; it is kept because a later cell uses it.
# The count column is renamed to `total_books` so this writer produces the same schema
# (author, total_books) as the author_counts_tmp_vw writer that also targets `author_counts`.
sorted_books_df = (books_streaming_df
                   .groupBy("author")
                   .count()
                   .withColumnRenamed("count", "total_books"))

# Persist the aggregate into the `author_counts` table.
# - The previous format('console') was dropped: console is not a table format and conflicted with .table().
# - outputMode("complete") rewrites the full aggregate result on each trigger.
# - awaitTermination() blocks until the availableNow run finishes, so later cells see the data.
(sorted_books_df.writeStream
    .outputMode("complete")
    .trigger(availableNow=True)
    .option("checkpointLocation", f"{checkpoints_bookstore}/tmp/author_streaming_{time.time()}")
    .table("author_counts")
    .awaitTermination())


In [0]:
import time  # used for the per-run checkpoint suffix below

# Pass an explicit checkpoint location, as the other streaming display() calls in this notebook do
# (presumably required on this serverless setup -- confirm).
display(sorted_books_df, checkpointLocation = f"{checkpoints_bookstore}/tmp/sorted_books_{time.time()}")


In [0]:
%sql
-- Check what the streaming write above stored in author_counts.
select * from author_counts


In [0]:
%sql
-- Append three new books to the `books` source table so the streaming queries have new input.
-- NOTE: not idempotent -- re-running this cell inserts duplicate rows.
INSERT INTO books (book_id, title, author, category, price)
values ("B19", "Introduction to Modeling and Simulation", "Mark W. Spong", "Computer Science", 25),
         ("B20", "Robot Modeling and Control", "Mark W. Spong", "Computer Science", 30),
         ("B21", "Turing's Vision: The Birth of Computer Science", "Chris Bernhardt", "Computer Science", 35)

In [0]:
%sql
-- Append another three books to the `books` source table.
-- NOTE: not idempotent -- re-running this cell inserts duplicate rows.
INSERT INTO books (book_id, title, author, category, price)
  values ("B16", "Hands-On Deep Learning Algorithms with Python", "Sudharsan Ravichandiran", "Computer Science", 25),
         ("B17", "Neural Network Methods in Natural Language Processing", "Yoav Goldberg", "Computer Science", 30),
        ("B18", "Understanding digital signal processing", "Richard Lyons", "Computer Science", 35)
        


In [0]:
# Materialize the streaming per-author aggregate (author_counts_tmp_vw) into `author_counts`:
# complete mode rewrites the whole result, availableNow processes the pending input and then stops,
# and awaitTermination() blocks this cell until the write has finished.
(
    spark.table("author_counts_tmp_vw")
    .writeStream
    .outputMode("complete")
    .option("checkpointLocation", f"{checkpoints_bookstore}/author_counts")
    .trigger(availableNow=True)
    .table("author_counts")
    .awaitTermination()
)

In [0]:
%sql
-- Verify author_counts after the availableNow write above.
select * from author_counts

# Auto Loader

In [0]:
# List the raw order files, using the same dataset_bookstore base path that the Auto Loader cell below
# reads from, instead of a hardcoded Volume path.
files = dbutils.fs.ls(f"{dataset_bookstore}/orders-raw/")
display(files)

In [0]:
# Incrementally ingest Parquet files from orders-raw with Auto Loader (cloudFiles) into `orders_updates`.
# cloudFiles.schemaLocation is where Auto Loader keeps the schema it infers; here it shares the
# checkpoint directory of the write.
(spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoints_bookstore}/orders")
        .load(f"{dataset_bookstore}/orders-raw")
      .writeStream
        .trigger(availableNow=True) # we use trigger AvailableNow as Trigger type ProcessingTime is not supported for Serverless compute.
        .option("checkpointLocation", f"{checkpoints_bookstore}/orders")
        .table("orders_updates")
        .awaitTermination()  # block until the availableNow run finishes so the queries below see its output
)

In [0]:
# List the files of the `orders` dataset under the same dataset_bookstore base path the rest of the
# notebook uses, instead of a hardcoded Volume path.
files = dbutils.fs.ls(f"{dataset_bookstore}/orders/")
display(files)

In [0]:
%sql
-- Inspect the rows ingested by Auto Loader.
select * from orders_updates

In [0]:
%sql
-- Row count before loading new files; compare with the count after re-running the ingestion.
select count(*) from orders_updates

In [0]:
# Presumably adds the next batch of raw order files (helper from the %run include -- confirm);
# the listing below shows the result.
# NOTE: the Auto Loader query above used trigger(availableNow=True) and has already stopped,
# so these files reach `orders_updates` only after that cell is re-run.
load_new_data()

In [0]:
# Re-list the raw order files under the same dataset_bookstore path Auto Loader reads from,
# to see the files added by load_new_data().
files = dbutils.fs.ls(f"{dataset_bookstore}/orders-raw/")
display(files)

In [0]:
%sql
-- Show the Delta table history (the versions written by the streaming ingestion).
DESCRIBE HISTORY orders_updates

# Multi-Hop Architecture

In [0]:
%sql
-- Re-set the current catalog (same as at the top) so this section can also be run on its own.
USE CATALOG dae_learn_path

In [0]:
# Stream the raw order files with Auto Loader and expose them to SQL as `orders_raw_temp`.
# Reads from the same dataset_bookstore base path as the earlier Auto Loader cell (and the path
# load_new_data() is observed to populate), instead of a hardcoded Volume path.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", f"{checkpoints_bookstore}/orders_raw")
    .load(f"{dataset_bookstore}/orders-raw")
    .createOrReplaceTempView("orders_raw_temp"))

In [0]:
%sql
-- Bronze enrichment: tag each raw order with its processing time and the file it came from.
-- _metadata.file_path replaces input_file_name(), which is deprecated and not supported on
-- Unity Catalog / serverless compute.
CREATE OR REPLACE TEMPORARY VIEW orders_tmp AS (
  SELECT *, current_timestamp() arrival_time, _metadata.file_path source_file
  FROM orders_raw_temp
)

In [0]:
import time  # used for the per-run checkpoint suffix below; makes the cell independent of the %run include

# Preview the enriched bronze stream; a new checkpoint directory per run avoids reusing prior progress.
orders_tmp_df = spark.sql("SELECT * FROM orders_tmp")
display(orders_tmp_df, checkpointLocation = f"{checkpoints_bookstore}/orders_{time.time()}")

In [0]:
# First bronze run: append the enriched raw orders into the `orders_bronze` Delta table.
# It shares the orders_bronze checkpoint with process_bronze(), so later runs continue from here.
# awaitTermination() blocks until the availableNow run finishes so the count below is complete.
(spark.table("orders_tmp")
                    .writeStream
                    .format("delta")
                    .option("checkpointLocation", f"{checkpoints_bookstore}/orders_bronze")
                    .outputMode("append")
                    .trigger(availableNow=True) # we use trigger AvailableNow as Trigger type ProcessingTime is not supported for Serverless compute.
                    .table("orders_bronze")
                    .awaitTermination())

In [0]:
%sql
-- Number of rows written to the bronze table so far.
select count(*) from orders_bronze


In [0]:
def process_bronze():
    """Run the bronze step once: stream `orders_tmp` into the `orders_bronze` Delta table.

    Appends new rows, processes the input available now (trigger availableNow) and blocks
    until the streaming query has finished. Progress is tracked in the orders_bronze checkpoint.
    """
    bronze_writer = (
        spark.table("orders_tmp")
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{checkpoints_bookstore}/orders_bronze")
        # we use trigger AvailableNow as Trigger type ProcessingTime is not supported for Serverless compute.
        .trigger(availableNow=True)
    )
    bronze_query = bronze_writer.table("orders_bronze")
    bronze_query.awaitTermination()

process_bronze()

In [0]:
# Presumably adds the next batch of raw order files (helper from the %run include -- confirm).
load_new_data()

In [0]:
# Ingest the newly loaded files; the shared orders_bronze checkpoint lets this run continue where the last one stopped.
process_bronze()

In [0]:
# Static (batch) customers table read from JSON; used as the lookup side of the silver join.
(spark.read
      .json(f"{dataset_bookstore}/customers-json")
      .createOrReplaceTempView("customers_lookup"))

In [0]:
%sql
-- Inspect the customers lookup data.
SELECT * FROM customers_lookup

In [0]:
# Streaming read of the bronze table, exposed to SQL for the silver transformation.
bronze_stream_view = "orders_bronze_tmp"
spark.readStream.table("orders_bronze").createOrReplaceTempView(bronze_stream_view)

In [0]:
%sql
-- Silver transformation over the streaming bronze orders:
--   * join with the static customers lookup on customer_id,
--   * pull first/last name out of `profile` with the `:` JSON path operator (profile presumably a JSON string -- confirm),
--   * turn the epoch-seconds order_timestamp into a TIMESTAMP via from_unixtime + cast,
--   * keep only rows with quantity > 0.
CREATE OR REPLACE TEMPORARY VIEW orders_enriched_tmp AS (
  SELECT order_id, quantity, o.customer_id, c.profile:first_name as f_name, c.profile:last_name as l_name,
        cast(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) order_timestamp, books
   FROM orders_bronze_tmp o
   INNER JOIN customers_lookup c
   ON o.customer_id = c.customer_id
   WHERE quantity > 0)
   

In [0]:
def process_silver():
    """Run the silver step once: stream `orders_enriched_tmp` into the `orders_silver` Delta table.

    Appends new enriched rows, processes the input available now (trigger availableNow) and
    blocks until the streaming query has finished.
    """
    silver_query = (
        spark.table("orders_enriched_tmp")
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{checkpoints_bookstore}/orders_silver")
        # we use trigger AvailableNow as Trigger type ProcessingTime is not supported for Serverless compute.
        .trigger(availableNow=True)
        .table("orders_silver")
    )
    silver_query.awaitTermination()

process_silver()


In [0]:
%sql
-- Inspect the enriched silver rows.
select * from orders_silver

In [0]:
%sql
-- Silver row count; compare before and after the next load/process cycle.
select count(*) from orders_silver

In [0]:
# Simulate another batch of arrivals (helper presumably from the %run include) and push it through bronze and silver.
load_new_data()

process_bronze()
process_silver()



In [0]:
# Streaming read of the silver table, exposed to SQL for the gold aggregation.
silver_stream = spark.readStream.table("orders_silver")
silver_stream.createOrReplaceTempView("orders_silver_tmp")

In [0]:
%sql
-- Gold aggregate over the streaming silver view: total quantity ordered per customer per day
-- (order_timestamp truncated to the day).
CREATE OR REPLACE TEMP VIEW daily_customer_books_tmp AS (
  SELECT customer_id, f_name, l_name, date_trunc("DD", order_timestamp) order_date, sum(quantity) books_counts
  FROM orders_silver_tmp
  GROUP BY customer_id, f_name, l_name, date_trunc("DD", order_timestamp)
  )

In [0]:
def process_gold():
    """Run the gold step once: write the `daily_customer_books_tmp` aggregate to `daily_customer_books`.

    Uses complete output mode, so the whole aggregate is rewritten; trigger availableNow
    processes the input available now, and the call blocks until the query has finished.
    """
    gold_writer = (
        spark.table("daily_customer_books_tmp")
        .writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoints_bookstore}/daily_customer_books")
        .outputMode("complete")
        .trigger(availableNow=True)
    )
    gold_writer.table("daily_customer_books").awaitTermination()

process_gold()

In [0]:
%sql
-- Inspect the gold table.
select * from daily_customer_books

In [0]:
# Presumably loads all remaining raw order files in one go (`all` flag of the %run include helper -- confirm).
load_new_data(all=True)


In [0]:
# Push the newly loaded files through every layer in order: bronze -> silver -> gold.
process_bronze()
process_silver()
process_gold()