KAFKA-5263: Avoid tight polling loop in consumer with no ready nodes #3124

Closed
wants to merge 2 commits into from

Conversation

rajinisivaram
Contributor

@rajinisivaram rajinisivaram commented May 23, 2017

For consumers with manual partition assignment, await metadata when there are no ready nodes to avoid busy polling.

@rajinisivaram
Contributor Author

@hachikuji @ijuma Can you take a look? Thank you...

@asfbot

asfbot commented May 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4302/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 23, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4289/
Test PASSed (JDK 8 and Scala 2.12).

@hachikuji
Contributor

@rajinisivaram Thanks for the patch. I'm not totally sure I understand the cause of the busy loop. Does poll return immediately if there are no channels to wait on?

@rajinisivaram
Contributor Author

@hachikuji No, the busy loop is due to the heavy use of wakeups in the consumer. Fetch requests don't trigger new connections, but they result in wakeups that cause select to return immediately. We don't want poll to block when there are no channels either, so retrying for metadata felt like the best option.
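
To illustrate the wakeup behaviour described in this comment, here is a minimal sketch using plain `java.nio` rather than Kafka's own classes. The class name `WakeupDemo` and the helper `timedSelect` are made up for this example. It relies only on the documented behaviour of `Selector.wakeup()`: when no selection is in progress, the next `select` call returns immediately.

```java
import java.io.IOException;
import java.nio.channels.Selector;

public class WakeupDemo {
    /** Hypothetical helper: returns roughly how long select(timeoutMs) blocked, in milliseconds. */
    static long timedSelect(Selector selector, long timeoutMs) throws IOException {
        long start = System.nanoTime();
        selector.select(timeoutMs);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            // No channels registered and no wakeup pending:
            // select is expected to block until the timeout elapses.
            long blocked = timedSelect(selector, 200);

            // A wakeup issued before select makes the next select return immediately,
            // even though there is still nothing to read or write.
            selector.wakeup();
            long woken = timedSelect(selector, 200);

            System.out.println("without wakeup: ~" + blocked + " ms, after wakeup: ~" + woken + " ms");
        }
    }
}
```

If every iteration of a polling loop is preceded by a wakeup, as the comment says happens with fetch requests, each `select` returns at once and the loop spins without ever blocking.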

Contributor

@hachikuji hachikuji left a comment

Thanks for the explanation. The solution makes sense to me. Left a couple of minor comments.

} else {
    // For manually assigned partitions, if there are no ready nodes, await metadata
    // to avoid tight polling loop with no channels
    if (metadata.updateRequested() && !client.hasReadyNodes()) {
Contributor

Forgive the bikeshedding, but I was curious why this was located here instead of in KafkaConsumer.pollOnce.

Contributor Author

The objects being checked are in ConsumerCoordinator. Rather than add a method to the coordinator to do this, it made sense to do the check here, close to where ensureCoordinatorReady was being called, which invokes awaitMetadataUpdate for this scenario when group management is used.

Contributor

Fair enough.

        if (!metadataUpdated && !client.hasReadyNodes())
            return;
        now = time.milliseconds();
    }
}

if (needRejoin()) {
Contributor

I guess we could pull this into the partitionsAutoAssigned block.

Contributor Author

@hachikuji Thank you for the review. Moved.
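
For readers following the fragments above, here is a self-contained sketch of how the guard appears to fit together. It is not the patch itself: the `Client` and `Metadata` interfaces are hypothetical stand-ins, and the `awaitMetadataUpdate(remainingMs)` call, while named in the discussion, is not shown in the quoted hunks, so its signature here is an assumption.

```java
public class ManualAssignmentPollSketch {
    /** Hypothetical stand-in for the network client checked in the review. */
    interface Client {
        boolean hasReadyNodes();
        boolean awaitMetadataUpdate(long timeoutMs); // assumed signature
    }

    /** Hypothetical stand-in for the metadata object. */
    interface Metadata {
        boolean updateRequested();
    }

    /**
     * Returns true if the caller should carry on with the rest of poll, or false if it
     * should return early because metadata was not refreshed and no node is ready.
     */
    static boolean readyToProceed(Metadata metadata, Client client, long remainingMs) {
        if (metadata.updateRequested() && !client.hasReadyNodes()) {
            // Block on a metadata update instead of spinning with no channels.
            boolean metadataUpdated = client.awaitMetadataUpdate(remainingMs);
            if (!metadataUpdated && !client.hasReadyNodes())
                return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // A client with no ready nodes whose metadata wait always times out.
        Client noNodes = new Client() {
            public boolean hasReadyNodes() { return false; }
            public boolean awaitMetadataUpdate(long timeoutMs) { return false; }
        };
        System.out.println(readyToProceed(() -> true, noNodes, 100));  // prints "false"
        System.out.println(readyToProceed(() -> false, noNodes, 100)); // prints "true"
    }
}
```

The second call shows that the wait is skipped entirely when no metadata update has been requested, so the guard only costs time in the degenerate case the PR targets.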

@@ -113,6 +113,12 @@ public Node leastLoadedNode() {
        }
    }

    public boolean hasReadyNodes() {
        synchronized (this) {
Contributor

Nit: we can probably just make the method synchronized, right?

Contributor Author

@ijuma Thank you for the review. It was a copy-and-paste from the method above. Made both methods synchronized.
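
As a small illustration of the nit above: a method whose whole body is wrapped in `synchronized (this)` locks the same monitor as an instance method declared `synchronized`. The sketch below uses a made-up class `SyncDemo` with a simple counter, not the real client code, and checks the lock with `Thread.holdsLock`.

```java
public class SyncDemo {
    private int ready = 0;

    // Block form, similar in shape to the snippet under review.
    public boolean hasReadyBlock() {
        synchronized (this) {
            return ready > 0;
        }
    }

    // Method form: acquires the monitor of `this` for the whole body.
    public synchronized boolean hasReadyMethod() {
        return ready > 0;
    }

    public synchronized void markReady() {
        ready++;
    }

    // Both helpers report whether the calling thread holds this object's monitor.
    public synchronized boolean holdsLockInMethod() {
        return Thread.holdsLock(this);
    }

    public boolean holdsLockInBlock() {
        synchronized (this) {
            return Thread.holdsLock(this);
        }
    }

    public static void main(String[] args) {
        SyncDemo demo = new SyncDemo();
        System.out.println(demo.holdsLockInMethod()); // prints "true"
        System.out.println(demo.holdsLockInBlock());  // prints "true"
        demo.markReady();
        System.out.println(demo.hasReadyBlock() == demo.hasReadyMethod()); // prints "true"
    }
}
```

The method form is simply shorter when the lock covers the entire body; the block form remains useful when only part of a method needs the lock.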

@asfbot

asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4353/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4339/
Test PASSed (JDK 8 and Scala 2.12).

@hachikuji
Contributor

@rajinisivaram LGTM. This seems worth getting into 0.11.0 also? The test failure looks a little suspicious, but hard to say if it's related given overall flakiness.

@rajinisivaram
Contributor Author

@hachikuji Thank you, I will retest this a couple of times before merging.

@rajinisivaram
Contributor Author

retest this please

@asfbot

asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4343/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4357/
Test PASSed (JDK 7 and Scala 2.11).

@rajinisivaram
Contributor Author

retest this please

@asfbot

asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4358/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4344/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented May 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4379/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot

asfbot commented May 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4365/
Test PASSed (JDK 8 and Scala 2.12).

asfgit pushed a commit that referenced this pull request May 25, 2017
For consumers with manual partition assignment, await metadata when there are no ready nodes to avoid busy polling.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3124 from rajinisivaram/KAFKA-5263

(cherry picked from commit 64fc1a7)
Signed-off-by: Rajini Sivaram <rajinisivaram@googlemail.com>
@asfgit asfgit closed this in 64fc1a7 May 25, 2017