In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 pyspark-shell'

import pyspark 
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


sc = SparkContext()
sc.setLogLevel("WARN")
spark = SparkSession(sc)


df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers","localhost:9092") \
    .option("subscribe", "clicks-cleaned") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()
# df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

schema = StructType([
    StructField("visitor_platform", StringType()),
    StructField("ts_ingest", TimestampType()),
    StructField("article_title", StringType()),
    StructField("visitor_country", StringType()),
    StructField("visitor_os", StringType()),
    StructField("article", StringType()),
    StructField("visitor_browser", StringType()),
    StructField("visitor_page_timer", IntegerType()),
    StructField("visitor_page_height", IntegerType()),
])

dfs = df.selectExpr("CAST(value AS STRING)") \
      .select(from_json(col("value"), schema) \
      .alias("clicks"))

df_data = dfs.select("clicks.*")

@udf
def forbidden_clicks(click_url):
    return click_url.endswith('/admin')

# For every article url let's check if it is a forbidden url
# We can not use the map() function here, dataframes do not support this anymore since version 2.0
# Under the hood calling map() on a dataframe would transform it to an rdd which is not allowed in structured streaming
# It means you can use only DataFrame or SQL, conversion to RDD (or DStream or local collections) are not supported.
# For this we will use a User Defined Function (UDF) to execute some Pyhton code on a column.
df_forbidden = df_data.select('article', forbidden_clicks('article').cast('boolean').alias('forbidden'))

# DDOS
# Window over last X minutes, count number of 'visitor_page_timer' and 'visitor_page_height' == 0
@pandas_udf('boolean', PandasUDFType.SCALAR)
def ddos_flagged(page_timer, page_height):
    return (page_timer == 0) & (page_height == 0)

df_ddos = df_data.select("*", ddos_flagged('visitor_page_timer', 'visitor_page_height').alias('flagged'))

df_ddos_window = df_ddos.groupBy(
    window(df_ddos.ts_ingest, '10 seconds'),
    df_ddos.flagged
).count()

# Debug dataframes in terminal
# query_data = df_data.writeStream.outputMode("append").option("truncate", "false").format("console").start()
# query_forbidden = df_forbidden.writeStream.outputMode("append").option("truncate", "false").format("console").start()
# query_ddos = df_ddos_window.writeStream.outputMode("update").option("truncate", "true").format("console").start()

query_forbidden = df_forbidden.selectExpr("to_json(struct(*)) as value") \
    .writeStream.format("kafka") \
    .outputMode('update') \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "clicks-calculated-forbidden") \
    .option("checkpointLocation", "checkpointsforbidden") \
    .start()

query_ddos = df_ddos_window.selectExpr("to_json(struct(*)) as value") \
    .writeStream.format("kafka") \
    .outputMode('update') \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "clicks-calculated-ddos") \
    .option("checkpointLocation", "checkpointsddos") \
    .start()



AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;\nProject [structstojson(named_struct(window, window#59, flagged, flagged#48, count, count#71L), Some(Etc/UTC)) AS value#79]\n+- Aggregate [window#72, flagged#48], [window#72 AS window#59, flagged#48, count(1) AS count#71L]\n   +- Filter isnotnull(ts_ingest#26)\n      +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(ts_ingest#26, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(ts_ingest#26, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(ts_ingest#26, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(ts_ingest#26, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(ts_ingest#26, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(ts_ingest#26, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(ts_ingest#26, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(ts_ingest#26, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#72, visitor_platform#25, ts_ingest#26-T15000ms, article_title#27, visitor_country#28, visitor_os#29, article#30, visitor_browser#31, visitor_page_timer#32, visitor_page_height#33, flagged#48]\n         +- EventTimeWatermark ts_ingest#26: timestamp, interval 15 seconds\n            +- Project [visitor_platform#25, ts_ingest#26, article_title#27, visitor_country#28, visitor_os#29, article#30, visitor_browser#31, visitor_page_timer#32, visitor_page_height#33, ddos_flagged(visitor_page_timer#32, visitor_page_height#33) AS flagged#48]\n               +- Project [clicks#23.visitor_platform AS visitor_platform#25, clicks#23.ts_ingest AS ts_ingest#26, clicks#23.article_title AS article_title#27, clicks#23.visitor_country AS visitor_country#28, clicks#23.visitor_os AS visitor_os#29, clicks#23.article AS article#30, clicks#23.visitor_browser AS visitor_browser#31, clicks#23.visitor_page_timer AS visitor_page_timer#32, clicks#23.visitor_page_height AS visitor_page_height#33]\n                  +- Project [jsontostructs(StructField(visitor_platform,StringType,true), StructField(ts_ingest,TimestampType,true), StructField(article_title,StringType,true), StructField(visitor_country,StringType,true), StructField(visitor_os,StringType,true), StructField(article,StringType,true), StructField(visitor_browser,StringType,true), StructField(visitor_page_timer,IntegerType,true), StructField(visitor_page_height,IntegerType,true), value#21, Some(Etc/UTC)) AS clicks#23]\n                     +- Project [cast(value#8 as string) AS value#21]\n                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@24a52057, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> clicks-cleaned, kafka.bootstrap.servers -> localhost:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@600365af,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> clicks-cleaned, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]\n'

In [None]:
spark.streams.awaitAnyTermination()