## What is a data stream?

- Any data source that grows over time.

Examples:

- New files landing in cloud storage
- Updates to a database captured in a CDC feed.
- Events queued in a pub/sub messaging feed (such as Kafka).
- A Delta table.

## Spark Streaming

Spark Structured Streaming is a Spark feature that lets you treat an infinite input data stream as a structured table.

New data arriving in the source becomes new rows appended to an **unbounded** table.
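
The unbounded-table idea can be sketched in plain Python. This is only a conceptual model, not Spark code: the function name `unbounded_table` and the sample book IDs are made up for illustration.

```python
def unbounded_table(batches):
    """Yield a snapshot of the table after each batch of new source data."""
    table = []
    for batch in batches:
        table.extend(batch)   # new data in the source becomes new rows
        yield list(table)     # the table only ever grows

# Three arrivals of source data; the last one brings nothing new.
snapshots = list(unbounded_table([["B01", "B02"], ["B03"], []]))
print(snapshots)
```

Each snapshot contains every row seen so far, which is why a query over the stream behaves like a query over a table that keeps growing.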

### Triggers for streaming data

**Unspecified (default): processingTime = "500ms"**

**Fixed interval**

In [0]:
# .trigger( processingTime = "10 seconds" ) #Process data in micro-batches at the user-specified intervals.

**Triggered batch**

In [0]:
#.trigger( once=True ) #Process all available data in a single batch, then stop.

**Triggered micro-batches**

In [0]:
#.trigger(availableNow=True) #Process all available data in multiple micro-batches, then stop.
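
The difference between the two "process and stop" triggers can be modeled in plain Python. This is a rough sketch of the semantics only, not the Spark API: `run_once`, `run_available_now` and `max_per_batch` are hypothetical names, with `max_per_batch` standing in for a source rate limit such as `maxFilesPerTrigger`.

```python
def run_once(pending):
    """Model of trigger(once=True): all available data in one batch, then stop."""
    return [list(pending)] if pending else []

def run_available_now(pending, max_per_batch):
    """Model of trigger(availableNow=True): several micro-batches, then stop."""
    return [pending[i:i + max_per_batch]
            for i in range(0, len(pending), max_per_batch)]

pending = ["f1", "f2", "f3", "f4", "f5"]  # data available when the query starts
once_batches = run_once(pending)
available_now_batches = run_available_now(pending, max_per_batch=2)
print(once_batches)
print(available_now_batches)
```

Both runs consume the same five items and then stop; only the number of batches differs.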

### Output modes for streaming

**Append**

In [0]:
#.outputMode("append")

**Complete**

In [0]:
#.outputMode("complete")
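
As a rough illustration of the two modes, here is a plain-Python model (not Spark code). The helper names and sample author values are assumptions for the example: `append_sink` only adds the new rows of each micro-batch, while `complete_sink` recomputes an aggregate and overwrites the whole target every time.

```python
from collections import Counter

def append_sink(batches):
    """Model of outputMode("append"): only new rows are added to the target."""
    sink = []
    for batch in batches:
        sink.extend(batch)
    return sink

def complete_sink(batches):
    """Model of outputMode("complete"): the full result replaces the target."""
    counts = Counter()
    sink = {}
    for batch in batches:
        counts.update(batch)   # update the running aggregate
        sink = dict(counts)    # overwrite the target with the whole result
    return sink

# Two micro-batches of author names (sample data).
batches = [["Mark W. Spong", "Chris Bernhardt"], ["Mark W. Spong"]]
append_result = append_sink(batches)
complete_result = complete_sink(batches)
print(append_result)
print(complete_result)
```

Complete mode is the natural fit for aggregations, because earlier results (such as a count per author) change as new data arrives.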

### Unsupported operations

- Sorting
- Deduplication

### Practice

**Create the table from the CSV folder**

In [0]:
csv_file_df = spark.read.option("header", "true").option("delimiter", ";")\
    .csv("dbfs:/external_data/bookstore/books-csv/*.csv")
csv_file_df.write.mode("overwrite").saveAsTable("external_data.books")

**Read the table as a streaming source**

In [0]:
spark.readStream.table("external_data.books").createOrReplaceTempView("books_streaming_tmp_vw")

In [0]:
%sql
select *
from books_streaming_tmp_vw

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW author_counts_tmp_vw as
(
  select author, count(*) as total_books
  from books_streaming_tmp_vw
  group by author
)

Both views behave as live queries, capturing all upcoming data.

**Creating a delta table from a streaming temporary view**

This option checks the source view for changes every 10 seconds and completely overwrites the target table on each update.

In [0]:
spark.table("author_counts_tmp_vw")\
  .writeStream\
  .trigger(processingTime = '10 seconds')\
  .option("checkpointLocation", "dbfs:/external_data/bookstore/author_counts_checkpoint")\
  .outputMode("complete")\
  .table("external_data.author_counts")

In [0]:
%sql

select *
from external_data.author_counts

In [0]:
%sql
INSERT INTO external_data.books
values ("B19", "Introduction to Modeling and Simulation", "Mark W. Spong", "Computer Science", 25),
        ("B20", "Robot Modeling and Control", "Mark W. Spong", "Computer Science", 30),
        ("B21", "Turing's Vision: The Birth of Computer Science", "Chris Bernhardt", "Computer Science", 35)

Inserting new data

In [0]:
%sql
INSERT INTO external_data.books
values ("B16", "Hands-On Deep Learning Algorithms with Python", "Sudharsan Ravichandiran", "Computer Science", 25),
        ("B17", "Neural Network Methods in Natural Language Processing", "Yoav Goldberg", "Computer Science", 30),
        ("B18", "Understanding digital signal processing", "Richard Lyons", "Computer Science", 35)

**Batch Writing Mode**

This option processes everything that is available at the moment in micro-batches, writes it to the Delta table, and then stops.

In [0]:
spark.table("author_counts_tmp_vw")\
  .writeStream\
  .trigger(availableNow=True)\
  .option("checkpointLocation", "dbfs:/external_data/bookstore/author_counts_checkpoint")\
  .outputMode("complete")\
  .table("external_data.author_counts")\
  .awaitTermination()

In [0]:
%sql
select *
from external_data.author_counts

### Stop active streams

In [0]:
for s in spark.streams.active:
    print("Stopping stream: " + s.id)
    s.stop()
    s.awaitTermination()

## Incremental Data Ingestion From Files

### COPY INTO statement

In [0]:
%sql
COPY INTO external_data.books
from 'dbfs:/external_data/bookstore/books-csv/*.csv'
fileformat = csv
format_options (
  'delimiter' = ';',
  'header' = 'true'
)
copy_options (
  'mergeSchema' = 'true',
  'ignoreDuplicates' = 'true',
  'errorIfExists'='false'
  --,'force'='true'
)

Good to know:
COPY INTO skips files it has already loaded. If you truncate the target table and want to load the same files again with COPY INTO, set 'force' = 'true' in copy_options to "restart" the copying behaviour.
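
The behaviour described above can be modeled in plain Python. This is a simplified sketch, not how Databricks implements it: the `copy_into` function, the `loaded_files` set and the file names are assumptions made for the example.

```python
def copy_into(target, loaded_files, source_files, force=False):
    """Load each source file once, unless force=True."""
    for name, rows in source_files.items():
        if name in loaded_files and not force:
            continue                 # already loaded: skip the file
        target.extend(rows)
        loaded_files.add(name)
    return target

source_files = {"books-1.csv": ["B01", "B02"], "books-2.csv": ["B03"]}
target, loaded_files = [], set()

copy_into(target, loaded_files, source_files)
rows_after_first_load = len(target)

copy_into(target, loaded_files, source_files)    # re-run: nothing new to load
rows_after_second_load = len(target)

target.clear()                                   # "truncate" the target table
copy_into(target, loaded_files, source_files)    # files still count as loaded
rows_after_truncate = len(target)

copy_into(target, loaded_files, source_files, force=True)
rows_after_force = len(target)
print(rows_after_first_load, rows_after_second_load,
      rows_after_truncate, rows_after_force)
```

In this model, truncating the target does not reset the record of loaded files, which is why the load after the truncate adds nothing until `force` is set.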

In [0]:
%sql
select *
from external_data.books

#### Use cases

Low data volume (thousands of files).

Batch loading of low volumes.

Less efficient at scale.

### Auto Loader

In [0]:
spark.readStream\
        .format("cloudFiles")\
        .option("cloudFiles.format", "parquet")\
        .option("cloudFiles.schemaLocation", "dbfs:/external_data/bookstore/orders_checkpoint")\
        .load("dbfs:/external_data/bookstore/orders-raw")\
    .writeStream\
        .option("checkpointLocation", "dbfs:/external_data/bookstore/orders_checkpoint")\
        .table("external_data.orders_updates")


In [0]:
%sql
select *
from external_data.orders_updates

Run the helper notebook to load the functions it defines.

In [0]:
%run Workspace/Repos/borderbrayan@gmail.com/Databricks-Tests/Copy-Datasets

In [0]:
load_new_data()

In [0]:
dbutils.fs.rm("dbfs:/external_data/bookstore/orders_checkpoint", True)

#### Use cases

Can process billions of files.

Supports near real-time ingestion of millions of files per hour.

Fault tolerance: with Auto Loader checkpointing, the process can resume from the point of failure.

Efficient at scale.
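
The fault-tolerance point can be illustrated with a small plain-Python model. It is a sketch under simplifying assumptions, not Auto Loader itself: `ingest`, the `checkpoint` set, the `fail_on` parameter and the file names are all hypothetical.

```python
def ingest(files, checkpoint, sink, fail_on=None):
    """Process files in order, recording each finished file in the checkpoint."""
    for name in files:
        if name in checkpoint:
            continue                  # already processed before the failure
        if name == fail_on:
            raise RuntimeError(f"simulated failure on {name}")
        sink.append(name)
        checkpoint.add(name)          # progress survives a restart

files = ["01.parquet", "02.parquet", "03.parquet"]
checkpoint, sink = set(), []

try:
    ingest(files, checkpoint, sink, fail_on="02.parquet")
except RuntimeError as err:
    print(err)

ingest(files, checkpoint, sink)       # restart: resumes after 01.parquet
print(sink)
```

Because progress is stored outside the running process, the restart neither reprocesses the first file nor skips the one that failed.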

## Multi-hop Architecture

![Screenshot 2025-05-07 at 9.54.26 a.m..png](./Screenshot 2025-05-07 at 9.54.26 a.m..png "Screenshot 2025-05-07 at 9.54.26 a.m..png")
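
The three hops of this section can be sketched in plain Python before looking at the streaming version. This is a conceptual model only: the sample orders, customer names, file path and the `to_bronze`, `to_silver` and `to_gold` helpers are hypothetical, while the field names follow the queries used in this section.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Hypothetical sample data.
raw_orders = [
    {"order_id": "O1", "customer_id": "C1", "quantity": 2, "order_timestamp": 1657520256},
    {"order_id": "O2", "customer_id": "C1", "quantity": 0, "order_timestamp": 1657520300},
    {"order_id": "O3", "customer_id": "C2", "quantity": 1, "order_timestamp": 1657606656},
]
customers = {"C1": ("Ana", "Ruiz"), "C2": ("Luis", "Mora")}

def to_bronze(rows, source_file):
    """Bronze: keep raw rows, add arrival time and source file metadata."""
    arrival_time = datetime.now(timezone.utc)
    return [{**row, "arrival_time": arrival_time, "source_file": source_file}
            for row in rows]

def to_silver(bronze_rows, customer_lookup):
    """Silver: drop empty orders, join customer names, parse the timestamp."""
    silver_rows = []
    for row in bronze_rows:
        names = customer_lookup.get(row["customer_id"])
        if row["quantity"] > 0 and names is not None:  # filter + inner join
            silver_rows.append({
                "order_id": row["order_id"],
                "quantity": row["quantity"],
                "customer_id": row["customer_id"],
                "f_name": names[0],
                "l_name": names[1],
                "order_timestamp": datetime.fromtimestamp(
                    row["order_timestamp"], tz=timezone.utc),
            })
    return silver_rows

def to_gold(silver_rows):
    """Gold: total books per customer per day."""
    totals = defaultdict(int)
    for row in silver_rows:
        key = (row["customer_id"], row["order_timestamp"].date())
        totals[key] += row["quantity"]
    return dict(totals)

bronze = to_bronze(raw_orders, "orders-raw/01.parquet")
silver = to_silver(bronze, customers)
gold = to_gold(silver)
print(len(bronze), len(silver), gold)
```

Each layer reads only from the one before it, so data quality improves step by step: raw rows with metadata, then cleaned and enriched rows, then business-level aggregates.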

### Bronze Layer

In [0]:
spark.readStream\
    .format("cloudFiles")\
    .option("cloudFiles.format", "parquet")\
    .option("cloudFiles.schemaLocation", "dbfs:/external_data/bookstore/checkpoints/orders_raw")\
    .load("dbfs:/external_data/bookstore/orders-raw")\
    .createOrReplaceTempView("orders_raw_temp")

Enrich the raw data before creating the bronze table

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW orders_tmp AS (
  SELECT *, current_timestamp() arrival_time, input_file_name() source_file
  FROM orders_raw_temp
)

Create the bronze table

In [0]:
(spark.table("orders_tmp")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "dbfs:/external_data/bookstore/checkpoints/orders_bronze")
      .outputMode("append")
      .table("orders_bronze"))

In [0]:
%sql
select count(*)
from orders_bronze

### Silver Layer

In [0]:
(spark.read
      .format("json")
      .load(f"{dataset_bookstore}/customers-json")
      .createOrReplaceTempView("customers_lookup"))

In [0]:
(spark.readStream
  .table("orders_bronze")
  .createOrReplaceTempView("orders_bronze_tmp"))

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW orders_enriched_tmp AS (
  SELECT order_id, quantity, o.customer_id, c.profile:first_name as f_name, c.profile:last_name as l_name,
         cast(from_unixtime(order_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) order_timestamp, books
  FROM orders_bronze_tmp o
  INNER JOIN customers_lookup c
  ON o.customer_id = c.customer_id
  WHERE quantity > 0)

In [0]:
(spark.table("orders_enriched_tmp")
      .writeStream
      .format("delta")
      .option("checkpointLocation", "dbfs:/external_data/bookstore/checkpoints/orders_silver")
      .outputMode("append")
      .table("orders_silver"))

In [0]:
%sql
select count(*)
from orders_silver

### Gold Layer

In [0]:
(spark.readStream
  .table("orders_silver")
  .createOrReplaceTempView("orders_silver_tmp"))

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW daily_customer_books_tmp AS (
  SELECT customer_id, f_name, l_name, date_trunc("DD", order_timestamp) order_date, sum(quantity) books_counts
  FROM orders_silver_tmp
  GROUP BY customer_id, f_name, l_name, date_trunc("DD", order_timestamp)
  )

In [0]:
(spark.table("daily_customer_books_tmp")
      .writeStream
      .format("delta")
      .outputMode("complete")
      .option("checkpointLocation", "dbfs:/external_data/bookstore/checkpoints/daily_customer_books")
      .trigger(availableNow=True)
      .table("daily_customer_books"))

In [0]:
for s in spark.streams.active:
    print("Stopping stream: " + s.id)
    s.stop()
    s.awaitTermination()

In [0]:
%sql
drop table daily_customer_books;
drop table orders_silver;
drop table orders_bronze;

In [0]:
dbutils.fs.rm("dbfs:/external_data/bookstore/checkpoints", True)