# Streaming tweets

In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [2]:
from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [3]:
sc

In [4]:
spark = SparkSession.builder \
    .getOrCreate()

In [5]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [6]:
# Libraries
from pyspark import SparkContext  # To connect to spark
from pyspark.streaming import StreamingContext  # To do the the streaming
from pyspark.sql import SparkSession  # To connect to sql
from pyspark.sql import Row 
from threading import Thread
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, FloatType
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, IDFModel
import difflib as diff
# For modelling
from pyspark.ml.classification import LogisticRegressionModel, NaiveBayesModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np
import pandas as pd
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer

In [8]:
Tokenizer1 = Tokenizer(inputCol="tweet_text", outputCol="words") # separe words
sw = StopWordsRemover(inputCol="words", outputCol="filtered") # removes stop words
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000) 
loadedIDF = IDFModel.load(os.path.join(parent_path, 'IDF_train','stages','3_IDF_11dfbbf01f60'))
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label") #labels to numerics
pipeline = Pipeline(stages=[Tokenizer1, sw, hashingTF,loadedIDF, label_stringIdx])

In [8]:
from pyspark.ml import PipelineModel
model = PipelineModel.load(os.path.join(parent_path, 'IDF_train'))

In [9]:
globals()['models_loaded'] = False
globals()['predics_lr'] = []


def process(time, rdd):

    if not rdd.isEmpty():

        schema = StructType([])
        df1 = spark.createDataFrame(sc.emptyRDD(), schema)
        # Convert to data frame
        df = spark.read.json(rdd)

        df = df.withColumnRenamed("label", "category")

        dataset = model.transform(df)
        # dataset.show()
        if not globals()['models_loaded']:
            # load in your models here
            globals()['my_model'] = LogisticRegressionModel.load(
                "C:/Users/Marce/OneDrive - KU Leuven/Advanced Analytics/twt_pyspark_LRModel1/bestModel")
            #globals()['models_loaded'] = True
            # And then predict using the loaded model:
            df_result = globals()['my_model'].transform(dataset)
            df_lr = df_result.select(
                "tweet_text", "category", "label", "prediction")
            final_lr = df_lr.collect()
            df_lr.show()
            globals()['predics_lr'].extend(final_lr)

In [10]:
ssc = StreamingContext(sc, 2)

In [11]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

Let's start the streaming! 

4:05 pm

In [12]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|Today's #███████ ...|#vaccine|  0.0|       0.0|
+--------------------+--------+-----+----------+

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|Minister Ashford ...|#vaccine|  0.0|       0.0|
+--------------------+--------+-----+----------+

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|Are you vaccinate...|#vaccine|  0.0|       0.0|
+--------------------+--------+-----+----------+

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|#███████ has agre...|#vaccine|  0.0|       1.0|
+--------------------+--------+-----+----------+

+---------------

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|@kaylayoungforwv ...|#vaccine|  0.0|       0.0|
+--------------------+--------+-----+----------+

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|Remember when @PO...|  #biden|  3.0|       0.0|
+--------------------+--------+-----+----------+

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|@SpeakerPelosi $c...|  #biden|  3.0|       1.0|
+--------------------+--------+-----+----------+

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|.@thetalentguru #...|#vaccine|  0.0|       1.0|
+--------------------+--------+-----+----------+

+---------------

+--------------------+--------+-----+----------+
|          tweet_text|category|label|prediction|
+--------------------+--------+-----+----------+
|@ThePowersThatBe ...|  #biden|  3.0|       1.0|
+--------------------+--------+-----+----------+

+--------------------+----------+-----+----------+
|          tweet_text|  category|label|prediction|
+--------------------+----------+-----+----------+
|Investors may be ...|#inflation|  5.0|       0.0|
+--------------------+----------+-----+----------+

+--------------------+----------+-----+----------+
|          tweet_text|  category|label|prediction|
+--------------------+----------+-----+----------+
|(Q2 2021) $COST $...|#inflation|  5.0|       5.0|
+--------------------+----------+-----+----------+

+--------------------+----------+-----+----------+
|          tweet_text|  category|label|prediction|
+--------------------+----------+-----+----------+
|Supply of $SIN de...|#inflation|  5.0|       1.0|
+--------------------+----------+-----

In [24]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----


#### checking the accuracy

In [25]:
pred_lr = pd.DataFrame(predics_lr, columns=[
                       "tweet_text", "category", "label", "prediction"])
con_tab_lr = pd.crosstab(
    pred_lr['label'], pred_lr['prediction'], margins=False)
print(con_tab_lr)

prediction  0.0  1.0  2.0  3.0  4.0  5.0
label                                   
0.0         429  208   43   14    2    0
1.0         231  544   81   20    6    1
2.0         170  177  408   31    6    2
3.0          91   90   60  104    4    0
4.0          28   48   21    6   99    0
5.0          41   49   19    3    1   23


In [26]:
pred_lr.to_csv("C:/Users/Marce/OneDrive - KU Leuven/Advanced Analytics/predictions_cv_1.csv", index=False)