https://towardsdatascience.com/my-absolute-go-to-for-sentiment-analysis-textblob-3ac3a11d524

In [None]:
# Notebook configuration.
# is_realtime is handed to start_stream_processing at the end of the notebook:
# True selects the socket stream, False streams the files in the input folder.
is_realtime = False

# Topic and bucket together determine the input/output locations below.
topic = "Batman"
project_bucket_name = "cloud-project-bucket-ns-22"

# Note: the output folder must be public to the internet.
output_folder_name = 'gs://{}/Output/{}/'.format(project_bucket_name, topic)
input_folder_name = 'gs://{}/Input/{}/'.format(project_bucket_name, topic)

In [None]:
!pip install textblob
!pip install findspark

In [None]:
# import necessary packages
import os
import json
import time
import subprocess
import pyspark
import findspark
import socket
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark import Row
from textblob import TextBlob

from IPython.display import clear_output

In [None]:
# Initialise findspark (presumably so the local Spark installation can be found).
# NOTE(review): pyspark is already imported in the imports cell above, so
# confirm whether this call is still needed at this point or should run earlier.
findspark.init()

In [None]:
def json_load(text):
    """Deserialize the JSON document in ``text`` and return the resulting object."""
    parsed = json.loads(text)
    return parsed

In [None]:
def get_message(text):
    """Return the message body of a tweet given as a raw JSON string.

    When the payload carries an ``extended_tweet`` entry (used for tweets longer
    than 140 characters, per the original author's note), its ``full_text`` is
    returned; otherwise the top-level ``text`` is returned. The value is always
    converted with ``str``.
    """
    payload = json.loads(text)
    # Plain tweet: no extended payload present
    if "extended_tweet" not in payload:
        return str(payload['text'])
    return str(payload['extended_tweet']['full_text'])

In [None]:
def get_tweet_field(text, field):
    """Extract one (possibly nested) field from a tweet given as a JSON string.

    Args:
        text: Raw JSON document of a single tweet.
        field: Key to read. Nested keys are separated by '/', e.g.
            'place/bounding_box/coordinates'.

    Returns:
        ``str(value)`` of the addressed field, or ``None`` when the field (or
        any parent on the path) is missing, is JSON ``null``, or when a parent
        is not a JSON object. Returning ``None`` rather than the literal string
        'None' keeps absent values as real nulls in the resulting column.
    """
    node = json.loads(text)
    for key in field.split('/'):
        # Stop as soon as the path cannot be followed any further instead of
        # raising KeyError/TypeError from inside a Spark UDF.
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return None if node is None else str(node)

In [None]:
def get_analysis(score):
    """Map a polarity score to a sentiment label.

    Args:
        score: Polarity as a number or as its string representation (the
            ``polarity`` column may reach this function as a string when the
            producing UDF is declared with a string return type). ``None`` is
            accepted and passed through.

    Returns:
        'Negative' for scores below zero, 'Neutral' for exactly zero,
        'Positive' otherwise, or ``None`` when ``score`` is ``None``.

    Raises:
        ValueError: if ``score`` is a string that does not represent a number.
    """
    if score is None:
        return None
    # Coerce first: comparing a str with 0 raises TypeError on Python 3.
    value = float(score)
    if value < 0:
        return 'Negative'
    if value == 0:
        return 'Neutral'
    return 'Positive'

In [None]:
def process_tweets(tweets):
    """Turn a DataFrame of raw tweet JSON strings into structured columns.

    Args:
        tweets: DataFrame with a string column ``value`` holding one tweet JSON
            document per row.

    Returns:
        The DataFrame with the columns ``id``, ``created_at``,
        ``place_full_name``, ``place_country``, ``place_country_code``,
        ``place_coordinates``, ``bounding_box_coordinates``, ``place_type``,
        ``message`` and ``words`` added (in that order) and ``value`` dropped.
        ``words`` is the message with entities, mentions, tickers, links,
        hashtags and the retweet marker removed.
    """
    # One UDF object per function is enough; the field path is passed as a literal.
    get_tweet_field_udf = udf(get_tweet_field, StringType())
    get_message_udf = udf(get_message, StringType())

    # (output column, '/'-separated path inside the tweet JSON), in output order
    field_paths = [
        ("id", 'id'),
        ("created_at", 'created_at'),
        ("place_full_name", 'place/full_name'),
        ("place_country", 'place/country'),
        ("place_country_code", 'place/country_code'),
        ("place_coordinates", 'coordinates'),
        ("bounding_box_coordinates", 'place/bounding_box/coordinates'),
        ("place_type", 'place/place_type'),
    ]
    for column_name, json_path in field_paths:
        tweets = tweets.withColumn(column_name, get_tweet_field_udf("value", lit(json_path)))

    # Get message
    tweets = tweets.withColumn("message", get_message_udf("value"))

    # (regex, replacement) pairs applied in order to build the cleaned text
    cleaning_steps = [
        # Remove HTML special entities (e.g. &amp;)
        (r'\&\w*;', ''),
        # Remove @username mentions
        (r'@[^\s]+', ''),
        # Remove tickers
        (r'\$\w*', ' '),
        # Remove hyperlinks: only the non-whitespace run of the link itself.
        # (A greedy '.*' here would also delete ordinary words up to the last
        # '/' found anywhere in the tweet.)
        (r'http\S+', ' '),
        # Remove hashtags
        (r'#\w*', ''),
        # Remove the retweet marker as a whole word only, so words that merely
        # contain "RT" are left intact
        (r'\bRT\b', ''),
        # Collapse whitespace (including new lines) last, because the steps
        # above can leave runs of spaces behind
        (r'\s\s+', ' '),
    ]
    tweets = tweets.withColumn('words', tweets.message)
    for pattern, replacement in cleaning_steps:
        tweets = tweets.withColumn('words', F.regexp_replace('words', pattern, replacement))

    # Drop the raw JSON now that everything needed has been extracted
    return tweets.drop("value")

In [None]:
# Text classification using TextBlob
def polarity_detection(text):
    """Return the TextBlob sentiment polarity computed for ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.polarity
def subjectivity_detection(text):
    """Return the TextBlob sentiment subjectivity computed for ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.subjectivity
def text_classification(tweets):
    """Add sentiment columns derived from the cleaned ``words`` column.

    Args:
        tweets: DataFrame containing a string column ``words``.

    Returns:
        The DataFrame with three extra columns: ``polarity`` and
        ``subjectivity`` (numeric scores from ``polarity_detection`` /
        ``subjectivity_detection``) and ``analysis`` (the label produced by
        ``get_analysis`` from ``polarity``).
    """
    # The scores are numbers, so the UDFs declare DoubleType. Declaring
    # StringType would hand get_analysis a string to compare against 0 and
    # leave string-typed score columns behind.
    polarity_detection_udf = udf(polarity_detection, DoubleType())
    subjectivity_detection_udf = udf(subjectivity_detection, DoubleType())
    get_analysis_udf = udf(get_analysis, StringType())

    # polarity detection
    tweets = tweets.withColumn("polarity", polarity_detection_udf("words"))

    # subjectivity detection
    tweets = tweets.withColumn("subjectivity", subjectivity_detection_udf("words"))

    # analysis label based on the numeric polarity
    tweets = tweets.withColumn('analysis', get_analysis_udf('polarity'))

    return tweets

In [None]:
def start_offline_batch_processing(spark):
    """Run the pipeline once over every file currently in the input folder.

    Reads ``input_folder_name`` as text, extracts and classifies the tweets,
    registers them as the temp view ``tweets`` and writes the per-country
    aggregate to ``df_sentiment_by_country.csv`` in ``output_folder_name``.

    Args:
        spark: Spark session used for reading and for the SQL aggregation.

    Returns:
        The processed and classified tweets DataFrame.
    """
    # Load the raw lines, then extract fields and attach sentiment columns
    raw_lines = spark.read.text(input_folder_name)
    tweets = text_classification(process_tweets(raw_lines))
    tweets.createOrReplaceTempView("tweets")

    # Destination of the aggregated result
    csv_path = output_folder_name + "df_sentiment_by_country.csv"

    # Average sentiment and tweet count per country, ordered by country code
    country_sql = (
        'SELECT place_country as Country, '
        'place_country_code as CountryCode, '
        'avg(polarity) as Sentiment, '
        'count(id) as TweetCount '
        'FROM tweets '
        'GROUP BY place_country, place_country_code '
        'ORDER BY place_country_code'
    )
    sentiment_by_country = spark.sql(country_sql)

    # Collect to pandas so that exactly one CSV file is written
    sentiment_by_country.toPandas().to_csv(csv_path, index=False)
    print("File '{}' updated...".format(csv_path))

    return tweets

In [None]:
def get_batched_file_stream():
    """Build a streaming text reader over ``input_folder_name``.

    Uses the notebook-level ``spark`` session and sets ``maxFilesPerTrigger``
    to 5.

    Returns:
        The streaming DataFrame returned by ``load``.
    """
    reader = spark.readStream.format('text')
    reader = reader.option("maxFilesPerTrigger", 5)
    file_stream = reader.load(input_folder_name)
    print("Batched file stream...")
    return file_stream

In [None]:
def get_realtime_stream():
    """Build a streaming reader on a socket of this machine.

    Uses the notebook-level ``spark`` session, the local host name and
    port 5555.

    Returns:
        The streaming DataFrame returned by ``load``.
    """
    reader = spark.readStream.format("socket")
    reader = reader.option("host", socket.gethostname())
    reader = reader.option("port", 5555)
    socket_stream = reader.load()
    print("Real-time socket stream...")
    return socket_stream

In [None]:
def get_sentiment_by_country_query():
    """Return the SQL text that aggregates the ``tweets`` view per country.

    The query selects country, country code, average polarity and tweet count,
    grouped by country and country code.
    """
    sql = (
        'SELECT place_country as Country, '
        'place_country_code as CountryCode, '
        'avg(polarity) as Sentiment, '
        'count(id) as TweetCount '
        'FROM tweets '
        'GROUP BY place_country, place_country_code'
    )
    return sql

In [None]:
def get_sentiment_by_category_query():
    """Return the SQL text that counts tweets in the ``tweets`` view per ``analysis`` label."""
    sql = (
        'SELECT analysis as Sentiment, '
        'count(id) as TweetCount '
        'FROM tweets '
        'GROUP BY analysis'
    )
    return sql

In [None]:
def write_to_file(df, batch_id, output_file_name):
    """Display a batch and export it to a single CSV file.

    Args:
        df: DataFrame of the current batch; must provide ``show`` and ``toPandas``.
        batch_id: Identifier of the batch, only used in the status message.
        output_file_name: Destination path of the CSV file.
    """
    # Show the batch in the notebook output before exporting it
    df.show()
    # Going through pandas yields exactly one CSV file, written without the index
    pandas_frame = df.toPandas()
    pandas_frame.to_csv(output_file_name, index=False)
    status = "File '{}' updated, batch_id = {}...".format(output_file_name, batch_id)
    print(status)

In [None]:
def start_stream_processing(spark, is_realtime=False):
    """Start the two streaming aggregations and block until one of them ends.

    The tweets come from the socket stream when ``is_realtime`` is true and
    from the batched file stream otherwise. They are processed, classified and
    registered as the temp view ``tweets``; a per-country and a per-category
    query are then started, each exporting every batch through
    ``write_to_file`` to its own CSV file in ``output_folder_name``.

    Args:
        spark: Spark session used for the SQL queries and stream management.
        is_realtime: Selects the tweet source as described above.
    """
    # Get tweet stream
    if is_realtime:
        lines = get_realtime_stream()
    else:
        lines = get_batched_file_stream()

    # Process and classify tweets
    tweets = text_classification(process_tweets(lines))
    tweets.createOrReplaceTempView("tweets")

    def launch(sql_text, query_name, csv_name):
        # Start one complete-mode streaming query that exports each batch to csv_name.
        # NOTE(review): foreachBatch is configured after format("memory") -
        # confirm which sink actually takes effect.
        csv_path = output_folder_name + csv_name
        aggregated = spark.sql(sql_text)
        writer = aggregated.writeStream.format("memory")
        writer = writer.queryName(query_name).outputMode("complete")
        writer = writer.foreachBatch(
            lambda df, batch_id: write_to_file(df, batch_id, csv_path)
        )
        return writer.start()

    # Keep the query handles referenced for as long as this function runs
    running_queries = [
        # By country streaming query
        launch(
            get_sentiment_by_country_query(),
            "df_sentiment_by_country_query",
            "df_sentiment_by_country_streaming.csv",
        ),
        # By category streaming query
        launch(
            get_sentiment_by_category_query(),
            "df_sentiment_by_category_query",
            "df_sentiment_by_category_streaming.csv",
        ),
    ]

    # Await termination
    spark.streams.awaitAnyTermination()

In [None]:
# Notebook entry point (runs at cell level; no __main__ guard is used).

# Create or reuse the Spark session for this application
spark = (
    SparkSession.builder
    .appName("TwitterTopicSentimentAnalysis")
    .getOrCreate()
)

# Start stream processing with the source selected in the configuration cell
start_stream_processing(spark, is_realtime=is_realtime)