# AutoMQ Table Topic Demonstration

This notebook demonstrates the core capabilities of AutoMQ Table Topic: automatic table creation, Upsert mode for data synchronization, and data partitioning. The workflow creates a topic with Upsert and partitioning enabled, sends one Insert (I), one Update (U), and one Delete (D) message for the same key, and queries the Iceberg table after each operation.
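The expected query result after each step can be modeled as a dictionary keyed by the primary key. Insert and Update both leave one row that holds the latest values, and Delete removes that row. This is a conceptual sketch of what the queries below should show, not AutoMQ's implementation. `apply_cdc` is a hypothetical name.

```python
def apply_cdc(table, record, key_field="event_id", op_field="ops"):
    """Apply one I/U/D record to an in-memory table keyed by key_field (illustrative only)."""
    key, op = record[key_field], record[op_field]
    if op in ("I", "U"):
        # Insert and Update both leave exactly one row per key, holding the latest values
        table[key] = dict(record)
    elif op == "D":
        # Delete removes the row for this key if it exists
        table.pop(key, None)
    else:
        raise ValueError(f"Unknown operation: {op!r}")
    return table


table = {}
apply_cdc(table, {"event_id": "e1", "user_id": "alice", "ops": "I"})
apply_cdc(table, {"event_id": "e1", "user_id": "bob", "ops": "U"})
print(table)  # {'e1': {'event_id': 'e1', 'user_id': 'bob', 'ops': 'U'}}
apply_cdc(table, {"event_id": "e1", "user_id": "bob", "ops": "D"})
print(table)  # {}
```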

## 1. Import Libraries and Define Helper Functions

Import the required libraries, initialize the Kafka admin client, the Schema Registry client, and the Spark session, and define helper functions for creating topics and producing messages.

```python
import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer, KafkaException, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from faker import Faker
from pyspark.sql import SparkSession

# Configuration constants
KAFKA_BOOTSTRAP_SERVERS = 'automq:9092'
SCHEMA_REGISTRY_URL = 'http://schema-registry:8081'
TOPIC_NAME = 'web_page_view_events'

# Initialize AdminClient and SchemaRegistryClient
admin_client = AdminClient({'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})
schema_registry_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

# Initialize SparkSession (the Iceberg catalog is expected to be configured in the environment)
spark = SparkSession.builder.appName("AutoMQ Table Topic Demo").getOrCreate()
fake = Faker()

# Helper function: Create a Kafka topic, tolerating the case where it already exists
def create_topic(topic_name, num_partitions=1, replication_factor=1, config=None):
    topics = [NewTopic(topic_name, num_partitions=num_partitions,
                       replication_factor=replication_factor, config=config or {})]
    futures = admin_client.create_topics(topics, operation_timeout=30)
    for topic, future in futures.items():
        try:
            future.result()  # Block until the broker confirms creation
            print(f"Topic '{topic}' created successfully.")
        except KafkaException as e:
            error = e.args[0]
            if error.code() == KafkaError.TOPIC_ALREADY_EXISTS:
                print(f"Topic '{topic}' already exists.")
            else:
                raise RuntimeError(f"Failed to create topic '{topic}': {error.str()}") from e

# Helper function: Create a Producer
def create_producer():
    return Producer({'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})

# Delivery report callback, invoked from poll()/flush() once per produced message
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
        return
    print(f"Message delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

# Helper function: Produce events to Kafka, keyed by event_id
def produce_events(producer, topic_name, events_data, avro_serializer, string_serializer):
    queued = 0
    for event in events_data:
        try:
            producer.produce(
                topic=topic_name,
                key=string_serializer(event.event_id),
                value=avro_serializer(event, SerializationContext(topic_name, MessageField.VALUE)),
                on_delivery=delivery_report
            )
            queued += 1
        except Exception as e:
            print(f"Failed to produce event {event.event_id}: {e}")
        producer.poll(0)  # Serve delivery callbacks for earlier messages
    producer.flush()  # Wait until every queued message is delivered or fails
    print(f"Queued {queued} of {len(events_data)} event(s) for {topic_name}; see delivery reports above.")
```
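To try the produce-and-callback flow without a running broker, a small stand-in can mimic the subset of the `Producer` interface that the helper uses (`produce`, `poll`, `flush`). `RecordingProducer` and `FakeMessage` are hypothetical test doubles and are not part of `confluent_kafka`. In this sketch the callbacks fire from `flush()`. A real producer fires them from `poll()` or `flush()` once the broker responds.

```python
class FakeMessage:
    """Hypothetical stand-in exposing the accessors a delivery callback reads."""
    def __init__(self, topic, partition, offset, key, value):
        self._topic, self._partition, self._offset = topic, partition, offset
        self._key, self._value = key, value

    def topic(self): return self._topic
    def partition(self): return self._partition
    def offset(self): return self._offset
    def key(self): return self._key
    def value(self): return self._value


class RecordingProducer:
    """Hypothetical test double: queues messages and 'delivers' them on flush()."""
    def __init__(self):
        self.pending = []    # (message, callback) pairs awaiting delivery
        self.delivered = []  # messages whose callbacks have run

    def produce(self, topic, key=None, value=None, on_delivery=None):
        offset = len(self.delivered) + len(self.pending)  # single partition, sequential offsets
        self.pending.append((FakeMessage(topic, 0, offset, key, value), on_delivery))

    def poll(self, timeout=0):
        return 0  # Nothing is delivered asynchronously in this sketch

    def flush(self, timeout=None):
        for msg, callback in self.pending:
            if callback is not None:
                callback(None, msg)  # err=None signals success
            self.delivered.append(msg)
        self.pending.clear()
        return 0  # Number of messages still queued


def log_delivery(err, msg):
    # Same (err, msg) shape as delivery_report
    print(f"delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")


fake_producer = RecordingProducer()
fake_producer.produce("web_page_view_events", key=b"e1", value=b"payload", on_delivery=log_delivery)
fake_producer.poll(0)
fake_producer.flush()  # prints: delivered to web_page_view_events [partition 0] at offset 0
```

With the notebook's helpers defined, a call such as `produce_events(RecordingProducer(), TOPIC_NAME, events, lambda event, ctx: b"{}", lambda s: s.encode("utf-8"))` runs the helper's loop and prints its delivery reports. It does not contact a broker.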

## 2. Create Topic with Upsert and Partitioning

Create a Kafka topic with Table Topic enabled and configured for Upsert mode and partitioning. The table uses `event_id` as the primary key and reads the operation type (`I`, `U`, or `D`) from the `ops` field. Rows are partitioned by `bucket(page_url, 5)` and `hour(timestamp)`, and data is committed to Iceberg every 2 seconds.
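To see where each row lands, the two partition transforms can be sketched in plain Python following the Iceberg table specification. `hour` counts whole hours since the Unix epoch. `bucket(N)` takes the 32-bit Murmur3 hash (x86 variant, seed 0) of the value's UTF-8 bytes, clears the sign bit, and reduces the result modulo N. This is an illustrative re-implementation, not code from AutoMQ or Iceberg. `iceberg_bucket` and `iceberg_hour` are hypothetical names.

```python
def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """MurmurHash3 x86 32-bit, returned as a signed 32-bit int (like Java's int)."""
    c1, c2, mask = 0xCC9E2D51, 0x1B873593, 0xFFFFFFFF
    h = seed & mask
    n_blocks = len(data) // 4
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = (k * c1) & mask
        k = ((k << 15) | (k >> 17)) & mask  # rotl32(k, 15)
        k = (k * c2) & mask
        h ^= k
        h = ((h << 13) | (h >> 19)) & mask  # rotl32(h, 13)
        h = (h * 5 + 0xE6546B64) & mask
    tail = data[4 * n_blocks:]
    k = 0
    if len(tail) == 3:
        k ^= tail[2] << 16
    if len(tail) >= 2:
        k ^= tail[1] << 8
    if len(tail) >= 1:
        k ^= tail[0]
        k = (k * c1) & mask
        k = ((k << 15) | (k >> 17)) & mask
        k = (k * c2) & mask
        h ^= k
    # Finalization mix (fmix32)
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & mask
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & mask
    h ^= h >> 16
    return h - (1 << 32) if h & 0x80000000 else h


def iceberg_bucket(value: str, num_buckets: int) -> int:
    """Bucket transform for a string: (hash & Integer.MAX_VALUE) % N."""
    return (murmur3_x86_32(value.encode("utf-8")) & 0x7FFFFFFF) % num_buckets


def iceberg_hour(epoch_millis: int) -> int:
    """Hour transform: whole hours since 1970-01-01T00:00:00 UTC."""
    return epoch_millis // 3_600_000


print(murmur3_x86_32(b"iceberg"))       # 1210000089 (string test value in the Iceberg spec)
print(iceberg_bucket("iceberg", 5))     # 4
print(iceberg_hour(1_510_871_468_000))  # 419686 (2017-11-16T22:31:08Z)
```

Applied to an event, `iceberg_bucket(event.page_url, 5)` and `iceberg_hour(event.timestamp)` should match the partition values Iceberg computes. This assumes the Avro `timestamp-millis` field maps to an Iceberg timestamp column.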

```python
# Define Avro Schema with operation support
schema_str = """
{
  "type": "record",
  "name": "PageViewEvent",
  "namespace": "com.example.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" }},
    {"name": "page_url", "type": "string"},
    {"name": "ip_address", "type": "string"},
    {"name": "user_agent", "type": "string"},
    {"name": "ops", "type": "string"}
  ]
}
"""

# Define PageViewEvent class
class PageViewEvent:
    def __init__(self, event_id, user_id, timestamp, page_url, ip_address, user_agent, ops):
        self.event_id = event_id
        self.user_id = user_id
        self.timestamp = timestamp
        self.page_url = page_url
        self.ip_address = ip_address
        self.user_agent = user_agent
        self.ops = ops

# Serialization function for events (AvroSerializer calls it as to_dict(obj, ctx))
def event_to_dict(event, ctx):
    return {
        "event_id": event.event_id,
        "user_id": event.user_id,
        "timestamp": event.timestamp,
        "page_url": event.page_url,
        "ip_address": event.ip_address,
        "user_agent": event.user_agent,
        "ops": event.ops
    }

# Create topic with Upsert and partitioning configurations
topic_config = {
    'automq.table.topic.enable': 'true',              # Enable Table Topic for this topic
    'automq.table.topic.commit.interval.ms': '2000',  # Commit to Iceberg every 2 seconds
    'automq.table.topic.schema.type': 'schema',       # Parse values with the Schema Registry schema
    'automq.table.topic.upsert.enable': 'true',       # Upsert rows by primary key
    'automq.table.topic.id.columns': '[event_id]',    # Primary key column(s)
    'automq.table.topic.cdc.field': 'ops',            # Field holding the operation: I, U, or D
    'automq.table.topic.partition.by': '[bucket(page_url, 5), hour(timestamp)]'  # Iceberg partition spec
}
create_topic(TOPIC_NAME, config=topic_config)

# Initialize serializers and producer
avro_serializer = AvroSerializer(schema_registry_client, schema_str, event_to_dict)
string_serializer = StringSerializer('utf_8')
producer = create_producer()
```
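`event_to_dict` copies every attribute, so the event class could equally be written as a dataclass and serialized with `dataclasses.asdict`. The sketch below is an alternative, not what the notebook uses. It also cross-checks the dictionary keys against the Avro schema's field names, which catches drift between the class and the schema before anything is produced. `PageViewEventDC`, `event_to_dict_dc`, and `SCHEMA_JSON` are hypothetical names, and the schema is copied here so the sketch runs on its own.

```python
import json
from dataclasses import dataclass, asdict, fields

# Same field list as schema_str in the notebook (pass schema_str there instead)
SCHEMA_JSON = """
{
  "type": "record",
  "name": "PageViewEvent",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "page_url", "type": "string"},
    {"name": "ip_address", "type": "string"},
    {"name": "user_agent", "type": "string"},
    {"name": "ops", "type": "string"}
  ]
}
"""

@dataclass
class PageViewEventDC:
    event_id: str
    user_id: str
    timestamp: int  # epoch milliseconds (Avro timestamp-millis)
    page_url: str
    ip_address: str
    user_agent: str
    ops: str        # "I", "U", or "D"

def event_to_dict_dc(event, ctx=None):
    # Same (obj, ctx) signature that AvroSerializer expects for its to_dict callback
    return asdict(event)

# Fail fast if the class and the schema disagree on field names or order
schema_fields = [f["name"] for f in json.loads(SCHEMA_JSON)["fields"]]
class_fields = [f.name for f in fields(PageViewEventDC)]
assert class_fields == schema_fields, f"Class/schema mismatch: {class_fields} vs {schema_fields}"
```

Because the dataclass fields follow the same positional order as `PageViewEvent.__init__`, the notebook's constructor calls would not change. Only the callback passed to `AvroSerializer` would.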

## 3. Insert Operation

Produce an Insert (I) event to the topic.

```python
event_id = str(uuid.uuid4())
current_timestamp = int(datetime.now(timezone.utc).timestamp() * 1000)
insert_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp, fake.uri_path(), fake.ipv4(), fake.user_agent(), "I")]
produce_events(producer, TOPIC_NAME, insert_event, avro_serializer, string_serializer)
```
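The `timestamp` value is milliseconds since the Unix epoch in UTC, which is what the schema's `timestamp-millis` logical type expects. The sketch below converts in both directions using integer `timedelta` arithmetic, which avoids the floating-point rounding of `timestamp() * 1000`. `to_epoch_millis`, `from_epoch_millis`, and `EPOCH` are illustrative names.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_millis(dt: datetime) -> int:
    """Milliseconds since the Unix epoch; dt must be timezone-aware (naive - aware raises TypeError)."""
    return (dt - EPOCH) // timedelta(milliseconds=1)

def from_epoch_millis(ms: int) -> datetime:
    """Inverse of to_epoch_millis, returning an aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)

dt = datetime(2017, 11, 16, 22, 31, 8, tzinfo=timezone.utc)
ms = to_epoch_millis(dt)
print(ms)                           # 1510871468000
print(from_epoch_millis(ms) == dt)  # True
```

In the notebook, `to_epoch_millis(datetime.now(timezone.utc))` would serve as a drop-in for the `current_timestamp` expression.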

## 4. Query After Insert

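Query the Iceberg table to verify the inserted record, then list the files in the table's current snapshot. Data is committed every 2 seconds, so wait at least that long after producing before querying.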

```python
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()

spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
```
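Table Topic commits on an interval (2 seconds in this topic's configuration), so a query issued immediately after producing can still see the previous snapshot. A small polling helper makes the query cells safe to run back to back. `wait_until` is a hypothetical helper, and the commented Spark usage assumes the notebook's `spark` session and `TOPIC_NAME`. Depending on the Spark Iceberg catalog's caching settings, a new snapshot may also only become visible after the cached table metadata expires or is refreshed.

```python
import time

def wait_until(predicate, timeout_s=30.0, interval_s=1.0):
    """Call predicate() until it returns a truthy value; return True on success, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)

# Example with Spark:
# wait_until(lambda: spark.read.format("iceberg").load(f"default.{TOPIC_NAME}").count() == 1)
```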

## 5. Update Operation

Produce an Update (U) event for the same `event_id` to update the record.

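```python
# Reuse the inserted row's page_url so the update lands in the same bucket(page_url, 5)
# partition as the row it replaces (the 1-second offset keeps the same hour partition
# unless it happens to cross an hour boundary)
update_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp + 1000, insert_event[0].page_url, fake.ipv4(), fake.user_agent(), "U")]
produce_events(producer, TOPIC_NAME, update_event, avro_serializer, string_serializer)
```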
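Iceberg writers commonly implement an upsert by writing an equality delete on the key plus a new data row, rather than rewriting the original data file. Assuming AutoMQ follows this pattern, the Iceberg spec's read-time rule can be modeled in plain Python. An equality delete hides a row only if the row's data sequence number is strictly lower than the delete's and both belong to the same partition. That second condition is why keeping the partition columns unchanged matters. The classes below are a simplified hypothetical model (no unpartitioned deletes or partition spec IDs), not Iceberg's or AutoMQ's code, and the partition values are made up.

```python
from dataclasses import dataclass, field

@dataclass
class DataRow:
    key: str              # primary-key value (event_id)
    partition: tuple      # partition values, e.g. (bucket, hour)
    seq: int              # data sequence number of the commit that wrote the row
    values: dict = field(default_factory=dict)

@dataclass
class EqualityDelete:
    key: str
    partition: tuple
    seq: int

def visible_rows(rows, deletes):
    """Return rows not hidden by an applicable equality delete (simplified Iceberg read rule)."""
    def is_deleted(row):
        return any(
            d.key == row.key
            and d.partition == row.partition  # deletes are scoped to their partition
            and row.seq < d.seq               # only rows committed before the delete
            for d in deletes
        )
    return [row for row in rows if not is_deleted(row)]

part = (2, 419686)  # illustrative (bucket, hour) partition values
rows = [DataRow("e1", part, 1, {"user_id": "alice"})]    # commit 1: Insert
deletes = [EqualityDelete("e1", part, 2)]                # commit 2: Update = delete old row by key...
rows.append(DataRow("e1", part, 2, {"user_id": "bob"}))  # ...plus the new row in the same commit
print([r.values for r in visible_rows(rows, deletes)])   # [{'user_id': 'bob'}]

deletes.append(EqualityDelete("e1", part, 3))            # commit 3: Delete
print(visible_rows(rows, deletes))                       # []

# A delete written to a different partition does not hide the row
print(len(visible_rows([rows[0]], [EqualityDelete("e1", (4, 419686), 2)])))  # 1
```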

## 6. Query After Update

Query the Iceberg table to verify that the record now holds the updated values, and list the table's current files again.

```python
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()

spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
```
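The Iceberg `files` metadata table also has a `content` column that distinguishes data files from delete files. Per the Iceberg spec, 0 means data, 1 means position deletes, and 2 means equality deletes. Selecting it alongside `file_path` shows what the update wrote. The helper below is a hypothetical convenience for summarizing rows returned by, for example, `spark.sql(f"SELECT content, file_path FROM default.{TOPIC_NAME}.files").collect()`. Spark `Row` objects unpack like tuples, so plain tuples with made-up paths stand in for them here.

```python
from collections import Counter

# Content codes for data and delete files, as defined by the Iceberg table spec
CONTENT_LABELS = {0: "DATA", 1: "POSITION_DELETES", 2: "EQUALITY_DELETES"}

def summarize_files(rows):
    """Count files by content type from (content, file_path) rows."""
    counts = Counter()
    for content, _file_path in rows:
        counts[CONTENT_LABELS.get(content, f"UNKNOWN({content})")] += 1
    return dict(counts)

# Illustrative rows only; real paths come from the table's metadata
sample = [(0, "file-a.parquet"), (0, "file-b.parquet"), (2, "file-c-deletes.parquet")]
print(summarize_files(sample))  # {'DATA': 2, 'EQUALITY_DELETES': 1}
```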

## 7. Delete Operation

Produce a Delete (D) event for the same `event_id` to remove the record.

```python
# Reuse the inserted row's page_url so the delete targets the same partition as the row it removes
delete_event = [PageViewEvent(event_id, fake.user_name(), current_timestamp + 2000, insert_event[0].page_url, fake.ipv4(), fake.user_agent(), "D")]
produce_events(producer, TOPIC_NAME, delete_event, avro_serializer, string_serializer)
```
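The Delete event is matched to the existing row by `event_id`, the configured ID column. One option is to derive it from the original insert so that every other column, including the partition source columns `page_url` and `timestamp`, stays identical. Below is a minimal sketch with a hypothetical helper `as_operation`. It uses `copy.copy` so the original event is untouched, and a `SimpleNamespace` stands in for a `PageViewEvent`.

```python
import copy
from types import SimpleNamespace

def as_operation(event, op):
    """Return a shallow copy of event with its `ops` field set to op ("I", "U", or "D")."""
    if op not in ("I", "U", "D"):
        raise ValueError(f"Unsupported operation: {op!r}")
    derived = copy.copy(event)
    derived.ops = op
    return derived

# Stand-in for a PageViewEvent instance (any object with an `ops` attribute works)
insert = SimpleNamespace(event_id="e1", page_url="/home", timestamp=1_510_871_468_000, ops="I")
delete = as_operation(insert, "D")
print(delete.ops, insert.ops)  # D I
```

In the notebook, this would read `delete_event = [as_operation(insert_event[0], "D")]`.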

## 8. Query After Delete

Query the Iceberg table to verify that the record has been removed, and list the table's current files.

```python
df = spark.read.format("iceberg").load(f"default.{TOPIC_NAME}")
df.show()

spark.sql(f"SELECT file_path FROM default.{TOPIC_NAME}.files").show(vertical=True, truncate=False)
```

## 9. Cleanup

Delete the topic and drop the Iceberg table after the demonstration.

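```python
# Delete the topic and wait for the broker to confirm the deletion
futures = admin_client.delete_topics([TOPIC_NAME], operation_timeout=30)
for future in futures.values():
    future.result()  # Raises KafkaException if the deletion failed
# Drop the Iceberg table (IF EXISTS keeps the cell re-runnable)
spark.sql(f"DROP TABLE IF EXISTS default.{TOPIC_NAME}")
print(f"Topic '{TOPIC_NAME}' and Iceberg table deleted.")
```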