Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Is default.topic.config really deprecated is consumer config? #1316

Closed
7 tasks
deniskrumko opened this issue Apr 6, 2022 · 9 comments
Closed
7 tasks

Is default.topic.config really deprecated is consumer config? #1316

deniskrumko opened this issue Apr 6, 2022 · 9 comments

Comments

@deniskrumko
Copy link

deniskrumko commented Apr 6, 2022

Description

I got issue with auto.offset.reset: earliest not working as expected.

Here is my consumer config:

config = {
    'bootstrap.servers': ...,
    'group.id': ...,
    'auto.offset.reset': 'earliest',  # like in README
    'enable.auto.commit': False,
}

My original issue:

  1. Create Kafka topic my_topic, put there 100 messages
  2. Set group.id = AAA in config
  3. Create Consumer(), subscribe, use consumer.poll(1.0) in loop. Eventually, I got 100 messages - ✅. Commit here too.
  4. Check current offset: TopicPartition{topic=my_topic,partition=0,offset=100,error=None} - ✅
  5. Close consumer. Program closed too.
  6. Change group.id to BBB in config
  7. Repeat step 3: create consumer, subscribe and run consumer.poll() again - nothing, no messages - ❌
  8. Check current offset: TopicPartition{topic=my_topic,partition=0,offset=-1001,error=None}
  9. Here I add 10 more messages to topic
  10. Consumer polled 10 messages starting with offset 100. Just like if auto.offset.reset: latest

Solution to this issue - adding default.topic.config:

config = {
    'bootstrap.servers': servers,
    'group.id': group_id,
    'auto.offset.reset': 'earliest',
    'default.topic.config': {'auto.offset.reset': 'earliest'},  # even it's deprecated
    'enable.auto.commit': False,
}

After changing config like that and creating new consumer with group.id = CCC - it starts polling from offset 0, like it should.

Looks like:

  • default.topic.config is not really deprecated?
  • or I got something wrong 😀

How to reproduce

See Description.

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
 >>> confluent_kafka.version()
('1.8.2', 17302016)
>>> confluent_kafka.libversion()
('1.8.2', 17302271)
  • Apache Kafka broker version: 2.8.1
  • Client configuration: see in issue
  • Operating system: macOS Monterey 12.2.1 (21D62)
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue: no
@edenhill
Copy link
Contributor

edenhill commented Apr 6, 2022

auto.offset.reset should definitely work on the global config, and default.topic.config does remain deprecated :)

Can you reproduce this with 'debug': 'cgrp,fetch' set and provide the logs, at least for the consumer instance that does not honour auto.offset.reset?
Thanks

@deniskrumko
Copy link
Author

@edenhill ok, I added logs to gist:
https://gist.github.com/deniskrumko/45c4cf57e2cee3b611bbf1c58bbd7ab6

  1. group_id_AAA - everything works ok, polled 10 messages
  2. group_id_BBB - no polled messages, stuck in loop, closed program by ctrl+c
  3. group_id_BBB_fixed - updated config, now everything works ok, polled 10 messages too

@deniskrumko
Copy link
Author

Also it's interesting to me - why it's always works on freshly created topic for the first time. Looks like group.id group_id_AAA occupied the topic 😀

@edenhill
Copy link
Contributor

edenhill commented Apr 6, 2022

Thank you! I unfortunately need more debug logs from the second consumer (group_id_BBB); could you please repro that with "debug": "all"?
Need to figure out why it thinks 10 is a good starting offset.

@deniskrumko
Copy link
Author

@edenhill

Ok, but i changed to group.id = group_id_CCC, because BBB was already committed to offset 10 :)

https://gist.githubusercontent.com/deniskrumko/45c4cf57e2cee3b611bbf1c58bbd7ab6/raw/c788d748538bac6ca63156dafd1fb4b1f0e45c92/group_id_CCC.txt

@deniskrumko
Copy link
Author

deniskrumko commented Apr 6, 2022

I can see line in last logs here:

%7|1649257442.632|CONF|rdkafka#consumer-1| [thrd:app]:   auto.offset.reset = largest

As I understand largest = latest, even if I specified earliest

Looks like config didn't really apply

@edenhill
Copy link
Contributor

edenhill commented Apr 6, 2022

Thanks! Can you provide the code for your test program? At least the consumer setup

@deniskrumko
Copy link
Author

@edenhill it was my program all along :(

Sorry for your wasted time and thanks a lot for very quick help!

I got custom class with default kwarg equal to "latest" in init that override every config that I passed to it. So stupid :)

At least now I know how to use "debug": "all", so thanks again!

@edenhill
Copy link
Contributor

edenhill commented Apr 6, 2022

No worries, happy you found the issue 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants