Skip to content

Commit

Permalink
[FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval fail…
Browse files Browse the repository at this point in the history
…ures on recovery

Ignoring such failures and running with an incomplete
set of checkpoints can lead to consistency violation.

Instead, transient failures should be mitigated by
automatic job restart.
  • Loading branch information
rkhachatryan committed May 17, 2021
1 parent 5813d97 commit 4476301
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 120 deletions.
Expand Up @@ -140,71 +140,20 @@ public void recover() throws Exception {
return;
}

// Try and read the state handles from storage. We try until we either successfully read
// all of them or when we reach a stable state, i.e. when we successfully read the same set
// of checkpoints in two tries. We do it like this to protect against transient outages
// of the checkpoint store (for example a DFS): if the DFS comes online midway through
// reading a set of checkpoints we would run the risk of reading only a partial set
// of checkpoints while we could in fact read the other checkpoints as well if we retried.
// Waiting until a stable state protects against this while also being resilient against
// checkpoints being actually unreadable.
//
// These considerations are also important in the scope of incremental checkpoints, where
// we use ref-counting for shared state handles and might accidentally delete shared state
// of checkpoints that we don't read due to transient storage outages.
final List<CompletedCheckpoint> lastTryRetrievedCheckpoints =
new ArrayList<>(numberOfInitialCheckpoints);
final List<CompletedCheckpoint> retrievedCheckpoints =
new ArrayList<>(numberOfInitialCheckpoints);
Exception retrieveException = null;
do {
LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);

lastTryRetrievedCheckpoints.clear();
lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);

retrievedCheckpoints.clear();

for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
initialCheckpoints) {

CompletedCheckpoint completedCheckpoint;

try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
retrievedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
LOG.warn(
"Could not retrieve checkpoint, not adding to list of recovered checkpoints.",
e);
retrieveException = e;
}
}
LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);

} while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
&& !CompletedCheckpoint.checkpointsMatch(
lastTryRetrievedCheckpoints, retrievedCheckpoints));
for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
initialCheckpoints) {
retrievedCheckpoints.add(
checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
}

// Clear local handles in order to prevent duplicates on recovery. The local handles should
// reflect
// the state handle store contents.
// reflect the state handle store contents.
completedCheckpoints.clear();
completedCheckpoints.addAll(retrievedCheckpoints);

if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
throw new FlinkException(
"Could not read any of the "
+ numberOfInitialCheckpoints
+ " checkpoints from storage.",
retrieveException);
} else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
LOG.warn(
"Could only fetch {} of {} checkpoints from storage.",
completedCheckpoints.size(),
numberOfInitialCheckpoints);
}
}

/**
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -152,7 +153,7 @@ public void testRecoverSortedCheckpoints() throws Exception {
assertThat(checkpointIds, contains(1L, 2L, 3L));
}

/** We got an {@link IOException} when retrieving checkpoint 2. It should be skipped. */
/** We got an {@link IOException} when retrieving checkpoint 2. It should NOT be skipped. */
@Test
public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception {
final long corruptCkpId = 2L;
Expand All @@ -169,45 +170,15 @@ public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception
final CompletedCheckpointStore completedCheckpointStore =
createCompletedCheckpointStore(stateHandleStore);

completedCheckpointStore.recover();

final List<CompletedCheckpoint> recoveredCompletedCheckpoint =
completedCheckpointStore.getAllCheckpoints();
assertThat(recoveredCompletedCheckpoint.size(), is(2));
final List<Long> checkpointIds =
recoveredCompletedCheckpoint.stream()
.map(CompletedCheckpoint::getCheckpointID)
.collect(Collectors.toList());
// Checkpoint 2 should be skipped.
assertThat(checkpointIds, contains(1L, 3L));
}

/**
* {@link DefaultCompletedCheckpointStore#recover()} should throw exception when all the
* checkpoints retrieved failed while the checkpoint pointers are not empty.
*/
@Test
public void testRecoverFailedWhenRetrieveCheckpointAllFailed() {
final int ckpNum = 3;
checkpointStorageHelper.setRetrieveStateFunction(
(state) -> {
throw new IOException(
"Failed to retrieve checkpoint " + state.getCheckpointID());
});

final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
builder.setGetAllSupplier(() -> createStateHandles(ckpNum)).build();
final CompletedCheckpointStore completedCheckpointStore =
createCompletedCheckpointStore(stateHandleStore);

try {
completedCheckpointStore.recover();
fail("We should get an exception when retrieving state failed.");
} catch (Exception ex) {
final String errMsg =
"Could not read any of the " + ckpNum + " checkpoints from storage.";
assertThat(ex, FlinkMatchers.containsMessage(errMsg));
} catch (Exception e) {
if (ExceptionUtils.findThrowable(e, IOException.class).isPresent()) {
return;
}
throw e;
}
fail();
}

@Test
Expand Down
Expand Up @@ -39,7 +39,6 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -64,8 +63,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
ZooKeeperCheckpointStoreUtil.INSTANCE;

/**
* Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
* and ignores those which cannot be retrieved via their state handles.
* Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper.
*
* <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
*/
Expand All @@ -81,11 +79,6 @@ public void testCheckpointRecovery() throws Exception {
expectedCheckpointIds.add(1L);
expectedCheckpointIds.add(2L);

final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle =
mock(RetrievableStateHandle.class);
when(failingRetrievableStateHandle.retrieveState())
.thenThrow(new IOException("Test exception"));

final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 =
mock(RetrievableStateHandle.class);
when(retrievableStateHandle1.retrieveState())
Expand Down Expand Up @@ -121,9 +114,7 @@ public void testCheckpointRecovery() throws Exception {
new TestCompletedCheckpointStorageLocation())));

checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));

final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
Expand Down Expand Up @@ -206,10 +197,6 @@ public Void answer(InvocationOnMock invocation)
// check that we did not discard any of the state handles
verify(retrievableStateHandle1, never()).discardState();
verify(retrievableStateHandle2, never()).discardState();

// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
// are subsumed should they be discarded.
verify(failingRetrievableStateHandle, never()).discardState();
}

/**
Expand All @@ -230,11 +217,6 @@ public void testCheckpointRecoveryPreferCheckpoint() throws Exception {
expectedCheckpointIds.add(1L);
expectedCheckpointIds.add(2L);

final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle =
mock(RetrievableStateHandle.class);
when(failingRetrievableStateHandle.retrieveState())
.thenThrow(new IOException("Test exception"));

final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 =
mock(RetrievableStateHandle.class);
when(retrievableStateHandle1.retrieveState())
Expand Down Expand Up @@ -268,9 +250,7 @@ public void testCheckpointRecoveryPreferCheckpoint() throws Exception {
new TestCompletedCheckpointStorageLocation())));

checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));

final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
Expand Down Expand Up @@ -353,9 +333,5 @@ public Void answer(InvocationOnMock invocation)
// check that we did not discard any of the state handles
verify(retrievableStateHandle1, never()).discardState();
verify(retrievableStateHandle2, never()).discardState();

// Make sure that we also didn't discard any of the broken handles. Only when checkpoints
// are subsumed should they be discarded.
verify(failingRetrievableStateHandle, never()).discardState();
}
}

0 comments on commit 4476301

Please sign in to comment.