# BONUS: Streaming Iceberg data

An emerging pattern is to use Iceberg as the sink for streaming events. Iceberg is well suited to persisting large tables and, as we've learned today, it's easy to consume from your existing analytical stack.

This is often called a Kappa Architecture: a single streaming pipeline feeds both real-time consumers and batch-style analytical workloads.

```{figure} images/kappa_iceberg.svg
```
A number of streaming providers, including Confluent and Redpanda, now offer this capability built in, but in this example we use the Iceberg sink connector for Kafka Connect.

In [1]:
from quixstreams import Application
from quixstreams.models.serializers.avro import AvroSerializer, AvroDeserializer
from quixstreams.models import (
    SchemaRegistryClientConfig,
    SchemaRegistrySerializationConfig,
)
import httpx
import polars as pl
from utils import read_house_prices, catalog

# Defining our schema

We need to define an Avro schema for our housing prices so that we can register it with the schema registry. Our connector will use the schema registry to create the target Iceberg table.

In [2]:
housing_prices_avro = {
        "type": "record",
        "name": "HousePrices",
        "namespace": "housing",
        "doc": "Schema for housing.staging_prices",
        "fields": [
            {
                "name": "transaction_id",
                "type": "string",
            },
            {
                "name": "price",
                "type": "int",
            },
            {
                "name": "date_of_transfer",
                "type": {"type": "int", "logicalType": "date"},
            },
            {
                "name": "postcode",
                "type": "string",
            },
            {
                "name": "property_type",
                "type": "string",
            },
            {
                "name": "new_property",
                "type": "string",
            },
            {
                "name": "duration",
                "type": "string",
            },
            {
                "name": "paon",
                "type": ["null", "string"],
                "default": None,
            },
            {
                "name": "saon",
                "type": ["null", "string"],
                "default": None,
            },
            {"name": "street", "type": ["null", "string"], "default": None},
            {"name": "locality", "type": ["null", "string"], "default": None},
            {"name": "town", "type": ["null", "string"], "default": None},
            {"name": "district", "type": ["null", "string"], "default": None},
            {"name": "county", "type": ["null", "string"], "default": None},
            {
                "name": "ppd_category_type",
                "type": ["null", "string"],
                "default": None,
            },
            {
                "name": "record_status",
                "type": ["null", "string"],
                "default": None,
            },
        ],
    }
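
The `date_of_transfer` field uses Avro's `date` logical type, which is stored as an `int` counting days since the Unix epoch (1970-01-01). Avro libraries normally handle this conversion for you, so the sketch below only illustrates the encoding. The helper names are hypothetical and not part of any library used here.

```python
from datetime import date, timedelta

UNIX_EPOCH = date(1970, 1, 1)


def date_to_avro_days(d: date) -> int:
    """Encode a date as an Avro `date`: whole days since 1970-01-01."""
    return (d - UNIX_EPOCH).days


def avro_days_to_date(days: int) -> date:
    """Decode an Avro `date` integer back into a Python date."""
    return UNIX_EPOCH + timedelta(days=days)


# Round-trip one transfer date to show the encoding is lossless.
encoded = date_to_avro_days(date(2015, 10, 23))
decoded = avro_days_to_date(encoded)
```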

# Setup Serialization

With our Avro schema defined, we need to tell the serializer where the schema registry is, so that the schema can be uploaded to it.

In [3]:
schema_registry_client_config = SchemaRegistryClientConfig(
    url='http://schema-registry:8081'
)

Now we can define an Avro serializer and deserializer. The serializer needs to know the schema up front, but the deserializer can fetch it from the registry at read time.

In [4]:
serializer = AvroSerializer(housing_prices_avro, schema_registry_client_config=schema_registry_client_config)
deserializer = AvroDeserializer(schema_registry_client_config=schema_registry_client_config)
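
The deserializer can look the schema up at read time because serializers that work with a Confluent-compatible schema registry typically prefix each message with a small header: a magic byte (`0`) followed by the 4-byte big-endian ID of the registered schema. The sketch below parses that header by hand, assuming this wire format is in use. `parse_wire_header` is a hypothetical helper, not part of `quixstreams`, and the message bytes are made up for illustration.

```python
import struct


def parse_wire_header(payload: bytes) -> tuple[int, bytes]:
    """Split a registry-framed message into (schema_id, avro_body)."""
    if len(payload) < 5:
        raise ValueError("payload too short to contain a schema registry header")
    # ">bI" = big-endian, one signed byte (magic) and one unsigned 32-bit int (schema ID).
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[5:]


# Build a fake message: magic byte 0, schema ID 7, then placeholder body bytes.
fake_message = struct.pack(">bI", 0, 7) + b"\x02hi"
schema_id, body = parse_wire_header(fake_message)
```

With the schema ID in hand, a reader can ask the registry for the matching schema and decode the remaining bytes.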

Using the `quixstreams` library simplifies our Kafka producing logic a bit.

In [5]:
app = Application(broker_address="broker:29092", consumer_group="iceberg-demo")

In [6]:
housing_prices_topic = app.topic("housing_prices", value_serializer=serializer, value_deserializer=deserializer)

[2025-10-07 21:32:03,555] [INFO] [quixstreams] : Creating a new topic "housing_prices" with a config: "{'num_partitions': 1, 'replication_factor': 1, 'extra_config': {}}"
[2025-10-07 21:32:04,561] [INFO] [quixstreams] : Topic "housing_prices" has been created


# Publishing Data

Now that we have a connection to our Kafka broker and a `housing_prices` topic defined, it's time to read in some data.

In our case, we are still batching data, but it would be easy to imagine we had a stream of housing purchases we would want to put on Kafka.

In [7]:
df = read_house_prices('data/house_prices/pp-2015.csv')

With our data ready, we produce our messages to Kafka, keyed by `transaction_id`.

In [8]:
with app.get_producer() as producer:
    for line in df.to_dicts():
        message = housing_prices_topic.serialize(key=line['transaction_id'], value=line)
        producer.produce(topic=housing_prices_topic.name, key=message.key, value=message.value)
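
Fields declared without a `"null"` branch, such as `price` or `postcode`, need a value in every record, otherwise Avro serialization is expected to fail. Below is a minimal sketch of a pre-flight check, assuming (as in the schema above) that non-nullable fields have no default. `required_fields` and `missing_required` are hypothetical helpers, not part of `quixstreams`, and the schema is a trimmed copy for illustration only.

```python
def required_fields(schema: dict) -> list[str]:
    """Names of fields whose type does not allow null."""
    names = []
    for field in schema["fields"]:
        field_type = field["type"]
        nullable = isinstance(field_type, list) and "null" in field_type
        if not nullable:
            names.append(field["name"])
    return names


def missing_required(schema: dict, record: dict) -> list[str]:
    """Required fields that are absent or None in the given record."""
    return [name for name in required_fields(schema) if record.get(name) is None]


# Trimmed-down version of the housing prices schema, for illustration only.
example_schema = {
    "type": "record",
    "name": "HousePrices",
    "fields": [
        {"name": "transaction_id", "type": "string"},
        {"name": "price", "type": "int"},
        {"name": "street", "type": ["null", "string"], "default": None},
    ],
}

# This record has no price, so "price" is reported as missing.
problems = missing_required(example_schema, {"transaction_id": "abc", "street": None})
```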

Now we have a bunch of messages sitting on Kafka, and we would like to sink them to an Iceberg table for analytical purposes.

We've chosen to use the Iceberg Kafka connector to do this. Kafka Connect is essentially a standardised application for reading data from Kafka and sinking it somewhere, or sourcing data from somewhere and putting it on Kafka.

# Configuring the Connector
Kafka Connect can be configured via its REST API: to create a new connector, we define the configuration as JSON and send it to the API.

In [9]:
connector_config = {
    "name": "housing-prices-connector",
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "iceberg.catalog.type": "rest", 
    "iceberg.catalog.uri": "http://lakekeeper:8181/catalog", # Connecting to our Catalog
    "iceberg.catalog.warehouse": "lakehouse",
    "iceberg.control.topic": "iceberg-demo-connector-control", # A topic the connector uses to keep track of what files are committed
    "iceberg.tables": "housing.streaming_prices", # The table we want to write to
    "iceberg.tables.auto-create-enabled": "true", # Should the Connector create the table?
    "iceberg.tables.evolve-schema-enabled": "true", # Should the Connector alter the table if the schema changes?
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": housing_prices_topic.name, 
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "iceberg.control.commit.interval-ms": 10000 # How often should we commit data? 10 seconds for demo purposes only - default is 5 mins
}
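
Kafka Connect treats a connector configuration as a flat map of string keys to string values, which is why the booleans above are written as `"true"`. The sketch below normalises Python values into that form before sending. `stringify_config` is a hypothetical helper, and the example keys are copied from the configuration above.

```python
def stringify_config(config: dict) -> dict[str, str]:
    """Convert every config value to a string, as a Connect config map expects."""
    normalised = {}
    for key, value in config.items():
        if isinstance(value, bool):
            # str(True) gives "True"; Java-style configs use lowercase booleans.
            normalised[key] = "true" if value else "false"
        else:
            normalised[key] = str(value)
    return normalised


example = stringify_config(
    {
        "iceberg.tables.auto-create-enabled": True,
        "iceberg.control.commit.interval-ms": 10000,
        "iceberg.tables": "housing.streaming_prices",
    }
)
```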

In [10]:
r = httpx.put("http://connect:8083/connectors/housing-prices-connector/config", json=connector_config)
r.status_code

201

We can check the current status of the connector to make sure both the connector and its task are running.

In [12]:
r = httpx.get("http://connect:8083/connectors/housing-prices-connector/status")
r.json()

{'name': 'housing-prices-connector',
 'connector': {'state': 'RUNNING', 'worker_id': 'connect:8083'},
 'tasks': [{'id': 0, 'state': 'RUNNING', 'worker_id': 'connect:8083'}],
 'type': 'sink'}
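
A connector may take a moment to start its tasks, so a single status call can be misleading. The sketch below polls a status payload of the shape shown above until the connector and all of its tasks report `RUNNING`, and raises if anything reports `FAILED`. `wait_until_running` is a hypothetical helper, and the canned responses stand in for a live Connect cluster.

```python
import time
from typing import Callable


def wait_until_running(
    fetch_status: Callable[[], dict],
    timeout_s: float = 60.0,
    poll_interval_s: float = 2.0,
) -> dict:
    """Poll until the connector and at least one task are all RUNNING."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        states = [status["connector"]["state"]] + [t["state"] for t in status["tasks"]]
        if "FAILED" in states:
            raise RuntimeError(f"connector or task failed: {status}")
        # Require at least one task, since tasks may not be assigned yet.
        if status["tasks"] and all(s == "RUNNING" for s in states):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"connector not running after {timeout_s}s: {states}")
        time.sleep(poll_interval_s)


# Canned responses: first no tasks assigned yet, then everything running.
responses = iter(
    [
        {"connector": {"state": "RUNNING"}, "tasks": []},
        {"connector": {"state": "RUNNING"}, "tasks": [{"id": 0, "state": "RUNNING"}]},
    ]
)
final_status = wait_until_running(lambda: next(responses), timeout_s=5, poll_interval_s=0)
```

Against the setup in this notebook, `fetch_status` could be something like `lambda: httpx.get("http://connect:8083/connectors/housing-prices-connector/status").json()`.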

We now have an always-on connector that periodically writes any new data on the Kafka topic to our Iceberg table!

To prove it, we can work with the new Iceberg table just as we have been doing throughout.

In [13]:
table = catalog.load_table("housing.streaming_prices")

In [14]:
pl.scan_iceberg(table).head(10).collect()



| transaction_id | price | date_of_transfer | postcode | property_type | new_property | duration | paon | saon | street | locality | town | district | county | ppd_category_type | record_status |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| str | i32 | date | str | str | str | str | str | str | str | str | str | str | str | str | str |
| "{25EA59FA-4331-4D50-E050-A8C06… | 130000 | 2015-10-23 | "NE29 0RX" | "T" | "N" | "F" | "77" | "" | "TUDOR AVENUE" | "" | "NORTH SHIELDS" | "NORTH TYNESIDE" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4332-4D50-E050-A8C06… | 97500 | 2015-11-03 | "SR4 0AX" | "T" | "N" | "F" | "13" | "" | "FORDHAM ROAD" | "" | "SUNDERLAND" | "SUNDERLAND" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4333-4D50-E050-A8C06… | 95000 | 2015-10-30 | "NE2 4DE" | "F" | "N" | "L" | "10A" | "" | "VICTORIA SQUARE" | "" | "NEWCASTLE UPON TYNE" | "NEWCASTLE UPON TYNE" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4334-4D50-E050-A8C06… | 115000 | 2015-10-12 | "NE40 3HA" | "T" | "N" | "L" | "41" | "" | "SILVERMERE DRIVE" | "" | "RYTON" | "GATESHEAD" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4335-4D50-E050-A8C06… | 85000 | 2015-10-20 | "SR4 0ES" | "S" | "N" | "F" | "17" | "" | "PRESCOT ROAD" | "" | "SUNDERLAND" | "SUNDERLAND" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4336-4D50-E050-A8C06… | 185000 | 2015-09-25 | "NE12 6SZ" | "S" | "N" | "F" | "40" | "" | "CRUMSTONE COURT" | "" | "NEWCASTLE UPON TYNE" | "NORTH TYNESIDE" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4337-4D50-E050-A8C06… | 155000 | 2015-11-03 | "NE16 3ES" | "S" | "N" | "L" | "35" | "" | "GROSVENOR AVENUE" | "SWALWELL" | "NEWCASTLE UPON TYNE" | "GATESHEAD" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4338-4D50-E050-A8C06… | 120000 | 2015-10-19 | "NE6 1UL" | "F" | "N" | "L" | "THE MOORINGS" | "14" | "ST LAWRENCE ROAD" | "ST PETERS BASIN" | "NEWCASTLE UPON TYNE" | "NEWCASTLE UPON TYNE" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4339-4D50-E050-A8C06… | 74000 | 2015-10-29 | "SR4 7HA" | "T" | "N" | "F" | "10" | "" | "THELMA STREET" | "" | "SUNDERLAND" | "SUNDERLAND" | "TYNE AND WEAR" | "A" | "A" |
| "{25EA59FA-4530-4D50-E050-A8C06… | 174850 | 2015-09-30 | "NE13 9BD" | "T" | "Y" | "F" | "117" | "" | "ROSEDEN WAY" | "" | "NEWCASTLE UPON TYNE" | "NEWCASTLE UPON TYNE" | "TYNE AND WEAR" | "A" | "A" |
