In [None]:
import rich
from app.core import settings
from app.core.log_adapter import logger
from confluent_kafka import Consumer, KafkaError, Message, Producer

# Kafka topic shared by the consumer and producer cells in this notebook.
demo_topic = "demo-topic"


def consume(topic: str) -> None:
    """Consume messages from ``topic`` and print them until interrupted.

    Polls the subscribed topic in an endless loop. For every message it prints the
    topic, partition, offset, key and decoded value, then commits so the message is
    not delivered to this consumer group again. The loop ends on
    ``KeyboardInterrupt`` or on any unexpected exception (which is logged, not
    re-raised). In both cases the number of consumed messages is logged and the
    consumer is closed.

    Args:
        topic: Name of the Kafka topic to subscribe to.
    """
    msg_count = 0
    logger.info("about to start consuming messages")

    consumer = Consumer(
        {
            "bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
            "group.id": "demo-group",
            "auto.offset.reset": "earliest",
            # Auto-commit is off; offsets are committed explicitly in the loop below,
            # only after a message has been handled.
            "enable.auto.commit": "false",
        }
    )
    consumer.subscribe([topic])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Nothing arrived within the poll timeout; poll again.
                continue

            err = msg.error()
            if err:
                logger.error(f"ERROR: {err}")
                continue

            # A message may carry no payload (value() is None) or bytes that are not
            # valid UTF-8. Neither case may abort the loop: the message would stay
            # uncommitted and be hit again on the next start.
            raw_value = msg.value()
            value = None if raw_value is None else raw_value.decode("utf-8", errors="replace")

            # Print the message's topic / partition / offset / key / value.
            rich.print(
                f"Received message: "
                f"topic={msg.topic()} "
                f"partition={msg.partition()} "
                f"offset={msg.offset()} "
                f"key={msg.key()} "
                f"value={value}"
            )

            # Messages that were already consumed do not need to be consumed again.
            consumer.commit()
            msg_count += 1

    except KeyboardInterrupt:
        logger.info("consumer ending: KeyboardInterrupt")
    except Exception as e:
        # An unexpected failure is not a normal shutdown: log it at error level.
        logger.error(f"consumer ending: {e}")
    finally:
        # Always report how much was processed and release the consumer.
        logger.info(f"{msg_count} messages consumed")
        consumer.close()


# Start consuming the demo topic; consume() keeps polling until it is interrupted
# or an exception is raised.
if __name__ == "__main__":
    consume(demo_topic)

[2m2025-07-10 12:23:13CST.579902[0m [[32m[1minfo     [0m] [1mabout to start consuming messages[0m


[2m2025-07-10 12:23:18CST.673263[0m [[32m[1minfo     [0m] [1mconsumer ending: KeyboardInterrupt[0m
[2m2025-07-10 12:23:18CST.674963[0m [[32m[1minfo     [0m] [1m6 messages consumed           [0m


In [None]:
# Producer connected to the brokers configured in settings.KAFKA_BOOTSTRAP_SERVERS.
producer = Producer({"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS})


def delivery_report(err: KafkaError | None, msg: Message):
    """Print the outcome of a produce attempt.

    Args:
        err: The delivery error; falsy (``None``) when delivery succeeded.
        msg: The message the report is about; its topic and partition are printed
            on success.
    """
    if not err:
        destination = f"{msg.topic()} [{msg.partition()}]"
        print(f"Delivered to {destination}")
        return

    print("Delivery failed:", err)


# Queue one message on the demo topic and register delivery_report as its delivery callback.
producer.produce(demo_topic, "Hello, World!", on_delivery=delivery_report)
# flush() is the cell's last expression, so its return value is what the cell displays.
# NOTE(review): presumably flush() blocks until pending deliveries have been reported
# (the callback output appears in this cell) — confirm against the confluent-kafka docs.
producer.flush()

Delivered to demo-topic [0]


0

In [28]:
# Consumer with its own group id; in this cell it is only used to query cluster metadata.
consumer = Consumer(
    {
        "bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
        "group.id": "search-index-group",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": "false",  # no automatic offset commits, so the same events can easily be replayed in development
    }
)
# Last expression of the cell: displays the object returned by list_topics().
# NOTE(review): this consumer is never closed in the visible cells — consider calling
# consumer.close() once it is no longer needed.
consumer.list_topics()

ClusterMetadata(MkU3OEVBNTcwNTJENDM2Qk)