# Spark Structured Streaming Example

Purpose: Reads a batch of messages from a Kafka topic and aggregates by product_id.

References: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Author:  Gary A. Stafford

Date: 2022-12-16

Modified by Vasilios Anagnostopoulos 30/09/2025

In [None]:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    StringType,
    FloatType,
    TimestampType,
    BooleanType,
)
from pyspark.sql.window import Window


def read_from_kafka(spark):
    BOOTSTRAP_SERVERS = "kafka:29092"
    TOPIC_PURCHASES = "demo.purchases"

    options = {
        "kafka.bootstrap.servers": BOOTSTRAP_SERVERS,
        "subscribe": TOPIC_PURCHASES,
        "startingOffsets": "earliest",
        "includeHeaders": "true"
    }

    df_sales = spark.read.format("kafka").options(**options).load()

    return df_sales


def summarize_sales(df_sales):
    schema = StructType(
        [
            StructField("transaction_time", TimestampType(), False),
            StructField("transaction_id", IntegerType(), False),
            StructField("product_id", StringType(), False),
            StructField("price", FloatType(), False),
            StructField("quantity", IntegerType(), False),
            StructField("is_member", BooleanType(), True),
            StructField("member_discount", FloatType(), True),
            StructField("add_supplements", BooleanType(), True),
            StructField("supplement_price", FloatType(), True),
        ]
    )

    window = Window.partitionBy("product_id").orderBy("price")
    window_agg = Window.partitionBy("product_id")

    df_sales = (
        df_sales.selectExpr("CAST(value AS STRING)")
        .select(F.from_json("value", schema=schema).alias("data"))
        .select("data.*")
        .withColumn("row", F.row_number().over(window))
        .withColumn("quantity", F.sum(F.col("quantity")).over(window_agg))
        .withColumn("sales", F.sum(F.col("quantity") * F.col("price")).over(window_agg))
        .filter(F.col("row") == 1)
        .drop("row")
        .select(
            "product_id",
            F.format_number("sales", 2).alias("sales"),
            F.format_number("quantity", 0).alias("quantity"),
        )
        .coalesce(1)
        .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)
        .write \
        .format("parquet") \
        .option("path", "output/summary_path") \
        .mode("overwrite") \
        .option("numRows", 25)
        .option("truncate", False)
        .save()
    )


spark = SparkSession.builder.remote("sc://127.0.0.1:15002").getOrCreate() 

df_sales = read_from_kafka(spark)

summarize_sales(df_sales)


In [8]:
spark.read.parquet("output/summary_path").show()

+----------+------+--------+
|product_id| sales|quantity|
+----------+------+--------+
|      SC03|239.60|       8|
|      CS11|224.55|       9|
|      CS02|174.65|       7|
|      SF06|167.72|       7|
|      SF07|167.72|       7|
|      SC01|143.76|       6|
|      CS08| 99.80|       5|
|      SF02| 95.84|       4|
|      CS06| 89.82|       6|
|      CS04| 74.85|       5|
|      SF01| 71.88|       4|
|      IS04| 65.88|       4|
|      SF04| 53.91|       3|
|      IS03| 43.92|       4|
|      CS09| 39.92|       4|
|      SC02| 35.94|       3|
|      IS01| 32.94|       3|
|      CS03| 29.94|       3|
|      SC04| 23.96|       2|
|      SF05| 23.96|       2|
+----------+------+--------+
only showing top 20 rows
