In [14]:
from IPython.display import display, HTML
# Widen the classic-notebook container to 95% of the window (cosmetic only).
display(HTML("<style>.container { width:95% !important; }</style>"))
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
from datetime import datetime
from pyspark.ml.classification import LogisticRegression
from difflib import unified_diff
from pyspark.ml import PipelineModel
from pyspark.ml.feature import IndexToString
from itertools import chain
from pyspark.sql.functions import create_map, lit
import pyspark.sql.functions as F

In [15]:
import os

# Directory holding the saved ``pipelineModel`` (loaded in ``process``).
# Set the PIPELINE_OUTPUT_DIR environment variable to run this notebook on a
# machine other than the author's; the default keeps the original location.
path = os.environ.get(
    'PIPELINE_OUTPUT_DIR',
    '/home/edoardo/Desktop/University/KU Leuven/Advancedanalytics/ThirdAssignment/output',
)

In [16]:
from threading import Thread

class StreamingThread(Thread):
    """Run a StreamingContext on a background thread so the notebook stays usable.

    ``run`` starts the context and blocks (on this thread) until it terminates;
    ``stop`` requests a graceful shutdown while leaving the SparkContext alive.
    """

    def __init__(self, ssc):
        """Store the streaming context this thread will drive.

        :param ssc: the StreamingContext to start/stop.
        """
        super().__init__()
        self.ssc = ssc

    def run(self):
        # Use the context passed to the constructor, not a notebook global,
        # so the thread drives exactly the context it was created for.
        self.ssc.start()
        self.ssc.awaitTermination()

    def stop(self):
        """Gracefully stop streaming without stopping the underlying SparkContext."""
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [17]:
def make_diff(old, new):
    """Return the added/removed lines between two texts as one newline-joined string.

    Only lines of ``difflib.unified_diff`` output that start with ``+`` or ``-``
    are kept. Because ``unified_diff`` emits its ``--- ``/``+++ `` file-header
    lines whenever the texts differ, those headers are kept too. They are left
    in place; presumably the model was trained on the same feature, but verify
    before changing it.

    :param old: previous revision text; ``None`` is treated as empty.
    :param new: new revision text; ``None`` is treated as empty.
    :returns: the filtered diff lines, or ``''`` when the texts are identical.
    """
    # Spark passes SQL NULL as None; without this, ``None.split`` would raise
    # inside the UDF and fail the whole micro-batch.
    old_lines = ('' if old is None else old).split('\n')
    new_lines = ('' if new is None else new).split('\n')
    return '\n'.join(
        line for line in unified_diff(old_lines, new_lines)
        if line.startswith('+') or line.startswith('-')
    )

In [18]:
globals()['models_loaded'] = False


def _load_model_once():
    """Return the fitted PipelineModel, loading it from ``{path}/pipelineModel`` on first call.

    The model is cached in the notebook globals (``my_model``) so it is
    deserialised once rather than on every micro-batch.
    """
    if not globals()['models_loaded']:
        globals()['my_model'] = PipelineModel.load(f"{path}/pipelineModel")
        globals()['models_loaded'] = True
    return globals()['my_model']


def _preprocess(raw_df):
    """Derive the pipeline's input columns from the raw JSON records.

    Produces ``diff`` (via ``make_diff`` on ``text_old``/``text_new``), ``ip``
    (1 if ``name_user`` looks like an IP address), ``nan_comment`` (1 if
    ``comment`` is null/NaN) and keeps ``label``.
    """
    diff_udf = F.udf(make_diff, StringType())
    # Separator counts: number of split parts minus one.
    n_dots = F.size(F.split(F.col("name_user"), r"\.")) - 1
    n_colons = F.size(F.split(F.col("name_user"), r"\:")) - 1
    return (
        raw_df
        .withColumn("diff", diff_udf(F.col("text_old"), F.col("text_new")))
        # An IPv4 dotted quad has exactly 3 dots; a full (uncompressed) IPv6
        # address has 7 colons. Compressed IPv6 ("::") is not detected.
        # NOTE(review): confirm this matches the training-time preprocessing.
        .withColumn("ip", F.when(n_dots == 3, 1)
                           .when(n_colons == 7, 1)
                           .otherwise(0))
        .withColumn("nan_comment", F.when(F.isnan(F.col("comment")) | F.isnull(F.col("comment")), 1)
                                    .otherwise(0))
        .select(['diff', 'ip', 'nan_comment', 'label'])
    )


def _decode_predictions(scored_df):
    """Replace the numeric ``prediction`` index with its string label.

    The label vocabulary is read from the ML attribute metadata on the
    ``newlabel`` column (presumably written by a label indexer inside the
    pipeline; the index order of ``ml_attr.vals`` maps to prediction values).
    """
    meta = [f.metadata for f in scored_df.schema.fields if f.name == "newlabel"]
    if not meta:
        raise ValueError("model output has no 'newlabel' column; cannot decode predictions")
    index_to_label = dict(enumerate(meta[0]["ml_attr"]["vals"]))
    mapping_expr = create_map([lit(x) for x in chain(*index_to_label.items())])
    return scored_df.withColumn('prediction', mapping_expr[scored_df['prediction']])


def process(time, rdd):
    """Score one micro-batch of the stream and show label vs. predicted label.

    Intended for ``DStream.foreachRDD``: receives the batch time and the RDD of
    raw lines, parses them as JSON, builds features, applies the saved pipeline
    and prints the ``label``/``prediction`` table. Empty batches are skipped.
    """
    if rdd.isEmpty():
        return

    print("========= %s =========" % str(time))

    raw_df = spark.read.json(rdd)
    features_df = _preprocess(raw_df)
    scored_df = _load_model_once().transform(features_df)
    _decode_predictions(scored_df).select(['label', 'prediction']).show()

In [19]:
# Streaming context on top of the notebook's existing SparkContext ``sc``,
# producing one micro-batch every 10 seconds.
ssc = StreamingContext(sc, batchDuration=10)

In [20]:
# Line-oriented text stream from the remote server; every micro-batch RDD of
# lines is handed to ``process`` (which parses them as JSON).
lines = ssc.socketTextStream(hostname="seppe.net", port=7778)
lines.foreachRDD(process)

In [21]:
# Drive the streaming context from a background thread so the notebook stays
# interactive; stop it later with ``ssc_t.stop()``.
ssc_t = StreamingThread(ssc)
ssc_t.start()

+------+----------+
| label|prediction|
+------+----------+
|  safe|      safe|
|unsafe|      safe|
|  safe|      safe|
+------+----------+

+-----+----------+
|label|prediction|
+-----+----------+
| safe|      safe|
| safe|      safe|
+-----+----------+



In [22]:
# Gracefully stop streaming; the SparkContext itself is kept alive.
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+------+----------+
| label|prediction|
+------+----------+
|unsafe|      safe|
|  safe|      safe|
+------+----------+

