# Introduction

This notebook shows how to use Spark's [Structured Streaming API](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) to stream a topic from Confluent Cloud.

Based on Confluent's [guide](https://www.confluent.io/blog/consume-avro-data-from-kafka-topics-and-secured-schema-registry-with-databricks-confluent-cloud-on-azure/#step-2).

# Imports

In [1]:
from pyspark.sql import SparkSession
from confluent_kafka.schema_registry import SchemaRegistryClient
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
from pyspark.sql.avro.functions import from_avro
import time
import os

# Setup

In [2]:
# Spark Connect does not support the Structured Streaming API (as of Spark 3.4),
# so create a regular SparkSession

jar_packages = [
    "org.apache.spark:spark-avro_2.12:3.4.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
]
spark = (
    SparkSession.builder.appName("demo-structured-streaming-confluent")
    .config("spark.jars.packages", ",".join(jar_packages))
    .getOrCreate()
)

sc = spark.sparkContext

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-35a84c6f-eea0-49c6-a66e-4e705b4790a2;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.4.1 in central
	found org.tukaani#xz;1.9 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.

Read the Confluent Cloud credentials from environment variables:

In [3]:
confluentClusterName = os.environ.get("CONFLUENT_CLUSTER_NAME")
confluentBootstrapServers = os.environ.get("CONFLUENT_BOOTSTRAP_SERVERS")
confluentTopicName = os.environ.get("CONFLUENT_TOPIC_NAME")
schemaRegistryUrl = os.environ.get("SCHEMA_REGISTRY_URL")
confluentApiKey = os.environ.get("CONFLUENT_API_KEY")
confluentSecret = os.environ.get("CONFLUENT_SECRET")
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY")
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_SECRET")
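
`os.environ.get` returns `None` for any variable that is not set, and that tends to surface only later as a connection or authentication error. A minimal sketch of an up-front check, assuming the variable names used in the cell above; `missing_settings` and `REQUIRED_VARS` are hypothetical names, not part of any library, and `CONFLUENT_CLUSTER_NAME` is left out because the rest of the notebook does not use it:

```python
import os

# Names taken from the credentials cell; the list itself is an assumption
REQUIRED_VARS = [
    "CONFLUENT_BOOTSTRAP_SERVERS",
    "CONFLUENT_TOPIC_NAME",
    "SCHEMA_REGISTRY_URL",
    "CONFLUENT_API_KEY",
    "CONFLUENT_SECRET",
    "CONFLUENT_REGISTRY_API_KEY",
    "CONFLUENT_REGISTRY_SECRET",
]


def missing_settings(env, required=REQUIRED_VARS):
    """Return the names in `required` that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]


# Report only the variable names, never their values
missing = missing_settings(os.environ)
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```

Reporting only the names keeps the secrets themselves out of the notebook output.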

In [4]:
# Check that the secret is set without echoing its value into the notebook output
confluentSecret is not None

True

Create the schema registry client:

In [5]:
schema_registry_conf = {
    "url": schemaRegistryUrl,
    "basic.auth.user.info": "{}:{}".format(
        confluentRegistryApiKey, confluentRegistrySecret
    ),
}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

Create the Spark read stream: read from the Kafka topic, cast the key to a string, and split each value into its schema ID and Avro payload:

In [6]:
# A Kafka message contains a key and a value. Data going through a Kafka topic
# in Confluent Cloud has five bytes added to the beginning of every Avro value.
# If you are using Avro format keys, then five bytes will be added to the
# beginning of those as well. For this example, we’re assuming string keys.
# These bytes consist of one magic byte and four bytes representing the schema
# ID of the schema in the registry that is needed to decode that data. The
# bytes need to be removed so that the schema ID can be determined and the Avro
# data can be parsed.
binary_to_string = fn.udf(
    lambda x: str(int.from_bytes(x, byteorder="big")), StringType()
)


clickstreamTestDf = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", confluentBootstrapServers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(
            confluentApiKey, confluentSecret
        ),
    )
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", confluentTopicName)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .withColumn("key", fn.col("key").cast(StringType()))
    .withColumn("fixedValue", fn.expr("substring(value, 6, length(value)-5)"))
    .withColumn("valueSchemaId", binary_to_string(fn.expr("substring(value, 2, 4)")))
    .select(
        "topic",
        "partition",
        "offset",
        "timestamp",
        "timestampType",
        "key",
        "valueSchemaId",
        "fixedValue",
    )
)
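
The header handling above can be checked outside Spark. Below is a minimal pure-Python sketch of the same split, assuming the framing described in the comment (one magic byte with value 0, then a four-byte big-endian schema ID, then the Avro payload). `split_confluent_frame` is a hypothetical helper and the sample message is made up for illustration. Note that the SQL `substring` calls use 1-based positions, so `substring(value, 2, 4)` corresponds to `raw[1:5]` here and `substring(value, 6, ...)` to `raw[5:]`.

```python
import struct


def split_confluent_frame(raw: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(raw) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    # ">bI": big-endian, one signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[5:]


# Hypothetical message: magic byte 0, schema ID 100001, then a stand-in payload
frame = b"\x00" + (100001).to_bytes(4, byteorder="big") + b"avro-bytes"
schema_id, payload = split_confluent_frame(frame)
print(schema_id, payload)  # prints: 100001 b'avro-bytes'
```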

In [7]:
DATA_FOLDER = "./data/structured_streaming_confluent"

In [8]:
def parseAvroDataWithSchemaId(df, epoch_id):
    cachedDf = df.cache()
    fromAvroOptions = {"mode": "FAILFAST"}

    def getSchema(id):
        return str(schema_registry_client.get_schema(id).schema_str)

    distinctValueSchemaIdDF = cachedDf.select(
        fn.col("valueSchemaId").cast("integer")
    ).distinct()

    for valueRow in distinctValueSchemaIdDF.collect():
        currentValueSchemaId = sc.broadcast(valueRow.valueSchemaId)
        currentValueSchema = sc.broadcast(getSchema(currentValueSchemaId.value))
        filterValueDF = cachedDf.filter(
            fn.col("valueSchemaId") == currentValueSchemaId.value
        )
        filterValueDF.select(
            "topic",
            "partition",
            "offset",
            "timestamp",
            "timestampType",
            "key",
            from_avro("fixedValue", currentValueSchema.value, fromAvroOptions).alias(
                "parsedValue"
            ),
        ).write.format("avro").mode("append").option("mergeSchema", "true").save(
            DATA_FOLDER
        )

    # Release the cached micro-batch once every schema ID has been written
    cachedDf.unpersist()

# Demo

Start the write stream:

In [9]:
write_stream = (
    clickstreamTestDf.writeStream.foreachBatch(parseAvroDataWithSchemaId)
    .queryName("clickStreamTestFromConfluent")
    .start()
)

23/08/15 14:39:22 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-99a58ebd-d47a-4cff-bd56-68de9ab8fb39. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/08/15 14:39:22 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
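
The first warning above appears because no checkpoint location was configured, so Spark falls back to a temporary one. With `startingOffsets` set to `earliest`, a restarted query would then read the topic from the beginning again and append the same records to the output folder. A minimal sketch of starting the query with a persistent checkpoint, assuming a writable directory of your choosing; `start_checkpointed_stream` is a hypothetical helper, not part of PySpark:

```python
def start_checkpointed_stream(stream_df, batch_fn, checkpoint_dir, query_name):
    """Start a foreachBatch query whose progress is stored in checkpoint_dir."""
    return (
        stream_df.writeStream.foreachBatch(batch_fn)
        # Persist offsets and batch progress so a restart resumes where it left off
        .option("checkpointLocation", checkpoint_dir)
        .queryName(query_name)
        .start()
    )
```

In this notebook it could be called as `start_checkpointed_stream(clickstreamTestDf, parseAvroDataWithSchemaId, "./checkpoints/clickstream", "clickStreamTestFromConfluent")`, where the checkpoint path is an assumed location.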


Stop the stream after 1 minute:

In [10]:
time.sleep(60)
write_stream.stop()

23/08/15 14:39:23 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


23/08/15 14:39:28 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
23/08/15 14:39:28 WARN KafkaDataConsumer: KafkaDataConsumer is not running in Un
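
`time.sleep(60)` keeps waiting even if the query fails part-way through the minute. One alternative is `StreamingQuery.awaitTermination(timeout)`, which takes a timeout in seconds, returns whether the query terminated within it, and raises if the query ended with an exception. A minimal sketch; `run_for` is a hypothetical helper, not part of PySpark:

```python
def run_for(query, seconds):
    """Let a streaming query run for at most `seconds`, then stop it.

    Returns True if the query ended on its own before the timeout.
    """
    # Blocks until the query terminates or the timeout elapses
    finished = query.awaitTermination(seconds)
    if not finished:
        query.stop()
    return finished
```

With the query from this notebook, `run_for(write_stream, 60)` would replace the `time.sleep(60)` and `write_stream.stop()` pair.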

Now we can query the results:

In [None]:
results_df = spark.read.format("avro").load(DATA_FOLDER)
results_df.show(5)

+------------+---------+------+--------------------+-------------+---+--------------------+
|       topic|partition|offset|           timestamp|timestampType|key|         parsedValue|
+------------+---------+------+--------------------+-------------+---+--------------------+
|clickstreams|        4|     0|2023-08-14 13:02:...|            0|  6|{6, AbdelKable_86...|
|clickstreams|        4|     1|2023-08-14 13:02:...|            0| 10|{10, ArlyneW8ter,...|
|clickstreams|        4|     2|2023-08-14 13:02:...|            0| 12|{12, Roberto_123,...|
|clickstreams|        4|     3|2023-08-14 13:02:...|            0| 18|{18, alison_99, 1...|
|clickstreams|        4|     4|2023-08-14 13:02:...|            0| 20|{20, ArlyneW8ter,...|
+------------+---------+------+--------------------+-------------+---+--------------------+
only showing top 5 rows

