# Trending Hashtag Analysis on Twitter live data using Spark Streaming

**Configure the environment, then import SparkContext & StreamingContext from the PySpark library**

In [1]:
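import os
import sys
# Point PySpark at the Python interpreter, JDK and Spark installation on this machine
os.environ["PYSPARK_PYTHON"] = "/bin/python3"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
# Make the pyspark and py4j libraries bundled with Spark importable
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")
# UTF-8 stdio for the Python worker processes Spark launches (they inherit this variable)
os.environ["PYTHONIOENCODING"] = "utf8"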
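The cell above hardcodes the py4j version (0.10.7), which changes between Spark releases. Below is a minimal sketch, assuming the standard SPARK_HOME/python/lib layout, that locates the py4j zip with glob instead. The helper name add_pyspark_to_path is hypothetical, not part of PySpark.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Prepend SPARK_HOME/python/lib/pyspark.zip and the py4j source zip to sys.path.

    Hypothetical helper. Returns the two paths in the order they end up on sys.path.
    """
    pylib = os.path.join(spark_home, "python", "lib")
    # Match any py4j version instead of hardcoding 0.10.7
    candidates = sorted(glob.glob(os.path.join(pylib, "py4j-*-src.zip")))
    if not candidates:
        raise FileNotFoundError("no py4j-*-src.zip found under " + pylib)
    # A Spark distribution normally ships exactly one py4j zip; take the first match
    paths = [os.path.join(pylib, "pyspark.zip"), candidates[0]]
    # Insert in reverse so pyspark.zip ends up first, as in the cell above
    for path in reversed(paths):
        if path not in sys.path:
            sys.path.insert(0, path)
    return paths
```

Usage would be add_pyspark_to_path(os.environ["SPARK_HOME"]) in place of the two sys.path.insert lines.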

In [2]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

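- Create a **SparkContext** with the master "local[2]" and the app name "Twitter Streaming". Two local threads are needed: one is occupied by the socket receiver, the other processes the received data.<br>
- Set the log level of the SparkContext to ERROR, so INFO and WARN messages are not printed.<br>
- Create the StreamingContext from sc (the SparkContext). The second argument, 10, is the batch interval in seconds: <br>
the incoming stream is cut into 10-second batches, and the analysis runs once per batch.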
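To make the batch interval concrete, here is a simplified plain-Python model (not Spark code) of how timestamped lines fall into 10-second micro-batches. It assumes each batch covers the half-open interval [T - 10, T) and is labelled by its end time T. This is a simplification of what Spark's receiver does internally, and the function name assign_batches is hypothetical.

```python
from collections import defaultdict


def assign_batches(events, batch_interval_s=10):
    """Group (timestamp_s, line) pairs into micro-batches keyed by batch end time.

    Simplified model: a record received at ts goes to the batch ending at the
    next multiple of the interval, i.e. the interval [T - batch_interval_s, T).
    """
    batches = defaultdict(list)
    for ts, line in events:
        batch_end = (int(ts) // batch_interval_s + 1) * batch_interval_s
        batches[batch_end].append(line)
    # Return the batches in time order
    return dict(sorted(batches.items()))
```

For example, lines arriving at 1 s and 9.5 s end up in the same batch (T = 10), while a line arriving at exactly 10 s starts the next one.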

In [3]:
sc = SparkContext("local[2]","Twitter Streaming")
sc.setLogLevel('ERROR')
ssc = StreamingContext(sc, 10)

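Connect to the socket source using ssc (the streaming context). Spark acts as the client here, so the sender must already be listening on this address.<br>
Host: "127.0.0.1" (localhost) and port: 7777. Any free port works, but it must be the same in both notebooks (this one and the one that sends the tweets).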
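socketTextStream reads newline-delimited UTF-8 text from a TCP server. To test the pipeline without a live Twitter feed, a small hypothetical feeder like the one below could stand in for the tweet-sending notebook. It is a minimal sketch, assuming one client and a fixed list of lines; serve_lines is an illustrative name, not part of either notebook.

```python
import socket
import threading


def serve_lines(lines, host="127.0.0.1", port=7777):
    """Hypothetical test feeder: accept one client and send each line newline-terminated.

    Returns (thread, bound_port). Pass port=0 to let the OS pick a free port.
    """
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen(1)
    bound_port = srv.getsockname()[1]

    def run():
        conn, _ = srv.accept()
        with conn, srv:
            for line in lines:
                # One record per line: replace embedded newlines so a tweet stays on one line
                conn.sendall((line.replace("\n", " ") + "\n").encode("utf-8"))

    t = threading.Thread(target=run, daemon=True)
    t.start()
    return t, bound_port
```

Started with the default port before ssc.start(), it would feed the listed lines to the stream in the next cell and then close the connection.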

In [4]:
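stream_data = ssc.socketTextStream("127.0.0.1", 7777)

# Directory where Spark Streaming writes checkpoint data (metadata for recovery and state of stateful operations)
ssc.checkpoint("checkpoint-dir")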

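The window() transformation sets the window length to 20 seconds. Because no slide interval is given, the window slides by the batch interval (10 seconds), so every batch result covers the tweets received in the last 20 seconds, i.e. the last two batches.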
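A plain-Python sketch (an illustration, not Spark code) of what a 20-second window sliding every 10 seconds means: each result combines the current batch with the previous one, so a tweet is counted in two consecutive outputs. This is why, for example, ('#covid19', 3) appears in several consecutive batches of the output below. The function name windowed_counts is hypothetical.

```python
from collections import Counter


def windowed_counts(batches, window_batches=2):
    """For each batch index, count hashtags over the last `window_batches` batches.

    batches: list of lists of hashtags, one inner list per batch interval.
    With a 10 s batch interval, window(20) corresponds to window_batches=2.
    """
    results = []
    for i in range(len(batches)):
        # Slice of the most recent batches that fall inside the window
        window = batches[max(0, i - window_batches + 1): i + 1]
        results.append(Counter(tag for batch in window for tag in batch))
    return results
```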

In [5]:
twitter_data = stream_data.window(20)

### Process the Stream:
1. Receive the tweet text as lines of the windowed stream (twitter_data). **Input DStream**
2. Split each message into words. **Transformation on the DStream: flatMap**
3. Keep only the words that start with a hashtag (#). **Transformation: filter**
4. Convert each tag to lowercase. **Transformation: map**
5. Map each tag to a (tag, 1) pair. **Transformation: map** (steps 4 and 5 are done in a single map below)
6. Reduce by key to count the occurrences of each hashtag. **Transformation: reduceByKey**; the result, hashtag_count, is the **output DStream**
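The same chain of steps can be mirrored on a plain Python list, which is a convenient way to check the logic on sample text before running it on the stream. This is a sketch for illustration only; count_hashtags is a hypothetical name and Counter stands in for reduceByKey.

```python
from collections import Counter


def count_hashtags(lines):
    """Plain-Python equivalent of the flatMap -> filter -> map -> reduceByKey chain."""
    words = (w for line in lines for w in line.split(" "))  # flatMap: lines -> words
    tags = (w for w in words if w.startswith("#"))          # filter: keep hashtags
    pairs = ((t.lower(), 1) for t in tags)                  # map: (lowercased tag, 1)
    counts = Counter()
    for tag, one in pairs:                                  # reduceByKey(lambda a, b: a + b)
        counts[tag] += one
    return dict(counts)
```

Splitting on spaces keeps trailing punctuation and repeated "#" characters, which is why entries such as ('#latest:', 1), ('#covidkim,', 1) and ('##covid', 1) show up in the output below. Matching with a regular expression such as #\w+ would be one possible refinement; it is not what this notebook does.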

In [6]:
word_data = twitter_data.flatMap(lambda text: text.split(" "))

In [7]:
filtered_data = word_data.filter(lambda word: word.lower().startswith("#"))

In [8]:
hashtag_count = filtered_data.map(lambda word: (word.lower(), 1)).reduceByKey(lambda a, b:a+b)

Sort the hashtags by count in decreasing order. transform() applies an RDD operation (here sortBy) to every batch of the DStream.
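Sorting on the count alone leaves the order of tied hashtags unspecified. A sketch of a deterministic alternative, assuming alphabetical order is an acceptable tie-breaker, sorts on the key (-count, tag). On the DStream the same key would be used as hashtag_count.transform(lambda rdd: rdd.sortBy(lambda kv: (-kv[1], kv[0]))); the plain-Python version below (sort_by_count is a hypothetical name) shows the ordering.

```python
def sort_by_count(counts):
    """Sort (tag, count) pairs by count descending, breaking ties alphabetically."""
    # Negating the count gives descending order while the tag stays ascending
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
```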

In [9]:
hashtag_sorted = hashtag_count.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))

Print the final analysis: the most popular hashtags in the streaming Twitter data. pprint() prints the first 10 elements of each batch.
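The layout of the output below comes from pprint(num=10): a dashed header with the batch time, at most num records, "..." when more records exist, and a blank line. A plain-Python sketch that mimics this layout for one batch (format_batch is a hypothetical name; it only reproduces the printed format, it does not call Spark):

```python
def format_batch(time, records, num=10):
    """Mimic the layout DStream.pprint() prints for one batch (first `num` records)."""
    sep = "-" * 43
    out = [sep, f"Time: {time}", sep]
    out.extend(str(r) for r in records[:num])
    if len(records) > num:
        # pprint signals that the batch had more than `num` records
        out.append("...")
    out.append("")
    return "\n".join(out)
```

Passing a larger num, e.g. hashtag_sorted.pprint(20), would show more hashtags per batch.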

In [10]:
hashtag_sorted.pprint()

### Starting the Spark Streaming:
Nothing has run so far: the transformations above are evaluated lazily and only build the DStream lineage, while pprint() registers the output operation. <br>
ssc.start() starts the receiver and then, once every batch interval, runs the resulting job (DAG) on the newly received data.
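Lazy evaluation can be illustrated by analogy with Python generators: defining the pipeline does no work, and items are only read once something consumes it (the role ssc.start() plays for the DStream graph). This is an analogy in plain Python, not Spark code; lazy_pipeline is a hypothetical name.

```python
def lazy_pipeline(source, log):
    """Build a generator pipeline; nothing executes until it is consumed.

    Analogy only: records a log entry each time an item is actually read.
    """
    def read():
        for item in source:
            log.append(f"read {item}")
            yield item
    # Calling read() only creates a generator; its body has not run yet
    return (x.upper() for x in read())
```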


In [11]:
ssc.start()

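**awaitTermination()** keeps the driver running: it blocks until the streaming context is stopped or fails. In a standalone script, without it the driver would exit right after start().<br> 
Interrupting the kernel (or pressing Ctrl-C) raises a KeyboardInterrupt inside awaitTermination(), as in the output below.<br> 
The interrupt only ends the wait; call ssc.stop() to actually shut the streaming job down.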
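A minimal sketch of starting, waiting and then stopping cleanly, assuming PySpark's StreamingContext methods start(), awaitTermination(), awaitTerminationOrTimeout(timeout) and stop(stopSparkContext, stopGraceFully). The helper run_until_interrupted is hypothetical; it takes the context as an argument so the control flow can be checked without a Spark installation.

```python
def run_until_interrupted(ssc, timeout_s=None, graceful=True):
    """Start a StreamingContext, block, and always stop it afterwards.

    Hypothetical helper. timeout_s=None waits indefinitely; otherwise waits
    at most timeout_s seconds.
    """
    ssc.start()
    try:
        if timeout_s is None:
            ssc.awaitTermination()  # blocks until stop() is called or an error occurs
        else:
            ssc.awaitTerminationOrTimeout(timeout_s)
    except KeyboardInterrupt:
        # Kernel interrupt / Ctrl-C ends the wait instead of surfacing as a traceback
        pass
    finally:
        # stopGraceFully=True lets already received data finish processing first
        ssc.stop(stopSparkContext=True, stopGraceFully=graceful)
```

With this helper, the two cells ssc.start() and ssc.awaitTermination() would become a single call run_until_interrupted(ssc).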

In [12]:
ssc.awaitTermination()

-------------------------------------------
Time: 2021-02-13 17:09:50
-------------------------------------------
('#vaccine', 2)
('#covid19', 1)
('#vaccineswork', 1)
('#covid19vaccine', 1)
('##covid', 1)
('#nih', 1)

-------------------------------------------
Time: 2021-02-13 17:10:00
-------------------------------------------
('#covid19', 3)
('#vaccine', 2)
('#vaccineswork', 1)
('#covid19vaccine', 1)
('#himveers', 1)
('#palmbeachcounty', 1)
('#latest:', 1)
('##covid', 1)
('#nih', 1)
('#ministry_of_health', 1)

-------------------------------------------
Time: 2021-02-13 17:10:10
-------------------------------------------
('#covid19', 3)
('#himveers', 1)
('#palmbeachcounty', 1)
('#latest:', 1)
('#coronava…', 1)
('#ministry_of_health', 1)
('#vaccine', 1)
('#walton', 1)

-------------------------------------------
Time: 2021-02-13 17:10:20
-------------------------------------------
('#covid19', 3)
('#coronava…', 1)
('#pcr', 1)
('#covid', 1)
('#unpaidcarer', 1)
('#covidkim,', 1)
('#ل

KeyboardInterrupt: 