# Structured Streaming

[**Watch the video**](https://panoptotech.cloud.panopto.eu/Panopto/Pages/Viewer.aspx?id=c42b2eeb-4453-4b05-964e-afb0013d9354)
Note: this video was recorded for an older version of the notebook.

For this lab, we need a streaming data source: a Kafka server.

The Kafka server is part of the Docker images you already have.

We will use it to simulate a live data stream.

Instructions on setting the Kafka server are in `prepare_kafka_server.md` in the root directory of this repo. [**Watch the video**](https://panoptotech.cloud.panopto.eu/Panopto/Pages/Viewer.aspx?id=f36885e6-8caf-43d9-a6fe-afb00140e63f)


Refer to [SDG], p. 44,

and the root: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

https://docs.databricks.com/spark/latest/structured-streaming/index.html

Video from Spark conf 2016, presented by the developer:
https://www.youtube.com/watch?v=rl8dIzTpxrI




## Basic Concepts
<img src="https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png">

## Output modes
Each time a *trigger* happens we may want to write data to the output (e.g. database, HDFS, datasink).

Spark provides these modes:

### Append
Only the new rows appended to the result table since the last trigger will be written to the external storage. This is applicable only on queries where existing rows in the result table cannot change (e.g., a map on an input stream).

### Complete
The entire updated result table will be written to external storage.

### Update
Only the rows that were updated in the result table since the last trigger will be changed in the external storage. This mode works for output sinks that can be updated in place, such as a MySQL table.
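
To make the difference between the modes concrete, here is a minimal pure-Python sketch (no Spark involved) that simulates what *Complete* and *Update* would write for a running count. The function name `simulate_output_modes` and the sample batches are made up for illustration. *Append* is left out on purpose: in a running count, existing rows change, which is exactly the case where Append does not apply.

```python
from collections import Counter

def simulate_output_modes(batches):
    """Yield, per trigger, what each output mode would write for a running count."""
    result_table = Counter()              # the full, ever-growing result table
    for batch in batches:
        before = dict(result_table)       # snapshot before this trigger
        result_table.update(batch)        # fold the new micro-batch into the result
        complete = dict(result_table)     # Complete: the whole table
        update = {key: value for key, value in result_table.items()
                  if before.get(key) != value}   # Update: only changed rows
        yield complete, update

# Made-up input: two micro-batches of country values.
batches = [["UK", "France", "UK"], ["UK", "Spain"]]
for trigger, (complete, update) in enumerate(simulate_output_modes(batches)):
    print(trigger, "complete:", complete, "update:", update)
```

On the second trigger, Complete still contains the unchanged `France` row, while Update contains only `UK` and `Spain`.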

<br>
<br>

## The plan
You will read data from a Kafka data source using the streaming API. Whenever new data is received, you will repeat a calculation (count the rows per country) and write the new value to the output. 

Note: in this example we use stdout, but we could also write to a database and update existing records.

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *
import os,time

_note:_ the Kafka driver has to be supplied from somewhere, either downloaded from the internet or available locally.<br>
I added the needed driver files to the Docker image so you don't have to wait for a download each time you start the Docker container.

If you need a different driver, the syntax looks like this: 
`config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0')`
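
Because the package version must match the Spark version, one option is to derive the coordinate from the installed PySpark version. The sketch below is only an illustration: the helper names are hypothetical, the Scala suffix `2.12` is an assumption taken from the example above, and it assumes the `pyspark` package version equals the version of the Spark runtime you connect to.

```python
def kafka_connector_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Kafka connector for a given Spark build."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"

def build_session_with_kafka(app_name="streaming"):
    """Create a SparkSession that downloads the matching Kafka connector."""
    # Imported here so the helper above can be used without Spark installed.
    import pyspark
    from pyspark.sql import SparkSession

    package = kafka_connector_package(pyspark.__version__)
    return (SparkSession.builder.appName(app_name)
            .config("spark.jars.packages", package)
            .getOrCreate())

print(kafka_connector_package("3.2.0"))
```

In this lab the jars are already in the Docker image, so `build_session_with_kafka()` is only needed outside the provided container.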


In [None]:
# Schema for retail data 
SCHEMA = "InvoiceNo INT ,StockCode INT,Description STRING ,Quantity INT,InvoiceDate DATE,UnitPrice FLOAT,CustomerID FLOAT, country STRING"

# The config packages must match the specific Spark version you run!
spark = SparkSession.builder.appName('streaming')\
    .config("spark.kryoserializer.buffer.max", "512m")\
    .config('spark.jars', '/home/jars/*.jar')\
    .getOrCreate()

spark.sparkContext.setLogLevel("INFO") # too much noise? replace with "ERROR"

_note:_ "kafka_server" is the address (`host:port`) where the Kafka server is running. 

For example: 
    myserver.aws.com:29092
    

In [None]:
kafka_server = "kafka:9092"  # internal name in the Docker network
#kafka_server = "20.169.149.9:29092"
topic = "retail"             # the topic name where the data is stored

## Read the data stream into a regular DataFrame.

The DataFrame will get bigger and bigger, so BE CAREFUL!\
This is ONLY to demonstrate `read()`.

In [None]:
static_df = spark.read\
                  .format("kafka")\
                  .option("kafka.bootstrap.servers", kafka_server)\
                  .option("subscribe", topic)\
                  .option("startingOffsets", "earliest")\
                  .option("failOnDataLoss",False)\
                  .load()
retail_data = static_df.select(f.from_csv(f.decode("value", "US-ASCII"), schema=SCHEMA).alias("value")).select("value.*")
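
The last line of the cell above decodes the binary Kafka `value` column and parses it as CSV. The following pure-Python sketch mimics those two steps on a single record, to show what happens to each message. The sample record is made up, and unlike `from_csv` the sketch does not cast the fields to the types in `SCHEMA`; everything stays a string.

```python
import csv

# Column names taken from SCHEMA above.
COLUMNS = ["InvoiceNo", "StockCode", "Description", "Quantity",
           "InvoiceDate", "UnitPrice", "CustomerID", "country"]

def parse_kafka_value(value: bytes) -> dict:
    """Decode one Kafka message value and split it into named CSV fields."""
    text = value.decode("us-ascii")        # same step as f.decode("value", "US-ASCII")
    fields = next(csv.reader([text]))      # same idea as f.from_csv(...), without type casting
    return dict(zip(COLUMNS, fields))

# Made-up sample record, for illustration only.
sample = b"536365,71053,WHITE METAL LANTERN,6,2010-12-01,3.39,17850,United Kingdom"
print(parse_kafka_value(sample))
```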

In [None]:
%%time 
# on my pc, there is a fixed 3 sec time for each of count() and show()  ?!
# this is probably a spark config: https://stackoverflow.com/questions/59916338/why-is-there-a-delay-in-the-launch-of-spark-executors
print("%d records in frame" % retail_data.count())
retail_data.show(5)

## Read the data stream using the streaming API

It does not make sense to read infinite (or at least unbounded) data into a DataFrame. Instead, we read from the stream and perform some computation on the data, such as a cumulative count.

Let's read in streaming mode (a.k.a. micro-batch).

In [None]:
OFFSETS_PER_TRIGGER = 500
streaming_df = spark.readStream\
                  .format("kafka")\
                  .option("kafka.bootstrap.servers", kafka_server)\
                  .option("subscribe", topic)\
                  .option("startingOffsets", "earliest")\
                  .option("failOnDataLoss",False)\
                  .option("maxOffsetsPerTrigger", OFFSETS_PER_TRIGGER )\
                  .load()\
                  .select(f.from_csv(f.decode("value", "US-ASCII"), schema=SCHEMA).alias("value")).select("value.*")
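
`maxOffsetsPerTrigger` caps how many Kafka offsets a single micro-batch may consume. As a rough mental model, a backlog is drained in chunks of at most that size. The sketch below is a pure-Python simulation with a hypothetical helper name; it ignores the fact that Spark splits the limit proportionally across the topic's partitions.

```python
def micro_batch_sizes(total_records, max_offsets_per_trigger):
    """Sizes of the micro-batches needed to drain a backlog of records."""
    sizes = []
    remaining = total_records
    while remaining > 0:
        size = min(remaining, max_offsets_per_trigger)   # never exceed the cap
        sizes.append(size)
        remaining -= size
    return sizes

print(micro_batch_sizes(1200, 500))   # [500, 500, 200]
print(micro_batch_sizes(1200, 100))   # twelve batches of 100
```

A smaller cap means more, smaller batches: results appear sooner, but the fixed per-batch overhead is paid more often.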

In [None]:
# Let's see the structure of the DF
streaming_df

_notes:_ 
1. Monitoring Streaming Queries: check the formal [docs](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reading-metrics-interactively)
1. In the next cell we are writing to **memory sink**. This is for *debugging only*. In real life you would write to a file or some database. (see the next example)
1. In a Jupyter notebook, writing to the console does not show up in the cell output (hence we do not use `format("console")`)

In [None]:
country_counts = streaming_df.groupBy('country').count()
count_countries_query = country_counts.writeStream\
    .queryName('num_countries')\
    .format("memory")\
    .outputMode("complete")\
    .start()

# https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

In [None]:
# wait 20 seconds, letting Spark do its thing.
# During this time, Spark will run the query on each incoming microbatch
from IPython.display import display, clear_output

for _ in range(10):
    clear_output(wait=True)
    print("query status:",count_countries_query.status)
    spark.sql('SELECT * FROM num_countries').show()
    time.sleep(2)
    
count_countries_query.stop()
# If you don't stop the query, it will run forever, waiting for more data to arrive from the input
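
Besides `status`, a streaming query also exposes `lastProgress`, which in PySpark 3.x is a dict describing the most recent micro-batch (or `None` if no batch has completed yet). The helper below is a hypothetical sketch that formats a few of its fields; the example dict is hand-written, not real output.

```python
def summarize_progress(progress):
    """Turn a StreamingQuery.lastProgress dict into a one-line summary."""
    if progress is None:                  # no micro-batch has completed yet
        return "no progress reported yet"
    return (f"batch {progress['batchId']}: "
            f"{progress['numInputRows']} rows in, "
            f"{progress.get('processedRowsPerSecond', 0.0):.1f} rows/s processed")

# Hand-written dict for illustration; with a live query you would pass
# count_countries_query.lastProgress instead.
example = {"batchId": 3, "numInputRows": 500, "processedRowsPerSecond": 1250.0}
print(summarize_progress(example))
```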

# Using foreach()

In the next example, we take it one step forward: get rid of the sleep() and handle batches in our own function


In [None]:
def process_batch(df, epoch_id):
    """
    This function is called for each batch. 
    Do whatever you want with it.
    :param df: DataFrame, containing 'batch size' rows of the input data.
    :param epoch_id: int, 0-based batch counter
    """
    clear_output(wait=True)
    print(f"{epoch_id}:   {df.count()}")
    df.groupBy('country').count().show()
    # TODO: replace with something more intelligent that involves the full data (e.g. aggregate over all data)

In [None]:
query = streaming_df.writeStream.foreachBatch(process_batch).start()
time.sleep(10) # This sleep() is only so you can move on automatically to the next cell.
# Remember to stop the query once you have seen enough.
query.stop()

In [None]:
# and then wait for the query to terminate. It can take some time.
try:
    if query.awaitTermination(timeout=20): # This is a blocking call
        print("Query terminated")
    else:
        print("\nWARNING: the query did NOT terminate!")
except Exception as e:
    print(f"got  {str(type(e))}. This is caused by interrupting the 'process_batch' and can be ignored")
    
query.status['isTriggerActive']
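
One possible way to address the TODO in `process_batch` is to keep running totals across batches in a small callable object. This is a sketch under assumptions, not the lab's official solution: the class name is made up, it assumes classic PySpark where the `foreachBatch` callback runs in the driver process, and the totals live only in memory, so they are lost when the kernel restarts.

```python
from collections import Counter

class RunningCountryCounts:
    """foreachBatch callback that keeps totals across all micro-batches."""

    def __init__(self):
        self.totals = Counter()

    def __call__(self, df, epoch_id):
        # Aggregate inside Spark, then bring only the small
        # per-country result back to the driver.
        for row in df.groupBy("country").count().collect():
            self.totals[row["country"]] += row["count"]
        print(f"after batch {epoch_id}: {dict(self.totals)}")

# Usage with the stream defined earlier (not executed here):
#   tracker = RunningCountryCounts()
#   query = streaming_df.writeStream.foreachBatch(tracker).start()
```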

> **Tip:** Some important information is printed to the log output.<br>
Use `docker logs spark-lab -f` in another terminal to get a live stream of the log messages.
 
> **Tip:** If you get `IllegalArgumentException: Cannot start query with name num_countries2 as a query with that name is already active in this SparkSession`, it is because the cell was run twice without stopping the first query. Restart the kernel.
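
As a lighter alternative to restarting the kernel, you can stop the still-active query by name. The helper below is a hypothetical sketch; it relies on `spark.streams.active`, which lists the active streaming queries of the session, and on each query's `name` and `stop()`.

```python
def stop_queries_named(spark, name):
    """Stop every active streaming query with the given name; return how many were stopped."""
    stopped = 0
    for active_query in spark.streams.active:
        if active_query.name == name:
            active_query.stop()
            stopped += 1
    return stopped

# Usage (not executed here), before re-running a cell that starts a named query:
#   stop_queries_named(spark, "num_countries")
```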

In [None]:
count_countries_query2 = country_counts.writeStream\
    .queryName('num_countries3')\
    .format("memory")\
    .outputMode("complete")\
    .start()

for _ in range(10):
    clear_output(wait=True)
    display(count_countries_query2.status)
    spark.sql('SELECT * FROM num_countries3').show()  # the table name must match queryName above
    time.sleep(2)
    
count_countries_query2.stop()
if count_countries_query2.awaitTermination(timeout=2):
    print("Query terminated")
else:
    print("\nWARNING: the query did NOT terminate!")

<hr>
Let's use something more realistic: Read a data stream, process it, write output to a database sink.

Make sure you completed the "Working with databases" lesson, and that the database is up and running.


In [None]:
#todo
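
Until that cell is filled in, here is one possible shape for it: a `foreachBatch` callback that appends each micro-batch to a table over JDBC. It is a sketch under assumptions, not the lab's solution. The function name is made up, and the URL, table name and credentials shown in the usage comment are placeholders; take the real values from the "Working with databases" lesson, and note that the matching JDBC driver must be available to Spark.

```python
def make_jdbc_batch_writer(url, table, user, password):
    """Return a foreachBatch callback that appends each micro-batch to a JDBC table."""
    def write_batch(df, epoch_id):
        (df.write
           .format("jdbc")
           .option("url", url)
           .option("dbtable", table)
           .option("user", user)
           .option("password", password)
           .mode("append")            # add the new rows, keep the existing ones
           .save())
    return write_batch

# Usage (not executed here; all connection values are placeholders):
#   writer = make_jdbc_batch_writer("jdbc:postgresql://<host>:<port>/<db>",
#                                   "<table>", "<user>", "<password>")
#   query = streaming_df.writeStream.foreachBatch(writer).start()
```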

<hr>

# Reading text from a network connection

Copied verbatim (word for word) from https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example

*FIRST* run the data source in another window, and then run the cell below. (If you run the cell first, it will complain about "Connection refused", which means there is no input.)

When you have seen enough, close the data source, and the cell will finish automatically (because it will detect that the connection was terminated).

NOTE: Reading from socket is only for debug/prototyping.<br>
Also **writing to console, socket, memory is for debug only.** (see [SDG]p 357)

## Installing netcat (nc) in the Spark server (in the running container)
In the video, I used a preinstalled `nc` application. Here are the instructions to install it:

```
docker exec -it spark-lab sh
# now, in the container's shell:
wget https://busybox.net/downloads/binaries/1.35.0-x86_64-linux-musl/busybox_NC -O nc
chmod +x nc

# run it!
./nc localhost -l -p 9999

# to exit, press ^C
```

Remember that each time a container is killed, all its content is removed, so you will have to install again.

> **Note1:** When running in a Jupyter notebook, output written to the console is not visible in the cell output. 
Instead, it is written to the log output of the Spark node, so run the next line in a terminal. It follows the log and, for every line starting with `Batch:`, prints that line and the 10 lines after it: `2>&1 docker logs spark-lab -f | grep -A 10 "^Batch:"`

> **Note2:** After the `nc` server is stopped, the Spark client may try to reconnect, so the cell will run indefinitely until you stop the Jupyter kernel.


In [None]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

# Start running the query that prints the running counts to the console (see Note1)
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()


# Without a timeout, awaitTermination() blocks until the query stops and returns None,
# so its result cannot be used as a success flag.
query.awaitTermination()
print("Query terminated")

# Challenges in distributed data streaming

## Consistency
A value may already be updated on one node while another node still holds the stale value, which can cause errors.

## Fault tolerance
How to handle failed reads?

## Out of order data
e.g. node 1 receives and processes the "close event" before node 2 processes the matching "start event". 
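
A common way to cope with out-of-order data is to buffer events for a bounded time and release them in event-time order; Spark offers this idea through `withWatermark`. The pure-Python sketch below only illustrates the concept and is not how Spark implements it. The function name, the integer timestamps and the sample events are all made up.

```python
import heapq

def reorder_with_watermark(events, max_delay):
    """Yield (event_time, name) in event-time order, tolerating events up to max_delay late."""
    buffer = []                          # min-heap ordered by event time
    watermark = float("-inf")            # events older than this are considered too late
    for event_time, name in events:      # events arrive in processing order
        if event_time < watermark:
            continue                     # too late: dropped
        heapq.heappush(buffer, (event_time, name))
        watermark = max(watermark, event_time - max_delay)
        while buffer and buffer[0][0] <= watermark:
            yield heapq.heappop(buffer)  # safe to emit: nothing earlier can still be accepted
    while buffer:                        # end of input: flush what is left
        yield heapq.heappop(buffer)

# The "close" event (time 5) arrives before the "start" event (time 1).
arrivals = [(5, "close"), (1, "start"), (9, "next")]
print(list(reorder_with_watermark(arrivals, max_delay=5)))
# [(1, 'start'), (5, 'close'), (9, 'next')]
```

The trade-off is latency: a larger `max_delay` tolerates later events but holds results back longer.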

# What we did not cover here

This was just a taste of the streaming API. 

New features are added from time to time, so checking the docs is always advised.

Some interesting topics to follow:
* Selection, projection
* Handling errors (duplication, recovery ...)
* Window operations (see 'window functions' notebook)
* Join operations
* ForeachBatch -- call a user function https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.streaming.DataStreamWriter.foreachBatch.html
  * see also https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
* Foreach sink -- call a user function for each record, giving more control  

## Debugging and testing

You can use **memory, console** and **socket** for debugging. They should not be used for production due to performance and lack of fault tolerance!

If you do want to output data to a table for interactive SQL queries in production, the authors
recommend using the Parquet file sink on a distributed file system (e.g., S3). You can then query
the data from any Spark application. [SDG]
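
A minimal sketch of such a Parquet file sink is shown below. The function name and both paths are hypothetical. The file sink supports only the append output mode and needs a checkpoint location, so this fits a non-aggregated stream such as `streaming_df`, not the `country_counts` aggregation.

```python
def start_parquet_sink(stream_df, output_path, checkpoint_path):
    """Start a streaming query that appends stream_df to Parquet files."""
    return (stream_df.writeStream
            .format("parquet")
            .option("path", output_path)                    # where the data files go
            .option("checkpointLocation", checkpoint_path)  # progress tracking for recovery
            .outputMode("append")
            .start())

# Usage (not executed here; the paths are placeholders):
#   query = start_parquet_sink(streaming_df, "<output dir>", "<checkpoint dir>")
#   ...
#   query.stop()
#   spark.read.parquet("<output dir>").show(5)
```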

# Check yourself

* What will happen if you run `country_counts.show()`? Why?
* Change `OFFSETS_PER_TRIGGER` to 100. How does it affect the processing?
* Replace `outputMode("complete")` with "append" and "update" and run the Kafka code again. Is this what you expected?
* When using the *memory* sink, (the memory of) which node is used? 
* What happens if you subscribe to a nonexistent topic in Kafka?
* What happens if processing a micro-batch takes longer than the interval at which new data arrives?

Answer [here](https://forms.gle/cUPe5xeTwoGwbH4PA) and see your results

# Finished? <p style="color:red;">Remember to stop/kill the Docker container to avoid consuming CPU.</p>