From 4476301225d6a7f3c815c6cc3585b7fb6d60bb54 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Wed, 5 May 2021 18:59:14 +0200 Subject: [PATCH] [FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures on recovery Ignoring such failures and running with an incomplete set of checkpoints can lead to consistency violation. Instead, transient failures should be mitigated by automatic job restart. --- .../DefaultCompletedCheckpointStore.java | 65 +----- .../DefaultCompletedCheckpointStoreTest.java | 45 +--- ...erCompletedCheckpointStoreMockitoTest.java | 26 +-- .../checkpointing/CheckpointStoreITCase.java | 207 ++++++++++++++++++ 4 files changed, 223 insertions(+), 120 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java index f99ceea937901..5389ae437fb2f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java @@ -140,71 +140,20 @@ public void recover() throws Exception { return; } - // Try and read the state handles from storage. We try until we either successfully read - // all of them or when we reach a stable state, i.e. when we successfully read the same set - // of checkpoints in two tries. We do it like this to protect against transient outages - // of the checkpoint store (for example a DFS): if the DFS comes online midway through - // reading a set of checkpoints we would run the risk of reading only a partial set - // of checkpoints while we could in fact read the other checkpoints as well if we retried. 
- // Waiting until a stable state protects against this while also being resilient against - // checkpoints being actually unreadable. - // - // These considerations are also important in the scope of incremental checkpoints, where - // we use ref-counting for shared state handles and might accidentally delete shared state - // of checkpoints that we don't read due to transient storage outages. - final List lastTryRetrievedCheckpoints = - new ArrayList<>(numberOfInitialCheckpoints); final List retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); - Exception retrieveException = null; - do { - LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints); - - lastTryRetrievedCheckpoints.clear(); - lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints); - - retrievedCheckpoints.clear(); - - for (Tuple2, String> checkpointStateHandle : - initialCheckpoints) { - - CompletedCheckpoint completedCheckpoint; - - try { - completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); - if (completedCheckpoint != null) { - retrievedCheckpoints.add(completedCheckpoint); - } - } catch (Exception e) { - LOG.warn( - "Could not retrieve checkpoint, not adding to list of recovered checkpoints.", - e); - retrieveException = e; - } - } + LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints); - } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints - && !CompletedCheckpoint.checkpointsMatch( - lastTryRetrievedCheckpoints, retrievedCheckpoints)); + for (Tuple2, String> checkpointStateHandle : + initialCheckpoints) { + retrievedCheckpoints.add( + checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle))); + } // Clear local handles in order to prevent duplicates on recovery. The local handles should - // reflect - // the state handle store contents. + // reflect the state handle store contents. 
completedCheckpoints.clear(); completedCheckpoints.addAll(retrievedCheckpoints); - - if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) { - throw new FlinkException( - "Could not read any of the " - + numberOfInitialCheckpoints - + " checkpoints from storage.", - retrieveException); - } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) { - LOG.warn( - "Could only fetch {} of {} checkpoints from storage.", - completedCheckpoints.size(), - numberOfInitialCheckpoints); - } } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java index d14e834577b47..4d195cbb5007d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -152,7 +153,7 @@ public void testRecoverSortedCheckpoints() throws Exception { assertThat(checkpointIds, contains(1L, 2L, 3L)); } - /** We got an {@link IOException} when retrieving checkpoint 2. It should be skipped. */ + /** We got an {@link IOException} when retrieving checkpoint 2. It should NOT be skipped. 
*/ @Test public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception { final long corruptCkpId = 2L; @@ -169,45 +170,15 @@ public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception final CompletedCheckpointStore completedCheckpointStore = createCompletedCheckpointStore(stateHandleStore); - completedCheckpointStore.recover(); - - final List recoveredCompletedCheckpoint = - completedCheckpointStore.getAllCheckpoints(); - assertThat(recoveredCompletedCheckpoint.size(), is(2)); - final List checkpointIds = - recoveredCompletedCheckpoint.stream() - .map(CompletedCheckpoint::getCheckpointID) - .collect(Collectors.toList()); - // Checkpoint 2 should be skipped. - assertThat(checkpointIds, contains(1L, 3L)); - } - - /** - * {@link DefaultCompletedCheckpointStore#recover()} should throw exception when all the - * checkpoints retrieved failed while the checkpoint pointers are not empty. - */ - @Test - public void testRecoverFailedWhenRetrieveCheckpointAllFailed() { - final int ckpNum = 3; - checkpointStorageHelper.setRetrieveStateFunction( - (state) -> { - throw new IOException( - "Failed to retrieve checkpoint " + state.getCheckpointID()); - }); - - final TestingStateHandleStore stateHandleStore = - builder.setGetAllSupplier(() -> createStateHandles(ckpNum)).build(); - final CompletedCheckpointStore completedCheckpointStore = - createCompletedCheckpointStore(stateHandleStore); - try { completedCheckpointStore.recover(); - fail("We should get an exception when retrieving state failed."); - } catch (Exception ex) { - final String errMsg = - "Could not read any of the " + ckpNum + " checkpoints from storage."; - assertThat(ex, FlinkMatchers.containsMessage(errMsg)); + } catch (Exception e) { + if (ExceptionUtils.findThrowable(e, IOException.class).isPresent()) { + return; + } + throw e; } + fail(); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java index af8c091205f2d..b3ca09d5714eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java @@ -39,7 +39,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -64,8 +63,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger { ZooKeeperCheckpointStoreUtil.INSTANCE; /** - * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper - * and ignores those which cannot be retrieved via their state handles. + * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper. * *

We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation. */ @@ -81,11 +79,6 @@ public void testCheckpointRecovery() throws Exception { expectedCheckpointIds.add(1L); expectedCheckpointIds.add(2L); - final RetrievableStateHandle failingRetrievableStateHandle = - mock(RetrievableStateHandle.class); - when(failingRetrievableStateHandle.retrieveState()) - .thenThrow(new IOException("Test exception")); - final RetrievableStateHandle retrievableStateHandle1 = mock(RetrievableStateHandle.class); when(retrievableStateHandle1.retrieveState()) @@ -121,9 +114,7 @@ public void testCheckpointRecovery() throws Exception { new TestCompletedCheckpointStorageLocation()))); checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1")); - checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1")); checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2")); - checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2")); final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS); final RetrievableStateStorageHelper storageHelperMock = @@ -206,10 +197,6 @@ public Void answer(InvocationOnMock invocation) // check that we did not discard any of the state handles verify(retrievableStateHandle1, never()).discardState(); verify(retrievableStateHandle2, never()).discardState(); - - // Make sure that we also didn't discard any of the broken handles. Only when checkpoints - // are subsumed should they be discarded. 
- verify(failingRetrievableStateHandle, never()).discardState(); } /** @@ -230,11 +217,6 @@ public void testCheckpointRecoveryPreferCheckpoint() throws Exception { expectedCheckpointIds.add(1L); expectedCheckpointIds.add(2L); - final RetrievableStateHandle failingRetrievableStateHandle = - mock(RetrievableStateHandle.class); - when(failingRetrievableStateHandle.retrieveState()) - .thenThrow(new IOException("Test exception")); - final RetrievableStateHandle retrievableStateHandle1 = mock(RetrievableStateHandle.class); when(retrievableStateHandle1.retrieveState()) @@ -268,9 +250,7 @@ public void testCheckpointRecoveryPreferCheckpoint() throws Exception { new TestCompletedCheckpointStorageLocation()))); checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1")); - checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1")); checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2")); - checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2")); final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS); final RetrievableStateStorageHelper storageHelperMock = @@ -353,9 +333,5 @@ public Void answer(InvocationOnMock invocation) // check that we did not discard any of the state handles verify(retrievableStateHandle1, never()).discardState(); verify(retrievableStateHandle2, never()).discardState(); - - // Make sure that we also didn't discard any of the broken handles. Only when checkpoints - // are subsumed should they be discarded. 
- verify(failingRetrievableStateHandle, never()).discardState(); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java new file mode 100644 index 0000000000000..1ad5a8b2bb92d --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.SerializableSupplier; + +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Test that failure on recovery leads to job restart if configured, so that transient 
recovery + * failures can be mitigated. + */ +public class CheckpointStoreITCase extends TestLogger { + + @ClassRule + public static final MiniClusterWithClientResource CLUSTER = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration( + new Configuration() + .set( + HighAvailabilityOptions.HA_MODE, + TestingHAFactory.class.getName())) + .build()); + + @Before + public void init() { + FailingStore.reset(); + FailingMapper.reset(); + } + + @Test + public void testRestartOnRecoveryFailure() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10); + env.setRestartStrategy(fixedDelayRestart(2 /* failure on processing + on recovery */, 0)); + env.addSource(emitUntil(() -> FailingStore.recovered && FailingMapper.failedAndProcessed)) + .map(new FailingMapper()) + .addSink(new DiscardingSink<>()); + env.execute(); + + checkState(FailingStore.recovered && FailingMapper.failedAndProcessed); + } + + private static class FailingMapper implements MapFunction { + private static volatile boolean failed = false; + private static volatile boolean failedAndProcessed = false; + + public static void reset() { + failed = false; + failedAndProcessed = false; + } + + @Override + public Integer map(Integer element) throws Exception { + if (!failed) { + failed = true; + throw new RuntimeException(); + } else { + failedAndProcessed = true; + return element; + } + } + } + + /** TestingHAFactory. 
*/ + public static class TestingHAFactory implements HighAvailabilityServicesFactory { + + @Override + public HighAvailabilityServices createHAServices( + Configuration configuration, Executor executor) { + return new EmbeddedHaServices(Executors.directExecutor()) { + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() { + return new TestingCheckpointRecoveryFactory( + new FailingStore(), + new TestingCheckpointIDCounter(new CompletableFuture<>())); + } + }; + } + } + + private static class FailingStore implements CompletedCheckpointStore { + private static volatile boolean started = false; + private static volatile boolean failed = false; + private static volatile boolean recovered = false; + + public static void reset() { + started = failed = recovered = false; + } + + @Override + public void recover() throws Exception { + if (!started) { + started = true; + } else if (!failed) { + failed = true; + throw new RuntimeException(); + } else if (!recovered) { + recovered = true; + } + } + + @Override + public void addCheckpoint( + CompletedCheckpoint checkpoint, + CheckpointsCleaner checkpointsCleaner, + Runnable postCleanup) {} + + @Override + public void shutdown( + JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) + throws Exception {} + + @Override + public List getAllCheckpoints() { + return Collections.emptyList(); + } + + @Override + public int getNumberOfRetainedCheckpoints() { + return 0; + } + + @Override + public int getMaxNumberOfRetainedCheckpoints() { + return 1; + } + + @Override + public boolean requiresExternalizedCheckpoints() { + return false; + } + } + + private SourceFunction emitUntil(SerializableSupplier until) { + return new SourceFunction() { + private volatile boolean running = true; + + @Override + public void run(SourceContext ctx) { + while (running && !until.get()) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(0); + try { + Thread.sleep(100); + } catch 
(InterruptedException e) { + ExceptionUtils.rethrow(e); + } + } + } + } + + @Override + public void cancel() { + running = false; + } + }; + } +}