# Spark Streaming

> Streaming is an extension of Spark's core API which allows us to process __live data streams in (near) real time__

> __Not all functionality is currently available in Python; to work with the full framework one should use `scala` or `java`__

![](./images/streaming-arch.png)

The flow looks as follows:
1. We hook up to a data stream source (e.g. `Apache Kafka`)
2. Incoming data is divided into batches
3. Batches are processed by the `spark` engine
4. Resulting batches of data are returned (e.g. after `MapReduce` transforms)
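
To get a feeling for step 2, here is a minimal plain-Python sketch of how a stream of timestamped events could be divided into batches. It is only an illustration and does not use Spark: the helper `split_into_batches`, the `events` list and the 2-second batch duration are all made up for this example.

```python
from collections import defaultdict


def split_into_batches(events, batch_duration):
    """Group (timestamp, value) events into consecutive batches.

    An event with timestamp t lands in batch number int(t // batch_duration).
    Hypothetical helper, not part of PySpark.
    """
    batches = defaultdict(list)
    for timestamp, value in events:
        batches[int(timestamp // batch_duration)].append(value)
    # Return batches in time order (intervals without events are skipped here)
    return [batches[index] for index in sorted(batches)]


# Made-up events: (arrival time in seconds, payload)
events = [(0.5, "a"), (1.2, "b"), (2.1, "c"), (4.7, "d"), (5.0, "e")]
batches = split_into_batches(events, batch_duration=2)
print(batches)  # [['a', 'b'], ['c'], ['d', 'e']]
```

Each inner list plays the role of one batch that the engine would then process on its own.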

![](./images/streaming-flow.png)

## DStreams (Discretized Streams)

> High-level abstraction over a stream of data which allows us to work with it easily

These can be created either by:
- Reading directly from our data source
- Transforming existing `DStreams` (which creates new ones at the same time)

> __`DStreams` internally are represented as a sequence of `RDD`s__

### Receivers

> Every input `DStream` (except file streams) is connected to a __`Receiver`__

Receiver's job is to:
- receive data from a source
- store in Spark's memory for processing

__Points to remember:__
- __EACH `Receiver` OCCUPIES ONE THREAD (CORE)__
- If we use a single thread (or the cluster does not have enough cores), there will be no threads left to actually run the data processing!
- __We should allocate more cores than the number of `Receivers` we would like to run__
- We can run multiple streams to get data from different sources
- __File streams are an exception, as they periodically poll the file system for new files__ (and this can be done on a single thread)
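
As a rough rule of thumb for local runs, the points above can be turned into a tiny helper which builds the master string. This is only a sketch: `local_master` and its parameters are names invented for this example, not part of PySpark.

```python
def local_master(num_receivers, processing_threads=1):
    """Build a `local[n]` master string with room for receivers AND processing.

    Hypothetical helper: every receiver occupies one thread, so we add
    at least one more thread for the actual data processing.
    """
    if num_receivers < 0 or processing_threads < 1:
        raise ValueError("need num_receivers >= 0 and processing_threads >= 1")
    return f"local[{num_receivers + processing_threads}]"


# One socket receiver plus one thread for processing
print(local_master(1))  # local[2]
```

The resulting string could then be passed to `setMaster(...)` when building the session.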

#### Receivers reliability

Receivers can be split into two broad categories:

- __reliable__:
    - gets data from a reliable source (one which supports acknowledgments)
    - sends an acknowledgment to the source once the data has been received and saved to Spark memory
    - the source receives the acknowledgment and knows the data has been delivered correctly
    - __Example: `Apache Kafka`__
- __unreliable__
    - either the `receiver` or the `source` cannot process acknowledgments
    - we don't care about sending acknowledgments and data loss is not a problem

__The first category ensures that no data is lost during transmission__

## Coding

Let's start by creating a `Session` (__you should always start like that!__)

In [None]:
import findspark

findspark.init()

In [None]:
import multiprocessing

import pyspark

# We should always start with session in order to obtain
# context and session if needed
session = pyspark.sql.SparkSession.builder.config(
    conf=pyspark.SparkConf()
    .setMaster(f"local[{multiprocessing.cpu_count()}]")
    .setAppName("TestApp")
).getOrCreate()


## StreamingContext

After that we can create a `pyspark.streaming.StreamingContext` ([docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.html)) object which:
- takes a `SparkContext` as input
- can share this context with other things we are using (SQL or core PySpark functionality)
- __does not accept a `SparkSession` object directly though; pass `session.sparkContext` instead!__

In [None]:
from pyspark.streaming import StreamingContext

# This context can be used with PySpark streaming
# batchDuration (in seconds) is the batch interval, i.e. how often
# incoming data is grouped into a new batch (here: every 30 seconds)
ssc = StreamingContext(session.sparkContext, batchDuration=30)


Important things to notice:
- __We set up the whole computation pipeline first__
- __NOTHING__ is started until we call `ssc.start()`; we finish with `ssc.stop()`

Methods of interest which allow us to work with streams:
- [`ssc.awaitTermination([timeout])`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.awaitTermination.html#pyspark.streaming.StreamingContext.awaitTermination) -
    blocks until the processing is stopped or until `timeout` (in seconds) elapses
- [`ssc.checkpoint(directory)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.checkpoint.html#pyspark.streaming.StreamingContext.checkpoint) -
    periodically checkpoint data for increased fault tolerance
- [`ssc.getActiveOrCreate(path, function)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.getActiveOrCreate.html#pyspark.streaming.StreamingContext.getActiveOrCreate) -
    if there is an active context (`start`ed and not `stop`ped) return it, otherwise recreate it from the checkpoint (or create a new one using `function`)

Methods we will run each time:
- [`ssc.start()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.start.html#pyspark.streaming.StreamingContext.start) - starts execution of streams
- [`ssc.stop()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.stop.html#pyspark.streaming.StreamingContext.stop) - stops execution of streams

Creating streams from context:
- [`ssc.socketTextStream(hostname, port [,storageLevel])`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.socketTextStream.html#pyspark.streaming.StreamingContext.socketTextStream) - create input stream by listening on specified `hostname` and `port`
- [`ssc.textFileStream(directory)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.textFileStream.html#pyspark.streaming.StreamingContext.textFileStream) - watch for new files created in the specified directory on a Hadoop-compatible file system (e.g. HDFS) and read them as text files
- [`ssc.binaryRecordsStream(directory, recordLength)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.streaming.StreamingContext.binaryRecordsStream.html#pyspark.streaming.StreamingContext.binaryRecordsStream) - as above, but reads the files as binary records of fixed length `recordLength`

Given all of that, let's create a `DStream` via `socketTextStream` which will listen on `localhost` for data incoming to the machine:

In [None]:
# We will send lines of data to this socketTextStream
lines = ssc.socketTextStream("localhost", 9999)

Let's also apply some transformations which:
- Will split input text into `words` and flatten the result
- Count how many times each distinct word occurs in the batch

In [None]:
unique_words = lines.flatMap(lambda text: text.split()).countByValue()

And print the word counts of each incoming batch:

In [None]:
unique_words.pprint()

__Notice nothing happened yet!__

This is due to our `computation` not starting yet. In order to do that, we have to run `ssc.start()`.

Before we do that though, let's run `netcat` to push some data to our socket.

Please notice that:
- the `-l` (listen) flag is specified, __which means we create a server listening on the specified port__
- PySpark's `socketTextStream` __expects to find a server it can connect to under the specified address!__

Let's run this interactive command. It will allow us to send text data to `localhost:9999` (make sure you have `nc` or `netcat` available on your system!).

> __Run it in the terminal in order not to block the execution of the notebook!__

After you've run it you can send textual data to the server set up by `netcat`.
Type some words in the terminal and they will be added to the stream.

In [None]:
# install with sudo apt install netcat
# !nc -lk 9999

Now you can run `start`, which:
- __will keep the stream running indefinitely__, BUT
- __will NOT block the execution of the Python program__

Due to the above we will be able to run the next cell (in a script it would be the next line)

In [None]:
ssc.start()
# Optional alternative: Spark ships the same word count as an example script.
# With `nc` running, you can start it from your Spark folder:
# ./bin/spark-submit --master "local[2]" examples/src/main/python/streaming/network_wordcount.py localhost 9999
# You can then type words in the `nc` terminal and their counts should be printed for each batch.

Now we can run the cell below in order to block for up to `seconds` while the stream keeps processing data in the background (hitting the timeout does not stop the stream by itself):

In [None]:
seconds = 180
ssc.awaitTermination(seconds)

Finally, we can stop the streaming context so it no longer handles incoming data.

Specifying `stopGraceFully=True` makes it wait until all data already received has been processed before stopping:

In [None]:
ssc.stop(stopGraceFully=True)

## Analyze

Let's analyze our results. These infographics will help us clear up the confusion:

![](./images/streaming-dstream.png)

![](./images/streaming-dstream-ops.png)

As one can see:

- __Operations are done on each batch of gathered data, NOT ON THE WHOLE DATASET__
- If we want results spanning the whole stream, we should save the per-batch results periodically (or keep a running state, see `updateStateByKey` below)
- Data of old batches is cleared automatically

# How different transformations work with Streams

## updateStateByKey

> `stream.updateStateByKey(func)` allows us to __keep one state and update it continuously with values coming from a stream__

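`func` takes two arguments `(new, accumulated)`: the list of new values which arrived for a key in the current batch, and the state accumulated so far for that key (`None` if the key is seen for the first time).

For example: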

In [None]:
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

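This function accumulates a running count of words, as it is applied to each key separately. Note that `updateStateByKey` requires a checkpoint directory to be set (`ssc.checkpoint(directory)`).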
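
To see how such an update function behaves from batch to batch, here is a plain-Python simulation. It does not use Spark and only mimics the documented behaviour: `simulate_update_state_by_key` and the sample batches are invented for this sketch.

```python
def update_function(new_values, running_count):
    # Same logic as above: start from 0, then add the new values
    if running_count is None:
        running_count = 0
    return sum(new_values, running_count)


def simulate_update_state_by_key(batches, update):
    """Apply `update` per key, batch after batch (hypothetical helper).

    Returns a snapshot of the state after every batch.
    """
    state = {}
    history = []
    for batch in batches:
        # Group the new values of this batch by key
        grouped = {}
        for key, value in batch:
            grouped.setdefault(key, []).append(value)
        # The function is applied to every known key, even those
        # which received no new values in this batch
        for key in set(state) | set(grouped):
            new_state = update(grouped.get(key, []), state.get(key))
            if new_state is None:
                state.pop(key, None)  # returning None drops the key
            else:
                state[key] = new_state
        history.append(dict(state))
    return history


batches = [
    [("spark", 1), ("stream", 1), ("spark", 1)],
    [("stream", 1)],
]
history = simulate_update_state_by_key(batches, update_function)
for snapshot in history:
    print(sorted(snapshot.items()))
```

After the first batch `spark` was counted twice and `stream` once; after the second batch `spark` keeps its count while `stream` grows to two.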

## Window operations 

> __Windowed computations allow you to apply operations over sliding window of data__

![](./images/streaming-dstream-window.png)

These types of operations:
- slide over the `RDD`s of a `DStream`
- combine the `RDD`s which fall within the window via our operation
- create a new `DStream` from these combined values

For windowed operations we need to specify:
- the size of the sliding window (`3` in this case) a.k.a. __window length__
- the interval at which the operation is performed (`2` in this case) a.k.a. __sliding interval__

Both have to be multiples of the batch interval of the source `DStream`.

An example function could be [`reduceByKeyAndWindow`](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.streaming.DStream.reduceByKeyAndWindow.html), others include:
- `reduceByWindow`
- `countByValueAndWindow`
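
The sliding itself can be sketched in plain Python. The snippet below is not Spark code: `sliding_windows` is a made-up helper which measures both parameters in number of batches (Spark measures them in seconds) and simply concatenates the batches inside each window.

```python
def sliding_windows(batches, window_length, slide_interval):
    """Combine the last `window_length` batches every `slide_interval` batches.

    Hypothetical helper; both parameters are counted in batches.
    """
    windows = []
    # `end` is the number of batches which have arrived when the window fires
    for end in range(slide_interval, len(batches) + 1, slide_interval):
        start = max(0, end - window_length)
        combined = [item for batch in batches[start:end] for item in batch]
        windows.append(combined)
    return windows


batches = [["a"], ["b"], ["c"], ["d"], ["e"], ["f"]]
windows = sliding_windows(batches, window_length=3, slide_interval=2)
print(windows)  # [['a', 'b'], ['b', 'c', 'd'], ['d', 'e', 'f']]
```

Notice that consecutive windows overlap (`b` and `d` appear twice) because the window is longer than the slide; in this sketch the very first window is shorter, as fewer than three batches exist at that point.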

# Joins

> __Streams can be joined with other streams and with datasets (`RDD`s) we have in memory__

Above allows us to:
- accumulate data in memory
- mix multiple streams from different data sources

For example:

In [None]:
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)

> __Besides `join`, `leftOuterJoin`, `rightOuterJoin` and `fullOuterJoin` are supported as well__

One can also join streams with datasets (`RDD`s of `(key, value)` pairs) via `transform`, __notice one can change the `dataset` we're joining against!__

In [None]:
dataset = ... # some RDD of (key, value) pairs

windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
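
What such a join does to a single batch can be shown with plain Python lists of `(key, value)` pairs. This is a sketch only: `join_batches` and the sample data are invented here and merely mimic an inner join on keys.

```python
def join_batches(left_batch, right_batch):
    """Inner join of two lists of (key, value) pairs on the key.

    Hypothetical helper: emits (key, (left_value, right_value)) for
    every pair of values sharing the same key.
    """
    right_by_key = {}
    for key, value in right_batch:
        right_by_key.setdefault(key, []).append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left_batch
        for right_value in right_by_key.get(key, [])
    ]


# Made-up batch of a stream and a small in-memory dataset
clicks = [("user1", "home"), ("user2", "cart"), ("user3", "home")]
names = [("user1", "Ann"), ("user2", "Bob")]
joined = join_batches(clicks, names)
print(joined)  # [('user1', ('home', 'Ann')), ('user2', ('cart', 'Bob'))]
```

`user3` has no match in `names`, so an inner join drops it; an outer join would keep it with a missing value instead.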

## Saving Streams

One can also save data after the necessary transformations have been applied.

> __These are output operations: same as `pprint`, they are what causes the transformations to actually be executed once the stream is running!__

The two most common options supported by the Python API:
- [`saveAsTextFiles`](http://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.streaming.DStream.saveAsTextFiles.html) - __save each `RDD` in `DStream` as `str` type to a text file__
- [`foreachRDD(func)`](http://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.streaming.DStream.foreachRDD.html?highlight=foreachrdd) - generic function applying `func` to each `RDD` within `DStream`, __can also be used for saving specific parts of data__

An example of `foreach`-like function could be sending partitions of data to another system:

In [None]:
def sendPartition(records):
    # ConnectionPool is a static, lazily initialized pool of connections
    # (a placeholder here, you have to provide your own implementation)
    connection = ConnectionPool.getConnection()
    for record in records:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

Let's also see the code snippet below which shows how one can mix `pyspark.sql` with `streams`.
Can you tell what this code does and explain all of the lines?

In [None]:
from pyspark.sql import Row, SparkSession

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
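    except Exception as error:
        # e.g. an empty batch; report it instead of failing silently
        print("Skipping batch: %s" % error)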

words.foreachRDD(process)

## persist and DStreams

From the previous lesson we have learned about `persistence` and what it does.

> Using `persist()` method on a DStream will automatically persist every RDD of that DStream in memory

Things to note:
- It is useful when we run multiple operations on the same stream (so the data doesn't have to be loaded from disk each time, __a speedup over Hadoop's approach__)
- By default data from sources are replicated on two nodes for improved fault tolerance
- For some operations (e.g. windowed ones) this is done automatically (as we will compute over multiple RDDs hence it's faster to do that in-memory if possible)

# Checkpointing 

> As streaming services run 24/7, Spark checkpoints data in order to be resilient to faulty nodes, JVM crashes etc.

Checkpointing has two types:
- __Metadata checkpointing__ - saves information __defining the computations__ (steps we have to undertake). Examples would include:
    - Configuration
    - `DStream` operations
    - Batches of data which are queued but not yet processed
- __Data checkpointing__ - saves generated RDDs to distributed storage. Needed when:
    - A transformation combines data coming from multiple batches of the stream
    - In such a case the chain of operations needed to get the result keeps growing the longer the service runs
    - To keep it reasonably short, RDDs are checkpointed periodically and the rest of the chain is discarded

> __In some cases we need to provide a directory for checkpointing, ALWAYS CHECK THE DOCS OF OPERATIONS YOU ARE APPLYING ON STREAMS__

## Procedure to follow when working with Streams

Here are the steps one usually employs when working with streams:

1. Define a `StreamingContext` from the Session's `sparkContext`.
2. Define the input sources by creating input `DStreams`.
3. Define the streaming computations by applying transformation and output operations to `DStreams`.
4. Start receiving data and processing it using `streamingContext.start()`.
5. Wait for the processing to be stopped (manually or due to any error) using `streamingContext.awaitTermination()`.
6. The processing can be manually stopped using `streamingContext.stop()`.

In addition we should remember (especially when working with multiple streams):

1. __Once a context has been started, no new streaming computations can be set up or added to it__.
2. Once a context has been stopped, it cannot be restarted.
3. Only one `StreamingContext` can be active in a JVM at the same time.
4. `stop()` on `StreamingContext` also stops the `SparkContext`. 
    To stop only the `StreamingContext`, set the optional `stopSparkContext` parameter of `stop()` to `False`.
5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped 
    (without stopping the SparkContext) before the next StreamingContext is created.
