### Spark Streaming through SparkContext
###### We used a netcat streaming server on Linux (nc -lk 9999)
###### The stream client was a Jupyter notebook, started with context.start()

STEPS for streaming:
* Create a SparkContext
* Create a StreamingContext
* Create a Socket Text Stream
* Read in the lines as a "DStream"
* Process the lines, then print the results by calling pprint() on the DStream object
* The steps above only define the computation; nothing is executed until we call streamingContext<b>.start()</b>
* After start(), the steps from reading the lines as a DStream through to printing the results are repeated for every batch until we call context.stop() or an exception occurs.

The connection to the terminal running the netcat server (nc) is made through this context from our Python script or Jupyter notebook, and each line is then read from the socket connection.

#### Spark Streaming
* Here we are using SparkContext
* In structured streaming SparkSession is used

Steps for working with the data after the lines are read as a DStream:
* Split each line into a list of words.
* Map each word into a tuple (word,1)
* Then group (reduce) the tuples by word (the key) and sum the second element (the number 1); this gives the count of each word

N.B.: RDD syntax relies heavily on lambda expressions, which are just quick anonymous functions.
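
The three steps above can be tried without Spark. The following is a minimal plain-Python sketch, not PySpark code; the sample `lines` list and the helper `add_pair` are made up for illustration.

```python
from functools import reduce

# Hypothetical sample input standing in for lines read from the socket.
lines = ["spark streams words", "spark counts words"]

# Step 1: split every line into words and flatten them into one list.
words = [word for line in lines for word in line.split(" ")]

# Step 2: map each word to a (word, 1) tuple using a lambda.
pairs = list(map(lambda word: (word, 1), words))

# Step 3: sum the second tuple element per word (the key).
def add_pair(counts, pair):
    word, one = pair
    counts[word] = counts.get(word, 0) + one
    return counts

word_counts = reduce(add_pair, pairs, {})
print(word_counts)
```

In the notebook cells that follow, flatMap(), map() and reduceByKey() play the roles of steps 1, 2 and 3 respectively.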

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [None]:
%%time
sc = SparkContext('local[2]', 'NetworkWordCount')  ##Same as SparkContext(master='local[2]', appName='NetworkWordCount')

* The second parameter of StreamingContext() is <b>batchDuration</b>: the time interval (in seconds) at which the streaming data is divided into batches
* <b>context.awaitTermination()</b> makes the current thread wait until the context is terminated by <b>stop()</b> or by an exception.
* The streaming computation is started and stopped with <b>context.start()</b> and <b>context.stop()</b> respectively.
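
To get a feel for what batchDuration means, here is a small plain-Python sketch that groups timestamped lines into 5-second batches. It only illustrates the idea and is not how Spark implements batching; the `events` data and the `to_batches` helper are assumptions made for this example.

```python
from collections import defaultdict

BATCH_DURATION = 5  # seconds, the same value passed as batchDuration below

# Hypothetical (arrival_time_in_seconds, line) events; not real Spark data.
events = [(0.5, "a b"), (3.2, "b c"), (5.0, "c d"), (11.7, "d e")]

def to_batches(events, batch_duration):
    """Group events by the batch interval their arrival time falls into."""
    batches = defaultdict(list)
    for arrival, line in events:
        batches[int(arrival // batch_duration)].append(line)
    return dict(batches)

batches = to_batches(events, BATCH_DURATION)
for index in sorted(batches):
    start = index * BATCH_DURATION
    print(f"batch [{start}s, {start + BATCH_DURATION}s): {batches[index]}")
```

Each batch is then processed on its own, which is why the word counts printed later cover only the lines that arrived within one interval.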

In [None]:
ssc = StreamingContext(sc,5)  ##* The batchDuration param is 5 seconds

* Get all the lines received from the stream in the last batch interval (5 seconds here)

In [None]:
lines = ssc.socketTextStream(hostname='localhost', port=9999)

* Collect all the words from all the lines received within a batch interval

In [None]:
lines

In [None]:
words = lines.flatMap(lambda line: line.split(' '))

Explanation of map() and flatMap() of a DStream or RDD, when the function returns a list for each input element
* map() over three elements returns a list of lists
* map() producing [a,b], [a,c], [d,e] gives [[a,b],[a,c],[d,e]], which is a list whose members are the individual lists
* flatMap() over three elements returns a single list of the members of those lists
* flatMap() producing [a,b], [a,c], [d,e] gives [a,b,a,c,d,e], which is a list built from the members
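
The difference can be reproduced with ordinary Python lists. This is a sketch of the semantics only, assuming three made-up input lines; it does not call Spark.

```python
# Hypothetical input: three lines of text, as they might arrive in one batch.
lines = ["a b", "a c", "d e"]

def split_line(line):
    return line.split(" ")

# map(): one output element per input element, so we get a list of lists.
mapped = list(map(split_line, lines))

# flatMap(): the per-line lists are flattened into one list of words.
flat_mapped = [word for line in lines for word in split_line(line)]

print(mapped)       # [['a', 'b'], ['a', 'c'], ['d', 'e']]
print(flat_mapped)  # ['a', 'b', 'a', 'c', 'd', 'e']
```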

* For the list of words return a list of tuples of form (word,1)

In [None]:
pairs = words.map(lambda word: (word,1))

* Use reduceByKey() to group by the key (here the word) and get the count of each word
* It combines values that have the same key
* reduceByKey() treats the first item of each tuple as the key
* The reduction is applied to each key independently, so keys can be processed in parallel
* We can think of it as a groupBy() followed by an aggregation
* The function takes two arguments: the first is the result accumulated so far for the key (here the running sum of the second items) and the second is the second item of the next tuple with that key.
* REFER: pg 66 of the book "Learning Spark" from O'Reilly
* If an RDD is rdd1 = {(1,2), (3,4), (3,6), (3,1)} -- a set of tuples
    * rdd1.reduceByKey(lambda x,y: x+y) returns an RDD with the tuples {(1,2), (3,11)}
    * There is one tuple with key 1, with value 2, hence the value returned for 1 is 2
    * There are three tuples with key 3, with values 4, 6 and 1, hence the value returned is 4+6+1 = 11
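
The rdd1 example can be checked in plain Python. The `reduce_by_key` function below is a hypothetical stand-in written for this note, not the PySpark method; it sorts and groups the tuples by key and then folds the values with the same two-argument lambda.

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter

def reduce_by_key(pairs, func):
    """Plain-Python stand-in for reduceByKey (hypothetical helper)."""
    ordered = sorted(pairs, key=itemgetter(0))
    return [
        (key, reduce(func, (value for _, value in group)))
        for key, group in groupby(ordered, key=itemgetter(0))
    ]

rdd1 = [(1, 2), (3, 4), (3, 6), (3, 1)]
result = reduce_by_key(rdd1, lambda x, y: x + y)
print(result)  # [(1, 2), (3, 11)]
```

For key 3 the lambda is called twice: first with (4, 6), giving 10, and then with (10, 1), giving 11.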

In [None]:
word_counts = pairs.reduceByKey(lambda num1,num2: num1+num2 )

In [None]:
word_counts.pprint()

* As documented earlier for StreamingContext(), the streaming computation is started and stopped with context.start() and context.stop() respectively.
* Before invoking context.start(), we need to ensure that the streaming server is already running at the hostname and port passed to context.socketTextStream() earlier.

* So before invoking ssc.start() we need to start the netcat server with "nc -lk 9999" in a new Unix terminal, because ssc.socketTextStream() connects to localhost:9999
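
One way to confirm that the server is up before starting the context is a quick TCP probe from the notebook. This is an optional sketch using only the Python standard library; the helper name `is_listening` is made up for this example.

```python
import socket

def is_listening(host, port, timeout=1.0):
    """Return True if a TCP server accepts connections on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Prints True only if something (for example nc -lk 9999) is listening.
print(is_listening("localhost", 9999))
```

The probe opens and immediately closes a connection. With the -k flag netcat should keep listening afterwards, but a server started without -k may exit after the probe, so treat this as a convenience check rather than a required step.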

###### ----------------------------------------------------------------------------------------------------
###### Whatever lines (sequences of words ending with a "Return" key stroke) we type at the Linux terminal running the "nc -lk 9999" netcat server,
######     context.start() will start printing them grouped by word
* After ssc.start(), the data received from the server is processed in 5-second batches: the steps from ssc.socketTextStream() through to word_counts.pprint() are repeated for each batch until we call ssc.stop() (context.stop)

###### ----------------------------------------------------------------------------------------------------

#### STOP HERE AND CHECK STREAMING SERVER ON THE TERMINAL

In [None]:
ssc.start()

#### STOP HERE AND ENSURE YOUR TESTING IS OVER BEFORE CALLING context.stop()

In [None]:
ssc.stop()

* Once stopped, we need to restart the entire process starting from SparkContext() creation, because ssc.stop() also stops the underlying SparkContext by default and a stopped StreamingContext cannot be restarted.

###### -------------------------------------------------------------------------------------
### We will now use SparkSession to achieve the same Network word count program
###### -------------------------------------------------------------------------------------

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark1 = SparkSession.builder.appName('StructuredNetworkWordCount').getOrCreate()

In [3]:
lines_new = spark1.readStream.format('socket').option('host', 'localhost').option('port', 9999).load()

In [4]:
lines_new

DataFrame[value: string]

In [5]:
from pyspark.sql.functions import explode, split

In [6]:
# Split the lines into words
words_new = lines_new.select( explode(split(lines_new.value, " ")).alias("word"))

In [7]:
# Generate running word count
wordCounts = words_new.groupBy("word").count()

In [8]:
# Start running the query that prints the running counts to the console

query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [9]:
query

<pyspark.sql.streaming.StreamingQuery at 0x7fd8d460f9d0>

In [10]:
query.awaitTermination()

KeyboardInterrupt: 

### STOP HERE AND CHECK THE APPLICATION

* With the Spark installation used here, if the JDK is newer than 1.8 we get the error below when we run StreamingQuery.awaitTermination()
    * StreamingQueryException: 'Unsupported class file major version 55'
    * "java -version" on my Ubuntu machine shows "openjdk 11.0.7", which is the likely cause of this issue.

* The issue was resolved once I switched from the installed JDK 11 to JDK 8
    * INSTALL JDK8:   <b>sudo apt install openjdk-8-jdk</b>
    * CONFIGURE DEFAULT JAVA to use ver 8:  <b>sudo update-alternatives --config java</b>
    * REFER: https://stackoverflow.com/questions/53583199/pyspark-error-unsupported-class-file-major-version
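
The number in the error message identifies the Java release that the class files were compiled for. For Java 5 and later the class file major version is the release number plus 44, so 55 corresponds to Java 11, which matches the "openjdk 11.0.7" reported above. The helper below is a small illustration written for this note; its name is made up.

```python
def java_release_for_major(major_version):
    """Map a class file major version to its Java release (Java 5 and later)."""
    # Major version 49 is Java 5, 52 is Java 8, 55 is Java 11.
    return major_version - 44

print(java_release_for_major(55))  # 11
print(java_release_for_major(52))  # 8
```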
    
### HOW TO SWITCH JAVA TO A DIFFERENT VERSION ON LINUX
* Refer to the steps above for switching the default Java from JDK 11 to JDK 8

In [None]:
query.stop()

### Similarities in Streaming of SparkContext vs SparkSession
1. The netcat server "nc -lk 9999" must be started before invoking context.start() (for SparkContext) or the writeStream start() call that returns the StreamingQuery (for SparkSession)

### Differences in Streaming of SparkContext vs SparkSession
1. SparkContext uses StreamingContext, but SparkSession uses StreamingQuery
2. With SparkContext the output appears in the notebook in the browser; with SparkSession the output goes to the Unix terminal that started the Jupyter notebook
3. The context prints counts only for the input typed during the last interval, BUT the session (with outputMode("complete")) prints counts for the entire text typed at the server prompt so far, including text that was already processed in earlier runs.
4. The context processes input at every interval boundary, BUT with the session the input is picked up from the streaming server immediately and triggers a "Staging" activity on the Jupyter Unix terminal console (which takes relatively longer) and finally prints a DataFrame with the result, in Spark table format, to the terminal console.
5. In both cases .start() returns immediately.
    * With the context, the output block of 'start()' keeps printing until the context is stopped.
    * With the session, query.awaitTermination() blocks and does not return until we interrupt the cell in the Jupyter notebook. Calling query.stop() had no effect in our runs, most likely because a later cell cannot execute while the notebook is blocked, so the server keeps processing and output keeps coming to the Jupyter Unix terminal whenever a line followed by a "Return" key stroke is entered on the streaming server terminal.
6. With the context the process stops when we call context.stop(), BUT with the session query.stop() had no effect; we had to force-stop the Spark session action using the "Interrupt the kernel" button at the top of the Jupyter browser page.
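
The difference between per-interval counts and the running counts produced with outputMode("complete") can be illustrated in plain Python. This is only a simulation under made-up input; the `batches` list and the variable names are assumptions, and no Spark API is called.

```python
from collections import Counter

# Hypothetical input: the lines typed during three consecutive intervals.
batches = [["a b a"], ["b c"], ["a"]]

running = Counter()
per_batch_output = []  # roughly what the DStream pprint() approach shows
complete_output = []   # roughly what outputMode("complete") shows

for batch in batches:
    words = [w for line in batch for w in line.split(" ")]
    batch_counts = Counter(words)
    running.update(batch_counts)
    per_batch_output.append(dict(batch_counts))
    complete_output.append(dict(running))

print(per_batch_output)
print(complete_output)
```

After the third interval the per-interval view reports only a: 1, while the running view reports a: 3, b: 2, c: 1.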

