-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… #15910
Conversation
…nterrupt offset translation MirrorCheckpointTask reloads the last checkpoint at start, OffsetSyncStore stores OffsetSyncs before reading till end. Add test case simulating restarted task where the store is reinitialized with later OffsetSyncs and check that emitted Checkpoint do not rewind. Also addresses KAFKA-16622 Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once because the OffsetSyncStore store is populated before reading to log end. Co-Authored-By: Adrian Preston <prestona@uk.ibm.com>
Hey @edoardocomar and @prestona thanks for the PR! One of the reasons I thought this might require a KIP is because it requires additional permissions that the current MM2 doesn't need: If an operator has already configured ACLs such that MM2 has write permissions for the checkpoints topic but no read permissions, it could be operating today and then failing after an upgrade with this change. I don't know if that is a common configuration or even a recommended one, but it does seem possible in the wild. Perhaps this can be configuration-less and backwards-compatible if we fallback to the old behavior if reading the checkpoints fails for any reason, including insufficient permissions. |
Co-Authored-By: Adrioan Preston <prestona@uk.ibm.com>
Hi @gharris1727 we're now handling errors in loading the Checkpoints topic. Specifically we tested with the not authorized to read case - which the existing KafkaBasedLog was not handling well. This looks to us a better behavior than reverting to the old one in case of failure, as maintaining and testing two modes of operation seems too complex. Do you still think we need a KIP - to introduce yet another config to choose between the old behavior (default) and the new one (arguably better in the eyes of this PR authors ...) ? |
@edoardocomar In general connectors do have to add a configuration like this eventually, because users have different tolerances for errors. Some users want the errors to cause the connector to become FAILED, so that they can see the exception in the REST API and retry it explicitly. Other users want the connector to retry internally infinitely, and not fail for any reason. MM2 has a lot of operations that can fail, and virtually none of them cause the connector to fail. The reason for this is that MM2 has dedicated mode, where there isn't a REST API to surface errors or perform external retries, so external retries are very expensive. It is definitely something that could be fixed eventually with like a "strict mode"? configuration or similar. We've also considered ways to address this from the framework side, with retry policies and automatic restarts, but none of that has been fully designed or implemented yet. I think we should not block this fix on solving that more general problem. If there is a permissions error loading the checkpoints, MM2 should log that, and then degrade gracefully to the current behavior. We can have a KIP that adds "strict mode" make this failure surface, to make this new permission required. In practical terms, without a configuration and with the graceful degradation implementation, we can get this into 3.8. |
use previous OffsetSyncStore "pessimistic" load logic Co-Authored-By: Adrian Preston <prestona@uk.ibm.com>
@gharris1727 thanks for your feedback. we've added another commit to allow for the old OffsetSyncs load behavior in case the task cannot read the checkpoints |
testing results in the following scenario:
Emitted Checkpoints:
NEW implementation with checkpoints read by Checkpoint task
NEW implementation with checkpoints FAILED read by Checkpoint task
Original implementation (prior to this PR)
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @edoardocomar I definitely think this can be included in 3.8.
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
Show resolved
Hide resolved
Hi @gharris1727 commit e33edd2 hopefully addresses most of your comments. Thanks for the quick feedback. We also noticed that the loading of the checkpoints must complete before the task start method completes. |
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
Use a wrapper to the checkpointsPerGroup map to ensure thread safety on restart. The callback rethrows the error as a way to stop the KafkaBasedLog looping forever in readtoEnd. This occurs when unauthorized to READ from the topic, as KafkaBasedLog retries indefinitely. Unauthorized to DESCRIBE instead fails immediately before fetching. All such failures result in a warning log message about degraded offset translation.
Hi @gharris1727 we worked out the asynchronous loading using a wrapper to the checkpointsPerGroupMap. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, this is getting really close! I think the error handling is where it needs to be.
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
Outdated
Show resolved
Hide resolved
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
Show resolved
Hide resolved
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java
Outdated
Show resolved
Hide resolved
Hi @gharris1727, hopefully the latest commits address your review comments. Once again, really appreciate all your feedback and suggestions. |
for consistency with OffsetSyncStore CheckpointStore started debug/trace message
Hi @gharris1727 if you have the time, can you please have a look again ? thanks |
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java
Outdated
Show resolved
Hide resolved
@@ -271,4 +284,102 @@ private Map<TopicPartition, Checkpoint> assertCheckpointForTopic( | |||
assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ? "" : " not") + " emit offset sync"); | |||
return checkpoints; | |||
} | |||
|
|||
@Test | |||
public void testCheckpointsTaskRestartUsesExistingCheckpoints() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think using "real checkpoints" generated by the first MirrorCheckpointTask to test the second MirrorCheckpointTask is not necessary, and you can use simulated checkpoints instead.
Reassigning variables and copy-pasting sections in tests is typo-prone and I think we can avoid it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks - we fixed the reassignments.
We already load the OffsetSyncStore with different OffsetSync, but we think the CheckpointStore at restart of the task should contain the exact last checkpoint emitted by the previous instance of the task
store.sync(tp, offset, offset); | ||
assertSparseSyncInvariant(store, tp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move this into backingStoreStart, so that it happens when initializationMustReadToEnd is properly initialized? Here it's relying on the default value set by construction, not the value passed into start
.
Actually this seems to be the case in a lot of the tests here. Can you look through the tests, and whenever there are assertions or sync
calls before start
, apply the same fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok - we can assert sync is only called after start
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
Outdated
Show resolved
Hide resolved
@gharris1727 please review, thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I think this is the last round. I only had a nit for the tests. Thank you @edoardocomar and @prestona so much for your patience!
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0)); | ||
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100)); | ||
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200)); | ||
FakeOffsetSyncStore store = new FakeOffsetSyncStore() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: My IDE gives me warnings about this being an AutoCloseable used without a try-with-resources, and in this particular case, it was already in a try-with-resources.
I know that the FakeOffsetStore has nothing to leak if close is not called, but i'd like to prevent any future leaks, and prevent the IDE from nagging people about it in the future.
I would probably have made this change because the indeting was getting out-of-hand, maybe you get the same feeling? Would you consider having a FakeOffsetSyncStore constructor that takes a Consumer, and calls it during backingStoreStart?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @gharris1727
I use IntelliJ too and saw the warning.
I could have used a @suppress
annotation but I am very reluctant to make code less readable because of limited insight by linters. Similarly to make the fake store more complex.
Using try-with-resource with a local class results in horrible indentation as you said.
I don't share a strong worry of future leaks in testing - seems speculative to me.
In this instance unless you have very strong feelings, I'd really leave the test as-is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
on another note, if you approve I'd also backport the fix to 3.7
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have somewhat strong feelings, I wouldn't call them very strong. If someone noticed this IDE warning and created a ticket and a PR to fix it, I would review that. Am I going to make the build enforce this warning? No, but I have seen other situations where the warning did point out real resource leaks...
I just wanted to save the effort required to go and rework this later, and prevent this PR from introducing an easily avoidable warning. I agree with you about suppressing warnings, I don't think that is a healthy practice to have.
I just tried making this a try-with-resources and the indenting turned out fine. The body of backingStoreStart is at the exact same indentation as it is currently.
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
@Override
void backingStoreStart() {
// read a sync during startup
sync(tp, 100, 200);
assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 0));
assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 100));
assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 200));
}
}) {
// no offsets exist and store is not started
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
// After the store is started all offsets are visible
store.start(true);
assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
Here's the Consumer alternative I thought about, which uses one less indentation level at the cost of a variable, a field, and two constructors:
Consumer<FakeOffsetSyncStore> init = store -> {
// read a sync during startup
store.sync(tp, 100, 200);
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
};
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(init)) {
// no offsets exist and store is not started
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
// After the store is started all offsets are visible
store.start(true);
assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
Either of these is preferable to having the warning or suppressing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you approve I'd also backport the fix to 3.7
I'm on the fence about that, leaning towards yes. I regret backporting KAFKA-12468 so far and introducing this issue, and I didn't communicate it properly to users. I think you can backport this once you have a full release note written that can be backported at the same time.
@gharris1727 I gave up and used the ugly try. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Thanks @edoardocomar and @prestona!
I'll wait for the CI to stabilize on this before merging.
backingStore.start(); | ||
public void start(boolean initializationMustReadToEnd) { | ||
this.initializationMustReadToEnd = initializationMustReadToEnd; | ||
log.debug("OffsetSyncStore starting - must read to OffsetSync end = ", initializationMustReadToEnd); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
noticed one typo here, the log message actually doesn't print the boolean
Thanks @gharris1727 |
Hi @gharris1727 ... more about warnings. there are two Java21 compiler warnings that result in a compile failure
This failure is due to us having made the constructor of OffsetSyncStore public for consistency with CheckpointStore. Do you have preferences ? |
@edoardocomar Since we're not substantially changing that class, I think it's acceptable to keep the old visibility or add the suppression, rather than fix the this-escape. It's up to you. |
@gharris1727 we would like to backport to 3.7 - ok ? |
From my earlier comment:
Please open a PR for the release note first. |
do you mean a PR for a notable entry in docs/upgrade.html ?
|
Yeah, if a section like that doesn't exist yet we can start it. |
commit 93238ae Author: Antoine Pourchet <antoine@responsive.dev> Date: Thu May 23 13:45:29 2024 -0600 KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (apache#16034) This PR uses the new TaskTopicPartition structure to simplify the build process for the ApplicationState, which is the input to the new TaskAssignor#assign call. Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org> commit 4020307 Author: Kuan-Po (Cooper) Tseng <brandboat@gmail.com> Date: Fri May 24 02:51:26 2024 +0800 KAFKA-16795 Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter (apache#16020) This commit allows users to apply the scala version Formatters, but users will receive the warning messages about deprecation. This compatibility support will be removed from 4.0.0 Reviewers: Chia-Ping Tsai <chia7712@gmail.com> commit c3018ef Author: TingIāu "Ting" Kì <51072200+frankvicky@users.noreply.github.com> Date: Fri May 24 01:15:56 2024 +0800 KAFKA-16804: Replace archivesBaseName with archivesName (apache#16016) Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Greg Harris <greg.harris@aiven.io> commit 0ba15ad Author: Edoardo Comar <ecomar@uk.ibm.com> Date: Thu May 23 17:17:56 2024 +0100 KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… (apache#15910) * KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation MirrorCheckpointTask reloads the last checkpoint at start, OffsetSyncStore stores OffsetSyncs before reading till end. If CheckpointTask cannot read checkpoints at startup, use previous OffsetSyncStore load logic, with warning log message about degraded offset translation. Also addresses KAFKA-16622 Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once because the OffsetSyncStore store is populated before reading to log end. Co-Authored-By: Adrian Preston <prestona@uk.ibm.com> Reviewers: Greg Harris <greg.harris@aiven.io> commit 5a48984 Author: Viktor Somogyi-Vass <viktorsomogyi@gmail.com> Date: Thu May 23 17:36:39 2024 +0200 KAFKA-15649: Handle directory failure timeout (apache#15697) A broker that is unable to communicate with the controller will shut down after the configurable log.dir.failure.timeout.ms. The implementation adds a new event to the Kafka EventQueue. This event is deferred by the configured timeout and will execute the shutdown if the heartbeat communication containing the failed log dir is still pending with the controller. Reviewers: Igor Soarez <soarez@apple.com> commit 8d117a1 Author: Mickael Maison <mimaison@users.noreply.github.com> Date: Thu May 23 17:03:24 2024 +0200 KAFKA-16825: Update netty/jetty/jackson/zstd dependencies (apache#16038) Reviewers: Luke Chen <showuon@gmail.com> commit ab0cc72 Author: Mickael Maison <mimaison@users.noreply.github.com> Date: Thu May 23 16:01:45 2024 +0200 MINOR: Move parseCsvList to server-common (apache#16029) Reviewers: Chia-Ping Tsai <chia7712@gmail.com> commit 14b5c4d Author: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com> Date: Thu May 23 02:27:00 2024 -0400 KAFKA-16793; Heartbeat API for upgrading ConsumerGroup (apache#15988) This patch implements the heartbeat api to the members that use the classic protocol in a ConsumerGroup. Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Jacot <djacot@confluent.io> commit e692fee Author: Jeff Kim <kimkb2011@gmail.com> Date: Thu May 23 02:24:23 2024 -0400 MINOR: fix flaky testRecordThreadIdleRatio (apache#15987) DelayEventAccumulator should return immediately if there are no events in the queue. Also removed some unused fields inside EventProcessorThread. Reviewers: Gaurav Narula <gaurav_narula2@apple.com>, Chia-Ping Tsai <chia7712@gmail.com>, David Jacot <djacot@confluent.io> commit bef83ce Author: Nick Telford <nick.telford@gmail.com> Date: Thu May 23 05:34:31 2024 +0100 KAFKA-15541: Add iterator-duration metrics (apache#16028) Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw). This new `StateStore` metric tracks the average and maximum amount of time between creating and closing Iterators. Iterators with very high durations can indicate to users performance problems that should be addressed. If a store reports no data for these metrics, despite the user opening Iterators on the store, it suggests those iterators are not being closed, and have therefore leaked. Reviewers: Matthias J. Sax <matthias@confluent.io>
@gharris1727 please see #16070 |
#15910) * KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation MirrorCheckpointTask reloads the last checkpoint at start, OffsetSyncStore stores OffsetSyncs before reading till end. If CheckpointTask cannot read checkpoints at startup, use previous OffsetSyncStore load logic, with warning log message about degraded offset translation. Also addresses KAFKA-16622 Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once because the OffsetSyncStore store is populated before reading to log end. Co-Authored-By: Adrian Preston <prestona@uk.ibm.com> Reviewers: Greg Harris <greg.harris@aiven.io>
cherry-picked on 3.7 |
apache#15910) * KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation MirrorCheckpointTask reloads the last checkpoint at start, OffsetSyncStore stores OffsetSyncs before reading till end. If CheckpointTask cannot read checkpoints at startup, use previous OffsetSyncStore load logic, with warning log message about degraded offset translation. Also addresses KAFKA-16622 Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once because the OffsetSyncStore store is populated before reading to log end. Co-Authored-By: Adrian Preston <prestona@uk.ibm.com> Reviewers: Greg Harris <greg.harris@aiven.io>
apache#15910) * KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation MirrorCheckpointTask reloads the last checkpoint at start, OffsetSyncStore stores OffsetSyncs before reading till end. If CheckpointTask cannot read checkpoints at startup, use previous OffsetSyncStore load logic, with warning log message about degraded offset translation. Also addresses KAFKA-16622 Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once because the OffsetSyncStore store is populated before reading to log end. Co-Authored-By: Adrian Preston <prestona@uk.ibm.com> Reviewers: Greg Harris <greg.harris@aiven.io>
apache#15910) * KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation MirrorCheckpointTask reloads the last checkpoint at start, OffsetSyncStore stores OffsetSyncs before reading till end. If CheckpointTask cannot read checkpoints at startup, use previous OffsetSyncStore load logic, with warning log message about degraded offset translation. Also addresses KAFKA-16622 Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once because the OffsetSyncStore store is populated before reading to log end. Co-Authored-By: Adrian Preston <prestona@uk.ibm.com> Reviewers: Greg Harris <greg.harris@aiven.io>
KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
MirrorCheckpointTask reloads the last checkpoint at start, OffsetSyncStore stores OffsetSyncs before reading till end.
Add test case simulating restarted task where the store is reinitialized with later OffsetSyncs and check that emitted Checkpoint do not rewind.
Also addresses KAFKA-16622 Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once because the OffsetSyncStore store is populated before reading to log end.
Co-Authored-By: Adrian Preston prestona@uk.ibm.com