In [1]:
import findspark
findspark.init('/home/darshan/spark-3.0.0-preview2-bin-hadoop2.7')

In [2]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.functions import desc
from textblob import TextBlob

In [3]:
sc = SparkContext(appName = 'StreamTwitterApp')
sc.setLogLevel('ERROR')
ssc = StreamingContext(sc, 10 )
ssc.checkpoint("checkpoint")

In [4]:
socket_stream = ssc.socketTextStream("127.0.0.1", 5557)
lines = socket_stream.window( 20 )

In [5]:
words = lines.flatMap(lambda line:line.split(" "))

In [6]:
hashtags =  words.filter( lambda word: word.lower().startswith("#") ). \
            map( lambda word: ( word.lower(), 1 ) ). \
            reduceByKey( lambda a, b: a + b )


sorted_dstream_counts = hashtags.transform(lambda foo:foo.sortBy(lambda x:x[0].lower()).sortBy(lambda x:x[1], ascending = False))

In [7]:
def analyze_sentiment(text):
    testimonial = TextBlob(text)
    return testimonial.sentiment.polarity

In [8]:
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount) 

In [9]:
allSentiments = lines.map(lambda line: ('Positive', 1) if analyze_sentiment(line) > 0.1 else (('Neutral', 1) if analyze_sentiment(line) > -0.1 else ('Negative', 1) ))
sentimentCounts = allSentiments.reduceByKey(lambda x,y: x+y)
runningSentimentCounts = sentimentCounts.updateStateByKey(updateFunction)

In [10]:
sorted_dstream_counts.pprint()
runningSentimentCounts.pprint()

__________
### Now run TweetStream.py
__________

In [11]:
ssc.start()    

In [12]:
ssc.awaitTermination(30)

-------------------------------------------
Time: 2020-05-26 17:23:50
-------------------------------------------
('#taketrumpofftwitterrt', 3)
('#justiceforcarolynrt', 2)
('#1141', 1)
('#ados', 1)
('#ausbiz', 1)
('#ausecon', 1)
('#breaking:', 1)
('#ccpvirus', 1)
('#churches', 1)
('#coronavirus', 1)
...

-------------------------------------------
Time: 2020-05-26 17:23:50
-------------------------------------------
('Negative', 22)
('Positive', 15)
('Neutral', 127)

-------------------------------------------
Time: 2020-05-26 17:24:00
-------------------------------------------
('#justiceforcarolynrt', 6)
('#taketrumpofftwitterrt', 4)
('#whatdoyouthink?', 3)
('#envideo', 2)
('#obamagate', 2)
('#obamagate.rt', 2)
('#taketrumpofftwitter', 2)
('#trump', 2)
('#1141', 1)
('#25thamendmentbeforewealldiert', 1)
...

-------------------------------------------
Time: 2020-05-26 17:24:00
-------------------------------------------
('Negative', 62)
('Positive', 52)
('Neutral', 390)

-------------

In [13]:
ssc.stop()