Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5154: Consumer fetches from revoked partitions when SyncGroup fails with disconnection [WIP] #3181

Closed
wants to merge 4 commits into from

Conversation

dguy
Copy link
Contributor

@dguy dguy commented May 31, 2017

Scenario is as follows:

  1. Consumer subscribes to topic t1 and begins consuming
  2. heartbeat fails as the group is rebalancing
  3. ConsumerCoordinator.onJoinGroupPrepare is called
    3.1 onPartitionsRevoked is called
  4. consumer becomes the group leader
  5. sends sync group request
  6. sync group is cancelled due to disconnection
  7. fetch request is sent for partitions that have previously been revoked

@dguy
Copy link
Contributor Author

dguy commented May 31, 2017

@guozhangwang @hachikuji this is not to be merged, but this test fails and is based on the logs i extracted from the corresponding JIRA.
One simple fix for this specific problem is to clear the subscription in ConsumerCoordinator.onJoinPrepare. I think that is probably worth doing, but it is possibly masking a bigger problem as once the SyncGroup is disconnected the rebalance is never completed, i.e., the consumer doesn't get any partitions assigned

@asfbot
Copy link

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4638/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4653/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4642/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4657/
Test FAILed (JDK 7 and Scala 2.11).

@guozhangwang
Copy link
Contributor

Regarding the general issue that it may get exposed: I think the root cause is in CoordinatorResponseHandler#onFailure() we only mark the coordinator as dead but do not do anything else; for example for syncGroupRequest, and then in its caller joinGroupIfNeeded(), we do not check the DisconnectException again, and the while condition needRejoin() || rejoinIncomplete() will also pass although the actual state is:

  1. state = MemberState.UNJOINED.
  2. coordinator = null.
  3. rejoinNeeded = false.
  4. joinFuture = null.

I think we should check all four variables above in the while condition instead.

@hachikuji
Copy link
Contributor

Good catch. I wonder if we are setting rejoinNeeded to false too early. Perhaps that should only come after the SyncGroup returns?

@dguy
Copy link
Contributor Author

dguy commented Jun 1, 2017

@guozhangwang @hachikuji i think i've fixed it by simply calling requestRejoin when the coordinator is marked as dead. It will now attempt to rejoin the group. The test passes and everything looks as if it is working as expected

@asfbot
Copy link

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4702/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4718/
Test FAILed (JDK 7 and Scala 2.11).

@guozhangwang
Copy link
Contributor

Seems one fix missing:

scala2.11/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:1271: error: no suitable constructor found for Metadata(int,long)
        Metadata metadata = new Metadata(0, Long.MAX_VALUE);

coordinatorDead();
requestRejoin();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we should set requestRejoin in the base class (CoordinatorResponseHandler ). For example, HeartbeatResponseHandler also extends from it, but for that request if we get a disconnect, we should just mark the coordinator as dead in order to re-discover it; and then after new coordinator rediscovered retry sending heartbeat request and if that succeed just proceed as normal. Setting it here will force heartbeat request disconnection to also trigger a join group.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @hachikuji 's suggestion may be better: do not call

AbstractCoordinator.this.rejoinNeeded = false;

in JoinGroupResponseHandler#handle(), but in SyncGroupResponseHandler#handle().

@asfbot
Copy link

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4717/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jun 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4733/
Test PASSed (JDK 7 and Scala 2.11).

@@ -537,6 +536,7 @@ public void handle(SyncGroupResponse syncResponse,
if (error == Errors.NONE) {
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(syncResponse.memberAssignment());
AbstractCoordinator.this.rejoinNeeded = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought... future.complete(syncResponse.memberAssignment()) above will trigger joinFuture.addListener's onSuccess, which will enable the heartbeat thread right away, and hence there is a (very small) race condition.

I think it is safer to just move the the above line inside onSuccess (line 395) to set it before enabling heart beat thread, and we would not need AbstractCoordinator.this prefix also.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dguy @hachikuji if it sounds good to you I can go ahead and make this change while merging.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@asfgit asfgit closed this in 1b16aca Jun 1, 2017
asfgit pushed a commit that referenced this pull request Jun 1, 2017
…Group response handler

Scenario is as follows:
1. Consumer subscribes to topic t1 and begins consuming
2. heartbeat fails as the group is rebalancing
3. ConsumerCoordinator.onJoinGroupPrepare is called
   3.1 onPartitionsRevoked is called
4. consumer becomes the group leader
5. sends sync group request
6. sync group is cancelled due to disconnection
7. fetch request is sent for partitions that have previously been revoked

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3181 from dguy/kafka-5154
@guozhangwang
Copy link
Contributor

Merged to trunk and cherry-picked to 0.11.0.

@dguy dguy deleted the kafka-5154 branch August 16, 2017 13:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants