In [1]:
# Widen the notebook's main container to 95% of the browser window.
# `display` is imported from IPython.display (importing it from IPython.core.display
# is deprecated since IPython 7.14).
from IPython.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [2]:
#Basics 
import re
import json

#PySpark Streaming
from threading import Thread
from pyspark.streaming import StreamingContext

#PySpark MLlib
from pyspark.ml.classification import LogisticRegressionModel, RandomForestClassificationModel
from pyspark.ml.feature import  IDF, IDFModel, CountVectorizerModel
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.feature import StopWordsRemover

#NLTK
from nltk.stem.snowball import SnowballStemmer

class StreamingThread(Thread):
    """Run a Spark ``StreamingContext`` on a background thread.

    ``awaitTermination()`` blocks its caller, so running it on this thread keeps
    the notebook kernel free to execute other cells (e.g. one that calls ``stop``).

    Args:
        ssc: the ``StreamingContext`` to start, wait on, and later stop.
    """

    def __init__(self, ssc):
        super().__init__()
        self.ssc = ssc

    def run(self):
        """Start the streaming context and block this thread until it terminates."""
        # Use the context this thread was built with -- not a module-level global,
        # which may be missing or refer to a different context.
        self.ssc.start()
        self.ssc.awaitTermination()

    def stop(self):
        """Gracefully stop streaming while leaving the SparkContext running.

        ``stopGraceFully=True`` waits for already-received data to be processed;
        ``stopSparkContext=False`` keeps the underlying SparkContext alive.
        """
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [6]:
# Import models and the hashtag dictionary.
# NOTE(review): paths are relative to the notebook's working directory.
MODEL_DIR = 'Final Saved Models'

# NOTE(review): the logistic-regression model is loaded but `process` predicts with
# `model_rf`; keep it only if it is used elsewhere.
model = LogisticRegressionModel.load(f'{MODEL_DIR}/logistic_regression_model_prototype_5')
model_rf = RandomForestClassificationModel.load(f'{MODEL_DIR}/random_forest_model_prototype_2')
idf_model = IDFModel.load(f'{MODEL_DIR}/idf_model_prototype_7')
countvec_model = CountVectorizerModel.load(f'{MODEL_DIR}/count_vectorizer_model_prototype_7')

# Maps predicted class -> hashtag. JSON object keys are always strings; presumably
# they are stringified prediction values (e.g. '0.0') -- confirm against the file.
with open('index_hashtag_dict.json', 'r', encoding='utf-8') as fp:
    index_dict = json.load(fp)

# Flag the models as loaded only after every load above has succeeded.
models_loaded = True

# Define the Snowball stemmer and wrap it in a Spark UDF over token arrays.
stemmer = SnowballStemmer(language="english")


def _stem_tokens(tokens):
    """Snowball-stem every token in ``tokens``; a null array stays null."""
    if tokens is None:
        return None
    return [stemmer.stem(token) for token in tokens]


stemmer_udf = udf(_stem_tokens, ArrayType(StringType()))

# Define the function to pre-process and clean incoming tweet text
def remove_punct(text):
    """Normalise raw tweet text into the cleaned, lower-cased form fed to the models.

    Steps, in order: newlines become spaces; U+2588 full-block glyphs are dropped;
    the number of ``#`` characters is appended as a trailing token and the ``#``
    marks are removed; the span starting at ``http`` is replaced by ``urllink``
    (see the note below); apostrophes are dropped (``ozge's`` -> ``ozges``);
    remaining punctuation becomes spaces; runs of spaces collapse to one; the
    result is lower-cased.

    Args:
        text: raw tweet text (``str``). ``None`` is not handled.

    Returns:
        The cleaned, lower-cased string.
    """
    # Newlines -> spaces; drop full-block glyphs.
    text = text.replace('\n', ' ')
    text = text.replace('\u2588', '')
    # Append the number of hashtags as a feature token, then strip the '#' marks.
    text = text + ' ' + str(text.count('#'))
    text = text.replace('#', '')
    # NOTE(review): `.*` is greedy, so this replaces everything from the first
    # "http" up to the LAST space in the text (the appended hashtag count always
    # provides one), not just the URL. Kept unchanged because the saved models
    # were presumably trained with this same cleaning; changing it only here would
    # cause train/serve skew. Fix in training and serving together (e.g. r'http\S*').
    text = re.sub(r'http.* ', ' urllink ', text)
    # Multispace to single space.
    text = re.sub(r' +', ' ', text)
    # Drop straight and typographic apostrophes, e.g. ozge's -> ozges.
    text = text.replace("'", '').replace('’', '')
    # Turn the remaining punctuation (incl. curly double quotes) into spaces.
    text = re.sub(r'[!"$%&()*+,\-./:;<=>?@\[\]^_`{|}~“”]', ' ', text)
    # Multispace to single space.
    text = re.sub(r' +', ' ', text)
    return text.lower()
    
def process(time, rdd):
    """Classify one streamed micro-batch of tweets and print the predictions.

    Registered via ``lines.foreachRDD(process)``, so Spark Streaming calls it with
    each batch's time and RDD.

    Args:
        time: batch time supplied by Spark Streaming; only printed as a header.
        rdd: RDD of JSON strings, one tweet per element. Records are expected to
            carry ``tweet_text``, ``tweet_id`` and ``label`` (the columns used
            below) -- TODO confirm against the feed.

    Relies on notebook globals: ``spark``, ``remove_punct``, ``stemmer_udf``,
    ``countvec_model``, ``idf_model``, ``model_rf`` and ``index_dict``.
    """
    if rdd.isEmpty():
        return

    print("========= %s =========" % str(time))

    # Convert the batch to a DataFrame (schema is inferred from the JSON records).
    df = spark.read.json(rdd)
    if "tweet_text" not in df.columns:
        # No record in this batch had the field; skip rather than fail the stream.
        print("No 'tweet_text' field in this batch; skipping.")
        return
    # remove_punct raises TypeError on None, which would fail the whole batch
    # (and the streaming job), so drop records without text before cleaning.
    df = df.filter(col("tweet_text").isNotNull())
    udf_text_clean = udf(remove_punct, StringType())
    df = df.withColumn("tweet_text", udf_text_clean("tweet_text"))
    df.show()

    # Featurize incoming tweets with the same tokenization, stop-word removal and
    # stemming, then the loaded count-vectorizer and IDF models.
    tokenizer = Tokenizer(inputCol="tweet_text", outputCol="words")
    wordsData = tokenizer.transform(df)
    remover = StopWordsRemover(inputCol="words", outputCol="cleaned_words")
    df_cleaned_text = remover.transform(wordsData)
    df_stemmed = df_cleaned_text.withColumn("cleaned_words", stemmer_udf("cleaned_words"))
    featurizedData = countvec_model.transform(df_stemmed)
    rescaledData = idf_model.transform(featurizedData)

    # Predict the incoming tweets using the best (random forest) model.
    predictions = model_rf.transform(rescaledData)

    # Adjust predictions for display: cast to string so they can match the keys of
    # index_dict (JSON keys are always strings), then replace them with hashtags.
    predictions = predictions.withColumn("prediction", predictions["prediction"].cast(StringType()))
    predictions = predictions.replace(to_replace=index_dict, subset='prediction')
    predictions.select("label", "tweet_id", "tweet_text", "prediction").show()

In [7]:
# Define a Spark streaming context on the notebook's existing SparkContext (`sc`).
# Incoming data is grouped into micro-batches of this many seconds.
BATCH_INTERVAL_SECONDS = 10

ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

In [8]:
# Define the socket of the text stream. Each line received is one tweet record,
# parsed as JSON by `process`.
STREAM_HOST = "seppe.net"
STREAM_PORT = 7778

lines = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)

# Apply the process function above to every micro-batch RDD of the stream.
lines.foreachRDD(process)

In [9]:
# Stream in real time and observe predictions.
# The StreamingContext runs on a background thread, so this cell returns at once
# and the notebook stays free to run the stop cell later.
ssc_t = StreamingThread(ssc=ssc)
ssc_t.start()

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#biden|1397520262778740737|covid biden offic...|
+------+-------------------+--------------------+

+------+-------------------+--------------------+----------+
| label|           tweet_id|          tweet_text|prediction|
+------+-------------------+--------------------+----------+
|#biden|1397520262778740737|covid biden offic...|    #biden|
+------+-------------------+--------------------+----------+

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#biden|1397520191593058306|colbert shows how...|
+------+-------------------+--------------------+

+------+-------------------+--------------------+----------+
| label|           tweet_id|          tweet_text|prediction|
+------+-------------------+--------------------+----------+
|#biden|1

+------+-------------------+--------------------+----------+
| label|           tweet_id|          tweet_text|prediction|
+------+-------------------+--------------------+----------+
|#covid|1397521893763297282| borisjohnson at ...|    #china|
+------+-------------------+--------------------+----------+

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#covid|1397521840763850756| runs out of vacc...|
+------+-------------------+--------------------+

+------+-------------------+--------------------+----------+
| label|           tweet_id|          tweet_text|prediction|
+------+-------------------+--------------------+----------+
|#covid|1397521840763850756| runs out of vacc...|  #vaccine|
+------+-------------------+--------------------+----------+

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+---------

In [10]:
# Stop streaming gracefully, then wait for the background thread to exit so this
# cell only returns once the thread's run() (start + awaitTermination) has finished.
ssc_t.stop()
ssc_t.join()

----- Stopping... this may take a few seconds -----
+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#covid|1397522765519876100| cystic fibrosis ...|
+------+-------------------+--------------------+

+------+-------------------+--------------------+----------+
| label|           tweet_id|          tweet_text|prediction|
+------+-------------------+--------------------+----------+
|#covid|1397522765519876100| cystic fibrosis ...|    #covid|
+------+-------------------+--------------------+----------+

