# Spark Streaming (Batch Processing)

In [1]:
# importing required libraries
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

import re
from textblob import TextBlob

In [2]:
# clean_tweet removes mentions, URLs and non-alphanumeric characters from the tweet text
# Noise to strip from a tweet, tried in this order at each position:
#   1. @mentions (handles may contain letters, digits and underscores),
#   2. any single character that is not an ASCII letter, digit, space or tab,
#   3. URLs of the form scheme://rest.
# Raw string so the regex escapes are not parsed as (invalid) string escapes.
_TWEET_NOISE_RE = re.compile(
    r"(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+://\S+)"
)


def clean_tweet(tweet):
    """Return *tweet* with mentions, URLs and special characters removed.

    The input is converted with ``str()``, every match of the noise pattern
    is replaced by a space, and the remaining words are re-joined with
    single spaces (so leading/trailing/repeated whitespace is collapsed).

    Args:
        tweet: The raw tweet; any object accepted by ``str()``.

    Returns:
        The cleaned text as a single-space-separated string.
    """
    return ' '.join(_TWEET_NOISE_RE.sub(' ', str(tweet)).split())

In [3]:
# Getting sentiment polarity of tweet using textblob
def analyze_sentiment_polarity(tweet):
    """Label a tweet as 'POSITIVE', 'NEUTRAL' or 'NEGATIVE'.

    The tweet is first passed through clean_tweet, then TextBlob's polarity
    score decides the label: above zero is positive, exactly zero is
    neutral, anything else is negative.
    """
    polarity = TextBlob(clean_tweet(tweet)).sentiment.polarity
    if polarity > 0:
        return 'POSITIVE'
    if polarity == 0:
        return 'NEUTRAL'
    return 'NEGATIVE'

In [4]:
# add the counts from the current batch to the running total kept for each sentiment
def countSentiment(new_values, last_sum):
    """Return the updated running total for one sentiment key.

    Args:
        new_values: The counts that arrived for this key in the current batch.
        last_sum: The total accumulated so far; a falsy value (such as None
            when there is no previous state) is treated as 0.

    Returns:
        The sum of the new counts plus the previous total.
    """
    batch_total = sum(new_values)
    previous_total = last_sum if last_sum else 0
    return batch_total + previous_total

In [5]:
# Stop the Spark context left over from a previous run so that a new one can
# be created in the next cell.
# NOTE(review): assumes `sc` already exists in the session (e.g. provided by
# the PySpark kernel); in a fresh interpreter this raises NameError - confirm.
sc.stop()

In [6]:
# Create a new Spark context named "Batch Processing" that runs locally;
# the master URL "local[4]" asks Spark for 4 local worker threads.
sc = SparkContext(master="local[4]", appName="Batch Processing")

In [7]:
# Create a streaming context on top of sc. The second argument is the batch
# interval in seconds (not a number of batches): one batch every 10 seconds.
ssc = StreamingContext(sc, 10)

In [8]:
# Enable checkpointing into the relative directory "checkpoint".
# Stateful operations such as updateStateByKey require a checkpoint directory.
ssc.checkpoint("checkpoint")

In [9]:
# Host and TCP port of the socket the tweet stream is read from
IP, Port = "localhost", 9000

In [10]:
# Create a DStream that reads text from the TCP socket at IP:Port;
# each received record is handled downstream as one tweet.
tweets = ssc.socketTextStream(IP,Port)

In [11]:
# Seed the state with a zero count for every sentiment label
initialStateRDD = sc.parallelize(
    [(label, 0) for label in (u'POSITIVE', u'NEGATIVE', u'NEUTRAL')]
)

In [None]:
# Turn every tweet into a (sentiment label, 1) pair, then fold the pairs into
# a per-label running count that starts from initialStateRDD; print each batch.
sentiment = (
    tweets
    .map(lambda tweet_text: (analyze_sentiment_polarity(tweet_text), 1))
    .updateStateByKey(countSentiment, initialRDD=initialStateRDD)
)
sentiment.pprint()

In [None]:
# Start the streaming computation
ssc.start()
# Pause for 5 seconds before blocking.
# NOTE(review): awaitTermination() below blocks anyway, so this sleep looks
# redundant - confirm before removing.
time.sleep(5)

# Block this cell until the streaming context is stopped or fails
ssc.awaitTermination()

-------------------------------------------
Time: 2020-05-06 17:54:50
-------------------------------------------
('POSITIVE', 0)
('NEGATIVE', 0)
('NEUTRAL', 3)

-------------------------------------------
Time: 2020-05-06 17:55:00
-------------------------------------------
('POSITIVE', 2)
('NEGATIVE', 2)
('NEUTRAL', 14)

-------------------------------------------
Time: 2020-05-06 17:55:10
-------------------------------------------
('POSITIVE', 5)
('NEGATIVE', 2)
('NEUTRAL', 16)

-------------------------------------------
Time: 2020-05-06 17:55:20
-------------------------------------------
('POSITIVE', 6)
('NEGATIVE', 5)
('NEUTRAL', 19)

-------------------------------------------
Time: 2020-05-06 17:55:30
-------------------------------------------
('POSITIVE', 8)
('NEGATIVE', 6)
('NEUTRAL', 22)

-------------------------------------------
Time: 2020-05-06 17:55:40
-------------------------------------------
('POSITIVE', 9)
('NEGATIVE', 7)
('NEUTRAL', 23)

-----------------------