In [0]:
# Job parameters, exposed as Databricks text widgets (name, label); all default to "".
for _widget_name, _widget_label in (
    ("kafka_topic", "Kafka Topic Name"),
    ("kafka_broker", "Kafka Broker"),
    ("checkpoint_dir", "Checkpoint Directory"),
    ("target_path", "Target Path"),
):
    dbutils.widgets.text(_widget_name, "", _widget_label)

In [0]:
# Resolve the run-time parameters entered in the widgets defined above.
chk_dir, tgt_path, kafka_broker = (
    dbutils.widgets.get(widget)
    for widget in ("checkpoint_dir", "target_path", "kafka_broker")
)


In [0]:
# Connectivity smoke test: confirm the broker named in the `kafka_broker` widget
# accepts TCP connections before any stream is started. Replaces a `telnet` to a
# hard-coded EC2 hostname, which ignored the widget value; this version fails fast
# (OSError) after a timeout instead.
import socket

_first_broker = kafka_broker.split(",")[0].strip()
_broker_host, _, _broker_port = _first_broker.partition(":")
# Port 9092 matches the default the Kafka source cell uses for bare hostnames.
with socket.create_connection((_broker_host, int(_broker_port or 9092)), timeout=10):
    print(f"Kafka broker {_broker_host}:{_broker_port or 9092} is reachable")

In [0]:
# Start from a clean slate: recursively remove the checkpoint and output directories
# from previous runs. Both paths come straight from widgets, so validate them first
# (and before deleting either) rather than running a recursive delete on an unset
# widget ("") or a filesystem root.
_UNSAFE_RM_TARGETS = {"", "/", "dbfs:/"}
for _run_dir in (chk_dir, tgt_path):
    if _run_dir.strip() in _UNSAFE_RM_TARGETS:
        raise ValueError(
            f"Refusing to recursively delete {_run_dir!r}: set the 'checkpoint_dir' "
            "and 'target_path' widgets to run-specific directories."
        )

for _run_dir in (chk_dir, tgt_path):
    dbutils.fs.rm(_run_dir, recurse=True)

In [0]:
from pyspark.sql.functions import from_json, col, expr, window, avg
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType

spark.conf.set("spark.sql.shuffle.partitions", 10)
# NOTE(review): this key is documented for the legacy DStreams API; confirm it has
# any effect on the Structured Streaming queries started in this notebook.
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

KAFKA_DEFAULT_PORT = 9092


def _bootstrap_servers(brokers: str, default_port: int = KAFKA_DEFAULT_PORT) -> str:
    """Build the Kafka ``bootstrap.servers`` value from the ``kafka_broker`` widget.

    Accepts a bare host, ``host:port``, or a comma-separated list of either.
    Entries without an explicit port get ``default_port`` appended, so a bare host
    still yields ``host:9092`` (the previous behavior), while ``host:port`` values
    are no longer turned into the invalid ``host:port:9092``.

    Raises:
        ValueError: if ``brokers`` contains no non-blank entries.
    """
    entries = [entry.strip() for entry in brokers.split(",") if entry.strip()]
    if not entries:
        raise ValueError(
            "Set the 'kafka_broker' widget to a broker host (optionally host:port)."
        )
    return ",".join(
        entry if ":" in entry else f"{entry}:{default_port}" for entry in entries
    )


# Subscribe to the topic from the `kafka_topic` widget, starting from the latest
# offsets and reading at most 4000 offsets per micro-batch. failOnDataLoss=true
# makes the query fail, rather than continue, when offsets may have been lost.
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", _bootstrap_servers(kafka_broker))
    .option("subscribe", dbutils.widgets.get("kafka_topic"))
    .option("minPartitions", "2")
    .option("failOnDataLoss", "true")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 4000)
    .load()
)

# Expected JSON layout of each Kafka message value; every field is read as a string.
log_schema = StructType([
      StructField("remote_host", StringType(), True),
      StructField("user-identifier", StringType(), True),
      StructField("frank", StringType(), True),
      StructField("time_received", StringType(), True),
      StructField("request_first_line", StringType(), True),
      StructField("status", StringType(), True),
      StructField("size_bytes", StringType(), True),
      StructField("request_header_referer", StringType(), True),
      StructField("request_header_user_agent", StringType(), True)
    ])

# Decode the binary Kafka value as a string and parse it into an `apache_log` struct.
# NOTE(review): values that are not valid JSON presumably yield a null struct here
# (from_json default mode) — confirm how such rows should be handled downstream.
value_df = kafka_df.select(from_json(col("value").cast("string"), log_schema).alias("apache_log"))



In [0]:
# Keep only the fields used by the file sink and the windowed aggregation below.
select_df = value_df.selectExpr(
    "apache_log.remote_host",
    "apache_log.time_received",
    "apache_log.status",
    "apache_log.size_bytes",
)

from pyspark.sql.functions import *

# Assumes `time_received` looks like "10/Oct/2000:13:55:36 -0700" (Apache common log
# timestamp, without brackets) — confirm against the producer. The trailing token is
# a UTC offset, so it is parsed with the zone-offset letter `Z`, which accepts both
# "+0530" and "-0700". The previous pattern "+SSSS" read it as a literal '+' followed
# by fraction-of-second digits: the offset was misread and negative offsets failed
# to parse.
APACHE_TIME_FORMAT = "d/MMM/yyyy:HH:mm:ss Z"

# Add a parsed event-time column and a whitespace-trimmed host column, then drop the
# raw string columns they replace.
formatted_df = (
    select_df
    .withColumn("datetype_timestamp", to_timestamp(col("time_received"), APACHE_TIME_FORMAT))
    .withColumn("fmt_remote_host", trim(col("remote_host")))
    .drop("time_received", "remote_host")
)
                          

In [0]:
# Optional sink: persists every parsed record as JSON files under `tgt_path`
# (checkpointed in `chk_dir`), one micro-batch every 2 seconds. Known to work, but
# not needed for the suspicious-IP analysis below — running it starts an additional
# streaming query that consumes the same Kafka topic.
query_stream = (
    formatted_df.writeStream
    .queryName("file_output_stream")
    .format("json")
    .outputMode("append")
    .option("checkpointLocation", chk_dir)
    .option("path", tgt_path)
    .trigger(processingTime="2 seconds")
    .start()
)

In [0]:
# Count requests per host in 20-second windows sliding every 5 seconds, with a
# 100-second watermark on event time, and keep (host, count) rows with more than 2
# hits. The window column is projected away, so one host can appear once per
# overlapping window it was active in.
logs_watermarked_df = (
    formatted_df
    .withWatermark("datetype_timestamp", "100 seconds")
    .groupBy("fmt_remote_host", window("datetype_timestamp", "20 seconds", "5 seconds"))
    .count()
    .where(col("count") > 2)
    .select("fmt_remote_host", "count")
)

In [0]:
# Hosts with at least 3 requests in a single window, rendered as a live table.
# Note: for integer counts this matches the `count > 2` filter already applied upstream.
suspicious_ip_df = logs_watermarked_df.where(col("count") >= 3)
display(suspicious_ip_df)

fmt_remote_host,count
162.211.221.127,78
94.250.131.17,78
99.236.114.130,69
44.173.133.164,58
172.201.136.93,59
100.101.243.175,58
91.157.33.242,20
189.206.93.202,67
91.157.33.242,68
95.87.220.138,19
