Closed
Description
Hi, when I run my consumer script I disable auto-commit, because in my use case some messages need to be processed first, and only if there are no errors should they be removed from the queue; hence the manual commit.
But when I commit the offset manually, the message is still there on the next iteration of my script. Of course, if I re-enable auto-commit, it is gone.
This is my code:
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata

consumer = KafkaConsumer(bootstrap_servers='192.168.33.10:9092', consumer_timeout_ms=1000, enable_auto_commit=False)
consumer.subscribe(['my-topic', 'another-topic'])

for message in consumer:
    print(message)
    meta = consumer.partitions_for_topic(message.topic)
    partition = consumer.assignment().pop()
    offsets = OffsetAndMetadata(message.offset, meta)
    options = {partition: offsets}
    consumer.commit(offsets=options)

consumer.close()
Any hints?
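For comparison, here is a minimal sketch of how a per-message manual commit could look. It is not a confirmed fix for this report. It rests on three assumptions: the offset to commit is the position of the next record to read (`message.offset + 1`), the commit is keyed by the partition the message actually came from rather than by an arbitrary member of `consumer.assignment()`, and the consumer has an explicit `group_id` (the name `'my-group'` is made up, and some kafka-python releases default `group_id` to `None`, which disables commits). The helper names `next_commit_position`, `consume_and_commit` and `process` are hypothetical.

```python
def next_commit_position(message):
    """Return (topic, partition, offset_to_commit) for a consumed record.

    A committed offset marks the NEXT record to read, so it is
    message.offset + 1 rather than message.offset.
    """
    return message.topic, message.partition, message.offset + 1


def consume_and_commit(process):
    """Consume records, call process(message), then commit that record."""
    # Imported here so the helper above can be tried without kafka-python
    # installed or a broker running.
    from kafka import KafkaConsumer
    from kafka.structs import OffsetAndMetadata, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers='192.168.33.10:9092',
        group_id='my-group',  # hypothetical name; assumed to be required for commits
        consumer_timeout_ms=1000,
        enable_auto_commit=False,
    )
    consumer.subscribe(['my-topic', 'another-topic'])
    try:
        for message in consumer:
            process(message)  # if this raises, the commit below is skipped
            topic, partition, offset = next_commit_position(message)
            tp = TopicPartition(topic, partition)
            # Metadata is a free-form string. This is the two-field form used
            # in the post above; some newer releases may expect an extra
            # leader_epoch field on OffsetAndMetadata.
            consumer.commit(offsets={tp: OffsetAndMetadata(offset, '')})
    finally:
        consumer.close()
```

Calling `consume_and_commit(print)` would mirror the loop in the post. Calling `consumer.commit()` with no arguments is documented to commit the currently consumed offsets for the subscribed partitions, which may be simpler when every message is committed right after it is processed.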