### Sentiment Analysis of Pfizer Vaccine Tweets Worldwide

**Original Author:** Elena Stamatelou.<br/>
**Additional Info:** Sentiment analysis of streaming Twitter data using Spark Structured Streaming and Python. https://github.com/stamatelou/twitter_sentiment_analysis<br/>
**Last Modified:**  

In [1]:
# Import the os module 
import os

# Pull in the Kafka connector for Structured Streaming. PySpark reads
# PYSPARK_SUBMIT_ARGS when it launches its JVM, so this has to be set before the
# SparkSession below is created. NOTE(review): the Scala (2.12) and Spark (3.0.1)
# versions in the package coordinate presumably must match the installed Spark.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 pyspark-shell'
)

In [2]:
# Install textblob for the sentiment analysis
import sys
!{sys.executable} -m pip install -U textblob

Requirement already up-to-date: textblob in /opt/conda/lib/python3.8/site-packages (0.15.3)


In [3]:
# Import Sparksession and sql functions 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob


In [4]:
# text classification

# Define methods from TextBlob
def polarity_detection(text):
    """Return TextBlob's sentiment polarity score for ``text``."""
    blob = TextBlob(text)
    scores = blob.sentiment
    return scores.polarity

def subjectivity_detection(text):
    """Return TextBlob's sentiment subjectivity score for ``text``."""
    blob = TextBlob(text)
    scores = blob.sentiment
    return scores.subjectivity

def sentiment_detection(value):
    """Map a polarity score to a sentiment label.

    Returns 'Positive' for scores above zero and 'Negative' for scores below
    zero. Anything else (exactly zero, or a value such as NaN that compares
    neither above nor below zero) is 'Neutral'.
    """
    if value > 0:
        return 'Positive'
    if value < 0:
        return 'Negative'
    return 'Neutral'

# Wrap the helpers above as Spark user-defined functions so they can be applied
# to DataFrame columns.
#
# polarity_detection and subjectivity_detection return Python floats, so their
# UDFs declare a numeric return type. With StringType, Spark stored the scores as
# text. sentiment_detection_udf is applied to the polarity column, so it then
# received strings and failed on `value < 0` (str vs int comparison).

# polarity detection
polarity_detection_udf = udf(polarity_detection, DoubleType())

# subjectivity detection
subjectivity_detection_udf = udf(subjectivity_detection, DoubleType())

# sentiment detection: 'Positive' / 'Negative' / 'Neutral' label from a polarity score
sentiment_detection_udf = udf(sentiment_detection, StringType())

In [5]:
# Import the findspark module
import findspark

# Point findspark at the Spark installation. Use SPARK_HOME when it is set, and
# otherwise fall back to the path this notebook was written against, rather than
# hard-coding an absolute local path.
# NOTE(review): findspark.init() is meant to run *before* `pyspark` is imported
# (it adds pyspark to sys.path), but pyspark is already imported in an earlier cell.
# Consider moving this cell above the pyspark imports.
findspark.init(os.environ.get("SPARK_HOME", "/usr/local/spark/"))

In [6]:
# Create (or reuse, via getOrCreate) the Spark session. "local[*]" runs Spark
# inside this process with one worker thread per available core.
# NOTE(review): in local mode the executors live inside the driver JVM, so
# spark.executor.memory probably has no effect here (spark.driver.memory is the
# setting that usually matters). Confirm before tuning.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("TwitterSentAnalysis")
    .config("spark.executor.memory", "1gb")
    .getOrCreate()
)

In [7]:
# Kafka connection settings for the tweet stream.
KAFKA_BOOTSTRAP_SERVERS = "127.0.0.1:9092"
KAFKA_TOPIC = "pfizer_worldwide"

try:
    # Read tweets from the Kafka topic as an unbounded streaming DataFrame.
    # "latest" applies when the query starts without a checkpoint. Only messages
    # published after that point are read.
    tweet_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
        .option("subscribe", KAFKA_TOPIC) \
        .option("startingOffsets", "latest") \
        .load()
except Exception:
    # Report the error, then re-raise it. Swallowing it (as a bare `except:` did)
    # left `tweet_df` undefined, so the next cell failed with a misleading NameError.
    # A bare `except:` also trapped KeyboardInterrupt/SystemExit.
    print("Unexpected error:", sys.exc_info()[0])
    raise

In [None]:
# Pipeline: Kafka message -> JSON fields -> cleaned text -> TextBlob scores -> CSV files,
# appended to ./storage_pfizer_worldwide/ once per 60-second trigger.

# Clean-up applied to the tweet text, in this order (Java regex syntax, as used by
# F.regexp_replace). Raw strings keep backslashes such as `\w` intact.
TEXT_CLEANING_RULES = [
    (r'http\S+', ''),   # links
    (r'@\w+', ''),      # @mentions
    ('#', ''),          # hashtag marker (the word itself is kept)
    (r'\bRT\b', ''),    # retweet marker, whole word only, so e.g. "START" is left intact
    (':', ''),          # colons, e.g. the one left over from "RT @user:"
]

writeTweet = None
try:
    # Cast the Kafka message value to a JSON string
    tweet_df_string = tweet_df.selectExpr("CAST(value AS STRING) as json_data")

    # extract the tweet fields and the user object (still a JSON string)
    text_user = tweet_df_string.select(
        json_tuple('json_data', 'created_at', 'text', 'user').alias('created_at', 'text', 'json_user'))

    # extract the location from the user info
    text_user_info = text_user.select(
        'text', 'created_at', json_tuple('json_user', 'location').alias('location'))

    # preprocessing
    # NOTE(review): empty strings become the literal text 'None' and are kept, while
    # null values (presumably e.g. a missing/null user location) are dropped by
    # na.drop(). Confirm this asymmetry is intended.
    text_user_info = text_user_info.na.replace('', 'None')
    text_user_info = text_user_info.na.drop()

    for pattern, replacement in TEXT_CLEANING_RULES:
        text_user_info = text_user_info.withColumn(
            'text', F.regexp_replace('text', pattern, replacement))

    # polarity and subjectivity scores from TextBlob
    text_user_info = text_user_info.withColumn("polarity", polarity_detection_udf(text_user_info.text))
    text_user_info = text_user_info.withColumn("subjectivity", subjectivity_detection_udf(text_user_info.text))

    # sentiment label derived from the polarity. The cast guarantees the Python UDF
    # receives a number even if polarity_detection_udf is declared with a string
    # return type (sentiment_detection compares the value with 0).
    text_user_info = text_user_info.withColumn(
        "sentiment", sentiment_detection_udf(text_user_info.polarity.cast("double")))

    # Collapse to a single partition so each trigger writes a single CSV part file.
    text_user_info = text_user_info.repartition(1)

    # country filter -> no filter since worldwide

    # Write the stream as header-less CSV files.
    # NOTE(review): the checkpoint and the CSV output share one directory. Spark's docs
    # use separate locations, but changing this for an existing output directory needs
    # care because the file sink's `_spark_metadata` log lives there.
    writeTweet = (
        text_user_info.writeStream
        .format("csv")
        .option("checkpointLocation", "./storage_pfizer_worldwide/")
        .option("path", "./storage_pfizer_worldwide/")
        .outputMode("append")
        .queryName("worldwide_pfizer_tweets")
        .trigger(processingTime='60 seconds')
        .start()
    )

    print("----- streaming is running -------")

    # Blocks until the query stops or fails. Interrupt the kernel to end it.
    writeTweet.awaitTermination()

except KeyboardInterrupt:
    # Interrupting the kernel is the normal way to leave this cell; it is not an error.
    print("----- streaming interrupted -------")
except Exception:
    print("Unexpected error:", sys.exc_info())
finally:
    # Without this, an interrupted cell left the query running in the background.
    if writeTweet is not None and writeTweet.isActive:
        writeTweet.stop()

----- streaming is running -------


In [10]:
# Stop any streaming queries that are still running (e.g. if the streaming cell was
# interrupted), so they end cleanly instead of being torn down with the SparkContext.
# Then shut down the session.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()