# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, window, to_timestamp, when

# Local Spark session; the Kafka source connector is fetched through
# spark.jars.packages so readStream.format("kafka") is available.
# NOTE(review): the connector coordinates (Scala 2.12, Spark 3.5.0) must match
# the installed Spark build — confirm.
_kafka_connector = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"
spark = (
    SparkSession.builder
    .appName("LogsMovingWindow")
    .master("local[*]")
    .config("spark.jars.packages", _kafka_connector)
    .getOrCreate()
)


# In [None]:
# Decode each record's `value` into a text column holding one log line.
# NOTE(review): df_raw is not created in this section; presumably it is a
# spark.readStream.format("kafka")...load() streaming DataFrame — confirm.
df_logs = df_raw.selectExpr("CAST(value AS STRING) as log_line")

# Layout: ip - user_id [timestamp] method file status size
pattern = r"(\d+\.\d+\.\d+\.\d+)\s-\s(\d+)\s\[(.*?)\]\s(\w+)\s(\S+)\s(\d+)\s(\d+)"


def _log_field(group, name, cast_to=None):
    """Return capture group *group* of ``pattern`` applied to ``log_line``, named *name*.

    ``regexp_extract`` yields "" for lines that do not match; with *cast_to*
    the value is cast (e.g. "" becomes null for "int" when ANSI mode is off).
    """
    extracted = regexp_extract("log_line", pattern, group)
    if cast_to is not None:
        extracted = extracted.cast(cast_to)
    return extracted.alias(name)


df_parsed = df_logs.select(
    _log_field(1, "ip"),
    _log_field(2, "user_id"),
    _log_field(3, "timestamp"),
    _log_field(4, "method"),
    _log_field(5, "file"),
    _log_field(6, "status", "int"),
    _log_field(7, "size", "int"),
)

# Convert the bracketed log timestamp into a TimestampType `event_time`.
# Spark 3+ does not accept day-of-week letters ('E') in *parsing* patterns, so
# an optional leading day-name prefix such as "Mon, " is removed by regex and
# the remainder is parsed with a pattern that has no 'E'.
# NOTE(review): assumes timestamps look like "Mon, 12 Feb 2024 10:20:30 GMT"
# (inferred from the original "EEE, dd MMM yyyy HH:mm:ss z") — confirm.
df_with_time = df_parsed.withColumn(
    "event_time",
    to_timestamp(
        regexp_extract("timestamp", r"^(?:[A-Za-z]+,\s*)?(.+)$", 1),
        "dd MMM yyyy HH:mm:ss z",
    ),
).filter(
    # Lines the log regex or the timestamp parser could not handle end up with
    # a null event_time; they cannot be placed in any event-time window.
    col("event_time").isNotNull()
)

# Reusable predicates for the operation classification below.
_is_get = col("method") == "GET"
_is_post = col("method") == "POST"
_is_ok = col("status") == 200
_is_not_ok = col("status") != 200

# Label each request by method and outcome (HTTP 200 vs anything else).
# A null status fails both status predicates and so falls to "other".
df_classified = df_with_time.withColumn(
    "operation_type",
    when(_is_get & _is_ok, "successful_get")
    .when(_is_get & _is_not_ok, "failed_get")
    .when(_is_post & _is_ok, "successful_post")
    .when(_is_post & _is_not_ok, "failed_post")
    .otherwise("other"),
)

# How late an event may arrive before its 5-minute window is finalised and
# written. NOTE(review): "10 minutes" is a placeholder — tune it to the real
# delay between log emission and arrival in the stream.
WATERMARK_DELAY = "10 minutes"

# Count requests per operation_type in tumbling 5-minute event-time windows.
# The watermark is required: the parquet file sink only supports the append
# output mode, and append mode on a windowed aggregation can only emit a window
# once the watermark guarantees no more events for it will be accepted.
df_windowed_ordered = (
    df_classified
    .withWatermark("event_time", WATERMARK_DELAY)
    .groupBy(window("event_time", "5 minutes", "5 minutes"), "operation_type")
    .count()
    .withColumn("window_start", col("window.start"))
)

# Streaming sorts are only allowed in complete mode, which the file sink
# rejects, so rows are written unsorted; order by window_start when reading
# the parquet output back.
query = (
    df_windowed_ordered.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "hdfs://namenode:9000/checkpoints/logs/")
    .option("path", "hdfs://namenode:9000/output/logs/")
    .start()
)

query.awaitTermination()