# Lecture 25. Structured Streaming (Hands On)

In this notebook, we will explore the basics of working with Spark Structured Streaming to allow incremental processing of data.

We will continue using our bookstore dataset that contains Customers, Orders and Books tables.

<div style="text-align: center;">
<img src="../../assets/images/bookstore_schema.png" style="width:640px" >
</div> 

Let us first copy our dataset.

In [0]:
%run ../Includes/Copy-Datasets

## Reading Stream

To work with streaming data in SQL, you must first use the `spark.readStream` method of the PySpark API.
This is why we are using a Python notebook here.

The `spark.readStream` method allows us to query a Delta table as a stream source.
From there, we can register a temporary view against this stream source.

In [0]:
(spark.readStream
      .table("books")
      .createOrReplaceTempView("books_streaming_tmp_vw")
)

The temporary view created here is a "streaming" temporary view. It allows us to apply most transformations in SQL the same way as we would with static data.

### Displaying Streaming Data
Let us first query this streaming temporary view.

In [0]:
%sql
SELECT * FROM books_streaming_tmp_vw

book_id,title,author,category,price
B14,Data Communications and Networking,Behrouz A. Forouzan,Computer Science,34.0
B15,Inside the Java Virtual Machine,Bill Venners,Computer Science,41.0
B13,Linux pocket guide,Daniel J. Barrett,Computer Science,26.0
B10,Beginning Database Design Solutions,Rod Stephens,Computer Science,44.0
B11,Business Intelligence for Dummies,Swain Scheps,Computer Science,38.0
B12,Big Data in Practice,Bernard Marr,Computer Science,30.0
B01,The Soul of a New Machine,Tracy Kidder,Computer Science,49.0
B02,Learning JavaScript Design Patterns,Addy Osmani,Computer Science,28.0
B03,Make Your Own Neural Network,Tariq Rashid,Computer Science,35.0
B07,The Hundred-Page Machine Learning,Andriy Burkov,Computer Science,33.0


Here, what we see is a streaming result. 
As you can see, the query is still running, waiting for any new data to be displayed here.

Generally speaking, we don't display a streaming result unless a human is actively monitoring the output of a query during development or live dashboarding.

Let us click `Interrupt` to stop this active streaming query.

### Applying Transformations

Let us now apply some aggregations on this streaming temporary view.

In [0]:
%sql
SELECT author, count(book_id) AS total_books
FROM books_streaming_tmp_vw
GROUP BY author

author,total_books
Mark W. Spong,1
Chris Bernhardt,1
Tariq Rashid,1
Peter Brass,1
Luciano Ramalho,1
Addy Osmani,1
Andriy Burkov,1
Tracy Kidder,1
Swain Scheps,1
François Chollet,1


Again, because we are querying a streaming temporary view,
this becomes a streaming query that runs indefinitely,
rather than completing after retrieving a single set of results.

And here we are just displaying an aggregation of input as seen by the stream.
None of these records are being persisted anywhere at this point.

For streaming queries like this, we can always explore an interactive dashboard to monitor the streaming performance.

Before continuing, let us cancel this active streaming query by clicking `Interrupt`.

### Unsupported Operations

Remember, when working with streaming data, some operations, such as sorting, are not supported.

In [0]:
%sql
SELECT * 
FROM books_streaming_tmp_vw
ORDER BY author

You can use advanced methods like windowing and watermarking to work around some of these limitations, but they are out of scope for this course.
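To see why a plain `ORDER BY` is a problem on a stream, consider the following pure-Python sketch. It is not Spark code: the sample batches and the `emit_sorted_per_batch` helper are hypothetical and only model the idea that rows arrive in successive micro-batches, so sorting each batch as it arrives does not produce a globally sorted result.

```python
# Simplified model (not Spark): rows arrive in micro-batches, and rows that
# were already emitted for an earlier batch cannot be reordered afterwards.
def emit_sorted_per_batch(batches):
    emitted = []
    for batch in batches:
        # Each batch can be sorted on its own, but only on its own.
        emitted.extend(sorted(batch))
    return emitted


batches = [
    ["Tracy Kidder", "Addy Osmani"],  # first micro-batch
    ["Bill Venners"],                 # arrives later
]
result = emit_sorted_per_batch(batches)
print(result)
print(result == sorted(result))  # is the overall output globally sorted?
```

In this model the late row for "Bill Venners" lands after "Tracy Kidder", which is the kind of situation a streaming engine has to rule out or handle explicitly.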

## Persisting Streaming Data

Now, in order to persist incremental results, we first need to pass our logic back to the PySpark DataFrame API.

Here we are creating another temporary view.
Since we are creating it from the result of a query against a streaming temporary view, this new temporary view is also a "streaming" temporary view.

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW author_counts_tmp_vw AS (
  SELECT author, count(book_id) AS total_books
  FROM books_streaming_tmp_vw
  GROUP BY author
)

The new streaming temporary view has been created.

In the PySpark DataFrame API, we can use `spark.table()` to load data from a streaming temporary view back into a DataFrame.

Note that Spark always loads streaming views as streaming DataFrames, and static views as static DataFrames. This means that incremental processing must be defined from the very beginning, with the read logic, in order to support incremental writing later.
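One way to check which kind of DataFrame you got back is the `isStreaming` property of a DataFrame. The helper below is a minimal sketch: the name `describe_view` is hypothetical, and it assumes it is given the notebook's `spark` session.

```python
def describe_view(spark, view_name):
    """Return 'streaming' or 'static' for the given table or view name."""
    df = spark.table(view_name)
    # DataFrame.isStreaming is True when the DataFrame has a streaming source.
    return "streaming" if df.isStreaming else "static"


# Possible usage in this notebook (requires the active `spark` session):
# describe_view(spark, "author_counts_tmp_vw")
# describe_view(spark, "books")
```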

Then, we use the DataFrame `writeStream` method to persist the result of a streaming query to durable storage.
This allows us to configure the output with three settings:
- The trigger interval, here every 4 seconds.
- The output mode, either append or complete.
  - For streaming aggregation queries like this one, we must use "complete" mode to overwrite the table with the new calculation.
- The checkpoint location, which helps track the progress of the stream processing.
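The idea behind "complete" mode can be sketched in plain Python. This is a simplified model, not how Spark is implemented: the `run_complete_mode` function and the sample batches are hypothetical. It keeps a running count per author across triggers and rewrites the whole target on every micro-batch.

```python
from collections import Counter


def run_complete_mode(micro_batches):
    """Model of an aggregation written in 'complete' output mode."""
    running_counts = Counter()  # aggregation state kept across triggers
    target_table = {}
    for batch in micro_batches:
        running_counts.update(batch)         # fold the new rows into the state
        target_table = dict(running_counts)  # overwrite the whole target
    return target_table


micro_batches = [
    ["Mark W. Spong", "Chris Bernhardt", "Tariq Rashid"],       # initial data
    ["Mark W. Spong", "Mark W. Spong", "Chris Bernhardt"],      # new arrivals
]
author_counts = run_complete_mode(micro_batches)
print(author_counts)
```

After the second batch, the target holds the full recalculated counts rather than only the rows for the newly arrived books.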

In [0]:
(spark.table("author_counts_tmp_vw")                               
      .writeStream  
      .trigger(processingTime='4 seconds')
      .outputMode("complete")
      .option("checkpointLocation", "dbfs:/mnt/demo/author_counts_checkpoint")
      .table("author_counts")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f1ba4687970>

You can think about such a streaming query as an always-on incremental query, and we can always explore its interactive dashboard.
From this dashboard, we can see that the data has been processed and we can now query our target table.

In [0]:
%sql
SELECT *
FROM author_counts

author,total_books
Behrouz A. Forouzan,1
François Chollet,1
Daniel J. Barrett,1
Luciano Ramalho,1
Chris Bernhardt,1
Andriy Burkov,1
Mark W. Spong,1
Bill Venners,1
Tariq Rashid,1
Bernard Marr,1


Our data has been written to the target table, `author_counts`, and we can see that each author currently has only 1 book.

Remember, what you see here is not a "streaming" query, simply because we are querying the table directly,
not as a streaming source through a streaming DataFrame.

And if we come back to our streaming query, we see that it is still active. 
In fact, when we execute a streaming query, the streaming query will continue to update as new data arrives in the source.

### Adding New Data

To confirm this, let us add new data to our source table, the `books` table. Let us run this query and see what happens in our stream.

In [0]:
%sql
INSERT INTO books
values ("B19", "Introduction to Modeling and Simulation", "Mark W. Spong", "Computer Science", 25),
        ("B20", "Robot Modeling and Control", "Mark W. Spong", "Computer Science", 30),
        ("B21", "Turing's Vision: The Birth of Computer Science", "Chris Bernhardt", "Computer Science", 35)

num_affected_rows,num_inserted_rows
3,3


We can see that there is new data arriving.

Let us query our target table again to see the updated book counts for each author.

In [0]:
%sql
SELECT *
FROM author_counts

author,total_books
Behrouz A. Forouzan,1
François Chollet,1
Daniel J. Barrett,1
Chris Bernhardt,2
Luciano Ramalho,1
Mark W. Spong,3
Andriy Burkov,1
Tariq Rashid,1
Rod Stephens,1
Tracy Kidder,1


Now we see some authors having more than 1 book.

Let us come back to our streaming query, and cancel it to see another scenario.

Always remember to cancel any active stream in your notebook. Otherwise, the stream stays on and prevents the cluster from auto-terminating.
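Besides clicking `Interrupt`, active streams can also be stopped from code. The following is a minimal sketch, assuming it is given the notebook's `spark` session; the helper name `stop_all_streams` is hypothetical, while `spark.streams.active`, `stop()` and `awaitTermination()` are part of the PySpark streaming API.

```python
def stop_all_streams(spark):
    """Stop every active streaming query on this session; return how many were stopped."""
    stopped = 0
    for query in spark.streams.active:
        query.stop()
        query.awaitTermination()  # wait until the query has fully shut down
        stopped += 1
    return stopped


# Possible usage in this notebook (requires the active `spark` session):
# stop_all_streams(spark)
```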

## Streaming in Batch Mode

For our last scenario, we will add some books for new authors to our source table.

In [0]:
%sql
INSERT INTO books
values ("B16", "Hands-On Deep Learning Algorithms with Python", "Sudharsan Ravichandiran", "Computer Science", 25),
       ("B17", "Neural Network Methods in Natural Language Processing", "Yoav Goldberg", "Computer Science", 30),
       ("B18", "Understanding digital signal processing", "Richard Lyons", "Computer Science", 35)

num_affected_rows,num_inserted_rows
3,3


Three records have been inserted.

In this scenario, we modify the trigger method to change our query from an always-on query triggered every 4 seconds to a triggered incremental batch.

We do this using the `availableNow` trigger option.
With this trigger option, the query will process all new available data and stop on its own after execution.

In this case, we can use the `awaitTermination` method to block the execution of any cell in this notebook until the incremental batch's write has succeeded.
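The role of the checkpoint in such a triggered run can be sketched in plain Python. This is a simplified model, not Spark's actual checkpoint format: the `run_available_now` function and the `checkpoint` dictionary are hypothetical. Each run processes only the rows that arrived after the offset recorded by the previous run, updates the running counts, and then stops.

```python
from collections import Counter


def run_available_now(source_rows, checkpoint):
    """Process rows not yet recorded in the checkpoint, then stop.

    Returns the number of rows processed in this run.
    """
    start = checkpoint.get("offset", 0)
    new_rows = source_rows[start:]

    # Restore the aggregation state from the previous run and update it.
    counts = Counter(checkpoint.get("counts", {}))
    counts.update(new_rows)

    # Record progress so that the next run starts where this one ended.
    checkpoint["offset"] = len(source_rows)
    checkpoint["counts"] = dict(counts)
    return len(new_rows)


source = ["Mark W. Spong", "Chris Bernhardt"]
checkpoint = {}
first_run = run_available_now(source, checkpoint)

source += ["Sudharsan Ravichandiran", "Yoav Goldberg", "Richard Lyons"]
second_run = run_available_now(source, checkpoint)  # only the three new rows
third_run = run_available_now(source, checkpoint)   # nothing new to process

print(first_run, second_run, third_run)
```

Because the same checkpoint is reused, the second run picks up only the newly inserted rows, and a run with no new data has nothing to do.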

Let us now run this query to process the three records we have just added.

In [0]:
(spark.table("author_counts_tmp_vw")                               
      .writeStream           
      .trigger(availableNow=True)
      .outputMode("complete")
      .option("checkpointLocation", "dbfs:/mnt/demo/author_counts_checkpoint")
      .table("author_counts")
      .awaitTermination()
)

As you can see, with the `availableNow` trigger option, the query runs in batch mode.

It processes all the available data and then stops on its own.

Let us finally query the target table again to see the updated data.

In [0]:
%sql
SELECT *
FROM author_counts

author,total_books
Sudharsan Ravichandiran,1
Behrouz A. Forouzan,1
François Chollet,1
Daniel J. Barrett,1
Luciano Ramalho,1
Chris Bernhardt,2
Andriy Burkov,1
Yoav Goldberg,1
Richard Lyons,1
Mark W. Spong,3



Now we have 18 authors instead of 15.