## Real-Time Sentiment Analysis Using Spark Streaming

#### Set up streaming source

In [3]:
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.sql.types import *

import os

# Create (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Structured Streaming - Twitter Sentiment")
    .getOrCreate()
)

# Schema of a single tweet record: three nullable string fields.
pythonSchema = (
    StructType()
    .add("id", StringType(), True)
    .add("tweet", StringType(), True)
    .add("ts", StringType(), True)
)

# Credentials are read from the environment so that real keys never have to be
# typed into (and saved with) the notebook. Set AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY before starting the kernel; when they are unset the
# values fall back to empty strings, exactly as in the original placeholders.
awsAccessKeyId = os.environ.get("AWS_ACCESS_KEY_ID", "")
awsSecretKey = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
kinesisStreamName = "twitter-data-stream"  # update the kinesis stream name (need to set up the stream first and ingest data)
kinesisRegion = "us-east-1"


# Streaming DataFrame over the Kinesis stream, starting from the newest records.
# NOTE(review): the "format" and "inferSchema" options are passed through
# unchanged from the original cell; confirm the Kinesis source actually uses them.
kinesisDF = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesisStreamName)
    .option("region", kinesisRegion)
    .option("initialPosition", "latest")
    .option("format", "json")
    .option("awsAccessKey", awsAccessKeyId)
    .option("awsSecretKey", awsSecretKey)
    .option("inferSchema", "true")
    .load()
)


#### <img src='https://s3.amazonaws.com/weclouddata/images/logos/asterisk_1.png' width='2%'> Create Streaming DF

In [5]:
# Start a streaming query that appends every incoming Kinesis record to an
# in-memory table registered under the query name "tweets" (the SQL cells
# below select from that name).
# Despite its name, `df` holds whatever .start() returns, i.e. the handle of
# the running query (inspected via df.status in the next cell).
df = (
    kinesisDF.writeStream
    .format("memory")
    .outputMode("append")
    .queryName("tweets")
    .start()
)

In [6]:
df.status

#### Explore the stream data in SQL

In [8]:
%sql

select partitionKey, cast(data as string) from tweets;

partitionKey,data
wizpampy,"[{""id"": ""1243933850968809472"", ""tweet"": ""b'RT @T9uRhHfR8jsSP57: \\xe0\\xb9\\x83\\xe0\\xb8\\x99\\xe0\\xb8\\x95\\xe0\\xb8\\xad\\xe0\\xb8\\x99\\xe0\\xb8\\x99\\xe0\\xb8\\xb5\\xe0\\xb9\\x89\\xe0\\xb8\\x9a\\xe0\\xb8\\xb2\\xe0\\xb8\\x87\\xe0\\xb8\\x88\\xe0\\xb8\\xb1\\xe0\\xb8\\x87\\xe0\\xb8\\xab\\xe0\\xb8\\xa7\\xe0\\xb8\\xb1\\xe0\\xb8\\x94\\xe0\\xb8\\x95\\xe0\\xb9\\x89\\xe0\\xb8\\xad\\xe0\\xb8\\x87\\xe0\\xb8\\x9b\\xe0\\xb8\\xaa\\xe0\\xb8\\x94\\xe0\\xb8\\x81\\xe0\\xb8\\xb1\\xe0\\xb8\\x9a\\xe0\\xb9\\x84\\xe0\\xb8\\xa7\\xe0\\xb8\\xa3\\xe0\\xb8\\xb1\\xe0\\xb8\\xaa \\xe0\\xb9\\x81\\xe0\\xb8\\x95\\xe0\\xb9\\x88\\xe0\\xb9\\x80\\xe0\\xb8\\x8a\\xe0\\xb8\\xb5\\xe0\\xb8\\xa2\\xe0\\xb8\\x87\\xe0\\xb9\\x83\\xe0\\xb8\\xab\\xe0\\xb8\\xa1\\xe0\\xb9\\x88\\xe0\\xb8\\x95\\xe0\\xb8\\xad\\xe0\\xb8\\x99\\xe0\\xb8\\x99\\xe0\\xb8\\xb5\\xe0\\xb9\\x89\\xe0\\xb8\\x84\\xe0\\xb8\\xb7\\xe0\\xb8\\xad \\xe0\\xb9\\x84\\xe0\\xb8\\x9f\\xe0\\xb8\\x9b\\xe0\\xb9\\x88\\xe0\\xb8\\xb2 \\xe0\\xb9\\x84\\xe0\\xb8\\xa7\\xe0\\xb8\\xa3\\xe0\\xb8\\xb1\\xe0\\xb8\\xaa\\xe0\\xb8\\x81\\xe0\\xb9\\x87\\xe0\\xb8\\x81\\xe0\\xb8\\xa5\\xe0\\xb8\\xb1\\xe0\\xb8\\xa7\\xe0\\xb9\\x81\\xe0\\xb8\\x96\\xe0\\xb8\\xa1\\xe0\\xb8\\x95\\xe0\\xb8\\xad\\xe0\\xb8\\x99\\xe0\\xb8\\x99\\xe0\\xb8\\xb5\\xe0\\xb9\\x89\\xe0\\xb8\\x94\\xe0\\xb9\\x89\\xe0\\xb8\\xa7\\xe0\\xb8\\xa2\\xe0\\xb8\\x84\\xe0\\xb9\\x88\\xe0\\xb8\\xb2\\xe0\\xb8\\x9d\\xe0\\xb8\\xb8\\xe0\\xb9\\x88\\xe0\\xb8\\x99\\xe0\\xb8\\x97\\xe0\\xb8\\xb5\\xe0\\xb9\\x88\\xe0\\xb8\\x89\\xe0\\xb8\\xb4\\xe0\\xb8\\x9a\\xe0\\xb8\\xab\\xe0\\xb8\\xb2\\xe0\\xb8\\xa2\\xe0\\xb8\\x82\\xe0\\xb8\\xb6\\xe0\\xb9\\x89\\xe0\\xb8\\x99\\xe0\\xb9\\x84\\xe0\\xb8\\x9b\\xe0\\xb8\\xad\\xe0\\xb8\\xb5\\xe0\\xb8\\x81 \\xe0\\xb9\\x80\\xe0\\xb8\\xad\\xe0\\xb8\\xb2\\xe0\\xb9\\x83\\xe0\\xb8\\x88\\xe2\\x80\\xa6'"", ""ts"": ""Sat Mar 28 16:11:55 +0000 2020""}]"
PragNews,"[{""id"": ""1243933850666823680"", ""tweet"": ""b'States affected by Coronavirus in India\\n\\n#IndiaFightsCorona \\n#Covid_19 #coronavirus https://t.co/V6Sp85u3gX'"", ""ts"": ""Sat Mar 28 16:11:55 +0000 2020""}]"
Amina_I_sheikh,"[{""id"": ""1243933849853149186"", ""tweet"": ""b\""@ABSCBNNews Watch #MuhammadShaikh, who has delivered more than 50 lectures on different topics of Qur'an in English\\xe2\\x80\\xa6 https://t.co/7GPuUE9Bwx\"""", ""ts"": ""Sat Mar 28 16:11:55 +0000 2020""}]"
mcezeh1,"[{""id"": ""1243933852273455104"", ""tweet"": ""b'RT @BayoOmoboriowo: Thank you for checking up brother. Here are some images from today, Pls stay safe also. https://t.co/yBHZr1w1Ux https:/\\xe2\\x80\\xa6'"", ""ts"": ""Sat Mar 28 16:11:56 +0000 2020""}]"
Afridi_huOO7,"[{""id"": ""1243933852567044096"", ""tweet"": ""b'RT @Humafayaz1: Can\\'t Wait For The Day To Hear\\n \""Corona Virus Is Finally Gone\""\\n#Covid_19 #SaturdayMotivation'"", ""ts"": ""Sat Mar 28 16:11:56 +0000 2020""}]"
beautiquebkk,"[{""id"": ""1243933852994654208"", ""tweet"": ""b'\\xe0\\xb9\\x82\\xe0\\xb8\\x9b\\xe0\\xb8\\xa3\\xe0\\xb8\\x94\\xe0\\xb8\\xb5 \\xe0\\xb9\\x84\\xe0\\xb8\\xa1\\xe0\\xb9\\x88\\xe0\\xb8\\x95\\xe0\\xb9\\x89\\xe0\\xb8\\xad\\xe0\\xb8\\x87\\xe0\\xb8\\xa3\\xe0\\xb8\\xad OTP\\n\\n\\xe0\\xb9\\x82\\xe0\\xb8\\x9b\\xe0\\xb8\\xa3\\xe0\\xb8\\xa5\\xe0\\xb8\\x94\\xe0\\xb8\\x97\\xe0\\xb9\\x89\\xe0\\xb8\\xb2\\xe0\\xb9\\x80\\xe0\\xb8\\x84\\xe0\\xb8\\xad\\xe0\\xb8\\xa3\\xe0\\xb9\\x8c\\xe0\\xb8\\x9f\\xe0\\xb8\\xb4\\xe0\\xb8\\xa7\\n#\\xe0\\xb8\\xa3\\xe0\\xb8\\xb2\\xe0\\xb8\\x84\\xe0\\xb8\\xb2\\xe0\\xb8\\x8a\\xe0\\xb8\\xb4\\xe0\\xb8\\xa5 \\xe0\\xb8\\x97\\xe0\\xb8\\xb8\\xe0\\xb8\\x81\\xe0\\xb8\\x9c\\xe0\\xb8\\xa5\\xe0\\xb8\\xb4\\xe0\\xb8\\x95\\xe0\\xb8\\xa0\\xe0\\xb8\\xb1\\xe0\\xb8\\x93\\xe0\\xb8\\x91\\xe0\\xb9\\x8c\\n#\\xe0\\xb8\\x96\\xe0\\xb8\\xb9\\xe0\\xb8\\x81\\xe0\\xb8\\x97\\xe0\\xb8\\xb5\\xe0\\xb9\\x88\\xe0\\xb8\\xaa\\xe0\\xb8\\xb8\\xe0\\xb8\\x94 \\xe0\\xb8\\x97\\xe0\\xb8\\xb5\\xe0\\xb9\\x88\\xe0\\xb9\\x80\\xe0\\xb8\\x84\\xe0\\xb8\\xa2\\xe0\\xb8\\xa1\\xe0\\xb8\\xb5\\xe0\\xb8\\xa1\\xe0\\xb8\\xb2\\n\\xe0\\xb9\\x82\\xe0\\xb8\\xad\\xe0\\xb8\\x99\\xe0\\xb8\\x88\\xe0\\xb8\\xad\\xe0\\xb8\\x87\\xe0\\xb8\\xa7\\xe0\\xb8\\xb1\\xe0\\xb8\\x99\\xe0\\xb8\\x99\\xe0\\xb8\\xb5\\xe0\\xb9\\x89\\n\\xe0\\xb9\\x80\\xe0\\xb8\\xa5\\xe0\\xb8\\xb4\\xe0\\xb8\\x81\\xe0\\xb9\\x80\\xe0\\xb8\\x84\\xe0\\xb8\\xad\\xe0\\xb8\\xa3\\xe0\\xb9\\x8c\\xe0\\xb8\\x9f\\xe0\\xb8\\xb4\\xe0\\xb8\\xa7\\xe0\\xb8\\xa1\\xe0\\xb8\\xb2\\xe0\\xb8\\x94\\xe0\\xb8\\xb9\\xe0\\xb9\\x81\\xe0\\xb8\\xa5\\xe0\\xb8\\x9c\\xe0\\xb8\\xb4\\xe2\\x80\\xa6 https://t.co/bvYLic0RIe'"", ""ts"": ""Sat Mar 28 16:11:56 +0000 2020""}]"
Ademusta6648,"[{""id"": ""1243933852864823296"", ""tweet"": ""b'Ben de 81 milyona her ay 2000 tl vaad ediyorum \\xc5\\x9fimdi Ba\\xc5\\x9fkan m\\xc4\\xb1 oldum ahmak?'"", ""ts"": ""Sat Mar 28 16:11:56 +0000 2020""}]"
kshatriya_arya,"[{""id"": ""1243933853300822016"", ""tweet"": ""b'Citizens can also pay online for the #Covid_19 Relief Fund. The #UPI payment system can be used for this.\\n\\n\\xc2\\xa0 VPA: c\\xe2\\x80\\xa6 https://t.co/TfQJK6x3uD'"", ""ts"": ""Sat Mar 28 16:11:56 +0000 2020""}]"
RoseAzuras,"[{""id"": ""1243933853636534272"", ""tweet"": ""b'RT @GamingGifter: #CoronaLockdown is happening. To make someone\\xe2\\x80\\x99s day & spread some positivity I will reward one person who Retweets + Foll\\xe2\\x80\\xa6'"", ""ts"": ""Sat Mar 28 16:11:56 +0000 2020""}]"
CrummyMo,"[{""id"": ""1243933853653151746"", ""tweet"": ""b'RT @Stone_nayouung: \\xe0\\xb8\\x96\\xe0\\xb8\\xb9\\xe0\\xb8\\x81\\xe0\\xb8\\x95\\xe0\\xb9\\x89\\xe0\\xb8\\xad\\xe0\\xb8\\x87\\xe0\\xb9\\x81\\xe0\\xb8\\xa5\\xe0\\xb9\\x89\\xe0\\xb8\\xa7\\xe0\\xb8\\x84\\xe0\\xb9\\x88\\xe0\\xb8\\xb0 \\xe0\\xb8\\x82\\xe0\\xb8\\xad\\xe0\\xb8\\x87\\xe0\\xb9\\x81\\xe0\\xb8\\x97\\xe0\\xb9\\x89\\xe0\\xb8\\x95\\xe0\\xb9\\x89\\xe0\\xb8\\xad\\xe0\\xb8\\x87\\xe0\\xb8\\xa1\\xe0\\xb8\\xb5\\xe0\\xb8\\xa3\\xe0\\xb8\\xb9\\xe0\\xb8\\x9b'"", ""ts"": ""Sat Mar 28 16:11:56 +0000 2020""}]"


In [9]:
# Pull the raw record payload out of the in-memory "tweets" table as text.
# The cast column is not aliased; the parsing cell further down refers to it
# as "data".
tweets = spark.sql("select cast(data as string) from tweets")

In [10]:
tweets.printSchema()

In [11]:
tweets.show(5, truncate=False)

In [12]:
tweets.show(5, truncate=True)

In [13]:
tweets.count()

#### Parsing the Streaming Tweets

In [15]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

import json

def parse_tweet(text):
    """Extract (id, ts, tweet) from the first record of a JSON-encoded payload.

    Args:
        text: JSON text expected to hold a non-empty array whose first element
            is an object with "id", "ts" and "tweet" keys.

    Returns:
        A 3-tuple ``(id, ts, tweet)`` taken from the first array element.
        A field missing from the record is returned as ``None``. If the
        payload is ``None``, is not valid JSON, or is not a non-empty array of
        objects, ``(None, None, None)`` is returned instead of raising, so a
        single bad record does not fail a whole batch.
    """
    unparseable = (None, None, None)

    try:
        records = json.loads(text)
    except (TypeError, ValueError):
        # TypeError: payload is None / not str-or-bytes.
        # ValueError: malformed JSON (json.JSONDecodeError subclasses it).
        return unparseable

    # Only a non-empty list whose first element is an object can be read.
    if not isinstance(records, list) or not records:
        return unparseable
    first = records[0]
    if not isinstance(first, dict):
        return unparseable

    # Local names avoid shadowing the builtin `id`.
    tweet_id = first.get('id')
    created_at = first.get('ts')
    body = first.get('tweet')
    return (tweet_id, created_at, body)
    
def _tweet_field_udf(position):
    """Build a string-typed UDF returning element `position` of parse_tweet's tuple."""
    return UserDefinedFunction(lambda raw: parse_tweet(raw)[position], StringType())

# One UDF per field, in the order parse_tweet returns them: (id, ts, tweet)
getID = _tweet_field_udf(0)
getTs = _tweet_field_udf(1)
getTweet = _tweet_field_udf(2)

# Derive the three parsed columns from the raw "data" column, one at a time
tweets = tweets.withColumn('id', getID(col("data")))
tweets = tweets.withColumn('ts', getTs(col("data")))
tweets = tweets.withColumn('tweet', getTweet(col("data")))


In [16]:
tweets.show(truncate=False)

In [17]:
tweets.show(truncate=True)

In [18]:
tweets.printSchema()

#### Sentiment Scoring on Streaming Tweets using TextBlob

In [20]:
import textblob

In [21]:
def get_sentiment(text):
    """Label a piece of text by the sign of its TextBlob polarity score.

    Args:
        text: The text to score, or ``None``.

    Returns:
        ``"negative"`` when the polarity is below zero, ``"neutral"`` when it
        is exactly zero, ``"positive"`` otherwise. ``None`` is returned for a
        ``None`` input so that a null tweet yields a null sentiment instead of
        raising.
    """
    # A null value reaches this function as None; TextBlob cannot be built
    # from it, so pass the null through.
    if text is None:
        return None

    # Imported inside the function, as in the original; presumably so the
    # import happens wherever the UDF is executed.
    from textblob import TextBlob

    # Read the score once and branch on it.
    polarity = TextBlob(text).sentiment.polarity
    if polarity < 0:
        return "negative"
    if polarity == 0:
        return "neutral"
    return "positive"
  
# Expose get_sentiment to Spark as a UDF that returns a string
getSentiment = UserDefinedFunction(get_sentiment, StringType())

# Score the parsed tweet text and store the label in a 'sentiment' column
tweets = tweets.withColumn(
    'sentiment',
    getSentiment(col("tweet")),
)

In [22]:
tweets.show()

In [23]:
tweets.printSchema()

In [24]:
tweets.createOrReplaceTempView("tweets_parsed")
#tweets.groupBy('sentiment').count()

In [25]:
%sql
select sentiment, count(*) as covid19_canada_tweet from tweets_parsed group by sentiment

sentiment,covid19_canada_tweet
positive,7154
neutral,32258
negative,3356


In [26]:
# NOTE(review): this cell looks like a standalone Plotly check rather than part
# of the tweet analysis: it charts the 'AAPL.High' column of a public
# Apple stock-price CSV, not the tweet data.
import plotly.express as px

import pandas as pd
# NOTE(review): this rebinds `df`, which until now referred to the object
# returned by the writeStream ... .start() call earlier in the notebook.
df = pd.read_csv('https://raw.githubusercontent.com/plotly/datasets/master/finance-charts-apple.csv')

# Line chart of the 'AAPL.High' column against 'Date'
fig = px.line(df, x='Date', y='AAPL.High')
fig.show()

## Convert Spark DataFrame to Pandas DataFrame

In [28]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime

In [29]:
type(tweets)

In [30]:
tweets.dtypes

In [31]:
tweets_pdf = tweets.toPandas()

In [32]:
tweets_pdf.head()

Unnamed: 0,data,id,ts,tweet,sentiment
0,"[{""id"": ""1243933850968809472"", ""tweet"": ""b'RT ...",1243933850968809472,Sat Mar 28 16:11:55 +0000 2020,b'RT @T9uRhHfR8jsSP57: \xe0\xb9\x83\xe0\xb8\x9...,neutral
1,"[{""id"": ""1243933850666823680"", ""tweet"": ""b'Sta...",1243933850666823680,Sat Mar 28 16:11:55 +0000 2020,b'States affected by Coronavirus in India\n\n#...,neutral
2,"[{""id"": ""1243933849853149186"", ""tweet"": ""b\""@A...",1243933849853149186,Sat Mar 28 16:11:55 +0000 2020,"b""@ABSCBNNews Watch #MuhammadShaikh, who has d...",positive
3,"[{""id"": ""1243933852273455104"", ""tweet"": ""b'RT ...",1243933852273455104,Sat Mar 28 16:11:56 +0000 2020,b'RT @BayoOmoboriowo: Thank you for checking u...,positive
4,"[{""id"": ""1243933852567044096"", ""tweet"": ""b'RT ...",1243933852567044096,Sat Mar 28 16:11:56 +0000 2020,b'RT @Humafayaz1: Can\'t Wait For The Day To H...,neutral


In [33]:
pd.to_datetime(tweets_pdf['ts'])

In [34]:
idx = pd.DatetimeIndex(pd.to_datetime(tweets_pdf['ts']))

In [35]:
idx

In [36]:
type(idx)

In [37]:
len(pd.to_datetime(tweets_pdf['ts']))

In [38]:
ones = np.ones(len(pd.to_datetime(tweets_pdf['ts'])))

In [39]:
idx.shape[0]

In [40]:
# The series to aggregate: a series of 1s for the moment (one entry per tweet),
# indexed by the tweet timestamps, so that summing within a time bucket counts
# the tweets that fall into it.
my_series = pd.Series(ones, index=idx)

In [41]:
# Resampling / bucketing into 1-minute buckets
per_minute = my_series.resample('1Min').sum().fillna(0) 

In [42]:
per_minute

In [43]:
# Plotting the series
%matplotlib inline

fig, ax = plt.subplots()
ax.grid(True)

ax.set_title("Tweet numbers")
interval = mdates.MinuteLocator(interval=10)
date_formatter = mdates.DateFormatter('%H:%M')

datemin = datetime(2020, 3, 28, 16, 00) 
datemax = datetime(2020, 3, 28, 17, 10)

ax.xaxis.set_major_locator(interval) 
ax.xaxis.set_major_formatter(date_formatter) 
ax.set_xlim(datemin, datemax)
max_freq = per_minute.max()
min_freq = per_minute.min()
ax.set_ylim(min_freq-100, max_freq+100) 
ax.plot(per_minute.index, per_minute)

display(fig)