# 🏗️ Building a Raw Events (Bronze) Layer with Iceberg

This lesson demonstrates how to build a robust, queryable raw events layer (bronze layer) using Apache Iceberg.

You'll learn how to aggregate real-time events from multiple Kafka topics, validate schemas with Schema Registry, and write the unified stream to an Iceberg table.

---

## Why Iceberg for the Raw Layer?
- **ACID transactions**: Each write commits atomically, so readers never see a half-written batch while ingestion runs continuously.
- **Schema evolution**: Supports changing event structures over time.
- **Time travel**: Enables auditing and replay.
- **Separation of storage and compute**: Data lives in object storage (MinIO here), and any engine with access to the catalog, such as Spark or Trino, can query it independently.

The raw (bronze) layer is the foundation for all downstream analytics. It must be reliable, auditable, and flexible.

---

## ⚙️ Environment Setup

Ensure all core services are running, including Kafka, Schema Registry, MinIO, Hive Metastore, and Spark.

```bash
docker compose --profile core --profile datagen up -d
```

This lesson assumes the data generator is producing events to all retail topics.

In [1]:
import os
MINIO_ENDPOINT = "http://minio:9000"
MINIO_ACCESS_KEY = os.getenv('MINIO_ROOT_USER', 'minio')
MINIO_SECRET_KEY = os.getenv('MINIO_ROOT_PASSWORD', 'minio123')
HIVE_METASTORE_URI = "thrift://hive-metastore:9083"
SPARK_MASTER = os.getenv('SPARK_MASTER_URL', 'spark://spark-master:7077')
SCHEMA_REGISTRY_URL = os.getenv('SCHEMA_REGISTRY_URL', 'http://schema-registry:8081')
KAFKA_BOOTSTRAP = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092')
RETAIL_TOPICS = "orders.v1,payments.v1,shipments.v1,inventory-changes.v1,customer-interactions.v1"
S3_ENDPOINT = "minio:9000"
S3_ACCESS_KEY = MINIO_ACCESS_KEY
S3_SECRET_KEY = MINIO_SECRET_KEY
os.environ["AWS_REGION"] = "us-east-1"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
print("🔧 Environment configured for AWS region us-east-1")
print(f"MinIO: {MINIO_ENDPOINT}")
print(f"Hive Metastore: {HIVE_METASTORE_URI}")
print(f"Spark Master: {SPARK_MASTER}")

🔧 Environment configured for AWS region us-east-1
MinIO: http://minio:9000
Hive Metastore: thrift://hive-metastore:9083
Spark Master: spark://spark-master:7077


---

## 📨 Read and Validate Kafka Events

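Read all retail topics as a single stream, extract each record's Schema Registry schema ID, and decode its Avro payload against the registered schema. The first cell only creates the Spark session with the Iceberg, Kafka, and S3A packages. The stream itself is started in the write step further down.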
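Schema IDs come from the Confluent wire format that Schema Registry serializers use. Each value starts with one magic byte (always 0), followed by a 4-byte big-endian schema ID, followed by the Avro-encoded body. The sketch below splits that header in plain Python. `parse_confluent_header` is a hypothetical helper name, but the byte layout is the same one the ingestion cell further down reads with `F.substring("value", 2, 4)` and `value[1:5]`.

```python
import struct
from typing import NamedTuple, Optional


class ConfluentHeader(NamedTuple):
    magic_byte: int
    schema_id: int
    payload: bytes


def parse_confluent_header(value: Optional[bytes]) -> Optional[ConfluentHeader]:
    """Split a Schema Registry-framed Kafka value into magic byte, schema ID, and Avro body.

    Returns None when the value is missing or shorter than the 5-byte header.
    """
    if value is None or len(value) < 5:
        return None
    # ">Bi": big-endian unsigned byte (magic) followed by a signed 32-bit int (schema ID)
    magic_byte, schema_id = struct.unpack_from(">Bi", value, 0)
    return ConfluentHeader(magic_byte, schema_id, value[5:])


# Magic byte 0, schema ID 42, then a 3-byte (made-up) Avro body
sample = b"\x00" + (42).to_bytes(4, "big") + b"\x02\x06\x08"
header = parse_confluent_header(sample)
print(header.magic_byte, header.schema_id, len(header.payload))  # 0 42 3
```

A magic byte other than 0 means the record was not written by a Schema Registry serializer. That is why the decoder in the ingestion cell rejects such records instead of guessing a schema.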

In [2]:
from pyspark.sql import SparkSession

packages = [
    'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2',
    'org.apache.hadoop:hadoop-aws:3.3.4',
    'software.amazon.awssdk:bundle:2.20.158',
    # Spark connector versions should match the cluster's Spark version (3.5.0 here)
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
    'org.apache.spark:spark-avro_2.12:3.5.0',
    'org.apache.avro:avro:1.11.0'
]

try:
    spark = SparkSession.builder \
        .appName("IcebergBronzeDemo") \
        .master(SPARK_MASTER) \
        .config("spark.executor.memory", "512m") \
        .config("spark.driver.memory", "512m") \
        .config("spark.executor.cores", "1") \
        .config("spark.cores.max", "2") \
        .config("spark.jars.packages", ",".join(packages)) \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("spark.sql.catalog.spark_catalog.type", "hive") \
        .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.iceberg.type", "rest") \
        .config("spark.sql.catalog.iceberg.uri", "http://hive-metastore:9001/iceberg") \
        .config("spark.sql.catalog.iceberg.warehouse", "s3a://iceberg/warehouse") \
        .config("spark.sql.catalog.iceberg.s3.endpoint", f"http://{S3_ENDPOINT}") \
        .config("spark.sql.catalog.iceberg.s3.access-key-id", S3_ACCESS_KEY) \
        .config("spark.sql.catalog.iceberg.s3.secret-access-key", S3_SECRET_KEY) \
        .config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1") \
        .config("spark.sql.catalog.iceberg.s3.region", "us-east-1") \
        .config("spark.executorEnv.AWS_REGION", "us-east-1") \
        .config("spark.executorEnv.AWS_DEFAULT_REGION", "us-east-1") \
        .config("spark.executorEnv.aws.region", "us-east-1") \
        .config("spark.driver.extraJavaOptions", "-Daws.region=us-east-1") \
        .config("spark.executor.extraJavaOptions", "-Daws.region=us-east-1") \
        .config("spark.sql.catalog.iceberg.s3.path-style-access", "true") \
        .config("spark.sql.defaultCatalog", "iceberg") \
        .config("spark.hadoop.hive.metastore.uris", HIVE_METASTORE_URI) \
        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .enableHiveSupport() \
        .getOrCreate()
    print("✅ Spark session created successfully with robust AWS region config")
    print(f"📊 Spark UI available at: http://localhost:8088")
    print(f"🔧 Spark version: {spark.version}")
    print(f"🌐 Iceberg REST Catalog: http://hive-metastore:9001/iceberg")
except Exception as e:
    print(f"❌ Failed to create Spark session: {e}")
    raise

✅ Spark session created successfully with robust AWS region config
📊 Spark UI available at: http://localhost:8088
🔧 Spark version: 3.5.0
🌐 Iceberg REST Catalog: http://hive-metastore:9001/iceberg


---

## 📝 Create the Iceberg Bronze Table

We create a single Iceberg table to store all raw events from all Kafka topics. This table is designed for flexibility, auditability, and efficient downstream analytics.

**Key design choices:**
- **Wide, unified schema:** All event types are stored together, with a flexible payload column for the decoded JSON event.
- **Partitioned by event_source:** Enables efficient queries by event type and supports scalable ingestion.
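- **ACID guarantees:** Each streaming micro-batch is committed atomically, so readers never see partial writes. Duplicate or late events arriving from Kafka are stored as-is, and deduplication, if needed, happens downstream.
- **Schema evolution:** Because the event body is stored as a JSON string, changes to event schemas need no table DDL changes. The table's own columns can still evolve through Iceberg schema evolution, with full history and time travel.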
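Storing the decoded event as a JSON string is what lets the table absorb event schema changes. A newer event version with an extra field lands in the same `json_payload` column, and readers handle the difference at query time. The following is a small plain-Python illustration using two hypothetical order events. The `amount` and `currency` fields and the `USD` fallback are assumptions, not taken from the data generator.

```python
import json

# Two hypothetical versions of an order event; v2 adds a "currency" field.
json_payloads = [
    json.dumps({"order_id": "ord_1", "amount": 19.99}),                    # schema v1
    json.dumps({"order_id": "ord_2", "amount": 5.00, "currency": "EUR"}),  # schema v2
]


def read_currency(payload: str, default: str = "USD") -> str:
    """Read a field that only newer events carry, falling back for older ones."""
    return json.loads(payload).get("currency", default)


currencies = [read_currency(p) for p in json_payloads]
print(currencies)  # ['USD', 'EUR']
```

In Spark, `F.get_json_object('json_payload', '$.currency')` performs the same lookup and returns null for rows without the field. Wrapping it in `F.coalesce(..., F.lit('USD'))` supplies the fallback.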

**DDL Example:**
```sql
CREATE TABLE IF NOT EXISTS iceberg.bronze_example.raw_events (
  event_source STRING,         -- Kafka topic name (event type)
  event_time TIMESTAMP,        -- Kafka record timestamp (producer or broker time, may differ from the event's own time)
  schema_id INT,               -- Avro schema ID from Schema Registry
  payload_size INT,            -- Avro payload size in bytes (excluding the 5-byte header)
  json_payload STRING,         -- Decoded event as JSON (for easy analytics)
  partition INT,               -- Kafka partition
  offset BIGINT                -- Kafka offset
)
USING iceberg
PARTITIONED BY (event_source)
TBLPROPERTIES ('write.format.default'='parquet')
```

This structure enables fast, flexible analytics and robust data management for all downstream use cases.

In [3]:
spark.sql("""
CREATE DATABASE IF NOT EXISTS iceberg.bronze_example
""")
spark.sql("DROP TABLE IF EXISTS iceberg.bronze_example.raw_events")
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg.bronze_example.raw_events (
  event_source STRING,
  event_time TIMESTAMP,
  schema_id INT,
  payload_size INT,
  json_payload STRING,
  partition INT,
  offset BIGINT
)
USING iceberg
PARTITIONED BY (event_source)
TBLPROPERTIES ('write.format.default'='parquet')
""")
print('Iceberg bronze table ready.')

Iceberg bronze table ready.


---

## 🚀 Write the Unified Stream to Iceberg

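Decode each Kafka record against its registered schema and write it, together with its Kafka metadata, to the Iceberg bronze table. Records that fail to decode are not dropped. Their `json_payload` holds an `error` object instead, so they remain visible for audit and replay.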

## ⚠️ Note on Bronze Layer Design

**Production best practice:**  
The Bronze layer should store a byte-for-byte copy of what arrives from Kafka, with no transformation or interpretation.

- **Reproducibility:** Enables re-processing and rebuilding downstream layers if decoding bugs or schema changes occur.
- **Auditability:** Guarantees a verifiable record of exactly what was received.
- **Flexibility:** Allows future re-decoding for different use cases (analytics, ML, etc.).

**Demo note:**  
For learning and demo purposes, this notebook decodes Avro payloads to JSON immediately and stores only the JSON, not the original bytes.  
- **Why?** JSON is easy to inspect and query with tools like Trino or Superset, making workshops and demos more accessible.

**Summary:**  
- **Production:** Bronze = immutable raw bytes + metadata.  
- **This demo:** Bronze-JSON = easy-to-query teaching aid.
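For the production variant, the bronze row keeps the Kafka value exactly as received and derives only metadata from it. Below is a minimal plain-Python sketch of that record shape. It assumes a hypothetical `RawBronzeRecord` type with `raw_key` and `raw_value` columns. In Spark, the equivalent would be selecting the Kafka source's binary `key` and `value` columns into `BINARY` table columns instead of applying the decode UDF.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RawBronzeRecord:
    # Hypothetical production bronze row: untouched bytes plus Kafka metadata
    event_source: str
    partition: int
    offset: int
    kafka_timestamp_ms: int
    raw_key: Optional[bytes]
    raw_value: Optional[bytes]
    schema_id: Optional[int]  # derived from the header; never replaces raw_value


def to_raw_bronze(topic: str, partition: int, offset: int, timestamp_ms: int,
                  key: Optional[bytes], value: Optional[bytes]) -> RawBronzeRecord:
    schema_id = None
    if value is not None and len(value) >= 5 and value[0] == 0:
        schema_id = int.from_bytes(value[1:5], byteorder="big")
    # raw_value is stored exactly as received, so it can be re-decoded later
    return RawBronzeRecord(topic, partition, offset, timestamp_ms, key, value, schema_id)


value = b"\x00\x00\x00\x00\x07" + b"\x02\x0a"
row = to_raw_bronze("orders.v1", 0, 128, 1757548800000, b"ord_1", value)
print(row.schema_id, row.raw_value == value)  # 7 True
```

Keeping decoding out of this step means a decoder bug or a schema mix-up can be fixed later by re-reading `raw_value`, which is the reproducibility argument from the note above.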

In [5]:
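import os
import json
import datetime
import decimal
import io
import avro.schema
import avro.io
from confluent_kafka.schema_registry import SchemaRegistryClient
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Parsed Avro schemas keyed by schema ID. The UDF runs in Python workers, so each
# deserialized copy of this function keeps its own cache. That still avoids one
# Schema Registry round trip per record.
_parsed_schemas = {}

def _get_parsed_schema(schema_id, schema_registry_url):
    if schema_id not in _parsed_schemas:
        sr_client = SchemaRegistryClient({'url': schema_registry_url})
        schema = sr_client.get_schema(schema_id)
        _parsed_schemas[schema_id] = avro.schema.parse(schema.schema_str)
    return _parsed_schemas[schema_id]

def _json_default(obj):
    # Avro logical types decode to Python objects that json cannot serialize directly
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    if isinstance(obj, decimal.Decimal):
        return str(obj)
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')

def decode_avro_message(value, schema_registry_url):
    # Confluent wire format: magic byte 0, 4-byte big-endian schema ID, Avro body
    if value is None or len(value) < 5:
        return json.dumps({'error': 'invalid or missing value'})
    magic_byte = value[0]
    schema_id = int.from_bytes(value[1:5], byteorder='big')
    avro_payload = value[5:]
    if magic_byte != 0:
        return json.dumps({'error': f'invalid magic byte: {magic_byte}', 'schema_id': schema_id})
    try:
        avro_schema = _get_parsed_schema(schema_id, schema_registry_url)
        decoder = avro.io.BinaryDecoder(io.BytesIO(avro_payload))
        decoded = avro.io.DatumReader(avro_schema).read(decoder)
        return json.dumps(decoded, default=_json_default)
    except Exception as e:
        # Keep failed records visible in bronze instead of dropping them
        return json.dumps({'error': f'avro decode error: {str(e)}', 'schema_id': schema_id})

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://schema-registry:8081')

decode_udf = udf(lambda v: decode_avro_message(v, SCHEMA_REGISTRY_URL), StringType())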

raw_stream = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
        .option("subscribe", RETAIL_TOPICS)
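        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        # Offsets are tracked in the checkpoint, not committed to a Kafka consumer group;
        # groupIdPrefix only makes the generated group.id recognizable in Kafka tooling
        .option("groupIdPrefix", "bronze-layer-consumer")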
        .load()
        .select(
            F.col("topic").alias("event_source"),
            F.col("timestamp").alias("event_time"),
            F.col("partition"),
            F.col("offset"),
            F.col("value"),
            F.when(F.length("value") >= 5, F.conv(F.hex(F.substring("value", 2, 4)), 16, 10).cast("int")).alias("schema_id"),
            F.when(F.length("value") > 5, F.length("value") - 5).otherwise(0).alias("payload_size")
        )
)

raw_stream = raw_stream.withColumn('json_payload', decode_udf("value"))

bronze_query = (
    raw_stream
    .select('event_source', 'event_time', 'schema_id', 'payload_size', 'json_payload', 'partition', 'offset')
    .writeStream
    .format('iceberg')
    .outputMode('append')
    .option('checkpointLocation', '/tmp/bronze-layer-checkpoint')
    .option('path', 'iceberg.bronze_example.raw_events')
    .trigger(processingTime='5 seconds')
    .start()
)
print('Unified raw events stream (with JSON payload) is writing to Iceberg bronze layer.')

import time
for i in range(5):
    time.sleep(5)
    df = spark.read.format('iceberg').load('iceberg.bronze_example.raw_events')
    print(f"Batch {i+1}: {df.count()} events in Iceberg bronze table")
    print(df.select('json_payload').limit(3).toPandas())

bronze_query.stop()
print('Bronze layer streaming ingestion stopped.')

Unified raw events stream (with JSON payload) is writing to Iceberg bronze layer.
Batch 1: 0 events in Iceberg bronze table
Empty DataFrame
Columns: [json_payload]
Index: []
Batch 2: 169 events in Iceberg bronze table
                                        json_payload
0  {"order_id": "ord_zi1xsmoms6", "user_id": "U00...
1  {"order_id": "ord_gk4xqf6d10", "user_id": "U00...
2  {"order_id": "ord_5skcjfp1we", "user_id": "U00...
Batch 3: 169 events in Iceberg bronze table
                                        json_payload
0  {"order_id": "ord_zi1xsmoms6", "user_id": "U00...
1  {"order_id": "ord_gk4xqf6d10", "user_id": "U00...
2  {"order_id": "ord_5skcjfp1we", "user_id": "U00...
Batch 4: 1163 events in Iceberg bronze table
                                        json_payload
0  {"order_id": "ord_aht1wtyy0r", "user_id": "U00...
1  {"order_id": "ord_y29zzug4ou", "user_id": "U00...
2  {"order_id": "ord_4frhvuc4te", "user_id": "U00...
Batch 5: 2881 events in Iceberg bronze table
            

---

## 🔎 Query and Audit the Bronze Layer

You can now query the raw events table for any event type, time window, or schema version.

Example: Count events by source and day.

In [6]:
from pyspark.sql import functions as F
df = spark.read.format('iceberg').load('iceberg.bronze_example.raw_events')
df.groupBy('event_source', F.to_date('event_time').alias('date')).count().show()

+--------------------+----------+-----+
|        event_source|      date|count|
+--------------------+----------+-----+
|customer-interact...|2025-09-11| 1000|
|           orders.v1|2025-09-11|  752|
|        shipments.v1|2025-09-11|  195|
|inventory-changes.v1|2025-09-11|  429|
|         payments.v1|2025-09-11|  505|
+--------------------+----------+-----+
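Another audit worth running is a count of records that the decoder could not parse. The ingestion cell writes these as a JSON object with an `error` key, so a filter on that key finds them. Note that a genuine event with a top-level `error` field would also match. Below is a plain-Python sketch over hypothetical sample rows.

```python
import json
from collections import Counter

# Hypothetical rows of (event_source, json_payload) as they appear in the bronze table
rows = [
    ("orders.v1", json.dumps({"order_id": "ord_1"})),
    ("orders.v1", json.dumps({"error": "avro decode error: example", "schema_id": 3})),
    ("payments.v1", json.dumps({"payment_id": "pay_1"})),
    ("payments.v1", json.dumps({"error": "invalid magic byte: 1", "schema_id": 0})),
    ("payments.v1", json.dumps({"error": "invalid or missing value"})),
]


def decode_failures_by_source(rows):
    """Count rows whose payload is an error object written by the decoder."""
    failures = Counter()
    for source, payload in rows:
        if "error" in json.loads(payload):
            failures[source] += 1
    return dict(failures)


print(decode_failures_by_source(rows))  # {'orders.v1': 1, 'payments.v1': 2}
```

Against the real table, the equivalent Spark query would be `df.filter(F.get_json_object('json_payload', '$.error').isNotNull()).groupBy('event_source').count()`.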



In [None]:
print("🧹 BRONZE LAYER CLEANUP:")
print("=" * 50)

active_queries = spark.streams.active
if active_queries:
    print(f"🛑 Stopping {len(active_queries)} active streaming queries...")
    for query in active_queries:
        try:
            query.stop()
            print(f"   ✅ Stopped: {query.name}")
        except Exception as e:
            print(f"   ⚠️ Error stopping {query.name}: {e}")
else:
    print("ℹ️ No active queries to stop")

try:
    import shutil
    checkpoint_paths = [
        "/tmp/bronze-layer-checkpoint"
    ]
    for checkpoint_path in checkpoint_paths:
        if os.path.exists(checkpoint_path):
            shutil.rmtree(checkpoint_path)
            print(f"🗑️ Cleaned up checkpoint: {checkpoint_path}")
except Exception as e:
    print(f"⚠️ Checkpoint cleanup: {e}")

try:
    spark.sql("DROP TABLE IF EXISTS iceberg.bronze_example.raw_events")
    print("🗑️ Dropped Iceberg table: iceberg.bronze_example.raw_events")
    spark.sql("DROP DATABASE IF EXISTS iceberg.bronze_example")
    print("🗑️ Dropped Iceberg database: iceberg.bronze_example")
except Exception as e:
    print(f"⚠️ Iceberg cleanup: {e}")

🧹 BRONZE LAYER CLEANUP:
ℹ️ No active queries to stop
🗑️ Cleaned up checkpoint: /tmp/bronze-layer-checkpoint
🗑️ Dropped Iceberg table: iceberg.bronze_example.raw_events
🗑️ Dropped Iceberg database: iceberg.bronze_example

## 🕰️ Iceberg Time Travel & Table Optimization

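Apache Iceberg enables powerful features for data reliability and analytics, including time travel and table optimization. Run the examples in this section while `iceberg.bronze_example.raw_events` still exists, that is, before the cleanup cell above, which drops it.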

### Time Travel: Querying Historical Snapshots

You can query your raw events table as it existed at a previous point in time or at a specific snapshot. This is useful for:

- **Auditing:** See exactly what data was present at a given time.
- **Reproducibility:** Re-run analytics on historical data states.
- **Debugging:** Investigate the impact of late or erroneous events.

**Example: Query by Snapshot ID**
```python
# List the table's snapshots (each committed streaming write adds one)
snapshots = spark.sql("SELECT * FROM iceberg.bronze_example.raw_events.snapshots")
snapshots.select("snapshot_id", "committed_at").show()

# Query the table as of a specific snapshot; the oldest one shows the first committed batch
snapshot_id = snapshots.orderBy("committed_at").first()["snapshot_id"]
df_snapshot = spark.read.option("snapshot-id", snapshot_id).format("iceberg").load("iceberg.bronze_example.raw_events")
df_snapshot.show(5)
```

**Example: Query by Timestamp**
```python
# Query the table as of a specific timestamp (in ms since epoch)
import time
ts = int(time.time() * 1000)  # current time, replace with your desired timestamp
df_asof = spark.read.option("as-of-timestamp", ts).format("iceberg").load("iceberg.bronze_example.raw_events")
df_asof.show(5)
```
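The `as-of-timestamp` option expects milliseconds since the Unix epoch, so a specific point in time has to be converted first. The following is a minimal helper, assuming you want to express that time as a timezone-aware Python `datetime`. `to_epoch_millis` is a hypothetical name. Integer `timedelta` division avoids floating-point rounding.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(ts: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the Unix epoch."""
    if ts.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    return (ts - EPOCH) // timedelta(milliseconds=1)


as_of_ms = to_epoch_millis(datetime(2025, 9, 11, 0, 0, tzinfo=timezone.utc))
print(as_of_ms)  # 1757548800000
```

Pass the result as `spark.read.option("as-of-timestamp", as_of_ms)`. A timestamp earlier than the table's first snapshot has no table state to read, so expect an error in that case.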

---

### Table Optimization: Remove Old Snapshots and Expired Data Files

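Over time, streaming ingestion creates many snapshots and small files. With a 5-second trigger, every micro-batch that writes data commits a new snapshot and new data files. Failed or retried writes can also leave orphaned files that no snapshot references. Regular cleanup keeps query planning fast and storage costs down.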
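To get a feel for the numbers, here is a back-of-the-envelope calculation for this notebook's settings. It assumes that every 5-second trigger fires with data from all five retail topics and that each commit writes at least one file per `event_source` partition. `daily_commit_bounds` is a hypothetical helper.

```python
def daily_commit_bounds(trigger_seconds: int, partitions_per_batch: int):
    """Snapshots per day if every trigger fires with data, and the minimum
    number of new data files those commits write (one per touched partition)."""
    commits_per_day = 86_400 // trigger_seconds
    min_files_per_day = commits_per_day * partitions_per_batch
    return commits_per_day, min_files_per_day


# 5-second trigger, events from all 5 retail topics in every batch
print(daily_commit_bounds(5, 5))  # (17280, 86400)
```

Real streams commit less often when batches run longer than the trigger interval, so treat the snapshot figure as an upper bound.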

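**Expire Snapshots Older Than 2 Days (always keeping the latest one):**
```python
from datetime import datetime, timedelta

# Cutoff two days ago, passed as a TIMESTAMP literal (interpreted in the Spark session time zone)
cutoff = (datetime.now() - timedelta(days=2)).strftime('%Y-%m-%d %H:%M:%S')
spark.sql(f"""
    CALL iceberg.system.expire_snapshots(
        table => 'iceberg.bronze_example.raw_events',
        older_than => TIMESTAMP '{cutoff}',
        retain_last => 1
    )
""")
```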

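**Remove Orphan Files** (by default only files older than 3 days are deleted, so files from in-progress writes are left alone):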
```python
spark.sql("""
    CALL iceberg.system.remove_orphan_files(
        table => 'iceberg.bronze_example.raw_events'
    )
""")
```

**Rewrite Manifests and Data Files (Compaction):**
```python
spark.sql("""
    CALL iceberg.system.rewrite_manifests('iceberg.bronze_example.raw_events')
""")
spark.sql("""
    CALL iceberg.system.rewrite_data_files('iceberg.bronze_example.raw_events')
""")
```
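By default, `rewrite_data_files` uses a bin-pack strategy, combining small files into files close to the table's target size (`write.target-file-size-bytes`, 512 MB by default). The following is a simplified first-fit-decreasing grouping that shows the idea. It is not Iceberg's actual planner, which also works per partition and applies thresholds such as a minimum number of input files. `plan_bins` is a hypothetical name.

```python
from typing import List


def plan_bins(file_sizes: List[int], target_bytes: int) -> List[List[int]]:
    """Group file sizes into bins whose total stays at or below target_bytes.

    First-fit decreasing: sort largest first, put each file in the first bin
    with room, and open a new bin otherwise. A file larger than the target
    ends up alone in its own bin.
    """
    bins: List[List[int]] = []
    totals: List[int] = []
    for size in sorted(file_sizes, reverse=True):
        for i, total in enumerate(totals):
            if total + size <= target_bytes:
                bins[i].append(size)
                totals[i] += size
                break
        else:
            bins.append([size])
            totals.append(size)
    return bins


MB = 1024 * 1024
small_files = [8 * MB] * 100  # hypothetical: 100 files of 8 MB from frequent micro-batches
plan = plan_bins(small_files, target_bytes=512 * MB)
print([len(b) for b in plan])  # [64, 36]
```

Each bin becomes one rewritten file, so in this example 100 small files would shrink to 2.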

---


## 📝 Notes & Best Practices

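- **Partitioning**: This demo partitions by `event_source` only. In production, consider partitioning by event source and day (for example `PARTITIONED BY (event_source, days(event_time))`) so that time-range queries can skip data files.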
- **Schema evolution**: Iceberg supports adding/removing fields as event schemas change.
- **Auditability**: Every row keeps its Kafka topic, partition, offset, timestamp, and schema ID, so it traces back to exactly one Kafka record.
- **Downstream processing**: Silver/gold layers can read from this bronze table for further enrichment and analytics.
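Partitioning by day uses Iceberg's `days()` transform. This transform records each row's partition value as the number of days since 1970-01-01, in UTC for Spark `TIMESTAMP` columns, which Iceberg stores as timestamps with time zone. As a result, filters on `event_time` can skip whole days without a separate date column. Below is a plain-Python sketch of that value. The DDL string shows a hypothetical production variant of the table (`raw_events_by_day` with a `raw_value` column), not the table this notebook creates.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical production DDL: partition by source and by day of event_time
PARTITIONED_DDL = """
CREATE TABLE IF NOT EXISTS iceberg.bronze_example.raw_events_by_day (
  event_source STRING,
  event_time TIMESTAMP,
  raw_value BINARY
)
USING iceberg
PARTITIONED BY (event_source, days(event_time))
"""

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_partition(ts: datetime) -> int:
    """Days since 1970-01-01 UTC, the value Iceberg's days() transform records."""
    return (ts.astimezone(timezone.utc) - EPOCH).days


print(day_partition(datetime(2025, 9, 11, 23, 59, tzinfo=timezone.utc)))  # 20342
# 01:00 at UTC+2 on Sep 12 is still Sep 11 in UTC, so it lands in the same partition
print(day_partition(datetime(2025, 9, 12, 1, 0, tzinfo=timezone(timedelta(hours=2)))))  # 20342
```

The DDL string could be passed to `spark.sql` in a session configured like the one above.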

---

## ✅ Summary

- Iceberg provides a robust, flexible foundation for the raw events layer.
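- Unified streaming ingestion with Schema Registry decoding supports audit and replay, and records that fail to decode stay visible instead of being dropped.
- The pattern extends to production event-driven architectures once the bronze table stores raw bytes plus metadata instead of decoded JSON, as described in the bronze design note.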