In [13]:
import threading

# Helper thread that keeps the Spark StreamingContext from blocking Jupyter:
# the blocking start/awaitTermination pair runs off the notebook's main thread.

class StreamingThread(threading.Thread):
    """Background thread that drives a (DStream) StreamingContext.

    ``run`` starts the wrapped context and then blocks *this* thread until the
    context terminates. ``stop`` asks the context to shut down gracefully while
    leaving the underlying SparkContext running for later cells.
    """

    def __init__(self, ssc):
        super().__init__()
        # The StreamingContext this thread starts and stops.
        self.ssc = ssc

    def run(self):
        context = self.ssc
        context.start()
        context.awaitTermination()

    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        # Keep the SparkContext alive; let in-flight batches finish first.
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

# Display the active SparkContext to confirm the session is up.
# NOTE(review): `sc` is not defined in this notebook; presumably it is
# provided by the PySpark kernel/launcher — confirm for fresh environments.
sc

In [14]:
# Display the active SparkSession used by all `spark.read` / `spark.readStream`
# calls below.
# NOTE(review): `spark` is not created in this notebook; presumably it is
# provided by the PySpark kernel/launcher — confirm for fresh environments.
spark

In [15]:
import random
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

The following section loads the collected data set.

In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os
import pandas as pd

# Specify the directory path where the JSON files are located
directory = '/Users/davidfrost/Documents/analytics/a3/spark/notebooks/data' # Add path

# Define the schema for the JSON files. An explicit schema skips Spark's
# schema-inference pass and pins the column types used by the model.
schema = StructType([
    StructField("review_id", StringType(), True),
    StructField("app_id", StringType(), True),
    StructField("review_text", StringType(), True),
    StructField("label", IntegerType(), True),
    # Add more fields as needed
])


def _is_data_file(directory, filename):
    """Return True if ``filename`` in ``directory`` is a regular, visible data file.

    Skips subdirectories, hidden files (e.g. macOS ``.DS_Store`` or Spark's
    ``.part-*.crc`` checksums) and bookkeeping files such as ``_SUCCESS``.
    Otherwise these would be renamed to ``*.json`` and parsed as review
    records, producing null/garbage rows.
    """
    if filename.startswith(('.', '_')):
        return False
    return os.path.isfile(os.path.join(directory, filename))


# Give extension-less data files a `.json` suffix.
for filename in os.listdir(directory):
    if filename.endswith('.json') or not _is_data_file(directory, filename):
        continue
    old_filepath = os.path.join(directory, filename)
    new_filepath = old_filepath + '.json'

    # Check if the destination file already exists (never clobber it)
    if not os.path.exists(new_filepath):
        os.rename(old_filepath, new_filepath)

# Collect the non-empty JSON data files. Sorting makes the input order
# independent of the filesystem's directory-listing order.
file_paths = [
    os.path.join(directory, filename)
    for filename in sorted(os.listdir(directory))
    if filename.endswith('.json')
    and _is_data_file(directory, filename)
    and os.path.getsize(os.path.join(directory, filename)) > 0
]
if not file_paths:
    raise FileNotFoundError(f"No non-empty JSON data files found in {directory!r}")

# Read JSON files from the directory into a DataFrame with the specified schema
df = spark.read.schema(schema).json(file_paths)
df.show()

                                                                                

+---------+-------+--------------------+-----+
|review_id| app_id|         review_text|label|
+---------+-------+--------------------+-----+
|138827659| 824600|Very fun boomer s...|    1|
|138825357| 824600|I kept squeezing ...|    1|
|138826496|2412700|Good game, some m...|    1|
|138828398|1268750|I'M DOING MY PART...|    1|
|138828252|1268750|Honestly fun and ...|    1|
|138828203|1268750|I'm playing my fa...|    1|
|138827799|1268750|it I'm part doing!!!|    1|
|138827540|1268750|Do you like Stars...|    1|
|138827272|1268750|My review of this...|    1|
|138827016|1268750|COME ON YOU APES!...|    1|
|138826983|1268750|I don't understan...|    0|
|138826753|1268750|   I'M DOING MY PART|    1|
|138827015|1724770|An Epic Tower Def...|    1|
|138826634|1724770|Tower Defence is ...|    1|
|138827455|1934780|Really good game,...|    1|
|138827269|2370310|A laid-back but s...|    1|
|138828505|1304930|Considering that ...|    1|
|138828184|1304930|I can now shit my...|    1|
|138828143|13

In [17]:
# How many reviews were loaded in total (triggers one Spark job over the input)
num_observations = df.count()
print("Number of observations:", num_observations)

Number of observations: 1868


In this section, the model is trained.

In [18]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Keep only rows the pipeline can consume: Tokenizer cannot handle a null
# review_text, and fitting the classifier needs a label on every row.
# When no such rows exist, this filter leaves the seeded split unchanged.
labelled_df = df.dropna(subset=["review_text", "label"])

# Split the data into training and test sets (seeded for reproducibility)
train_data, test_data = labelled_df.randomSplit([0.8, 0.2], seed=42)

# Define the stages of the pipeline:
# raw text -> lower-cased tokens -> stop words removed -> term counts -> TF-IDF -> LR
tokenizer = Tokenizer(inputCol="review_text", outputCol="tokens")
stopwords = StopWordsRemover.loadDefaultStopWords("english")
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=stopwords)
cv = CountVectorizer(inputCol="filtered_tokens", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")
# NOTE(review): fitting logged "LBFGS: Failure! Resetting history" errors.
# Possibly unregularized LR on sparse, high-dimensional TF-IDF features;
# consider a small regParam — TODO confirm before changing metrics.
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Create a pipeline
pipeline = Pipeline(stages=[tokenizer, remover, cv, idf, lr])

# Train the model (a PipelineModel, despite the variable name)
logreg = pipeline.fit(train_data)

                                                                                

23/05/27 19:39:36 ERROR LBFGS: Failure! Resetting history: breeze.optimize.StepSizeUnderflow: 


                                                                                

23/05/27 19:39:54 ERROR LBFGS: Failure! Resetting history: breeze.optimize.FirstOrderException: Line search failed


                                                                                

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Make predictions on the test dataset. Cache the result: each evaluate() call
# below is a separate Spark job and would otherwise re-run the whole pipeline.
y_pred = logreg.transform(test_data).cache()

# BinaryClassificationEvaluator: area under ROC from the raw (pre-threshold) scores
binary_evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")

# MulticlassClassificationEvaluator: metrics from the thresholded `prediction` column
multiclass_evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

# Evaluate the model and obtain the evaluation metrics.
# "weightedPrecision", "weightedRecall" and "f1" average the per-class scores
# weighted by class frequency. Weighted recall is therefore always equal to
# accuracy, and neither exposes minority-class performance on its own.
accuracy = multiclass_evaluator.evaluate(y_pred, {multiclass_evaluator.metricName: "accuracy"})
precision = multiclass_evaluator.evaluate(y_pred, {multiclass_evaluator.metricName: "weightedPrecision"})
recall = multiclass_evaluator.evaluate(y_pred, {multiclass_evaluator.metricName: "weightedRecall"})
f1_score = multiclass_evaluator.evaluate(y_pred, {multiclass_evaluator.metricName: "f1"})
area_under_roc = binary_evaluator.evaluate(y_pred, {binary_evaluator.metricName: "areaUnderROC"})

# Print the metrics results (labelled with the averaging actually used)
print(f"Accuracy: {accuracy}")
print(f"Weighted Precision: {precision}")
print(f"Weighted Recall: {recall}")
print(f"Weighted F1 Score: {f1_score}")
print(f"Area Under ROC: {area_under_roc}")

                                                                                

Accuracy: 0.8027027027027027
Precision: 0.8460758607597941
Recall: 0.8027027027027026
F1 Score: 0.8170861314064869
Area Under ROC: 0.8333579626619378


Storing the model 

In [20]:
# Persist the fitted pipeline so later sessions can reload it without retraining;
# overwrite() replaces any model saved at this location by a previous run.
model_path = "/Users/davidfrost/Documents/analytics/a3/spark/notebooks/model" # Add path
(
    logreg.write()
    .overwrite()
    .save(model_path)
)

Loading the model

In [21]:
from pyspark.ml import PipelineModel

# Reload the saved pipeline (tokenizer -> stop words -> counts -> TF-IDF -> LR).
# The path is spelled out so this cell also works in a fresh kernel that
# skipped training.
logreg = PipelineModel.load(
    "/Users/davidfrost/Documents/analytics/a3/spark/notebooks/model"  # Add path
)

Reading the streamed data

In [22]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [23]:
# Unbounded streaming DataFrame fed by a TCP socket; every received line
# becomes one row in a single string column named `value`.
socketDF = (
    spark.readStream
    .format("socket")
    .option("host", "seppe.net")
    .option("port", 7778)
    .load()
)
socketDF.printSchema()

23/05/27 19:40:38 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
root
 |-- value: string (nullable = true)



Making predictions on the streamed data

In [24]:
from pyspark.sql.functions import col, from_json, schema_of_json


# Defining process function
def process_row(df, epoch_id, json_schema=None):
    """Parse one micro-batch of raw JSON lines and show the model's predictions.

    Intended as the ``foreachBatch`` callback of the socket stream.

    Args:
        df: micro-batch DataFrame whose ``value`` column holds one JSON
            record per row.
        epoch_id: micro-batch id supplied by Spark; printed for tracing.
        json_schema: optional schema (StructType or DDL string) for the
            records. When omitted, the schema is inferred from the batch's
            first row (the original behaviour), which assumes all rows in a
            batch share that row's shape.
    """
    print(epoch_id)
    # The batch feeds several actions below (first, show, transform/show);
    # persist it once instead of recomputing it for each action.
    df.persist()
    try:
        first_row = df.first()  # None for an empty batch; cheaper than count()
        if first_row is None:
            return

        record_schema = json_schema if json_schema is not None else schema_of_json(first_row.value)
        df_cols = (
            df.selectExpr('CAST(value AS STRING)')
            .select(from_json('value', record_schema).alias('temp'))
            .select('temp.*')
        )

        df_cols.show()

        # A null review_text (malformed or incomplete record) makes the
        # Tokenizer stage fail, which would terminate the whole streaming
        # query; skip such rows instead.
        scorable = df_cols.filter(col('review_text').isNotNull())

        # Apply the Pipeline model to the DataFrame and make predictions
        predictions = logreg.transform(scorable)
        predictions = predictions.select('app_id', 'label', 'review_id', 'review_text', 'prediction')

        # Process the predictions
        predictions.show()
    finally:
        df.unpersist()

In [25]:
# Fire a micro-batch every 5 seconds and hand each batch to process_row.
# The lambda looks `process_row` up at call time, so redefining the function
# in a later cell takes effect without restarting the query.
query = (
    socketDF.writeStream
    .trigger(processingTime='5 seconds')
    .foreachBatch(lambda batch_df, batch_id: process_row(batch_df, batch_id))
    .start()
)

23/05/27 19:40:38 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/wx/hpntqq5547q8b335qs5sgvcc0000gn/T/temporary-d287cf90-29ed-4429-8559-72563be0bc0a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/05/27 19:40:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
0
1
+------+-----+---------+--------------------+
|app_id|label|review_id|         review_text|
+------+-----+---------+--------------------+
|855740|    1|139088525|Are ya winning, s...|
+------+-----+---------+--------------------+

+------+-----+---------+--------------------+----------+
|app_id|label|review_id|         review_text|prediction|
+------+-----+---------+--------------------+----------+
|855740|    1

13
+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1062810|    1|139090737|I highly recommen...|
+-------+-----+---------+--------------------+

+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|1062810|    1|139090737|I highly recommen...|       1.0|
+-------+-----+---------+--------------------+----------+

14
+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1062810|    0|139090659|Bad things first....|
|1062810|    0|139090472|I don't think I'v...|
+-------+-----+---------+--------------------+

+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|1062810| 

In [26]:
# Stop the streaming query started by `socketDF.writeStream...start()` above.
# Output printed asynchronously by foreachBatch can still appear in this cell.
query.stop()

220
+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1581480|    0|139092311|Sadly i refunded....|
+-------+-----+---------+--------------------+

+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|1581480|    0|139092311|Sadly i refunded....|       0.0|
+-------+-----+---------+--------------------+----------+

221
+-------+-----+---------+--------------------+
| app_id|label|review_id|         review_text|
+-------+-----+---------+--------------------+
|1581480|    1|139091467|very good puzzle ...|
+-------+-----+---------+--------------------+

+-------+-----+---------+--------------------+----------+
| app_id|label|review_id|         review_text|prediction|
+-------+-----+---------+--------------------+----------+
|1581480|    1|139091467|very good puzzle ...|       1.

:)