In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window,avg,max,count,sum,expr
from pyspark.sql.types import *

# Build (or reuse) the Spark session for this notebook.
# - spark.jars.packages pulls in the Kafka source connector used by the streaming cell.
# - spark.sql.shuffle.partitions is lowered to 6 so the small per-service aggregation
#   does not fan out into the default number of shuffle partitions.
spark = (
    SparkSession.builder
    .appName("Incident-Alerts-Metrics")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3")
    .config("spark.sql.shuffle.partitions", "6")
    .getOrCreate()
)

# Hide INFO-level chatter so micro-batch output stays readable.
spark.sparkContext.setLogLevel("WARN")

# Display the session summary as the cell's output.
spark

In [2]:
# Expected JSON layout of each record on the `incident_alerts` topic.
# Every field is declared nullable, so absent keys come through as null.
alerts_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("incident_id", StringType()),
        ("service", StringType()),
        ("type", StringType()),
        ("severity", StringType()),
        ("p95_latency", IntegerType()),
        ("breach_count", IntegerType()),
        ("window_start", LongType()),
        ("window_end", LongType()),
    )
])

In [None]:
# Streaming source: subscribe to the `incident_alerts` Kafka topic.
# "startingOffsets": "latest" starts from the end of the topic when the query
# starts without a checkpoint, i.e. already-published alerts are not replayed.
raw = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "broker2:29094",
        "subscribe": "incident_alerts",
        "startingOffsets": "latest",
    })
    .load()
)

# Decode the Kafka `value` column to a string, parse it against `alerts_schema`,
# then flatten the resulting struct so each schema field becomes a top-level column.
alert_payload = from_json(col("value").cast("string"), alerts_schema)
parsed = raw.select(alert_payload.alias("data")).select("data.*")

# ---------------- Derive event time from the alert's window_end ----------------
# `event_time` drives the watermark and windowing below. Spark interprets a bigint
# cast to timestamp as seconds since the Unix epoch.
# NOTE(review): assumes upstream `window_end` is epoch *seconds*; epoch millis would
# land thousands of years in the future — confirm with the producer.
alerts = parsed.withColumn("event_time", parsed["window_end"].cast(TimestampType()))

# Per-service metrics over 1-minute tumbling event-time windows.
# The 2-minute watermark lets Spark drop window state once event time has moved
# more than 2 minutes past a window.
agg_metrics = (
    alerts
    .withWatermark("event_time", "2 minutes")
    .groupBy("service", window("event_time", "1 minute"))
    .agg(
        # Volume: every alert, plus per-type counts (CASE yields null otherwise,
        # and count() ignores nulls).
        count("*").alias("total_alerts"),
        count(expr("CASE WHEN type='LATENCY_SLO_BREACH' THEN 1 END")).alias("latency_breaches"),
        count(expr("CASE WHEN type='ERROR_RATE_SPIKE' THEN 1 END")).alias("error_spikes"),
        # Magnitude of the reported problems.
        avg("p95_latency").alias("avg_p95_latency"),
        max("breach_count").alias("max_breach_count"),
        # Share of alerts in the window whose severity is CRITICAL.
        (
            sum(expr("CASE WHEN severity='CRITICAL' THEN 1 ELSE 0 END"))
            / count("*")
        ).alias("critical_ratio"),
    )
    # Unpack the window struct into flat start/end columns.
    .select(
        "service",
        col("window")["start"].alias("window_start"),
        col("window")["end"].alias("window_end"),
        "total_alerts",
        "latency_breaches",
        "error_spikes",
        "avg_p95_latency",
        "max_breach_count",
        "critical_ratio",
    )
)

def print_non_empty(batch_df, batch_id):
    """Print one micro-batch to stdout, skipping batches with no rows.

    Intended as a ``foreachBatch`` callback for the streaming query.

    Args:
        batch_df: the micro-batch DataFrame supplied by ``foreachBatch``.
        batch_id: the micro-batch id supplied by Spark; only used in the banner.
    """
    # The batch feeds two actions (emptiness check + show). Cache it so the
    # upstream streaming aggregation is not recomputed for the second action.
    batch_df.persist()
    try:
        # isEmpty() stops at the first row; count() would scan the whole batch.
        if batch_df.isEmpty():
            return
        print(f"\n========= BATCH {batch_id} =========")
        batch_df.show(truncate=False)
    finally:
        # Always release the cached batch, even if show() fails.
        batch_df.unpersist()

# Start the streaming sink. The writer settings are independent setters:
query = (
    agg_metrics.writeStream
    .foreachBatch(print_non_empty)        # hand each micro-batch to the printer (empty ones are skipped)
    .outputMode("update")                 # emit only window rows whose aggregates changed since the last trigger
    .trigger(processingTime="2 minutes")  # run a micro-batch every 2 minutes
    .start()
)

# Block this cell for the lifetime of the stream.
# NOTE(review): interrupting the cell presumably only ends the wait; call
# `query.stop()` to actually stop the streaming query.
query.awaitTermination()


+---------------+-------------------+-------------------+------------+----------------+------------+-----------------+----------------+------------------+
|service        |window_start       |window_end         |total_alerts|latency_breaches|error_spikes|avg_p95_latency  |max_breach_count|critical_ratio    |
+---------------+-------------------+-------------------+------------+----------------+------------+-----------------+----------------+------------------+
|auth-service   |2026-01-06 06:13:00|2026-01-06 06:14:00|3           |2               |1           |804.3333333333334|2               |0.6666666666666666|
|order-service  |2026-01-06 06:13:00|2026-01-06 06:14:00|2           |1               |1           |537.5            |3               |1.0               |
|payment-service|2026-01-06 06:13:00|2026-01-06 06:14:00|2           |1               |1           |808.0            |3               |0.5               |
+---------------+-------------------+-------------------+------------