# **Data Streaming using PySpark [CN7030]**

**`Dr Amin Karami, UEL Docklands Campus, March 2022`**

`E: a.karami@uel.ac.uk`

`W: www.aminkarami.com`

---

**updateStateByKey Operation**: This operation lets you maintain arbitrary state for each key and keep updating it as new data arrives. Using it takes two steps:

[1] Define the state - The state can be of any data type.

[2] Define the state update function - Write a function that computes the new state of a key from its previous state and the new values for that key from the input stream.
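The two steps can be illustrated without Spark. The sketch below is a plain-Python simulation, not Spark's API, of the semantics `updateStateByKey` follows: in every micro-batch the update function is called for each key that has new values or existing state, with the list of new values (empty if none arrived) and the previous state (`None` for a new key); returning `None` removes the key. The helper names `simulate_update_state_by_key` and `running_count` and the input batches are hypothetical.

```python
from collections import defaultdict


def simulate_update_state_by_key(batches, update_fn):
    """Hypothetical plain-Python model of updateStateByKey (not a Spark API).

    batches: list of micro-batches, each a list of (key, value) pairs.
    update_fn: called as update_fn(new_values, previous_state).
    Returns a snapshot (dict) of the state after each batch.
    """
    state = {}
    history = []
    for batch in batches:
        # Group this batch's values by key.
        grouped = defaultdict(list)
        for key, value in batch:
            grouped[key].append(value)
        new_state = {}
        # Update every key that has new values OR existing state.
        for key in set(state) | set(grouped):
            # Keys without new values get an empty list; new keys get None as state.
            updated = update_fn(grouped.get(key, []), state.get(key))
            if updated is not None:  # returning None drops the key
                new_state[key] = updated
        state = new_state
        history.append(dict(state))
    return history


def running_count(new_values, running):
    # Same idea as the notebook's update function: add the new 1s to the previous count.
    return sum(new_values, running or 0)


# Hypothetical input: three batches of (remainder, 1) pairs, the last one empty.
batches = [[(0, 1), (2, 1)], [(0, 1)], []]
for snapshot in simulate_update_state_by_key(batches, running_count):
    print(sorted(snapshot.items()))
```

This prints `[(0, 1), (2, 1)]`, then `[(0, 2), (2, 1)]` twice: the state is carried forward and re-emitted even for a batch with no input.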

In [1]:
# Locate the local Spark installation and make PySpark importable
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [3]:
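# Create the SparkContext and a StreamingContext with a 5-second batch interval
sc = SparkContext()
ssc = StreamingContext(sc, 5)
# Stateful operations such as updateStateByKey require a checkpoint directory
ssc.checkpoint('checkpoint')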

In [4]:
# Before starting the stream, run `nc -lk 7000` in a separate terminal and type one integer per line
int_values = ssc.socketTextStream('localhost', 7000)
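If `nc` is not available (for example in a plain Windows `cmd` prompt), a small Python script can play the same role. This is a hypothetical helper, not part of Spark: it listens on the port that `socketTextStream` connects to and writes one integer per line. The function names, the value range, and the one-second delay are assumptions.

```python
import random
import socket
import time


def number_lines(count, seed=None, low=0, high=99):
    """Return `count` newline-terminated random integers in [low, high]."""
    rng = random.Random(seed)
    return [f"{rng.randint(low, high)}\n" for _ in range(count)]


def serve_numbers(host="localhost", port=7000, count=20, delay=1.0):
    """Rough stand-in for `nc -lk 7000` serving a single client: wait for
    Spark's receiver to connect, then send one integer per line."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        conn, _addr = server.accept()  # blocks until the receiver connects
        with conn:
            for line in number_lines(count):
                conn.sendall(line.encode("utf-8"))
                time.sleep(delay)  # spread the numbers over several batches
```

Call `serve_numbers()` from a separate Python process before running `ssc.start()`. Unlike `nc -lk`, which keeps listening after a client disconnects, this sketch serves one connection and then exits.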

In [5]:
# Update function: add the new values to the previous running count.

# Spark calls it once per key (here, each remainder 0-9) in every batch. newValues
# is the list of 1s from this batch's (key, 1) pairs (empty if the key received no
# new data), and runningCount is the previous count, or None the first time a key is seen.

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    
    return sum(newValues, runningCount)
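The update function can be checked in plain Python before it is plugged into the stream. For a key with no new data in a batch, `newValues` is an empty list, so `sum([], runningCount)` returns the previous count unchanged. The variant `updateFunctionWithExpiry` and the threshold `MAX_IDLE_BATCHES` are hypothetical: they show a tuple as the state and use the fact that returning `None` removes a key from the state.

```python
def updateFunction(newValues, runningCount):
    # Same as the notebook cell: start at 0 for a key seen for the first time.
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


MAX_IDLE_BATCHES = 3  # hypothetical threshold


def updateFunctionWithExpiry(newValues, state):
    """State is a (count, idle_batches) tuple; return None to drop the key
    after MAX_IDLE_BATCHES consecutive batches with no new values."""
    count, idle = state if state is not None else (0, 0)
    if newValues:
        return (count + sum(newValues), 0)  # new data resets the idle counter
    idle += 1
    if idle >= MAX_IDLE_BATCHES:
        return None  # Spark removes keys whose updated state is None
    return (count, idle)


print(updateFunction([1, 1], None))             # 2
print(updateFunction([], 2))                    # 2 (no new values)
print(updateFunctionWithExpiry([1], None))      # (1, 0)
print(updateFunctionWithExpiry([], (5, 2)))     # None (key expires)
```

With the expiring variant, remainders that stop arriving eventually disappear from the printed state instead of being re-emitted forever.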

In [7]:
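# Take each incoming number modulo 10 and pair the remainder with 1.
# updateStateByKey then keeps a running count of how many times each remainder
# (0-9) has been seen since the stream started, across all batches (not per window).

Remainder = int_values.map(lambda x: int(x) % 10).map(lambda x: (x, 1)).updateStateByKey(updateFunction)

# Print the first elements (10 by default) of the state in every batch
Remainder.pprint()

# Alternatively, print how many distinct remainders are in the state:
# Remainder.count().pprint()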
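`int(x)` raises a `ValueError` on any line that is not an integer (for example an empty line typed into `nc`), which makes that batch's job fail. A more defensive variant, sketched here as an assumption rather than the original approach, parses each line in a plain function and uses `flatMap` so that bad lines yield no pairs. The helper name `to_remainder_pair` is hypothetical.

```python
def to_remainder_pair(line):
    """Return [(n % 10, 1)] for a line holding an integer, else []."""
    try:
        n = int(line.strip())
    except ValueError:
        return []  # skip blank or non-numeric lines instead of failing the batch
    # Python's % with a positive divisor is non-negative, so -3 maps to 7.
    return [(n % 10, 1)]


# Plain-Python check of the helper:
print([pair for line in ["17", "", "abc", "-3", " 40 "] for pair in to_remainder_pair(line)])
# [(7, 1), (7, 1), (0, 1)]

# In the streaming pipeline it would replace the two map() calls (sketch):
# Remainder = int_values.flatMap(to_remainder_pair).updateStateByKey(updateFunction)
```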

In [8]:
ssc.start()

-------------------------------------------
Time: 2022-03-28 14:23:10
-------------------------------------------

-------------------------------------------
Time: 2022-03-28 14:23:15
-------------------------------------------

-------------------------------------------
Time: 2022-03-28 14:23:20
-------------------------------------------
(0, 1)
(2, 1)



-------------------------------------------
Time: 2022-03-28 14:23:25
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:23:30
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:23:35
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:23:40
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:23:45
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:23:50
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:23:55
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:24:00
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:24:05
-------------------------------------------
(0, 2)
(2, 1)


In [9]:
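# Stop gracefully (wait until all received data has been processed), then stop the SparkContext too
ssc.stop(stopSparkContext=True, stopGraceFully=True)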

-------------------------------------------
Time: 2022-03-28 14:24:10
-------------------------------------------
(0, 2)
(2, 1)

-------------------------------------------
Time: 2022-03-28 14:24:15
-------------------------------------------
(0, 2)
(2, 1)
