KAFKA-5998: fix checkpointableOffsets handling #7030

@vvcephei
Contributor

commented Jul 3, 2019

  • fix checkpoint file warning by filtering checkpointable offsets per task
  • clean up state manager hierarchy to prevent similar bugs

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
@vvcephei vvcephei referenced this pull request Jul 3, 2019
0 of 3 tasks complete
/**
* This class is responsible for the initialization, restoration, closing, flushing etc
* of Global State Stores. There is only ever 1 instance of this class per Application Instance.
*/
-public class GlobalStateManagerImpl extends AbstractStateManager implements GlobalStateManager {
+public class GlobalStateManagerImpl implements GlobalStateManager {

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Stop sharing mutable state between a superclass and subclass. The only reason to do it was to support the re-initialization logic, but the checkpoint map can just as easily be passed in as a parameter.

eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
baseDir = stateDirectory.globalStateDir();
checkpointFile = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
checkpointFileCache = new HashMap<>();

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

It took me a really long time to decipher the actual purposes of "checkpoint" and "checkpointableOffsets". I've renamed them to "checkpointFile" and "checkpointFileCache" to be more self-documenting.

@@ -338,12 +357,12 @@ public void close(final boolean clean) throws IOException {
log.debug("Closing global storage engine {}", entry.getKey());
try {
entry.getValue().get().close();
-} catch (final Exception e) {
+} catch (final RuntimeException e) {

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Since this PR is to clean up difficult-to-maintain code, I also included other cleanups, like dropping unnecessary this modifiers, restricting too-broad catch blocks, etc.

log.error("Failed to close global state store {}", entry.getKey(), e);
closeFailed.append("Failed to close global state store:")
.append(entry.getKey())
.append(". Reason: ")
-.append(e.toString())
+.append(e)

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

unnecessary toString


private final List<TopicPartition> changelogPartitions = new ArrayList<>();

// TODO: this map does not work with customized grouper where multiple partitions
-// of the same topic can be assigned to the same topic.
+// of the same topic can be assigned to the same task.

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

pretty sure this was a typo

@@ -38,45 +38,40 @@
import static org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
import static org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;

-abstract class AbstractStateManager implements StateManager {
+final class StateManagerUtil {

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

This has changed from an abstract class to a static utility class.
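
As a rough illustration of that move (not the actual `StateManagerUtil` code), the sketch below shows a final utility class whose static method receives the map it operates on as a parameter, instead of reading a field inherited from a superclass. The names `OffsetUtilSketch` and `clearPartitions` are hypothetical, and plain strings stand in for `TopicPartition`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the real StateManagerUtil.
final class OffsetUtilSketch {
    // Private constructor: without it, Java would generate a public no-arg
    // constructor and the utility class could be instantiated.
    private OffsetUtilSketch() {}

    // The caller passes its own map in, so no mutable field is shared
    // through an inheritance relationship.
    static void clearPartitions(final Map<String, Long> checkpoints,
                                final Iterable<String> partitions) {
        for (final String partition : partitions) {
            checkpoints.remove(partition);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put("changelog-0", 10L);
        checkpoints.put("changelog-1", 20L);
        clearPartitions(checkpoints, Collections.singletonList("changelog-0"));
        System.out.println(checkpoints);
    }
}
```

With this shape, each state manager keeps its checkpoint map private and decides when to hand it to the helper.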

throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", logPrefix), e);
}
try {
stateMgr.clearCheckpoints();

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

checking for null is now encapsulated.

@@ -431,7 +432,7 @@ public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws IOException {
noPartitions,
false,
stateDirectory,
-emptyMap(),
+singletonMap(persistentStoreName, persistentStorePartition.topic()),

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

This test erroneously didn't include a changelog topic for the store in question. Now that we are actually verifying the checkpoints before we write them, we have to get this right.

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Edit: some refactoring I did actually removed this enforcement. I'm working out how to keep it, but it's pretty complicated...

@@ -752,7 +842,7 @@ public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOExceptio
noPartitions,
false,
stateDirectory,
-emptyMap(),
+singletonMap(persistentStoreName, persistentStoreTopicName),

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

ditto here.

@@ -1477,6 +1477,8 @@ public void punctuate(final long timestamp) {
}

private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) {
final StateStore stateStore = new MockKeyValueStore(storeName, logged);

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

We're no longer overlooking the fact that this store wasn't "logged" when we write the checkpoints.

@vvcephei
Contributor Author

left a comment

@mjsax @cadonna @ableegoldman @abbccdda @guozhangwang @pkleindl ... did I miss anyone?

This PR should fix the long-running KAFKA-5998 bug.

The change I'm proposing is bigger in scope than the actual fix, though, because I wanted to take steps to prevent a similar bug from cropping up in the same code in the future. My theory is that the bug had an easy time hiding in this code because the handling of checkpointable offsets was so complex. I'm hoping that by reducing the mutable scope and also tightening up the invariants around the checkpointable offsets, we will have an easier time maintaining this module.

Let me know what you think!

Thanks,
-John

import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;


-public class ProcessorStateManager extends AbstractStateManager {
+public class ProcessorStateManager implements StateManager {

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Also here, no longer sharing mutable state between super and sub classes.

private final File baseDir;
private OffsetCheckpoint checkpointFile;
private final Map<TopicPartition, Long> checkpointFileCache = new HashMap<>();
private final Map<TopicPartition, Long> initialLoadedCheckpoints;

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Adding this collection breaks a circular dependency in this class:

  • the checkpoints we load from disk are potentially not valid for the current topology
  • we have to load the checkpoints immediately because we have to delete the checkpoint file before processing in the case of EOS
  • we also need to have read the checkpoint file before registering stores, since it might be needed to create a restorer
  • we can't know if a checkpoint from the file is valid until after registering stores

In other words, if the prior code wanted to validate the loaded checkpoints, it would have to register the stores before loading checkpoints, but it also needs to load the checkpoints before registering the stores.

We're breaking the cycle here by keeping the loaded checkpoints separate. Now we read the checkpoint file into initialLoadedCheckpoints, which is used to register the stores, and then we are able to make sure that we only ever write valid checkpoints into the checkpointFileCache, which is used to update the checkpoint file later on.
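
The two-map arrangement described above can be sketched roughly as follows. This is a simplified, hypothetical model rather than the real `ProcessorStateManager`: the class name `CheckpointCycleSketch` is invented, strings stand in for `TopicPartition`, and the registration logic is reduced to the persistent/non-persistent decision.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; field names loosely mirror the PR, the rest is assumed.
final class CheckpointCycleSketch {
    // Snapshot of whatever the checkpoint file held at startup.
    // It may contain entries that are not valid for the current topology.
    private final Map<String, Long> initialLoadedCheckpoints;
    // Only offsets for registered, persistent stores are ever put here.
    // This is the map that would later be written back to the file.
    private final Map<String, Long> checkpointFileCache = new HashMap<>();

    CheckpointCycleSketch(final Map<String, Long> loadedFromFile) {
        this.initialLoadedCheckpoints = new HashMap<>(loadedFromFile);
    }

    // Returns the offset to restore from, or null to restore from scratch.
    Long register(final String changelogPartition, final boolean persistent) {
        final Long restoreCheckpoint =
            persistent ? initialLoadedCheckpoints.get(changelogPartition) : null;
        if (restoreCheckpoint != null) {
            checkpointFileCache.put(changelogPartition, restoreCheckpoint);
        }
        return restoreCheckpoint;
    }

    Map<String, Long> checkpointFileCache() {
        return new HashMap<>(checkpointFileCache);
    }

    public static void main(final String[] args) {
        final Map<String, Long> loaded = new HashMap<>();
        loaded.put("store-a-changelog-0", 5L);
        loaded.put("stale-changelog-0", 9L); // left over from an older topology
        final CheckpointCycleSketch mgr = new CheckpointCycleSketch(loaded);
        mgr.register("store-a-changelog-0", true);
        mgr.register("in-memory-changelog-0", false);
        System.out.println(mgr.checkpointFileCache());
    }
}
```

Because the stale entry is never registered, it never reaches the cache and so would not be written back.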

@@ -149,12 +160,16 @@ public void register(final StateStore store,
restoreCallbacks.put(topic, stateRestoreCallback);
recordConverters.put(topic, recordConverter);
} else {
-log.trace("Restoring state store {} from changelog topic {} at checkpoint {}", storeName, topic, checkpointableOffsets.get(storePartition));
+final Long restoreCheckpoint = store.persistent() ? initialLoadedCheckpoints.get(storePartition) : null;

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

This is where we're using the loaded checkpoint for store registration. Note the missing condition which is now handled... if the store is not persistent, it should not use the loaded checkpoint.

);
}

void clearCheckpoints() throws IOException {

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

encapsulating this operation so that outside classes don't have to directly mutate our checkpointFile field.

checkpointFile.delete();
checkpointFile = null;

checkpointFileCache.clear();

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

We didn't previously clear the cache on the blocks that this method replaces, but after reading the code, I'm pretty sure this is the right thing to do.

@@ -90,6 +94,7 @@ public void write(final Map<TopicPartition, Long> offsets) throws IOException {
fileOutputStream.getFD().sync();
}

LOG.trace("Swapping tmp checkpoint file {} {}", temp.toPath(), file.toPath());

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Having these logs would have demystified a large part of the prior (misdirected) investigation, since we were never sure whether the tmp file existed or not, or what was going on.
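
For readers unfamiliar with the pattern being logged, here is a minimal sketch of a write-to-temp-then-swap file update, assuming a plain line-based format. It is not the actual `OffsetCheckpoint` implementation: the class `TmpSwapSketch`, the `.tmp` suffix, and the fallback to a non-atomic move are all assumptions made for illustration.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical sketch; not the real OffsetCheckpoint.
final class TmpSwapSketch {
    private TmpSwapSketch() {}

    static void write(final File file, final List<String> lines) throws IOException {
        final File temp = new File(file.getAbsolutePath() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(temp)) {
            for (final String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
            // Force the bytes to disk before the swap, as in the hunk above.
            out.getFD().sync();
        }
        // Logging both paths shows whether the tmp file existed at swap time.
        System.out.printf("Swapping tmp checkpoint file %s %s%n", temp.toPath(), file.toPath());
        try {
            // Whether an existing target is replaced is implementation specific.
            Files.move(temp.toPath(), file.toPath(), StandardCopyOption.ATOMIC_MOVE);
        } catch (final AtomicMoveNotSupportedException e) {
            // Assumed fallback for file systems without atomic rename.
            Files.move(temp.toPath(), file.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

The point of the swap is that a reader sees either the old file or the complete new one, never a partially written file.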

return restoredOffsets;
}

void setRestoredOffsets(final Map<TopicPartition, Long> restoredOffsets) {

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Added to support some of the needed test changes.

@@ -240,7 +240,7 @@ public void testRegisterNonPersistentStore() throws IOException {
@Test
public void testChangeLogOffsets() throws IOException {
final TaskId taskId = new TaskId(0, 0);
-final long lastCheckpointedOffset = 10L;
+final long storeTopic1LoadedCheckpoint = 10L;

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

renamed for clarity.

@@ -441,6 +441,123 @@ public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws IOException {
assertThat(read, equalTo(offsets));
}

@Test
public void shouldIgnoreIrrlevantLoadedCheckpoints() throws IOException {

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Added a bunch of new tests to cover both the bug itself and also previously untested code paths in the ProcessorStateManager.

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Note: this particular test shows that we will actually repair all the corrupted checkpoint files that buggy Streams versions wrote.

}

@Test
public void shouldIgnoreIrrelevantRestoredCheckpoints() throws IOException {

@vvcephei

vvcephei Jul 3, 2019

Author Contributor

Note: this particular test verifies that the bug is fixed.

@cadonna
Contributor

left a comment

Thanks for the PR @vvcephei, and congratulations to you and @pkleindl on finding and fixing this bug. I left a couple of comments. I had a hard time finding the code that actually fixes the bug (and I am still not sure I found it). Since this fix is not trivial, could you please add some specific comments about the fix next time? I am also wondering whether you could have divided this PR into two: one for the fix itself and one for the repair of old corrupted checkpoints. IMO, that would have made reviewing the PRs easier.

import static org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;


-public class ProcessorStateManager extends AbstractStateManager {
+public class ProcessorStateManager implements StateManager {

@cadonna

cadonna Jul 5, 2019

Contributor

Wouldn't it be more meaningful to rename this class to TaskStateManager?

@vvcephei

vvcephei Jul 9, 2019

Author Contributor

Maybe, I'm not sure of the historical reason to name it this way.

}

private void updateCheckpointFileCache(final Map<TopicPartition, Long> checkpointableOffsetsFromProcessing) {
final Map<TopicPartition, Long> restoredOffsets = validCheckpointableOffsets(changelogReader.restoredOffsets());

@cadonna

cadonna Jul 5, 2019

Contributor

This is the most important line to fix the bug, right?

@vvcephei

vvcephei Jul 9, 2019

Author Contributor

Yes. I'd marked it in an earlier version of this PR. I guess that comment became "outdated" at some point. Sorry about that.
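
To make the idea behind that line concrete, here is a minimal sketch of filtering a set of candidate offsets down to the partitions a task actually owns. What the real `validCheckpointableOffsets` checks is not shown in this thread, so the ownership test below is an assumption, as are the class name `ValidOffsetsSketch` and the use of strings in place of `TopicPartition`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; the real validation rule may differ.
final class ValidOffsetsSketch {
    private ValidOffsetsSketch() {}

    // Keeps only the offsets whose partition belongs to this task's own
    // changelog partitions; offsets reported for other tasks are dropped.
    static Map<String, Long> validCheckpointableOffsets(final Map<String, Long> candidates,
                                                        final Set<String> ownedPartitions) {
        final Map<String, Long> result = new HashMap<>();
        for (final Map.Entry<String, Long> entry : candidates.entrySet()) {
            if (ownedPartitions.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, Long> restored = new HashMap<>();
        restored.put("task0-changelog-0", 42L);
        restored.put("task1-changelog-1", 17L); // belongs to a different task
        final Set<String> owned = new HashSet<>();
        owned.add("task0-changelog-0");
        System.out.println(validCheckpointableOffsets(restored, owned));
    }
}
```

Applying the filter before merging into the cache means a task's checkpoint file cannot pick up offsets for partitions it does not own.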


final StateRestorer restorer = new StateRestorer(
storePartition,
new CompositeRestoreListener(stateRestoreCallback),
-checkpointableOffsets.get(storePartition),
+restoreCheckpoint,

@cadonna

cadonna Jul 5, 2019

Contributor

If the store is not persistent or the checkpoint file that was read does not contain the partition, this will throw an NPE, right? If so, you should add unit tests for these cases.

@vvcephei

vvcephei Jul 9, 2019

Author Contributor

No, I guess you were thinking the Long would become unboxed at this point? It's actually a Long parameter, and the StateRestorer constructor checks for null... Not the cleanest code, I guess, but it looks like it's been this way since 2017.

@cadonna

cadonna Jul 10, 2019

Contributor

My fault! I missed the parameter. I looked at the next parameter in the StateRestorer constructor which is a long.
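
The boxed-versus-primitive distinction in this exchange can be shown with a small standalone sketch. It does not reproduce the `StateRestorer` constructor: the class `BoxedOffsetSketch`, its method names, and the `-1L` default are hypothetical.

```java
// Hypothetical sketch of why a null Long is safe for one parameter type
// and not for the other.
final class BoxedOffsetSketch {
    private BoxedOffsetSketch() {}

    // Boxed parameter: null is a legal argument and can be checked for.
    static long checkpointOrDefault(final Long checkpoint) {
        return checkpoint == null ? -1L : checkpoint;
    }

    // Primitive parameter: a null Long argument is unboxed at the call
    // site, which throws a NullPointerException before the method runs.
    static long unboxed(final long checkpoint) {
        return checkpoint;
    }

    public static void main(final String[] args) {
        final Long missing = null;
        System.out.println(checkpointOrDefault(missing));
        try {
            unboxed(missing);
        } catch (final NullPointerException e) {
            System.out.println("unboxing a null Long threw an NPE");
        }
    }
}
```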

@vvcephei

Contributor Author

commented Jul 9, 2019

Thanks for the review, @cadonna !

I'm sorry that my earlier call-out of the actual bugfix got marked "outdated" at some point, so I guess it doesn't show up in the diff anymore. I didn't notice when that happened, or I would have re-marked it.

Regarding splitting up the PRs, I do agree with you. It would have been nice to get a smaller fix in and then tackle the refactoring separately.

If I can make one excuse for myself, it would be that in this case, it wasn't clear to me that the fix was good enough because the scope of checkpointableOffsets was too broad. If we just did a bugfix first, it would have been equally hard for reviewers to have confidence in the fix.

In retrospect, though, I could have submitted the refactor first, and then followed up with the bugfix. It just didn't occur to me, for whatever reason.

In any case, thanks for wading through the code review! I think I addressed all your comments.

WDYT?
-John

@bbejeck

Contributor

commented Jul 11, 2019

retest this please

@cadonna
Contributor

left a comment

LGTM

@bbejeck bbejeck added the streams label Jul 12, 2019

@bbejeck bbejeck merged commit 53b4ce5 into apache:trunk Jul 12, 2019

3 checks passed

JDK 11 and Scala 2.12: SUCCESS. 11638 tests run, 67 skipped, 0 failed.
JDK 11 and Scala 2.13: SUCCESS. 11635 tests run, 67 skipped, 0 failed.
JDK 8 and Scala 2.11: SUCCESS. 11635 tests run, 67 skipped, 0 failed.
@bbejeck

Contributor

commented Jul 12, 2019

Merged #7030 into trunk

bbejeck added a commit that referenced this pull request Jul 12, 2019
KAFKA-5998: fix checkpointableOffsets handling (#7030)
fix checkpoint file warning by filtering checkpointable offsets per task
clean up state manager hierarchy to prevent similar bugs

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
@bbejeck

Contributor

commented Jul 12, 2019

cherry-picked to 2.3 and 2.2

@Tin-Nguyen


commented Jul 16, 2019

@bbejeck does this mean that the fix is available in the 2.2.x version?

@bbejeck

Contributor

commented Jul 16, 2019

@Tin-Nguyen yes, if you check out the 2.2 branch and build it. Right now, I don't know whether there will be another 2.2 release.

@Tin-Nguyen


commented Jul 17, 2019

thanks @bbejeck

@Tin-Nguyen


commented Jul 17, 2019

@bbejeck I'm wondering whether there is an updated binary download that includes the fix?

-this.eosEnabled = eosEnabled;
-this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
-}
+private StateManagerUtil() {}

@guozhangwang

guozhangwang Jul 17, 2019

Contributor

Do we need this constructor explicitly? Wouldn't Java provide it by default?

storeToChangelogTopic,
partitions,
processorContext);
StateManagerUtil.reinitializeStateStoresForPartitions(log,

@guozhangwang

guozhangwang Jul 17, 2019

Contributor

I liked this refactoring a lot, thanks @vvcephei !

ijuma added a commit to confluentinc/kafka that referenced this pull request Jul 20, 2019
Merge remote-tracking branch 'apache-github/2.3' into ccs-2.3
* apache-github/2.3:
  MINOR: Update documentation for enabling optimizations (apache#7099)
  MINOR: Remove stale streams producer retry default docs. (apache#6844)
  KAFKA-8635; Skip client poll in Sender loop when no request is sent (apache#7085)
  KAFKA-8615: Change to track partition time breaks TimestampExtractor (apache#7054)
  KAFKA-8670; Fix exception for kafka-topics.sh --describe without --topic mentioned (apache#7094)
  KAFKA-8602: Separate PR for 2.3 branch (apache#7092)
  KAFKA-8530; Check for topic authorization errors in OffsetFetch response (apache#6928)
  KAFKA-8662; Fix producer metadata error handling and consumer manual assignment (apache#7086)
  KAFKA-8637: WriteBatch objects leak off-heap memory (apache#7050)
  KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing (apache#7021)
  HOT FIX: close RocksDB objects in correct order (apache#7076)
  KAFKA-7157: Fix handling of nulls in TimestampConverter (apache#7070)
  KAFKA-6605: Fix NPE in Flatten when optional Struct is null (apache#5705)
  Fixes #8198 KStreams testing docs use non-existent method pipe (apache#6678)
  KAFKA-5998: fix checkpointableOffsets handling (apache#7030)
  KAFKA-8653; Default rebalance timeout to session timeout for JoinGroup v0 (apache#7072)
  KAFKA-8591; WorkerConfigTransformer NPE on connector configuration reloading (apache#6991)
  MINOR: add upgrade text (apache#7013)
  Bump version to 2.3.1-SNAPSHOT
xiowu0 added a commit to linkedin/kafka that referenced this pull request Aug 22, 2019
[LI-CHERRY-PICK] [1052d87] KAFKA-5998: fix checkpointableOffsets handling (apache#7030)

TICKET = KAFKA-5998
LI_DESCRIPTION =
EXIT_CRITERIA = HASH [1052d87]
ORIGINAL_DESCRIPTION =

fix checkpoint file warning by filtering checkpointable offsets per task
clean up state manager hierarchy to prevent similar bugs

Reviewers: Bruno Cadonna <bruno@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
(cherry picked from commit 1052d87)