# Trend Analysis on Twitter live data using Spark Streaming

Use findspark to locate the local Spark installation and make PySpark importable, then import SparkContext and StreamingContext from the PySpark library.

In [1]:
import findspark
findspark.init()

In [2]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

Create a SparkContext with the app name "StreamingTwitterAnalysis".<br>
Set the log level of the SparkContext to ERROR, so that INFO and WARN messages are not printed.<br>
Create the StreamingContext from sc (the SparkContext); the second parameter, 10, is the batch interval in seconds. <br>
The incoming stream is divided into 10-second batches, and the analysis runs once per batch.

In [3]:
sc = SparkContext(appName="StreamingTwitterAnalysis")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 10)  # batch interval: 10 seconds

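Connect to the socket server that forwards the tweets, using ssc (the streaming context).<br>
Host: "192.168.56.1" & port: 5555. Use the address of the machine running that server ("localhost" if it runs on the same machine).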

In [4]:
socket_stream = ssc.socketTextStream("192.168.56.1", 5555)
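socketTextStream() connects as a TCP client and treats each newline-terminated line of UTF-8 text as one record, so a server must already be listening on that host and port. The tweet producer itself is not shown in this notebook. The sketch below is a hypothetical stand-in that serves fixed sample lines, which can help test the pipeline without Twitter access; the function name serve_lines and its parameters are assumptions, not part of PySpark.

```python
import socket


def serve_lines(lines, host="127.0.0.1", port=5555, on_ready=None):
    """Hypothetical test source: accept one client and send each line, newline-terminated.

    socketTextStream() expects UTF-8 text records separated by '\n'.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        if on_ready is not None:
            # Report the bound port (useful when port=0 lets the OS pick one)
            on_ready(server.getsockname()[1])
        conn, _addr = server.accept()  # Spark's socket receiver connects here
        with conn:
            for line in lines:
                # Exactly one '\n' per record, whatever the input line ends with
                conn.sendall((line.rstrip("\n") + "\n").encode("utf-8"))
```

Usage: run serve_lines(["Stay safe #COVID19 #Trump"]) in a separate process before ssc.start(), and point socketTextStream() at the same host and port. To accept connections from another machine, the server would need to bind to that machine's address (or "0.0.0.0") instead of "127.0.0.1".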

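The window() parameter sets the window length in seconds: every result covers the tweets received during the last 20 seconds, i.e. the last two 10-second batches. The window length must be a multiple of the batch interval, and because no slide interval is given, the window advances once per batch (every 10 seconds).

In [5]:
lines = socket_stream.window(20)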
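With a 10-second batch interval, a 20-second window always spans the two most recent batches, and a new window is produced after every batch. The plain-Python sketch below (no Spark needed) mimics that behaviour with a bounded deque; sliding_windows is a hypothetical helper for illustration, not a PySpark API.

```python
from collections import deque


def sliding_windows(batches, batch_interval=10, window_length=20):
    """Yield the records visible in each window, one window per batch (slide = batch interval)."""
    if window_length % batch_interval != 0:
        # Spark imposes the same rule: window length must be a multiple of the batch interval
        raise ValueError("window_length must be a multiple of batch_interval")
    size = window_length // batch_interval  # number of batches per window
    recent = deque(maxlen=size)  # the oldest batch drops out automatically
    for batch in batches:
        recent.append(batch)
        yield [record for b in recent for record in b]


batches = [["#a"], ["#b", "#c"], ["#d"]]
for window in sliding_windows(batches):
    print(window)
# ['#a']
# ['#a', '#b', '#c']
# ['#b', '#c', '#d']
```

The first window contains only one batch because no earlier data exists yet; from then on each window overlaps the previous one by one batch.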

### Process the Stream:
1. Receives the tweet messages, stored in lines. **Input DStream**
2. Splits the messages into words. **Transformation: flatMap**
3. Keeps only the words that start with a hashtag (#). **Transformation: filter**
4. Converts each tag to lowercase and maps it to the pair (tag, 1). **Transformation: map**
5. Reduces by key to count the occurrences of each hashtag. **Transformation: reduceByKey**; the result, hashtags, is the **output DStream**.

In [6]:
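hashtags = (
    lines.flatMap(lambda text: text.split(" "))  # split each tweet into words
    .filter(lambda word: word.startswith("#"))   # keep only hashtags
    .map(lambda word: (word.lower(), 1))         # lowercase the tag, pair it with 1
    .reduceByKey(lambda a, b: a + b)             # sum the 1s per tag within the window
)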
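The same flatMap, filter, map and reduceByKey steps can be mirrored in plain Python on a single window of tweets, which makes the logic easy to check without a running stream. count_hashtags is a hypothetical helper for illustration, and the sample tweets are made up.

```python
from collections import Counter


def count_hashtags(lines):
    """Plain-Python mirror of flatMap -> filter -> map -> reduceByKey on one window of tweets."""
    words = (word for text in lines for word in text.split(" "))  # flatMap
    tags = (word for word in words if word.startswith("#"))       # filter
    pairs = ((tag.lower(), 1) for tag in tags)                    # map to (tag, 1)
    counts = Counter()
    for tag, one in pairs:  # reduceByKey(lambda a, b: a + b)
        counts[tag] += one
    return dict(counts)


tweets = ["Stay safe #COVID19 #Trump", "News tonight #trump #covid19 #Vote"]
print(count_hashtags(tweets))
# {'#covid19': 2, '#trump': 2, '#vote': 1}
```

As in the Spark version, splitting on spaces keeps punctuation attached to a tag, which is why entries such as '#accountabilitymatters:' appear separately in the output below.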

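Sort the hashtags by count in decreasing order, breaking ties alphabetically. transform() applies an RDD operation to every batch of the DStream.

In [7]:
hashtag_counts_sorted_dstream = hashtags.transform(
    # one composite key: descending count first, then the tag itself
    lambda rdd: rdd.sortBy(lambda x: (-x[1], x[0]))
)

Print the final analysis: the most popular hashtags in the streaming Twitter data.<br>
pprint() prints the first 10 elements of each batch; a trailing "..." means that more elements exist.

In [8]:
hashtag_counts_sorted_dstream.pprint()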
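The intended ordering (count descending, then tag alphabetically) can be checked without a cluster. top_hashtags below is a hypothetical plain-Python equivalent: it sorts (tag, count) pairs with one composite key and keeps the first 10, roughly what pprint() shows for each batch. The sample counts are taken from the 15:19:20 batch in the output below.

```python
def top_hashtags(counts, num=10):
    """Sort (tag, count) pairs by descending count, ties alphabetical, and keep the first num."""
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[:num]


counts = {"#trump": 6, "##": 2, "#covid19": 6, "#covid-19": 2}
for pair in top_hashtags(counts):
    print(pair)
# ('#covid19', 6)
# ('#trump', 6)
# ('##', 2)
# ('#covid-19', 2)
```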

### Starting Spark Streaming:
The Spark Streaming code written so far does not execute until the streaming context is started.<br>
Defining DStream transformations only records them (lazy evaluation). ssc.start() starts the receiver, and from then on Spark builds the lineage (DAG) and runs a job for every 10-second batch. <br>
The pprint() output operation registered above is what makes each of those jobs produce a result.


In [9]:
ssc.start()

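awaitTermination() blocks the calling thread until the streaming context is stopped or the streaming job fails; an exception raised during execution is re-thrown here, as in the error below.<br> 
In a standalone script this keeps the driver process alive; without it, the script would exit right after ssc.start().<br> 
Interrupting the wait (Ctrl+C or a kernel interrupt) does not stop the job by itself; call ssc.stop() for that, or kill the Python process.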

In [10]:
ssc.awaitTermination()
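In a notebook, awaitTermination() blocks the cell with no clean exit path. One alternative is to wait in a loop with awaitTerminationOrTimeout() and stop the context explicitly when interrupted; polling with a timeout may also give Python more regular chances to handle the interrupt. The sketch below assumes the PySpark StreamingContext methods awaitTerminationOrTimeout(timeout) and stop(stopSparkContext, stopGraceFully). The helper run_until_interrupted and its max_polls parameter are hypothetical; the context is passed in as a parameter so the logic can be exercised with a stand-in object.

```python
def run_until_interrupted(ssc, poll_seconds=10, max_polls=None):
    """Wait for a started StreamingContext, stopping it gracefully on interrupt or failure.

    ssc: a started pyspark.streaming.StreamingContext (assumed).
    max_polls: hypothetical cap on the number of waits; None waits indefinitely.
    Returns True if the context stopped on its own, False if this helper stopped it.
    """
    stopped = False
    polls = 0
    try:
        while not stopped:
            # Returns True once the context has stopped, False when the timeout elapses
            stopped = ssc.awaitTerminationOrTimeout(poll_seconds)
            polls += 1
            if max_polls is not None and polls >= max_polls:
                break
    except KeyboardInterrupt:
        pass  # the interrupt only ends the wait; the job is stopped below
    finally:
        if not stopped:
            # Let in-flight batches finish, then shut down the StreamingContext and SparkContext
            ssc.stop(stopSparkContext=True, stopGraceFully=True)
    return stopped
```

Usage, in place of the plain wait: ssc.start() followed by run_until_interrupted(ssc).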

-------------------------------------------
Time: 2020-10-03 15:19:10
-------------------------------------------
('#covid19', 2)
('#accountabilitymatters:', 1)
('#coronavirus', 1)
('#presidenttrump…', 1)
('#trump', 1)
('#trump2020', 1)
('#trump2020landslidevictory', 1)
('#trumpcovid19', 1)
('#weloveyou', 1)

-------------------------------------------
Time: 2020-10-03 15:19:20
-------------------------------------------
('#covid19', 6)
('#trump', 6)
('##', 2)
('#accountabilitymatters:', 2)
('#covid-19', 2)
('#trumphascovid', 2)
('#95minutesintrumpsamerica', 1)
('#boyfriendday', 1)
('#bunkerboytrumpdespite', 1)
('#checry', 1)
...

-------------------------------------------
Time: 2020-10-03 15:19:30
-------------------------------------------
('#trump', 35)
('#covid19', 17)
('#covid-19', 6)
('#get2getherexperience', 5)
('#hitrefresh', 5)
('#humantraffickingawareness', 5)
('#coronavirus', 4)
('#trumpcovid', 4)
('#covid', 3)
('#prayfortrump', 3)
...

-------------------------------------

Py4JJavaError: An error occurred while calling o23.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\Users\Admin\Spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\streaming\util.py", line 68, in call
    r = self.func(t, *rdds)
  File "C:\Users\Admin\Spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\streaming\dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
  File "C:\Users\Admin\Spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\rdd.py", line 1446, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "C:\Users\Admin\Spark\spark-3.0.1-bin-hadoop2.7\python\pyspark\context.py", line 1118, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "C:\Users\Admin\Spark\spark-3.0.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\Admin\Spark\spark-3.0.1-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 296.0 failed 1 times, most recent failure: Lost task 3.0 in stage 296.0 (TID 8378, DESKTOP-LIHMB40, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:708)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:752)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:684)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:650)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:626)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:583)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:540)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:154)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at jdk.internal.reflect.GeneratedMethodAccessor91.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:708)
	at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:752)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:684)
	at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:650)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:626)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:583)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:540)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 17 more


	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1(PythonDStream.scala:179)
	at org.apache.spark.streaming.api.python.PythonDStream$.$anonfun$callForeachRDD$1$adapted(PythonDStream.scala:179)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)


-------------------------------------------
Time: 2020-10-03 15:23:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:23:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:23:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:23:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:23:50
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:24:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:24:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:24:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:24:30
----------

-------------------------------------------
Time: 2020-10-03 15:35:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:35:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:35:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:35:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:35:50
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:36:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:36:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:36:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:36:30
----------

-------------------------------------------
Time: 2020-10-03 15:47:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:47:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:47:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:47:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:47:50
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:48:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:48:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:48:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:48:30
----------

-------------------------------------------
Time: 2020-10-03 15:59:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:59:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:59:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:59:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 15:59:50
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:00:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:00:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:00:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:00:30
----------

-------------------------------------------
Time: 2020-10-03 16:11:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:11:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:11:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:11:50
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:12:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:12:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:12:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:12:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:12:40
----------

-------------------------------------------
Time: 2020-10-03 16:23:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:23:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:23:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:23:50
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:24:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:24:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:24:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:24:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:24:40
----------

-------------------------------------------
Time: 2020-10-03 16:35:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:35:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:35:40
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:35:50
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:36:00
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:36:10
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:36:20
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:36:30
-------------------------------------------

-------------------------------------------
Time: 2020-10-03 16:36:40
----------