In [0]:
# import necessary libraries
import json
import os
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime as dt

In [0]:
# Set up the configuration used to connect to Azure Event Hub.
#
# SECURITY: the shared access key must never be written into the notebook
# (neither in code nor in comments). The namespace-level connection string,
# in the form
#   Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy>;SharedAccessKey=<key>
# is read from the EVENTHUB_CONNECTION_STRING environment variable, which
# should be populated from a secret store. Any key that was ever committed
# together with this notebook must be treated as compromised and rotated.

# a trailing ";" is stripped so that joining with the entity path below never yields ";;"
primaryKey = os.environ.get("EVENTHUB_CONNECTION_STRING", "").strip().rstrip(";")
if not primaryKey:
    raise ValueError(
        "EVENTHUB_CONNECTION_STRING is not set; provide the Event Hub namespace "
        "connection string through the environment before running this notebook"
    )

# Event Hub name; defaults to the lab value and can be overridden with EVENTHUB_NAME
entityPath = "EntityPath=" + os.environ.get("EVENTHUB_NAME", "eventhub-sales")

connectionString = primaryKey + ";" + entityPath

ehConf = {}

# this configuration will read from the very beginning of the stream
startingEventPosition = {
    "offset": "-1",
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

# the connection string is passed to the connector in encrypted form;
# the starting position is passed as a JSON document
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)

Before running this step (and all the cells above), you should first run the other notebook attached to this tutorial, which generates the source data - in my case "SalesSource (attached to Watermarking (Event Hub as stream source))".

In [0]:
# Open a streaming DataFrame on the Event Hub described by ehConf.
# NOTE(review): the events are expected to carry a JSON payload; the parsing
# step further down casts the Body column to a string before decoding it.
superstore_df = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

In [0]:
# Schema used to decode the JSON payload of every streamed event.
# All seven fields are declared nullable.
schema = (
    StructType()
    .add("State", StringType(), True)
    .add("Category", StringType(), True)
    .add("Sub-Category", StringType(), True)
    .add("Sales", FloatType(), True)
    .add("Quantity", IntegerType(), True)
    .add("Profit", FloatType(), True)
    .add("Timestamp", TimestampType(), True)
)

In [0]:
# Turn the raw event stream into typed columns:
#   1. cast Body to a string column named "json"
#   2. parse that string with the declared schema into a struct aliased "data"
#   3. expand the struct so that every field becomes a top-level column
superstore_data = (
    superstore_df
    .selectExpr("cast(Body as string) as json")
    .select(from_json("json", schema).alias("data"))
    .select("data.*")
)
display(superstore_data)

State,Category,Sub-Category,Sales,Quantity,Profit,Timestamp
Kentucky,Furniture,Bookcases,261.96,2,41.9136,2021-10-09T07:22:13.000+0000
Kentucky,Furniture,Chairs,731.94,3,219.582,2021-10-09T07:23:34.000+0000
California,Office Supplies,Labels,14.62,2,6.8714,2021-10-09T07:22:45.000+0000
Florida,Furniture,Tables,957.5775,5,-383.031,2021-10-09T07:23:03.000+0000
Florida,Office Supplies,Storage,22.368,2,2.5164,2021-10-09T07:25:55.000+0000
California,Furniture,Furnishings,48.86,7,14.1694,2021-10-09T07:24:37.000+0000
California,Office Supplies,Binders,18.504,3,5.7825,2021-10-09T07:38:26.000+0000
Kentucky,Office Supplies,Appliances,114.9,5,34.47,2021-10-09T07:39:01.000+0000


In [0]:
# Count events per 2-minute tumbling window over the Timestamp column.
# No watermark is defined for this aggregation.
windowed_count = (
    superstore_data
    .groupBy(window(superstore_data["Timestamp"], "2 minutes"))
    .count()
)
display(windowed_count)

window,count
"List(2021-10-09T07:38:00.000+0000, 2021-10-09T07:40:00.000+0000)",2
"List(2021-10-09T07:24:00.000+0000, 2021-10-09T07:26:00.000+0000)",2
"List(2021-10-09T07:22:00.000+0000, 2021-10-09T07:24:00.000+0000)",4


##### Watermark with TumblingWindow

In [0]:
# Start a streaming query called "windowed_count" that writes the windowed
# counts to the in-memory sink using the "update" output mode.
(
    windowed_count.writeStream
    .queryName("windowed_count")
    .outputMode("update")
    .format("memory")
    .start()
)

In [0]:
%sql
-- Show what the "windowed_count" streaming query has written to its in-memory table so far
SELECT * FROM windowed_count

window,count
"List(2021-10-09T07:24:00.000+0000, 2021-10-09T07:26:00.000+0000)",2
"List(2021-10-09T07:22:00.000+0000, 2021-10-09T07:24:00.000+0000)",4


In [0]:
# Same 2-minute tumbling-window count as before, but this time a 3-minute
# watermark is declared on the Timestamp column ahead of the aggregation.
windowed_count_withwatermark = (
    superstore_data
    .withWatermark("Timestamp", "3 minutes")
    .groupBy(window(superstore_data["Timestamp"], "2 minutes"))
    .count()
)

display(windowed_count_withwatermark)

window,count
"List(2021-10-09T07:38:00.000+0000, 2021-10-09T07:40:00.000+0000)",2
"List(2021-10-09T07:24:00.000+0000, 2021-10-09T07:26:00.000+0000)",2
"List(2021-10-09T07:22:00.000+0000, 2021-10-09T07:24:00.000+0000)",4


In [0]:
# Start a streaming query called "windowed_count_withwatermark" that writes the
# watermarked counts to the in-memory sink using the "update" output mode.
(
    windowed_count_withwatermark.writeStream
    .queryName("windowed_count_withwatermark")
    .outputMode("update")
    .format("memory")
    .start()
)

In [0]:
%sql
-- Show what the "windowed_count_withwatermark" streaming query has written to its in-memory table so far
SELECT * FROM windowed_count_withwatermark

window,count
"List(2021-10-09T07:24:00.000+0000, 2021-10-09T07:26:00.000+0000)",2
"List(2021-10-09T07:22:00.000+0000, 2021-10-09T07:24:00.000+0000)",4


We have just added new rows in the generating notebook to push more data into Event Hub. Now we will rerun the queries, first without and then with the watermark.

In [0]:
%sql
-- Rerun after new rows were pushed to Event Hub: result of the aggregation without a watermark
SELECT * FROM windowed_count

window,count
"List(2021-10-09T07:24:00.000+0000, 2021-10-09T07:26:00.000+0000)",2
"List(2021-10-09T07:22:00.000+0000, 2021-10-09T07:24:00.000+0000)",4


In [0]:
%sql
-- Rerun after new rows were pushed to Event Hub: result of the aggregation with the watermark
SELECT * FROM windowed_count_withwatermark

window,count
"List(2021-10-09T07:24:00.000+0000, 2021-10-09T07:26:00.000+0000)",2
"List(2021-10-09T07:22:00.000+0000, 2021-10-09T07:24:00.000+0000)",4
"List(2021-10-09T07:38:00.000+0000, 2021-10-09T07:40:00.000+0000)",2


In [0]:
NB! Note that, according to the Spark documentation, data arriving after the watermark "may be dropped" (but not necessarily). That is what happened in our example - late data that arrived after the watermark was still included in the aggregation.