Tercer ejercicio de Spark Structured Streaming. WordCount por ventana de tiempo con marca de agua (watermark)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window

spark = SparkSession.builder.appName("StructuredNetworkWordCountWindowed").getOrCreate()

In [2]:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
# In command line -> nc -lk 9999
lines =spark.readStream.format("socket").option("host", "localhost").option("port", 9999).option('includeTimestamp', 'true').load()

In [3]:
# Split the lines into words
words =lines.select(explode(split(lines.value, " ")).alias("word"), lines.timestamp)

In [4]:
# Group the data by window and word and compute the count of each group
# Se utiliza la columna timestamp  añadida mediante la opción -> option('includeTimestamp', 'true')
# como marca de agua para a partir de esta campo hacer el filtrado 
# de todos aquellos registros que lleguen 10 minutos tarde con respecto a la ventana de tiempo

windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()

In [5]:
# Start running the query that prints the running counts to the console
# Ver resultados en la consola donde se ejecuto el jupyter notebook

query = windowedCounts.writeStream.outputMode("append").format("console").option('truncate', 'false').start()
query.awaitTermination()

AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;\nAggregate [window#16, word#7], [window#16 AS window#11, word#7, count(1) AS count#15L]\n+- Filter ((timestamp#3 >= window#16.start) && (timestamp#3 < window#16.end))\n   +- Expand [ArrayBuffer(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) END + cast(0 as bigint)) - cast(2 as bigint)) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) END + cast(0 as bigint)) - cast(2 as bigint)) * 300000000) + 0) + 600000000), LongType, TimestampType)), word#7, timestamp#3-T600000ms), ArrayBuffer(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) END + cast(1 as bigint)) - cast(2 as bigint)) * 300000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#3, TimestampType, LongType) - 0) as double) / cast(300000000 as double))) END + cast(1 as bigint)) - cast(2 as bigint)) * 300000000) + 0) + 600000000), LongType, TimestampType)), word#7, timestamp#3-T600000ms)], [window#16, word#7, timestamp#3-T600000ms]\n      +- EventTimeWatermark timestamp#3: timestamp, interval 10 minutes\n         +- Project [word#7, timestamp#3]\n            +- Generate explode(split(value#2,  )), false, [word#7]\n               +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@2347b3b4,socket,List(),None,List(),None,Map(host -> localhost, includeTimestamp -> true, port -> 9999),None), textSocket, [value#2, timestamp#3]\n'