In [1]:
## Author: Yam Jason

In [19]:
from pyspark.sql import SparkSession

# Create the notebook's SparkSession, or reuse one if it already exists.
spark = (
    SparkSession.builder
    .appName("Spark")
    .getOrCreate()
)

In [20]:
# Ship the project helper module with the Spark job and make it importable.
# `addPyFile` is the SparkContext API meant for Python dependencies
# (.py/.zip/.egg); `addFile` is intended for generic files.
# NOTE(review): the path is relative to the notebook's working directory.
sc = spark.sparkContext
sc.addPyFile("../../de_classes/data_storage/hadoop_file_handler.py")

from hadoop_file_handler import HadoopFileHandler

# Read the prediction files once through the project helper; the resulting
# DataFrame's schema is reused for the streaming read further down.
handler = HadoopFileHandler()
df = handler.read_json('data/predictions/predictions3.json')


24/09/07 15:30:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [21]:
# Capture the schema of the batch-read predictions; the streaming read below
# is given this schema explicitly instead of inferring one.
dataSchema = df.schema
dataSchema

StructType([StructField('Review', StringType(), True), StructField('prediction', DoubleType(), True)])

## Create the Data Stream

In [22]:
# Stream the same prediction files, admitting at most one new file per
# micro-batch so the results build up incrementally.
streaming = (
    spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .json("data/predictions/predictions3.json")
)

In [23]:
# Running count of reviews per predicted class.
# The column is referenced as `prediction`, matching the casing in the
# schema, instead of relying on Spark's case-insensitive column resolution.
activityCounts = streaming.groupBy("prediction").count()

In [24]:
# Use only a handful of shuffle partitions for the aggregation. This is set
# before the aggregation query is started so it takes effect for that query.
spark.conf.set(key="spark.sql.shuffle.partitions", value=5)

In [25]:
# Run the aggregation continuously into an in-memory table named
# `activity_counts`; "complete" output mode rewrites the whole result table
# on every trigger.
activityQuery = (
    activityCounts.writeStream
    .queryName("activity_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

24/09/07 15:30:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b64d15ae-fe8e-4688-bb22-2e11a678a741. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/07 15:30:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [26]:
from time import sleep

# Poll the in-memory table a few times, one second apart, to watch the
# counts grow as more files are processed.
for _ in range(5):
    spark.sql("SELECT * FROM activity_counts").show()
    sleep(1)

+----------+-----+
|Prediction|count|
+----------+-----+
|       0.0|   16|
|       2.0|   89|
+----------+-----+

+----------+-----+
|Prediction|count|
+----------+-----+
|       0.0|   35|
|       2.0|  235|
+----------+-----+

+----------+-----+
|Prediction|count|
+----------+-----+
|       0.0|   53|
|       2.0|  342|
+----------+-----+

+----------+-----+
|Prediction|count|
+----------+-----+
|       0.0|   59|
|       2.0|  374|
+----------+-----+

+----------+-----+
|Prediction|count|
+----------+-----+
|       0.0|   59|
|       2.0|  374|
+----------+-----+



In [27]:
# List the streaming queries currently running in this session.
spark.streams.active

[<pyspark.sql.streaming.query.StreamingQuery at 0x7f0a2016a800>]

In [28]:
# Stop the `activity_counts` aggregation query.
activityQuery.stop()

In [29]:
from pyspark.sql.functions import lit

# Keep only reviews predicted as class 2.0 and label them "Positive",
# streaming the (Review, Sentiment) pairs into the in-memory table
# `positive_filter`.
# Filtering on `prediction` directly also drops rows with a null prediction,
# because `null = 2.0` is not true, so no separate null check is needed.
positiveFilter = (
    streaming
    .where("prediction = 2.0")
    .withColumn("Sentiment", lit("Positive"))
    .select("Review", "Sentiment")
    .writeStream
    .queryName("positive_filter")
    .format("memory")
    .outputMode("append")
    .start()
)



24/09/07 15:30:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2ee562be-acd5-48cd-99ca-a578b851da7d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/07 15:30:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [30]:
# Rows only show up once the first micro-batch has been processed, so the
# earliest poll may come back empty.
for _ in range(3):
    spark.sql("SELECT * FROM positive_filter").show()
    sleep(1)

+------+---------+
|Review|Sentiment|
+------+---------+
+------+---------+

+--------------------+---------+
|              Review|Sentiment|
+--------------------+---------+
|  has arrived thanks| Positive|
|good quality deli...| Positive|
|                best| Positive|
|             perfect| Positive|
|good packing very...| Positive|
|good good good go...| Positive|
|        good packing| Positive|
|                good| Positive|
|        ok good item| Positive|
|                good| Positive|
|lazada u should r...| Positive|
|                good| Positive|
|item received goo...| Positive|
|second time i bou...| Positive|
|fast shipment sel...| Positive|
|fast delivery goo...| Positive|
|        good product| Positive|
|seller s delivery...| Positive|
|good delivery goo...| Positive|
|                good| Positive|
+--------------------+---------+
only showing top 20 rows

+--------------------+---------+
|              Review|Sentiment|
+--------------------+---------+
|  has

In [31]:
# Stop the `positive_filter` query.
positiveFilter.stop()

In [32]:
from pyspark.sql.functions import lit

# Keep only reviews predicted as class 0.0 and label them "Negative",
# streaming the (Review, Sentiment) pairs into the in-memory table
# `negative_filter`.
# Filtering on `prediction` directly also drops rows with a null prediction,
# because `null = 0.0` is not true, so no separate null check is needed.
negativeFilter = (
    streaming
    .where("prediction = 0.0")
    .withColumn("Sentiment", lit("Negative"))
    .select("Review", "Sentiment")
    .writeStream
    .queryName("negative_filter")
    .format("memory")
    .outputMode("append")
    .start()
)

24/09/07 15:30:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-1d8fa11f-92a8-42dc-ae4e-2dc4057b05d7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/07 15:30:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [33]:
# Rows only show up once the first micro-batch has been processed, so the
# earliest poll may come back empty.
for _ in range(3):
    spark.sql("SELECT * FROM negative_filter").show()
    sleep(1)

+------+---------+
|Review|Sentiment|
+------+---------+
+------+---------+

+--------------------+---------+
|              Review|Sentiment|
+--------------------+---------+
|packs black are r...| Negative|
|tng ewallet auto ...| Negative|
|quality no so goo...| Negative|
|        poor quality| Negative|
|dark red become pink| Negative|
|receive good cond...| Negative|
|                thin| Negative|
|ear strings broke...| Negative|
|bought brand mask...| Negative|
|received quantity...| Negative|
|very disappointed...| Negative|
|good use thin com...| Negative|
|wrong colour but ...| Negative|
|             not bad| Negative|
|doesnt match vide...| Negative|
|i bought each col...| Negative|
|ear strings broke...| Negative|
|your mask not goo...| Negative|
|second time i ord...| Negative|
|   time repeat order| Negative|
+--------------------+---------+
only showing top 20 rows

+--------------------+---------+
|              Review|Sentiment|
+--------------------+---------+
|packs

In [34]:
# Stop the `negative_filter` query.
negativeFilter.stop()

In [36]:
# Stop any streaming queries that are still running (e.g. if one of the stop
# cells above was skipped) so they shut down cleanly, then end the session.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()