
KAFKA-4426: Add close with timeout for KafkaConsumer #2285

Closed
wants to merge 9 commits from rajinisivaram/KAFKA-4426

Conversation

rajinisivaram
Contributor

Code corresponding to KIP-102.

@asfbot

asfbot commented Dec 21, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/308/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Dec 21, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/310/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Dec 21, 2016

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/309/
Test FAILed (JDK 8 and Scala 2.12).

@ijuma
Contributor

ijuma commented Dec 28, 2016

cc @hachikuji

@asfbot

asfbot commented Jan 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/519/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/518/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/517/
Test FAILed (JDK 7 and Scala 2.10).

* Tries to close the consumer cleanly within the specified timeout. This method waits up to
* <code>timeout</code> for the consumer to complete pending commits and leave the group.
* If the consumer is unable to complete the requests before the timeout expires, the consumer
* is force closed.
Contributor

I have more of a question than a comment. In the KIP discussion thread, a question was asked about what happens if the consumer coordinator is not known when consumer.close() is called. It appears to me that if auto-commit is enabled, then we wait up to the timeout limit to find the coordinator. In other cases, we close immediately.

Is this correct? If so, would it be worth mentioning this nuance in the javadoc?

Contributor Author

@apurvam Thank you for the review. Yes, that is a good point.

I was trying to make the behaviour no worse than it was before. For auto-commit, the code was waiting indefinitely to find the coordinator; I changed that to time out. For commitAsync, the code was not waiting for the coordinator, and I didn't want it to block for 10 minutes in the default case. But now that the default timeout is a much more reasonable 30 seconds, I think it makes sense for the code to be consistent in both cases. So I have modified close to wait up to the timeout to find the coordinator when commits are pending. The alternative would be not to wait in either case, but that would be worse behaviour for auto-commit than it is now.

@hachikuji The changes are in the latest commit. Hope they make sense. Thank you.

Contributor

Got it. So in the autocommit mode, the offsets will be committed on close. Further, any asynchronous commits pending will wait until the timeout before the close proceeds. If the coordinator cannot be found within the timeout, the close proceeds without committing anything. The default timeout in all cases will be 30 seconds.

I think it makes sense to put this in the javadoc.
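
For illustration, the behaviour summarized above amounts to the following usage (a minimal sketch; the class name, property values and topic are placeholders, and 30 seconds is simply the default timeout being discussed):

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CloseWithTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            try {
                consumer.subscribe(Collections.singletonList("test-topic"));
                consumer.poll(100);
            } finally {
                // Waits up to 30 seconds for the final auto-commit, any pending async
                // commits and the LeaveGroup request; after that the consumer is force closed.
                consumer.close(30, TimeUnit.SECONDS);
            }
        }
    }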

Contributor Author

Updated javadoc

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/622/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/620/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 9, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/621/
Test PASSed (JDK 8 and Scala 2.12).


@Test
public void testCloseManualAssignment() throws Exception {
ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true);
Contributor

I can't make out the difference between this test and the previous one. It looks identical from the code.

Contributor Author

Oops, typo. Fixed.

val consumer3 = createNewConsumer
val future3 = subscribeAndPoll(consumer3)
// Wait for consumer3 to poll and trigger rebalance
Thread.sleep(2000)
Contributor

Can we use MockTime here instead? It would be preferable to slowing the test down by 2 whole seconds.

Contributor Author

All the unit tests use MockTime. But for integration tests like this one, it would be much harder to use MockTime. Hence the real waits.

Contributor

Got it, thanks.

Contributor

Can we not use waitUntilTrue to only wait as long as needed?

Contributor Author

Done.


/**
* Consumer closed during rebalance when brokers are not available.
* Close should terminate immediately since coordinator not known.
Contributor

Looking at the closeDuringRebalance method when brokersAvailableDuringClose is true, I don't see the check that close terminates immediately. Am I missing something? The two tests (this one and the previous one) make the same assertions regardless of the value of the boolean. How is the immediate termination being asserted?

Contributor Author

The two tests use a close timeout of Long.MaxValue. The timeout tested is 1 second (to process the leave group in the case where brokers are available) plus a grace period; if close doesn't complete within this timeout, the test fails. I have updated the comments in the test and also changed to using a variable instead of 1000ms. Hopefully that makes it clearer now.

Contributor

@hachikuji hachikuji left a comment

@rajinisivaram Left some initial comments. I'll try to get to the rest tomorrow.


/**
* Tries to close the consumer cleanly within the specified timeout. This method waits up to
* <code>timeout</code> for the consumer to complete pending commits and leave the group.
Contributor

Might be worth mentioning that this method also sends a new offset commit when autocommit is enabled.

Contributor Author

Updated javadoc.

@@ -1550,7 +1574,7 @@ private void close(boolean swallowException) {
if (exception instanceof InterruptException) {
throw (InterruptException) exception;
}
throw new KafkaException("Failed to close kafka consumer", exception);
throw new KafkaException("Failed to close kafka consumer", firstException.get());
Contributor

Not sure why this was changed.

Contributor Author

Unintentional change, removed.

@@ -75,6 +74,7 @@
private final int autoCommitIntervalMs;
private final ConsumerInterceptors<?, ?> interceptors;
private final boolean excludeInternalTopics;
private final AtomicInteger pendingAsyncCommits;
Contributor

I'm not actually sure we need this. The method ConsumerNetworkClient.pendingRequestCount handles sent and unsent requests, so it seems sufficient to just wait until the count reaches zero to be sure all outstanding offset commits have been sent.

Contributor Author

In order to use ConsumerNetworkClient.pendingRequestCount(Node), we need to know the coordinator. But we don't want to wait to find the coordinator if there are no pending commits. If pendingAsyncCommits > 0, close waits up to the timeout to find the coordinator and waits for ConsumerNetworkClient.pendingRequestCount(coordinator) to become zero. We could use ConsumerNetworkClient.pendingRequestCount() instead to wait for pending requests to all nodes, but I thought that would include fetch requests as well and I wanted to avoid that.
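
In outline, the check being described works roughly like this (a sketch of the intent, with simplified time bookkeeping and helper signatures; not the exact code in the PR):

    // Sketch: only wait for the coordinator when there is actually something to flush.
    long endTimeMs = time.milliseconds() + Math.min(timeoutMs, requestTimeoutMs);
    if (pendingAsyncCommits.get() > 0)
        ensureCoordinatorReady(endTimeMs - time.milliseconds()); // bounded coordinator discovery
    Node coordinator = coordinator();
    while (pendingAsyncCommits.get() > 0
            || (coordinator != null && client.pendingRequestCount(coordinator) > 0)) {
        long remainingMs = endTimeMs - time.milliseconds();
        if (remainingMs <= 0)
            break;                    // timeout expired: fall through to force close
        client.poll(remainingMs);     // drive I/O so outstanding commits can complete
        coordinator = coordinator();  // re-read in case the coordinator was marked dead
    }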

Contributor

@hachikuji hachikuji Jan 10, 2017

Thinking about this a little more, I'm not sure adding coordinator discovery to this loop helps us. The issue is that a coordinator disconnect will cause all of the pending offset commit requests to fail. So I think the logic should be something like:

  1. Check if the coordinator exists. If not, close immediately.
  2. Check if there are pending requests. An optimization here is to check the rebalance state and exit immediately if we are rebalancing (after sending LeaveGroup).
  3. Wait until all pending requests have returned. If the coordinator disconnects at any time, exit immediately.

Does that make sense or not?

Contributor

@hachikuji hachikuji Jan 10, 2017

Oh, I did miss one case: if we are closing while the coordinator is failing over. Hmm, in this case, we could still send any unsent offset commit requests after the coordinator is rediscovered... I'm not sure if this case is worth trying to handle though.

Contributor Author

@hachikuji If coordinator disconnects and in-flight requests are aborted, pendingAsyncCommits will be decremented and the loop would no longer wait to find the new coordinator. If there are unsent commit requests, pendingAsyncCommits > 0 and the loop waits until a coordinator is known. I was trying to remove the inconsistency between auto-commit and manual async commit that Apurva pointed out. Isn't it worth attempting to send pending commits when possible?

Contributor

@rajinisivaram I guess so. I was trying to come up with an excuse to keep shutdown fast and eliminate some bookkeeping 😉 . By the way, I'm not sure it makes sense to distinguish between async and sync commits since wakeup can interrupt a synchronous commit before returning.

Contributor Author

Changed pendingAsyncCommits to pendingCommits to track both sync and async commits.

Node coordinator = coordinator();
long now = time.milliseconds();
long endTimeMs = now + Math.min(timeoutMs - (now - startTimeMs), requestTimeoutMs);
while (pendingAsyncCommits.get() > 0 || (coordinator != null && client.pendingRequestCount(coordinator) > 0)) {
Contributor

If we don't need to track the pending offset commits, maybe we could move this into an AbstractCoordinator.close(timeout, unit).

Contributor Author

See note above.

Contributor

@hachikuji hachikuji Jan 10, 2017

Seems the only thing we really care about is offset commits when shutting down. As long as we send the LeaveGroup, it's probably fine not to await its response (because of one of your previous patches). Because we now have the check for pendingAsyncCommits, I'm wondering if it's actually necessary to await all pending requests from the coordinator? At least if we keep the check, maybe we could ensure that we are not in the middle of a rebalance since that would unnecessarily delay shutdown.

Contributor Author

The pendingRequestCount check ensures that LeaveGroup is actually sent. We could use unsent count from ConsumerNetworkClient instead, but since LeaveGroup is not sent if the coordinator is not known and pendingRequestCount is only checked when coordinator is known, waiting for LeaveGroup response shouldn't have too much impact on the overall close time.

ensureCoordinatorReady();
long startMs = time.milliseconds();
long remainingMs = timeoutMs;
while (remainingMs > 0) {
Contributor

Should we make this a "do while" loop to ensure that we always send the offset commit at a minimum?

Contributor Author

Done.

client.awaitMetadataUpdate();
else
if (future.isRetriable()) {
remainingMs = timeoutMs == Long.MAX_VALUE ? Long.MAX_VALUE : time.milliseconds() - startTimeMs;
Contributor

Shouldn't this be timeoutMs - (time.milliseconds() - startTimeMs)? Also, it's not too big of a deal, but the checks for Long.MAX_VALUE seem like overkill.

Contributor Author

Fixed, removed check for Long.MAX_VALUE to make code more readable.

public synchronized void ensureCoordinatorReady(long timeoutMs) {
long remainingMs = timeoutMs;
long startTimeMs = time.milliseconds();
while (coordinatorUnknown() && remainingMs > 0) {
Contributor

I don't think it makes a difference at the moment given the current usage, but would it make sense to use a "do while" loop to ensure that it is safe to use this method with a timeout of 0 in the future?

Contributor Author

We need the while (coordinatorUnknown()) at the start. But moved remainingMs > 0 to the end of the loop.

Contributor

Hmm, seems like you forgot to remove the remainingMs in the while condition.

Contributor Author

Done.
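
Putting the corrections above together, the timeout-bounded lookup ends up shaped roughly like this (a sketch reconstructed from the discussion, not the literal merged code; the timed poll variant is assumed):

    public synchronized void ensureCoordinatorReady(long timeoutMs) {
        long startTimeMs = time.milliseconds();
        long remainingMs = timeoutMs;
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = lookupCoordinator();
            client.poll(future, remainingMs);
            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator() != null && client.connectionFailed(coordinator())) {
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
            // recompute the remaining budget at the end of the loop so the lookup
            // is attempted at least once, even with a timeout of 0
            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
            if (remainingMs <= 0)
                break;
        }
    }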

throw future.exception();
} else if (coordinator != null && client.connectionFailed(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
coordinatorDead();
time.sleep(retryBackoffMs);
}
remainingMs = timeoutMs == Long.MAX_VALUE ? Long.MAX_VALUE : (timeoutMs - time.milliseconds() - startTimeMs);
Contributor

Seems like the parenthesis should be around time.milliseconds() - startTimeMs?

Contributor Author

Fixed, removed check for Long.MAX_VALUE.

Contributor

@ijuma ijuma left a comment

Thanks for the PR. I took a quick pass and left some comments.

coordinator.close(timeoutMs, requestTimeoutMs);
} catch (Throwable t) {
firstException.compareAndSet(null, t);
log.error("Failed to close coordinator", t);
Contributor

Do we want this to be error or warn? Maybe error is right, so take this as a question. :)

Contributor Author

ClientUtils.closeQuietly, which was used earlier, logs at error level. Since that is used for the other entities in this method, it makes sense to log at the same level. Exceptions here do indicate a catastrophic failure (cases like broker unavailable close without exceptions).

@@ -1505,9 +1506,27 @@ public void resume(Collection<TopicPartition> partitions) {
*/
@Override
public void close() {
close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Contributor

Seems like we need to update the javadoc for this method (waiting indefinitely, in particular).

Contributor Author

Done in one of the previous commits.

long endTimeMs = now + Math.min(timeoutMs - (now - startTimeMs), requestTimeoutMs);
while (pendingAsyncCommits.get() > 0 || (coordinator != null && client.pendingRequestCount(coordinator) > 0)) {
long remainingTimeMs = endTimeMs - time.milliseconds();
if (Thread.currentThread().isInterrupted())
Contributor

Nit: seems like the interrupted check should be done before we compute the remaining time (from a clarity point of view).

Contributor Author

Done.

Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat
closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
Thread[] threads = new Thread[10];
int threadCount = Thread.currentThread().getThreadGroup().enumerate(threads);
Contributor

Nit: you can call Thread.enumerate directly. Also, it would be good to assert that threadCount is < than threads.length.

Contributor Author

Changed to Thread.enumerate. Since Thread.enumerate returns the number of threads actually copied into the array rather than the number of active threads (ignoring any extra threads), threadCount can never exceed threads.length. But I have changed the hard-coded size of the array to use Thread.activeCount().

Contributor

The way you did it is OK. I didn't suggest it because the javadoc talked about race conditions, but we are not generating new threads here, so it should be OK. To clarify my original suggestion: if threadCount == threads.length, there's a risk that some threads were ignored. If threadCount < threads.length, then you are sure that no thread was ignored. That was the motivation for my original suggestion.
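
A rough sketch of the resulting leak check (names are illustrative; the real test asserts on whichever consumer threads it expects to have terminated after close):

    // Snapshot the live threads; Thread.enumerate returns how many were copied in.
    Thread[] threads = new Thread[Thread.activeCount()];
    int threadCount = Thread.enumerate(threads);
    for (int i = 0; i < threadCount; i++)
        assertFalse("Consumer thread leaked after close",
            threads[i].getName().contains("heartbeat"));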

@@ -64,7 +64,7 @@ private void maybeSleep(long ms) {
}

@Override
public void sleep(long ms) {
public synchronized void sleep(long ms) {
Contributor

Can you explain why these changes are needed? The two clock sources are not synchronized in practice, so I thought we shouldn't enforce it in MockTime either. But perhaps there's a good reason.

Contributor Author

These changes were made when the time was a long rather than AtomicLong. They are no longer needed, removed.

* last committed offset.
*/
@Test
def testClose() {
Contributor

Can we measure how much longer this test takes after these changes?

Contributor

To be clear, I mean ConsumerBounceTest as a whole.

Contributor Author

ConsumerBounceTest now takes 3m10s, up from 1m4s on my laptop, which is rather high, I have to admit. The times are (the last two are the older tests):

  • testClose 10.099s
  • testCloseDuringRebalance 14.021s
  • testCloseDuringRebalanceBrokersUnavailable 14.171s
  • testCloseNoTimeoutWithClusterFailure 12.090s
  • testCloseTimeoutWithClusterFailure 9.859s
  • testCloseWithCoordinatorFailureUsingGroupManagement 14.024s
  • testCloseWithCoordinatorFailureUsingManualAssignment 10.080s
  • testCloseWithRandomBrokerFailures 39.924s
  • testConsumptionWithBrokerFailures 42.199s
  • testSeekAndCommitWithBrokerFailures 23.883s

I will remove testCloseWithRandomBrokerFailures since I'm not sure it is worth spending 40 seconds on; system tests should come across these scenarios. I will try to remove some more cases to bring it below 2 minutes.

Contributor

Yes, I agree that it would be better to rely on system tests for random broker failure tests.

Contributor

Have you investigated why close() is taking longer in the bounce test? Would be helpful to know which case we're hitting that is causing the delay.

Contributor Author

I have removed testCloseWithRandomBrokerFailures and refactored the other tests to run more logic concurrently. Tests for different options in the same scenario are now run concurrently. The individual cases were running within the time expected, but the tests were taking a long time because it takes time to create each scenario, close() and then cleanup. The code is slightly more complicated now because of concurrency, but the total time for the tests has come down to 1m42s (1m4s for the existing tests in the class and 38s for the new close tests).

@rajinisivaram
Contributor Author

@apurvam @hachikuji Thank you both for the reviews. Have made most of the changes, left comments for the remaining for review.

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/667/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/668/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/666/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/668/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/669/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/667/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/688/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/686/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/686/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/692/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/755/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/755/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/759/
Test FAILed (JDK 7 and Scala 2.10).

close(0);
}

protected void close(long timeoutMs) {
Contributor

I think this should be synchronized.

Contributor Author

Done.

long now = time.milliseconds();
long endTimeMs = now + timeoutMs;
Node coordinator;
while ((coordinator = coordinator()) != null && client.pendingRequestCount(coordinator) > 0) {
Contributor

Since we don't do coordinator discovery in this loop, do we need to reinitialize the coordinator variable? If the connection fails, then pending request count should drop to 0 already.

Contributor Author

Done.

// we do not need to re-enable wakeups since we are closing already
client.disableWakeups();

long now = time.milliseconds();
timeoutMs = Math.min(timeoutMs, requestTimeoutMs);
Contributor

nit: seems we could move this to the caller and remove the requestTimeoutMs parameter.

Contributor Author

Done.

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/761/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/759/
Test PASSed (JDK 8 and Scala 2.12).

@@ -189,22 +189,33 @@ protected abstract void onJoinComplete(int generation,
* Block until the coordinator for this group is known and is ready to receive requests.
*/
public synchronized void ensureCoordinatorReady() {
ensureCoordinatorReady(0, Long.MAX_VALUE);
Contributor

Might be worth a comment mentioning that the use of 0 is intentional since the timeout is effectively infinite.

Contributor Author

Done.

Contributor

@apurvam apurvam left a comment

LGTM. I think Jason's comment about close being synchronized is the only major item left. But the code is fairly straightforward otherwise. I am better off for the iterations -- learnt a bunch about the workings of the consumer!

Thanks for the patience in responding to comments!

}

def waitForRebalance(timeoutMs: Long, future: Future[Any], otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) {
val startMs1 = System.currentTimeMillis
Contributor

nit: not sure if the "1" is intentional.

Contributor Author

Removed 1.

}
def onPartitionsRevoked(partitions: Collection[TopicPartition]) {
revokeSemaphore.foreach(s => s.release())
}})
Contributor

nit: maybe one brace can go on the previous line?

Contributor Author

Done.

@hachikuji
Contributor

LGTM. Thanks for the patch and the iterations. This was a bit trickier than I expected!

@rajinisivaram
Contributor Author

@apurvam @hachikuji Thank you for all the reviews. Have made the changes suggested.

@asfbot

asfbot commented Jan 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/769/
Test FAILed (JDK 8 and Scala 2.11).

@asfgit asfgit closed this in f72203e Jan 12, 2017
@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/767/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/767/
Test FAILed (JDK 7 and Scala 2.10).

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Apurva Mehta <apurva.1618@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes apache#2285 from rajinisivaram/KAFKA-4426