In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json,window, to_timestamp, when, count, avg, max, min,to_json,struct
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQueryException
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session. `spark.jars.packages` pulls in the Kafka
# source/sink connector used by the streaming cells below.
# NOTE(review): jar packages are resolved when the JVM starts, so this setting only
# takes effect if no session is running yet; the connector coordinates
# (Scala 2.12, version 3.5.0) presumably need to match the installed Spark build — confirm.
spark = (
    SparkSession.builder
    .appName("StructuredStreamingWithKafka")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .getOrCreate()
)

In [10]:
# Streaming source over the raw `logging_info` topic. With "latest", a query that
# starts without an existing checkpoint reads only messages produced after it starts.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "host.docker.internal:29092",
        "subscribe": "logging_info",
        "startingOffsets": "latest",
    })
    .load()
)

In [11]:
# Show the fixed Kafka source schema: `key`/`value` arrive as binary and must be
# decoded before parsing.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [12]:
# Layout of the JSON payload carried in each Kafka message value. Every field is
# declared nullable. `timestamp` and `timestamp_human` are two encodings of the
# event time; later cells reconcile them into a single TIMESTAMP column.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("id", StringType()),
        ("timestamp", LongType()),          # numeric timestamp in milliseconds
        ("api", StringType()),
        ("response_time", IntegerType()),
        ("log_level", StringType()),
        ("ip_address", StringType()),
        ("status_code", StringType()),
        ("response_code", IntegerType()),
        ("timestamp_human", StringType()),  # string timestamp, parsed later as yyyy-MM-dd HH:mm:ss
    )
])


In [13]:
# Decode each binary Kafka `value` to a string, parse it with `schema`, and
# flatten the resulting struct into top-level columns.
log_data_parsed = (
    kafka_df
    .select(F.from_json(F.col("value").cast("string"), schema).alias("log"))
    .select("log.*")
)

In [14]:
# Millisecond epoch value -> (fractional) seconds -> TIMESTAMP, stored in a helper
# column so the next cell can fall back to `timestamp_human` when it is null.
# NOTE: this cell reassigns `log_data_parsed` in place. Re-running it on its own
# after the drop cell will not reproduce the same frame, so re-run the parsing
# cells in order starting from the `from_json` cell.
log_data_parsed = log_data_parsed.withColumn(
    colName="timestamp_converted",
    col=(F.col("timestamp") / F.lit(1000)).cast("timestamp"),
)

In [15]:
# Overwrite the numeric `timestamp` with a TIMESTAMP value. Prefer the
# epoch-derived helper column, and only when it is null parse `timestamp_human`
# with the pattern yyyy-MM-dd HH:mm:ss.
# NOTE: depends on `timestamp_converted` and `timestamp_human`, which the drop
# cell removes, so this cell cannot be re-run on its own after that cell.
log_data_parsed = log_data_parsed.withColumn(
    "timestamp",
    F.coalesce(
        col("timestamp_converted"),
        to_timestamp(col("timestamp_human"), "yyyy-MM-dd HH:mm:ss"),
    ),
)

In [16]:
# The resolved `timestamp` column supersedes both helper inputs; discard them.
log_data_parsed = log_data_parsed.drop(*("timestamp_converted", "timestamp_human"))

In [17]:
# Confirm `timestamp` is now a TIMESTAMP column and the helper columns are gone.
log_data_parsed.printSchema()

root
 |-- id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- api: string (nullable = true)
 |-- response_time: integer (nullable = true)
 |-- log_level: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- status_code: string (nullable = true)
 |-- response_code: integer (nullable = true)



In [18]:
# Aggregate logs into tumbling time windows per (api, log_level) and publish the
# finalized aggregates to Kafka as JSON.
#
# Aggregate functions come from the `F` namespace rather than the bare
# `count`/`avg`/`max`/`min` names. The import cell rebinds `max` and `min` to their
# PySpark versions (shadowing the Python builtins), and going through `F` keeps this
# cell correct even after that shadowing import is cleaned up.
KAFKA_BOOTSTRAP_SERVERS = "host.docker.internal:29092"  # same broker the source stream reads from
OUTPUT_TOPIC = "agg_logging_info"
WINDOW_DURATION = "1 minute"   # tumbling window size
WATERMARK_DELAY = "1 minute"   # how late an event may arrive and still be aggregated
# NOTE(review): relative path shared by any query that uses it; a per-query
# directory would avoid checkpoint clashes if more queries are added.
checkpoint_path = "./tmp/checkpoints"

minute_aggregates = (
    log_data_parsed
    # The watermark bounds aggregation state and lets append mode emit a window
    # once it can no longer change; events later than the delay may be dropped.
    .withWatermark("timestamp", WATERMARK_DELAY)
    .groupBy(
        F.window(F.col("timestamp"), WINDOW_DURATION).alias("minute_window"),
        "api",
        "log_level",
    )
    .agg(
        F.count("*").alias("log_count"),
        F.avg("response_time").alias("avg_response_time"),
        F.max("response_time").alias("max_response_time"),
        F.min("response_time").alias("min_response_time"),
    )
)

# Flatten the window struct into plain start/end columns.
minute_aggregates_stream = minute_aggregates.select(
    F.col("minute_window.start").alias("window_start"),
    F.col("minute_window.end").alias("window_end"),
    "api",
    "log_level",
    "log_count",
    "avg_response_time",
    "max_response_time",
    "min_response_time",
)

# The Kafka sink writes the `value` column, so serialize every output column into
# one JSON object. Deriving the field list from `.columns` keeps it in sync with
# the projection above.
minute_aggregates_stream_json = minute_aggregates_stream.select(
    F.to_json(F.struct(*minute_aggregates_stream.columns)).alias("value")
)

minute_aggregates_stream_json.printSchema()

# Append mode: each window's row is written once, after the watermark passes the
# window end.
query = (
    minute_aggregates_stream_json
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("topic", OUTPUT_TOPIC)
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .start()
)


root
 |-- value: string (nullable = true)

