In [None]:
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))
from pyspark.sql import SparkSession
from pyspark.sql.streaming import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os
import re

In [None]:
# Connector jars shipped in ./jars, relative to the notebook's working directory.
# Both the comma-separated `spark.jars` value and the classpath string are built from
# this single list. That way every jar is listed exactly once and the two settings cannot drift.
# (Previously a six-placeholder format string silently dropped the last two jars.)
JARS_DIR = os.path.join(os.getcwd(), "jars")
KAFKA_JAR_NAMES = [
    "spark-sql-kafka-0-10_2.12-3.5.1.jar",
    "spark-sql-kafka-0-10_2.12-3.5.1-sources.jar",
    "kafka-clients-3.5.1-sources.jar",
    "kafka-clients-3.5.1.jar",
    "spark-streaming_2.12-3.5.1.jar",
    "commons-pool2-2.12.0.jar",
    "spark-streaming-kafka-0-10-assembly_2.12-3.5.1.jar",
    "spark-token-provider-kafka-0-10_2.12-3.5.1.jar",
]
kafka_jar_paths = [os.path.join(JARS_DIR, jar_name) for jar_name in KAFKA_JAR_NAMES]

# `spark.jars` expects a comma-separated list; JVM classpath entries use the OS path separator.
spark_jars = ",".join(kafka_jar_paths)
spark_jars_path = os.pathsep.join(kafka_jar_paths)

# No `extraLibraryPath` setting: that one is for native libraries, not jars.
spark = (
    SparkSession.builder
    .appName("PySpark Streaming with Kafka")
    .config("spark.jars", spark_jars)
    .config("spark.executor.extraClassPath", spark_jars_path)
    .config("spark.driver.extraClassPath", spark_jars_path)
    .getOrCreate()
)

# Use one shuffle partition instead of Spark's default of 200. The aggregation below
# then runs as a single task per micro-batch.
spark.conf.set("spark.sql.shuffle.partitions", 1)

In [None]:
# ssc = StreamingContext(spark, 300)

In [None]:
# Only surface ERROR-level Spark log messages in the notebook output.
spark.sparkContext.setLogLevel(logLevel="ERROR")

In [None]:
KAFKA_TOPIC_NAME = "load-balancer-logs"
KAFKA_BOOTSTRAP_SERVER = "localhost:9092"

# Unbounded stream over the Kafka topic. "earliest" only applies the first time the
# query starts; a restarted query resumes from the offsets in its checkpoint.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVER,
        "subscribe": KAFKA_TOPIC_NAME,
        "startingOffsets": "earliest",
    })
    .load()
)

In [None]:
# Decode the Kafka record `value` to text as `log`, keeping the record `timestamp` for windowing.
df = kafka_df.select(col("value").cast("string").alias("log"), col("timestamp"))

In [None]:
# Projection holding only the raw log text.
# NOTE(review): not referenced by any later visible cell; confirm it is still needed.
log_df = df.select(col("log"))

In [None]:
# One capture group per parsed field; group numbers match the regexp_extract calls below.
pattern = (
    r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"  # 1: ip (dotted quad)
    r" - (\d+)"                             # 2: uid
    r" \[(.*?)\]"                           # 3: dateTime (inside brackets)
    r" (\w+)"                               # 4: method
    r" ([^\s]+)"                            # 5: filename
    r" (\d+)"                               # 6: statusCode
    r" (\d+)"                               # 7: fileSize
)

In [None]:
# Split each raw log line into named string columns. The i-th name below receives
# capture group i of `pattern`; regexp_extract returns "" when a line does not match.
df_with_columns = df.withColumns({
    field_name: regexp_extract(col("log"), pattern, group_index)
    for group_index, field_name in enumerate(
        ("ip", "uid", "dateTime", "method", "filename", "statusCode", "fileSize"),
        start=1,
    )
})


In [None]:
# Inspect the parsed streaming DataFrame (its repr lists the column names and types).
display(df_with_columns)

In [None]:
def _count_requests(method, succeeded):
    """Count rows with HTTP ``method`` whose status succeeded (== 200) or failed (> 200).

    ``statusCode`` is extracted as a string. It is cast to int so the comparison is
    numeric: as strings, ``"1000" > "200"`` is False. With Spark's default non-ANSI
    casting, an empty (unparsed) status becomes null, so that row lands in neither bucket.
    """
    status = col("statusCode").cast("int")
    outcome = (status == 200) if succeeded else (status > 200)
    return count(when((col("method") == method) & outcome, True))


# Per tumbling 5-minute window, count successful and failed POST and GET requests.
# The watermark lets Spark finalize a window, which the append-mode sink below requires.
# NOTE(review): `timestamp` is the Kafka record timestamp, not the parsed `dateTime`
# field. Confirm that ingestion time is the intended event time.
# Every output column needs a distinct name: the JSON file sink rejects duplicates.
windowedCounts = (
    df_with_columns
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window(col("timestamp"), "5 minutes"))
    .agg(
        _count_requests("POST", succeeded=True).alias("no_of_successful_POST_operations"),
        _count_requests("POST", succeeded=False).alias("no_of_failed_POST_operations"),
        _count_requests("GET", succeeded=True).alias("no_of_successful_GET_operations"),
        _count_requests("GET", succeeded=False).alias("no_of_failed_GET_operations"),
    )
)


In [None]:
# Write each finalized 5-minute window as JSON files on HDFS, triggering every 5 minutes.
# In "append" mode a window's row is written only after the watermark passes the window's end.
# The "truncate" option was dropped: it only affects the console sink and was a no-op here.
# NOTE(review): the checkpoint directory is generic; any other query using it would conflict.
# Consider a query-specific path (changing it discards this query's saved progress).
query = (
    windowedCounts.writeStream
    .outputMode("append")
    .format("json")
    .option("path", "hdfs://localhost:8020/load-balancer-logs/aggregated_stream_logs")
    .option("checkpointLocation", "hdfs://localhost:8020/user/hdfs/checkpoint")
    .trigger(processingTime="5 minutes")
    .start()
)

# start() returns immediately and the query runs in the background; uncomment to block this cell.
# query.awaitTermination()