# Trending Hashtag Analysis on Twitter live data using Spark Streaming

**Set the environment variables PySpark needs, then import SparkContext & StreamingContext from the PySpark library**

In [1]:
import os
import sys
# The Python version must be the same on the local machine and on the worker node (here, EC2); both should use Python 3.
os.environ["PYSPARK_PYTHON"] = "/bin/python3"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")
os.environ["PYTHONIOENCODING"] = "utf8"


In [2]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

- Create a **SparkContext** with master "local[2]" and AppName "TwitterStreaming".<br>
- Set the LogLevel of the SparkContext to ERROR, so that INFO and WARN messages are not printed.<br>
- Create the Spark Streaming Context using sc (the Spark context). The parameter 5 is the batch interval in seconds. <br>
A new batch is analysed every 5 seconds.

In [3]:
sc = SparkContext("local[2]","TwitterStreaming")
sc.setLogLevel('ERROR')
ssc = StreamingContext(sc, 5)

21/10/26 15:17:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


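Connect to the socket that serves the tweets using ssc (the Spark Streaming context).<br>
Host: "127.0.0.1" (localhost) & port: 7777 (any free port works, but it has to be the same in both notebooks).<br>
Checkpointing is also enabled, with "checkpoint-dir" as the checkpoint directory.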

In [4]:
stream_data = ssc.socketTextStream("127.0.0.1", 7777)

ssc.checkpoint("checkpoint-dir")

The window function's parameter sets the window length in seconds. Each analysis covers the tweets received in the last 10 seconds, i.e. the two most recent 5-second batches.

In [5]:
twitter_data = stream_data.window(10)
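
To make the window semantics concrete, here is a minimal pure-Python sketch (no Spark required) of how a 10-second window over 5-second batches combines the two most recent batches. The function name `windowed_batches` and the sample data are illustrative assumptions; they are not part of the notebook or of the Spark API.

```python
from collections import deque

def windowed_batches(batches, window_length=10, batch_interval=5):
    """Yield, for each incoming batch, everything inside the current window.

    Hypothetical helper: it imitates a window that slides by one batch
    interval, using plain Python lists as stand-ins for batches.
    """
    batches_per_window = window_length // batch_interval  # 10 // 5 == 2
    recent = deque(maxlen=batches_per_window)  # oldest batch drops out automatically
    for batch in batches:
        recent.append(batch)
        # Flatten the batches currently inside the window into one list.
        yield [item for b in recent for item in b]

# Three made-up 5-second batches of hashtags.
batches = [["#a"], ["#b"], ["#c"]]
windows = list(windowed_batches(batches))
print(windows)
```

This mirrors what the output further down shows: a hashtag typically appears in two consecutive printouts before it drops out of the window.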

### Process the Stream:
1. Receives the tweet messages, stored in twitter_data. **Input DStream (windowed)**
2. Splits the messages into words. **Apply transformation on DStream : flatMap**
3. Keeps only the words which start with a hashtag (#). **transformation : filter**
4. Converts the words to lowercase. **transformation : map**
5. Maps each tag to (word, 1). **transformation : map**
6. Reduces by key to count the occurrences of each hashtag. **transformation : reduceByKey** (a transformation, not an action). hashtag_count = **output DStream**
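
The steps above can be sketched in plain Python on an in-memory list of tweets, which is handy for checking the logic without a running stream. The function name `count_hashtags` and the sample tweets are assumptions made for illustration; the comments name the DStream transformation each line stands in for.

```python
from collections import Counter

def count_hashtags(tweets):
    """Pure-Python stand-in for the DStream pipeline (hypothetical helper)."""
    # flatMap: split every tweet into words and flatten the result.
    words = [word for tweet in tweets for word in tweet.split(" ")]
    # filter: keep only the words that start with '#'.
    tags = [word for word in words if word.startswith("#")]
    # map: lowercase each tag and pair it with a count of 1.
    pairs = [(tag.lower(), 1) for tag in tags]
    # reduceByKey: sum the counts per tag.
    counts = Counter()
    for tag, one in pairs:
        counts[tag] += one
    return dict(counts)

# Made-up sample tweets, not taken from the live stream.
sample = ["Vote today #Bundestag #klima", "Debate in the #bundestag"]
result = count_hashtags(sample)
print(result)
```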

In [6]:
word_data = twitter_data.flatMap(lambda text: text.split(" "))

In [7]:
filtered_data = word_data.filter(lambda word: word.lower().startswith("#"))

In [8]:
hashtag_count = filtered_data.map(lambda word: (word.lower(), 1)).reduceByKey(lambda a, b:a+b)
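
The output further down contains tags such as `#germany:` and `#corona-warnstufe.`, which are counted separately from their clean forms because of trailing punctuation. One possible refinement, sketched here under the assumption that trailing `.,:;!?` characters are never part of a tag, is a small normalizing helper. `normalize_tag` is a hypothetical name, not something the notebook defines.

```python
def normalize_tag(word):
    """Lowercase a hashtag and strip trailing punctuation (hypothetical helper).

    Assumption: the characters . , : ; ! ? at the end of a word are
    sentence punctuation, not part of the hashtag itself.
    """
    return word.lower().rstrip(".,:;!?")

# Examples taken from the tags printed in the notebook output.
cleaned = [normalize_tag(t) for t in ["#Germany:", "#corona-warnstufe.", "#klima"]]
print(cleaned)
```

If this behaviour is wanted, the helper could replace `word.lower()` in the map step, for example `filtered_data.map(lambda word: (normalize_tag(word), 1))`. Tags that Twitter has already truncated with an ellipsis (`#corona…`) cannot be recovered this way.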

Sort the hashtags based on the counts in decreasing order

In [9]:
hashtag_sorted = hashtag_count.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
hashtag_sorted.pprint()

Print the final analysis: pprint() prints the first 10 elements of every batch, which after sorting are the most popular hashtags in the streaming Twitter data.

### Starting the Spark Streaming:
The Spark Streaming code written so far only defines the computation; nothing executes until we start the ssc.<br>
ssc.start() starts the Spark Streaming context and triggers execution of the whole pipeline. <br>
Spark then builds the lineage & DAG from the lazily evaluated transformations and runs the whole sequence of code on every batch.


In [10]:
ssc.start()

[Stage 0:>                                                          (0 + 1) / 1]

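**awaitTermination()** blocks the current process until the streaming context stops.<br> 
Without it, ssc.start() returns immediately and the job keeps running in the background.<br> 
Killing this Python process ends the wait and, with it, the Spark Streaming job; interrupting the kernel raises the KeyboardInterrupt seen at the end of the output.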

In [11]:
ssc.awaitTermination()

                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:10
-------------------------------------------



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:15
-------------------------------------------
('#covid19', 1)
('#corona', 1)
('#coronavirus', 1)
('#rusistas', 1)
('#covid_19', 1)
('#juliettefreire', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:20
-------------------------------------------
('#covid19', 1)
('#corona', 1)
('#coronavirus', 1)
('#rusistas', 1)
('#covid_19', 1)
('#juliettefreire', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:25
-------------------------------------------
('#bundestag', 2)
('#salmankhan', 1)
('#corona', 1)
('#germany:', 1)
('#klima', 1)
('#corona-warnstufe.', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:30
-------------------------------------------
('#salmankhan', 2)
('#bundestag', 2)
('#corona', 1)
('#germany:', 1)
('#klima', 1)
('#corona-warnstufe.', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:35
-------------------------------------------
('#salmankhan', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:40
-------------------------------------------
('#corona…', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:45
-------------------------------------------
('#corona…', 1)
('#vaccinatie', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:50
-------------------------------------------
('#w2610', 1)
('#corona', 1)
('#salmankhan', 1)
('#vaccinatie', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:18:55
-------------------------------------------
('#w2610', 1)
('#corona', 1)
('#salmankhan', 1)
('#kimmich', 1)
('#corona-pol…', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:19:00
-------------------------------------------
('#kimmich', 1)
('#ongevaccineerden', 1)
('#corona-pol…', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:19:05
-------------------------------------------
('#ongevaccineerden', 1)
('#nieuweverkiezingen', 1)
('#delen', 1)
('#coronamaatregelen', 1)
('#corona', 1)
('#coronavirus', 1)
('#corona-faalbe…', 1)
('#corona-infektionen', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:19:10
-------------------------------------------
('#nieuweverkiezingen', 1)
('#delen', 1)
('#coronamaatregelen', 1)
('#corona', 1)
('#coronavirus', 1)
('#kimmich', 1)
('#w2610', 1)
('#corona-faalbe…', 1)
('#corona-infektionen', 1)
('#wien', 1)



                                                                                

-------------------------------------------
Time: 2021-10-26 15:19:15
-------------------------------------------
('#kimmich', 1)
('#w2610', 1)
('#covid', 1)
('#wien', 1)
('#gevaccineerde', 1)



[Stage 0:>                                                          (0 + 1) / 1]

KeyboardInterrupt: 
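
The run above ends with a manual interrupt. As an alternative, a bounded run with an explicit shutdown could be sketched as follows. `run_for` is a hypothetical helper name; it relies only on the `start()`, `awaitTerminationOrTimeout()` and `stop()` methods of PySpark's `StreamingContext`, and the 60-second duration in the usage note is an arbitrary assumption.

```python
def run_for(ssc, seconds):
    """Run a streaming context for at most `seconds`, then stop it.

    Hypothetical helper: `ssc` is expected to be a pyspark.streaming
    StreamingContext whose pipeline has already been defined.
    """
    ssc.start()
    try:
        # Returns once the context stops or the timeout (in seconds) elapses.
        ssc.awaitTerminationOrTimeout(seconds)
    finally:
        # Stop the underlying SparkContext too, and let in-flight
        # batches finish before shutting down.
        ssc.stop(stopSparkContext=True, stopGraceFully=True)
```

In the notebook this would replace the `ssc.start()` and `ssc.awaitTermination()` cells with a single call such as `run_for(ssc, 60)`. A stopped StreamingContext cannot be restarted, so a new one has to be created for another run.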