Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka consumer changing group is not polling any messages #275

Closed
annusri opened this issue Nov 1, 2017 · 1 comment

Comments

Projects
None yet
2 participants
@annusri
Copy link

commented Nov 1, 2017

Description

running consumer code with different groups each time is not polling any messages from a topic,
inspite of default.topic.config : {'auto.offset.reset': 'earliest'}

How to reproduce

  1. Ran produce code as below
     from confluent_kafka import Producer

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {0}: {1}"
              .format(msg.value(), err.str()))
    else:
        print("Message produced: {0}".format(msg.value()))

p = Producer({'bootstrap.servers': 'kafka01, kafka02, kafka03'})

try:
    for val in xrange(11, 20):
        p.produce('test', 'myvalue #{0}'
                  .format(val), callback=acked)
        p.poll(0.5)

except KeyboardInterrupt:
    pass

p.flush(30)
  1. run consumer
   from confluent_kafka import Consumer, KafkaError

settings = {
        'bootstrap.servers': 'kafka01,kafka02, kafka03', 
        'default.topic.config': {'auto.offset.reset': 'earliest'}
         ,'group.id' :'test'} 

c = Consumer(settings)

c.subscribe(['test'])

try:
    while True:
        msg = c.poll(0)
     
        if msg is None:
            continue
        elif not msg.error():
            print('Received message: {0}'.format(msg.value()))
        elif msg.error().code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                          .format(msg.topic(), msg.partition()))
            break   
        else:
            print('Error occured: {0}'.format(msg.error().str()))

except KeyboardInterrupt:
    pass

finally:
    c.close()

Result - Consumer is polling messages.

  1. Now change the consumer group.id to test1
    ran consumer - No messages are getting polled . why ?

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill

This comment has been minimized.

Copy link
Member

commented Nov 1, 2017

Are you completely sure there are no existing commits for group test1? Try a truly random group name.

Also: you might want to use a higher timeout (or no timeout, which is block-indef) on your poll() call to avoid busy-looping.

@edenhill edenhill added the wait info label Jan 16, 2018

@edenhill edenhill closed this Jan 16, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.