Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8635: Skip client poll in Sender loop when no request is sent #7085

Merged
merged 3 commits into from Jul 18, 2019

Conversation

@bob-barrett
Copy link
Contributor

commented Jul 12, 2019

This patch changes maybeSendTransactionalRequest to handle both sending and polling transactional requests (and renames it to maybeSendAndPollTransactionalRequest), and skips the call to poll if no request is actually sent. It also removes the inner loop inside maybeSendAndPollTransactionalRequest and relies on the main Sender loop for retries.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
KAFKA-8635: Skip client poll in Sender loop when no request is sent
This patch breaks up maybeSendTransactionalRequest() and changes it to return false if a FindCoordinatorRequest is enqueued. If this is the case, we no longer poll for because no request was actually sent.
@hachikuji
Copy link
Contributor

left a comment

Thanks for the patch. I think the fix make sense. I left a few cleanup suggestions for consideration.

/**
* Enqueue a FindCoordinatorRequest if needed, otherwise send the next request
*/
private boolean maybeSendTransactionalRequest(TransactionManager.TxnRequestHandler nextRequestHandler) {
AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
while (!forceClose) {

This comment has been minimized.

Copy link
@hachikuji

hachikuji Jul 13, 2019

Contributor

Not necessarily something we have to do here, but I think we should be able to get rid of this loop and just rely on the next iteration of runOnce() to handle retries.

TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(accumulator.hasIncomplete());
if (nextRequestHandler != null) {
if (maybeSendTransactionalRequest(nextRequestHandler)) {
client.poll(retryBackoffMs, time.milliseconds());

This comment has been minimized.

Copy link
@hachikuji

hachikuji Jul 13, 2019

Contributor

This logic is a bit awkward. I guess it's because client.poll is disconnected from the send logic. I wonder if it would be reasonable to create a method like maybePollTransactionalRequest which handles both the inflight check, sending the next request, and calling client.poll if necessary.

/**
* Enqueue a FindCoordinatorRequest if needed, otherwise send the next request
*/
private boolean maybeSendTransactionalRequest(TransactionManager.TxnRequestHandler nextRequestHandler) {
AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
while (!forceClose) {
Node targetNode = null;

This comment has been minimized.

Copy link
@hachikuji

hachikuji Jul 13, 2019

Contributor

Minor simplification we can do below:

if (targetNode == null || !awaitReady(client, targetNode, time, requestTimeoutMs)) {
  transactionManager.lookupCoordinator(nextRequestHandler);
  break;                        
}

This comment has been minimized.

Copy link
@hachikuji

hachikuji Jul 13, 2019

Contributor

A helper for awaitReady might be useful as well. Might be a chance to consolidate the awaitLeastLoadedNodeReady path.

@hachikuji
Copy link
Contributor

left a comment

Thanks, looks good. Just a couple more comments.

@@ -513,8 +515,11 @@ private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOExc
return NetworkClientUtils.sendAndReceive(client, request, time);
}

private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException {
Node node = client.leastLoadedNode(time.milliseconds());
private Node awaitLeastLoadedNodeReady(long remainingTimeMs, FindCoordinatorRequest.CoordinatorType coordinatorType) throws IOException {

This comment has been minimized.

Copy link
@hachikuji

hachikuji Jul 18, 2019

Contributor

nit: the name should be changed since we do not always use the least loaded node.

I noticed we are always using request timeout for the argument. Maybe we can drop the argument?

time.sleep(retryBackoffMs);
metadata.requestUpdate();
}

transactionManager.lookupCoordinator(nextRequestHandler);

This comment has been minimized.

Copy link
@hachikuji

hachikuji Jul 18, 2019

Contributor

It reads a bit strange to fall through to lookupCoordinator if we know the request doesn't need the coordinator. Maybe clearer with a slight restructure:

transactionManager.retry(nextRequestHandler);

if (nextRequestHandler.needsCoordinator()) {
  transactionManager.lookupCoordinator(nextRequestHandler);
} else {
  // For non-coordinator requests, sleep here to prevent a tight loop when no node is available
  time.sleep(retryBackoffMs);
  metadata.requestUpdate();	            
}
@hachikuji
Copy link
Contributor

left a comment

LGTM. Thanks for the patch!

@hachikuji hachikuji merged commit 2e26a46 into apache:trunk Jul 18, 2019

3 checks passed

JDK 11 and Scala 2.12 SUCCESS 11680 tests run, 67 skipped, 0 failed.
Details
JDK 11 and Scala 2.13 SUCCESS 11680 tests run, 67 skipped, 0 failed.
Details
JDK 8 and Scala 2.11 SUCCESS 11680 tests run, 67 skipped, 0 failed.
Details

@bob-barrett bob-barrett deleted the bob-barrett:KAFKA-8635 branch Jul 18, 2019

hachikuji added a commit that referenced this pull request Jul 18, 2019
KAFKA-8635; Skip client poll in Sender loop when no request is sent (#…
…7085)

This patch changes maybeSendTransactionalRequest to handle both sending and polling transactional requests (and renames it to maybeSendAndPollTransactionalRequest), and skips the call to poll if no request is actually sent. It also removes the inner loop inside maybeSendAndPollTransactionalRequest and relies on the main Sender loop for retries.

Reviewers: Jason Gustafson <jason@confluent.io>
ijuma added a commit to confluentinc/kafka that referenced this pull request Jul 20, 2019
Merge remote-tracking branch 'apache-github/2.3' into ccs-2.3
* apache-github/2.3:
  MINOR: Update documentation for enabling optimizations (apache#7099)
  MINOR: Remove stale streams producer retry default docs. (apache#6844)
  KAFKA-8635; Skip client poll in Sender loop when no request is sent (apache#7085)
  KAFKA-8615: Change to track partition time breaks TimestampExtractor (apache#7054)
  KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic mentioned (apache#7094)
  KAFKA-8602: Separate PR for 2.3 branch (apache#7092)
  KAFKA-8530; Check for topic authorization errors in OffsetFetch response (apache#6928)
  KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (apache#7086)
  KAFKA-8637: WriteBatch objects leak off-heap memory (apache#7050)
  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (apache#7021)
  HOT FIX: close RocksDB objects in correct order (apache#7076)
  KAFKA-7157: Fix handling of nulls in TimestampConverter (apache#7070)
  KAFKA-6605: Fix NPE in Flatten when optional Struct is null (apache#5705)
  Fixes #8198 KStreams testing docs use non-existent method pipe (apache#6678)
  KAFKA-5998: fix checkpointableOffsets handling (apache#7030)
  KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0 (apache#7072)
  KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (apache#6991)
  MINOR: add upgrade text (apache#7013)
  Bump version to 2.3.1-SNAPSHOT
xiowu0 added a commit to linkedin/kafka that referenced this pull request Aug 22, 2019
[LI-CHERRY-PICK] [2371457] KAFKA-8635; Skip client poll in Sender loo…
…p when no request is sent (apache#7085)

TICKET = KAFKA-8635
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [2371457]
ORIGINAL_DESCRIPTION =

This patch changes maybeSendTransactionalRequest to handle both sending and polling transactional requests (and renames it to maybeSendAndPollTransactionalRequest), and skips the call to poll if no request is actually sent. It also removes the inner loop inside maybeSendAndPollTransactionalRequest and relies on the main Sender loop for retries.

Reviewers: Jason Gustafson <jason@confluent.io>
(cherry picked from commit 2371457)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants
You can’t perform that action at this time.