# Import packages

In [1]:
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka import KafkaClient, SimpleProducer

from pyspark.sql.types import StructField, StructType, DoubleType, StringType, LongType, IntegerType, BinaryType, TimestampType, ArrayType
from pyspark.ml.image import ImageSchema

from kafka.admin import KafkaAdminClient, NewTopic
import matplotlib.pyplot as plt
import pickle
import io
import numpy as np
import scipy.io
import pandas as pd

from pyspark.sql.functions import udf,col
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.functions import array, lit, struct

import sys

from tweepy import OAuthHandler
from tweepy import API
from tweepy import Stream
from tweepy.streaming import StreamListener

import tweepy 
import json

import nltk
# Fetch the NLTK data packages needed by stopwords, word_tokenize and
# WordNetLemmatizer in the preprocessing UDF; packages that are already
# up to date are skipped (see the cell output below).
for nltk_package in ('stopwords', 'punkt', 'wordnet'):
    nltk.download(nltk_package)
from nltk.corpus import stopwords
from nltk import word_tokenize
from nltk.stem.wordnet import WordNetLemmatizer
import re

from sparkdl import KerasTransformer
from pyspark.sql.functions import split, array_remove, size, to_json
from pyspark.sql.types import StructType, StructField, FloatType, StringType, ArrayType


[nltk_data] Downloading package stopwords to
[nltk_data]     /home/samcoopmans/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /home/samcoopmans/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /home/samcoopmans/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
Using TensorFlow backend.


# ML Model
## Training the model using batch data

In [2]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Load the labelled training tweets (CSV with a header row, column types
# inferred by Spark) and keep only the label and the tweet text.
train_data = (
    spark.read
    .csv("train_data.csv", header=True, inferSchema=True)
    .selectExpr("sentiment as label", "tweet as text")
)

In [4]:
# Drop training rows whose tweet text is null.
cleaned_text_train = train_data.dropna(subset=["text"])

In [5]:
# Split each tweet into a "words" array using RegexTokenizer's default settings.
# The same tokenizer instance is reused for the test set and the stream.
regexTokenizer = RegexTokenizer(outputCol="words", inputCol="text")
tokenized_train_df = regexTokenizer.transform(dataset=cleaned_text_train)

In [6]:
# Learn the vocabulary from the training tokens and turn every tweet into a
# term-count vector in "features". The fitted vec_model is reused later.
cv = CountVectorizer(
    inputCol="words",
    outputCol="features",
    vocabSize=100000,  # upper bound on the vocabulary size
    minDF=1.0,         # per CountVectorizer docs: values >= 1 are a document count
)
vec_model = cv.fit(dataset=tokenized_train_df)
count_df_train = vec_model.transform(dataset=tokenized_train_df)

In [7]:
# Train a multinomial Naive Bayes classifier (additive smoothing 1.0) on the
# term-count vectors.
nb = NaiveBayes(modelType="multinomial", smoothing=1.0)
model = nb.fit(dataset=count_df_train)

## Testing the model on test data

In [8]:
# Load the held-out test tweets with the same layout as the training CSV.
test_data = (
    spark.read
    .csv("test_data.csv", header=True, inferSchema=True)
    .selectExpr("sentiment as label", "tweet as text")
)

In [13]:
# Drop test rows whose tweet text is null.
cleaned_text_test = test_data.dropna(subset=["text"])

In [14]:
# Tokenize the test tweets with the tokenizer defined for the training data.
tokenized_test_df = regexTokenizer.transform(dataset=cleaned_text_test)

In [15]:
# Vectorize the test tokens with the vocabulary learned on the training data.
count_df_test = vec_model.transform(dataset=tokenized_test_df)

In [16]:
# Score the test set with the trained Naive Bayes model (adds "prediction").
predictions = model.transform(dataset=count_df_test)

In [17]:
# Accuracy evaluator comparing the "prediction" column against "label".
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

In [18]:
# Compute and print the test-set accuracy.
accuracy = evaluator.evaluate(dataset=predictions)
print(f"Test set accuracy = {accuracy}")

Test set accuracy = 0.7671323369813113


# Predicting sentiment on a stream of tweets (Spark Structured Streaming)

In [8]:
# Pattern used by to_timestamp for the "timestamp" field of each message.
# It is presumably Twitter's created_at format, e.g. "Wed Oct 10 20:19:24 +0000 2018";
# confirm this against the producer.
# "Z" parses the "+0000" offset. The previous literal "+0000" was only matched as
# text, so the wall-clock time was interpreted in the Spark session time zone and
# every timestamp was shifted whenever that zone is not UTC.
# NOTE(review): Spark 3's datetime parser does not accept 'E' (day-of-week) in
# parsing patterns. On Spark 3, set spark.sql.legacy.timeParserPolicy=LEGACY or
# strip the leading weekday before parsing.
timestampFormat = "E MMM dd HH:mm:ss Z yyyy"

In [9]:
# SparkSession is not imported anywhere else in this notebook. Presumably it
# previously came from the pyspark shell's namespace; import it explicitly so
# the cell also runs in a plain Python kernel.
from pyspark.sql import SparkSession

# Reuse the running session if there is one (getOrCreate), otherwise start one.
spark = SparkSession.builder.appName("TwitterStreaming").getOrCreate()

# Streaming source: every message on the "weather" Kafka topic, starting from
# the earliest available offset.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "34.90.212.110:9092")
    .option("startingOffsets", "earliest")
    .option("subscribe", "weather")
    .load()
)

In [10]:
# Decode the Kafka message payload into a string column that is still named "value".
raw_value_df = df.select(col("value").cast("string").alias("value"))

In [11]:
# Expected JSON layout of each message; every field is declared as a string
# and is cast to a proper type after parsing.
# NOTE(review): from_json appears to treat all fields as nullable, so the
# False flags are likely not enforced. Confirm before relying on them.
twitter_schema = StructType([
    StructField(field_name, StringType(), False)
    for field_name in ("timestamp", "id", "tweet", "city", "temperature", "description")
])

In [12]:
# Parse each payload with twitter_schema and flatten the struct into top-level
# columns. Then convert "timestamp" with timestampFormat and cast "temperature"
# to float.
json_value_df = (
    raw_value_df
    .select(from_json("value", twitter_schema).alias('weather'))
    .select('weather.*')
    .withColumn('timestamp', to_timestamp(col('timestamp'), timestampFormat))
    .withColumn("temperature", col("temperature").cast(FloatType()))
)

## Pre-processing the stream of tweets

In [13]:
# Per-process cache for the resources preprocessed_udf needs. Spark pickles the
# UDF together with the globals it references and ships a copy to each Python
# worker, so each worker fills its own copy on first use. Previously
# mistakes.txt was re-opened and re-evaluated, the NLTK stop words re-read and
# a lemmatizer rebuilt for every single row.
_PREPROCESS_RESOURCES = {}

# Cleaning regexes, compiled once instead of per row.
_HTML_TAG_RE = re.compile('<.*?>')
_PUNCTUATION_RE = re.compile(r'[^\w\s]')
_DIGITS_RE = re.compile(r'\d+')
_WHITESPACE_RE = re.compile(r'\s+')


def _load_preprocess_resources():
    """Return (spelling_corrections, stop_words, lemmatizer), loading them on first call.

    ``spelling_corrections`` is whatever ``mistakes.txt`` (in the current working
    directory of the process running the UDF) evaluates to. It is used as a
    word -> replacement mapping. ``stop_words`` is the NLTK English stop-word set
    plus the ``'link'`` / ``'mention'`` placeholders produced by
    ``_mask_mentions_and_links``.
    """
    if not _PREPROCESS_RESOURCES:
        # NOTE(review): eval() executes arbitrary Python from mistakes.txt. If the
        # file only ever contains a dict literal, prefer ast.literal_eval.
        with open('mistakes.txt', 'r') as mistakes_file:
            spelling_corrections = eval(mistakes_file.read())
        stop_words = set(stopwords.words('english'))
        stop_words.update(('link', 'mention'))
        # Populate the cache only once everything loaded, so a failed load is
        # retried on the next call instead of caching a partial state.
        _PREPROCESS_RESOURCES.update(
            spelling_corrections=spelling_corrections,
            stop_words=stop_words,
            lemmatizer=WordNetLemmatizer(),
        )
    return (
        _PREPROCESS_RESOURCES['spelling_corrections'],
        _PREPROCESS_RESOURCES['stop_words'],
        _PREPROCESS_RESOURCES['lemmatizer'],
    )


def _mask_mentions_and_links(tweet):
    """Lower-case *tweet* and replace @mentions / http(s) URLs with placeholder words."""
    def mask(token):
        if token.startswith('@'):
            return 'mention'
        if token.startswith(('http://', 'https://')):
            return 'link'
        return token

    return " ".join(mask(token) for token in tweet.lower().split())


def preprocessed_udf(tweet):
    """Normalize a raw tweet into a space-separated string of cleaned tokens.

    Steps:
    1. Lower-case the tweet and replace @mentions with "mention" and
       http(s) links with "link".
    2. Remove HTML tags. Replace punctuation with spaces, strip digits and
       collapse whitespace.
    3. Tokenize with NLTK and apply the spelling corrections from mistakes.txt.
    4. Lemmatize the tokens.
    5. Drop English stop words and the "mention"/"link" placeholders.
    6. Keep only words 3-14 characters long.

    Args:
        tweet: Raw tweet text, or None (Spark passes null column values as None).

    Returns:
        The cleaned text. An empty string means nothing survived cleaning.
        None is returned for a None tweet, so the row can be removed with
        ``na.drop`` instead of failing the streaming task.
    """
    if tweet is None:
        return None

    text = _mask_mentions_and_links(tweet)
    text = _HTML_TAG_RE.sub('', text)          # remove HTML tags
    text = _PUNCTUATION_RE.sub(' ', text)      # remove punctuation
    text = _DIGITS_RE.sub('', text)            # remove numbers
    text = _WHITESPACE_RE.sub(' ', text)       # collapse runs of whitespace

    # Nothing left to tokenize: the result would be "" anyway, so skip the
    # file/NLTK work entirely.
    if not text.strip():
        return ""

    spelling_corrections, stop_words, lemmatizer = _load_preprocess_resources()

    corrected = [spelling_corrections.get(word, word) for word in word_tokenize(text)]
    lemmatized = [lemmatizer.lemmatize(token) for token in corrected]
    kept = [
        word for word in lemmatized
        if word not in stop_words and 2 < len(word) < 15
    ]
    return " ".join(kept)

In [15]:
# Register the cleaner as a SQL UDF named "preprocessed" that returns strings;
# the returned wrapper is used on DataFrame columns below.
preprocessed = spark.udf.register("preprocessed", preprocessed_udf, returnType=StringType())

In [16]:
# Keep the parsed message fields and add "text": the tweet after preprocessing.
clean_tweets_df = json_value_df.select(
    *[col(field) for field in ("timestamp", "id", "tweet", "city", "temperature", "description")],
    preprocessed(col("tweet")).alias("text"),
)

In [17]:
# Drop streamed rows whose preprocessed text is null.
clean_tweets_1 = clean_tweets_df.dropna(subset=["text"])

In [18]:
# Tokenize the streamed tweets with the tokenizer used during training.
clean_tweets_tokenized = regexTokenizer.transform(dataset=clean_tweets_1)

In [19]:
# Vectorize with the vocabulary learned on the training data.
clean_tweets_vectorized = vec_model.transform(dataset=clean_tweets_tokenized)

## Predict tweet sentiment with the trained model

In [20]:
# Apply the trained Naive Bayes model to the streamed feature vectors.
tweets_predictions = model.transform(dataset=clean_tweets_vectorized)

In [21]:
# Columns published downstream: ids, raw and cleaned text, weather context
# and the model's prediction.
final_df = tweets_predictions.select(
    'id', 'timestamp', 'tweet', 'text', 'temperature', 'city', 'description', 'prediction'
)

# Write the streaming DataFrame of predictions to the Kafka broker

In [22]:
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka import KafkaClient, SimpleProducer
from kafka.admin import KafkaAdminClient, NewTopic

In [39]:
from pyspark.sql.functions import from_json, array, to_json, col, struct
# Pack every row into one JSON string column named "value", the column the
# Kafka sink takes the message payload from.
final_df_json = final_df.select(
    to_json(struct(*final_df.columns)).alias("value")
)

In [40]:
# Start the streaming query that publishes each prediction as a JSON message
# to the "predictions_7" topic. Progress is checkpointed under the relative
# directory "predictions_7".
ds = (
    final_df_json.writeStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "34.90.212.110:9092",
        "topic": "predictions_7",
        "checkpointLocation": "predictions_7",
    })
    .start()
)