# Libraries

In [29]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import pyspark.sql.functions as F

from pyspark.sql.types import MapType, StringType, IntegerType, ArrayType, StructType, \
                              StructField, LongType, DoubleType, BooleanType, FloatType

from IPython.display import display, clear_output
import time


# for the model
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import NGram

from pyspark.ml import Pipeline, PipelineModel

from pyspark.ml.classification import NaiveBayes

from pyspark.ml.feature import Word2Vec
from pyspark.sql import Row
from json import dumps
from pyspark.sql import udf
import json

import cld3

import os
import shutil


# Step 1: Building Spark streaming and Schema

In [2]:
# Build the SparkSession used by every cell below, or reuse an already running one.
# The application is registered under the name 'kafka'.
spark = (
    SparkSession.builder
    .appName('kafka')
    .getOrCreate()
)

In [3]:
## Eagerly evaluate DataFrames in the REPL so they render as formatted tables.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

## Enable Arrow-based conversion between Spark and pandas DataFrames.
## Spark 3.x renamed this setting: "spark.sql.execution.arrow.enabled" is deprecated
## in favour of "spark.sql.execution.arrow.pyspark.enabled". The session is known to be
## Spark 3.0+ because the Kafka source below uses the "includeHeaders" option.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)

In [4]:
# Open a streaming source on the "engtweets" Kafka topic.
# - startingOffsets=earliest: begin from the oldest retained record
# - includeHeaders=true: expose the Kafka record headers as a column
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "engtweets")
    .option("startingOffsets", "earliest")
    .option("includeHeaders", "true")
    .load()
)

In [5]:
# Print the schema of the raw Kafka stream; key and value arrive as binary
# and are cast to strings in Step 2.
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)



In [6]:
# Schema used to parse the JSON payload carried in the Kafka record value.
# Every field is nullable. Field names must match the JSON keys of the topic exactly.
# NOTE(review): "GAO_LAT" looks like a typo for "GEO_LAT" (compare "GEO_LONG");
# confirm against the producer before renaming, since a mismatch parses as null.
schema_tweet = (
    StructType()
    .add("CREATEDAT", LongType(), True)
    .add("ID", LongType(), True)
    .add("USER_ID", LongType(), True)
    .add("USER_NAME", StringType(), True)
    .add("SCREENNAME", StringType(), True)
    .add("USER_LOCATION", StringType(), True)
    .add("FOLLOW_COUNT", IntegerType(), True)
    .add("FRIEND_COUNT", IntegerType(), True)
    .add("USER_CREAT_AT", LongType(), True)
    .add("USER_FAV_COUNT", IntegerType(), True)
    .add("USER_VERIFIED", BooleanType(), True)
    .add("USER_LANG", StringType(), True)
    .add("TEXT", StringType(), True)
    .add("LANG", StringType(), True)
    .add("GAO_LAT", DoubleType(), True)
    .add("GEO_LONG", DoubleType(), True)
    .add("PLACE_NAME", StringType(), True)
    .add("PLACE_COUNTRY", StringType(), True)
    .add("HASHTAG", ArrayType(StringType()), True)
    .add("USER_MENTION_NAME", ArrayType(StringType()), True)
)

# Step 2: Cleaning the tweet text with the same process used for training

In [7]:
# Decode the Kafka record: cast key and value from binary to string, parse the
# JSON value with schema_tweet, then flatten the parsed struct into top-level columns.
clean_df = (
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .withColumn("value", F.from_json("value", schema_tweet))
    .select("key", F.col('value.*'))
)

In [8]:
# Print the flattened schema: the string key plus one column per schema_tweet field.
clean_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- CREATEDAT: long (nullable = true)
 |-- ID: long (nullable = true)
 |-- USER_ID: long (nullable = true)
 |-- USER_NAME: string (nullable = true)
 |-- SCREENNAME: string (nullable = true)
 |-- USER_LOCATION: string (nullable = true)
 |-- FOLLOW_COUNT: integer (nullable = true)
 |-- FRIEND_COUNT: integer (nullable = true)
 |-- USER_CREAT_AT: long (nullable = true)
 |-- USER_FAV_COUNT: integer (nullable = true)
 |-- USER_VERIFIED: boolean (nullable = true)
 |-- USER_LANG: string (nullable = true)
 |-- TEXT: string (nullable = true)
 |-- LANG: string (nullable = true)
 |-- GAO_LAT: double (nullable = true)
 |-- GEO_LONG: double (nullable = true)
 |-- PLACE_NAME: string (nullable = true)
 |-- PLACE_COUNTRY: string (nullable = true)
 |-- HASHTAG: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- USER_MENTION_NAME: array (nullable = true)
 |    |-- element: string (containsNull = true)



Remove words that start with a given character (used below for `@` mentions and `#` hashtags)

In [9]:
def remove_start(text, start_chr):
    """Drop every whitespace-separated word of ``text`` that starts with ``start_chr``.

    Parameters
    ----------
    text : str or None
        Input text. ``None`` (a null column value) is passed through as ``None``
        instead of raising inside the UDF.
    start_chr : str
        Prefix marking the words to remove (e.g. ``'@'`` or ``'#'``). A prefix
        longer than one character is also supported. An empty or ``None``
        prefix removes nothing.

    Returns
    -------
    str or None
        The remaining words joined by single spaces (runs of whitespace are
        collapsed as a side effect of splitting), or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None
    words = text.split()
    if not start_chr:
        # Nothing to match against: keep every word (only whitespace is normalised).
        return " ".join(words)
    return " ".join(word for word in words if not word.startswith(start_chr))

In [10]:
# Spark UDF wrapper around remove_start(text, start_chr); returns a string column.
UDF_remove_start = F.udf(remove_start, StringType())

Detect language

In [11]:
def detect_lang(text):
    """Return the language code that cld3 detects for ``text``, or ``None``.

    Parameters
    ----------
    text : str or None
        Text to classify.

    Returns
    -------
    str or None
        First element of the result of ``cld3.get_language(text)``. ``None`` is
        returned when ``text`` is empty/``None``, when cld3 yields no prediction,
        or when detection fails.
    """
    if not text:
        # Null or empty tweets cannot be classified; skip the call to cld3.
        return None
    try:
        prediction = cld3.get_language(text)
        return prediction[0] if prediction is not None else None
    except Exception:
        # Best effort: a tweet that cannot be classified is treated as "unknown"
        # instead of failing the stream. Unlike a bare ``except:``, this still lets
        # KeyboardInterrupt and SystemExit propagate.
        return None

In [12]:
# Spark UDF wrapper around detect_lang(text); returns the language code as a string column.
UDF_detect_lang = F.udf(detect_lang, StringType())

In [13]:
# Apply the cleaning functions above to the stream:
#  0) Drop rows without text (they can never be classified as English anyway)
#  1) Read the tweets
#  2) Remove words that start with @
#  3) Remove words that start with #
#  4) Convert to lowercase
#  5) Add the detected-language column (computed on the raw text)
#  6) Keep only English tweets
#  7) Remove numbers
#  8) Remove symbols
#  9) Keep rows with at least one letter
# 10) Keep only the needed columns (this also drops the LANG column)
#
# Each step must read from 'new_text', the output of the previous step. Reading
# from 'TEXT' every time would overwrite the earlier results, so that only the
# last transformation (symbol removal) would actually be applied.
# NOTE(review): confirm the training notebook applied this same chained cleaning,
# so the model sees identically prepared text.
final_df = (
    clean_df
    .filter(F.col('TEXT').isNotNull())
    .withColumn('new_text', UDF_remove_start(F.col('TEXT'), F.lit('@')))
    .withColumn('new_text', UDF_remove_start(F.col('new_text'), F.lit('#')))
    .withColumn('new_text', F.lower(F.col('new_text')))
    .withColumn('LANG', UDF_detect_lang(F.col('TEXT')))
    .filter(F.col('LANG') == 'en')
    .withColumn('new_text', F.regexp_replace(F.col('new_text'), r'[0-9]', ''))
    .withColumn('new_text', F.regexp_replace(F.col('new_text'), r'[$-/:-?{-~!"^_`\[\]]', ''))
    .filter(F.col('new_text').rlike("^.*[a-zA-Z]+.*$"))
    .select(
        F.col('key'),
        F.col('USER_NAME'),
        F.col('TEXT').alias("raw_text"),
        F.col('new_text').alias("text"),
    )
)

In [14]:
# Print the schema handed to the model: key, USER_NAME, raw_text (original tweet)
# and text (cleaned tweet).
final_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- USER_NAME: string (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- text: string (nullable = true)



# Step 3: Predictions 

In [15]:
# Load the saved pipeline model from the relative path "models/naivebayes.model".
# NOTE(review): presumably this was fitted in the training notebook — confirm it expects a "text" input column.
pipelineModel = PipelineModel.load("models/naivebayes.model")

In [16]:
# Run the loaded pipeline on the cleaned stream; the result is still a streaming
# DataFrame with the pipeline's output columns appended (see the schema printed below).
pred = pipelineModel.transform(final_df)

In [17]:
# Print the prediction schema: the input columns plus tokens, ngrams, features,
# rawPrediction, probability and prediction.
pred.printSchema()

root
 |-- key: string (nullable = true)
 |-- USER_NAME: string (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- text: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ngrams: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [18]:
# Keep only the columns to publish, renaming raw_text -> tweet_text and
# prediction -> sentiment.
prediction = pred.select(
    F.col('key'),
    F.col('USER_NAME'),
    F.col('raw_text').alias("tweet_text"),
    F.col('prediction').alias("sentiment"),
)

In [19]:
# Replace the numeric class with a readable label: 0 becomes "negative",
# every other value becomes "positive".
sentiment_label = F.when(F.col("sentiment") == 0, "negative").otherwise("positive")
prediction = prediction.withColumn("sentiment", sentiment_label)

In [20]:
# Print the schema after relabelling: sentiment is now a string column.
prediction.printSchema()

root
 |-- key: string (nullable = true)
 |-- USER_NAME: string (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- sentiment: string (nullable = false)



In [21]:
# Shape the rows the way the Kafka sink requires: a string "key" and a string
# "value". The value is a JSON object built from a map holding USER_NAME,
# tweet_text and sentiment; the individual columns are not carried over.
prediction = prediction.select(
    F.col("key").cast(StringType()).alias("key"),
    F.to_json(
        F.create_map(
            F.lit("USER_NAME"), F.col("USER_NAME"),
            F.lit("tweet_text"), F.col("tweet_text"),
            F.lit("sentiment"), F.col("sentiment"),
        )
    ).alias("value"),
)

In [22]:
# Print the final schema sent to Kafka: string key and JSON string value.
prediction.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [32]:
# Start from a clean slate: delete the local 'checkpoint' directory left by a
# previous run, if there is one. os.path.isdir() is False for a missing path,
# so no separate existence check is needed.
dirpath = 'checkpoint'
if os.path.isdir(dirpath):
    shutil.rmtree(dirpath)

In [23]:
# Publish the predictions to the "sentiment_analysis" Kafka topic.
# Important: delete any existing checkpoint first, and do not start the in-memory
# query before this one.
query = (
    prediction.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("topic", "sentiment_analysis")
    .option("checkpointLocation", "checkpoint/data")
    .start()
)

# Step 4: Read/Query the prediction Stream

In [24]:
# Open a second streaming source, this time on the "sentiment_analysis" topic
# written by the query above, reading from the earliest offset with headers included.
df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "sentiment_analysis")
    .option("startingOffsets", "earliest")
    .option("includeHeaders", "true")
    .load()
)

In [25]:
# Print the schema of the prediction stream; key and value are binary again
# and are cast to strings in the next cell.
df2.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)



Cleaning

In [26]:
# Cast the binary Kafka key and value to strings so they are readable when
# queried; all other columns are left untouched.
string_stream_df = (
    df2
    .withColumn("key", df2["key"].cast(StringType()))
    .withColumn("value", df2["value"].cast(StringType()))
)

Viewing the results through an in-memory sink

In [27]:
# Stream the decoded predictions into an in-memory table named "predictions",
# appending new rows every 2 seconds, so they can be inspected with spark.sql.
see_results = (
    string_stream_df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("predictions")
    .trigger(processingTime='2 seconds')
    .start()
)

In [29]:
# Show the first 20 rows collected so far in the in-memory "predictions" table,
# without truncating long values.
# DataFrame.show() prints the table itself and returns None, so it must not be
# wrapped in display(): that only renders a stray "None" under the table.
clear_output(wait=True)
spark.sql('SELECT * FROM predictions').show(20, False)
time.sleep(1)

+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+---------+------+-----------------------+-------------+-------+
|key |value                                                                                                                                                                                                                                                                                                                                                                        |topic             |partition|offset|timestamp              |timestampType|headers|
+----+------------------------------------------------------------------------------------

None

In [30]:
# Stop every streaming query that is still running (the Kafka writer and the
# in-memory sink) before shutting the session down, so they terminate cleanly
# instead of being torn down with the SparkContext.
for active_query in spark.streams.active:
    active_query.stop()

# Stop the Spark session
spark.stop()