In [162]:
from confluent_kafka import Consumer
from confluent_kafka import KafkaException
from confluent_kafka import Producer
from confluent_kafka import TopicPartition
from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions

# Broker address shared by every producer, consumer and admin client below.
bootstrap_server = 'localhost:9092'
# Topic used by the producer/consumer walkthrough.
topic = 'notebook-topic'

# Producer

In [50]:
def delivery_report(err, msg):
    """Report the delivery outcome of one produced message.

    Passed as the ``callback`` of ``producer.produce()``; called once per
    message, triggered by ``poll()`` or ``flush()``.

    Args:
        err: Delivery error, or ``None`` when the message was delivered.
        msg: The message the report refers to; only ``value()``, ``topic()``
            and ``partition()`` are read.
    """
    if err is not None:
        print('❌ Message delivery failed: {}'.format(err))
        return

    # A message may carry no payload (value() returns None); decoding it
    # unconditionally would raise AttributeError inside the callback.
    payload = msg.value()
    text = payload.decode('utf-8') if payload is not None else None
    print('✅  📬  Message delivered: "{}" to {} [partition {}]'.format(text, msg.topic(), msg.partition()))


In [51]:
# Create the producer used by the cells below; it only needs the broker address.
producer_settings = {'bootstrap.servers': bootstrap_server}
producer = Producer(producer_settings)

In [52]:
# Send ten messages to `topic`, then wait for the outstanding deliveries.
for seq in range(10):
    # Serve delivery callbacks queued by earlier produce() calls.
    producer.poll(0)

    payload = f"message:{seq}".encode('utf-8')
    producer.produce(topic, key='key', value=payload, callback=delivery_report)

# A positive flush() result is treated as "messages still undelivered".
pending = producer.flush(timeout=5)
if pending > 0:
    print('❌ Message delivery failed ({} message(s) still remain, did we timeout sending perhaps?)\n'.format(pending))

✅  📬  Message delivered: "message:0" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:1" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:2" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:3" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:4" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:5" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:6" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:7" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:8" to notebook-topic [partition 0]
✅  📬  Message delivered: "message:9" to notebook-topic [partition 0]


# Consumer

In [71]:
# Initialize the two consumers compared in this notebook.

# Offset management must be configured deliberately:
# 1. auto-commit: the consumer commits/reports offsets to Kafka for you
# 2. manual-commit: you call consumer.commit() yourself
# https://docs.confluent.io/platform/current/clients/consumer.html#id

# Settings common to both consumers; they differ only in group id and
# commit mode.
common_consumer_settings = {
    'bootstrap.servers': bootstrap_server,
    # 'session.timeout.ms': 5000,
    'auto.offset.reset': 'earliest',
}

consumer_auto_commit = Consumer({
    **common_consumer_settings,
    'group.id': 'notebook_auto',
    'enable.auto.commit': 'true',
})

consumer_manual_commit = Consumer({
    **common_consumer_settings,
    'group.id': 'notebook_manual',
    'enable.auto.commit': 'false',
})

# Both consumers read the same topic.
consumer_auto_commit.subscribe([topic])
consumer_manual_commit.subscribe([topic])

## Auto-commit consumer

In [72]:
# Position before anything has been consumed (the recorded run shows
# offset=-1001 here).
print(consumer_auto_commit.position([TopicPartition(topic, 0)]))

# Consume 5 messages
for _ in range(5):
    msg = consumer_auto_commit.poll(1.0)
    if msg is None:
        # Nothing arrived within the poll timeout. Skip this round: falling
        # through would call msg.error() on None and raise AttributeError.
        print("Message: {None}")
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('✅  📬  Message received: "{}" to {} [partition {}]'.format(msg.value().decode('utf-8'),msg.topic(), msg.partition()))

# Position after consuming
print(consumer_auto_commit.position([TopicPartition(topic, 0)]))

[TopicPartition{topic=notebook-topic,partition=0,offset=-1001,leader_epoch=None,error=None}]
✅  📬  Message received: "message:0" to notebook-topic [partition 0]
✅  📬  Message received: "message:1" to notebook-topic [partition 0]
✅  📬  Message received: "message:2" to notebook-topic [partition 0]
✅  📬  Message received: "message:3" to notebook-topic [partition 0]
✅  📬  Message received: "message:4" to notebook-topic [partition 0]
[TopicPartition{topic=notebook-topic,partition=0,offset=5,leader_epoch=0,error=None}]


In [75]:
# Read up to three further messages with the auto-commit consumer.
for _ in range(3):
    record = consumer_auto_commit.poll(1.0)
    if record is None:
        # Poll timed out; nothing to report for this round.
        continue
    if record.error():
        print("Consumer error: {}".format(record.error()))
    else:
        print('✅  📬  Message received: "{}" to {} [partition {}]'.format(record.value().decode('utf-8'),record.topic(), record.partition()))

# Show where the consumer now stands on partition 0.
partition_zero = TopicPartition(topic, 0)
print(consumer_auto_commit.position([partition_zero]))

✅  📬  Message received: "message:5" to notebook-topic [partition 0]
✅  📬  Message received: "message:6" to notebook-topic [partition 0]
✅  📬  Message received: "message:7" to notebook-topic [partition 0]
[TopicPartition{topic=notebook-topic,partition=0,offset=8,leader_epoch=0,error=None}]


In [76]:
# Because auto-commit is enabled, a new consumer created with the same
# setup (same group id) resumes from the committed offset (offset=8 here).

# Shut the previous consumer down before replacing it. Without close() the
# old instance stays a member of the group until it times out, and its
# connection is leaked. Per the confluent-kafka docs, close() also commits
# the final offsets when auto-commit is enabled and leaves the group.
consumer_auto_commit.close()

consumer_auto_commit = Consumer({
    'bootstrap.servers': bootstrap_server,
    'group.id': 'notebook_auto',
    'enable.auto.commit':'true',
    # 'session.timeout.ms': 5000,
    'auto.offset.reset': 'earliest'
})
consumer_auto_commit.subscribe([topic])

# Check current offset
# It shows offset=-1001 because the consumer was just initialized and has
# not fetched anything yet.
print(consumer_auto_commit.position([TopicPartition(topic, 0)]))

[TopicPartition{topic=notebook-topic,partition=0,offset=-1001,leader_epoch=None,error=None}]


In [77]:
# Read up to two messages with the re-created auto-commit consumer.
for _ in range(2):
    record = consumer_auto_commit.poll(1.0)
    if record is None:
        # Poll timed out; try again on the next iteration.
        continue
    if record.error():
        print("Consumer error: {}".format(record.error()))
    else:
        print('✅  📬  Message received: "{}" to {} [partition {}]'.format(record.value().decode('utf-8'),record.topic(), record.partition()))

# Show where the consumer now stands on partition 0.
partition_zero = TopicPartition(topic, 0)
print(consumer_auto_commit.position([partition_zero]))

✅  📬  Message received: "message:8" to notebook-topic [partition 0]
✅  📬  Message received: "message:9" to notebook-topic [partition 0]
[TopicPartition{topic=notebook-topic,partition=0,offset=10,leader_epoch=0,error=None}]


## Manual-commit consumer

In [91]:
# Position of the manual-commit consumer before reading anything.
partition_zero = TopicPartition(topic, 0)
print(consumer_manual_commit.position([partition_zero]))

# Read up to five messages.
for _ in range(5):
    record = consumer_manual_commit.poll(1.0)
    if record is None:
        # Poll timed out without a message.
        print("Message: {None}")
    elif record.error():
        print("Consumer error: {}".format(record.error()))
    else:
        print('✅  📬  Message received: "{}" to {} [partition {}]'.format(record.value().decode('utf-8'),record.topic(), record.partition()))

# Position after reading.
print(consumer_manual_commit.position([TopicPartition(topic, 0)]))

[TopicPartition{topic=notebook-topic,partition=0,offset=-1001,leader_epoch=None,error=None}]
✅  📬  Message received: "message:0" to notebook-topic [partition 0]
✅  📬  Message received: "message:1" to notebook-topic [partition 0]
✅  📬  Message received: "message:2" to notebook-topic [partition 0]
✅  📬  Message received: "message:3" to notebook-topic [partition 0]
✅  📬  Message received: "message:4" to notebook-topic [partition 0]
[TopicPartition{topic=notebook-topic,partition=0,offset=5,leader_epoch=0,error=None}]


In [92]:
# Commit the offsets consumed so far. With no message/offsets argument the
# current positions are committed; asynchronous=True is the library default,
# spelled out here so the non-blocking behaviour is visible.
consumer_manual_commit.commit(asynchronous=True)

In [93]:
# Read up to three further messages; these are NOT committed afterwards.
for _ in range(3):
    record = consumer_manual_commit.poll(1.0)
    if record is None:
        # Poll timed out; nothing to report for this round.
        continue
    if record.error():
        print("Consumer error: {}".format(record.error()))
    else:
        print('✅  📬  Message received: "{}" to {} [partition {}]'.format(record.value().decode('utf-8'),record.topic(), record.partition()))

# Show where the consumer now stands on partition 0.
partition_zero = TopicPartition(topic, 0)
print(consumer_manual_commit.position([partition_zero]))

✅  📬  Message received: "message:5" to notebook-topic [partition 0]
✅  📬  Message received: "message:6" to notebook-topic [partition 0]
✅  📬  Message received: "message:7" to notebook-topic [partition 0]
[TopicPartition{topic=notebook-topic,partition=0,offset=8,leader_epoch=0,error=None}]


In [94]:
# Because auto-commit is disabled, a new consumer created with the same
# setup (same group id) resumes from the last manual commit (offset=5 here),
# so the messages read after that commit are delivered again.

# Shut the previous consumer down before replacing it. Without close() the
# old instance stays a member of the group until it times out, and its
# connection is leaked. Per the confluent-kafka docs, close() does not
# commit when enable.auto.commit is false, so the demo result is unchanged.
consumer_manual_commit.close()

consumer_manual_commit = Consumer({
    'bootstrap.servers': bootstrap_server,
    'group.id': 'notebook_manual',
    'enable.auto.commit':'false',
    # 'session.timeout.ms': 5000,
    'auto.offset.reset': 'earliest'
})
consumer_manual_commit.subscribe([topic])

# Check current offset
# It shows offset=-1001 because the consumer was just initialized and has
# not fetched anything yet.
print(consumer_manual_commit.position([TopicPartition(topic, 0)]))

[TopicPartition{topic=notebook-topic,partition=0,offset=-1001,leader_epoch=None,error=None}]


In [97]:
# Read up to two messages with the re-created manual-commit consumer.
for _ in range(2):
    record = consumer_manual_commit.poll(1.0)
    if record is None:
        # Poll timed out without a message.
        print("Message: {None}")
    elif record.error():
        print("Consumer error: {}".format(record.error()))
    else:
        print('✅  📬  Message received: "{}" to {} [partition {}]'.format(record.value().decode('utf-8'),record.topic(), record.partition()))

# Show where the consumer now stands on partition 0.
partition_zero = TopicPartition(topic, 0)
print(consumer_manual_commit.position([partition_zero]))

✅  📬  Message received: "message:5" to notebook-topic [partition 0]
✅  📬  Message received: "message:6" to notebook-topic [partition 0]
[TopicPartition{topic=notebook-topic,partition=0,offset=7,leader_epoch=0,error=None}]


%4|1695027339.084|MAXPOLL|rdkafka#consumer-29| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 273ms (adjust max.poll.interval.ms for long-running message processing): leaving group


# AdminClient

In [98]:
# Admin client used for all cluster inspection and topic management below.
admin_settings = {'bootstrap.servers': bootstrap_server}
ac = AdminClient(admin_settings)

## View/List topics & consumer groups

In [100]:
# Fetch cluster metadata and display its topic table (name -> TopicMetadata).
cluster_metadata = ac.list_topics()
cluster_metadata.topics

{'example-topic': TopicMetadata(example-topic, 1 partitions),
 'another_topic': TopicMetadata(another_topic, 1 partitions),
 'notebook-topic': TopicMetadata(notebook-topic, 1 partitions),
 '__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions)}

In [129]:
# List every consumer group and collect the ids for the describe cell below.
future = ac.list_consumer_groups()
group_ids = []

# result() blocks until the listing is available; any exception simply
# propagates, exactly as with the re-raising handler this replaces.
listing = future.result()
print("{} consumer groups".format(len(listing.valid)))
for group in listing.valid:
    group_ids.append(group.group_id)
    print("    id: {} is_simple: {} state: {}".format(
        group.group_id, group.is_simple_consumer_group, group.state))
for error in listing.errors:
    print("    error: {}".format(error))

7 consumer groups
    id: consumer-group-id is_simple: False state: ConsumerGroupState.EMPTY
    id: rmoff13 is_simple: False state: ConsumerGroupState.EMPTY
    id: notebook_auto is_simple: False state: ConsumerGroupState.EMPTY
    id: consumer-group-2 is_simple: False state: ConsumerGroupState.EMPTY
    id: console-consumer-12979 is_simple: False state: ConsumerGroupState.STABLE
    id: consumer-plge is_simple: False state: ConsumerGroupState.STABLE
    id: notebook_manual is_simple: False state: ConsumerGroupState.EMPTY


In [130]:
# Detailed consumer group info for every id collected in `group_ids`.
# describe_consumer_groups() returns a dict of <group id, future>.
futureMap = ac.describe_consumer_groups(group_ids, request_timeout=10)

for group_id, future in futureMap.items():
    try:
        g = future.result()
        print("Group Id: {}".format(g.group_id))
        print("  Is Simple          : {}".format(g.is_simple_consumer_group))
        print("  State              : {}".format(g.state))
        print("  Partition Assignor : {}".format(g.partition_assignor))
        print("  Coordinator        : ({}) {}:{}".format(g.coordinator.id, g.coordinator.host, g.coordinator.port))
        print("  Members: ")
        for member in g.members:
            print("    Id                : {}".format(member.member_id))
            print("    Host              : {}".format(member.host))
            print("    Client Id         : {}".format(member.client_id))
            print("    Group Instance Id : {}".format(member.group_instance_id))
            if member.assignment:
                print("    Assignments       :")
                for toppar in member.assignment.topic_partitions:
                    print("      {} [{}]".format(toppar.topic, toppar.partition))
    except KafkaException as e:
        # KafkaException comes from confluent_kafka and must be imported in
        # the imports cell; without it this handler itself raises NameError.
        # A failure for one group is reported and the loop moves on; any
        # other exception type propagates unchanged.
        print("Error while describing group id '{}': {}".format(group_id, e))

Group Id: consumer-group-id
  Is Simple          : False
  State              : ConsumerGroupState.EMPTY
  Partition Assignor : 
  Coordinator        : (1) localhost:9092
  Members: 
Group Id: rmoff13
  Is Simple          : False
  State              : ConsumerGroupState.EMPTY
  Partition Assignor : 
  Coordinator        : (1) localhost:9092
  Members: 
Group Id: notebook_auto
  Is Simple          : False
  State              : ConsumerGroupState.EMPTY
  Partition Assignor : 
  Coordinator        : (1) localhost:9092
  Members: 
Group Id: consumer-group-2
  Is Simple          : False
  State              : ConsumerGroupState.EMPTY
  Partition Assignor : 
  Coordinator        : (1) localhost:9092
  Members: 
Group Id: console-consumer-12979
  Is Simple          : False
  State              : ConsumerGroupState.STABLE
  Partition Assignor : range
  Coordinator        : (1) localhost:9092
  Members: 
    Id                : consumer-console-consumer-12979-1-ac79970b-86a7-44a2-b248-da591f4

In [106]:
# Dump cluster metadata. `what` selects the sections to print:
# "all", "brokers", "topics" or "groups".
what = "all"


md = ac.list_topics(timeout=10)

print("Cluster {} metadata (response from broker {}):".format(md.cluster_id, md.orig_broker_name))
print('--------')
if what in ("all", "brokers"):
    print(" {} brokers:".format(len(md.brokers)))
    for broker in md.brokers.values():
        # Mark the broker whose id matches the cluster's controller id.
        template = "  {}  (controller)" if broker.id == md.controller_id else "  {}"
        print(template.format(broker))
print('--------')
if what in ("all", "topics"):
    print(" {} topics:".format(len(md.topics)))
    for topic_md in md.topics.values():
        errstr = ": {}".format(topic_md.error) if topic_md.error is not None else ""

        print("  \"{}\" with {} partition(s){}".format(topic_md, len(topic_md.partitions), errstr))

        for part in topic_md.partitions.values():
            errstr = ": {}".format(part.error) if part.error is not None else ""

            print("partition {} leader: {}, replicas: {},"
                  " isrs: {} errstr: {}".format(part.id, part.leader, part.replicas,
                                                part.isrs, errstr))
print('--------')
if what in ("all", "groups"):
    # NOTE(review): the recorded output ends with a warning pointing at this
    # call; list_groups() appears to be deprecated in favour of
    # list_consumer_groups() / describe_consumer_groups() - confirm and migrate.
    groups = ac.list_groups(timeout=10)
    print(" {} consumer groups".format(len(groups)))
    for group in groups:
        errstr = ": {}".format(group.error) if group.error is not None else ""

        print(" \"{}\" with {} member(s), protocol: {}, protocol_type: {}{}".format(
            group, len(group.members), group.protocol, group.protocol_type, errstr))

        for member in group.members:
            print("id {} client_id: {} client_host: {}".format(member.id, member.client_id, member.client_host))

Cluster W4nU-5eNTlmzzun9UpuBJg metadata (response from broker localhost:9092/1):
--------
 1 brokers:
  localhost:9092/1  (controller)
--------
 4 topics:
  "example-topic" with 1 partition(s)
partition 0 leader: 1, replicas: [1], isrs: [1] errstr: 
  "another_topic" with 1 partition(s)
partition 0 leader: 1, replicas: [1], isrs: [1] errstr: 
  "notebook-topic" with 1 partition(s)
partition 0 leader: 1, replicas: [1], isrs: [1] errstr: 
  "__consumer_offsets" with 50 partition(s)
partition 0 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 1 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 2 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 3 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 4 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 5 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 6 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 7 leader: 1, replicas: [1], isrs: [1] errstr: 
partition 8 leader: 1, replicas: [1], isrs: [1] errstr: 
pa

  groups = ac.list_groups(timeout=10)


## Create/Delete topic

In [157]:
# Topic table before any topic is created (name -> TopicMetadata).
cluster_metadata = ac.list_topics()
cluster_metadata.topics

{'topic2': TopicMetadata(topic2, 1 partitions),
 'example-topic': TopicMetadata(example-topic, 1 partitions),
 'topic1': TopicMetadata(topic1, 1 partitions),
 'notebook-topic': TopicMetadata(notebook-topic, 1 partitions),
 '__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions),
 'another_topic': TopicMetadata(another_topic, 1 partitions)}

In [183]:
# Create topics

new_topics = [NewTopic(name, num_partitions=1, replication_factor=1) for name in ["topic1", "topic2"]]
# Note: In a multi-broker production cluster, it is more typical to use a replication_factor of 3 for durability.

# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
fs = ac.create_topics(new_topics)

# Wait for each operation to finish.
# The loop variable is deliberately not called `topic`: at cell level it would
# overwrite the global `topic` that the producer/consumer cells rely on.
for topic_name, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic_name))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic_name, e))

Topic topic1 created
Topic topic2 created


In [184]:
# Topic table after topic creation (name -> TopicMetadata).
cluster_metadata = ac.list_topics()
cluster_metadata.topics

{'topic2': TopicMetadata(topic2, 1 partitions),
 'example-topic': TopicMetadata(example-topic, 1 partitions),
 'topic1': TopicMetadata(topic1, 1 partitions),
 'notebook-topic': TopicMetadata(notebook-topic, 1 partitions),
 '__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions),
 'another_topic': TopicMetadata(another_topic, 1 partitions)}

In [185]:
# Create partitions: grow each topic in `new_topics` by one partition.

# NewPartitions(topic, new_total_count[, replica_assignment])
new_partitions = [NewPartitions(new_topic.topic, new_topic.num_partitions + 1) for new_topic in new_topics]

# Try switching validate_only to True to only validate the operation
# on the broker but not actually perform it.
fs = ac.create_partitions(new_partitions, validate_only=False)

# Wait for operation to finish.
# The loop variable is deliberately not called `topic`: at cell level it would
# overwrite the global `topic` that the producer/consumer cells rely on.
for topic_name, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Additional partitions created for topic {}".format(topic_name))
    except Exception as e:
        print("Failed to add partitions to topic {}: {}".format(topic_name, e))

Additional partitions created for topic topic1
Additional partitions created for topic topic2


In [186]:
# Topic table after the partitions were added (name -> TopicMetadata).
cluster_metadata = ac.list_topics()
cluster_metadata.topics

{'topic2': TopicMetadata(topic2, 2 partitions),
 'example-topic': TopicMetadata(example-topic, 1 partitions),
 'topic1': TopicMetadata(topic1, 2 partitions),
 'notebook-topic': TopicMetadata(notebook-topic, 1 partitions),
 '__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions),
 'another_topic': TopicMetadata(another_topic, 1 partitions)}

In [187]:
# Delete the two demo topics again.

# delete_topics returns a dict of <topic,future>.
fs = ac.delete_topics(["topic1", "topic2"], operation_timeout=0)

# Wait for operation to finish.
# The loop variable is deliberately not called `topic`: at cell level it would
# overwrite the global `topic` that the producer/consumer cells rely on.
for topic_name, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} deleted".format(topic_name))
    except Exception as e:
        print("Failed to delete topic {}: {}".format(topic_name, e))

Topic topic1 deleted
Topic topic2 deleted


In [188]:
# Topic table after the demo topics were deleted (name -> TopicMetadata).
cluster_metadata = ac.list_topics()
cluster_metadata.topics

{'example-topic': TopicMetadata(example-topic, 1 partitions),
 'another_topic': TopicMetadata(another_topic, 1 partitions),
 'notebook-topic': TopicMetadata(notebook-topic, 1 partitions),
 '__consumer_offsets': TopicMetadata(__consumer_offsets, 50 partitions)}

---

In [None]:
import random
import time

# Create a fresh producer for the 'env-topic' cells below.
producer_settings = {'bootstrap.servers': bootstrap_server}
producer = Producer(producer_settings)

In [206]:
# Send 50 messages to 'env-topic', choosing partition 0 or 1 at random for
# each one and pausing half a second between sends.
for seq in range(50):
    # Serve delivery callbacks queued by earlier produce() calls.
    producer.poll(0)

    payload = f"message:{seq}".encode('utf-8')
    target_partition = random.randint(0, 1)
    producer.produce('env-topic', partition=target_partition, key='key', value=payload, callback=delivery_report)
    time.sleep(0.5)

# A positive flush() result is treated as "messages still undelivered".
pending = producer.flush(timeout=5)
if pending > 0:
    print('❌ Message delivery failed ({} message(s) still remain, did we timeout sending perhaps?)\n'.format(pending))

✅  📬  Message delivered: "message:0" to env-topic [partition 1]
✅  📬  Message delivered: "message:1" to env-topic [partition 1]
✅  📬  Message delivered: "message:2" to env-topic [partition 1]
✅  📬  Message delivered: "message:3" to env-topic [partition 1]
✅  📬  Message delivered: "message:4" to env-topic [partition 0]
✅  📬  Message delivered: "message:5" to env-topic [partition 0]
✅  📬  Message delivered: "message:6" to env-topic [partition 1]
✅  📬  Message delivered: "message:7" to env-topic [partition 0]
✅  📬  Message delivered: "message:8" to env-topic [partition 1]
✅  📬  Message delivered: "message:9" to env-topic [partition 1]
✅  📬  Message delivered: "message:10" to env-topic [partition 0]
✅  📬  Message delivered: "message:11" to env-topic [partition 0]
✅  📬  Message delivered: "message:12" to env-topic [partition 0]
✅  📬  Message delivered: "message:13" to env-topic [partition 1]
✅  📬  Message delivered: "message:14" to env-topic [partition 0]
✅  📬  Message delivered: "message:1

In [209]:
# Send 30 messages to 'env-topic' without naming a partition, pausing 0.2 s
# between sends. Every message uses the same key; the recorded run shows them
# all landing on one partition.
for seq in range(30):
    # Serve delivery callbacks queued by earlier produce() calls.
    producer.poll(0)

    payload = f"message:{seq}".encode('utf-8')
    producer.produce('env-topic', key='key', value=payload, callback=delivery_report)
    time.sleep(0.2)

# A positive flush() result is treated as "messages still undelivered".
pending = producer.flush(timeout=5)
if pending > 0:
    print('❌ Message delivery failed ({} message(s) still remain, did we timeout sending perhaps?)\n'.format(pending))

✅  📬  Message delivered: "message:0" to env-topic [partition 1]
✅  📬  Message delivered: "message:1" to env-topic [partition 1]
✅  📬  Message delivered: "message:2" to env-topic [partition 1]
✅  📬  Message delivered: "message:3" to env-topic [partition 1]
✅  📬  Message delivered: "message:4" to env-topic [partition 1]
✅  📬  Message delivered: "message:5" to env-topic [partition 1]
✅  📬  Message delivered: "message:6" to env-topic [partition 1]
✅  📬  Message delivered: "message:7" to env-topic [partition 1]
✅  📬  Message delivered: "message:8" to env-topic [partition 1]
✅  📬  Message delivered: "message:9" to env-topic [partition 1]
✅  📬  Message delivered: "message:10" to env-topic [partition 1]
✅  📬  Message delivered: "message:11" to env-topic [partition 1]
✅  📬  Message delivered: "message:12" to env-topic [partition 1]
✅  📬  Message delivered: "message:13" to env-topic [partition 1]
✅  📬  Message delivered: "message:14" to env-topic [partition 1]
✅  📬  Message delivered: "message:1