## ITML606 

## Spark Streaming Example : Wordcount

Let's start by opening a Spark structured session.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

ModuleNotFoundError: No module named 'pyspark'

In [4]:
spark

Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.

In [15]:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

25/04/14 09:14:32 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


This lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it.

Next, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated. And then start the streaming computation using start().

We start by opening port 9999 using netact command line utility. Do this in the terminal.

In [20]:
! nc -lk 9999

^C


In [24]:
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

25/04/14 09:18:30 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/hw/gqdk7yr50bd98fn3k2zjf2q80000gn/T/temporary-3da780a1-a485-4312-a6a1-285f04ad4b08. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/14 09:18:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
| word|count|
+-----+-----+
|hello|    1|
+-----+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----+
|   word|count|
+-------+-----+
|example|    1|
|  hello|    2|
|     is|    1|
|  world|    1|
|     an|    1|
|       |    1|
|   this|    1|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+-------+-----+
|   word|count|
+-------+-----+
|    you|    1|
|    can|    1|
|    how|    1|
|example|    1|
|  hello|    3|
|    say|    1|
|     is|    1|
|  world|    1|
|     an|    1|
|       |    2|
|   this|    1|
|   many|    1|
+-------+-----+



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 