KAFKA-9734: Fix IllegalState in Streams transit to standby #8319

Merged
merged 4 commits into from Mar 20, 2020

vvcephei
Contributor

Consolidate ChangelogReader state management inside of StreamThread to avoid having to reason about all execution paths in both StreamThread and TaskManager.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor Author

@vvcephei left a comment

Hey @guozhangwang ,

What do you think of this fix? I tried for quite a while to add a regression test for this, but it's quite tricky because the bug requires a specific interleaving of the Producer, StreamThread, and TaskManager. I ultimately decided such a test would be pointless anyway, since all it would do is exercise a code path that this PR removes.

I also didn't add a new test because the existing tests already cover the transitions in question. It's just that a specific edge case no longer exists.

Does that make sense, or am I cheating?

Thanks,
-John

// pause all partitions that are for standby tasks from the restore consumer
pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
// pause all partitions that are for standby tasks from the restore consumer
pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());

Contributor Author

I moved this inside the block because we should only need to do it if we are transitioning from restoring standbys. I wanted to make the idempotent call slightly more efficient, since we now call this method every time in StreamThread#runOnce.

Contributor

We can move the line 290 inside as well?

Contributor Author

Oh, yeah, of course :)

throw new IllegalStateException("The changelog reader is not restoring active tasks while trying to " +
"transit to update standby tasks: " + changelogs);
throw new IllegalStateException(
"The changelog reader is not restoring active tasks (is " + state + ") while trying to " +

Contributor Author

This might be a bit redundant, but it wasted a bit of my time to find out that if it wasn't in active_restoring, it must have been in restoring standbys because that's the only other state. I figure it doesn't hurt to just be explicit.

Comment on lines +504 to +505
log.warn("Detected the states of tasks " + e.corruptedTaskWithChangelogs() + " are corrupted. " +
"Will close the task as dirty and re-create and bootstrap from scratch.", e);

Contributor Author

Rearranged this so we can include the exception itself. The stacktrace is useful for tracking down the reason for the exception.

"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.");
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will close out all assigned tasks and rejoin the consumer group.", e);

Contributor Author

Also, here.

@@ -578,13 +578,13 @@ void runOnce() {
// only try to initialize the assigned tasks
// if the state is still in PARTITION_ASSIGNED after the poll call
if (state == State.PARTITIONS_ASSIGNED) {
// transit to restore active is idempotent so we can call it multiple times
changelogReader.enforceRestoreActive();

Contributor Author

This is the primary fix. Instead of relying (hoping) on TaskManager to put the changelog reader into restoring_active, we just idempotently make sure it's in that state any time we're in partitions_assigned.
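
For readers following along, here is a minimal sketch of the idea, not the actual Kafka code. The class names `ChangelogReaderSketch` and `StreamThreadSketch`, the enum constants, the `onRebalance` helper, and the recorded action strings are all hypothetical stand-ins. The assumption is that a rebalance can put the thread back into partitions_assigned while the reader is still updating standbys, which is when the old code could hit the IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the changelog reader (not Kafka's class).
final class ChangelogReaderSketch {
    enum ReaderState { ACTIVE_RESTORING, STANDBY_UPDATING }

    private ReaderState state = ReaderState.ACTIVE_RESTORING;
    // Records side effects so the example can show when pausing really happens.
    final List<String> actions = new ArrayList<>();

    // Idempotent: calling this on every loop iteration is safe.
    void enforceRestoreActive() {
        if (state != ReaderState.ACTIVE_RESTORING) {
            // Pause standby changelogs only when actually leaving the standby state.
            actions.add("pause-standby-changelogs");
            state = ReaderState.ACTIVE_RESTORING;
        }
    }

    void transitToUpdateStandby() {
        if (state != ReaderState.ACTIVE_RESTORING) {
            // Include the current state so the reader of the log need not infer it.
            throw new IllegalStateException(
                "The changelog reader is not restoring active tasks (is " + state
                    + ") while trying to transit to update standby tasks");
        }
        actions.add("resume-standby-changelogs");
        state = ReaderState.STANDBY_UPDATING;
    }

    ReaderState state() {
        return state;
    }
}

// Hypothetical stand-in for the thread's loop body.
final class StreamThreadSketch {
    enum ThreadState { PARTITIONS_ASSIGNED, RUNNING }

    private ThreadState state = ThreadState.PARTITIONS_ASSIGNED;
    private final ChangelogReaderSketch reader;

    StreamThreadSketch(ChangelogReaderSketch reader) {
        this.reader = reader;
    }

    // Assumed trigger: a rebalance moves the thread back to PARTITIONS_ASSIGNED.
    void onRebalance() {
        state = ThreadState.PARTITIONS_ASSIGNED;
    }

    void runOnce(boolean restorationDone) {
        if (state == ThreadState.PARTITIONS_ASSIGNED) {
            // The thread itself makes sure the reader is in the state it needs,
            // instead of relying on some other component to have done so.
            reader.enforceRestoreActive();
            if (restorationDone) {
                reader.transitToUpdateStandby();
                state = ThreadState.RUNNING;
            }
        }
    }

    ThreadState state() {
        return state;
    }
}
```

In this sketch, calling `runOnce(true)`, then `onRebalance()`, then `runOnce(true)` again completes without an exception, because the second pass first moves the reader back to active restoring. Without the `enforceRestoreActive()` call, the second `transitToUpdateStandby()` would throw.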

@@ -279,13 +279,13 @@ private boolean hasRestoredToEnd(final ChangelogMetadata metadata) {
// NOTE: even if the newly created tasks do not need any restoring, we still first transit to this state and then
// immediately transit back -- there's no overhead of transiting back and forth but simplifies the logic a lot.
@Override
public void transitToRestoreActive() {
public void enforceRestoreActive() {

Contributor Author

Renamed this method to make it clear that we aren't necessarily "transitioning": we now call it any time we want to "be in restoreActive".

Comment on lines -680 to -681
changeLogReader.transitToRestoreActive();
expectLastCall();

Contributor Author

This is a strict mock, so we have to remove this expectation, since the TaskManager no longer calls it.
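
To illustrate why the stale expectation has to go, here is a hand-rolled sketch of strict-mock semantics. It is not EasyMock's API; `StrictMockSketch` and its `expect`, `invoke`, and `verify` methods are hypothetical names used only for this example. The point is that an expectation that is recorded but never satisfied makes verification fail, even if every call that did happen was expected.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hand-rolled illustration of strict-mock behavior (hypothetical, not EasyMock).
final class StrictMockSketch {
    // Expected calls, in the order they must occur.
    private final Deque<String> expected = new ArrayDeque<>();

    void expect(String call) {
        expected.addLast(call);
    }

    // Fails on any call that is not the next expected one.
    void invoke(String call) {
        String next = expected.pollFirst();
        if (!call.equals(next)) {
            throw new AssertionError("Unexpected call " + call + ", expected " + next);
        }
    }

    // Fails if any recorded expectation was never satisfied.
    void verify() {
        if (!expected.isEmpty()) {
            throw new AssertionError("Expected calls never made: " + expected);
        }
    }
}
```

With this sketch, recording `expect("transitToRestoreActive")` and then calling `verify()` without the code under test ever invoking it raises an AssertionError, which mirrors why the test expectation is removed once TaskManager stops making the call.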

Contributor

@guozhangwang left a comment

Just one nit, otherwise LGTM! Thanks @vvcephei

Please feel free to merge after green builds.

@vvcephei
Contributor Author

This time, there were two different, unrelated failures.

kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

@vvcephei vvcephei merged commit 960b216 into apache:trunk Mar 20, 2020
@vvcephei vvcephei deleted the kafka-9734-transit-standby branch March 20, 2020 15:17