
HOTFIX: safely clear all active state in onPartitionsLost #7691

Merged
merged 11 commits into apache:2.4 on Nov 19, 2019

Conversation

@ableegoldman (Contributor) commented Nov 14, 2019

After a number of last-minute bugs were found stemming from the incremental closing of lost tasks in StreamsRebalanceListener#onPartitionsLost, a safer approach to this edge case seems warranted. We initially wanted to be as "future-proof" as possible and avoid baking further protocol assumptions into the code that might be broken as the protocol evolves. This meant that rather than simply closing all active tasks and clearing all associated state in #onPartitionsLost(lostPartitions), we would loop through the lostPartitions/lost tasks, remove them one by one from the various data structures/assignments, and then verify that everything was empty in the end. This verification in particular has caused us significant trouble, as it turns out to be nontrivial to determine what should in fact be empty, and whether it is in fact being correctly updated.

Therefore, before worrying about being "future-proof", it seems we should first make sure the code is "present-day-proof" and implement this callback in the safest possible way: by blindly closing all active tasks and clearing all associated state. We log all the relevant state (at debug level) before clearing it, so we can at least tell from the logs whether, and which, emptiness checks were being violated.
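As a rough illustration of the "blindly clear everything" approach described above (a minimal sketch only -- the field names, the allTasks() helper, and the close(clean, isZombie) signature are assumptions, not the actual 2.4 code):

void closeAllLostTasks() {
    // log all relevant state (at debug level) before clearing it, so the logs reveal
    // which emptiness checks would have been violated
    log.debug("Closing all active tasks as lost. Created: {}, restoring: {}, running: {}, suspended: {}",
              created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());

    for (final StreamTask task : allTasks()) {
        try {
            task.close(false, true);  // clean = false, isZombie = true
        } catch (final RuntimeException e) {
            log.error("Failed to close task {} as a zombie:", task.id(), e);
        }
    }

    created.clear();
    restoring.clear();
    running.clear();
    suspended.clear();
}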

Note that this is targeted at 2.4 (not trunk), and that I also cherry-picked over the minor fix from #7686.


@mjsax added the streams label Nov 14, 2019
@ableegoldman (Contributor Author) commented Nov 14, 2019

Kicked off some sets of system tests -->
all streams tests:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3511/ -- REBUILDING
broker bounce (x5 repeats):
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3512/ -- REBUILDING
version probing (x30 repeats):
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3513/ -- PASSED
cooperative upgrade (x3 repeats):
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3514/ -- PASSED

edit: version probing and cooperative upgrade passed, broker bounce and "all streams tests" need to be rerun after fixing a stupid UnsupportedOperationException

@ableegoldman (Contributor Author) commented Nov 14, 2019

New system test runs:
all streams tests:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3519 -- PASSED
broker bounce (x5 repeats):
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3520 -- PASSED

edit: all system tests green

@guozhangwang (Contributor) left a comment

One more question, otherwise LGTM.

log.debug("Closing the zombie suspended stream task {}.", id);
firstException.compareAndSet(null, closeSuspended(true, suspended.get(id)));
for (final TaskId id : allAssignedTaskIds()) {
    if (running.containsKey(id)) {
Contributor:

Question: I remember running is a superset of suspended, so if we put this condition first, before line 304, then line 304 would never trigger, right?

@ableegoldman (Contributor Author):

Ah, good question. I found another way to solve that issue besides making running a superset of suspended -- now all maps are completely disjoint, and any given task should be contained in exactly one.
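As a rough illustration of that invariant (the map names are taken from the snippets in this thread, but the helper itself is hypothetical, not the actual patch):

// moving a task into `running` removes it from every other state map in the same step,
// so the maps stay disjoint and any given task lives in exactly one of them
private void transitionToRunning(final StreamTask task) {
    final TaskId id = task.id();
    created.remove(id);
    suspended.remove(id);
    restoring.remove(id);
    running.put(id, task);
}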

@bbejeck (Contributor) left a comment

Changes here LGTM, I just have a few questions overall.

boolean hasRestoringTasks() {
    if (restoring.isEmpty()) {
Contributor:

It seems a bit awkward for a hasXXX call to have side effects. Although I do understand the motivation here, I guess this is something to revisit in a subsequent refactoring/follow-on PR.

@ableegoldman (Contributor Author):

Yeah, that's fair (Guozhang said the same thing on the trunk PR 😄) -- I'll think of a better way to do this so that it's clear what's going on without needing a comment, and I'll update the PR(s) by tonight.

@ableegoldman (Contributor Author):

Alright, I rewrote this clearing functionality to be an explicit method call rather than a hidden side effect.
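For illustration, a minimal before/after sketch of that change (the restoredPartitions field and the exact side effect are assumptions based on the snippet above, not the actual patch):

// before: querying the state also silently mutated it
boolean hasRestoringTasks() {
    if (restoring.isEmpty()) {
        restoredPartitions.clear();   // hidden side effect
        return false;
    }
    return true;
}

// after: the query is pure, and the cleanup is its own explicitly named call
boolean hasRestoringTasks() {
    return !restoring.isEmpty();
}

void clearRestoredPartitions() {
    restoredPartitions.clear();
}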

removeTaskFromRunning(task);
closedTaskChangelogs.addAll(task.changelogPartitions());
Contributor:

Why did we remove this line here from closeRunning, but keep the same call in suspendRunningTasks? More a question for my own education; I wouldn't hold up merging for this.

@ableegoldman (Contributor Author):

We no longer need to keep track of which changelog partitions were lost after an onPartitionsLost, since we just clear everything. #closeRunning is actually only called on zombie tasks, so we can just remove it entirely here (non-zombie running tasks are closed by first suspending them and then closing them as suspended).


// With the current rebalance protocol, there should not be any running tasks left as they were all lost
Contributor:

Is this part of the "strict assumptions" referred to that needed to be removed for now?

@ableegoldman (Contributor Author):

Yes -- we now just clear everything. But we log the entire state of all relevant data structures so that, for our own debugging sake, we should be able to tell from the logs whether the state was actually being updated properly and the "blindly clear everything" safety mechanism was unnecessary. And if not, we can figure out what isn't being properly updated and fix that in trunk without having broken 2.4 :)

@ableegoldman (Contributor Author):

Please check out the latest commit -- I added another, hopefully unnecessary, safety mechanism: we now remove a task from ALL state maps (e.g. created, running, etc.) when closing it or moving it to another. The idea is to ensure that a task can never end up in two state maps at once -- this has never seemed to happen or to have caused any test failures so far, but it has been pointed out (and it's true) that it is difficult to feel confident, just from reading the code, that these sets never overlap. This may be overkill, but the goal is to make 2.4 as stable as possible and to improve our confidence in the correctness and assumptions of the code (see the sketch below).
cc/ @bbejeck @guozhangwang @abbccdda @mjsax @vvcephei
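A hedged sketch of that mechanism (simplified: the map fields are assumed from the snippets in this thread, the parameter is named per the rename discussed below, and the real patch also cleans up the partition-keyed structures, as the review hunks below show):

void removeTaskFromAllOldMaps(final StreamTask task, final Map<TaskId, StreamTask> currentStateMap) {
    final TaskId id = task.id();
    // remove the task from every state map except its destination,
    // so it can never be present in two maps at once
    for (final Map<TaskId, StreamTask> map : java.util.Arrays.asList(created, suspended, restoring, running)) {
        if (map != currentStateMap) {
            map.remove(id);
        }
    }
}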

@ableegoldman (Contributor Author) commented Nov 15, 2019

Kicked off more system tests on the latest changes:

edit: adding completed system test results

@bbejeck (Contributor) left a comment

Thanks for the quick turnaround @ableegoldman, this LGTM. I only have a couple of minor questions.

@@ -372,6 +378,24 @@ void updateRestored(final Collection<TopicPartition> restored) {
        }
    }

+   @Override
+   void removeTaskFromAllOldMaps(final StreamTask task, final Map<TaskId, StreamTask> newState) {
Contributor:

This took a minute to figure out based on the name newState. What about currentStateMap? Though I'm not sure that's any better.

@ableegoldman (Contributor Author):

Yeah, I agonized over the naming of this method and the "newMap" parameter ... I like currentStateMap though (also renaming the method and adding javadocs for what it does)

final Set<TopicPartition> taskPartitions = new HashSet<>(task.partitions());
taskPartitions.addAll(task.changelogPartitions());

if (newState != restoring) {
Contributor:

Not sure if this matters, but I don't see where removeTaskFromAllOldMaps is ever called with restoring passed in as the new state.

@ableegoldman (Contributor Author):

It isn't, but I felt it was best for the method to follow the same behavior for all possible input, and do what it says it will (remove from everything except the passed-in map).

Contributor:

Makes sense, I was thinking that was your reasoning, but I wanted to confirm.

final Set<TopicPartition> taskPartitions = new HashSet<>(task.partitions());
taskPartitions.addAll(task.changelogPartitions());

if (newState != running) {
Contributor:

Same thing here: I don't see where running is passed for the newState.

@ableegoldman (Contributor Author):

ditto above, let me know if you don't agree with the reasoning

@@ -79,7 +79,7 @@ int commit() {
    } catch (final RuntimeException e) {
        log.error("Closing the standby task {} failed due to the following error:", task.id(), e);
    } finally {
-       removeTaskFromRunning(task);
+       removeTaskFromAllOldMaps(task, null);
Contributor:
Minor suggestion -- instead of null, what about Collections.emptyMap()? This suggestion is subjective, so feel free to ignore. Applies here and below.

@ableegoldman (Contributor Author):

Ah, yeah that's a good suggestion -- will do
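For illustration, the suggested call would then read (a sketch, assuming java.util.Collections is imported):

removeTaskFromAllOldMaps(task, Collections.emptyMap());   // empty destination: remove the task from every map

Passing an empty map instead of null keeps the "remove from everything except the destination" contract without a null special case.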

@ableegoldman (Contributor Author) left a comment

Thanks for the review!


@guozhangwang (Contributor) left a comment

Reviewed the PR again. Honestly, I'm a bit concerned about the overkill we are executing here, since it may "hide" some other bugs that would otherwise be exposed. On the other hand, I think I buy the argument for making the 2.4 release as stable as possible, and since this is only in 2.4 for now it may be okay.

Let's try to do the cleanup in trunk ASAP, cherry-pick it into 2.4, and replace this overkill mechanism.

@bbejeck (Contributor) commented Nov 16, 2019

@ableegoldman these failures seem relevant: org.apache.kafka.streams.processor.internals.StoreChangelogReaderTest.shouldRestoreMessagesFromCheckpoint

@ableegoldman (Contributor Author) commented Nov 17, 2019

The failures in the second* round of system tests were due to a bug introduced in the 73513f6 commit. Kicking off a third round of system tests with this issue fixed:

*the first round of system tests (run before the guilty 73513f6) all passed

edit: all tests are passing again

@bbejeck bbejeck changed the base branch from 2.4 to trunk November 19, 2019 17:47
@bbejeck bbejeck changed the base branch from trunk to 2.4 November 19, 2019 17:48
@bbejeck (Contributor) commented Nov 19, 2019

In the previous build, both Java 11/2.12 and Java 11/2.13 passed, but Java 8 failed.

Running ./gradlew test locally, the build passed.

Merging this now.

Edit: irrelevant, as all 3 PR builds passed.

@bbejeck merged commit cbc9f57 into apache:2.4 Nov 19, 2019
@bbejeck (Contributor) commented Nov 19, 2019

Merged #7691 into 2.4

@bbejeck (Contributor) commented Nov 19, 2019

Thanks for the fix @ableegoldman!

bbejeck pushed a commit that referenced this pull request Nov 19, 2019

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Andrew Choi <andchoi@linkedin.com>
@bbejeck (Contributor) commented Nov 19, 2019

cherry-picked to trunk
