diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index f99ceea937901..5389ae437fb2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -140,71 +140,20 @@ public void recover() throws Exception {
             return;
         }
 
-        // Try and read the state handles from storage. We try until we either successfully read
-        // all of them or when we reach a stable state, i.e. when we successfully read the same set
-        // of checkpoints in two tries. We do it like this to protect against transient outages
-        // of the checkpoint store (for example a DFS): if the DFS comes online midway through
-        // reading a set of checkpoints we would run the risk of reading only a partial set
-        // of checkpoints while we could in fact read the other checkpoints as well if we retried.
-        // Waiting until a stable state protects against this while also being resilient against
-        // checkpoints being actually unreadable.
-        //
-        // These considerations are also important in the scope of incremental checkpoints, where
-        // we use ref-counting for shared state handles and might accidentally delete shared state
-        // of checkpoints that we don't read due to transient storage outages.
-        final List<CompletedCheckpoint> lastTryRetrievedCheckpoints =
-                new ArrayList<>(numberOfInitialCheckpoints);
         final List<CompletedCheckpoint> retrievedCheckpoints =
                 new ArrayList<>(numberOfInitialCheckpoints);
-        Exception retrieveException = null;
-        do {
-            LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
-
-            lastTryRetrievedCheckpoints.clear();
-            lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
-
-            retrievedCheckpoints.clear();
-
-            for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
-                    initialCheckpoints) {
-
-                CompletedCheckpoint completedCheckpoint;
-
-                try {
-                    completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
-                    if (completedCheckpoint != null) {
-                        retrievedCheckpoints.add(completedCheckpoint);
-                    }
-                } catch (Exception e) {
-                    LOG.warn(
-                            "Could not retrieve checkpoint, not adding to list of recovered checkpoints.",
-                            e);
-                    retrieveException = e;
-                }
-            }
+        LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints);
 
-        } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
-                && !CompletedCheckpoint.checkpointsMatch(
-                        lastTryRetrievedCheckpoints, retrievedCheckpoints));
+        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle :
+                initialCheckpoints) {
+            retrievedCheckpoints.add(
+                    checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
+        }
 
         // Clear local handles in order to prevent duplicates on recovery. The local handles should
-        // reflect
-        // the state handle store contents.
+        // reflect the state handle store contents.
         completedCheckpoints.clear();
         completedCheckpoints.addAll(retrievedCheckpoints);
-
-        if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
-            throw new FlinkException(
-                    "Could not read any of the "
-                            + numberOfInitialCheckpoints
-                            + " checkpoints from storage.",
-                    retrieveException);
-        } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
-            LOG.warn(
-                    "Could only fetch {} of {} checkpoints from storage.",
-                    completedCheckpoints.size(),
-                    numberOfInitialCheckpoints);
-        }
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index d14e834577b47..4d195cbb5007d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -152,7 +153,7 @@ public void testRecoverSortedCheckpoints() throws Exception {
         assertThat(checkpointIds, contains(1L, 2L, 3L));
     }
 
-    /** We got an {@link IOException} when retrieving checkpoint 2. It should be skipped. */
+    /** We got an {@link IOException} when retrieving checkpoint 2. It should NOT be skipped. */
     @Test
     public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception {
         final long corruptCkpId = 2L;
@@ -169,45 +170,15 @@ public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws Exception
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
 
-        completedCheckpointStore.recover();
-
-        final List<CompletedCheckpoint> recoveredCompletedCheckpoint =
-                completedCheckpointStore.getAllCheckpoints();
-        assertThat(recoveredCompletedCheckpoint.size(), is(2));
-        final List<Long> checkpointIds =
-                recoveredCompletedCheckpoint.stream()
-                        .map(CompletedCheckpoint::getCheckpointID)
-                        .collect(Collectors.toList());
-        // Checkpoint 2 should be skipped.
-        assertThat(checkpointIds, contains(1L, 3L));
-    }
-
-    /**
-     * {@link DefaultCompletedCheckpointStore#recover()} should throw exception when all the
-     * checkpoints retrieved failed while the checkpoint pointers are not empty.
-     */
-    @Test
-    public void testRecoverFailedWhenRetrieveCheckpointAllFailed() {
-        final int ckpNum = 3;
-        checkpointStorageHelper.setRetrieveStateFunction(
-                (state) -> {
-                    throw new IOException(
-                            "Failed to retrieve checkpoint " + state.getCheckpointID());
-                });
-
-        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
-                builder.setGetAllSupplier(() -> createStateHandles(ckpNum)).build();
-        final CompletedCheckpointStore completedCheckpointStore =
-                createCompletedCheckpointStore(stateHandleStore);
-
         try {
             completedCheckpointStore.recover();
-            fail("We should get an exception when retrieving state failed.");
-        } catch (Exception ex) {
-            final String errMsg =
-                    "Could not read any of the " + ckpNum + " checkpoints from storage.";
-            assertThat(ex, FlinkMatchers.containsMessage(errMsg));
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, IOException.class).isPresent()) {
+                return;
+            }
+            throw e;
         }
+        fail();
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index af8c091205f2d..b3ca09d5714eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -39,7 +39,6 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -64,8 +63,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
             ZooKeeperCheckpointStoreUtil.INSTANCE;
 
     /**
-     * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper
-     * and ignores those which cannot be retrieved via their state handles.
+     * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper.
      *
     * <p>We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation.
      */
@@ -81,11 +79,6 @@ public void testCheckpointRecovery() throws Exception {
         expectedCheckpointIds.add(1L);
         expectedCheckpointIds.add(2L);
 
-        final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle =
-                mock(RetrievableStateHandle.class);
-        when(failingRetrievableStateHandle.retrieveState())
-                .thenThrow(new IOException("Test exception"));
-
         final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 =
                 mock(RetrievableStateHandle.class);
         when(retrievableStateHandle1.retrieveState())
@@ -121,9 +114,7 @@
                                 new TestCompletedCheckpointStorageLocation())));
 
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
 
         final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
         final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
@@ -206,10 +197,6 @@ public Void answer(InvocationOnMock invocation)
         // check that we did not discard any of the state handles
         verify(retrievableStateHandle1, never()).discardState();
         verify(retrievableStateHandle2, never()).discardState();
-
-        // Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-        // are subsumed should they be discarded.
-        verify(failingRetrievableStateHandle, never()).discardState();
     }
 
     /**
@@ -230,11 +217,6 @@ public void testCheckpointRecoveryPreferCheckpoint() throws Exception {
         expectedCheckpointIds.add(1L);
         expectedCheckpointIds.add(2L);
 
-        final RetrievableStateHandle<CompletedCheckpoint> failingRetrievableStateHandle =
-                mock(RetrievableStateHandle.class);
-        when(failingRetrievableStateHandle.retrieveState())
-                .thenThrow(new IOException("Test exception"));
-
         final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle1 =
                 mock(RetrievableStateHandle.class);
         when(retrievableStateHandle1.retrieveState())
@@ -268,9 +250,7 @@ public void testCheckpointRecoveryPreferCheckpoint() throws Exception {
                                 new TestCompletedCheckpointStorageLocation())));
 
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1"));
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2"));
 
         final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS);
         final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock =
@@ -353,9 +333,5 @@ public Void answer(InvocationOnMock invocation)
         // check that we did not discard any of the state handles
         verify(retrievableStateHandle1, never()).discardState();
         verify(retrievableStateHandle2, never()).discardState();
-
-        // Make sure that we also didn't discard any of the broken handles. Only when checkpoints
-        // are subsumed should they be discarded.
-        verify(failingRetrievableStateHandle, never()).discardState();
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
new file mode 100644
index 0000000000000..1ad5a8b2bb92d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Test that failure on recovery leads to job restart if configured, so that transient recovery
+ * failures can be mitigated.
+ */
+public class CheckpointStoreITCase extends TestLogger {
+
+    @ClassRule
+    public static final MiniClusterWithClientResource CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(
+                                    new Configuration()
+                                            .set(
+                                                    HighAvailabilityOptions.HA_MODE,
+                                                    TestingHAFactory.class.getName()))
+                            .build());
+
+    @Before
+    public void init() {
+        FailingStore.reset();
+        FailingMapper.reset();
+    }
+
+    @Test
+    public void testRestartOnRecoveryFailure() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10);
+        env.setRestartStrategy(fixedDelayRestart(2 /* failure on processing + on recovery */, 0));
+        env.addSource(emitUntil(() -> FailingStore.recovered && FailingMapper.failedAndProcessed))
+                .map(new FailingMapper())
+                .addSink(new DiscardingSink<>());
+        env.execute();
+
+        checkState(FailingStore.recovered && FailingMapper.failedAndProcessed);
+    }
+
+    private static class FailingMapper implements MapFunction<Integer, Integer> {
+        private static volatile boolean failed = false;
+        private static volatile boolean failedAndProcessed = false;
+
+        public static void reset() {
+            failed = false;
+            failedAndProcessed = false;
+        }
+
+        @Override
+        public Integer map(Integer element) throws Exception {
+            if (!failed) {
+                failed = true;
+                throw new RuntimeException();
+            } else {
+                failedAndProcessed = true;
+                return element;
+            }
+        }
+    }
+
+    /** TestingHAFactory. */
+    public static class TestingHAFactory implements HighAvailabilityServicesFactory {
+
+        @Override
+        public HighAvailabilityServices createHAServices(
+                Configuration configuration, Executor executor) {
+            return new EmbeddedHaServices(Executors.directExecutor()) {
+
+                @Override
+                public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+                    return new TestingCheckpointRecoveryFactory(
+                            new FailingStore(),
+                            new TestingCheckpointIDCounter(new CompletableFuture<>()));
+                }
+            };
+        }
+    }
+
+    private static class FailingStore implements CompletedCheckpointStore {
+        private static volatile boolean started = false;
+        private static volatile boolean failed = false;
+        private static volatile boolean recovered = false;
+
+        public static void reset() {
+            started = failed = recovered = false;
+        }
+
+        @Override
+        public void recover() throws Exception {
+            if (!started) {
+                started = true;
+            } else if (!failed) {
+                failed = true;
+                throw new RuntimeException();
+            } else if (!recovered) {
+                recovered = true;
+            }
+        }
+
+        @Override
+        public void addCheckpoint(
+                CompletedCheckpoint checkpoint,
+                CheckpointsCleaner checkpointsCleaner,
+                Runnable postCleanup) {}
+
+        @Override
+        public void shutdown(
+                JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup)
+                throws Exception {}
+
+        @Override
+        public List<CompletedCheckpoint> getAllCheckpoints() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public int getNumberOfRetainedCheckpoints() {
+            return 0;
+        }
+
+        @Override
+        public int getMaxNumberOfRetainedCheckpoints() {
+            return 1;
+        }
+
+        @Override
+        public boolean requiresExternalizedCheckpoints() {
+            return false;
+        }
+    }
+
+    private SourceFunction<Integer> emitUntil(SerializableSupplier<Boolean> until) {
+        return new SourceFunction<Integer>() {
+            private volatile boolean running = true;
+
+            @Override
+            public void run(SourceContext<Integer> ctx) {
+                while (running && !until.get()) {
+                    synchronized (ctx.getCheckpointLock()) {
+                        ctx.collect(0);
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            ExceptionUtils.rethrow(e);
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void cancel() {
+                running = false;
+            }
+        };
+    }
+}
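A short sketch of the behaviour this change enables, separate from the patch itself: since recover() now fails fast on any unreadable checkpoint instead of skipping it, a transient outage of the checkpoint storage surfaces as a job failure and is retried by the configured restart strategy, which is what CheckpointStoreITCase exercises. The class name and the concrete values below (checkpoint interval, attempt count, delay) are illustrative assumptions; only the Flink APIs used are taken as given.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class RestartOnRecoveryFailureExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Periodic checkpoints; with this change, recovering the completed-checkpoint store
        // throws instead of silently dropping checkpoints it cannot read.
        env.enableCheckpointing(10_000);
        // A failed recovery then counts as a job failure and is retried by the configured
        // restart strategy (illustrative values: 3 attempts, 10 seconds apart).
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        env.fromElements(1, 2, 3).map(x -> x * 2).print();
        env.execute("restart-on-recovery-failure-example");
    }
}

Without a restart strategy, or once the attempts are exhausted, the recovery failure is terminal; this mirrors the fixedDelayRestart(2, 0) budget in testRestartOnRecoveryFailure, which spends one attempt on the processing failure and one on the recovery failure.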