### Structured Streaming
  
  
* Kafka 
* Aggregations
* Time windows
* Watermarking
* Joins

In [0]:
import pyspark.sql.functions as F

##### NB: the Kafka server used below appears to have been taken down as of May 2023
If it is still unavailable, run the import cell above and then continue from cell 16 (the section "Non-Kafka part starts here").

In [0]:
# To read data from Kafka (or another distributed log system), we need at least two things:
# the server (bootstrap address)
# the topic

kafka_server = "server2.databricks.training:9092"   # US (Oregon)

wiki_df = (spark.readStream                        # Get the DataStreamReader
  .format("kafka")                                 # Specify the source format as "kafka"
  .option("kafka.bootstrap.servers", kafka_server) # Configure the Kafka server name and port
  .option("subscribe", "en")                       # Subscribe to the "en" Kafka topic - edits of English wikipedia pages
  .option("startingOffsets", "earliest")           # The start point when a query is started
  .option("maxOffsetsPerTrigger", 100)             # Rate limit on max offsets per trigger interval
  .load()                                          # Load the DataFrame
)

In [0]:
display(wiki_df, streamName = "wiki_raw_stream")

# key - the record key (Kafka uses it for partitioning); not useful in this case
# value - the data, in binary format. This is our JSON payload. We'll need to cast it to STRING.
# topic - the topic the record was read from
# partition - the topic partition. This server only has one partition.
# offset - the position of the record within its topic partition
# timestamp - the record timestamp
# timestampType - whether the timestamp is the record's creation time or the log append time

In [0]:
# let's have a look at the JSON payload

from pyspark.sql.types import StringType

wiki_payload_df = (wiki_df
                  .select(F.col("value").cast(StringType()))
                  )

display(wiki_payload_df,streamName="wiki_payload_stream")

In [0]:
# let's create a schema for navigating the JSON payload
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType

schema = StructType([
  StructField("channel", StringType(), True),
  StructField("comment", StringType(), True),
  StructField("delta", IntegerType(), True),
  StructField("flag", StringType(), True),
  StructField("geocoding", StructType([                 # (OBJECT): Added by the server, field contains IP address geocoding information for anonymous edit.
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("countryCode2", StringType(), True),
    StructField("countryCode3", StringType(), True),
    StructField("stateProvince", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
  ]), True),
  StructField("isAnonymous", BooleanType(), True),      # (BOOLEAN): Whether or not the change was made by an anonymous user
  StructField("isNewPage", BooleanType(), True),
  StructField("isRobot", BooleanType(), True),
  StructField("isUnpatrolled", BooleanType(), True),
  StructField("namespace", StringType(), True),         # (STRING): Page's namespace. See https://en.wikipedia.org/wiki/Wikipedia:Namespace 
  StructField("page", StringType(), True),              # (STRING): Printable name of the page that was edited
  StructField("pageURL", StringType(), True),           # (STRING): URL of the page that was edited
  StructField("timestamp", StringType(), True),         # (STRING): Time the edit occurred, in ISO-8601 format
  StructField("url", StringType(), True),
  StructField("user", StringType(), True),              # (STRING): User who made the edit or the IP address associated with the anonymous editor
  StructField("userURL", StringType(), True),
  StructField("wikipediaURL", StringType(), True),
  StructField("wikipedia", StringType(), True),         # (STRING): Short name of the Wikipedia that was edited (e.g., "en" for the English)
])

In [0]:
# Now we can use "from_json" to parse out the message and provide schema

wiki_json_df = (wiki_payload_df
                .select(F.from_json("value", schema).alias("json"))
               )

display(wiki_json_df,streamName="wiki_json_stream")
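
The parsing step can be pictured in plain Python. This is only a rough sketch of the behaviour, not Spark's implementation: `parse_payload` is a hypothetical helper, it handles top-level fields only, and it assumes that fields missing from the payload become null and that an unparseable payload yields a null row.

```python
import json
from typing import Any, Dict, List, Optional


def parse_payload(value: bytes, fields: List[str]) -> Optional[Dict[str, Any]]:
    """Decode a Kafka-style binary value and keep only the requested top-level fields."""
    try:
        record = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # malformed payload: no usable row
    if not isinstance(record, dict):
        return None  # valid JSON, but not an object
    # Fields absent from the payload come back as None; extra fields are ignored.
    return {name: record.get(name) for name in fields}


row = parse_payload(b'{"page": "Spark", "delta": 3, "extra": 1}', ["page", "delta", "user"])
print(row)
```

The takeaway for the real query is that the schema decides which fields you get back: anything not listed in the schema is silently ignored.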

In [0]:
# let's keep only the article edits that have a location (geocoding is only added for anonymous edits)

wiki_anon_df = (wiki_json_df
  .select(F.col("json.wikipedia").alias("wikipedia"),      # Promoting from sub-field to column
          F.col("json.isAnonymous").alias("isAnonymous"),  
          F.col("json.namespace").alias("namespace"),
          F.col("json.page").alias("page"),
          F.col("json.pageURL").alias("pageURL"),
          F.col("json.geocoding").alias("geocoding"),
          F.col("json.user").alias("user"),
          F.col("json.timestamp").cast("timestamp").alias("timestamp"))  # Promoting, converting to a timestamp and naming the column explicitly
  .filter(F.col("namespace") == "article")                 # Limit result to just articles
  .filter(F.col("geocoding.countryCode3").isNotNull())     # We only want results that are geocoded
)

display(wiki_anon_df)

In [0]:
wiki_loc_df = (wiki_anon_df
  .groupBy("geocoding.countryCode3") # Aggregate by country (code)
  .count()                           # Produce a count of each aggregate
)
display(wiki_loc_df, streamName = "location_stream")

In [0]:
(wiki_loc_df
 .writeStream
 .format("delta")
 .outputMode("complete") # complete overwrite on every trigger
 .option("checkpointLocation", "/tmp/wikiloc/checkpoint")
 .toTable("wikiloc")     # start the query and write to the "wikiloc" table (toTable is the standard API since Spark 3.1)
)
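
As a rough mental model (plain Python, no Spark involved), complete output mode keeps a running aggregate as state and rewrites the whole result on every trigger. The function name and the sample country codes below are made up for illustration.

```python
from collections import Counter


def complete_mode_outputs(batches):
    """Return the full result table that would be written after each micro-batch."""
    state = Counter()                 # running count per key, kept across batches
    outputs = []
    for batch in batches:
        state.update(batch)           # fold the new rows into the running counts
        outputs.append(dict(state))   # complete mode: emit every key, not just changed ones
    return outputs


for table in complete_mode_outputs([["USA", "EST"], ["EST"]]):
    print(table)
# {'USA': 1, 'EST': 1}
# {'USA': 1, 'EST': 2}
```

Note that the second output still contains `USA` even though no new `USA` row arrived, which is why the target table is overwritten on every trigger.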

In [0]:
display(spark.table("wikiloc"))

### Time windows

In [0]:
# Often you do not want an aggregation over a stream's whole history.
# For this we can use F.window. NB: this creates time windows; it is not a SQL-style window function.

wiki_loc_window_df = (wiki_anon_df
  .groupBy("geocoding.countryCode3", F.window("timestamp", "5 minute")) # Aggregate by country, every 5 minute block. This is a "tumbling window"
  .count()
)
display(wiki_loc_window_df, streamName = "time_window_stream")
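
To make the bucketing concrete, here is a small plain-Python sketch of how an event time maps to its tumbling window. It assumes windows are aligned to the Unix epoch (the default when no start offset is given) and are half-open, `[start, end)`; `tumbling_window` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(event_time, size):
    """Return (start, end) of the fixed-size window that contains event_time."""
    offset = (event_time - EPOCH) % size   # how far into its window the event falls
    start = event_time - offset
    return start, start + size


event = datetime(2023, 5, 1, 12, 7, 30, tzinfo=timezone.utc)
start, end = tumbling_window(event, timedelta(minutes=5))
print(start.time(), end.time())  # 12:05:00 12:10:00
```

Every event belongs to exactly one tumbling window, so the counts of all windows add up to the total number of events.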

In [0]:
# with sliding windows the windows overlap: here a new 5-minute window starts every minute, so each event is counted in several windows

wiki_loc_window_slide_df = (wiki_anon_df
  .groupBy("geocoding.countryCode3", F.window("timestamp", "5 minute", "1 minute")) # Aggregate by country, 5 minute block sliding by 1 minute. This is a "sliding window"
  .count()
)
display(wiki_loc_window_slide_df, streamName = "time_window_slide_stream")
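
The overlap can be illustrated in plain Python. The sketch below lists every window that contains a given event, again assuming epoch-aligned, half-open windows; `sliding_windows` is a hypothetical helper used only for this illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(event_time, size, slide):
    """Return all (start, end) windows of length `size`, starting every `slide`, that contain event_time."""
    # Latest window start that is not after the event.
    start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    while start + size > event_time:       # window still covers the event
        windows.append((start, start + size))
        start -= slide                     # step back to the previous window
    return windows[::-1]                   # oldest window first


event = datetime(2023, 5, 1, 12, 7, 30, tzinfo=timezone.utc)
for start, end in sliding_windows(event, timedelta(minutes=5), timedelta(minutes=1)):
    print(start.time(), end.time())
```

With a 5-minute window sliding by 1 minute, this single event lands in five windows (starting 12:03 through 12:07), so summing the counts over all windows overstates the number of events.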

##### Non-Kafka part starts here

In [0]:
# let's try with a different, simpler dataset

# %fs head dbfs:/mnt/training/sensor-data/accelerometer/time-series-stream.json/file-0.json

input_path = "dbfs:/mnt/training/sensor-data/accelerometer/time-series-stream.json/"

json_schema = "time timestamp, action string"

In [0]:
# Let's create a dataframe and apply some transformations and aggregation

input_df = (spark
  .readStream                                 
  .schema(json_schema)                       
  .option("maxFilesPerTrigger", 1)            
  .json(input_path)                           
)

counts_df = (input_df
  .groupBy(F.col("action"),                     # Aggregate by action
           F.window(F.col("time"), "1 hour"))     # and by a 1 hour window
  .count()                                    # Count the actions
  .select(F.col("window.start").alias("start"), 
          F.col("count"),                       
          F.col("action"))                      
  .orderBy(F.col("start"))                      # Sort by the start time
)

In [0]:
# displaying the output

display(counts_df, streamName = "counts_stream")

In [0]:
# Observe the 200 shuffle partitions (the Spark default) used by the query above.
# For a small dataset it is usually faster to reduce the shuffle parallelism for aggregations and joins, e.g. to the number of cores.

spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)

display(counts_df, streamName = "counts_stream_improved")

### Watermarking

In [0]:
# In real use cases the queries above would keep running for a very long time and the number of windows would grow indefinitely.
# Keeping track of the state of every window puts pressure on memory.
# Also, updates caused by very late data are often no longer relevant.

watermarked_stream = "watermarked_stream"

watermarked_df = (input_df
  .withWatermark("time", "2 hours")             # Specify a 2-hour watermark
  .groupBy(F.col("action"),                       # Aggregate by action...
           F.window(F.col("time"), "1 hour"))       # ...then by a 1 hour window
  .count()                                      # For each aggregate, produce a count
  .select(F.col("window.start").alias("start"),   # Elevate field to column
          F.col("count"),                         # Include count
          F.col("action"))                        # Include action
  .orderBy(F.col("start"))                        # Sort by the start time
)
display(watermarked_df, streamName = watermarked_stream) # Start the stream and display it

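# important note: the watermark guarantees that data arriving within the 2-hour threshold is still aggregated. It does not guarantee that later data is dropped; such data may or may not be included.
# also note: sorting requires complete output mode, in which Spark keeps all aggregate state; the watermark only lets Spark drop old state in append or update mode.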
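
The watermark logic can be sketched in plain Python. This is a deliberately simplified model, not Spark's implementation: it assumes the watermark is recomputed at the end of each micro-batch as "latest event time seen minus the delay", and that an event is ignored once the end of its window is at or before the watermark. `run_batches` and the sample timestamps are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def run_batches(batches, window, delay):
    """Count events per tumbling window, ignoring events whose window is already closed."""
    counts = Counter()       # state: window start -> count
    dropped = []             # events that arrived too late
    max_event_time = None    # latest event time seen so far
    watermark = None         # recomputed after every micro-batch
    for batch in batches:
        for event_time in batch:
            start = event_time - ((event_time - EPOCH) % window)
            if watermark is not None and start + window <= watermark:
                dropped.append(event_time)   # window closed: too late to count
                continue
            counts[start] += 1
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
        if max_event_time is not None:
            watermark = max_event_time - delay
    return counts, dropped, watermark


def at(hour, minute):
    return datetime(2023, 5, 1, hour, minute, tzinfo=timezone.utc)


counts, dropped, watermark = run_batches(
    [[at(12, 10), at(12, 50)], [at(15, 20)], [at(12, 30), at(14, 5)]],
    window=timedelta(hours=1),
    delay=timedelta(hours=2),
)
print(dropped, watermark)
```

In the third batch the 12:30 event is ignored because the watermark has already moved to 13:20, past the end of the 12:00 window, while the 14:05 event is late but still within the threshold and is counted.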

In [0]:
# Let's import another dataset. Let's say we are interested in hourly monitoring of incoming traffic to our website

schema = "device STRING, ecommerce STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name STRING, event_previous_timestamp BIGINT, event_timestamp BIGINT, geo STRUCT<city: STRING, state: STRING>, items ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source STRING, user_first_touch_timestamp BIGINT, user_id STRING"

hourlyEventsPath = "/mnt/training/ecommerce/events/events-2020-07-03.json"

website_df = (spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)
  .json(hourlyEventsPath)
)

In [0]:
# This dataframe has no proper timestamp column, so we create one (event_timestamp is divided by 1e6 to get seconds since the epoch) and use it for watermarking

events_df = (website_df
             .withColumn("createdAt", (F.col("event_timestamp") / 1e6).cast("timestamp"))
             .withWatermark("createdAt", "2 hours")
)             
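
The same conversion in plain Python, as a sanity check of the arithmetic. The sketch assumes, as the division by `1e6` above implies, that `event_timestamp` holds microseconds since the Unix epoch; `micros_to_datetime` and the sample value are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_datetime(micros):
    """Convert microseconds since the Unix epoch to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)


print(micros_to_datetime(1_600_000_000_000_000))  # 2020-09-13 12:26:40+00:00
```

Casting a numeric column to `timestamp` in Spark treats the number as seconds since the epoch, which is why the division has to happen before the cast.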

In [0]:
# now we can do an aggregation

traffic_df = (events_df
             .groupBy("traffic_source"
                      , F.window(F.col("createdAt"), "1 hour"))
             .agg(F.approx_count_distinct("user_id").alias("active_users"))
             .select(F.col("traffic_source")
                     , F.col("active_users")
                     , F.hour(F.col("window.start")).alias("hour"))
             .sort("hour")
)

display(traffic_df, streamName="hourly_traffic_p")

### Joining streams

In [0]:
# let's load in users dataset

users_df = spark.read.parquet("/mnt/training/ecommerce/users/users.parquet/")

In [0]:
# a join works the same way as with regular dataframes
# note: this is a streaming <-> static join

joined_df = (events_df
            .join(users_df, "user_id")
            )

display(joined_df)

In [0]:
# example on subset of dataframe

joined_limit_df = (events_df
            .join(users_df.limit(500000), "user_id")
            )

display(joined_limit_df)

In [0]:
# let's read in users dataframe as a stream

# Since we already loaded this data as a static dataframe, we can cheat and reuse its schema. That is fine in development/debugging, but usually not possible, and not recommended, in production.
users_schema = users_df.schema

users_stream_df = (spark
                   .readStream
                   .schema(users_schema)
                   .option("maxFilesPerTrigger", 1)
                   .parquet("/mnt/training/ecommerce/users/users.parquet/")  # .parquet() already sets the source format
                  )

In [0]:
# let's do a stream to stream join

joined_streams_df = (events_df
            .join(users_stream_df, "user_id")
            )

display(joined_streams_df, processingTime="10 seconds")
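
Why a stream-stream join needs state can be shown with a small plain-Python sketch. It models an inner join on a key where rows from either side may arrive first, so both sides must be buffered. The class and the sample rows are hypothetical, and the sketch never evicts anything, which mirrors a join without watermarks and time constraints on both inputs: the buffered state only grows.

```python
from collections import defaultdict


class StreamStreamJoin:
    """Toy symmetric inner join: buffer both sides, emit matches as rows arrive."""

    def __init__(self):
        self.left_state = defaultdict(list)    # key -> rows seen on the left stream
        self.right_state = defaultdict(list)   # key -> rows seen on the right stream

    def on_left(self, key, row):
        self.left_state[key].append(row)
        # Match the new left row against everything buffered on the right.
        return [(row, other) for other in self.right_state.get(key, [])]

    def on_right(self, key, row):
        self.right_state[key].append(row)
        # Match the new right row against everything buffered on the left.
        return [(other, row) for other in self.left_state.get(key, [])]


join = StreamStreamJoin()
print(join.on_left("u1", "click"))      # []  (no user row yet)
print(join.on_right("u1", "Alice"))     # [('click', 'Alice')]
print(join.on_left("u1", "purchase"))   # [('purchase', 'Alice')]
```

The first event produces no output until its matching user row shows up, which is the key difference from the streaming-static join above, where the whole users table is available from the start.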

In [0]:
for stream in spark.streams.active:
  stream.stop()

### Further reading

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.window.html  
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withWatermark.html  
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html  
https://docs.databricks.com/spark/latest/structured-streaming/index.html

### Task 1

Create a streaming dataframe from the data in the following path:  
/mnt/training/asa/flights/2007-01-stream.parquet/

The schema should contain
* DepartureAt (timestamp)
* UniqueCarrier (string)

Process only 1 file per trigger.  

Count the rows per carrier, using non-overlapping (tumbling) 30-minute windows.  
Ignore any data that arrives more than 6 hours late.

The output should have 3 columns: startTime (window start time), UniqueCarrier, count.  
The output should be sorted ascending by startTime.

Display the output, firing the trigger every 5 seconds.

Once the stream has produced some output, call the stream shutdown function.

In [0]:
# Your answer
