In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [2]:
from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [3]:
sc

In [4]:
spark

In [5]:
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import IntegerType

In [6]:
data = sc.textFile('file:///C:/Users/lenne/Desktop/spark/coding_and_data/data/lots_of_data/tweets')
data.first()



In [7]:
df = spark.read.json(data)
df.show()

+--------+-------------------+--------------------+
|   label|           tweet_id|          tweet_text|
+--------+-------------------+--------------------+
|  #biden|1380150333767262211|#███████ bubbles ...|
|  #biden|1380150113650274308|Nothing to see he...|
|#vaccine|1380150618275389448|Well that was qui...|
|#vaccine|1380150554974904321|Good morning, Twi...|
|#vaccine|1380150530526306314|Here for my secon...|
|#vaccine|1380150486339366928|You don't have a ...|
|#vaccine|1380150434208358402|Because only cert...|
|#vaccine|1380150386863013894|#███████
#███████...|
|#vaccine|1380150373399339009|Proud to see thou...|
|  #covid|1380150776299937795|The pandemic is n...|
|  #covid|1380150698948575232|Tandon's next Emp...|
|  #covid|1380150671123623942|2 days virtual Co...|
|  #covid|1380150657823469573|Toronto ICU doc: ...|
|  #covid|1380150622796742658|India records 4th...|
|  #covid|1380150957300969473|When #███████ hit...|
|  #covid|1380150954503381000|With low literacy...|
|  #covid|13

In [8]:
from pyspark.sql.functions import when,regexp_replace
df2 = df.withColumn("label", when(df.label == "#biden",1)
                                 .when(df.label == "#inflation",2)
                    .when(df.label == "#china",3)
                    .when(df.label == "#stopasianhate",4)
                    .when(df.label == "#covid",5)
                    .when(df.label == "#vaccine",6)
                                 .when(df.label.isNull() ,"")
                                 .otherwise(df.label))
df2 = df2.withColumn('tweet_text', regexp_replace('tweet_text', r'[#@][^\s#@]+', ''))
df2.show(20)

+-----+-------------------+--------------------+
|label|           tweet_id|          tweet_text|
+-----+-------------------+--------------------+
|    1|1380150333767262211| bubbles under su...|
|    1|1380150113650274308|Nothing to see he...|
|    6|1380150618275389448|Well that was qui...|
|    6|1380150554974904321|Good morning, Twi...|
|    6|1380150530526306314|Here for my secon...|
|    6|1380150486339366928|You don't have a ...|
|    6|1380150434208358402|Because only cert...|
|    6|1380150386863013894|





Ocugen: Pot...|
|    6|1380150373399339009|Proud to see thou...|
|    5|1380150776299937795|The pandemic is n...|
|    5|1380150698948575232|Tandon's next Emp...|
|    5|1380150671123623942|2 days virtual Co...|
|    5|1380150657823469573|Toronto ICU doc: ...|
|    5|1380150622796742658|India records 4th...|
|    5|1380150957300969473|When  hit,  saw t...|
|    5|1380150954503381000|With low literacy...|
|    5|1380150949264695304|Republican Lawmak...|
|    5|1380150948895

In [9]:
#We Tokenize the tweet texts

from pyspark.ml.feature import  Tokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import  IDF
final_data = df2.withColumn("label", df2["label"].cast(IntegerType()))

tokenizer = Tokenizer(inputCol="tweet_text", outputCol="tweet_text_tokens")
data_words = tokenizer.transform(final_data)
data_words.show()

+-----+-------------------+--------------------+--------------------+
|label|           tweet_id|          tweet_text|   tweet_text_tokens|
+-----+-------------------+--------------------+--------------------+
|    1|1380150333767262211| bubbles under su...|[, bubbles, under...|
|    1|1380150113650274308|Nothing to see he...|[nothing, to, see...|
|    6|1380150618275389448|Well that was qui...|[well, that, was,...|
|    6|1380150554974904321|Good morning, Twi...|[good, morning,, ...|
|    6|1380150530526306314|Here for my secon...|[here, for, my, s...|
|    6|1380150486339366928|You don't have a ...|[you, don't, have...|
|    6|1380150434208358402|Because only cert...|[because, only, c...|
|    6|1380150386863013894|





Ocugen: Pot...|[, , , , , , ocug...|
|    6|1380150373399339009|Proud to see thou...|[proud, to, see, ...|
|    5|1380150776299937795|The pandemic is n...|[the, pandemic, i...|
|    5|1380150698948575232|Tandon's next Emp...|[tandon's, next, ...|
|    5|1380150671123

In [10]:
count = CountVectorizer(inputCol="tweet_text_tokens", outputCol="count_vectors")
count_model = count.fit(data_words)
data_count_vectors = count_model.transform(data_words)
data_count_vectors.show()

+-----+-------------------+--------------------+--------------------+--------------------+
|label|           tweet_id|          tweet_text|   tweet_text_tokens|       count_vectors|
+-----+-------------------+--------------------+--------------------+--------------------+
|    1|1380150333767262211| bubbles under su...|[, bubbles, under...|(4102,[0,2,6,7,30...|
|    1|1380150113650274308|Nothing to see he...|[nothing, to, see...|(4102,[0,1,2,4,6,...|
|    6|1380150618275389448|Well that was qui...|[well, that, was,...|(4102,[0,12,13,33...|
|    6|1380150554974904321|Good morning, Twi...|[good, morning,, ...|(4102,[5,11,28,42...|
|    6|1380150530526306314|Here for my secon...|[here, for, my, s...|(4102,[0,1,5,7,8,...|
|    6|1380150486339366928|You don't have a ...|[you, don't, have...|(4102,[0,2,5,6,11...|
|    6|1380150434208358402|Because only cert...|[because, only, c...|(4102,[0,2,3,5,11...|
|    6|1380150386863013894|





Ocugen: Pot...|[, , , , , , ocug...|(4102,[0,6,31,61,...|

In [27]:
idf = IDF(inputCol="count_vectors", outputCol="features")
idf_model = idf.fit(data_count_vectors)
idf_data = idf_model.transform(data_count_vectors)

preprocessed_data = idf_data.select("label", "features")
preprocessed_data = preprocessed_data.withColumn("label", preprocessed_data["label"].cast(IntegerType()))
preprocessed_data.show()

train_set, test_set = preprocessed_data.randomSplit([0.75,0.25],0)

#pandas_df = preprocessed_data.toPandas()
#print(pandas_df)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|(4102,[0,2,6,7,30...|
|    1|(4102,[0,1,2,4,6,...|
|    6|(4102,[0,12,13,33...|
|    6|(4102,[5,11,28,42...|
|    6|(4102,[0,1,5,7,8,...|
|    6|(4102,[0,2,5,6,11...|
|    6|(4102,[0,2,3,5,11...|
|    6|(4102,[0,6,31,61,...|
|    6|(4102,[0,2,5,7,9,...|
|    5|(4102,[0,1,2,3,4,...|
|    5|(4102,[0,5,6,15,1...|
|    5|(4102,[0,12,26,12...|
|    5|(4102,[0,1,2,3,4,...|
|    5|(4102,[0,1,2,3,4,...|
|    5|(4102,[0,1,3,4,5,...|
|    5|(4102,[0,1,2,4,10...|
|    5|(4102,[0,7,489,18...|
|    5|(4102,[0,1,2,5,6,...|
|    5|(4102,[0,1,2,4,5,...|
|    5|(4102,[0,473,771,...|
+-----+--------------------+
only showing top 20 rows



In [28]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np

log_model = LogisticRegression(maxIter = 100)
#elasticnetPar = 0 : ridge , elasticnetPar = 1 : lasso  regpar = lambda 
grid = ParamGridBuilder().addGrid(log_model.regParam, np.linspace(0, 100,5)).addGrid(log_model.elasticNetParam, [1]).build()
cross_val = CrossValidator(estimator=log_model,
                          estimatorParamMaps=grid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds= 2)  
cross_val = cross_val.fit(train_set)

In [29]:
best_model = cross_val.bestModel
print(best_model)

LogisticRegressionModel: uid=LogisticRegression_7ad9eebcf013, numClasses=7, numFeatures=4102


In [30]:
pred = best_model.transform(test_set)
pred.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    1|(4102,[0,1,2,3,4,...|[-4.5290954043510...|[4.19404397101991...|       6.0|
|    2|(4102,[0,1,2,3,4,...|[-4.0324531998479...|[3.46018048246986...|       6.0|
|    2|(4102,[0,1,2,3,5,...|[-4.0778927836054...|[5.31337449709482...|       5.0|
|    2|(4102,[0,4,6,7,8,...|[-4.2140712230301...|[1.70555264104222...|       5.0|
|    3|(4102,[0,1,2,3,4,...|[-4.0945175511037...|[2.81315466286097...|       3.0|
|    3|(4102,[0,1,2,3,4,...|[-5.0544506988865...|[5.35027631699152...|       5.0|
|    3|(4102,[0,1,2,3,4,...|[-4.4000560704736...|[3.00549271454315...|       3.0|
|    3|(4102,[0,1,2,4,5,...|[-4.5787365590111...|[3.39241221339827...|       3.0|
|    3|(4102,[0,1,2,4,6,...|[-4.7701134905650...|[1.32201678093548...|       6.0|
|    3|(4102,[0,

In [31]:
pred.groupBy('label','prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       6.0|    3|
|    3|       1.0|    4|
|    5|       5.0|   15|
|    6|       6.0|   22|
|    1|       5.0|    2|
|    6|       5.0|    9|
|    3|       3.0|    9|
|    3|       5.0|   10|
|    2|       2.0|    1|
|    1|       1.0|    2|
|    6|       1.0|    1|
|    5|       6.0|   12|
|    3|       6.0|   10|
|    2|       6.0|    3|
|    2|       5.0|    2|
|    6|       3.0|    3|
|    4|       6.0|    2|
+-----+----------+-----+



In [38]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Create both evaluators
evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Make predicitons
predictionAndTarget = best_model.transform(test_set).select("label", "prediction")

# Get metrics
Accuracy = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
WeightedF1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedFMeasure"})
weightedPrecision = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedPrecision"})
weightedRecall = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "weightedRecall"})

print("Weighted recall = %s" % weightedRecall)
print("Weighted precision = %s" % weightedPrecision)
print("Weighted F1 Score = %s" % WeightedF1)
print("Accuracy = %s" % Accuracy)

Weighted recall = 0.44545454545454544
Weighted precision = 0.5292326094957674
Weighted F1 Score = 0.4279724872828321
Accuracy = 0.44545454545454544


In [39]:
count_model.write().overwrite().save('countvectorizer')
idf_model.write().overwrite().save('tf-idf')
best_model.write().overwrite().save('log_regression')

In [40]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
 

In [41]:
globals()['models_loaded'] = False
globals()['my_model'] = None

from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.classification import LogisticRegressionModel
from  pyspark.ml.feature import CountVectorizerModel
from pyspark.ml.feature import  IDFModel 

    
def process(time, rdd):
    if rdd.isEmpty():
        return
    
  
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_log_model'] = LogisticRegressionModel.load('log_regression')
        globals()['my_count_vectoriser'] = CountVectorizerModel.load('countvectorizer')
        globals()['my_tf_idf'] = IDFModel.load('tf-idf')        # Replace '***' with:    [...].load('my_logistic_regression')
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    
    token = tokenizer.transform(df)
    count = my_count_vectoriser.transform(token)
    
    idf = my_tf_idf.transform(count)
    idf_select = idf.select("label", "features")
    predict = my_log_model.transform(idf_select)
    predict_names = predict.withColumn("prediction", when(predict.prediction == 1, '#biden')
                                 .when(predict.prediction == 2,'#inflation')
                    .when(predict.prediction == 3,'#china')
                    .when(predict.prediction == 4,"#stopasianhate")
                    .when(predict.prediction == 5,"#covid")
                    .when(predict.prediction == 6,"#vaccine")
                                 .when(predict.prediction.isNull() ,"")
                                 .otherwise(predict.prediction))
    predict_names.show() 



In [42]:
ssc = StreamingContext(sc, 10)

In [43]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [44]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+------+-------------------+--------------------+
| label|           tweet_id|          tweet_text|
+------+-------------------+--------------------+
|#covid|1387699178638356481|The pandemic has ...|
|#covid|1387699166906888193|Join us on 18 May...|
+------+-------------------+--------------------+

+------+--------------------+--------------------+--------------------+----------+
| label|            features|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+----------+
|#covid|(4102,[1,2,5,11,1...|[-4.0291202056636...|[3.58548742211234...|    #covid|
|#covid|(4102,[0,1,2,7,9,...|[-4.0289907259607...|[7.15337731391432...|  #vaccine|
+------+--------------------+--------------------+--------------------+----------+

+--------------+-------------------+--------------------+
|         label|           tweet_id|          tweet_text|
+--------------+-------------------+--------------------+
|#stopasianhate|1387699324

In [45]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+--------+-------------------+---------------------+
|   label|           tweet_id|           tweet_text|
+--------+-------------------+---------------------+
|#vaccine|1387699759209541632|WTACH VIDEO：Nearl...|
+--------+-------------------+---------------------+

+--------+--------------------+--------------------+--------------------+----------+
|   label|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+--------------------+----------+
|#vaccine|(4102,[4,61,270,3...|[-3.5902960378938...|[2.19283957034341...|  #vaccine|
+--------+--------------------+--------------------+--------------------+----------+

