# Chapter 7.2 - Spark Streaming

Paul E. Anderson

## Ice Breaker

What are you most looking forward to this holiday break?

## Annotated Example: WordCount

### The usual SparkContext

In [4]:
from pyspark import SparkConf
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Initial (word, count) pairs; used later to seed the stateful word count
initialStateRDD = sc.parallelize([("hello", 1), ("world", 1)])

### Grab a streaming context

In [12]:
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)  # batch interval of 1 second

### After a context is defined:
* Define the input sources by creating input DStreams.
* Define the streaming computations by applying transformation and output operations to DStreams.
* Start receiving data and processing it using streamingContext.start().
* Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
* The processing can be manually stopped using streamingContext.stop().

### Points to remember:
* Once a context has been started, no new streaming computations can be set up or added to it.
* Once a context has been stopped, it cannot be restarted.
* Only one StreamingContext can be active in a JVM at the same time.
* A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
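The rules above amount to a small state machine. The sketch below is a toy model in plain Python, not the Spark API: the class `ToyStreamingContext` and its method `add_computation` are hypothetical names, and the "only one active context" rule is checked when `start()` is called, which is an assumption of this sketch.

```python
class ToyStreamingContext:
    """Toy model of the lifecycle rules listed above; NOT the Spark API."""

    _active = None  # at most one started context at any time

    def __init__(self):
        self.state = "initialized"
        self.computations = []

    def add_computation(self, name):
        # Rule: once a context has been started, nothing new can be added.
        if self.state != "initialized":
            raise RuntimeError("cannot add computations after start()")
        self.computations.append(name)

    def start(self):
        # Rule: a stopped context cannot be restarted.
        if self.state == "stopped":
            raise RuntimeError("a stopped context cannot be restarted")
        if self.state == "active":
            raise RuntimeError("context already started")
        # Rule: only one context can be active at the same time.
        if ToyStreamingContext._active is not None:
            raise RuntimeError("another context is already active")
        self.state = "active"
        ToyStreamingContext._active = self

    def stop(self):
        # Stopping frees the slot so that a new context can be started.
        if ToyStreamingContext._active is self:
            ToyStreamingContext._active = None
        self.state = "stopped"
```

In this model, a second context can only be started after the first one has been stopped, which mirrors how the cells in this notebook create a fresh StreamingContext for each example.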

In [7]:
PORT=9999 # Change this to a unique port before running individually
HOST="localhost"

In [10]:
print("Run this command at the terminal and type in words and hit enter periodically:")
print(f"nc -lk {PORT}")

Run this command at the terminal and type in words and hit enter periodically:
nc -lk 9999


### Discretized Streams (DStreams)
* A DStream is the basic abstraction provided by Spark Streaming.
* It represents a continuous stream of data: either the input stream received from a source or the processed stream generated by transforming an input stream.
* Internally, a DStream is represented by a continuous series of RDDs.
* Each RDD in a DStream contains the data from one interval, as shown in the following figure.

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream.png">

* Any operation applied on a DStream translates to operations on the underlying RDDs. 
* In our example of converting a stream of lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream. 
* This is shown in the following figure:
<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png">
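To make the per-batch picture concrete, here is a minimal plain-Python sketch with no Spark involved. It assumes a DStream can be pictured as a list of batches, where each batch holds the lines received in one interval. The sample lines and the helper names `flat_map` and `count_words` are made up for illustration.

```python
from itertools import chain

# Hypothetical input: each inner list holds the lines received in one batch interval.
batches = [
    ["hello world", "hello spark"],
    [],                      # an interval in which nothing arrived
    ["streaming is fun"],
]

def flat_map(func, batch):
    """Apply func to every element of one batch and flatten the results."""
    return list(chain.from_iterable(func(x) for x in batch))

def count_words(words):
    """Count occurrences within a single batch (the role map + reduceByKey play)."""
    counts = {}
    for w in words:
        counts[w] = counts.get(w, 0) + 1
    return counts

# An operation on the "stream" is the same operation applied to every batch in turn.
words_batches = [flat_map(lambda line: line.split(" "), batch) for batch in batches]
counts_batches = [count_words(words) for words in words_batches]

for i, counts in enumerate(counts_batches):
    print(f"batch {i}: {counts}")
```

Each batch is counted on its own, so a word that appears in two different batches is reported twice, once per batch.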

In [13]:
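# Read newline-delimited text from the socket opened by nc
lines = ssc.socketTextStream(HOST, PORT)
# Split each line into words, pair each word with 1, and sum the counts within each batch
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .reduceByKey(lambda a, b: a+b)
counts.pprint()  # print the first elements of every batch

ssc.start()
import time; time.sleep(10)  # let the stream run for 10 seconds
#ssc.awaitTerminationOrTimeout(60) # wait 60 seconds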
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2023-11-14 02:42:27
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:42:28
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:42:29
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:42:30
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:42:31
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:42:32
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:42:33
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:42:34
-------------------------------------------



23/11/14 02:42:36 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
23/11/14 02:42:36 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
	at org.apache

-------------------------------------------
Time: 2023-11-14 02:42:35
-------------------------------------------



Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.$anonfun$restartReceiver$1(ReceiverSupervisor.scala:196)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.ja

-------------------------------------------
Time: 2023-11-14 02:42:36
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:42:37
-------------------------------------------



**Stop and think:** What is missing in our previous example? 

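One thing is state. Each batch is counted on its own: we process the lines of one batch, print the result, and carry nothing over to the next batch. What if we wanted to accumulate the word counts across batches? updateStateByKey does this by calling an update function for each key with the new values from the current batch and the previous state for that key.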
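Carrying state across batches can be sketched in plain Python before turning to Spark. The helper `update_state_by_key` below is a hypothetical stand-in, not Spark's implementation. It assumes the state is a dict from word to running count and that each batch is a list of (word, 1) pairs. The sample batches are made up.

```python
def update_func(new_values, last_sum):
    # new_values: the values seen for one key in the current batch
    # last_sum: the previous state for that key, or None if there is none yet
    return sum(new_values) + (last_sum or 0)

def update_state_by_key(state, batch_pairs, func):
    """Toy stand-in: group this batch's values by key, then update every key's state."""
    grouped = {}
    for key, value in batch_pairs:
        grouped.setdefault(key, []).append(value)
    new_state = {}
    # Keys with existing state are updated even when the batch has nothing new for them.
    for key in set(state) | set(grouped):
        result = func(grouped.get(key, []), state.get(key))
        if result is not None:  # returning None drops the key from the state
            new_state[key] = result
    return new_state

state = {"hello": 1, "world": 1}  # plays the role of an initial state
batches = [
    [("hello", 1), ("spark", 1)],
    [],                            # an empty batch leaves the totals unchanged
    [("hello", 1), ("hello", 1)],
]

history = []
for pairs in batches:
    state = update_state_by_key(state, pairs, update_func)
    history.append(dict(state))
    print(sorted(state.items()))
```

The totals only grow because the previous state is passed back into the update function on every batch. Without that argument, each batch would start from zero again.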

In [17]:
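ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # updateStateByKey requires a checkpoint directory

# initialStateRDD (defined in the first cell) seeds the state with (key, value) pairs

def updateFunc(new_values, last_sum):
    # new_values: counts for this key in the current batch; last_sum: previous total (None at first)
    return sum(new_values) + (last_sum or 0)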

lines = ssc.socketTextStream(HOST,PORT)
running_counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .updateStateByKey(updateFunc, initialRDD=initialStateRDD)

running_counts.pprint()

ssc.start()
import time; time.sleep(15)
#ssc.awaitTerminationOrTimeout(60) # wait 60 seconds
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2023-11-14 02:45:05
-------------------------------------------
('hello', 1)
('world', 1)

-------------------------------------------
Time: 2023-11-14 02:45:06
-------------------------------------------
('hello', 1)
('world', 1)

-------------------------------------------
Time: 2023-11-14 02:45:07
-------------------------------------------
('hello', 1)
('world', 1)

-------------------------------------------
Time: 2023-11-14 02:45:08
-------------------------------------------
('hello', 1)
('world', 1)



23/11/14 02:45:37 WARN ReceiverTracker: Not all of the receivers have deregistered, Vector(0)
23/11/14 02:45:37 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.
23/11/14 02:45:37 ERROR ReceiverSupervisorImpl: Error stopping receiver 0 org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.onReceiverStop(ReceiverSupervisorImpl.scala:199)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.stopReceiver(ReceiverSupervisor.scala:172)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.stop(ReceiverSupervisor.sca

-------------------------------------------
Time: 2023-11-14 02:45:09
-------------------------------------------
('hello', 1)
('world', 1)



23/11/14 02:46:21 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:9999
java.net.ConnectException: Connection timed out (Connection timed out)
	at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
	at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
	at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.base/java.net.Socket.connect(Socket.java:609)
	at java.base/java.net.Socket.connect(Socket.java:558)
	at java.base/java.net.Socket.<init>(Socket.java:454)
	at java.base/java.net.Socket.<init>(Socket.java:231)
	at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
	at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(Receiv

## Monitoring a directory

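You can also monitor a directory and apply the same processing. textFileStream reads new files that appear in the directory as lines of text, so the word-count pipeline itself does not change.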
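When feeding a monitored directory, it helps if each file appears there all at once rather than being written in place, so that a half-written file is not picked up. The sketch below shows one way to do that under stated assumptions: the helper name `drop_file_atomically` is hypothetical, and it assumes the parent of the monitored directory is writable and on the same filesystem, so that the final rename is atomic.

```python
import os
import tempfile

def drop_file_atomically(text, data_dir, name):
    """Write text to a temporary file next to data_dir, then rename it into data_dir."""
    os.makedirs(data_dir, exist_ok=True)
    # Stage the file in the parent directory so the rename stays on one filesystem.
    parent = os.path.dirname(os.path.abspath(data_dir))
    fd, tmp_path = tempfile.mkstemp(dir=parent, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(text)
    final_path = os.path.join(data_dir, name)
    # os.replace is atomic when source and destination are on the same filesystem.
    os.replace(tmp_path, final_path)
    return final_path
```

For example, `drop_file_atomically(book_text, "/tmp/add_books_here", "book1.txt")` would place a complete file into the directory used in the next cell, where `book_text` stands for whatever text you want to stream.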

In [19]:
data_dir = "/tmp/add_books_here"

ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")

# No initial state is passed this time, so every count starts from zero

def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

lines = ssc.textFileStream(data_dir)

running_counts = lines.flatMap(lambda line: line.split(" "))\
                      .map(lambda word: (word, 1))\
                      .updateStateByKey(updateFunc)

running_counts.pprint()

ssc.start()
import time; time.sleep(60)
#ssc.awaitTerminationOrTimeout(60) # wait 60 seconds
ssc.stop(stopSparkContext=False)

-------------------------------------------
Time: 2023-11-14 02:51:03
-------------------------------------------

23/11/14 02:51:08 WARN FileInputDStream: Time taken to find new files 3649 exceeds the batch size. Consider increasing the batch size or reducing the number of files in the monitored directories.

-------------------------------------------
Time: 2023-11-14 02:51:04
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:05
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:06
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:07
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:08
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:09
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:10
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:11
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:12
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:13
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:14
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:15
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:16
-------------------------------------------

-------------------------------------------
Time: 2023-11-14 02:51:17
-------------------------------------------
('of', 27385)
('', 78826)
('no', 2135)
('away', 358)
('Life', 41)
('results', 7)
('mind', 284)
('willing', 44)
('have', 4700)
('property.', 4)
...

-------------------------------------------
Time: 2023-11-14 02:51:18
-------------------------------------------
('of', 27385)
('', 78826)
('no', 2135)
('away', 358)
('Life', 41)
('results', 7)
('mind', 284)
('willing', 44)
('have', 4700)
('property.', 4)
...

-------------------------------------------
Time: 2023-11-14 02:51:19
-------------------------------------------
('of', 27385)
('', 78826)
('no', 2135)
('away', 358)
('Life', 41)
('results', 7)
('mind', 284)
('willing', 44)
('have', 4700)
('property.', 4)
...

-------------------------------------------
Time: 2023-11-14 02:51:20
-------------------------------------------
('of', 27385)
('', 78826)
('no', 2135)
('away', 358)
('Life', 41)
('results', 7)
('mind', 284)
('wil

23/11/14 02:51:57 WARN FileInputDStream: Time taken to find new files 3331 exceeds the batch size. Consider increasing the batch size or reducing the number of files in the monitored directories.
23/11/14 02:52:06 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.

### Bridging Streaming and Spark SQL

In [20]:
data_dir = "/tmp/add_books_here"


from pyspark.sql import SparkSession
from pyspark.sql import Row
import traceback

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")

lines = ssc.textFileStream(data_dir)

def process(time, rdd):
    print("========= %s =========" % str(time))
    if rdd.isEmpty():
        return
    # Get the singleton instance of SparkSession
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        # Convert RDD[String] to RDD[Row] to DataFrame
        words = rdd.flatMap(lambda line: line.split(" "))
        rowRdd = words.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        # show() prints the table itself and returns None, so the outer print() adds a "None" line
        print(wordCountsDataFrame.show())
    except Exception:
        print(traceback.format_exc())

lines.foreachRDD(process)

ssc.start()
import time; time.sleep(30)
#ssc.awaitTerminationOrTimeout(60) # wait 60 seconds
ssc.stop(stopSparkContext=False)

+-----------+-----+
|       word|total|
+-----------+-----+
|     online|   24|
|     LITTLE|   18|
|    JOHNSON|    4|
|       some|  578|
|      MUSTY|    2|
| DEPARTURE.|    2|
|        IS,|    4|
|     BOUND.|    2|
|  forgetful|    1|
|       eye.|   14|
|        few|  111|
|     Heaven|   14|
|     CASTLE|    1|
|     waters|   10|
|      those|  274|
|        art|   34|
|      spoil|    4|
|         By|   74|
|       cot,|    1|
|whip-handle|    2|
+-----------+-----+
only showing top 20 rows

None


23/11/14 02:54:25 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer queue interrupted.

+------------+-----+
|        word|total|
+------------+-----+
|      online|   40|
|     embrace|   13|
|  unlearning|    1|
|        some|  703|
|       those|  495|
|   theorists|    1|
|         few|  242|
|   arguments|    4|
|      freaks|    2|
|       still|  239|
|      poetry|   12|
|        cot,|    1|
|     Having,|    1|
|      doubts|    6|
|    cautious|    5|
| transmitted|    3|
|          By|  140|
|        “If,|    2|
|         art|   65|
|vicissitudes|    2|
+------------+-----+
only showing top 20 rows

None