# Chapter 7.2 - Spark Streaming

Paul E. Anderson

## Ice Breaker

What are you most looking forward to this holiday break?

## Annotated Example: WordCount

### The usual SparkContext

In [1]:
from pyspark import SparkConf
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)]) # We'll use this later

### Grab a streaming context

In [2]:
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)

### After a context is defined:
* Define the input sources by creating input DStreams.
* Define the streaming computations by applying transformation and output operations to DStreams.
* Start receiving data and processing it using streamingContext.start().
* Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
* The processing can be manually stopped using streamingContext.stop().

### Points to remember:
* Once a context has been started, no new streaming computations can be set up or added to it.
* Once a context has been stopped, it cannot be restarted.
* Only one StreamingContext can be active in a JVM at the same time.
* A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
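
The lifecycle rules above can be made concrete with a toy model in plain Python. This is only a sketch: `ToyStreamingContext` and its methods are hypothetical names invented for illustration. It does not use Spark and is not how Spark implements these checks.

```python
class ToyStreamingContext:
    """Plain-Python model of the lifecycle rules (hypothetical, not Spark's class)."""

    _active = None  # at most one started context at a time

    def __init__(self):
        self.state = "initialized"
        self.computations = []

    def add_computation(self, fn):
        # Rule: nothing can be added once the context has been started.
        if self.state != "initialized":
            raise RuntimeError("cannot add computations after start()")
        self.computations.append(fn)

    def start(self):
        # Rule: a stopped (or already started) context cannot be started again.
        if self.state != "initialized":
            raise RuntimeError(f"cannot start a context that is {self.state}")
        # Rule: only one context may be active at the same time.
        if ToyStreamingContext._active is not None:
            raise RuntimeError("another context is already active")
        ToyStreamingContext._active = self
        self.state = "active"

    def stop(self):
        if ToyStreamingContext._active is self:
            ToyStreamingContext._active = None
        self.state = "stopped"


first = ToyStreamingContext()
first.add_computation(print)
first.start()
first.stop()

# After stop(), a fresh context is needed; the old one stays stopped.
second = ToyStreamingContext()
second.start()
second.stop()
```

This mirrors the pattern used in the cells of this notebook: each example creates a new StreamingContext from the same `sc` and stops it with `stopSparkContext=False` before the next one is created.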

In [31]:
PORT=9999 # Change this to a unique port before running individually
HOST="localhost"

In [32]:
print("Run this command at the terminal and type in words and hit enter periodically:")
print(f"nc -lk {PORT}")

Run this command at the terminal and type in words and hit enter periodically:
nc -lk 9999
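
If `nc` is not available, a small Python server can play a similar role. The sketch below assumes the stream reader connects as a TCP client and reads UTF-8 text one newline-terminated line at a time. `serve_lines` is a hypothetical helper, and unlike `nc -lk` it sends a fixed list of lines to the first client and then closes.

```python
import socket
import threading


def serve_lines(lines, host="127.0.0.1", port=0):
    """Serve `lines` to the first TCP client, one newline-terminated line each.

    Hypothetical stand-in for `nc -lk`; port=0 lets the OS pick a free port.
    Returns the bound port and the server thread.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def run():
        conn, _ = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        server.close()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return bound_port, thread


# Demo: a plain socket client reads back what the server sends.
port, thread = serve_lines(["hello world", "hello spark"])
with socket.create_connection(("127.0.0.1", port)) as client:
    received = client.makefile("r", encoding="utf-8").read().splitlines()
thread.join()
print(received)  # ['hello world', 'hello spark']
```

To feed the streaming example instead of the demo client, the helper would be called with the notebook's own values, for example `serve_lines(my_lines, host=HOST, port=PORT)`, before `ssc.start()`.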


### Discretized Streams (DStreams)
* A DStream (discretized stream) is the basic abstraction provided by Spark Streaming.
* It represents a continuous stream of data: either the input stream received from a source, or the processed stream generated by transforming the input stream.
* Internally, a DStream is represented by a continuous series of RDDs.
* Each RDD in a DStream contains the data from a certain interval, as shown in the following figure.

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream.png">

* Any operation applied on a DStream translates to operations on the underlying RDDs. 
* In our example of converting a stream of lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream. 
* This is shown in the following figure:

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png">
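
The idea that a stream is a series of per-interval batches, and that an operation on the stream is applied to each batch, can be sketched without Spark. Everything here is illustrative: `discretize`, `flat_map`, and the sample records are hypothetical, and plain lists stand in for RDDs.

```python
from collections import defaultdict


def discretize(records, batch_interval):
    """Group (timestamp, line) records into batches of `batch_interval` seconds."""
    batches = defaultdict(list)
    for timestamp, line in records:
        batch_start = int(timestamp // batch_interval) * batch_interval
        batches[batch_start].append(line)
    return dict(sorted(batches.items()))


def flat_map(batch, fn):
    """Apply fn to every element of one batch and flatten the results."""
    return [item for element in batch for item in fn(element)]


# Hypothetical input: (arrival time in seconds, line of text)
records = [(0.2, "hello world"), (0.7, "hello"), (1.5, "spark streaming")]

# The "lines" stream: one list of lines per 1-second interval.
lines_stream = discretize(records, batch_interval=1)

# The "words" stream: the same flat_map applied to every batch.
words_stream = {t: flat_map(batch, lambda line: line.split(" "))
                for t, batch in lines_stream.items()}
print(words_stream)
# {0: ['hello', 'world', 'hello'], 1: ['spark', 'streaming']}
```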

In [33]:
lines = ssc.socketTextStream(HOST, PORT)
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
import time; time.sleep(1)
#ssc.awaitTerminationOrTimeout(60) # wait 60 seconds
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2021-11-03 09:30:19
-------------------------------------------

-------------------------------------------
Time: 2021-11-03 09:30:20
-------------------------------------------



**Stop and think:** What is missing in our previous example? 

One thing missing is state. Each batch of lines is processed and printed on its own, so the counts never carry over from one batch to the next. What if we wanted to accumulate the word counts?
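
The difference between per-batch counts and accumulated counts can be sketched in plain Python. The batches below are hypothetical sample data, and `Counter` stands in for the counting done by Spark.

```python
from collections import Counter

# Hypothetical batches of words, one list per batch interval.
batches = [["hello", "world"], ["hello"], [], ["world", "hello"]]

# Stateless: each batch is counted on its own, as in the example above.
per_batch = [dict(Counter(batch)) for batch in batches]

# Stateful: a running total is carried from one batch to the next.
running = Counter()
running_totals = []
for batch in batches:
    running.update(batch)
    running_totals.append(dict(running))

print(per_batch)
print(running_totals)
```

In the stateless version the third batch yields an empty result, while the running totals keep reporting every word seen so far.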

In [36]:
sc

In [39]:
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")

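# initialStateRDD (defined in the first cell) seeds the state with (key, value) pairs.

def updateFunc(new_values, last_sum):
    # new_values: this key's counts in the current batch; last_sum: previous total (None for a new key)
    return sum(new_values) + (last_sum or 0)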

lines = ssc.socketTextStream(HOST,PORT)
running_counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .updateStateByKey(updateFunc, initialRDD=initialStateRDD)

running_counts.pprint()

ssc.start()
import time; time.sleep(10)
#ssc.awaitTerminationOrTimeout(60) # wait 60 seconds
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2021-11-03 09:32:21
-------------------------------------------
('', 3)
('hello', 1)
('world', 1)

-------------------------------------------
Time: 2021-11-03 09:32:22
-------------------------------------------
('', 3)
('hello', 1)
('world', 1)

-------------------------------------------
Time: 2021-11-03 09:32:23
-------------------------------------------
('', 3)
('hello', 1)
('world', 1)

-------------------------------------------
Time: 2021-11-03 09:32:24
-------------------------------------------
('', 3)
('hello', 1)
('world', 1)

-------------------------------------------
Time: 2021-11-03 09:32:25
-------------------------------------------
('', 3)
('hello', 2)
('world', 1)

-------------------------------------------
Time: 2021-11-03 09:32:26
-------------------------------------------
('', 3)
('hello', 2)
('world', 1)

-------------------------------------------
Time: 2021-11-03 09:32:27
------------------------------------
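
The behaviour in this output can be traced in plain Python. The sketch below is a toy stand-in, not Spark code: `update_state_by_key` is a hypothetical helper, a dict plays the role of `initialStateRDD`, and the batches are invented input. It assumes the update function is called for every key that already has state, with an empty list when the batch brings no new values for it.

```python
def update_func(new_values, last_sum):
    # Same logic as updateFunc above: last_sum is None for a key with no prior state.
    return sum(new_values) + (last_sum or 0)


def update_state_by_key(state, pairs, fn):
    """Toy stand-in for updateStateByKey on one batch of (key, value) pairs."""
    new_values = {}
    for key, value in pairs:
        new_values.setdefault(key, []).append(value)
    # Keys that already have state are updated too, with an empty list of new values.
    keys = list(state) + [k for k in new_values if k not in state]
    return {k: fn(new_values.get(k, []), state.get(k)) for k in keys}


state = {"hello": 1, "world": 1}         # plays the role of initialStateRDD
batches = [["", "", ""], [], ["hello"]]  # hypothetical lines received per batch

history = []
for lines in batches:
    words = [w for line in lines for w in line.split(" ")]
    state = update_state_by_key(state, [(w, 1) for w in words], update_func)
    history.append(dict(state))
    print(state)
```

One plausible explanation for the `('', 3)` entries in the output above is that three empty lines were sent, for example by pressing Enter with nothing typed: `"".split(" ")` returns `['']`, so each empty line contributes one empty "word". Calling `line.split()` with no argument returns `[]` for an empty line and would avoid counting it.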

## Monitoring a directory

Instead of reading from a socket, you can monitor a directory and apply the same processing to the lines of each file that appears in it.

In [4]:
data_dir = "/tmp/add_books_here"

ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")

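# initialStateRDD (defined in the first cell) seeds the state with (key, value) pairs.

def updateFunc(new_values, last_sum):
    # new_values: this key's counts in the current batch; last_sum: previous total (None for a new key)
    return sum(new_values) + (last_sum or 0)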

lines = ssc.textFileStream(data_dir)

running_counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .updateStateByKey(updateFunc, initialRDD=initialStateRDD)

running_counts.pprint()

ssc.start()
import time; time.sleep(1)
#ssc.awaitTerminationOrTimeout(60) # wait 60 seconds
ssc.stop(stopSparkContext=False)



-------------------------------------------
Time: 2021-11-03 09:55:00
-------------------------------------------
('hello', 1)
('world', 1)
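
The output above shows only the initial state, which is what one would expect if no file arrived in the directory while the stream was running. A minimal sketch of adding a file follows, assuming (as the Spark Streaming guide describes for file streams) that files should appear in the monitored directory all at once, by a move or rename, rather than being written there in place. `drop_file` is a hypothetical helper and the file name is invented.

```python
import os
import tempfile


def drop_file(text, data_dir, name):
    """Write `text` to a staging file outside data_dir, then move it in atomically."""
    os.makedirs(data_dir, exist_ok=True)
    # Stage next to data_dir so the final move stays on the same filesystem.
    parent = os.path.dirname(os.path.abspath(data_dir))
    staging_dir = tempfile.mkdtemp(dir=parent)
    staged = os.path.join(staging_dir, name)
    with open(staged, "w", encoding="utf-8") as f:
        f.write(text)
    target = os.path.join(data_dir, name)
    os.replace(staged, target)  # rename: the file appears in data_dir fully written
    os.rmdir(staging_dir)
    return target


# Demo in a throwaway directory rather than the real data_dir.
root = tempfile.mkdtemp()
demo_dir = os.path.join(root, "add_books_here")
path = drop_file("hello world\n", demo_dir, "book1.txt")
print(os.listdir(demo_dir))  # ['book1.txt']
```

With the notebook's own variable this would be called as `drop_file(text, data_dir, "book1.txt")` while the stream is running. A longer sleep than one second gives the stream a chance to see the file.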



### Bridging Streaming and Spark SQL

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import traceback

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")

lines = ssc.textFileStream(data_dir)

def process(time, rdd):
    print("========= %s =========" % str(time))
    if rdd.isEmpty():
        return
    # Get the singleton instance of SparkSession
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        # Convert RDD[String] to RDD[Row] to DataFrame
        words = rdd.flatMap(lambda line: line.split(" "))
        rowRdd = words.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()  # show() prints the table itself and returns None
    except Exception:
        print(traceback.format_exc())

lines.foreachRDD(process)

ssc.start()
import time; time.sleep(10)
#ssc.awaitTerminationOrTimeout(60) # wait 60 seconds
ssc.stop(stopSparkContext=False)

+--------------------+-----+
|                word|total|
+--------------------+-----+
|              online|    4|
|              Flower|    1|
|                 ...|    4|
|Descendants.--Exa...|    1|
|               those|   82|
|           destitute|   18|
|         sand-hills;|    3|
|        vicissitudes|    1|
|                some|  160|
|           Asiatics,|    1|
|                 few|   74|
|             pitcher|    1|
|              travel|   10|
|              freaks|    1|
|               still|   43|
|             tresses|    1|
|              waters|   16|
|                 art|    6|
|              ransom|    1|
|         unequivocal|    1|
+--------------------+-----+
only showing top 20 rows

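
The query in this cell is ordinary SQL, so its grouping logic can be tried without Spark. The sketch below swaps in Python's built-in `sqlite3` module in place of Spark SQL and uses a hypothetical list of words. An `order by` clause is added only to make the printed result deterministic.

```python
import sqlite3

words = ["hello", "world", "hello", "spark", "hello"]  # hypothetical sample

conn = sqlite3.connect(":memory:")
conn.execute("create table words (word text)")
conn.executemany("insert into words values (?)", [(w,) for w in words])

# Same aggregation as in the streaming example, run against the in-memory table.
rows = conn.execute(
    "select word, count(*) as total from words group by word order by word"
).fetchall()
conn.close()
print(rows)  # [('hello', 3), ('spark', 1), ('world', 1)]
```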
