Reads data from a Kafka Stream using Spark Streaming API
and writes aggregated data to another Kafka topic

In [1]:
# Import Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

Install some dependencies:

In [2]:
!pip install confluent-kafka



In [3]:
# Connect to Spark on port 7077
spark = SparkSession.builder.master("spark://spark:7077").appName("demo-stream").config("spark.cores.max", 1).getOrCreate()

In [4]:
# Set some important variables
kafka_server = "kafka:19092"
kafka_topic_name = "demo-data-inventory"

topic_schema_key_name = f"{kafka_topic_name}-key"
topic_schema_value_name = f"{kafka_topic_name}-value"
schema_registry_address = "http://kafka-schema-registry:8081"

checkpoints_path = "s3a://demo-data-agg/checkpoints"

consumer_group_name = "demo-data-intentory-spark-agg"

Get the schema of the data in our topic from the Kafka Schema Registry:

In [5]:
from confluent_kafka.avro.cached_schema_registry_client import (
    CachedSchemaRegistryClient,
)

sr = CachedSchemaRegistryClient(
    {
        "url": schema_registry_address
    }
)
key_schema = str(sr.get_latest_schema(topic_schema_key_name)[1])
value_schema = str(sr.get_latest_schema(topic_schema_value_name)[1])
print(f"Key schema:\n{key_schema}")
print(f"Value schema:\n{value_schema}")

Key schema:
"long"
Value schema:
{"type": "record", "connect.name": "ksql.inventory", "name": "inventory", "namespace": "ksql", "fields": [{"type": "long", "name": "id"}, {"type": "long", "name": "quantity"}, {"type": "long", "name": "productid"}]}


In [6]:
# Read data from the 'demo-data-inventory' kafka topic
raw_stream_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", kafka_topic_name)
    # Start from offset can be "latest" or "earliest"
    .option("startingOffsets", "earliest")
    # Set the consumer group
    .option("group.id", consumer_group_name)
    .load()
)
raw_stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Confluent Kafka's stores Avro messages in binary format with 5 bytes at the beginning of each message. These 5 bytes are used to store a magic byte and a schema ID. The schema ID is used to retrieve the schema from the schema registry. The schema is then used to decode the rest of the message.

![](https://miro.medium.com/v2/resize:fit:640/format:webp/1*clQmx1rbPZaqDlAkHF4jvg.png)

In [7]:
stream_df = raw_stream_df.select(
    F.expr("substring(key, 6, length(key)-5)").alias("key"),
    F.expr("substring(value, 6, length(value)-5)").alias("value"),
    F.expr("substring(value, 1, 1)").alias("magicByte"),
    F.expr("substring(value, 2, 4)").alias("valueSchemaId"),
    "partition",
    "offset",
    "timestamp",
    "timestampType",
)
stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- magicByte: binary (nullable = true)
 |-- valueSchemaId: binary (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Let's see how it looks in the console:

In [13]:
# Start the Structured Streaming query and write to console
query = (
    stream_df
    .writeStream.format("console").option("truncate", False)
)
streaming_query = query.start()

In [14]:
# Stop the query after checking the output
streaming_query.stop()

Now let's parse the Avro message using the schema we retrieved from the schema registry:

In [15]:
from pyspark.sql.avro.functions import from_avro

inventory_df = stream_df.select(
    from_avro(F.col("key"), key_schema).alias("key"),
    from_avro(F.col("value"), value_schema).alias("inventory"),
    F.col("timestamp"),
    # Date of the timestamp
    F.to_date(F.col("timestamp")).alias("timestamp_date"),
    # Date as YYYY-MM-DD string
    F.date_format(F.col("timestamp"), "yyyy-MM-dd").alias("timestamp_date_iso"),
    # Hour of the timestamp
    F.hour(F.col("timestamp")).alias("timestamp_hour"),
    # Minute of the timestamp
    F.minute(F.col("timestamp")).alias("timestamp_minute"),
)
inventory_df.printSchema()

root
 |-- key: long (nullable = true)
 |-- inventory: struct (nullable = true)
 |    |-- id: long (nullable = false)
 |    |-- quantity: long (nullable = false)
 |    |-- productid: long (nullable = false)
 |-- timestamp: timestamp (nullable = true)
 |-- timestamp_date: date (nullable = true)
 |-- timestamp_date_iso: string (nullable = true)
 |-- timestamp_hour: integer (nullable = true)
 |-- timestamp_minute: integer (nullable = true)



In [11]:
# Write the raw inventory data to json in an S3 bucket
output_dir = "s3a://demo-data-agg/raw-inventory/"
query = (
    inventory_df.writeStream.format("json")
    .option("path", output_dir)
    .option("checkpointLocation", f"{checkpoints_path}/json/raw-inventory")
    .partitionBy("timestamp_date_iso", "timestamp_hour", "timestamp_minute")
)
inventory_streaming_query = query.start()

In [12]:
inventory_streaming_query.stop()

Now that we have the data in a readable format, let's aggregate it for analysis and send it to another Kafka topic:

In [17]:
# Create a windowed aggregation of the inventory data
agg_df = (
    inventory_df.withWatermark("timestamp", "1 minutes")
    .groupBy(
        F.window("timestamp", "1 minutes", "1 minutes"),
        F.col("inventory.productid").alias("productid"),
    )
    .agg(F.sum("inventory.quantity").alias("quantity"))
)
agg_df.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- productid: long (nullable = true)
 |-- quantity: long (nullable = true)



Write to Kafka topic:

In [18]:
# Write the inventory data to a Kafka topic
output_topic = "demo-data-inventory-agg"
# We need to put the data we want in a 'value' column and the key in a 'key' column. We can do this with a struct and then convert it to a JSON string.
topic_agg_df = agg_df.select(
    F.to_json(
        F.struct(
            F.col("window.start").alias("window_start"),
            F.col("window.end").alias("window_end"),
            F.col("productid").alias("product_id"),
            F.col("quantity"),
        )
    )
    .cast("string")
    .alias("value"),
    # Concat the window start and product id to create a unique key
    F.concat(
        # Parse to ISO format
        F.date_format(F.col("window.start"), "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
        F.lit(" "),
        F.col("productid").cast("string"),
    ).alias("key"),
)
topic_agg_df.printSchema()

root
 |-- value: string (nullable = true)
 |-- key: string (nullable = true)



In [19]:
# Start the Structured Streaming query
query = (
    topic_agg_df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:19092")
    .option("topic", output_topic)
    .option("checkpointLocation", f"{checkpoints_path}/kafka/{output_topic}")
    .partitionBy("timestamp_date_iso", "timestamp_hour", "timestamp_minute")
)
streaming_query = query.start()

In [20]:
streaming_query.stop()

## Standalone KafkaConsumer

### Kafka Python Client

In [None]:
# Install dependencies to consume from the Kafka Topic and parse the Avro schema
!pip install kafka-python fastavro

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting avro-python3
  Downloading avro-python3-1.10.2.tar.gz (38 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: avro-python3
  Building wheel for avro-python3 (setup.py) ... [?25ldone
[?25h  Created wheel for avro-python3: filename=avro_python3-1.10.2-py3-none-any.whl size=43991 sha256=14193737e3e14b348fbe44a50946831a0ea26e3484146f3ca6f7295a130b290f
  Stored in directory: /home/jovyan/.cache/pip/wheels/31/be/50/1145d9510eb4440893fc0ec676ef9464a05e0f7492a76fbb2c
Successfully built avro-python3
Installing collected packages: kafka-python, avro-python3
Successfully installed avro-python3-1.10.2 kafka-python-2.0.2


In [None]:
from kafka import KafkaConsumer
import fastavro
import io

msg_schema_dict = {  # Avro schema of the data we are consuming
    "type": "record",
    "name": "inventory",
    "namespace": "ksql",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "quantity", "type": "long"},
        {"name": "productid", "type": "long"},
    ],
    "connect.name": "ksql.inventory",
}
msg_avro_schema = fastavro.parse_schema(msg_schema_dict)


def deserialize_avro(message: bytes, schema) -> dict:
    bytes_reader = io.BytesIO(message)
    # First 5 bytes are magic bytes and schema id
    bytes_reader.seek(5)
    return fastavro.schemaless_reader(bytes_reader, schema)


consumer = KafkaConsumer(
    bootstrap_servers="kafka:19092",
    auto_offset_reset="earliest",
    group_id="demo-data-inventory-consumer",
    # Deserialize Avro messages using the schema registry
    value_deserializer=lambda m: deserialize_avro(m, msg_avro_schema),
)
consumer.subscribe(["demo-data-inventory"])
n = 10  # Number of messages to read
for it, message in enumerate(consumer):
    print(message.value)
    if it > n:
        break

{'id': 0, 'quantity': 0, 'productid': 0}
{'id': 4, 'quantity': 4, 'productid': 4}
{'id': 5, 'quantity': 5, 'productid': 5}
{'id': 7, 'quantity': 7, 'productid': 7}
{'id': 8, 'quantity': 8, 'productid': 8}
{'id': 9, 'quantity': 9, 'productid': 9}
{'id': 10, 'quantity': 10, 'productid': 10}
{'id': 11, 'quantity': 11, 'productid': 11}
{'id': 12, 'quantity': 12, 'productid': 12}
{'id': 18, 'quantity': 18, 'productid': 18}
{'id': 19, 'quantity': 19, 'productid': 19}
{'id': 21, 'quantity': 21, 'productid': 21}


### Confluent Kafka Python Client
We can also use the confluent_kafka library to read from Kafka topics

In [None]:
!pip install confluent-kafka fastavro

Collecting fastavro
  Downloading fastavro-1.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Downloading fastavro-1.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m15.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: fastavro
Successfully installed fastavro-1.9.0


In [None]:
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

In [None]:
sr_conf = {"url": schema_registry_address}
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_schema = str(sr.get_latest_schema(topic_schema_value_name)[1])
avro_deserializer = AvroDeserializer(schema_registry_client, avro_schema)
consumer_conf = {
    "bootstrap.servers": kafka_server,
    "group.id": "demo-data-inventory",
    "auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_conf)
consumer.subscribe([kafka_topic_name])

In [None]:
n_messages = 10 # Number of messages to read
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    # Get the message value
    value = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
    print(value)
    n_messages -= 1
    if n_messages <= 0:
        break

{'id': 23, 'quantity': 23, 'productid': 23}
{'id': 24, 'quantity': 24, 'productid': 24}
{'id': 26, 'quantity': 26, 'productid': 26}
{'id': 31, 'quantity': 31, 'productid': 31}
{'id': 32, 'quantity': 32, 'productid': 32}
{'id': 33, 'quantity': 33, 'productid': 33}
{'id': 34, 'quantity': 34, 'productid': 34}
{'id': 35, 'quantity': 35, 'productid': 35}
{'id': 38, 'quantity': 38, 'productid': 38}
{'id': 41, 'quantity': 41, 'productid': 41}
