From 62c6695c61c628ef842c6a29dbb517d71e50ca59 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Sat, 3 Mar 2018 09:34:56 +0100 Subject: [PATCH 1/3] [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase --- .../ZooKeeperCompletedCheckpointStore.java | 2 +- .../flink/runtime/concurrent/FutureUtils.java | 77 +++- .../ZooKeeperHighAvailabilityITCase.java | 387 ++++++++++++++++++ 3 files changed, 461 insertions(+), 5 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 0cbd4fb6c9e9f..f22127041d31b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -209,7 +209,7 @@ public void recover() throws Exception { if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) { throw new FlinkException( - "Could not read any of the " + numberOfInitialCheckpoints + " from storage."); + "Could not read any of the " + numberOfInitialCheckpoints + " checkpoints from storage."); } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) { LOG.warn( "Could only fetch {} of {} checkpoints from storage.", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java index a27af5666fd25..a56ed925c67e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.concurrent; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.util.Preconditions; @@ -28,6 +29,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; +import scala.concurrent.duration.Deadline; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -86,6 +89,72 @@ public Future apply(Future value) { }); } + /** + * Retry the given operation the given number of times in case of a failure. 
+	 *
+	 * @param operation operation to execute
+	 * @param successPredicate predicate that decides whether the result is acceptable
+	 * @param deadline how much time we have left
+	 * @param executor to use to run the futures
+	 * @param <T> type of the result
+	 * @return Future containing either the result of the operation or a {@link RetryException}
+	 */
+	public static <T> Future<T> retrySuccessful(
+			final Callable<Future<T>> operation,
+			final FilterFunction<T> successPredicate,
+			final Deadline deadline,
+			final Executor executor) {
+
+		Future<T> operationResultFuture;
+
+		try {
+			operationResultFuture = operation.call();
+		} catch (Exception e) {
+			return FlinkCompletableFuture.completedExceptionally(
+				new RetryException("Could not execute the provided operation.", e));
+		}
+
+		return operationResultFuture.handleAsync(new BiFunction<T, Throwable, Future<T>>() {
+			@Override
+			public Future<T> apply(T t, Throwable throwable) {
+				if (throwable != null) {
+					if (deadline.hasTimeLeft()) {
+						return retrySuccessful(operation, successPredicate, deadline, executor);
+					} else {
+						return FlinkCompletableFuture.completedExceptionally(
+							new RetryException("Could not complete the operation. Number of retries " +
+								"has been exhausted.", throwable));
+					}
+				} else {
+					Boolean predicateResult;
+					try {
+						predicateResult = successPredicate.filter(t);
+					} catch (Exception e) {
+						return FlinkCompletableFuture.completedExceptionally(
+							new RetryException("Predicate threw an exception.", e));
+					}
+					if (predicateResult) {
+						return FlinkCompletableFuture.completed(t);
+					} else if (deadline.hasTimeLeft()) {
+						return retrySuccessful(operation, successPredicate, deadline, executor);
+					} else {
+						return FlinkCompletableFuture.completedExceptionally(
+							new RetryException("No time left and predicate returned false for " + t));
+					}
+				}
+			}
+		}, executor)
+		.thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
+			@Override
+			public Future<T> apply(Future<T> value) {
+				return value;
+			}
+		});
+	}
+
 	public static class RetryException extends Exception {
 
 		private static final long serialVersionUID = 3613470781274141862L;
@@ -108,14 +177,14 @@ public RetryException(Throwable cause) {
 	// ------------------------------------------------------------------------
 	/**
-	 * Creates a future that is complete once multiple other futures completed.
+	 * Creates a future that is complete once multiple other futures completed.
 	 * The future fails (completes exceptionally) once one of the futures in the
 	 * conjunction fails. Upon successful completion, the future returns the
 	 * collection of the futures' results.
 	 *

 	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
-	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
-	 *
+	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+	 *
 	 * @param futures The futures that make up the conjunction. No null entries are allowed.
 	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
 	 */
@@ -157,7 +226,7 @@ public static ConjunctFuture<Void> waitForAll(Collection<? extends Future<?>> fu
 	 * A future that is complete once multiple other futures completed. The futures are not
 	 * necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once
 	 * one of the Futures in the conjunction fails.
-	 *
+	 *
 	 * <p>
The advantage of using the ConjunctFuture over chaining all the futures (such as via * {@link Future#thenCombine(Future, BiFunction)}) is that ConjunctFuture also tracks how * many of the Futures are already complete. diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java new file mode 100644 index 0000000000000..d707d840887e1 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import 
java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import scala.concurrent.Await; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}. + */ +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils { + + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + private static final int NUM_JMS = 1; + private static final int NUM_TMS = 1; + private static final int NUM_SLOTS_PER_TM = 1; + + @ClassRule + public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static File haStorageDir; + + private static TestingServer zkServer; + + private static LocalFlinkMiniCluster cluster = null; + + private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); + private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); + private static OneShotLatch successfulRestoreLatch = new OneShotLatch(); + + @BeforeClass + public static void setup() throws Exception { + zkServer = new TestingServer(); + + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); + + haStorageDir = temporaryFolder.newFolder(); + + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString()); + config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString()); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + cluster = TestBaseUtils.startCluster(config, false); + } + + @AfterClass + public static void tearDown() throws Exception { + stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT); + + zkServer.stop(); + zkServer.close(); + } + + /** + * Verify that we don't start a job from scratch if we cannot restore any of the + * CompletedCheckpoints. + * + *

<p>Synchronization for the different steps and things we want to observe happens via
+	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
+	 *
+	 * <p>The test follows these steps:
+	 * <ol>
+	 * <li>Start job and block on a latch until we have done some checkpoints
+	 * <li>Block in the special function
+	 * <li>Move away the contents of the ZooKeeper HA directory and make it non-writable
+	 * to make creating and restoring from checkpoints impossible
+	 * <li>Unblock the special function, which now induces a failure
+	 * <li>Make sure that the job does not recover successfully
+	 * <li>Move back the HA directory
+	 * <li>Make sure that the job recovers, we use a latch to ensure that the operator
+	 * restored successfully
+	 * </ol>
+ */ + @Test(timeout = 120_000L) + public void testRestoreBehaviourWithFaultyStateHandles() throws Exception { + CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1); + CheckpointBlockingFunction.successfulRestores.set(0); + CheckpointBlockingFunction.illegalRestores.set(0); + CheckpointBlockingFunction.afterMessWithZooKeeper.set(false); + CheckpointBlockingFunction.failedAlready.set(false); + + waitForCheckpointLatch = new OneShotLatch(); + failInCheckpointLatch = new OneShotLatch(); + successfulRestoreLatch = new OneShotLatch(); + + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0)); + env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms + + File checkpointLocation = temporaryFolder.newFolder(); + env.setStateBackend(new FsStateBackend(checkpointLocation.toURI())); + + DataStreamSource source = env.addSource(new UnboundedSource()); + + source + .keyBy(new KeySelector() { + @Override + public String getKey(String value) { + return value; + } + }) + .map(new CheckpointBlockingFunction()); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID()); + + // Retrieve the job manager + final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft()); + + cluster.submitJobDetached(jobGraph); + + // wait until we did some checkpoints + waitForCheckpointLatch.await(); + + // mess with the HA directory so that the job cannot restore + File movedCheckpointLocation = temporaryFolder.newFolder(); + int numCheckpoints = 0; + File[] files = haStorageDir.listFiles(); + assertNotNull(files); + for (File file : files) { + if (file.getName().startsWith("completedCheckpoint")) { + assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName()))); + numCheckpoints++; + } + } + assertTrue(numCheckpoints > 0); + + failInCheckpointLatch.trigger(); + + // Ensure that we see at least one cycle where the job tries to restart and fails. 
+ Future jobStatusFuture = FutureUtils.retrySuccessful( + new Callable>() { + @Override + public Future call(){ + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction() { + @Override + public boolean filter(JobStatus jobStatus){ + return jobStatus == JobStatus.RESTARTING; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.RESTARTING, jobStatusFuture.get()); + + jobStatusFuture = FutureUtils.retrySuccessful( + new Callable>() { + @Override + public Future call() { + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction() { + @Override + public boolean filter(JobStatus jobStatus) { + return jobStatus == JobStatus.FAILING; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.FAILING, jobStatusFuture.get()); + + // move back the HA directory so that the job can restore + CheckpointBlockingFunction.afterMessWithZooKeeper.set(true); + + files = movedCheckpointLocation.listFiles(); + assertNotNull(files); + for (File file : files) { + if (file.getName().startsWith("completedCheckpoint")) { + assertTrue(file.renameTo(new File(haStorageDir, file.getName()))); + } + } + + // now the job should be able to go to RUNNING again and then eventually to FINISHED + jobStatusFuture = FutureUtils.retrySuccessful( + new Callable>() { + @Override + public Future call() { + return getJobStatus(jobManager, jobID, TEST_TIMEOUT); + } + }, + new FilterFunction() { + @Override + public boolean filter(JobStatus jobStatus) { + return jobStatus == JobStatus.FINISHED; + } + }, + deadline, + TestingUtils.defaultExecutor()); + assertEquals(JobStatus.FINISHED, jobStatusFuture.get()); + + // make sure we saw a successful restore + successfulRestoreLatch.await(); + + assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0)); + } + + /** + * Requests the {@link JobStatus} of the job with the given {@link JobID}. + */ + private Future getJobStatus( + final ActorGateway jobManager, + final JobID jobId, + final FiniteDuration timeout) { + + scala.concurrent.Future response = + jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout); + + FlinkFuture flinkFuture = new FlinkFuture<>(response); + + return flinkFuture.thenApply(new ApplyFunction() { + @Override + public JobStatus apply(Object value) { + if (value instanceof JobManagerMessages.CurrentJobStatus) { + return ((JobManagerMessages.CurrentJobStatus) value).status(); + } else if (value instanceof JobManagerMessages.JobNotFound) { + throw new RuntimeException( + new IllegalStateException("Could not find job with JobId " + jobId)); + } else { + throw new RuntimeException( + new IllegalStateException("Unknown JobManager response of type " + value.getClass())); + } + } + }); + } + + private static class UnboundedSource implements SourceFunction { + private boolean running = true; + + @Override + public void run(SourceContext ctx) throws Exception { + while (running) { + ctx.collect("hello"); + // don't overdo it ... ;-) + Thread.sleep(50); + if (CheckpointBlockingFunction.afterMessWithZooKeeper.get()) { + break; + } + } + } + + @Override + public void cancel() { + running = false; + } + } + + private static class CheckpointBlockingFunction + extends RichMapFunction + implements CheckpointedFunction { + + // verify that we only call initializeState() + // once with isRestored() == false. All other invocations must have isRestored() == true. 
This + // verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't + // be read. + static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1); + + // we count when we see restores that are not allowed. We only + // allow restores once we messed with the HA directory and moved it back again + static AtomicInteger illegalRestores = new AtomicInteger(0); + static AtomicInteger successfulRestores = new AtomicInteger(0); + + // whether we are after the phase where we messed with the ZooKeeper HA directory, i.e. + // whether it's now ok for a restore to happen + static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false); + + static AtomicBoolean failedAlready = new AtomicBoolean(false); + + // also have some state to write to the checkpoint + private final ValueStateDescriptor stateDescriptor = + new ValueStateDescriptor<>("state", StringSerializer.INSTANCE); + + @Override + public String map(String value) throws Exception { + getRuntimeContext().getState(stateDescriptor).update("42"); + return value; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (context.getCheckpointId() > 5) { + waitForCheckpointLatch.trigger(); + failInCheckpointLatch.await(); + if (!failedAlready.getAndSet(true)) { + throw new RuntimeException("Failing on purpose."); + } + } + } + + @Override + public void initializeState(FunctionInitializationContext context) { + if (!context.isRestored()) { + int updatedValue = allowedInitializeCallsWithoutRestore.decrementAndGet(); + if (updatedValue < 0) { + illegalRestores.getAndIncrement(); + throw new RuntimeException("We are not allowed any more restores."); + } + } else { + if (!afterMessWithZooKeeper.get()) { + illegalRestores.getAndIncrement(); + } else if (successfulRestores.getAndIncrement() > 0) { + // already saw the one allowed successful restore + illegalRestores.getAndIncrement(); + } + successfulRestoreLatch.trigger(); + } + } + } +} From b46633de42dfa1f3bbb06cd1e9e61e610ebfd169 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Wed, 7 Mar 2018 17:33:12 +0100 Subject: [PATCH 2/3] fixup! address comments --- .../test/checkpointing/ZooKeeperHighAvailabilityITCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java index d707d840887e1..c20e043838b11 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java @@ -135,8 +135,8 @@ public static void tearDown() throws Exception { *
 	 * <ol>
 	 * <li>Start job and block on a latch until we have done some checkpoints
 	 * <li>Block in the special function
-	 * <li>Move away the contents of the ZooKeeper HA directory and make it non-writable
-	 * to make creating and restoring from checkpoints impossible
+	 * <li>Move away the contents of the ZooKeeper HA directory to make restoring from
+	 * checkpoints impossible
 	 * <li>Unblock the special function, which now induces a failure
 	 * <li>Make sure that the job does not recover successfully
 	 * <li>Move back the HA directory

From d4a3b0d20b7dd74153263714fa3f697be1510c2a Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Thu, 8 Mar 2018 14:55:09 +0100
Subject: [PATCH 3/3] fixup! address comments

---
 .../ZooKeeperHighAvailabilityITCase.java | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index c20e043838b11..3905c0964fc12 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -95,7 +95,6 @@ public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
 
 	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
 	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
-	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
 
 	@BeforeClass
 	public static void setup() throws Exception {
@@ -154,7 +153,6 @@ public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
 
 		waitForCheckpointLatch = new OneShotLatch();
 		failInCheckpointLatch = new OneShotLatch();
-		successfulRestoreLatch = new OneShotLatch();
 
 		final Deadline deadline = TEST_TIMEOUT.fromNow();
 
@@ -249,7 +247,8 @@ public boolean filter(JobStatus jobStatus) {
 			}
 		}
 
-		// now the job should be able to go to RUNNING again and then eventually to FINISHED
+		// now the job should be able to go to RUNNING again and then eventually to FINISHED,
+		// which it only does if it could successfully restore
 		jobStatusFuture = FutureUtils.retrySuccessful(
 			new Callable<Future<JobStatus>>() {
 				@Override
 				public Future<JobStatus> call() {
 					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
 				}
 			},
 			new FilterFunction<JobStatus>() {
 				@Override
 				public boolean filter(JobStatus jobStatus) {
 					return jobStatus == JobStatus.FINISHED;
 				}
 			},
 			deadline,
 			TestingUtils.defaultExecutor());
 		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
 
-		// make sure we saw a successful restore
-		successfulRestoreLatch.await();
-
 		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
 	}
 
@@ -303,17 +299,14 @@ public JobStatus apply(Object value) {
 	}
 
 	private static class UnboundedSource implements SourceFunction<String> {
-		private boolean running = true;
+		private volatile boolean running = true;
 
 		@Override
 		public void run(SourceContext<String> ctx) throws Exception {
-			while (running) {
+			while (running && !CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
 				ctx.collect("hello");
 				// don't overdo it ... ;-)
 				Thread.sleep(50);
-				if (CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
-					break;
-				}
 			}
 		}
 
 		@Override
 		public void cancel() {
 			running = false;
 		}
 	}
 
@@ -380,7 +373,6 @@ public void initializeState(FunctionInitializationContext context) {
 				// already saw the one allowed successful restore
 				illegalRestores.getAndIncrement();
 			}
-			successfulRestoreLatch.trigger();
 		}
 	}
 }
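For reference, a minimal sketch of how the FutureUtils.retrySuccessful helper added in PATCH 1 is meant to be called. It assumes the Flink 1.4-era org.apache.flink.runtime.concurrent.Future, FlinkCompletableFuture and FilterFunction types used throughout this series; the counter-based operation and the single-thread executor are illustrative stand-ins, not part of the change.

    // Illustrative only: a made-up polling operation exercising FutureUtils.retrySuccessful.
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.runtime.concurrent.Future;
    import org.apache.flink.runtime.concurrent.FutureUtils;
    import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;

    import scala.concurrent.duration.Deadline;
    import scala.concurrent.duration.FiniteDuration;

    public class RetrySuccessfulExample {

        public static void main(String[] args) throws Exception {
            final AtomicInteger attempts = new AtomicInteger();
            final ExecutorService executor = Executors.newSingleThreadExecutor();

            // give up if the predicate has not accepted a result within 30 seconds
            final Deadline deadline = new FiniteDuration(30, TimeUnit.SECONDS).fromNow();

            // re-runs the operation until the predicate accepts its result or the deadline expires
            Future<Integer> result = FutureUtils.retrySuccessful(
                new Callable<Future<Integer>>() {
                    @Override
                    public Future<Integer> call() {
                        // each call produces a fresh, already completed future
                        return FlinkCompletableFuture.completed(attempts.incrementAndGet());
                    }
                },
                new FilterFunction<Integer>() {
                    @Override
                    public boolean filter(Integer value) {
                        // reject the first two attempts, accept from the third onwards
                        return value >= 3;
                    }
                },
                deadline,
                executor);

            // completes with 3, the first value the predicate accepted
            System.out.println("accepted result: " + result.get());
            executor.shutdown();
        }
    }

Because the helper retries immediately and without any delay, the retried operation should itself take some time to complete (like the asynchronous job-status request polled in the ITCase); otherwise the retry loop simply spins until the deadline passes.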