In [None]:
# NOTE: THIS NOTEBOOK DOES NOT DO ANY TEXT PRE-PROCESSING, SO THE EXTRACTED POLARITY AND SUBJECTIVITY ARE NOT RELIABLE.
# THE PURPOSE OF THIS APPLICATION IS TO PROTOTYPE WHAT REAL-TIME SENTIMENT EXTRACTION CAN LOOK LIKE.

# spark-submit configuration. PYSPARK_SUBMIT_ARGS has to be in the environment
# before the SparkContext (and its JVM) is created further down. --packages pulls
# in the Kafka source/sink connector; in the Maven coordinate, `_2.11` is the Scala
# build and `2.4.0` the Spark version, and both must match the local Spark install.
import os

os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell',
)

# Input topic (raw tweet texts), output topic (windowed sentiment averages) and
# the directory where the streaming query stores its checkpoint.
KAFKA_TOPIC_READ, KAFKA_TOPIC_WRITE = 'texts_covid19', 'sentiments_covid19'
CHECKPOINT = 'checkpoint'

# import libraries
# NOTE(review): findspark.init() presumably makes the local Spark installation's
# pyspark importable; it is deliberately called before the pyspark imports below.
import findspark
findspark.init()

# NOTE: `max` and `round` imported from pyspark.sql.functions shadow the Python
# builtins of the same name for the rest of this script. StringType and round are
# not used in the code shown here.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql.functions import udf, window, mean, first, max, round

# TextBlob supplies the per-text polarity and subjectivity scores.
from textblob import TextBlob

# create functions
# A Spark DataFrame column can only hold Spark SQL types, never an arbitrary Python
# object such as a TextBlob. The sentiment is therefore computed inside the UDF and
# returned as a struct. Its field names match the attributes the two extractor UDFs
# read, so `x.polarity` / `x.subjectivity` work on the Row that Spark passes them.
from pyspark.sql.types import StructField, StructType

# Return type of text_blob_udf: struct<polarity: double, subjectivity: double>.
_SENTIMENT_TYPE = StructType([
    StructField('polarity', DoubleType()),
    StructField('subjectivity', DoubleType()),
])


def _sentiment(text):
    """Score *text* with TextBlob.

    Returns a ``(polarity, subjectivity)`` tuple of floats, or ``None`` when
    *text* is null (the Kafka ``value`` column is nullable).
    """
    if text is None:
        return None
    blob = TextBlob(text)
    # float() keeps the values DoubleType-compatible; Spark does not coerce a
    # Python int returned for a double field.
    return float(blob.polarity), float(blob.subjectivity)


def _three_decimals(value):
    """Format *value* with exactly three decimal places; ``None`` stays ``None``.

    Rounds rather than truncates, and never falls back to scientific notation.
    """
    if value is None:
        return None
    return '{:.3f}'.format(value)


# scores a text with TextBlob -> struct(polarity, subjectivity)
text_blob_udf = udf(_sentiment, _SENTIMENT_TYPE)
# extracts polarity (-1 = negative, 1 = positive) from the sentiment struct
polarity_udf = udf(lambda x: None if x is None else x.polarity, DoubleType())
# extracts subjectivity (0 = objective, 1 = subjective) from the sentiment struct
subjectivity_udf = udf(lambda x: None if x is None else x.subjectivity, DoubleType())
# converts a double into a string with exactly 3 decimal places
double_to_string = udf(_three_decimals)

# Spark entry points: reuse the running SparkContext (or start one) and wrap it
# in a SQL session.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# Streaming source: subscribe to KAFKA_TOPIC_READ on the local broker. The
# one-minute watermark on the Kafka record timestamp lets Spark finalise window
# state; events further behind than that may be dropped.
tweetsDF = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', KAFKA_TOPIC_READ)
    .load()
    .withWatermark('timestamp', "1 minutes")
)
tweetsDF.printSchema()  # raw Kafka columns

# Key and value arrive from Kafka as binary: decode both to text and keep only
# the record timestamp next to them.
tweetsDF = tweetsDF.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    'timestamp',
)
tweetsDF.printSchema()  # expected: key, value, timestamp only

# Per-tweet sentiment. text_blob_udf analyses the tweet text into an
# intermediate 'textBlob' column, and the two helper UDFs read the scores from it:
#   polarity:     -1 (negative) .. 1 (positive), rescaled to 0 .. 1 by (p + 1) / 2
#   subjectivity:  0 (objective) .. 1 (subjective), kept as-is
# The tweet text and the intermediate column are then discarded.
tweetsDF = (
    tweetsDF
    .withColumn('textBlob', text_blob_udf('value'))
    .withColumn('polarity', (polarity_udf('textBlob') + 1) / 2)
    .withColumn('subjectivity', subjectivity_udf('textBlob'))
    .drop('value', 'textBlob')
)

# One-minute tumbling windows (window length == slide length). Each window
# reports its latest event time, one key picked with first(), and the mean of
# both scores.
tweetsDF = (
    tweetsDF
    .groupBy(window('timestamp', "1 minutes", "1 minutes"))
    .agg(
        max('timestamp').alias('timestamp'),
        first('key').alias('key'),
        mean('polarity').alias('average_polarity'),
        mean('subjectivity').alias('average_subjectivity'),
    )
)

# Render both averages as strings with double_to_string (intended: 3 digits).
tweetsDF = (
    tweetsDF
    .withColumn('average_polarity', double_to_string('average_polarity'))
    .withColumn('average_subjectivity', double_to_string('average_subjectivity'))
)
tweetsDF.printSchema()  # final schema of the published rows

# Streaming sink: serialise all columns of each row into one JSON value and
# publish it to KAFKA_TOPIC_WRITE, recording progress under CHECKPOINT.
tweetStream = (
    tweetsDF
    .selectExpr("CAST(key AS STRING)", "to_json(struct(*)) AS value")
    .writeStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('topic', KAFKA_TOPIC_WRITE)
    .option('checkpointLocation', CHECKPOINT)
    .start()
)

# Block the notebook until the streaming query stops.
tweetStream.awaitTermination()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- key: string (nullable = true)
 |-- average_polarity: string (nullable = true)
 |-- average_subjectivity: string (nullable = true)

