# Spark Structured Streaming Example

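Purpose: Reads a bounded batch of messages (earliest through latest offsets) from a Kafka topic using Spark's batch `read` API, then aggregates total sales and quantity by product_id.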

References: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Author:  Gary A. Stafford

Date: 2022-12-16

In [1]:
import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    StringType,
    FloatType,
    TimestampType,
    BooleanType,
)
from pyspark.sql.window import Window

In [2]:
# Obtain the SparkSession for this notebook: reuse an existing session if one
# is already active, otherwise create one with this application name.
spark = (
    SparkSession.builder
    .appName("kafka-batch-query")
    .getOrCreate()
)

In [3]:
# Kafka topic to read purchase messages from.
TOPIC_PURCHASES = "demo.purchases"

# host:port of the Kafka bootstrap server used by the Spark Kafka source.
BOOTSTRAP_SERVERS = "kafka:29092"

In [4]:
# Kafka source settings for a bounded (batch) read: subscribe to the purchases
# topic and read from the earliest through the latest available offsets.
options = {
    "kafka.bootstrap.servers": BOOTSTRAP_SERVERS,
    "subscribe": TOPIC_PURCHASES,
    "startingOffsets": "earliest",
    "endingOffsets": "latest",
}

# Configure the reader first, then load the raw Kafka records as a DataFrame.
kafka_reader = spark.read.format("kafka").options(**options)
df_sales = kafka_reader.load()

In [6]:
# Expected layout of the JSON purchase message carried in each Kafka record's
# `value` field.
# NOTE(review): from_json is not expected to enforce the nullable=False flags
# below (unparseable records typically surface as nulls) — confirm if strict
# validation is required.
schema = StructType(
    [
        StructField("transaction_time", TimestampType(), False),
        StructField("transaction_id", IntegerType(), False),
        StructField("product_id", StringType(), False),
        StructField("price", FloatType(), False),
        StructField("quantity", IntegerType(), False),
        StructField("is_member", BooleanType(), True),
        StructField("member_discount", FloatType(), True),
        StructField("add_supplements", BooleanType(), True),
        StructField("supplement_price", FloatType(), True),
        StructField("total_purchase", FloatType(), False),
    ]
)

# Decode the Kafka `value` payload to a string, parse it as JSON using the
# schema above, and flatten the parsed struct into top-level columns.
df_purchases = (
    df_sales.selectExpr("CAST(value AS STRING)")
    .select(F.from_json("value", schema=schema).alias("data"))
    .select("data.*")
)

# One row per product with its total sales and total quantity. A plain
# groupBy/agg replaces the previous window-sum + row_number() == 1 pattern,
# which produced the same per-product totals in a roundabout way.
df_sales_by_product = df_purchases.groupBy("product_id").agg(
    F.sum("total_purchase").alias("sales"),
    F.sum("quantity").alias("quantity"),
)

# Sort on the numeric total *before* formatting, so the ordering does not
# depend on stripping thousands separators out of a display string.
# Formatting is applied last and is for presentation only.
(
    df_sales_by_product.orderBy(F.col("sales").desc())
    .select(
        "product_id",
        F.format_number("sales", 2).alias("sales"),
        F.format_number("quantity", 0).alias("quantity"),
    )
    .show(10)
)

+----------+--------+--------+
|product_id|   sales|quantity|
+----------+--------+--------+
|      SF07|7,927.25|   1,179|
|      SC04|7,196.81|   1,049|
|      CS08|7,106.32|   1,368|
|      IS02|6,054.41|   1,060|
|      SF05|5,902.14|     867|
|      SF06|5,669.34|     841|
|      SC01|5,539.78|     820|
|      IS03|5,373.21|     937|
|      SC05|4,871.18|     712|
|      CS07|4,412.13|     844|
+----------+--------+--------+
only showing top 10 rows

