In [1]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking the Jupyter notebook
        
class StreamingThread(threading.Thread):
    """Background thread that drives a Spark ``StreamingContext``.

    ``run`` starts the context and then blocks on ``awaitTermination``;
    doing that on this thread lets the launching notebook cell return.

    :param ssc: the streaming context to start, wait on, and later stop.
    """

    def __init__(self, ssc):
        threading.Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Start the streaming context and block until it terminates."""
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        """Stop streaming gracefully while leaving the SparkContext running."""
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [2]:
# Display the SparkContext. `sc` is not defined in this notebook; presumably it is provided by the PySpark kernel.
sc

In [3]:
# Display the Spark session used by `process` (spark.read.json). Not defined in this notebook; presumably provided by the PySpark kernel.
spark

In [4]:
from pyspark.streaming import StreamingContext
from pyspark.ml import PipelineModel

In [5]:
# Location of the saved PipelineModel, loaded on the first non-empty batch.
MODEL_PATH = 'DTmodel'

# Lazily populated model cache. These are kept as notebook globals so other cells can inspect them.
models_loaded = False
my_model = None


def _load_model_once():
    """Return the cached PipelineModel, loading it from MODEL_PATH on first use."""
    global models_loaded, my_model
    if not models_loaded:
        my_model = PipelineModel.load(MODEL_PATH)
        models_loaded = True
    return my_model


def process(time, rdd):
    """Parse one micro-batch, show it, and show the model's predictions for it.

    Registered via ``foreachRDD``, so it is invoked once per streaming batch.

    :param time: batch time supplied by Spark Streaming; only printed.
    :param rdd: RDD of text lines from the stream; each line is parsed as JSON.
    """
    # Skip batches that received no data.
    if rdd.isEmpty():
        return

    print("========= %s =========" % str(time))

    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()

    # The model is loaded lazily, so the first non-empty batch pays the loading cost.
    model = _load_model_once()
    df_result = model.transform(df)
    df_result.show()

In [6]:
# Micro-batch interval in seconds: each RDD passed to `process` covers this window.
BATCH_INTERVAL_SECONDS = 10
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

In [7]:
# TCP source of the review stream; each received line is parsed as JSON downstream.
STREAM_HOST = "seppe.net"
STREAM_PORT = 7778

lines = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)
# Hand every micro-batch RDD (with its batch time) to `process`.
lines.foreachRDD(process)

In [9]:
# Run the streaming context on a background thread so this cell returns immediately.
# Call ssc_t.stop() to end streaming.
ssc_t = StreamingThread(ssc)
ssc_t.start()

+-------+-----+---------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+----------+
| app_id|label|review_id|         review_text|              tokens|     filtered_tokens|            features|   rawPrediction|         probability|prediction|
+-------+-----+---------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+----------+
|2008100|    1|139151055|                 yes|               [yes]|               [yes]|(127099,[456],[1.0])|[8900.0,58338.0]|[0.13236562658020...|       1.0|
|1304930|    1|139153587|                good|              [good]|              [good]|  (127099,[6],[1.0])|[8900.0,58338.0]|[0.13236562658020...|       1.0|
|1304930|    1|139153195|This game is awes...|[this, game, is, ...|[game, awesome, t...|(127099,[0,19,25,...|[8900.0,58338.0]|[0.13236562658020...|       1.0|
+-------+-----+---------+--------------------+

+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2005010|    1|139153388|Nails the boomer ...|
|2005010|    1|139152729|rip, tear...for t...|
|2005010|    1|139152493|So as a disclaime...|
+-------+-----+---------+--------------------+

+-------+-----+---------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+----------+
| app_id|label|review_id|         review_text|              tokens|     filtered_tokens|            features|   rawPrediction|         probability|prediction|
+-------+-----+---------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+----------+
|2005010|    1|139153388|Nails the boomer ...|[nails, the, boom...|[nails, boomer, s...|(127099,[1,2,5,13...|[8900.0,58338.0]|[0.13236562658020...|       1.0|
|2005010|    1|139152729|rip, tear

In [10]:
# Gracefully stop streaming. The SparkContext (sc) is left running.
# NOTE(review): a stopped StreamingContext likely cannot be restarted; re-create `ssc` before streaming again. Confirm against the Spark docs.
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|2218750|    0|139153211|am i forced to eq...|
|2218750|    1|139152148|Fun and Addicting...|
|2218750|    1|139152080|Diablo 1 + Vampir...|
|2218750|    1|139152065|Must... kill........|
|2218750|    1|139151602|Vampire Survivors...|
+-------+-----+---------+--------------------+

+-------+-----+---------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+----------+
| app_id|label|review_id|         review_text|              tokens|     filtered_tokens|            features|   rawPrediction|         probability|prediction|
+-------+-----+---------+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+----------+
|2218750|    0|139153211|am i forced to eq...|[