In [None]:
import warnings
# Install a blanket "ignore" filter: no category, message or module pattern is
# given, so warnings raised by the cells below are not shown.
# NOTE(review): this also hides deprecation warnings from the libraries imported
# below -- consider narrowing the filter to specific categories.
warnings.filterwarnings('ignore')

In [None]:
# Importing the necessary modules 
import os
import pandas as pd
from textblob import TextBlob
import pyspark.pandas as ps
from pyspark.sql.functions import pandas_udf, col, udf,expr, from_json, window
from pyspark.sql.types import FloatType, StringType, StructType
from pyspark.sql import SparkSession
from kafka import KafkaConsumer, KafkaProducer
import findspark
import json
import configparser

In [None]:
# Build the Spark session shared by every cell below; `getOrCreate()` hands
# back the already-running session when the cell is executed again.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Twitter Sentiment")
    .getOrCreate()
)

In [None]:
# config = configparser.ConfigParser()
# config.read('secrets.ini')
# confluent_host = config['confluent_default']['bootstrap.servers']
# confluent_username = config['confluent_default']['sasl.username']
# confluent_password = config['confluent_default']['sasl.password']

# kafka_jaas = """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';""".format(confluent_username, confluent_password)
# # confluent_consumer.update(config['confluent_consumer'])
# print(kafka_jaas)

In [None]:
# Streaming DataFrame over the Kafka topic "kafka_tweets_stream", served by the
# broker at localhost:9092, with `startingOffsets` set to "earliest".
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "kafka_tweets_stream")
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
# UDF that classifies a tweet's text as Positive / Negative / Neutral
# (three-way classification on the sign of TextBlob's polarity score)
@udf(returnType=StringType())
def sentiment_fxn(text: str):
    """Label a piece of text by the sign of its TextBlob polarity.

    Args:
        text: The tweet text to score. May be None when the source row has no
            text (for example when JSON parsing produced a null column).

    Returns:
        "Positive" when the rounded polarity is above zero, "Negative" when it
        is below zero, "Neutral" when it is exactly zero, and None when the
        text is missing or cannot be scored.
    """
    if text is None:
        # Nothing to score; keep the row with a null label.
        return None
    try:
        # Polarity is rounded to two decimals *before* the sign test, so scores
        # whose magnitude rounds to 0.00 are reported as "Neutral".
        polarity = round(float(TextBlob(text).sentiment.polarity), 2)
    except Exception:
        # Best effort: a row that cannot be scored gets a null label instead of
        # failing the whole streaming query. `Exception` (rather than a bare
        # `except`) lets KeyboardInterrupt / SystemExit propagate.
        return None
    if polarity > 0:
        return "Positive"
    if polarity < 0:
        return "Negative"
    return "Neutral"

In [None]:
# Keep only the Kafka `value` (cast to a string) and the `timestamp` column.
# NOTE(review): no later cell visible here reads `df_timestamp`; `my_df` is
# built from the same expression.
df_timestamp = df.selectExpr(
    "CAST(value as STRING)",
    "timestamp",
)

In [None]:
# Row counts of the raw Kafka stream per sliding window (10-minute windows that
# slide every 5 minutes), with a 10-minute watermark on `timestamp`, ordered by
# window.
# NOTE(review): `windowedCounts` is reassigned by the later
# "Aggregation with watermark and window" cell.
windowedCounts = (
    df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(df["timestamp"], "10 minutes", "5 minutes")
        # from_json(df.value, sample_schema).alias('sample')
    )
    .count()
    .orderBy("window")
)

In [None]:
# The tweet payload travels in the Kafka `value` column as binary data, so it
# is cast to a string with a SQL expression; `timestamp` is carried along.
my_df = df.selectExpr(
    "CAST(value as STRING)",
    "timestamp",
)

In [None]:
# Expected schema of the incoming tweet JSON: every field is read as a string.
# Field order below is the column order of the resulting struct.
_TWEET_FIELD_NAMES = [
    "created_at",
    "id",
    "id_str",
    "text",
    "source",
    "truncated",
    "in_reply_to_status_id",
    "in_reply_to_status_id_str",
    "in_reply_to_user_id",
    "in_reply_to_user_id_str",
    "in_reply_to_screen_name",
    "user",
    "geo",
    "coordinates",
    "place",
    "contributors",
    "retweeted_status",
    "is_quote_status",
    "quote_count",
    "reply_count",
    "retweet_count",
    "favorite_count",
    "entities",
    "favorited",
    "retweeted",
    "possibly_sensitive",
    "filter_level",
    "lang",
    "timestamp_ms",
]

sample_schema = StructType()
for _field_name in _TWEET_FIELD_NAMES:
    # `add` appends the field and returns the same StructType.
    sample_schema = sample_schema.add(_field_name, StringType())

In [None]:
# Parse the JSON text held in `value` against `sample_schema`; the parsed
# struct is exposed as the column "sample" and `timestamp` is kept beside it.
into_dataframe = my_df.select(
    from_json(my_df["value"], sample_schema).alias("sample"),
    "timestamp",
)

In [None]:
# Flatten the parsed struct: one top-level column per field of "sample",
# followed by `timestamp`.
into_df = into_dataframe.select(
    "sample.*",
    "timestamp",
)

In [None]:
# Add a "sentiment" column holding the label that `sentiment_fxn` returns for
# each row's `text`.
sent_df = into_df.withColumn(
    "sentiment",
    sentiment_fxn(col("text")),
)

In [None]:
# Run an aggregate query on the df to enable writeStream
# aggDF = sent_df.groupBy('sentiment').count()

In [None]:
# Tweet counts per (sliding window, sentiment): 10-minute windows sliding every
# 5 minutes, a 10-minute watermark on `timestamp`, result ordered by window.
windowedCounts = (
    sent_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(sent_df["timestamp"], "10 minutes", "5 minutes"),
        "sentiment",
    )
    .count()
    .orderBy("window")
)
# Both names refer to the same streaming DataFrame.
aggDF1 = windowedCounts

In [None]:
# Start a streaming query named "aggregates" that writes the aggregate result
# (output mode "complete") to the in-memory sink, so later cells can read it
# with `spark.sql` under that name. Left as a bare expression so the cell
# displays the value returned by `start()`.
# NOTE(review): the next cell reads the table right away; it may still be
# empty if no micro-batch has completed yet.
(
    aggDF1.writeStream
    .queryName("aggregates")
    .outputMode("complete")
    .format("memory")
    .start()
)

In [None]:
# Snapshot the whole in-memory "aggregates" table as a pandas DataFrame so it
# can be displayed and plotted.
final_result = (
    spark
    .sql("select * from aggregates")
    .toPandas()
)

In [None]:
# Show the aggregated table, ordered by `window` in descending order.
final_result.sort_values(by="window", ascending=False)

In [None]:
# Plot, as a bar graph, the tweet count recorded for each sentiment label.
# The `count` column is summed per sentiment across every window row of the
# in-memory table, so bar height reflects tweet volume rather than the number
# of windows in which a label happens to appear.
# Rows with a null sentiment (text that could not be scored) are left out.
# NOTE: the aggregation uses 10-minute windows that slide every 5 minutes, so
# one tweet contributes to more than one window row; read the bars as relative
# volumes between sentiments, not as exact tweet totals.
(
    spark.sql(
        "select sentiment, sum(`count`) as `count` "
        "from aggregates "
        "where sentiment is not null "
        "group by sentiment "
        "order by sentiment"
    )
    .toPandas()
    .plot.bar(x="sentiment", y="count")
)

In [None]:
# End of File