# Bronze Layer: Kafka Ingestion

Reads raw crypto price messages from a Confluent Cloud Kafka topic and lands them, unparsed, in a Delta Lake table.

- **Source:** Kafka topic `crypto-prices-raw`
- **Target:** `crypto_analytics.bronze.raw_prices`
- **Mode:** Structured Streaming (append-only)

In [0]:
# Pipeline config: source topic, target table, and streaming checkpoint location
KAFKA_TOPIC = "crypto-prices-raw"
BRONZE_TABLE = "crypto_analytics.bronze.raw_prices"
CHECKPOINT_PATH = "abfss://demo@deacourseextdlst.dfs.core.windows.net/crypto_analytics/_checkpoints/bronze_raw_prices"

In [0]:
# Retrieve Confluent Cloud credentials from the Databricks secret scope (values are redacted if printed)
kafka_bootstrap = dbutils.secrets.get(scope="crypto-pipeline", key="CONFLUENT_BOOTSTRAP_SERVERS")
kafka_api_key = dbutils.secrets.get(scope="crypto-pipeline", key="CONFLUENT_API_KEY")
kafka_api_secret = dbutils.secrets.get(scope="crypto-pipeline", key="CONFLUENT_API_SECRET")

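# Kafka config options used below

Kafka client properties are passed with a `kafka.` prefix; `subscribe` and `startingOffsets` are options of the Spark Kafka source itself.

- `bootstrap.servers`: Confluent Cloud broker address
- `security.protocol`: `SASL_SSL`, a TLS-encrypted connection with SASL authentication (required for Confluent Cloud)
- `sasl.mechanism`: SASL authentication mechanism; `PLAIN` for a Confluent Cloud API key and secret
- `sasl.jaas.config`: credentials in Java JAAS format (Spark uses the Java Kafka client; on Databricks the login module lives under the shaded `kafkashaded.` package)
- `subscribe`: topic to read
- `startingOffsets`: `earliest` reads all messages still retained in the topic; `latest` reads only new ones. This applies only when the query starts without a checkpoint; after that, the query resumes from its checkpointed offsets.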
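Spark's Kafka source forwards every option whose name starts with `kafka.` to the Kafka client with the prefix stripped, and interprets the remaining options itself. A minimal pure-Python sketch of that split, for illustration only (the helper name and the broker address are hypothetical placeholders, not part of the pipeline):

```python
# Sketch (assumption): models how "kafka."-prefixed options become Kafka client
# properties, while the other options stay with the Spark Kafka source.
KAFKA_PREFIX = "kafka."


def split_kafka_options(options: dict[str, str]) -> tuple[dict[str, str], dict[str, str]]:
    """Return (kafka_client_props, spark_source_options)."""
    client_props: dict[str, str] = {}
    source_opts: dict[str, str] = {}
    for name, value in options.items():
        if name.startswith(KAFKA_PREFIX):
            # Strip the prefix: "kafka.bootstrap.servers" -> "bootstrap.servers"
            client_props[name[len(KAFKA_PREFIX):]] = value
        else:
            source_opts[name] = value
    return client_props, source_opts


example = {
    "kafka.bootstrap.servers": "broker.example:9092",  # placeholder, not a real broker
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "subscribe": "crypto-prices-raw",
    "startingOffsets": "earliest",
}
client, source = split_kafka_options(example)
print(sorted(client))  # ['bootstrap.servers', 'sasl.mechanism', 'security.protocol']
print(sorted(source))  # ['startingOffsets', 'subscribe']
```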

In [0]:
# Build Kafka connection configuration
kafka_options = {
    "kafka.bootstrap.servers": kafka_bootstrap,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_api_key}" password="{kafka_api_secret}";',
    "subscribe": KAFKA_TOPIC,
    "startingOffsets": "earliest"
}
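Besides `earliest` and `latest`, the Spark Kafka source also accepts `startingOffsets` as a JSON string with a starting offset per partition, where `-2` means earliest and `-1` means latest. Like the string forms, it only takes effect for a query with no checkpoint yet. A small sketch for building that string, assuming a hypothetical three-partition topic (the helper name and offsets are illustrative):

```python
import json

EARLIEST, LATEST = -2, -1  # sentinel offsets accepted in the JSON form


def starting_offsets_json(topic: str, offsets: dict[int, int]) -> str:
    """Build a per-partition startingOffsets JSON string for a single topic."""
    # Partition ids become string keys, matching the JSON shape Spark expects.
    return json.dumps({topic: {str(p): off for p, off in sorted(offsets.items())}})


# Hypothetical: replay partition 0 from offset 100, read partitions 1 and 2 from earliest.
spec = starting_offsets_json("crypto-prices-raw", {0: 100, 1: EARLIEST, 2: EARLIEST})
print(spec)  # {"crypto-prices-raw": {"0": 100, "1": -2, "2": -2}}
```

The result could then be assigned to `kafka_options["startingOffsets"]` in place of `"earliest"` when a one-off replay of specific partitions is needed.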

In [0]:
# Read stream from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
)

raw_stream.printSchema()

In [0]:
# Transform Kafka messages for the Bronze table:
# - Cast the binary key/value columns to strings
# - Keep the value as raw JSON text; parsing happens in the Silver layer
# - Keep Kafka metadata (topic, partition, offset, timestamp) for lineage
# - Add an ingestion timestamp
from pyspark.sql.functions import current_timestamp

bronze_stream = (
    raw_stream
    .selectExpr(
        "CAST(key AS STRING) as message_key",
        "CAST(value AS STRING) as raw_json",
        "topic",
        "partition",
        "offset",
        "timestamp as kafka_timestamp"
    )
    .withColumn("bronze_ingested_at", current_timestamp())
)

bronze_stream.printSchema()
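The Kafka source exposes `key`, `value`, `topic`, `partition`, `offset`, `timestamp`, and `timestampType`, with `key` and `value` as binary. The plain-Python analogue below sketches what the `selectExpr` plus `withColumn` step does to one record. It is illustrative only: the class and function names are hypothetical, and it assumes the producer writes UTF-8 JSON (Spark's handling of invalid byte sequences may differ from `errors="replace"`).

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class KafkaRecord:
    # Mirrors the Kafka source columns used above (timestampType omitted).
    key: Optional[bytes]
    value: Optional[bytes]
    topic: str
    partition: int
    offset: int
    timestamp: datetime


def to_bronze_row(rec: KafkaRecord, ingested_at: datetime) -> dict:
    """Hypothetical analogue of the Bronze selectExpr + withColumn."""
    def as_str(b: Optional[bytes]) -> Optional[str]:
        # Decode as UTF-8 (assumption); a null key or value stays null.
        return None if b is None else b.decode("utf-8", errors="replace")

    return {
        "message_key": as_str(rec.key),
        "raw_json": as_str(rec.value),  # left unparsed; Silver parses it
        "topic": rec.topic,
        "partition": rec.partition,
        "offset": rec.offset,
        "kafka_timestamp": rec.timestamp,  # renamed from "timestamp"
        "bronze_ingested_at": ingested_at,
    }


ts = datetime(2024, 1, 1, tzinfo=timezone.utc)  # illustrative timestamp
rec = KafkaRecord(key=None, value=b'{"symbol": "BTC-USD"}', topic="crypto-prices-raw",
                  partition=0, offset=42, timestamp=ts)
row = to_bronze_row(rec, ingested_at=datetime.now(timezone.utc))
print(row["message_key"], row["raw_json"])  # None {"symbol": "BTC-USD"}
```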

In [0]:
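# Write stream to the Bronze Delta table (append-only).
# The checkpoint records processed offsets, so a restarted query resumes where it left off.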
bronze_query = (
    bronze_stream
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_PATH)
    .toTable(BRONZE_TABLE)
)
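Why the checkpoint matters: on restart, the query resumes from the offsets recorded in `CHECKPOINT_PATH` and ignores `startingOffsets`, and together with the Delta sink this is what avoids duplicate Bronze rows. The toy model below illustrates that resume behaviour only; it is not Spark's checkpoint format, and every name in it is hypothetical.

```python
from typing import Optional


class ToyCheckpoint:
    """Toy model (not Spark's on-disk format): next offset to read per partition."""

    def __init__(self) -> None:
        self.next_offsets: Optional[dict[int, int]] = None  # None = no checkpoint yet

    def start_positions(self, earliest: dict[int, int], latest: dict[int, int],
                        starting_offsets: str) -> dict[int, int]:
        # startingOffsets is consulted only when no checkpoint exists.
        if self.next_offsets is not None:
            return dict(self.next_offsets)
        return dict(earliest if starting_offsets == "earliest" else latest)

    def commit(self, positions: dict[int, int]) -> None:
        self.next_offsets = dict(positions)


def run_batch(log: dict[int, list[str]], start: dict[int, int]) -> tuple[list[str], dict[int, int]]:
    """Read from `start` to the end of each partition; return rows and new positions."""
    rows: list[str] = []
    end: dict[int, int] = {}
    for p, msgs in log.items():
        rows.extend(msgs[start[p]:])
        end[p] = len(msgs)
    return rows, end


log = {0: ["m0", "m1"], 1: ["n0"]}  # hypothetical partition -> messages
cp = ToyCheckpoint()

start = cp.start_positions(earliest={0: 0, 1: 0}, latest={0: 2, 1: 1}, starting_offsets="earliest")
first, end = run_batch(log, start)
cp.commit(end)  # first run: ['m0', 'm1', 'n0']

log[0].append("m2")  # a new message arrives
start = cp.start_positions(earliest={0: 0, 1: 0}, latest={0: 3, 1: 1}, starting_offsets="earliest")
second, end = run_batch(log, start)
cp.commit(end)  # restart resumes from the checkpoint: ['m2']
```

Deleting the checkpoint directory would reset this state, and the next run would fall back to `startingOffsets` again.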

In [0]:
# Row count so far (the stream runs asynchronously, so this can lag or be 0 until the first micro-batch commits)
spark.sql(f"SELECT COUNT(*) AS row_count FROM {BRONZE_TABLE}").show()

In [0]:
spark.sql(f"""
    SELECT
        message_key,
        raw_json,
        kafka_timestamp,
        bronze_ingested_at
    FROM
        {BRONZE_TABLE}
    LIMIT 5
""").show(truncate=False)
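Since Bronze stores `raw_json` unparsed, a quick sanity check before building Silver is to count how many values actually parse as JSON objects. A minimal sketch with a hypothetical helper and made-up sample values; in the notebook, the input could instead be `[r.raw_json for r in spark.table(BRONZE_TABLE).select("raw_json").limit(1000).collect()]`.

```python
import json
from typing import Iterable, Optional


def json_parse_report(raw_values: Iterable[Optional[str]]) -> dict[str, int]:
    """Count raw_json values that parse as a JSON object, fail to parse, or are null."""
    report = {"valid": 0, "invalid": 0, "null": 0}
    for raw in raw_values:
        if raw is None:
            report["null"] += 1
            continue
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            report["invalid"] += 1
            continue
        # Valid JSON that is not an object (e.g. a list) is still unusable as a price record.
        report["valid" if isinstance(parsed, dict) else "invalid"] += 1
    return report


sample = ['{"symbol": "BTC-USD"}', "not json", None, "[1, 2]"]  # hypothetical values
print(json_parse_report(sample))  # {'valid': 1, 'invalid': 2, 'null': 1}
```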

In [0]:
# Stop the streaming query; otherwise it keeps running in the background
bronze_query.stop()