## Trend Analysis on Twitter Live Data Using Spark Streaming

In [None]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

- Create a SparkContext with appName 'StreamingTwitterAnalysis'.
- Set the SparkContext log level to ERROR, which suppresses the INFO- and WARN-level log messages.
- Create a StreamingContext from sc with a batch interval of 10 seconds (the second argument), so the incoming stream is processed in 10-second batches.

In [None]:
sc = SparkContext(appName = "StreamingTwitterAnalysis")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 10)
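To make the 10-second batch interval concrete, here is a plain-Python sketch (no Spark required) that groups hypothetical tweets, tagged with their arrival time in seconds since the stream started, into the batches a 10-second interval would produce. The helper name `assign_batches` and the sample data are illustrative assumptions; Spark's receiver batches records by arrival time, which this only approximates.

```python
from collections import defaultdict

BATCH_INTERVAL = 10  # seconds, the same value passed to StreamingContext(sc, 10)


def assign_batches(arrivals, batch_interval=BATCH_INTERVAL):
    """Group (arrival_time, text) pairs into batches keyed by batch start time."""
    batches = defaultdict(list)
    for arrival_time, text in arrivals:
        # A record arriving in [k * interval, (k + 1) * interval) belongs to batch k
        batch_start = (arrival_time // batch_interval) * batch_interval
        batches[batch_start].append(text)
    return dict(batches)


# Hypothetical tweets with integer arrival times in seconds
arrivals = [(1, "#spark is fast"), (9, "hello #Spark"), (12, "#bigdata")]
print(assign_batches(arrivals))
# → {0: ['#spark is fast', 'hello #Spark'], 10: ['#bigdata']}
```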

In [None]:
# sc.stop()  # uncomment to stop an existing SparkContext before re-running the cell above

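- Connect to the tweet source with ssc.socketTextStream. It opens a TCP connection to 127.0.0.1:9999 and treats each UTF-8, newline-delimited line it receives as one record, so a producer must already be listening on that port.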

In [None]:
socket_stream = ssc.socketTextStream("127.0.0.1",9999)
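`socketTextStream` acts as a TCP client, so some process must already be listening on 127.0.0.1:9999 and writing tweets as newline-terminated text. The notebook does not show that producer; the sketch below is a hypothetical stand-in that serves a fixed list of sample lines instead of live Twitter data. The function names `open_server` and `serve_lines` are assumptions for illustration, not part of Spark or any Twitter library.

```python
import socket


def open_server(host="127.0.0.1", port=9999):
    """Create a listening TCP socket (port=0 lets the OS pick a free port)."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def serve_lines(server, lines):
    """Accept one client (e.g. Spark's receiver) and send each line, newline-terminated."""
    conn, _addr = server.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
```

Run it from a separate Python process, for example `serve_lines(open_server(), ["Loving #Spark", "#BigData and #spark"])`, before calling `ssc.start()`, because `serve_lines` blocks in `accept()` until Spark connects. This sketch closes the connection once the list is sent; a real producer would keep it open and forward tweets as they arrive.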

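- window(60) sets the window length to 60 seconds: every analysis covers the tweets received in the last 60 seconds. No slide interval is given, so the window slides by the batch interval and a new result is produced every 10 seconds.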

In [None]:
lines = socket_stream.window(60)
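With a 10-second batch interval, `window(60)` combines the most recent 60 / 10 = 6 batches, and because no slide duration is passed it advances by one batch each time. The plain-Python sketch below imitates that with a bounded `deque`; `windowed_batches` is a hypothetical helper, not a Spark API, and it also enforces Spark's requirement that the window length be a multiple of the batch interval.

```python
from collections import deque


def windowed_batches(batches, window_length=60, batch_interval=10):
    """Yield, for each incoming batch, all records from the last window_length seconds."""
    if window_length % batch_interval != 0:
        raise ValueError("window length must be a multiple of the batch interval")
    # Keeps only the last window_length // batch_interval batches (6 here)
    window = deque(maxlen=window_length // batch_interval)
    for batch in batches:
        window.append(batch)
        # Flatten the retained batches into one list, oldest first
        yield [record for b in window for record in b]


# Hypothetical batches of tweets, one list per 10-second interval
batches = [["t1"], ["t2"], ["t3"], ["t4"], ["t5"], ["t6"], ["t7"]]
for i, contents in enumerate(windowed_batches(batches)):
    print(i, contents)
```

The seventh window no longer contains `t1`, because that batch is now more than 60 seconds old.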

- Split each tweet in lines into words (transformation: flatMap).
- Keep only the words that start with '#' (transformation: filter).
- Convert each hashtag to lowercase and map it to (hashtag, 1) (transformation: map).
- Sum the 1s per hashtag to count its occurrences (transformation: reduceByKey).

In [None]:
# Split tweets into words, keep hashtags, lowercase them, and count each one
hashtags = (lines.flatMap(lambda text: text.split())
            .filter(lambda word: word.startswith("#"))
            .map(lambda word: (word.lower(), 1))
            .reduceByKey(lambda a, b: a + b))
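The flatMap, filter, map, reduceByKey chain can be checked on a small in-memory list without Spark. This sketch swaps in plain Python equivalents (a comprehension for flatMap/filter/map and a dictionary sum for reduceByKey); the function name `count_hashtags` and the sample tweets are made up for illustration.

```python
from collections import defaultdict


def count_hashtags(tweets):
    """Plain-Python equivalent of the DStream pipeline for one batch of tweets."""
    # flatMap + filter + map: split tweets into words, keep hashtags, emit (tag, 1)
    pairs = [(word.lower(), 1)
             for text in tweets for word in text.split() if word.startswith("#")]
    # reduceByKey(lambda a, b: a + b): sum the 1s for each hashtag
    counts = defaultdict(int)
    for tag, one in pairs:
        counts[tag] += one
    return dict(counts)


tweets = ["Loving #Spark streaming", "#spark and #BigData", "no tags here"]
print(count_hashtags(tweets))  # → {'#spark': 2, '#bigdata': 1}
```

In both versions punctuation stays attached to the word, so "#Spark!" and "#spark" are counted as different hashtags.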

- Sort the hashtags by count in decreasing order, breaking ties alphabetically.

In [None]:
# Sort each batch by count (descending), breaking ties by hashtag (ascending)
author_counts_sorted_dstream = hashtags.transform(lambda rdd: rdd.sortBy(lambda x: (-x[1], x[0])))
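Ordering by count descending with ties broken alphabetically can be done in one sort with the composite key `(-count, tag)`, which avoids relying on two chained sorts preserving each other's order. The plain-Python version below shows the resulting order on made-up counts; `sort_counts` is a hypothetical helper name.

```python
def sort_counts(counts):
    """Order (tag, count) pairs by count descending, then tag ascending."""
    return sorted(counts.items(), key=lambda pair: (-pair[1], pair[0]))


counts = {"#spark": 3, "#bigdata": 3, "#python": 5, "#ai": 1}
print(sort_counts(counts))
# → [('#python', 5), ('#bigdata', 3), ('#spark', 3), ('#ai', 1)]
```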

- Print the final analysis: the most popular hashtags in the streaming Twitter data. By default pprint() shows the first 10 elements of each batch.

In [None]:
author_counts_sorted_dstream.pprint()
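`pprint()` prints, for every batch, a header with the batch time followed by at most the first `num` elements (10 by default), and a trailing "..." when the batch holds more. The helper below is a hypothetical, approximate imitation of that output for a plain list, handy for previewing what the sorted hashtag counts will look like; it is not PySpark's own code, and the exact header format may differ between Spark versions.

```python
def pprint_batch(batch_time, records, num=10):
    """Approximate DStream.pprint(): header, first `num` records, '...' if truncated."""
    print("-------------------------------------------")
    print("Time: %s" % batch_time)
    print("-------------------------------------------")
    for record in records[:num]:
        print(record)
    if len(records) > num:
        print("...")
    print("")


# Made-up sorted counts, truncated to the top 2 entries
pprint_batch("2024-01-01 12:00:10", [("#python", 5), ("#bigdata", 3), ("#spark", 3)], num=2)
```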

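- ssc.start() starts the streaming computation. Up to this point the transformations have only been defined: Spark evaluates lazily, recording the DStream lineage (DAG) without processing any data. Once the context starts, the receiver begins reading from the socket, and every batch interval the output operation pprint() triggers execution of the whole chain.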

In [None]:
ssc.start()

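- awaitTermination() blocks until the streaming context is stopped (for example by ssc.stop() or by an error during processing); it does not stop the context itself.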

In [None]:
ssc.awaitTermination()

In [None]:
sc.stop()