# BONUS: Streaming Iceberg data

An emerging pattern is to use Iceberg as the sink for streaming events. Iceberg is well suited to persisting large tables, and, as we've learned today, it's easy to consume from your existing analytical stack.

Redpanda has native Iceberg storage built in, and Confluent recently launched Tableflow to enable this pattern.

We'll use Kafka Connect to replicate the same functionality here.

In [46]:
from quixstreams import Application
from quixstreams.models.serializers.avro import AvroSerializer, AvroDeserializer
from quixstreams.models import (
    SchemaRegistryClientConfig,
    SchemaRegistrySerializationConfig,
)
import httpx
import polars as pl
import csv
from utils import read_house_prices, catalog

# Defining our schema

We need to define an Avro schema for our housing prices so we can use it with the schema registry.

In [64]:
housing_prices_avro = {
        "type": "record",
        "name": "HousePrices",
        "namespace": "housing",
        "doc": "Schema for housing.staging_prices",
        "fields": [
            {
                "name": "transaction_id",
                "type": "string",
            },
            {
                "name": "price",
                "type": "int",
            },
            {
                "name": "date_of_transfer",
                "type": {"type": "int", "logicalType": "date"},
            },
            {
                "name": "postcode",
                "type": "string",
            },
            {
                "name": "property_type",
                "type": "string",
            },
            {
                "name": "new_property",
                "type": "string",
            },
            {
                "name": "duration",
                "type": "string",
            },
            {
                "name": "paon",
                "type": ["null", "string"],
                "default": None,
            },
            {
                "name": "saon",
                "type": ["null", "string"],
                "default": None,
            },
            {"name": "street", "type": ["null", "string"], "default": None},
            {"name": "locality", "type": ["null", "string"], "default": None},
            {"name": "town", "type": ["null", "string"], "default": None},
            {"name": "district", "type": ["null", "string"], "default": None},
            {"name": "county", "type": ["null", "string"], "default": None},
            {
                "name": "ppd_category_type",
                "type": ["null", "string"],
                "default": None,
            },
            {
                "name": "record_status",
                "type": ["null", "string"],
                "default": None,
            },
        ],
    }
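One detail of the schema worth illustrating is the `date_of_transfer` field: Avro's `date` logical type annotates an `int` that counts days since the Unix epoch (1970-01-01). The sketch below shows that mapping with the standard library only. The helper names are hypothetical and not part of any library used here, and in practice the Avro serializer may well do this conversion for you when handed a `datetime.date`.

```python
from datetime import date, timedelta

# Avro's `date` logical type counts whole days from this day.
EPOCH = date(1970, 1, 1)


def date_to_avro_days(d: date) -> int:
    """Convert a calendar date to the int an Avro `date` field stores."""
    return (d - EPOCH).days


def avro_days_to_date(days: int) -> date:
    """Convert an Avro `date` int back to a calendar date."""
    return EPOCH + timedelta(days=days)


days = date_to_avro_days(date(2017, 6, 16))
print(days, avro_days_to_date(days))
```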

Next, we need to tell the serializer where to find the schema registry.

In [31]:
schema_registry_client_config = SchemaRegistryClientConfig(
    url='http://schema-registry:8081'
)

Now we can define an Avro serializer and deserializer. The serializer needs to know the schema up front, but the deserializer can fetch it at read time.

In [65]:
serializer = AvroSerializer(housing_prices_avro, schema_registry_client_config=schema_registry_client_config)
deserializer = AvroDeserializer(schema_registry_client_config=schema_registry_client_config)
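Why can the deserializer get away without a schema? A likely explanation, assuming the serializer emits the Confluent Schema Registry wire format (an assumption about what happens under the hood, not something verified in this notebook), is that every message carries the ID of its schema: one magic byte of `0`, a 4-byte big-endian schema ID, then the Avro-encoded payload. The hypothetical helper below splits such a frame using only the standard library.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def split_confluent_frame(payload: bytes):
    """Return (schema_id, avro_bytes) from a Confluent-framed message."""
    if len(payload) < 5:
        raise ValueError("payload too short to contain a schema ID header")
    # ">bI" = big-endian: one signed byte (magic) + one unsigned 32-bit int (ID)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[5:]


# A made-up frame: schema ID 42 followed by three placeholder payload bytes.
frame = b"\x00" + (42).to_bytes(4, "big") + b"abc"
print(split_confluent_frame(frame))
```

With the ID in hand, a reader can look the schema up in the registry and decode the remaining bytes.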

Using the `quixstreams` library simplifies our Kafka producing logic a bit.

In [33]:
app = Application(broker_address="broker:29092", consumer_group="iceberg-demo")

In [34]:
housing_prices_topic = app.topic("housing_prices", value_serializer=serializer, value_deserializer=deserializer)

Now that we have a connection to our Kafka broker and a `housing_prices` topic defined, it's time to read in some data.

In our case, we are still batching data, but it's easy to imagine a stream of housing purchases that we would want to put on Kafka.

In [62]:
df = read_house_prices('data/house_prices/pp-2015.csv')

With our data ready, we produce our messages to Kafka.

In [63]:
with app.get_producer() as producer:
    for line in df.to_dicts():
        message = housing_prices_topic.serialize(key=line['transaction_id'], value=line)
        producer.produce(topic=housing_prices_topic.name, key=message.key, value=message.value)
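Serialization fails if a row is missing a field the schema requires, so it can help to check rows before producing them. The sketch below is one hypothetical way to do that: it treats a field as optional only when its type is a union containing `"null"`, as in the schema above. `missing_required_fields` and `demo_schema` are illustrative names, and `demo_schema` is a trimmed stand-in for the full schema.

```python
def missing_required_fields(schema: dict, row: dict) -> list:
    """Return names of non-nullable schema fields that are absent or None in row."""
    missing = []
    for field in schema["fields"]:
        field_type = field["type"]
        # Nullable fields are declared as a union such as ["null", "string"].
        nullable = isinstance(field_type, list) and "null" in field_type
        if not nullable and row.get(field["name"]) is None:
            missing.append(field["name"])
    return missing


# Trimmed, illustrative version of the housing prices schema.
demo_schema = {
    "type": "record",
    "name": "HousePrices",
    "fields": [
        {"name": "transaction_id", "type": "string"},
        {"name": "price", "type": "int"},
        {"name": "saon", "type": ["null", "string"], "default": None},
    ],
}

print(missing_required_fields(demo_schema, {"transaction_id": "abc", "price": 1}))
print(missing_required_fields(demo_schema, {"transaction_id": "abc"}))
```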

Now we have a bunch of messages lying around on Kafka, and we would like to sink them to an Iceberg table for analytical purposes. We've chosen to use the Iceberg Kafka connector to do this. Kafka Connect is basically a standardised application for reading data from Kafka and sinking it somewhere, or sourcing data from somewhere and putting it on Kafka.

Kafka Connect can be configured via its REST API. To create a new connector, we define the configuration in JSON and pass it to the API.

In [66]:
connector_config = {
    "name": "housing-prices-connector",
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "iceberg.catalog.type": "rest", 
    "iceberg.catalog.uri": "http://lakekeeper:8181/catalog", # Connecting to our Catalog
    "iceberg.catalog.warehouse": "lakehouse",
    "iceberg.control.topic": "iceberg-demo-connector-control", # A topic the connector uses to keep track of what files are committed
    "iceberg.tables": "housing.streaming_prices", # The table we want to write to
    "iceberg.tables.auto-create-enabled": "true", # Should the Connector create the table?
    "iceberg.tables.evolve-schema-enabled": "true", # Should the Connector alter the table if the schema changes?
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": housing_prices_topic.name, 
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "iceberg.control.commit.interval-ms": 10000 # How often should we commit data? 10 seconds for demo purposes only - default is 5 mins
}

In [68]:
r = httpx.put("http://connect:8083/connectors/housing-prices-connector/config", json=connector_config)
r.status_code, r.json()

(200,
 {'name': 'housing-prices-connector',
  'config': {'name': 'housing-prices-connector',
   'connector.class': 'io.tabular.iceberg.connect.IcebergSinkConnector',
   'iceberg.catalog.type': 'rest',
   'iceberg.catalog.uri': 'http://lakekeeper:8181/catalog',
   'iceberg.catalog.warehouse': 'lakehouse',
   'iceberg.control.topic': 'iceberg-demo-connector-control',
   'iceberg.tables': 'housing.streaming_prices',
   'iceberg.tables.auto-create-enabled': 'true',
   'iceberg.tables.evolve-schema-enabled': 'true',
   'key.converter': 'org.apache.kafka.connect.storage.StringConverter',
   'topics': 'housing_prices',
   'value.converter': 'io.confluent.connect.avro.AvroConverter',
   'value.converter.schema.registry.url': 'http://schema-registry:8081',
   'iceberg.control.commit.interval-ms': '10000'},
  'tasks': [{'connector': 'housing-prices-connector', 'task': 0}],
  'type': 'sink'})

We can check the current status of the connector to make sure it's running.

In [70]:
r = httpx.get("http://connect:8083/connectors/housing-prices-connector/status")
r.json()

{'name': 'housing-prices-connector',
 'connector': {'state': 'RUNNING', 'worker_id': 'connect:8083'},
 'tasks': [{'id': 0, 'state': 'RUNNING', 'worker_id': 'connect:8083'}],
 'type': 'sink'}
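Note that the connector and its tasks report their states separately, so a connector can show `RUNNING` while one of its tasks has failed. A minimal sketch of a check over the status payload, assuming the JSON shape shown above (`connector_is_healthy` is a hypothetical helper, not part of Kafka Connect or `httpx`):

```python
def connector_is_healthy(status: dict) -> bool:
    """True only if the connector and every one of its tasks are RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # A connector with no tasks is not moving any data, so treat it as unhealthy.
    tasks_ok = bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
    return connector_ok and tasks_ok


# Same shape as the status response above.
status = {
    "name": "housing-prices-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
    "type": "sink",
}
print(connector_is_healthy(status))
```

In the notebook this could be called as `connector_is_healthy(r.json())`.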

And just to prove it, we can work with the resulting Iceberg table just like we've been doing the whole time.

In [71]:
table = catalog.load_table("housing.streaming_prices")

In [72]:
pl.scan_iceberg(table).head(10).collect()



| transaction_id | price | date_of_transfer | postcode | property_type | new_property | duration | paon | saon | street | locality | town | district | county | ppd_category_type | record_status |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| str | i32 | date | str | str | str | str | str | str | str | str | str | str | str | str | str |
| "{582D0636-D373-8F22-E053-6C04A… | 309950 | 2017-06-16 | "DE73 6UX" | "D" | "Y" | "F" | "93" | "" | "WOODGATE DRIVE" | "CHELLASTON" | "DERBY" | "CITY OF DERBY" | "CITY OF DERBY" | "A" | "A" |
| "{582D0636-D374-8F22-E053-6C04A… | 245000 | 2017-04-26 | "S21 1JG" | "D" | "N" | "F" | "4" | "" | "HOLME MEADOWS" | "KILLAMARSH" | "SHEFFIELD" | "NORTH EAST DERBYSHIRE" | "DERBYSHIRE" | "A" | "A" |
| "{582D0636-D375-8F22-E053-6C04A… | 83500 | 2017-05-31 | "NG16 4EH" | "F" | "Y" | "L" | "ST GEORGES COURT, 51A" | "APARTMENT 3" | "CROMFORD ROAD" | "LANGLEY MILL" | "NOTTINGHAM" | "AMBER VALLEY" | "DERBYSHIRE" | "A" | "A" |
| "{582D0636-D376-8F22-E053-6C04A… | 255000 | 2017-06-30 | "DE73 6WY" | "D" | "Y" | "F" | "25" | "" | "KINGSGATE ROAD" | "CHELLASTON" | "DERBY" | "SOUTH DERBYSHIRE" | "DERBYSHIRE" | "A" | "A" |
| "{582D0636-D377-8F22-E053-6C04A… | 249950 | 2017-07-07 | "DE75 7TF" | "D" | "Y" | "F" | "23" | "" | "VARLEY CLOSE" | "" | "HEANOR" | "AMBER VALLEY" | "DERBYSHIRE" | "A" | "A" |
| "{582D0636-D378-8F22-E053-6C04A… | 191995 | 2017-06-16 | "DE73 6XA" | "S" | "Y" | "F" | "34" | "" | "KINGSGATE ROAD" | "CHELLASTON" | "DERBY" | "SOUTH DERBYSHIRE" | "DERBYSHIRE" | "A" | "A" |
| "{582D0636-D379-8F22-E053-6C04A… | 421000 | 2017-07-07 | "DE45 1AQ" | "F" | "N" | "L" | "RUTLAND MILL" | "THE PENTHOUSE" | "COOMBS ROAD" | "" | "BAKEWELL" | "DERBYSHIRE DALES" | "DERBYSHIRE" | "A" | "A" |
| "{582D0636-D37A-8F22-E053-6C04A… | 250000 | 2017-06-29 | "S21 1JG" | "D" | "N" | "F" | "6" | "" | "HOLME MEADOWS" | "KILLAMARSH" | "SHEFFIELD" | "NORTH EAST DERBYSHIRE" | "DERBYSHIRE" | "A" | "A" |
| "{582D0636-D37B-8F22-E053-6C04A… | 196950 | 2017-07-12 | "DE75 7TF" | "D" | "Y" | "F" | "24" | "" | "VARLEY CLOSE" | "" | "HEANOR" | "AMBER VALLEY" | "DERBYSHIRE" | "A" | "A" |
| "{582D0636-D37C-8F22-E053-6C04A… | 229950 | 2017-07-14 | "DE75 7TF" | "D" | "Y" | "F" | "25" | "" | "VARLEY CLOSE" | "" | "HEANOR" | "AMBER VALLEY" | "DERBYSHIRE" | "A" | "A" |
