## Trash Bin Structured Streaming Consumer

The consumer uses a folder of files as its streaming source (instead of a socket). To produce the input, start the Structured Streaming Twitter Producer once. It writes a file of 1K lines containing the most recent Tweets in JSON format (the same data as in the regular streaming exercise, only saved to a file instead of written to a socket). After this single run the file is ready, and you can analyse it with the consumer code below. There is no need to start the Producer a second time.

NOTE: if you want to work with more data than this single file, you can later extend the producer to write new files, e.g. one every minute. Make sure it writes them into the folder that Spark monitors as its input streaming source (`sensor_data_dir` in the code below).
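If the producer is extended to write a new file every minute, each file should appear in the monitored folder in one step, so that Spark never picks up a half-written file. A minimal sketch of one common pattern, assuming the producer can write to that folder with ordinary Python file I/O: write the batch as JSON Lines (one object per line, which `spark.read.json` expects by default) to a hidden temporary file, then rename it atomically. The helper name `write_batch_file` and the `.tmp-`/`batch-` file names are hypothetical, and the hidden-name trick assumes Spark's file sources ignore files whose names start with `.`.

```python
import json
import os
import tempfile
import time
import uuid


def write_batch_file(records, target_dir):
    """Write `records` (a list of dicts) into `target_dir` as one JSON Lines file.

    Hypothetical helper: the data goes to a hidden temporary file first and is then
    renamed, so a folder watcher only ever sees complete files.
    """
    os.makedirs(target_dir, exist_ok=True)
    # Hidden temp name (leading "."); assumed to be skipped by Spark's file source
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, prefix=".tmp-", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            for record in records:
                f.write(json.dumps(record) + "\n")  # one JSON object per line
        # Unique final name: timestamp for rough ordering plus a random suffix
        final_name = f"batch-{time.time_ns()}-{uuid.uuid4().hex[:8]}.json"
        final_path = os.path.join(target_dir, final_name)
        os.replace(tmp_path, final_path)  # atomic rename within one filesystem
    except BaseException:
        # Remove the temporary file if anything failed before the rename
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```

In a producer loop this would be called once per batch, e.g. `write_batch_file(new_records, target_dir)` followed by `time.sleep(60)`, where `new_records` and `target_dir` stand for the data the producer collected and the monitored folder. The rename is only atomic when the temporary file and the target live on the same filesystem, which is why the temporary file is created inside the target folder.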

As a first step, fill in the TODOs and run the code cells below on just the single file generated by the Producer.

In [None]:
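import time

# import only what is used; `from pyspark.sql.functions import *` would shadow
# Python builtins such as sum, min, max and abs
from pyspark.sql.functions import desc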

In [None]:
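# Definitions: the folder Spark monitors as a streaming source, and one sample file
# in it that is used below to infer the JSON schema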

sensor_data_dir  = "/FileStore/tables/trash_bin/"
sensor_data_path = "/FileStore/tables/trash_bin/sz_sensor_data_stream.json"

In [None]:

# df   = spark.read.option("multiline", "true").json("/FileStore/tables/tweets/twitter_data.json")
# json_schema = df.schema
# df.printSchema()

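# File-based streaming sources need an explicit schema by default, so infer it once
# from a static (batch) read of the sample file and reuse it for the stream
df = spark.read.json(sensor_data_path)
json_schema = df.schema
df.printSchema()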

In [None]:
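# helper function that polls a dataframe (e.g. a query over a memory-sink table) and prints
# its row count and top rows; it stops after `iterations` non-empty prints or `max_polls`
# polls, whichever comes first, so it no longer loops forever when no data arrives

def printDF(streamingDF, iterations=10, max_polls=30, interval_s=2):
  printed, polls = 0, 0
  while printed < iterations and polls < max_polls:
    n = streamingDF.count()  # count once per poll: every count() re-executes the query
    if n > 0:
      print("Number of entries in dataframe: " + str(n))
      streamingDF.show(20, False)  # the parameter False prevents Spark from truncating the output
      printed += 1
    polls += 1
    time.sleep(interval_s)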

In [None]:
# create the stream from the sensor_data_dir

streaming_input_df = (
    spark
    .readStream
    .schema(json_schema)
    .json(sensor_data_dir)
)

# Check that this is a streaming DataFrame (evaluates to True):
streaming_input_df.isStreaming
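Conceptually, a directory-based streaming source behaves like a poller that remembers which files it has already handed out and, on each trigger, returns only the new ones; Spark's file source also accepts a `maxFilesPerTrigger` option to cap how many files go into one micro-batch. The pure-Python model below is a simplified illustration of that idea, not Spark's implementation. The class name `DirectoryPoller` is hypothetical, and the real source additionally persists its list of processed files so that it can resume after a restart.

```python
import os


class DirectoryPoller:
    """Simplified model of a file-based streaming source (hypothetical, not Spark code)."""

    def __init__(self, directory, max_files_per_trigger=None):
        self.directory = directory
        self.max_files_per_trigger = max_files_per_trigger
        self.seen = set()  # file names already handed to a micro-batch

    def next_batch(self):
        """Return paths of unseen files, oldest first, capped at max_files_per_trigger."""
        candidates = []
        for name in os.listdir(self.directory):
            path = os.path.join(self.directory, name)
            # Skip hidden/temporary names, sub-directories and already processed files
            if name.startswith((".", "_")) or not os.path.isfile(path) or name in self.seen:
                continue
            candidates.append((os.path.getmtime(path), name))
        candidates.sort()  # oldest modification time first, ties broken by name
        if self.max_files_per_trigger is not None:
            candidates = candidates[: self.max_files_per_trigger]
        batch = [name for _, name in candidates]
        self.seen.update(batch)
        return [os.path.join(self.directory, name) for name in batch]
```

This is why the single producer file is processed once: after the first micro-batch it is in the "seen" set, and only files added later to the folder trigger new batches.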

In [None]:
# Location extraction

In [None]:
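# variant a. stream the selected raw columns into the in-memory table "sensor_data_stream",
# then compute aggregations manually with SQL queries on that table (spark.sql(...))

streaming_ETL_query = (
    streaming_input_df
    .select("id", "city", "point")
    .writeStream
    .format("memory")                 # keep the results in an in-memory table
    .queryName("sensor_data_stream")  # name of that table, usable in spark.sql
    .outputMode("append")             # each micro-batch only adds its new rows
    .start()
)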
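With the `memory` sink in `append` mode, every micro-batch simply adds its rows to the in-memory table `sensor_data_stream`, and any aggregation is computed afterwards by querying that table with SQL (in the notebook, via `spark.sql(...)`). As a Spark-free analogy, the sketch below swaps in Python's built-in `sqlite3` for the memory table: batches are appended, and a `GROUP BY` query recomputes the counts over all rows received so far. The sample rows, the helpers `append_batch` and `readings_per_city`, and the per-`city` aggregation are illustrative assumptions; the `point` column is left out.

```python
import sqlite3

# In-memory table standing in for the Spark memory sink "sensor_data_stream" (analogy only)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sensor_data_stream (id TEXT, city TEXT)")


def append_batch(rows):
    """Append one micro-batch of (id, city) rows, as outputMode("append") would."""
    conn.executemany("INSERT INTO sensor_data_stream VALUES (?, ?)", rows)


def readings_per_city():
    """Recompute the aggregation over everything appended so far."""
    return conn.execute(
        "SELECT city, COUNT(*) AS n FROM sensor_data_stream "
        "GROUP BY city ORDER BY n DESC, city"
    ).fetchall()


append_batch([("bin-1", "city-A"), ("bin-2", "city-B")])
print(readings_per_city())  # [('city-A', 1), ('city-B', 1)]
append_batch([("bin-1", "city-A")])
print(readings_per_city())  # [('city-A', 2), ('city-B', 1)]
```

The trade-off of this variant is that the aggregation is recomputed from all stored raw rows on every query, so the in-memory table keeps growing with the stream.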

In [None]:
# variant b. compute aggregations with Spark functions directly inside the stream definition

streaming_grouped_sensors = (
    streaming_input_df
    .groupBy(streaming_input_df.id)
    .count()
    .orderBy(desc("count"))  # sorting a streaming aggregation is only supported in "complete" output mode
)

# NOTE: a nested field (e.g. user.location in the Tweets data) can be given to groupBy either as
# groupBy("user.location"), in which case the resulting DF contains a field "location",
# OR as groupBy(streaming_input_df.user.location), in which case it contains a field "user['location']"

query = streaming_grouped_sensors \
    .writeStream \
    .queryName("grouped_ids") \
    .format("memory") \
    .outputMode("complete") \
    .start()
  
streamingDF = spark.sql("select * from grouped_ids")

printDF(streamingDF)
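In variant b, Spark keeps the running per-`id` counts as state across micro-batches, and because the output mode is `complete`, the whole sorted result table is rewritten to `grouped_ids` after every trigger (sorting a streaming aggregation is only supported in complete mode). The pure-Python model below mimics those semantics with `collections.Counter`; it is an illustration, not Spark's implementation, and the class `CompleteModeCounter` is hypothetical.

```python
from collections import Counter


class CompleteModeCounter:
    """Toy model of groupBy(id).count().orderBy(desc("count")) with outputMode("complete")."""

    def __init__(self):
        self.state = Counter()  # running count per id, kept across micro-batches

    def process_batch(self, ids):
        """Update the state with one micro-batch and return the full, re-sorted result table."""
        self.state.update(ids)
        # Complete mode re-emits every group, not just those touched by this batch;
        # ties are broken by id here only to make the output deterministic
        return sorted(self.state.items(), key=lambda kv: (-kv[1], kv[0]))


counter = CompleteModeCounter()
print(counter.process_batch(["bin-1", "bin-2", "bin-1"]))  # [('bin-1', 2), ('bin-2', 1)]
print(counter.process_batch(["bin-2", "bin-2"]))           # [('bin-2', 3), ('bin-1', 2)]
```

Unlike variant a, only the aggregated state is kept, not every raw row, which is why `printDF` can poll `grouped_ids` and always see the complete, current ranking.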