In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json, col, expr,udf
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml import Pipeline
import textwrap


In [2]:

def print_output(df, epoch_id):
    """Print one micro-batch of predictions to the console.

    Has the two-argument shape expected by ``foreachBatch``: the batch
    DataFrame and the batch id. ``epoch_id`` is accepted but not used.

    Only the review text, label, probability and prediction columns are
    shown, and the review text is cut down to a short prefix so the table
    stays narrow.
    """
    shown_columns = ("review_text", "label", "probability", "prediction")
    # Start position 0 with length 35 keeps the leading 35 characters.
    short_review = df.review_text.substr(0, 35)
    batch_view = df.select(*shown_columns).withColumn("review_text", short_review)
    batch_view.show(truncate=False)


In [3]:
# Obtain the Spark session for this notebook (an existing one is reused,
# otherwise a new one is created under the given application name).
spark = (
    SparkSession.builder
    .appName("StructuredStreamingPrediction")
    .getOrCreate()
)

In [4]:
# Schema of the incoming JSON records: a nullable string `review_text`
# and a nullable double `label`.
schema = (
    StructType()
    .add(StructField("review_text", StringType(), True))
    .add(StructField("label", DoubleType(), True))
    # Add more fields as needed
)


In [5]:
# Display the SparkContext behind the session. Going through `spark` avoids
# relying on a pre-defined `sc` global, which is only present when the kernel
# was started by the PySpark shell and is undefined on a plain Python kernel.
spark.sparkContext

In [6]:
# Open a streaming read from the remote text socket.
streaming_data = (
    spark.readStream
    .format("socket")
    .option("host", "seppe.net")
    .option("port", "7778")
    .load()
)

In [7]:
# Deserialize the JSON input data using the defined schema.
# from_json yields null for a line it cannot parse, so `review_text` may be
# null. Those rows are dropped here because the Tokenizer stage applied later
# cannot process a null string and would fail the whole streaming query.
input_data = (
    streaming_data
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.review_text", "data.label")
    .where(col("review_text").isNotNull())
)


In [8]:
# Text preprocessing stages: tokenize the review, remove stop words, then
# hash the remaining words into a 1000-dimensional feature vector.
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
stopwords = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol="filtered_words",
)
hashingTF = HashingTF(
    numFeatures=1000,
    inputCol=stopwords.getOutputCol(),
    outputCol="features",
)
# NOTE(review): `pipeline` is not referenced by any later cell visible in this
# notebook; `preprocessing_pipeline` is built from the same stages.
pipeline = Pipeline(stages=[tokenizer, stopwords, hashingTF])

In [9]:
# Chain the three preprocessing stages in the order they must run.
preprocessing_stages = [tokenizer, stopwords, hashingTF]
preprocessing_pipeline = Pipeline(stages=preprocessing_stages)


In [10]:
# Fit the pipeline on the stream to obtain a pipeline model, then apply it
# to the same stream to add the feature columns.
preprocessing_model = preprocessing_pipeline.fit(input_data)
preprocessed_data = preprocessing_model.transform(input_data)


In [11]:
# Load a saved LogisticRegressionModel from `model_path`.
model_path = "Model1"
model = LogisticRegressionModel.load(path=model_path)

In [12]:
# Score the preprocessed stream with the loaded model; the result carries
# the `probability` and `prediction` columns displayed by print_output.
predictions = model.transform(dataset=preprocessed_data)


In [15]:
# Start the streaming query; every micro-batch is handed to print_output.
query = (
    predictions.writeStream
    .foreachBatch(print_output)
    .start()
)

+-----------+-----+-----------+----------+
|review_text|label|probability|prediction|
+-----------+-----+-----------+----------+
+-----------+-----+-----------+----------+

+-----------------------------------+-----+------------------------------------------+----------+
|review_text                        |label|probability                               |prediction|
+-----------------------------------+-----+------------------------------------------+----------+
|An awesome boomer shooter made by a|1.0  |[2.3888943421570668E-4,0.9997611105657843]|1.0       |
+-----------------------------------+-----+------------------------------------------+----------+

+----------------------+-----+------------------------------------------+----------+
|review_text           |label|probability                               |prediction|
+----------------------+-----+------------------------------------------+----------+
|interesting logic game|1.0  |[3.5483094660843304E-5,0.9999645169053392]|1.0       |
+----------------------+-----+------------------------------------------+----------+

+--------------------------------+-----+-----------------------------------------+----------+
|review_text                     |label|probability                              |prediction|
+--------------------------------+-----+-----------------------------------------+----------+
|Best played with friends. Scary!|1.0  |[9.670497769426561E-4,0.9990329502230574]|1.0       |
+--------------------------------+-----+-----------------------------------------+----------+

+-----------+-----+-----------------------------------------+----------+
|review_text|label|probability                              |prediction|
+-----------+-----+-----------------------------------------+----------+
|W          |1.0  |[5.331118278012236E-8,0.9999999466888172]|1.0       |
+-----------+-----+-----------------------------------------+----------+



In [16]:
# Stop the streaming query started above.
query.stop()