Calling consumer.Assign is not actually optional #163

Closed
3 tasks done
vladaionescu opened this issue Mar 24, 2018 · 7 comments

@vladaionescu
Contributor

Description

Seeing this in the channel consumer.

The documentation states (emphasis mine):

By default the consumer will start fetching messages for its assigned partitions at this point, but your application may enable rebalance events to get an insight into what the assigned partitions where as well as set the initial offsets. To do this you need to pass "go.application.rebalance.enable": true to the NewConsumer() call mentioned above. You will (eventually) see a kafka.AssignedPartitions event with the assigned partition set. You can optionally modify the initial offsets (they'll default to stored offsets and if there are no previously stored offsets it will fall back to "default.topic.config": ConfigMap{"auto.offset.reset": ..} which defaults to the latest message) and then call .Assign(partitions) to start consuming. If you don't need to modify the initial offsets you will not need to call .Assign(), the client will do so automatically for you if you dont.

It doesn't look like it automatically calls Assign(). I'm seeing no consumption if that call is removed (at least not in the channel consumer).

How to reproduce

  1. Start a channel consumer with go.application.rebalance.enable set to true.
  2. Handle kafka.AssignedPartitions but don't call .Assign() on them
  3. No assignment happens, no consumption takes place.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()):

confluent-kafka-go: v0.11.0

  • Apache Kafka broker version:

0.11.0

  • Client configuration: ConfigMap{...}

    • go.application.rebalance.enable: true
    • go.events.channel.enable: true
@edenhill
Contributor

Your analysis is spot on, and the documentation is wrong - when using the channel-based consumer with rebalance events the application must call Assign() to sync state.

There is a comment in the code that says exactly this, too bad it wasn't in the docs:
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/event.go#L177
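
For illustration, the rebalance handling described above might look roughly like the sketch below. It is not taken from the library or its docs. To keep it runnable without a broker, `TopicPartition`, `AssignedPartitions`, and `RevokedPartitions` are simplified local stand-ins for the `kafka` package types of the same names, and `rebalancer`, `handleRebalance`, and `fakeConsumer` are hypothetical names introduced here. In a real channel consumer the events would come from `consumer.Events()` and `c` would be the `*kafka.Consumer`.

```go
package main

import "fmt"

// Simplified local stand-ins for the kafka package types of the same names
// (assumption: the real TopicPartition carries more fields, such as Offset).
type TopicPartition struct {
	Topic     string
	Partition int32
}

type AssignedPartitions struct{ Partitions []TopicPartition }
type RevokedPartitions struct{ Partitions []TopicPartition }

// rebalancer is a hypothetical interface covering the two consumer methods
// this sketch needs; *kafka.Consumer provides Assign and Unassign.
type rebalancer interface {
	Assign(partitions []TopicPartition) error
	Unassign() error
}

// handleRebalance applies a rebalance event to the consumer. It reports
// whether ev was a rebalance event, so the caller can process other events.
func handleRebalance(c rebalancer, ev interface{}) (bool, error) {
	switch e := ev.(type) {
	case AssignedPartitions:
		// With rebalance events enabled, the application makes this call itself.
		return true, c.Assign(e.Partitions)
	case RevokedPartitions:
		return true, c.Unassign()
	default:
		return false, nil
	}
}

// fakeConsumer records the current assignment so the sketch runs offline.
type fakeConsumer struct{ assigned []TopicPartition }

func (f *fakeConsumer) Assign(p []TopicPartition) error { f.assigned = p; return nil }
func (f *fakeConsumer) Unassign() error                 { f.assigned = nil; return nil }

func main() {
	c := &fakeConsumer{}
	events := []interface{}{
		AssignedPartitions{Partitions: []TopicPartition{{Topic: "example-topic", Partition: 0}}},
		"a regular message", // not a rebalance event
		RevokedPartitions{},
	}
	for _, ev := range events {
		handled, err := handleRebalance(c, ev)
		fmt.Printf("handled=%v err=%v assigned=%d\n", handled, err, len(c.assigned))
	}
}
```

Returning a `handled` flag keeps the helper usable inside a larger event loop that also deals with messages and errors.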

@edenhill edenhill added the docs label Mar 26, 2018
@briansorahan

@edenhill We are using a Poll-based consumer and would like to have the option of choosing offsets when we get an AssignedPartitions event.
It seems like an assignment has already been made here in this case.
To set the initial offsets for a Poll consumer, should we just make another Assign call, overriding that one?
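
As a rough sketch of the "modify the initial offsets, then call .Assign(partitions)" step from the quoted documentation: the helper below rewrites offsets on a copy of the assigned partition list. `Offset` and `TopicPartition` are simplified local stand-ins for the `kafka` types, and `partitionKey` and `withStartOffsets` are hypothetical names, not library API. It does not settle whether a second `Assign()` call overrides the earlier one in a Poll-based consumer.

```go
package main

import "fmt"

// Simplified local stand-ins for kafka.Offset and kafka.TopicPartition
// (assumption: only the fields needed for this sketch are modelled).
type Offset int64

type TopicPartition struct {
	Topic     string
	Partition int32
	Offset    Offset
}

// partitionKey is a hypothetical lookup key for a topic/partition pair.
type partitionKey struct {
	Topic     string
	Partition int32
}

// withStartOffsets returns a copy of assigned in which every partition that
// has an entry in start gets that offset. Partitions without an entry keep
// whatever offset they arrived with. The input slice is not modified.
func withStartOffsets(assigned []TopicPartition, start map[partitionKey]Offset) []TopicPartition {
	out := make([]TopicPartition, len(assigned))
	copy(out, assigned)
	for i := range out {
		key := partitionKey{Topic: out[i].Topic, Partition: out[i].Partition}
		if off, ok := start[key]; ok {
			out[i].Offset = off
		}
	}
	return out
}

func main() {
	// Example values only; topic name and offsets are made up.
	assigned := []TopicPartition{
		{Topic: "example-topic", Partition: 0},
		{Topic: "example-topic", Partition: 1},
	}
	start := map[partitionKey]Offset{
		{Topic: "example-topic", Partition: 1}: 42,
	}
	for _, tp := range withStartOffsets(assigned, start) {
		fmt.Printf("%s[%d] starts at offset %d\n", tp.Topic, tp.Partition, tp.Offset)
	}
}
```

The returned slice is what would be passed to `Assign()` when handling the `AssignedPartitions` event.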

@agis
Contributor

agis commented May 17, 2018

@edenhill Sorry to hijack this issue but I didn't want to open a new one.

We have the following scenario: we're using the poll-based consumer with go.application.rebalance.enable set to false. We never call Assign(); we go straight to Poll(), since it's supposed to call Assign() for us (code is here, Run() is called first and concurrently we might call Poll()).

My question: is that the recommended usage? I'm asking because we noticed that a consumer sometimes doesn't receive any messages even though it's subscribed to a partition that does have messages.

@edenhill
Contributor

That should indeed work.

One reason you are not seeing messages might be that there are no new messages coming in, and the consumer defaults to fetching only new messages (auto.offset.reset=latest) when no previously committed offsets are available.

If this is not the case you can debug the consumer by setting the debug config property to cgrp,fetch.
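
A minimal sketch of such a configuration, for anyone following along. `ConfigMap` here is a local stand-in for `kafka.ConfigMap` so the snippet runs on its own, `withDebug` is a hypothetical helper, and the broker address and group id are placeholder assumptions. The `default.topic.config` nesting follows the documentation quoted earlier in this issue.

```go
package main

import "fmt"

// ConfigMap is a local stand-in for kafka.ConfigMap (assumption: a plain
// string-keyed map is enough for this sketch).
type ConfigMap map[string]interface{}

// withDebug returns a copy of base with the "debug" property set to the
// given comma-separated contexts. The base map is left untouched.
func withDebug(base ConfigMap, contexts string) ConfigMap {
	out := make(ConfigMap, len(base)+1)
	for k, v := range base {
		out[k] = v
	}
	out["debug"] = contexts
	return out
}

func main() {
	base := ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder assumption
		"group.id":          "example-group",  // placeholder assumption
		// Read from the beginning when no committed offsets exist,
		// instead of only new messages.
		"default.topic.config": ConfigMap{"auto.offset.reset": "earliest"},
	}
	cfg := withDebug(base, "cgrp,fetch")
	fmt.Println("debug contexts:", cfg["debug"])
}
```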

@agis
Contributor

agis commented May 17, 2018

New messages were in fact produced to the partition, and the consumer was indeed subscribed to it (as reported by kafka-consumer-groups --describe), but for some reason the messages weren't consumed.

It's as if we weren't calling Poll(), but that's highly improbable judging from the code.

In any case, I'll set debug to cgrp,fetch and try to reproduce it.

@agis
Contributor

agis commented May 17, 2018

Just wanted to add that we're on librdkafka 0.11.1.

@PandaCodes

PandaCodes commented Jul 13, 2020

@edenhill From here, appReassigned is assumed to be true if we use the channel-based consumer OR we have no rebalanceCb.
This means it is also necessary to call Assign() when using Poll() with no rebalanceCb defined. We ran into this in our own use case as well.
The docs, however, still seem confusing for this case.
