Retry ErrNotCoordinatorForConsumer in ConsumerGroup.newSession #1231

Open
wants to merge 2 commits into
base: master
from

thomaslee commented Dec 4, 2018

Group coordinators seem to be selected from among those brokers which lead __consumer_offsets partitions, so when leadership of a __consumer_offsets partition moves from one broker to another (e.g. due to an outage, or a controlled reassignment of partition replicas) the coordinator for a given group may move with it.
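
For context, here is a rough sketch of how a group is generally understood to map to a __consumer_offsets partition on the broker side: the absolute value of the Java String.hashCode() of the group ID, modulo the partition count of the offsets topic (50 by default, as far as I know). The leader of that partition then acts as the group's coordinator. Nothing in this PR confirms these details, and the function names below are made up for illustration, so treat this as an assumption-laden illustration rather than the broker's actual implementation.

```go
package main

import (
	"math"
	"unicode/utf16"
)

// javaStringHashCode reproduces Java's String.hashCode(): a base-31
// polynomial hash over the string's UTF-16 code units, wrapping at 32 bits.
func javaStringHashCode(s string) int32 {
	var h int32
	for _, u := range utf16.Encode([]rune(s)) {
		h = 31*h + int32(u) // int32 overflow wraps in Go, as it does in Java
	}
	return h
}

// offsetsPartitionFor (hypothetical name) returns the __consumer_offsets
// partition assumed to own groupID. The absolute-value handling, including
// mapping the minimum int32 to 0, is an assumption about the broker's logic.
func offsetsPartitionFor(groupID string, numPartitions int32) int32 {
	h := javaStringHashCode(groupID)
	switch {
	case h == math.MinInt32:
		h = 0
	case h < 0:
		h = -h
	}
	return h % numPartitions
}
```

If this mapping holds, any leadership change for that one partition is enough to move the coordinator for every group that hashes to it.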

Sarama usually seems to handle these events by returning nil from ConsumerGroup.Consume() after a heartbeat fails due to an ErrNotCoordinatorForConsumer. In these cases it seems like you can just call ConsumerGroup.Consume() again to ask the consumer to resume where it left off. If you do this, you typically see something like this in the logs when group coordinators move:

[sarama] 2018/12/04 01:32:58 consumer_group: <nil>
[sarama] 2018/12/04 01:32:58 consumer/broker/0 closed dead subscription to repro-topic/1
[sarama] 2018/12/04 01:32:58 consumer/broker/1 closed dead subscription to repro-topic/2
[sarama] 2018/12/04 01:32:58 consumer/broker/2 closed dead subscription to repro-topic/0
[sarama] 2018/12/04 01:32:58 client/coordinator requesting coordinator for consumergroup test-group from localhost:9292
[sarama] 2018/12/04 01:32:58 client/coordinator coordinator for consumergroup test-group is #1 (192.168.1.4:9192)
[sarama] 2018/12/04 01:32:58 calling Consume()
[sarama] 2018/12/04 01:32:59 client/metadata fetching metadata for [repro-topic] from broker localhost:9292
[sarama] 2018/12/04 01:32:59 client/coordinator requesting coordinator for consumergroup test-group from localhost:9292
[sarama] 2018/12/04 01:32:59 client/coordinator coordinator for consumergroup test-group is #1 (192.168.1.4:9192)
[sarama] 2018/12/04 01:32:59 consumer/broker/1 added subscription to repro-topic/2
[sarama] 2018/12/04 01:32:59 consumer/broker/0 added subscription to repro-topic/1
[sarama] 2018/12/04 01:32:59 consumer/broker/2 added subscription to repro-topic/0

However, if you're unlucky, ConsumerGroup.Consume() can sometimes return ErrNotCoordinatorForConsumer & fail in such a way that the ConsumerGroup never looks up the new group coordinator:

[sarama] 2018/12/04 02:05:25 consumer_group: <nil>
[sarama] 2018/12/04 02:05:25 consumer/broker/0 closed dead subscription to repro-topic/1
[sarama] 2018/12/04 02:05:25 consumer/broker/1 closed dead subscription to repro-topic/2
[sarama] 2018/12/04 02:05:25 consumer/broker/2 closed dead subscription to repro-topic/0
[sarama] 2018/12/04 02:05:25 client/coordinator requesting coordinator for consumergroup test-group from localhost:9092
[sarama] 2018/12/04 02:05:25 client/coordinator coordinator for consumergroup test-group is #2 (192.168.1.4:9292)
[sarama] 2018/12/04 02:05:25 client/coordinator requesting coordinator for consumergroup test-group from localhost:9092
[sarama] 2018/12/04 02:05:25 client/coordinator coordinator for consumergroup test-group is #2 (192.168.1.4:9292)
[sarama] 2018/12/04 02:05:25 client/coordinator requesting coordinator for consumergroup test-group from localhost:9092
[sarama] 2018/12/04 02:05:25 client/coordinator coordinator for consumergroup test-group is #2 (192.168.1.4:9292)
[repro] 2018/12/04 02:05:25 calling Consume()
[sarama] 2018/12/04 02:05:25 client/metadata fetching metadata for [repro-topic] from broker localhost:9092
[repro] 2018/12/04 02:05:25 kafka server: Request was for a consumer group that is not coordinated by this broker.
*consumer stops*

At this point the ConsumerGroup seems to be in a broken state: checking the return value of Consume() for ErrNotCoordinatorForConsumer & simply retrying the call to Consume() doesn't help since repeated calls to ConsumerGroup.Consume() will continue to return ErrNotCoordinatorForConsumer forever. Something like this:

[sarama] 2018/12/04 02:18:48 client/metadata fetching metadata for [repro-topic] from broker localhost:9192
[repro] 2018/12/04 02:18:48 ErrNotCoordinatorForConsumer
[repro] 2018/12/04 02:18:48 calling Consume()
[sarama] 2018/12/04 02:18:48 client/metadata fetching metadata for [repro-topic] from broker localhost:9192
[repro] 2018/12/04 02:18:48 ErrNotCoordinatorForConsumer
[repro] 2018/12/04 02:18:48 calling Consume()

This is not a great state for the consumer to be in, and it can cause problems for consumers during basic cluster maintenance operations or broker outages/downtime.

We seem to get into this state because we're not attempting coordinator refreshes during calls to ConsumerGroup.newSession(), which is what I'm attempting to address in this PR. With this change applied, I can't reproduce the issue.

Example log output (prior to applying this change)

See this gist for an example of the problem this is trying to fix: https://gist.githubusercontent.com/thomaslee/b70498216fb04f6b02f26c21ef93a046/raw/6ac08ba203f6f86c75c65b9851acb7366c38fc7a/sarama-bug.txt

At the end of this output, the consumer process exits. (Note that the ** output here is from some light edits of vendored Sarama code for diagnostic purposes. Note too the whining about MaxWaitTime being low, which I don't believe is actually a prerequisite to reproducing this bug.)

Possible work-around

I haven't actually tried this myself, but it seems like until this PR is merged folks should be able to simply call Client.RefreshCoordinator(yourGroupHere) when ConsumerGroup.Consume() returns ErrNotCoordinatorForConsumer (perhaps with some backoff logic). Note that this requires users to create their ConsumerGroups using NewConsumerGroupFromClient.

@probot-shopify probot-shopify bot added the cla-needed label Dec 4, 2018

Contributor Author

thomaslee commented Dec 4, 2018

Signed the CLA.

@probot-shopify probot-shopify bot removed the cla-needed label Dec 4, 2018

ShaneSaww commented Dec 4, 2018

One thing to possibly include in this: the heartbeatLoop could check for ErrNotCoordinatorForConsumer and then call RefreshCoordinator to update the coordinator without having to drop out of the Consume func.

It would change

    switch resp.Err {

to look something like

    switch resp.Err {
    case ErrNoError:
        retries = s.parent.config.Metadata.Retry.Max
    case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
        return
    case ErrNotCoordinatorForConsumer:
        // Look up the new coordinator instead of dropping out of Consume.
        if err := s.parent.client.RefreshCoordinator(s.parent.groupID); err != nil {
            s.parent.handleError(err, "", -1)
            return
        }
    default:
        // Report resp.Err here: this assumes err from the request itself
        // was already checked before the switch, so err would be nil.
        s.parent.handleError(resp.Err, "", -1)
        return
    }


prune998 commented Mar 14, 2019

Any update?

@bai bai requested review from varun06 and sam-obeid Mar 19, 2019

sam-obeid commented Mar 19, 2019

This is reasonable and would improve consumer resiliency. @thomaslee Could you add a unit test to cover this use case?

Collaborator

varun06 commented Mar 19, 2019

This is nice, please add a test and we can get it going.
