# Create Topic
- Prerequisite: the Kafka Docker Compose stack is running (brokers reachable at `localhost:19092`, `localhost:29092`, `localhost:39092`)

In [1]:
# create_topic.py
# Create the demo topic on the local cluster. Safe to re-run: a topic that
# already exists is reported as such instead of as a failure.

from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.admin import AdminClient, NewTopic

conf = {"bootstrap.servers": "localhost:19092,localhost:29092,localhost:39092"}
admin_client = AdminClient(conf)

topic_name = "test_topic"

# Create topic with 1 partition and replication factor 1
new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)

# create_topics() is asynchronous: it returns a dict mapping each topic name to a future.
fs = admin_client.create_topics([new_topic])

for topic, f in fs.items():
    try:
        f.result()  # Block until the broker accepts or rejects this topic
        print(f"Topic {topic} created successfully")
    except KafkaException as e:
        # A failed future raises KafkaException carrying a KafkaError in args[0].
        kafka_err = e.args[0] if e.args else None
        if isinstance(kafka_err, KafkaError) and kafka_err.code() == KafkaError.TOPIC_ALREADY_EXISTS:
            print(f"Topic {topic} already exists")
        else:
            print(f"Error creating topic {topic}: {e}")
    except Exception as e:
        print(f"Error creating topic {topic}: {e}")



Topic test_topic created successfully


# The Producer
- Simulate sending events to Kafka

In [2]:
# producer.py

from confluent_kafka import Producer
import json
import time

# 1. Configuration: every broker listener goes into the bootstrap list, and the
#    client id names this producer instance.
conf = {
    "bootstrap.servers": ",".join(f"localhost:{port}" for port in (19092, 29092, 39092)),
    "client.id": "producer-1",
}

# 2. Build the Producer from that configuration
producer = Producer(conf)

# 3. Callback for delivery report
def delivery_report(err, msg):
    """Print the outcome of a single produce() request.

    Invoked from poll()/flush(): ``err`` is None when the record was written,
    otherwise it describes why delivery failed.
    """
    key = msg.key()
    if err is None:
        print(f"Record {key} successfully produced to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
        return
    print(f"Delivery failed for record {key}: {err}")

# 4. Produce messages
# Bind the target topic here instead of relying on the `topic` loop variable
# left over from the create-topic cell (NameError when this cell runs alone).
topic = "test_topic"
print("Starts producing messages...")
for i in range(5):
    data = {'user_id': i, 'action': 'login', 'timestamp': time.time()}

    # Serve any pending delivery-report callbacks from earlier produce() calls (non-blocking)
    producer.poll(0)

    # Asynchronously enqueue the message; its delivery callback fires from a later poll()/flush()
    producer.produce(
        topic=topic,
        key=str(i),  # Key is important for ordering within partitions
        value=json.dumps(data).encode('utf-8'),
        on_delivery=delivery_report,
    )
    time.sleep(1)  # Space the simulated events one second apart

# 5. Wait for any outstanding messages to be delivered and delivery report callbacks to be triggered
producer.flush()
print("Done")


Starts producing messages...
Record b'0' successfully produced to test_topic [0] at offset 0
Record b'1' successfully produced to test_topic [0] at offset 1
Record b'2' successfully produced to test_topic [0] at offset 2
Record b'3' successfully produced to test_topic [0] at offset 3
Record b'4' successfully produced to test_topic [0] at offset 4
Done


# The Consumer
- Read the events back from the topic

In [4]:
# consumer.py

from confluent_kafka import Consumer, KafkaError

# 1. Configuration
conf = {"bootstrap.servers": "localhost:19092,localhost:29092,localhost:39092",
        "group.id": "mygroup",
        "auto.offset.reset": "earliest"}  # Start reading from the beginning if no offset is stored


def _decode_utf8(raw):
    """Return ``raw`` decoded as UTF-8, or None when the key/value is absent."""
    return None if raw is None else raw.decode('utf-8')


# 2. Create Consumer instance
consumer = Consumer(conf)

# 3. Subscribe to the topic
topic = "test_topic"
consumer.subscribe([topic])

# 4. Poll for new messages until interrupted (KeyboardInterrupt) or a non-EOF error occurs
print("Waiting for messages...")
try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue  # Nothing arrived within the timeout; keep waiting
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            print(msg.error())
            break
        # Keyless records and tombstones carry None; decode defensively instead of crashing
        print(f"Received message: key={_decode_utf8(msg.key())}, value={_decode_utf8(msg.value())}")
except KeyboardInterrupt:
    pass
finally:
    # Always close the consumer, even after an interrupt or error
    consumer.close()



Waiting for messages...


# Schema Registry

In [5]:
import time
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
import time

# Configuration: broker bootstrap list, Schema Registry endpoint, and the topic
# this round-trip test writes to and reads from.
KAFKA_BOOTSTRAP = ",".join(f"localhost:{port}" for port in (19092, 29092, 39092))
SCHEMA_REGISTRY_URL = "http://localhost:8081"
TOPIC_NAME = "comment_events"

# 1. Define the schema
# Avro record schema (JSON text) for the message value; passed to both the
# AvroSerializer and the AvroDeserializer in run_test().
value_schema_str = """
{
    "doc": "Schema for user comments used in livestream assistant agent",
    "name": "CommentEvent",
    "namespace": "com.comment.events",
    "type": "record",
    "fields": [
    {
        "name": "reviewerID",
        "type": "string",
        "doc": "Unique ID of the reviewer"
    },
    {
        "name": "reviewText",
        "type": "string",
        "doc": "The full text content of the review"
    },
    {
        "name": "comment_time",
        "type": "long",
        "doc": "The time user comment"
    }
    ]
}
"""

def _produce_test_message(producer):
    """Send one sample CommentEvent to TOPIC_NAME; return True if the broker acknowledged it."""
    print(f"--- Sending test message to {TOPIC_NAME} ---")
    test_message = {
        "reviewerID": "user121233",
        "reviewText": "This is a test comment",
        "comment_time": int(time.time()),
    }
    delivery_errors = []

    def _on_delivery(err, msg):
        # Record failures so they are not silently dropped after flush().
        if err is not None:
            delivery_errors.append(err)

    producer.produce(
        topic=TOPIC_NAME,
        key="test_key",
        value=test_message,
        on_delivery=_on_delivery,
    )
    producer.flush()  # Blocks until the delivery callback above has fired
    if delivery_errors:
        print(f"Delivery failed: {delivery_errors[0]}")
        return False
    return True


def _consume_one(consumer, max_attempts):
    """Poll until one record is read back; return its value, or None once attempts run out."""
    print("--- Waiting for message recovery ---")
    for _ in range(max_attempts):
        try:
            msg = consumer.poll(timeout=5.0)
        except Exception as e:
            # DeserializingConsumer.poll raises broker errors (e.g. UNKNOWN_TOPIC_OR_PART
            # while the topic is not yet available) instead of returning them; retry.
            print(f"Polling error: {e}")
            time.sleep(1)
            continue
        if msg is None:
            print("No message yet, polling...")
            continue
        if msg.error():
            # Defensive: only reached if an error message is returned rather than raised.
            print(f"Wait... {msg.error()}")
            time.sleep(1)
            continue
        print(f"SUCCESS: Recovered data from Kafka: {msg.value()}")
        return msg.value()
    print(f"FAILED: no message recovered after {max_attempts} attempts")
    return None


def run_test(max_attempts=10, settle_seconds=5):
    """Round-trip one Avro-encoded comment event through Kafka and Schema Registry.

    Produces a single CommentEvent record to TOPIC_NAME with an Avro value
    serializer backed by the registry at SCHEMA_REGISTRY_URL, then polls a
    consumer in group "group_test" until a record is read back or the retry
    budget is spent. Results are printed; nothing is returned.

    Args:
        max_attempts: Number of empty or failed polls tolerated before giving up.
        settle_seconds: Pause between producing and consuming so the topic's
            metadata has time to propagate.
    """
    # 2. Set up the schema registry client
    schema_registry_client = SchemaRegistryClient({"url": SCHEMA_REGISTRY_URL})

    # 3. Producer setup: string keys, Avro-encoded values
    avro_serializer = AvroSerializer(schema_registry_client, value_schema_str)
    producer = SerializingProducer({
        "bootstrap.servers": KAFKA_BOOTSTRAP,
        "key.serializer": StringSerializer("utf_8"),
        "value.serializer": avro_serializer,
    })

    # 4. Consumer setup: mirror image of the producer's serializers
    avro_deserializer = AvroDeserializer(schema_registry_client, value_schema_str)
    consumer = DeserializingConsumer({
        "bootstrap.servers": KAFKA_BOOTSTRAP,
        "key.deserializer": StringDeserializer("utf_8"),
        "value.deserializer": avro_deserializer,
        "group.id": "group_test",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe([TOPIC_NAME])

    try:
        # 5. Send the test message
        if not _produce_test_message(producer):
            return

        print(f"Waiting {settle_seconds} seconds for topic metadata to propagate...")
        time.sleep(settle_seconds)

        # 6. Consume the message
        _consume_one(consumer, max_attempts)
    finally:
        # Close even on Ctrl-C or an unexpected error so the consumer leaves its group.
        consumer.close()

In [None]:
# Run the end-to-end Schema Registry round trip; needs the brokers at KAFKA_BOOTSTRAP
# and the registry at SCHEMA_REGISTRY_URL to be reachable.
run_test()

--- Sending test message to comment_events ---
Waiting 3 seconds for topic metadata to propagate...
--- Waiting for message recovery ---
Polling error: KafkaError{code=UNKNOWN_TOPIC_OR_PART,val=3,str="Subscribed topic not available: comment_events: Broker: Unknown topic or partition"}
SUCCESS: Recovered data from Kafka: {'reviewerID': 'user121233', 'reviewText': 'This is a test comment', 'comment_time': 1766079158}


%6|1766080132.538|FAIL|producer-1#producer-2| [thrd:localhost:39092/3]: localhost:39092/3: Disconnected: connection reset by peer (after 0ms in state APIVERSION_QUERY)
%6|1766080132.593|FAIL|producer-1#producer-2| [thrd:localhost:29092/2]: localhost:29092/2: Disconnected: connection reset by peer (after 3ms in state APIVERSION_QUERY)
%6|1766080132.643|FAIL|producer-1#producer-2| [thrd:localhost:39092/bootstrap]: localhost:39092/bootstrap: Disconnected: connection reset by peer (after 0ms in state APIVERSION_QUERY)
%6|1766080132.699|FAIL|producer-1#producer-2| [thrd:localhost:29092/bootstrap]: localhost:29092/bootstrap: Disconnected: connection reset by peer (after 0ms in state APIVERSION_QUERY)
%6|1766080132.754|FAIL|producer-1#producer-2| [thrd:localhost:19092/bootstrap]: localhost:19092/bootstrap: Disconnected: connection reset by peer (after 0ms in state APIVERSION_QUERY)
%6|1766080132.768|FAIL|rdkafka#producer-1| [thrd:localhost:19092/1]: localhost:19092/1: Disconnected: connection

#