In [None]:
import findspark
findspark.init('/usr/local/spark')
import pyspark

Import StreamingContext, which is the main entry point for all streaming functionality.

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

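Create a local SparkContext with two execution threads, and a StreamingContext with a batch interval of 1 second. At least two threads are needed: one runs the receiver for the input stream, and the other processes the received data.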

In [None]:
sc = SparkContext("local[2]", "StreamingWordCount")
ssc = StreamingContext(sc, 1)
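
To make the batch interval concrete, here is a minimal plain-Python sketch (no Spark involved) of how records might be grouped into 1-second batches by arrival time. The function name group_into_batches and the sample records are hypothetical, and the sketch simplifies matters: Spark Streaming also emits empty batches for intervals in which nothing arrives, which this version skips.

```python
from collections import defaultdict


def group_into_batches(records, batch_interval=1.0):
    """Group (arrival_time_seconds, text) records into fixed-width batches.

    Hypothetical helper for illustration only; it is not part of PySpark.
    """
    batches = defaultdict(list)
    for arrival_time, text in records:
        # Records arriving within the same interval land in the same batch.
        batch_index = int(arrival_time // batch_interval)
        batches[batch_index].append(text)
    # Return the non-empty batches in time order.
    return [batches[index] for index in sorted(batches)]


# Made-up sample data: two lines arrive in the first second, one in the next.
records = [(0.2, "hello world"), (0.7, "hello spark"), (1.4, "streaming")]
batches = group_into_batches(records, batch_interval=1.0)
print(batches)
```

Each inner list plays the role of one batch: the transformations defined later in this notebook are applied to every batch separately.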

Using this context, we can create a DStream that represents streaming data from a TCP source, specified as hostname (e.g. localhost) and port (e.g. 9999).

In [None]:
lines = ssc.socketTextStream("localhost", 9999)

This lines DStream represents the stream of data that will be received from the data server.

In [None]:
type(lines)
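
The stream only receives data if something is listening on localhost:9999. A common choice is netcat (nc -lk 9999) in a separate terminal. As an alternative, the sketch below shows a small test server written with Python's standard socket module. The helper name serve_lines and its parameters are assumptions made for this example; it accepts a single client, sends each line terminated by a newline, and then closes the connection.

```python
import socket
import threading
import time


def serve_lines(lines, host="localhost", port=9999, delay=0.0):
    """Serve the given lines to the first client that connects.

    Hypothetical helper for illustration; returns (bound_port, thread).
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def run():
        try:
            conn, _ = server.accept()  # blocks until a client connects
            with conn:
                for line in lines:
                    # Text streams over a socket are newline-delimited.
                    conn.sendall((line + "\n").encode("utf-8"))
                    time.sleep(delay)
        finally:
            server.close()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return bound_port, thread


# Example usage (run this before starting the streaming context):
# serve_lines(["hello world", "hello spark"], delay=1.0)
```

The server runs in a background thread so that the notebook stays responsive while the data is being sent.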

Each record in this DStream is a line of text. Next, we need to split the lines by space into words.

In [None]:
words = lines.flatMap(lambda line: line.split(" "))

This operation creates a new DStream named words. The elements of this DStream will be the individual words from the lines.

In [None]:
type(words)

The words DStream is then transformed into a DStream of (word, 1) pairs. The reduceByKey operation sums the values for each key, which gives the count of each word in every batch.

In [None]:
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
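
The following plain-Python sketch mimics what these three transformations do to the lines of a single batch. It is only an illustration of the semantics, not how Spark executes them; the function name count_words_in_batch and the sample batch are hypothetical.

```python
from collections import defaultdict


def count_words_in_batch(batch_lines):
    """Apply the flatMap, map and reduceByKey steps to one batch of lines."""
    # flatMap: every line yields several words, flattened into one list.
    words = [word for line in batch_lines for word in line.split(" ")]
    # map: turn each word into a (word, 1) pair.
    pairs = [(word, 1) for word in words]
    # reduceByKey: combine the values of equal keys with x + y.
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] = counts[word] + one
    return dict(counts)


batch = ["hello world", "hello spark"]  # made-up sample batch
result = count_words_in_batch(batch)
print(result)
```

Note that split(" ") produces empty strings when a line contains consecutive spaces, and those would be counted as words too; calling split() without an argument avoids this.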

Use the pprint (pretty print) function to print the counts generated every second. By default, it prints the first ten elements of each batch.

In [None]:
wordCounts.pprint()

Executing the code above only sets up the computation; Spark Streaming has not processed any data yet. Processing begins when the streaming context is started, as below.

In [None]:
ssc.start()

Wait for the computation to terminate. This call blocks, so to regain control of the notebook, interrupt the kernel (the equivalent of sending Control+C).

In [None]:
ssc.awaitTermination()

Finally, stop the streaming context. By default, this also stops the underlying SparkContext.

In [None]:
ssc.stop()