In [1]:
# Use a single shuffle partition to avoid unnecessary computation across the
# default 200 shuffle partitions.

spark.conf.set("spark.sql.shuffle.partitions", 1)

In [2]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, LongType

In [3]:
# Kafka bootstrap server ("host:port") that the streaming reader connects to.
kafkaHost = "ec2-52-66-45-236.ap-south-1.compute.amazonaws.com:9092"

In [4]:
# Schema of the JSON payload carried in each Kafka message value:
#   ts      -> long (nullable)
#   content -> string (nullable)
linesSchema = (
    StructType()
    .add("ts", LongType(), True)
    .add("content", StringType(), True)
)

1. Load a streaming DataFrame from the Kafka topic
2. Read the key and the value and cast them to strings
3. Extract the content and the timestamp from the JSON value

In [6]:
# Streaming source: subscribe to the "lines" topic from its earliest offset,
# decode key/value as strings, parse the JSON value with linesSchema, and
# expose the parsed fields as the `content` and `timestamp` columns.
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaHost)
    .option("subscribe", "lines")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    # Only `value` is parsed; the resulting struct column is named "lines".
    .select(from_json(col("value"), linesSchema).alias("lines"))
    # `ts` is a long; it is cast to a TIMESTAMP column named `timestamp`.
    .selectExpr("lines.content AS content", "CAST(lines.ts AS TIMESTAMP) AS timestamp")
)

In [7]:
from pyspark.sql.functions import explode, split

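1. Split each line into words (the counting itself happens in the windowed aggregation further down)
2. Apply a watermark of 5 seconds on the event timestamp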

In [9]:
# One output row per word: the line is split on a single space and exploded,
# and every word keeps the event timestamp of the line it came from.
# The 5-second watermark on `timestamp` bounds how late events may arrive.
words = (
    lines
    .select(
        explode(split(lines["content"], ' ')).alias('word'),
        lines["timestamp"],
    )
    .withWatermark("timestamp", "5 seconds")
)

words.printSchema()

In [10]:
from pyspark.sql.functions import window

Create a 10-second window sliding every 5 seconds, and count the words within each window

In [12]:
# Count occurrences of each word per sliding event-time window
# (10 seconds long, advancing every 5 seconds).
windowedCounts = (
    words
    .groupBy(
        window(words["timestamp"], "10 seconds", "5 seconds"),
        words["word"],
    )
    .count()
)

windowedCounts.printSchema()

In [13]:
# Location where the streaming query stores its checkpoint data.
checkpointDir = "dbfs:///tmp/checkpoint/kafkawordcount"
# Remove the checkpoint directory (second argument True = recursive) so the
# query below does not resume from a previous run's checkpoint.
dbutils.fs.rm(checkpointDir, True)

In [14]:
# Start the streaming query: write the windowed counts to the console sink in
# append mode, without truncating column values, checkpointing to checkpointDir.
query = (
    windowedCounts
    .writeStream
    .outputMode("append")
    .option("truncate", "false")
    .option("checkpointLocation", checkpointDir)
    .format("console")
    .queryName("kafkawordcount")
    .start()
)

# Block this cell until the streaming query terminates.
query.awaitTermination()

Let these be the first set of inputs <br> <br>
{"ts": 1581098400, "content": "apple orange"} <br>
{"ts": 1581098403, "content": "melon pear"} <br>
{"ts": 1581098406, "content": "apple orange"} <br>
{"ts": 1581098409, "content": "melon pear"} <br>
{"ts": 1581098412, "content": "apple orange"} <br>
{"ts": 1581098415, "content": "melon pear"} <br>
***
If the maximum ts seen across messages so far is M <br>
and the late threshold (watermark delay) is L, <br>
then every window ending at or before (M - L) is finalized <br> <br>
So far, the windows ending at or before 18:00:10 should have been finalized (the watermark is 1581098415 - 5 = 1581098410, i.e. 18:00:10 UTC) <br> <br>
Expected windows in the output: <br>
- 17:59:55 - 18:00:05 <br>
- 18:00:00 - 18:00:10 <br>
***
The window ending at 18:00:15 will be finalized once the watermark reaches or crosses 18:00:15
***
The records below will not finalize any new window (they only advance the watermark to 1581098419 - 5 = 1581098414, i.e. 18:00:14) <br>
{"ts": 1581098417, "content": "apple orange"} <br>
{"ts": 1581098419, "content": "melon pear"} <br>
***
Also, this record will have no impact on the word count of window 18:00:00 - 18:00:10 (as that window is already behind the watermark) <br>
It will only affect the window 18:00:05 - 18:00:15 (visible once that window is finalized) <br>
TODO: Check whether this record has any impact at all; it should be discarded for being later than the threshold <br>
{"ts": 1581098409, "content": "melon pear"} <br>
***
But this record will cause the finalization of window 18:00:05 - 18:00:15 (the watermark moves to 1581098420 - 5 = 1581098415, i.e. 18:00:15) <br>
{"ts": 1581098420, "content": "apple orange"} <br>
***
New window: <br>
18:00:05 - 18:00:15