## 1. Set up Confluent Kafka (credentials & connection test)

In [0]:
# Confluent Kafka cluster endpoint and SASL credentials, read from the
# Databricks secret scope "confluent-kafka" (fetched in the order listed).
bootstrap_servers, sasl_username, sasl_password = (
    dbutils.secrets.get(scope="confluent-kafka", key=secret_key)
    for secret_key in ("bootstrap-servers", "api-key", "api-secret")
)


In [0]:
# Confluent Schema Registry endpoint and API credentials, read from the
# Databricks secret scope "confluent-kafka" (fetched in the order listed).
schema_registry_url, schema_registry_api_key, schema_registry_api_secret = (
    dbutils.secrets.get(scope="confluent-kafka", key=secret_key)
    for secret_key in (
        "schema-registry-url",
        "schema-registry-api-key",
        "schema-registry-api-secret",
    )
)

In [0]:
# Pipeline configuration: source topic, Schema Registry subject, and sink.
topic = "orders"
# Confluent's default TopicNameStrategy registers value schemas as "<topic>-value".
subject = f"{topic}-value"
# Delta table the stream appends to.
bronze_table = "bronze.orders_stream"
# Structured Streaming checkpoint (tracks consumed Kafka offsets between runs).
checkpoint_path = "/Volumes/workspace/bronze/checkpoint"

In [0]:
import json
from functools import lru_cache
from urllib.parse import quote

import requests
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType, BooleanType, ArrayType
)

In [0]:
# Check connectivity (opt-in: call `check_kafka_connectivity()` manually).
#
# A streaming `spark.readStream...load()` is lazy: it only builds a plan and does
# not contact the broker, so it cannot prove the credentials work. A *batch* read
# that actually collects a row forces Spark to resolve topic offsets from Kafka.

def check_kafka_connectivity() -> bool:
    """Try to read from `topic` using the configured SASL_SSL/PLAIN credentials.

    Performs a small batch read (at most one record is collected) so that Spark
    has to reach `bootstrap_servers` and authenticate before returning.

    Returns:
        True if the read succeeded, False otherwise (the error is printed).
    """
    jaas_config = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(sasl_username, sasl_password)

    try:
        (
            spark.read
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.sasl.jaas.config", jaas_config)
            .option("subscribe", topic)
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load()
            .limit(1)
            .collect()  # action: forces the Kafka connection (an empty topic still succeeds)
        )
    except Exception as e:
        print(f"Connection failed: {e}")
        return False

    print("Connection successful")
    return True

## 2. Resolve the message schema from Confluent Schema Registry

In [0]:
# JSON Schema primitive type name -> Spark SQL data type.
# "object" and "array" are handled structurally by `_resolve_type`, which also
# falls back to StringType for any type name not listed here.
# NOTE(review): JSON Schema "integer" is unbounded but IntegerType is 32-bit, so
# larger values may fail to parse — consider LongType (needs a table schema change).
_TYPE_MAP = dict(
    string=StringType(),
    integer=IntegerType(),
    number=DoubleType(),
    boolean=BooleanType(),
)

In [0]:
def _resolve_type(field_def: dict):
    """Recursively convert one JSON Schema field definition to a PySpark DataType.

    - ``"object"`` -> nested StructType (built by ``_json_schema_to_struct``)
    - ``"array"``  -> ArrayType of the resolved ``items`` definition
      (string elements when ``items`` is absent)
    - anything else -> looked up in ``_TYPE_MAP``; missing or unknown types
      fall back to StringType.

    JSON Schema allows ``type`` to be a list, e.g. ``["string", "null"]`` for a
    nullable field. In that case the first non-``"null"`` entry is used; field
    nullability is decided by the parent schema's ``required`` list instead.
    """
    typ = field_def.get("type", "string")
    if isinstance(typ, list):
        # A list key would make `_TYPE_MAP.get` raise TypeError (unhashable).
        # A list containing only "null" falls back to string.
        typ = next((t for t in typ if t != "null"), "string")

    if typ == "object":
        return _json_schema_to_struct(field_def)

    if typ == "array":
        items = field_def.get("items", {"type": "string"})
        # Tuple-style `items` (a list of per-position schemas) has no single Spark
        # element type; keep the elements as strings rather than crashing.
        if not isinstance(items, dict):
            return ArrayType(StringType())
        return ArrayType(_resolve_type(items))

    return _TYPE_MAP.get(typ, StringType())

In [0]:
def _json_schema_to_struct(json_schema: dict) -> StructType:
    """Build a Spark StructType from a JSON Schema object definition.

    Every entry of ``properties`` becomes one StructField, in declaration order,
    typed via ``_resolve_type``. A field is non-nullable only when its name
    appears in the schema's ``required`` list.
    """
    mandatory = set(json_schema.get("required", []))
    fields = []
    for field_name, field_def in json_schema.get("properties", {}).items():
        is_optional = field_name not in mandatory
        fields.append(StructField(field_name, _resolve_type(field_def), nullable=is_optional))
    return StructType(fields)

In [0]:
@lru_cache(maxsize=32)
def fetch_schema(subject: str, timeout: float = 30.0) -> StructType:
    """Fetch the latest JSON Schema registered for `subject` and convert it to a StructType.

    Calls the Schema Registry REST endpoint ``GET /subjects/{subject}/versions/latest``
    on `schema_registry_url`, using the registry API key/secret as HTTP basic auth.

    Results are memoised with ``lru_cache``: later calls with the same arguments
    reuse the cached StructType without contacting the registry. A newly
    registered schema version is therefore not seen until the cache is cleared
    (e.g. by re-running this cell).

    Args:
        subject: Schema Registry subject name, e.g. ``"orders-value"``.
        timeout: Seconds to wait for the registry before giving up.

    Returns:
        The Spark StructType built from the schema's ``properties``.

    Raises:
        requests.HTTPError: if the registry responds with a non-2xx status.
        requests.Timeout: if the registry does not respond within `timeout`.
        ValueError: if the latest version is not a JSON Schema (e.g. Avro or Protobuf).
    """
    response = requests.get(
        # Subject names may contain characters that are not URL-safe.
        f"{schema_registry_url}/subjects/{quote(subject, safe='')}/versions/latest",
        auth=(schema_registry_api_key, schema_registry_api_secret),
        headers={"Accept": "application/vnd.schemaregistry.v1+json"},
        # Without a timeout, requests can block indefinitely on an unresponsive host.
        timeout=timeout,
    )

    response.raise_for_status()

    record = response.json()

    # The registry omits `schemaType` for Avro (its default format). An Avro schema
    # parsed as JSON Schema has no `properties` (-> empty StructType, so no data
    # columns would be extracted), and a Protobuf schema is not JSON at all.
    schema_type = record.get("schemaType", "AVRO")
    if schema_type != "JSON":
        raise ValueError(
            f"Subject {subject!r} (version {record.get('version')}) has schemaType "
            f"{schema_type!r}; only JSON Schema subjects are supported."
        )

    json_schema = json.loads(record["schema"])

    print(f"[SR] subject={subject} | version={record['version']} | id={record['id']}")
    print(f"[SR] properties: {list(json_schema.get('properties', {}).keys())}")

    return _json_schema_to_struct(json_schema)

## 3. Read the Kafka topic & write the stream to the bronze Delta table

In [0]:
def run():
    """Ingest Kafka `topic` into the Delta table `bronze_table` as a one-shot stream.

    Steps:
      1. Resolve the value schema for `subject` from Schema Registry (`fetch_schema`).
      2. Read `topic` as a stream over SASL_SSL / PLAIN authentication.
      3. Decode each message value as JSON with that schema, and keep the Kafka
         key/timestamp/partition/offset as lineage columns.
      4. Append to `bronze_table` with `trigger(availableNow=True)`, which processes
         the data available now and then stops. Offsets are tracked in
         `checkpoint_path`, so `startingOffsets="earliest"` only takes effect
         when that checkpoint is new.

    Blocks until the streaming query terminates.
    """
    orders_schema: StructType = fetch_schema(subject)
    print(f"\n[Spark] Using Schema:\n{orders_schema.simpleString()}\n")

    jaas_config = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(sasl_username, sasl_password)

    raw_orders_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config", jaas_config)
        .load()
    )

    # Producers using Confluent's JSON Schema serializer prepend a 5-byte header
    # (magic byte 0x00 + 4-byte schema id) to the JSON payload, which `from_json`
    # cannot parse (it would yield NULLs). Strip it when present; valid JSON text
    # never starts with a 0x00 byte, so plain JSON payloads pass through unchanged.
    json_payload = F.expr(
        "CASE WHEN substring(value, 1, 1) = X'00' THEN substring(value, 6) ELSE value END"
    ).cast("string")

    parsed_orders_stream = (
        raw_orders_stream
        .withColumn("value", json_payload)
        .withColumn("data", F.from_json("value", orders_schema))
        .select(
            "data.*",
            F.col("key").cast("string").alias("kafka_key"),
            F.col("timestamp").alias("kafka_timestamp"),
            F.col("partition").alias("kafka_partition"),
            F.col("offset").alias("kafka_offset")
        )
    )

    query = (
        parsed_orders_stream.writeStream
        .trigger(availableNow=True)
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        # Allow fields newly added to the registry schema to appear as new columns.
        .option("mergeSchema", "true")
        .table(bronze_table)
    )

    query.awaitTermination()

In [0]:
# Execute the ingestion. Because `run` uses the availableNow trigger and awaits
# termination, this cell returns once the currently available Kafka data is written.
run()