Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12639: exit upon expired timer to prevent tight looping #13190

Merged

Conversation

philipnee
Copy link
Collaborator

@philipnee philipnee commented Feb 2, 2023

https://issues.apache.org/jira/browse/KAFKA-12639

In AbstractCoordinator#joinGroupIfNeeded - joinGroup request will be retried without proper backoff, due to the expired timer. This is an uncommon scenario and possibly only appears during the testing, but I think it makes sense to enforce the client to drive the join group via poll.

@philipnee philipnee changed the title KAFKA-12539: exit upon expired timer to prevent tight looping KAFKA-12639: exit upon expired timer to prevent tight looping Feb 2, 2023
@philipnee
Copy link
Collaborator Author

@guozhangwang - would you have time to review this 🥺 ?

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @philipnee . I left some comments.

Also I think we should add unit test with exactly the mocking clients to 1) return a non-retriable exception, and check that we throw immediately (if we already have it in the test, then we can skip), 2) return one of the four exceptions, and check that we never sleep backoffs, 3) return other retriable exception, and check that we would still the remaining timer; and in case 2/3), we check that if timer has elapsed, we would also return false immediately.

continue;
else if (!future.isRetriable())
throw exception;

if (timer.isExpired()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a couple comments here explaining why we check the timer again here in addition to in line 452 above? Maybe something like this:

We check the timer again after calling poll with the timer since it's possible that even after the timer has elapsed, the next client.poll(timer) would immediately return an error response which would cause us to not exiting the while loop.

exception instanceof MemberIdRequiredException)
exception instanceof IllegalGenerationException ||
exception instanceof RebalanceInProgressException ||
exception instanceof MemberIdRequiredException)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we actually do the timer check before this? Since otherwise if the exception from the immediately returned responses is any of those four, we would still continue and skip the check below.

More concretely I think we can just move the remaining logic inside the if call:

if (!future.isRetriable()) {
    throw ..
} else {
    if (timer.isExpired()) {
        return false;
    } else if (exception instance of..) {
        continue;
    } else {
        timer.sleep(..)
    }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed. A comment here, retriableException check should happen after the instanceOf checks, because I think we actually want to retry upon these (according to the logic).

@philipnee
Copy link
Collaborator Author

Thanks @guozhangwang for the feedback - Added some tests there to cover the untesed cases. I still have a quick question around this block, is it intentional to continue w/o sleep on the backoff timer? (quoting the original code)

if (exception instanceof UnknownMemberIdException ||
                    exception instanceof IllegalGenerationException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof MemberIdRequiredException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;

                timer.sleep(rebalanceConfig.retryBackoffMs);

@guozhangwang
Copy link
Contributor

is it intentional to continue w/o sleep on the backoff timer?

Yes that's intentional. For those four exceptions, we'd like to send the follow-up request right away since the broker is waiting for those join-group request. But the question is, when the timer has already elapsed, should we honor that or should we ignore but always try to complete this mid-stage.

Since in the new protocol we would no longer have such mid-stages during a prepare_rebalance phase (cc @dajac to chime in if you feel different), I would suggest we respect the timer still for now to have a stronger poll(timer) timing guarantees.

@philipnee
Copy link
Collaborator Author

Thanks, @guozhangwang, that's my understanding as well.

@philipnee
Copy link
Collaborator Author

Moving the time check just broke a bunch of unit test 😅

@guozhangwang
Copy link
Contributor

@philipnee is this the final version of this PR? Seems we are still honoring the four exceptions indicating the mid-stage of a rebalance more than the elapsed timer here?

@philipnee
Copy link
Collaborator Author

Hey @guozhangwang it's WIP - I think moving the timer check before the exception handling block (that 4 exceptions), kind of breaks a bunch of tests, as most tests are expecting the complete within a single poll. I'm looking into these breakage actually. sorry about the confusion.

@guozhangwang
Copy link
Contributor

Oh got it, thanks! All good :) Please let me know when it's ready for a final look.

@philipnee
Copy link
Collaborator Author

Although, are we ok with handling these 4 exceptions the same way as before? I know you previously mentioned that it might be better off to make the rules more consistent, and I kind of agree with it.

- First call will exit upon timeout
- Second call should send a proper request before exiting.
re trigger-test
@guozhangwang
Copy link
Contributor

Yeah I think it's okay to make the rule consistent, i.e. to honor the timeout even under those four exceptions: if the timer has elapsed, then we should well return from the loop in

client.poll(future, timer);
            if (!future.isDone()) {
                // we ran out of time
                return false;
            }

even if the response yet to be returned would contain any of these four exceptions. So I think we should still obey this rule, i.e. even if a response has been returned and we know it's going to be one of these four exceptions, if the timer has elapsed, we still exit the loop.

@philipnee
Copy link
Collaborator Author

Hmm, strangely, this branch seems to trigger a bunch of initializing error failures. And I can't seem to reproduce them locally...

@philipnee
Copy link
Collaborator Author

Just a bit a note here on this PR: Seems like we need to be more deliberate at handling the timeout, because the non-retriable errors are always expected to be thrown. (except for the 4 cases), which is why the change triggered 60-ish breaking tests. Updating the PR to retrigger the test.

@philipnee
Copy link
Collaborator Author

The failures seem irrelevant to the change here: i.e. they dont' show up in both rounds.

Build / JDK 11 and Scala 2.13 / testDynamicListenerConnectionCreationRateQuota() – kafka.network.DynamicConnectionQuotaTest
41s
Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
2m 0s
Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.server.KafkaServerKRaftRegistrationTest
7s
Build / JDK 8 and Scala 2.12 / [1] Type=ZK, Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT – kafka.server.KafkaServerKRaftRegistrationTest
12s

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@philipnee thanks for the added unit test. I made another pass and left some more comments.

@@ -500,14 +500,22 @@ boolean joinGroupIfNeeded(final Timer timer) {
requestRejoin(shortReason, fullReason);
}

// continue to retry as long as the timer hasn't expired
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we simplify this multi-if logic as:

if (!future.isRetriable()) { throw }
else {
    if (timer.isExpired() { return false }
    else if (exception instance of.. ) { continue}
    else {timer.sleep(..)}
}

Also could we add a comment on top clarifying that the order of precedence are deliberated in this order and future changes should pay attention to not change it unnecessarily.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the instanceof ... exceptions are also non-retriable, and I think they need to be handled first.

Copy link
Collaborator Author

@philipnee philipnee Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the if else blocks becomes a bit fragmented. or we could do:

if (!future.isRetriable()) {
  if ( ... instance of ... ) { continue; }
  throw ...
}

{rest of the logic there}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, this is a bit more nested, which can be harder to read

@@ -1484,6 +1484,8 @@ public void testRebalanceWithMetadataChange() {
Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.poll(time.timer(0)); // failing joinGroup request will require re-poll in order to retry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not very clear to me why here and line 3403 below we need additional polls since the test scenarios seems irrelevant to error cases?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NOT_COORDINATOR error originally should trigger retries; however, in the new code, it would exit due to an expired timer. Another way to do it is using poll(time.timer(1))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, that makes sense.

@@ -3398,7 +3400,8 @@ public void testPrepareJoinAndRejoinAfterFailedRebalance() {
client.respond(syncGroupResponse(partitions, Errors.NONE));

// Join future should succeed but generation already cleared so result of join is false.
res = coordinator.joinGroupIfNeeded(time.timer(1));
coordinator.joinGroupIfNeeded(time.timer(0));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here, the timer is expired upon IllegalGenerationException, the loop would continued in the original code, but now it would exit. I guess we could try to poll for a bit longer, like 3ms instead of 0ms.

@philipnee
Copy link
Collaborator Author

Here are a couple things I updated:

  1. Added some documentation to clarify the intent, but I didn't rewrite it as nested if "can be" harder to read.
  2. Added non zero timeouts for the tests as our timer now is stricter and will explicitly exit upon expiration.

@@ -500,14 +500,24 @@ boolean joinGroupIfNeeded(final Timer timer) {
requestRejoin(shortReason, fullReason);
}

// 4 special non-retriable exceptions that we want to retry, as long as the timer hasn't expired.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here the comment is not to just state what the code did, since readers can just understand that from the code :P instead what we want to emphasize is to remind future contributors that they should be careful to not change the precedence ordering of this logic unnecessarily.

exception instanceof IllegalGenerationException ||
exception instanceof RebalanceInProgressException ||
exception instanceof MemberIdRequiredException)
exception instanceof IllegalGenerationException ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thanks for the clarifications!

Thinking about this a bit more (sorry for getting back and forth..), I now concerned a bit more that for some usage patterns where poll call would be triggered less frequently, we may not be coming back to handle these four exceptions while at the same time the broker is ticking and waiting for the join-group request to be re-sent. Hence I'm changing my mind to lean a bit more to honor the exception types for immediate handling than the timeouts --- again, sorry for going back and forth...

So I think we would define the ordering as the following:

  1. For un-retriable exception, always try to handle immediately and not honor the timer.
  2. Otherwise, honor the timer.

In that case, we could just go back to the first time you made the change, i.e. just add the

if (timer.isExpired())
                        return false;

After the if/else-if block. Still it's better to comment that above ordering is diligently designed as such.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey thanks for the comments again and absolutely no apology is needed there! I guess, as we all know, rebalancing is full of subtleties, so it makes sense to be careful about these non-retriable exception case. I think it's a good idea to keep the original behavior consistent, in case of unexpected breakage. Updating the PR.

@@ -1484,7 +1484,8 @@ public void testRebalanceWithMetadataChange() {
Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 1))));
client.respond(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NOT_COORDINATOR));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.poll(time.timer(0));
assertFalse(client.hasInFlightRequests());
coordinator.poll(time.timer(1));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: we need to add a timeout here to give the retry a second chance, because in the new code, the timer is checked and causes the method to exit.

exception instanceof MemberIdRequiredException)
exception instanceof IllegalGenerationException ||
exception instanceof RebalanceInProgressException ||
exception instanceof MemberIdRequiredException)
continue;
else if (!future.isRetriable())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the previous logic was reverted with some autocorrection to the indentation.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, waiting for the jenkins job to complete.

@philipnee
Copy link
Collaborator Author

Hmm. I think these tests are flaky actually

Build / JDK 17 and Scala 2.13 / shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
30s
Build / JDK 17 and Scala 2.13 / shouldPauseActiveTaskAndTransitToUpdateStandby() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
30s
Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
2m 0s
Build / JDK 11 and Scala 2.13 / testListenerConnectionRateLimitWhenActualRateAboveLimit() – kafka.network.ConnectionQuotasTest
19s
Build / JDK 11 and Scala 2.13 / shouldRemovePausedAndUpdatingTasksOnShutdown() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
30s
Build / JDK 11 and Scala 2.13 / shouldPauseStandbyTaskAndNotTransitToUpdateStandbyAgain() – org.apache.kafka.streams.processor.internals.DefaultStateUpdaterTest
31s

@guozhangwang guozhangwang merged commit f7f376f into apache:trunk Mar 1, 2023
@guozhangwang
Copy link
Contributor

The test failures are not relevant (but some of them are related to DefaultStateUpdaterTest.. sigh).

@guozhangwang
Copy link
Contributor

Merged to trunk.

@philipnee
Copy link
Collaborator Author

yeah the DefaultStateUpdaterTest has been failing from time to time... not sure why 😭

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants