In [1]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter:
# start()/awaitTermination() run on this background thread, so the cell that
# launches it returns immediately.
class StreamingThread(threading.Thread):
    """Background thread that drives a StreamingContext.

    Args:
        ssc: the StreamingContext to start when the thread runs.
    """

    def __init__(self, ssc):
        threading.Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Start the streaming context, then block this thread until it stops."""
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        """Stop streaming gracefully while keeping the SparkContext alive."""
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopGraceFully=True, stopSparkContext=False)

In [2]:
# Display the SparkContext. `sc` is not defined in this notebook -- presumably
# pre-created by the PySpark kernel/launcher (confirm for your environment).
sc

In [3]:
# Display the SparkSession. `spark` is not defined in this notebook -- presumably
# pre-created by the PySpark kernel/launcher (confirm for your environment).
spark

In [4]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType
from pyspark.ml import PipelineModel
from pyspark.sql.functions import regexp_replace, lower
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF
from pyspark.sql.functions import regexp_replace, lower
import os
from pyspark.ml.feature import CountVectorizerModel, IDFModel
from pyspark.ml.classification import LogisticRegressionModel

In [5]:
# Lazily-populated cache for the classifier used by `process`: the model is
# loaded on the first non-empty micro-batch and reused for every later batch.
models_loaded = False
my_model = None



from pyspark.ml import PipelineModel
from pyspark.sql.functions import regexp_replace, lower
import pandas as pd
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF



# Earlier, unused draft of a UDF-based predict function (kept for reference only; `process` now predicts with the cached globals()['my_model'] instead)
#def predict(df):
#    lr_model = LogisticRegressionModel.load('lr_model')
#    prediction = lr_model.transform(df)
#    return prediction

#predict_udf = udf(predict, StringType())



# Fitted feature-extraction models keyed by (cv_model_path, idf_model_path).
# `process` calls `preprocess` once per micro-batch; caching here means the
# models are read from disk once instead of on every batch. Re-running this
# cell resets the cache.
_feature_models_cache = {}


def _load_feature_models(cv_model_path, idf_model_path):
    """Return the ``(CountVectorizerModel, IDFModel)`` pair saved at the given paths.

    Each distinct pair of paths is loaded only once; later calls reuse the
    cached instances.
    """
    key = (cv_model_path, idf_model_path)
    if key not in _feature_models_cache:
        _feature_models_cache[key] = (
            CountVectorizerModel.load(cv_model_path),
            IDFModel.load(idf_model_path),
        )
    return _feature_models_cache[key]


def preprocess(df, column, cv_model_path='cv_model', idf_model_path='idf_model'):
    """Clean, tokenize and TF-IDF-vectorize the text in ``column``.

    Args:
        df: input Spark DataFrame containing ``column``.
        column: name of the raw text column to featurize.
        cv_model_path: directory of the saved ``CountVectorizerModel``.
        idf_model_path: directory of the saved ``IDFModel``.

    Returns:
        ``df`` with added ``review_text_cleaned`` and ``words`` columns, plus
        the output columns of the saved CountVectorizer and IDF models (their
        names come from how those models were configured -- TODO confirm).
    """
    # Strip everything except whitespace and ASCII alphanumerics, then lowercase.
    df = df.withColumn('review_text_cleaned', lower(regexp_replace(column, '[^\\sa-zA-Z0-9]', '')))
    # Fill nulls with the literal 'null', presumably so the Tokenizer never
    # receives a null. Note: with no subset this applies to every string
    # column, not only review_text_cleaned.
    df = df.na.fill('null')

    tokenizer = Tokenizer(inputCol='review_text_cleaned', outputCol='words')
    df_tokenized = tokenizer.transform(df)

    cv_model, idf_model = _load_feature_models(cv_model_path, idf_model_path)
    featurized = cv_model.transform(df_tokenized)  # term frequencies
    return idf_model.transform(featurized)  # TF-IDF rescaling



def process(time, rdd):
    """Score one micro-batch of JSON reviews and print the predictions.

    Intended as a ``DStream.foreachRDD`` callback.

    Args:
        time: batch time supplied by Spark Streaming; only printed as a header.
        rdd: RDD of JSON strings (presumably one review per socket line).

    The logistic-regression model is loaded from ``'lr_model'`` on the first
    non-empty batch and cached in the module-level ``my_model`` /
    ``models_loaded`` globals, so later batches reuse it instead of reloading.
    """
    global models_loaded, my_model

    if rdd.isEmpty():
        return  # nothing arrived during this batch interval

    print("========= %s =========" % str(time))

    # Parse the JSON lines into a DataFrame and add TF-IDF features.
    df = spark.read.json(rdd)
    df = preprocess(df, 'review_text')

    # Load the classifier once; every later batch reuses the cached instance.
    if not models_loaded:
        my_model = LogisticRegressionModel.load('lr_model')
        models_loaded = True

    df_result = my_model.transform(df)
    df_result.select('app_id', 'label', 'review_id', 'review_text', 'probability', 'prediction').show()

In [6]:
# Micro-batch interval in seconds: incoming data is grouped into one batch
# (one `process` call) per interval.
BATCH_INTERVAL_SECONDS = 10
ssc = StreamingContext(sc, batchDuration=BATCH_INTERVAL_SECONDS)

In [7]:
# Review stream source: a TCP text socket, read line by line (presumably one
# JSON review per line, as `process` parses each batch with spark.read.json).
STREAM_HOST, STREAM_PORT = "seppe.net", 7778
lines = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)
lines.foreachRDD(process)

In [8]:
# Run the streaming context on a background thread so this cell returns
# immediately and the notebook stays usable; call `ssc_t.stop()` to end it.
ssc_t = StreamingThread(ssc)
ssc_t.start()

+-------+-----+---------+--------------------+-----------+----------+
| app_id|label|review_id|         review_text|probability|prediction|
+-------+-----+---------+--------------------+-----------+----------+
|2348650|    1|138954943|The game has a we...|  [0.0,1.0]|       1.0|
|1062810|    1|138953891|Purchased the gam...|  [1.0,0.0]|       0.0|
|1062810|    1|138953878|This game is fant...|  [0.0,1.0]|       1.0|
+-------+-----+---------+--------------------+-----------+----------+

+-------+-----+---------+--------------------+--------------------+----------+
| app_id|label|review_id|         review_text|         probability|prediction|
+-------+-----+---------+--------------------+--------------------+----------+
|2141910|    1|138954284|Glad to finally h...|[1.35689366671035...|       1.0|
|2423650|    1|138954520|Simple game gotte...|[5.86676168327464...|       1.0|
+-------+-----+---------+--------------------+--------------------+----------+

+-------+-----+---------+---------

In [9]:
# Gracefully stop streaming while keeping the SparkContext alive (see StreamingThread.stop).
# NOTE(review): per the Spark docs a stopped StreamingContext cannot be restarted --
# re-run the StreamingContext cell to create a fresh one before streaming again.
ssc_t.stop()

----- Stopping... this may take a few seconds -----
