# **Trend Analysis on Twitter live data using Spark Streaming**

Import SparkContext and StreamingContext from the PySpark library.

In [1]:
# from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

Create a SparkContext with the app name "StreamingTwitterAnalysis".
<br>Set the log level of the SparkContext to ERROR so that INFO and WARN messages are not printed.
<br>Create a StreamingContext from sc (the SparkContext). The second argument, 10, is the batch interval in seconds.
<br>Incoming tweets are therefore grouped into a new micro-batch, and the analysis runs, every 10 seconds.

In [2]:
sc = SparkContext(appName="StreamingTwitterAnalysis")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 10)

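Connect to the socket server that publishes the tweets, using ssc (the StreamingContext).
<br>Host: "172.16.117.15", port: 6677. Spark's receiver connects to this server as a TCP client and treats each newline-terminated line of text as one record.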

In [3]:
socket_stream = ssc.socketTextStream("172.16.117.15", 6677)
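The notebook does not show the program on the other end of the socket. Because socketTextStream connects as a client, something must already be listening on that host and port and writing one tweet per line. The sketch below is a minimal, hypothetical stand-in for that producer: the function names `send_lines` and `serve_tweets` and the fixed tweet list are assumptions, not part of the original setup, and fetching real tweets from Twitter is left out.

```python
import socket
from typing import Iterable


def send_lines(conn: socket.socket, tweets: Iterable[str]) -> None:
    """Hypothetical helper: send each tweet as one UTF-8, newline-terminated line."""
    for tweet in tweets:
        # Spark reads the stream line by line, so line breaks inside a tweet
        # would split it into several records; replace them with spaces.
        record = tweet.replace("\r", " ").replace("\n", " ") + "\n"
        conn.sendall(record.encode("utf-8"))


def serve_tweets(tweets: Iterable[str], host: str = "0.0.0.0", port: int = 6677) -> None:
    """Hypothetical producer: listen on host:port, accept one client (the Spark receiver), stream tweets."""
    with socket.create_server((host, port)) as server:
        conn, _addr = server.accept()  # blocks until Spark's receiver connects
        with conn:
            send_lines(conn, tweets)
```

Start the producer before running ssc.start(); otherwise the receiver has nothing to connect to. A real producer would pass an iterator of live tweets instead of a fixed list.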

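The window() argument sets the window length in seconds. The window slides by the batch interval (10 s) by default, so every 10 seconds the analysis runs over all tweets received in the last 60 seconds, i.e. the six most recent batches. Spark requires the window length to be a multiple of the batch interval.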

In [4]:
lines = socket_stream.window(60)
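To make the window semantics concrete, here is a plain-Python simulation (no Spark involved) of which batches a 60-second window covers when batches arrive every 10 seconds. Batches are keyed by the time at which each one is generated; the helper `window_contents` and the sample data are hypothetical and only illustrate the bookkeeping that window() does for you.

```python
from typing import Dict, List

BATCH_INTERVAL = 10  # seconds, as in StreamingContext(sc, 10)
WINDOW_LENGTH = 60   # seconds, as in socket_stream.window(60)


def window_contents(batches: Dict[int, List[str]], window_end: int,
                    window_length: int = WINDOW_LENGTH) -> List[str]:
    """Hypothetical helper: records of every batch time t with window_end - window_length < t <= window_end."""
    if window_length % BATCH_INTERVAL != 0:
        # Spark rejects windows that are not a multiple of the batch interval
        raise ValueError("window length must be a multiple of the batch interval")
    records: List[str] = []
    for batch_time in sorted(batches):
        if window_end - window_length < batch_time <= window_end:
            records.extend(batches[batch_time])
    return records


# One sample record per 10-second batch, generated at t = 10, 20, ..., 100
batches = {t: [f"tweet@{t}"] for t in range(10, 101, 10)}
print(window_contents(batches, 70))  # the six batches from t = 20 to t = 70
```

Each window overlaps the previous one by 50 seconds, so a popular hashtag keeps appearing in the output for up to six consecutive batches.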

### **Process the Stream:**
1. Take the windowed tweet messages stored in lines (derived from the socket **input DStream**).
2. Split each message into words. **Transformation: flatMap**
3. Keep only the words that start with a hashtag (#). **Transformation: filter**
4. Convert the hashtags to lowercase. **Transformation: map**
5. Map each tag to a (tag, 1) pair. **Transformation: map**
6. Sum the counts for each hashtag. **Transformation: reduceByKey** (a transformation, not an action); the result, hashtags, is the **output DStream** of this stage.

In [5]:
# Steps 2-6: split into words, keep hashtags, lowercase and pair with 1, then sum the counts per tag
hashtags = lines.flatMap(lambda text: text.split(" ")).filter(lambda word: word.startswith("#")).map(lambda word: (word.lower(), 1)).reduceByKey(lambda a, b: a + b)
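The same counting logic can be tried without a cluster. This plain-Python sketch applies the steps above to a list of lines, with collections.Counter standing in for the (tag, 1) mapping plus reduceByKey; the function name `count_hashtags` and the sample tweets are illustrative assumptions.

```python
from collections import Counter
from typing import Iterable


def count_hashtags(lines: Iterable[str]) -> Counter:
    """Hypothetical local equivalent of the hashtags DStream for one window of tweets."""
    words = (word for text in lines for word in text.split(" "))  # flatMap: tweets -> words
    tags = (word for word in words if word.startswith("#"))        # filter: hashtags only
    lowered = (tag.lower() for tag in tags)                        # map: lowercase
    return Counter(lowered)                                        # (tag, 1) + reduceByKey(a + b)


counts = count_hashtags(["Learning #Spark today", "#spark and #BigData", "no tags here"])
```

Because words are split on single spaces only, a tag followed by punctuation, such as `#spark,`, is counted as a different key, both here and in the Spark version.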

Sort the hashtags by count in decreasing order; hashtags with the same count are ordered alphabetically.

In [6]:
# Sort each window's RDD by count (descending), breaking ties alphabetically, using one composite key
author_counts_sorted_dstream = hashtags.transform(lambda rdd: rdd.sortBy(lambda x: (-x[1], x[0])))
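Chaining two sorts (first by tag, then by count) only preserves the alphabetical tie order if the second sort is stable. Python's sorted() is stable, but Spark does not document sortBy as a stable sort, so a single composite key is the safer way to express "count descending, then tag ascending". The sketch below shows that key in plain Python; `rank_hashtags` is a hypothetical name, and the same key function can be passed to RDD.sortBy.

```python
from typing import List, Tuple


def rank_hashtags(counts: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Hypothetical helper: order (tag, count) pairs by count descending, then tag ascending."""
    # Negating the count turns an ascending sort into a descending one;
    # tuples compare element by element, so the tag only decides ties.
    return sorted(counts, key=lambda kv: (-kv[1], kv[0]))


ranked = rank_hashtags([("#b", 2), ("#a", 2), ("#c", 5)])
```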

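Print the final analysis: the most popular hashtags in the streaming Twitter data. By default pprint() prints only the first 10 elements of each batch, so this shows the 10 most frequent hashtags of every window.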

In [7]:
author_counts_sorted_dstream.pprint()
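For readers who have not seen pprint() output, this hypothetical helper approximately mimics what it prints for one batch: a header with the batch time between two rows of dashes, at most `num` records, and "..." when more records were left out. It is a plain-Python illustration, not PySpark's implementation; the exact header layout may differ slightly between Spark versions.

```python
from typing import Sequence

SEPARATOR = "-" * 43  # a row of dashes framing the batch header


def format_like_pprint(batch_time: str, records: Sequence, num: int = 10) -> str:
    """Hypothetical helper approximating DStream.pprint() output for a single batch."""
    lines = [SEPARATOR, f"Time: {batch_time}", SEPARATOR]
    lines.extend(str(record) for record in records[:num])  # only the first `num` records
    if len(records) > num:
        lines.append("...")  # signals that further records were not printed
    return "\n".join(lines)


print(format_like_pprint("2024-01-01 00:00:10", [("#spark", 3), ("#bigdata", 1)]))
```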

## **Starting Spark Streaming:**
None of the Spark Streaming code written so far executes until we start ssc.
<br>The calls above only define a graph of DStream transformations (lazy evaluation). ssc.start() starts the streaming computation: the receiver begins collecting tweets, and every batch interval the output operation (pprint) triggers a Spark job that runs the whole chain of transformations on the current window.

In [8]:
ssc.start()
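Python generators give a small, runnable analogy for this laziness: building a generator pipeline does no work until something consumes it, much as defining DStream transformations does nothing until ssc.start(). This is only an analogy, not how Spark is implemented; `tweet_source` and the sample tweets are hypothetical.

```python
from typing import Iterator, List

reads: List[str] = []  # log of the lines the source has actually produced


def tweet_source() -> Iterator[str]:
    """Hypothetical stand-in for the socket receiver."""
    for line in ["Learning #Spark", "#spark and #BigData"]:
        reads.append(line)  # side effect shows when data is really pulled
        yield line


# Defining the pipeline only chains generators together; no tweet is read yet.
pipeline = (word.lower()
            for line in tweet_source()
            for word in line.split(" ")
            if word.startswith("#"))
reads_before_start = len(reads)  # still 0 at this point

# Consuming the pipeline plays the role of ssc.start(): now the source is read.
result = list(pipeline)
```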

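awaitTermination() blocks the calling thread until the streaming context is stopped (for example with ssc.stop()) or the wait is interrupted, and it re-raises any exception thrown during the streaming execution.
<br>Without it, a standalone script would reach its end and exit while the stream is still running.
<br>In the output below, the Py4JJavaError with "Connection reset" was raised while the JVM was calling back into Python to run the transform() function, which most likely happened because the Python process was interrupted while the stream was running.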

In [9]:
ssc.awaitTermination()

Py4JJavaError: An error occurred while calling o23.awaitTermination.
: java.net.SocketException: Connection reset
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
	at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
	at py4j.ClientServerConnection.readBlockingResponse(ClientServerConnection.java:313)
	at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:229)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
	at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
	at com.sun.proxy.$Proxy19.call(Unknown Source)
	at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:92)
	at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
	at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$3(DStream.scala:343)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$2(DStream.scala:343)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
	at org.apache.spark.streaming.dstream.DStream.$anonfun$getOrCompute$1(DStream.scala:342)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:335)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph.$anonfun$generateJobs$2(DStreamGraph.scala:123)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:122)
	at org.apache.spark.streaming.scheduler.JobGenerator.$anonfun$generateJobs$1(JobGenerator.scala:252)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:250)
	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:186)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:91)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:90)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
