Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10199: Commit the restoration progress within StateUpdater #12279

Merged
merged 11 commits into from Jun 23, 2022

Conversation

guozhangwang
Copy link
Contributor

During restoring, we should always commit a.k.a. write checkpoint file regardless of EOS or ALOS, since if there's a failure we would just over-restore them upon recovery so no EOS violations happened.

Also when we complete restore or remove task, we should enforce a checkpoint as well; for failing cases though, we should not write a new one.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Thanks for the PR!

Here my comment.

}

for (final Task task : updatingTasks.values()) {
// do not enforce checkpointing during restoration if its position has not advanced much
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a ToDo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a Todo: it's explaining why we set the enforceCheckpoint as false. Inside that callee when it's false we will only write a new checkpoint if the offsets has significantly advanced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see! This is a bit hard to understand in my opinion. Could we have two methods -- commitTaskAndEnforceCheckpoint() and commitTaskAndMaybeEnforcedCheckpoint()? If we change the code to only write the checkpoints, this code might change anyways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. I will refactor this piece of code in the follow-up PR (I have one doing the code refactoring already) such that:

  1. We remove the prepareCommit and postCommit in standby task, instead we call writeCheckpoint directly.
  2. We remove the postCommit in active task, enforceCheckpoint boolean in the maybeCheckpoint function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking forward to the follow-up PR 🙂

throw new IllegalStateException("Task " + task.id() + " should not have any source offset " +
"committable during restoration, but have " + offsetAndMetadata + " instead. " + BUG_ERROR_MESSAGE);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change


@AfterEach
public void tearDown() {
stateUpdater.shutdown(Duration.ofMinutes(1));
}

private Properties configProps(final int commitInterval) {
return mkObjectProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueClassTestName(getClass())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this since we do not use an embedded Kafka cluster?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two configs are required configs of StreamsConfig and hence we have to provide a dummy.

Copy link
Contributor

@cadonna cadonna Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know. My question was actually about if we need to use safeUniqueClassTestName(getClass()) and provide a new override for that method. AFAIK, that method is primarily used in integration tests to ensure that consumer groups within one integration test class have distinct names so that consumer groups of different tests in the test class do not clash leading to longer execution times. This is not an integration test and we do not use an embedded Kafka cluster. Why do we need safeUniqueClassTestName(getClass())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. I will remove the safeUniqueClassTestName and just dummy with appID.

mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval),
// we need to make sure that transaction timeout is not lower than commit interval for EOS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this also a ToDo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a todo: I was explaining why we need to explicitly set the RANSACTION_TIMEOUT_CONFIG as commitInterval as well.

Copy link
Contributor

@cadonna cadonna Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a check for this somewhere in our production code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, otherwise the app would fail upon starting up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a check in the production code, we do not need to add this comment since the test would fail anyways, right? I would remove the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SG!

Comment on lines 706 to 708
verify(task, times(0)).prepareCommit();
verify(task, times(0)).postCommit(true);
verify(task, times(0)).postCommit(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
verify(task, times(0)).prepareCommit();
verify(task, times(0)).postCommit(true);
verify(task, times(0)).postCommit(false);
verify(task, never()).prepareCommit();
verify(task, never()).postCommit(true);
verify(task, never()).postCommit(false);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack!

Comment on lines 690 to 701
waitForCondition(
() -> {
for (final Task task : tasks) {
verify(task, atLeast(1)).prepareCommit();
verify(task, atLeast(1)).postCommit(enforceCheckpoint);
}

return true;
},
VERIFICATION_TIMEOUT,
"Did not auto commit all tasks within the given timeout!"
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider to use verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)) instead of waitForCondition()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip! will give it a try.


return true;
},
VERIFICATION_TIMEOUT,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you express the timeout in terms of the commit interval?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it a bit but since we are using a system test here using exact commit interval could cause flakiness; while using a larger value say commit interval * 2 is safe enough I felt it would be similar to just using a longer value as VERIFICATION_TIMEOUT here.


private void commitTask(final Task task, final boolean enforceCheckpoint) {
// prepare commit should not take any effect except a no-op verification
final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand why we should commit. The task does not read from the input at this point. Wouldn't flushing the stores and writing the checkpoint file be enough? Can we somehow just flush the state store and write the checkpoint instead of calling the *Commit() methods? I think that would simplify the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to leverage on the extra state validation logic inside the *Commit() function, but I agree we can directly call checkpointing which would be more straight-forward. LMK if you feel strong about this and I can change it accordingly.

Copy link
Contributor

@cadonna cadonna Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I feel strongly about it. 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to do the function refactoring (mentioned above) in the follow-up PR, and here I would just directly call the checkpoint functions.

@@ -286,6 +308,7 @@ public void shouldRestoreActiveStatefulTasksAndUpdateStandbyTasks() throws Excep
stateUpdater.add(task4);

verifyRestoredActiveTasks(task2, task1);
verifyCommitTasks(true, task2, task1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the offsets of standby tasks also be written to the checkpoint file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can only be certain that active tasks would be checkpointed since they are completed upon when we enforce the checkpoint; standby tasks would only try checkpointing without enforcing during processing and hence here they will not write the checkpoint.

Copy link
Contributor

@cadonna cadonna Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you move this verification after verifyUpdatingStandbyTasks()? Are standby tasks still not checkpointed because we do not enforce the checkpoint?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, unless we wait for commit interval.. btw I think since this is already covered in other tests we do not need to wait (and since it's system time we need to wait conservatively to reduce flakiness) again in this test.

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates, @guozhangwang !

Here my feedback!

@@ -85,7 +86,7 @@ public Collection<StandbyTask> getUpdatingStandbyTasks() {
}

public boolean onlyStandbyTasksLeft() {
return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive());
return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did the same change in #12312 🙂
I will revert the change in my PR to avoid merge conflicts.

@@ -88,7 +88,8 @@ public abstract class AbstractTask implements Task {
* @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
* or flushing state store get IO errors; such error should cause the thread to die
*/
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
@Override
public void maybeCheckpoint(final boolean enforceCheckpoint) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that this method is public, could you please add unit tests for this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, will do.

}

for (final Task task : updatingTasks.values()) {
// do not enforce checkpointing during restoration if its position has not advanced much
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking forward to the follow-up PR 🙂

mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval),
// we need to make sure that transaction timeout is not lower than commit interval for EOS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a check in the production code, we do not need to add this comment since the test would fail anyways, right? I would remove the comment.

Comment on lines 239 to 248
public static String safeUniqueClassTestName(final Class<?> testClass) {
return (testClass.getSimpleName())
.replace(':', '_')
.replace('.', '_')
.replace('[', '_')
.replace(']', '_')
.replace(' ', '_')
.replace('=', '_');
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be dead code now that we do not need this method in DefaultStateUpdater. Could you please remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes :)

@@ -506,6 +537,7 @@ public void shouldAddFailedTasksToQueueWhenRestoreThrowsStreamsExceptionWithTask
verifyUpdatingTasks(task2);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyNeverCheckpointTasks(task1, task3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same flakiness as above

@@ -531,6 +563,7 @@ public void shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException()
verifyUpdatingTasks(task3);
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyNeverCheckpointTasks(task1, task2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same flakiness as above

@@ -552,6 +585,7 @@ public void shouldAddFailedTasksToQueueWhenUncaughtExceptionIsThrown() throws Ex
verifyUpdatingTasks();
verifyRestoredActiveTasks();
verifyRemovedTasks();
verifyNeverCheckpointTasks(task1, task2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same flakiness as above

Comment on lines 664 to 665
final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE));
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
Copy link
Contributor

@cadonna cadonna Jun 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to do this, otherwise verifications like verifyExceptionsAndFailedTasks() will not work. You do not use them in this method, but the change makes the test future-proof.

Suggested change
final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE));
final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
stateUpdater.shutdown(Duration.ofMinutes(1));
final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE));
stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I will remove final on that variable then.


private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) throws Exception {
for (final Task task : tasks) {
verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think the timeout is not needed here, since you use this verification after the tasks are either restored or removed. In those cases maybeCheckpoint() should have already be called by then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I removed the timeout(VERIFICATION_TIMEOUT) the shouldAutoCheckpointTasksOnInterval starts to be flaky (once every ~30 on my laptop). This is because in that test we do not close the task before verification, and the system time makes sleeping just one commit interval not sufficient to be 100% sure (even with commit interval * 2 I can still see flakiness). So I reverted that removal by the end.

@guozhangwang
Copy link
Contributor Author

Thanks for having another look @cadonna , your comments are addressed.

@guozhangwang
Copy link
Contributor Author

Oh I had one commit for adding the unit test lost, adding now.

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang Thanks for the updates

Here my feedback.

Feel free to react on my feedback in a follow-up PR.

Comment on lines +243 to +245
task.maybeCheckpoint(false); // this should not checkpoint
task.maybeCheckpoint(false); // this should checkpoint
task.maybeCheckpoint(false); // this should not checkpoint
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is not really clear about when the checkpointing happens. Ideally, we would need to verify after each call to maybeCheckpoint(). That might be possible with EasyMock#reset() but I am not 100% sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. I figured out a way without relying on reset(), and instead just checking on offsetSnapshotSinceLastFlush which is only updated if checkpoint is indeed executed.

task = createOptimizedStatefulTask(createConfig("100"), consumer);

task.initializeIfNeeded();
task.maybeCheckpoint(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need to also test task.maybeCheckpoint(false)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, added.

@@ -313,6 +337,7 @@ public void shouldRestoreActiveStatefulTaskThenUpdateStandbyTaskAndAgainRestoreA
stateUpdater.add(task2);

verifyRestoredActiveTasks(task1);
verifyCheckpointTasks(true, task1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need another verification before line 350.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

@guozhangwang
Copy link
Contributor Author

Thanks @cadonna . I will follow-up on your comments for unit tests in the next PR.

stateUpdater.add(task3);
stateUpdater.add(task4);

sleep(COMMIT_INTERVAL);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use MockTime() instead of system time to better control the progress of time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure: #12344

mjsax pushed a commit to confluentinc/kafka that referenced this pull request Jun 30, 2022
…che#12279)

During restoring, we should always commit a.k.a. write checkpoint file regardless of EOS or ALOS, since if there's a failure we would just over-restore them upon recovery so no EOS violations happened.

Also when we complete restore or remove task, we should enforce a checkpoint as well; for failing cases though, we should not write a new one.

Reviewers: Bruno Cadonna <cadonna@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants