# Trend Analysis on Twitter live data using Spark Streaming

Import SparkContext and StreamingContext from the PySpark library.

In [1]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

Create a SparkContext with the app name "SteramingTwitterAnalysis".<br>
Set the log level of the SparkContext to ERROR so that INFO and WARN messages are not printed.<br>
Create the Spark Streaming context from sc (the Spark context). The second parameter, 10, is the batch interval in seconds.<br>
The analysis therefore runs once every 10 seconds.

In [None]:
sc = SparkContext(appName="SteramingTwitterAnalysis")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 10)

Connect to the socket broker using ssc (the Spark Streaming context).<br>
Host: "172.31.20.58" (the IP address of the machine running the broker, here the local host) and port: 5555

In [None]:
socket_stream = ssc.socketTextStream("172.31.20.58", 5555)
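socketTextStream reads newline-delimited UTF-8 text over a TCP connection, so the broker only has to write one tweet per line. The sketch below shows what such a broker could look like for local testing. The function names open_server and send_lines are hypothetical, and the sample lines stand in for real tweets; it is not the broker used in this notebook.

```python
import socket

def open_server(host="127.0.0.1", port=0):
    """Listen on host:port; port 0 lets the OS pick a free port."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server

def send_lines(server, lines):
    """Accept one client and send each line terminated by a newline."""
    conn, _addr = server.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
```

To try it, call open_server with the host and port passed to socketTextStream, then call send_lines with a list of sample tweets once the streaming job has been started.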

The parameter of the window function sets the window length in seconds. Each analysis covers the tweets received during the last 20 seconds, i.e. the two most recent 10-second batches.

In [None]:
lines = socket_stream.window(20)
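To make the windowing concrete, here is a plain-Python sketch (no Spark involved) of how a 20-second window over 10-second batches groups records. It assumes the slide interval equals the batch interval, which is the default when window() gets a single argument. The helper windowed and the sample batches are made up for illustration.

```python
from collections import deque

BATCH_INTERVAL = 10   # seconds, as in StreamingContext(sc, 10)
WINDOW_LENGTH = 20    # seconds, as in socket_stream.window(20)

def windowed(batches, window_length=WINDOW_LENGTH, batch_interval=BATCH_INTERVAL):
    """Yield, for each new batch, all records of the batches inside the window."""
    if window_length % batch_interval != 0:
        raise ValueError("window length must be a multiple of the batch interval")
    # Keep only as many batches as fit in one window.
    recent = deque(maxlen=window_length // batch_interval)
    for batch in batches:
        recent.append(batch)
        yield [record for b in recent for record in b]

batches = [["#a"], ["#b", "#a"], ["#c"]]
for window in windowed(batches):
    print(window)
```

Each printed list holds the records of at most two consecutive batches, which is why a hashtag can be counted in two successive outputs.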

### Process the Stream:
1. Receive the tweet messages, stored in lines. **Input DStream**
2. Split each message into words. **transformation : flatMap**
3. Keep only the words that start with a hashtag (#). **transformation : filter**
4. Convert each tag to lowercase and pair it with a count of 1, giving (tag, 1). **transformation : map**
5. Sum the counts for each hashtag. **transformation : reduceByKey** (a transformation, not an action). The result, hashtags, is the **output DStream**.

In [None]:
hashtags = (lines.flatMap(lambda text: text.split(" "))
            .filter(lambda word: word.startswith("#"))
            .map(lambda word: (word.lower(), 1))
            .reduceByKey(lambda a, b: a + b))
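To see what this chain computes, the same steps can be traced on a small in-memory batch in plain Python. This is only an illustrative sketch: count_hashtags and the sample lines are hypothetical, and list comprehensions stand in for the DStream transformations.

```python
from collections import Counter

def count_hashtags(lines):
    """Mimic flatMap -> filter -> map -> reduceByKey on a list of strings."""
    words = [w for line in lines for w in line.split(" ")]   # flatMap
    tags = [w for w in words if w.startswith("#")]           # filter
    pairs = [(t.lower(), 1) for t in tags]                   # map
    counts = Counter()
    for tag, one in pairs:                                   # reduceByKey
        counts[tag] += one
    return dict(counts)

sample = ["Learning #Spark today", "#spark and #Python", "no tags here"]
print(count_hashtags(sample))
```

Because the tags are lowercased before counting, "#Spark" and "#spark" are counted as the same hashtag.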

Sort the hashtags by count in decreasing order; hashtags with equal counts are ordered alphabetically.

In [None]:
# Sort by descending count, then alphabetically by tag, with a single composite key
author_counts_sorted_dstream = hashtags.transform(lambda rdd: rdd.sortBy(lambda x: (-x[1], x[0])))
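The intended ordering can be checked on an ordinary list of (tag, count) pairs. The sketch below uses a composite key in plain Python; sort_counts and the sample counts are hypothetical and only illustrate the ordering rule.

```python
def sort_counts(pairs):
    """Order (tag, count) pairs by descending count, then by tag."""
    return sorted(pairs, key=lambda kv: (-kv[1], kv[0]))

counts = [("#python", 3), ("#spark", 5), ("#ai", 3)]
print(sort_counts(counts))
```

Negating the count makes the largest counts sort first, while the tag in the second position of the key breaks ties in alphabetical order.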

Print the final analysis: the most popular hashtags in the streaming Twitter data. By default, pprint() prints the first 10 elements of each batch.

In [None]:
author_counts_sorted_dstream.pprint()

### Starting the Spark Streaming:
The Spark Streaming code written so far will not execute until we start the ssc.<br>
ssc.start() starts the Spark Streaming context; it is what triggers execution of all the code above.<br>
Up to this point Spark has only recorded the lineage (DAG) of transformations lazily; once started, it runs the whole sequence on every batch.


In [None]:
ssc.start()

awaitTermination() blocks the Python process so that the streaming job keeps running until the SSC is stopped.<br>
When we kill this Python process, the signal interrupts awaitTermination()<br>
and the Spark Streaming job finally stops.

In [None]:
ssc.awaitTermination()
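If the job should end on its own instead of waiting to be killed, one option is to wait with a timeout and then stop the context. The helper below is a hypothetical sketch, not part of the original notebook; it assumes ssc has already been started and relies on the PySpark methods awaitTerminationOrTimeout and stop.

```python
def stop_after(ssc, seconds):
    """Wait up to `seconds` for an already-started context, then stop it."""
    # Returns early if the context terminates before the timeout.
    ssc.awaitTerminationOrTimeout(seconds)
    # stopGraceFully=True lets Spark finish processing received data first.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
```

For example, stop_after(ssc, 60) would be used in place of ssc.awaitTermination() to run the analysis for about a minute.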