From 01265fe1529b6712311051ed2d210b5e2e5cf0a8 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Mon, 22 May 2017 12:14:59 +0200 Subject: [PATCH 1/3] [FLINK-6654] [build] Let 'flink-dist' properly depend on 'flink-shaded-hadoop2-uber' This closes #3960 --- flink-dist/pom.xml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 86923d74ad26b..1caec3d71d816 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -43,6 +43,17 @@ under the License. ${project.version} + + org.apache.flink + flink-shaded-hadoop2-uber + ${project.version} + + provided + + org.apache.flink flink-java From f055645b3d905ea212b11eb570926d46447f3f52 Mon Sep 17 00:00:00 2001 From: zjureel Date: Tue, 18 Jul 2017 19:27:56 +0200 Subject: [PATCH 2/3] [FLINK-6665] [FLINK-6667] [distributed coordination] Use a callback and a ScheduledExecutor for ExecutionGraph restarts Initial work by zjureel@gmail.com , improved by sewen@apache.org. --- .../executiongraph/ExecutionGraph.java | 7 +- ...ava => ExecutionGraphRestartCallback.java} | 52 +++---- .../restart/FailureRateRestartStrategy.java | 15 +- .../restart/FixedDelayRestartStrategy.java | 15 +- .../restart/NoRestartStrategy.java | 5 +- .../restart/RestartCallback.java | 32 +++++ .../restart/RestartStrategy.java | 12 +- .../ExecutionGraphMetricsTest.java | 17 +-- .../ExecutionGraphRestartTest.java | 51 ++----- .../ExecutionGraphTestUtils.java | 11 +- .../FailureRateRestartStrategyTest.java | 128 ++++++++++++++++++ .../FixedDelayRestartStrategyTest.java | 77 ++++++++--- .../restart/InfiniteDelayRestartStrategy.java | 10 +- .../restart/LatchedRestarter.java | 38 ++++++ .../executiongraph/restart/NoOpRestarter.java | 28 ++++ 15 files changed, 385 insertions(+), 113 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/{ExecutionGraphRestarter.java => ExecutionGraphRestartCallback.java} (53%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 7c13936f2cab4..9f29fafed7a8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -44,11 +44,14 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback; +import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SlotProvider; @@ -1388,7 +1391,9 @@ private boolean tryRestartOrFail() { if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) { LOG.info("Restarting the job {} ({}).", getJobName(), getJobID()); - restartStrategy.restart(this); + + RestartCallback restarter = new ExecutionGraphRestartCallback(this); + restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(futureExecutor)); return true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java similarity index 53% rename from flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java index 9287694735951..5874f91f7a898 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java @@ -19,27 +19,33 @@ package org.apache.flink.runtime.executiongraph.restart; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Callable; - -class ExecutionGraphRestarter { - private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class); - public static Callable restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) { - return new Callable() { - @Override - public Object call() throws Exception { - try { - LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis); - // do the delay - Thread.sleep(delayBetweenRestartAttemptsInMillis); - } catch(InterruptedException e) { - // should only happen on shutdown - } - executionGraph.restart(); - return null; - } - }; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. + * + *

This callback implementation is one-shot; it can only be used once. + */ +public class ExecutionGraphRestartCallback implements RestartCallback { + + /** The ExecutionGraph to restart */ + private final ExecutionGraph execGraph; + + /** Atomic flag to make sure this is used only once */ + private final AtomicBoolean used; + + public ExecutionGraphRestartCallback(ExecutionGraph execGraph) { + this.execGraph = checkNotNull(execGraph); + this.used = new AtomicBoolean(false); + } + + @Override + public void triggerFullRecovery() { + if (used.compareAndSet(false, true)) { + execGraph.restart(); + } } -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java index d95e1c37ac4e0..6580ec3b3d008 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.util.Preconditions; @@ -35,6 +35,7 @@ * with a fixed time delay in between. */ public class FailureRateRestartStrategy implements RestartStrategy { + private final Time failuresInterval; private final Time delayInterval; private final int maxFailuresPerInterval; @@ -66,16 +67,22 @@ public boolean canRestart() { } @Override - public void restart(final ExecutionGraph executionGraph) { + public void restart(final RestartCallback restarter, ScheduledExecutor executor) { if (isRestartTimestampsQueueFull()) { restartTimestampsDeque.remove(); } restartTimestampsDeque.add(System.currentTimeMillis()); - FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getFutureExecutor()); + + executor.schedule(new Runnable() { + @Override + public void run() { + restarter.triggerFullRecovery(); + } + }, delayInterval.getSize(), delayInterval.getUnit()); } private boolean isRestartTimestampsQueueFull() { - return restartTimestampsDeque.size() == maxFailuresPerInterval; + return restartTimestampsDeque.size() >= maxFailuresPerInterval; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java index f51ea7c27300f..0dd700a7f20fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java @@ -20,16 +20,19 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.util.Preconditions; import scala.concurrent.duration.Duration; +import java.util.concurrent.TimeUnit; + /** * Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number of times * with a fixed time delay in between. */ public class FixedDelayRestartStrategy implements RestartStrategy { + private final int maxNumberRestartAttempts; private final long delayBetweenRestartAttempts; private int currentRestartAttempt; @@ -56,9 +59,15 @@ public boolean canRestart() { } @Override - public void restart(final ExecutionGraph executionGraph) { + public void restart(final RestartCallback restarter, ScheduledExecutor executor) { currentRestartAttempt++; - FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getFutureExecutor()); + + executor.schedule(new Runnable() { + @Override + public void run() { + restarter.triggerFullRecovery(); + } + }, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java index 958d9ac0fab8a..5502d2d2eab90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph.restart; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.executiongraph.ExecutionGraph; /** @@ -32,8 +33,8 @@ public boolean canRestart() { } @Override - public void restart(ExecutionGraph executionGraph) { - throw new RuntimeException("NoRestartStrategy does not support restart."); + public void restart(RestartCallback restarter, ScheduledExecutor executor) { + throw new UnsupportedOperationException("NoRestartStrategy does not support restart."); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java new file mode 100644 index 0000000000000..6499be365844e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +/** + * A callback to trigger restarts, passed to the {@link RestartStrategy} to + * trigger recovery on the ExecutionGraph. + */ +public interface RestartCallback { + + /** + * Triggers a full recovery in the target ExecutionGraph. + * A full recovery resets all vertices to the state of the latest checkpoint. + */ + void triggerFullRecovery(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java index 2880c0128f246..60e2e8bed219d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph.restart; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.executiongraph.ExecutionGraph; /** @@ -33,9 +34,14 @@ public interface RestartStrategy { boolean canRestart(); /** - * Restarts the given {@link ExecutionGraph}. + * Called by the ExecutionGraph to eventually trigger a full recovery. + * The recovery must be triggered on the given callback object, and may be delayed + * with the help of the given scheduled executor. + * + *

The thread that calls this method is not supposed to block/sleep. * - * @param executionGraph The ExecutionGraph to be restarted + * @param restarter The hook to restart the ExecutionGraph + * @param executor An scheduled executor to delay the restart */ - void restart(ExecutionGraph executionGraph); + void restart(RestartCallback restarter, ScheduledExecutor executor); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 6b5ceaef0479c..0785a2640ebb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -24,11 +24,13 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge; +import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; @@ -255,25 +257,20 @@ public void testExecutionGraphRestartTimeMetric() throws JobException, IOExcepti static class TestingRestartStrategy implements RestartStrategy { - private boolean restartable = true; - private ExecutionGraph executionGraph = null; + private RestartCallback restarter; @Override public boolean canRestart() { - return restartable; + return true; } @Override - public void restart(ExecutionGraph executionGraph) { - this.executionGraph = executionGraph; - } - - public void setRestartable(boolean restartable) { - this.restartable = restartable; + public void restart(RestartCallback restarter, ScheduledExecutor executor) { + this.restarter = restarter; } public void restartExecutionGraph() { - executionGraph.restart(); + restarter.triggerFullRecovery(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index eeb6c69b0c5cf..81702a2f8a883 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -18,21 +18,24 @@ package org.apache.flink.runtime.executiongraph; -import akka.dispatch.Futures; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; -import org.apache.flink.runtime.executiongraph.restart.FailureRateRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -41,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.SerializedValue; @@ -56,11 +60,9 @@ import java.io.IOException; import java.util.Iterator; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -158,34 +160,6 @@ public void testRestartAutomatically() throws Exception { restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true); } - @Test - public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception { - FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, Time.of(10, TimeUnit.SECONDS), Time.of(0, TimeUnit.SECONDS)); - FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS); - Tuple2 executionGraphInstanceTuple = createExecutionGraph(restartStrategy); - ExecutionGraph eg = executionGraphInstanceTuple.f0; - - restartAfterFailure(eg, timeout, false); - restartAfterFailure(eg, timeout, false); - makeAFailureAndWait(eg, timeout); - //failure rate limit exceeded, so task should be failed - assertEquals(JobStatus.FAILED, eg.getState()); - } - - @Test - public void taskShouldNotFailWhenFailureRateLimitWasNotExceeded() throws Exception { - FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(1, Time.of(1, TimeUnit.MILLISECONDS), Time.of(0, TimeUnit.SECONDS)); - FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS); - Tuple2 executionGraphInstanceTuple = createExecutionGraph(restartStrategy); - ExecutionGraph eg = executionGraphInstanceTuple.f0; - - //task restarted many times, but after all job is still running, because rate limit was not exceeded - restartAfterFailure(eg, timeout, false); - restartAfterFailure(eg, timeout, false); - restartAfterFailure(eg, timeout, false); - assertEquals(JobStatus.RUNNING, eg.getState()); - } - @Test public void testCancelWhileRestarting() throws Exception { // We want to manually control the restart and delay @@ -618,23 +592,20 @@ public boolean canRestart() { } @Override - public void restart(final ExecutionGraph executionGraph) { - Futures.future(new Callable() { + public void restart(final RestartCallback restarter, ScheduledExecutor executor) { + executor.execute(new Runnable() { @Override - public Object call() throws Exception { + public void run() { try { - Await.ready(doRestart.future(), timeout); - executionGraph.restart(); + restarter.triggerFullRecovery(); } catch (Exception e) { exception = e; } restartDone.success(true); - - return null; } - }, TestingUtils.defaultExecutionContext()); + }); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 140e984675bc9..51b5c7fd9759f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -254,9 +254,7 @@ public static ExecutionGraph createSimpleTestGraph() throws Exception { * restart strategy. */ public static ExecutionGraph createSimpleTestGraph(RestartStrategy restartStrategy) throws Exception { - JobVertex vertex = new JobVertex("vertex"); - vertex.setInvokableClass(NoOpInvokable.class); - vertex.setParallelism(10); + JobVertex vertex = createNoOpVertex(10); return createSimpleTestGraph(new JobID(), restartStrategy, vertex); } @@ -313,6 +311,13 @@ public static ExecutionGraph createSimpleTestGraph( 1, TEST_LOGGER); } + + public static JobVertex createNoOpVertex(int parallelism) { + JobVertex vertex = new JobVertex("vertex"); + vertex.setInvokableClass(NoOpInvokable.class); + vertex.setParallelism(parallelism); + return vertex; + } // ------------------------------------------------------------------------ // utility mocking methods diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java new file mode 100644 index 0000000000000..b42dd779c4997 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; + +import org.junit.After; +import org.junit.Test; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Unit test for the {@link FailureRateRestartStrategy}. + */ +public class FailureRateRestartStrategyTest { + + public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); + + public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService); + + @After + public void shutdownExecutor() { + executorService.shutdownNow(); + } + + // ------------------------------------------------------------------------ + + @Test + public void testManyFailuresWithinRate() throws Exception { + final int numAttempts = 10; + final int intervalMillis = 1; + + final FailureRateRestartStrategy restartStrategy = + new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0)); + + for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) { + assertTrue(restartStrategy.canRestart()); + restartStrategy.restart(new NoOpRestarter(), executor); + sleepGuaranteed(2 * intervalMillis); + } + + assertTrue(restartStrategy.canRestart()); + } + + @Test + public void testFailuresExceedingRate() throws Exception { + final int numFailures = 3; + final int intervalMillis = 10_000; + + final FailureRateRestartStrategy restartStrategy = + new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0)); + + for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) { + assertTrue(restartStrategy.canRestart()); + restartStrategy.restart(new NoOpRestarter(), executor); + } + + // now the rate should be exceeded + assertFalse(restartStrategy.canRestart()); + } + + @Test + public void testDelay() throws Exception { + final long restartDelay = 2; + final int numberRestarts = 10; + + final FailureRateRestartStrategy strategy = + new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay)); + + for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) { + assertTrue(strategy.canRestart()); + + final OneShotLatch sync = new OneShotLatch(); + final RestartCallback restarter = new LatchedRestarter(sync); + + final long time = System.nanoTime(); + strategy.restart(restarter, executor); + sync.await(); + + final long elapsed = System.nanoTime() - time; + assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000); + } + } + + // ------------------------------------------------------------------------ + + /** + * This method makes sure that the actual interval and is not spuriously waking up. + */ + private static void sleepGuaranteed(long millis) throws InterruptedException { + final long deadline = System.nanoTime() + millis * 1_000_000; + + long nanosToSleep; + while ((nanosToSleep = deadline - System.nanoTime()) > 0) { + long millisToSleep = nanosToSleep / 1_000_000; + if (nanosToSleep % 1_000_000 != 0) { + millisToSleep++; + } + + Thread.sleep(millisToSleep); + } + } + + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java index 4beedb00323db..17504d887bc31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java @@ -18,34 +18,75 @@ package org.apache.flink.runtime.executiongraph.restart; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; -import com.google.common.util.concurrent.MoreExecutors; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.junit.After; import org.junit.Test; -import org.mockito.Mockito; -import scala.concurrent.ExecutionContext$; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Unit test for the {@link FixedDelayRestartStrategy}. + */ public class FixedDelayRestartStrategyTest { + public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); + + public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService); + + @After + public void shutdownExecutor() { + executorService.shutdownNow(); + } + + // ------------------------------------------------------------------------ + + @Test + public void testNumberOfRestarts() throws Exception { + final int numberRestarts = 10; + + final FixedDelayRestartStrategy strategy = + new FixedDelayRestartStrategy(numberRestarts, 0L); + + for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) { + // two calls to 'canRestart()' to make sure this is not used to maintain the counter + assertTrue(strategy.canRestart()); + assertTrue(strategy.canRestart()); + + strategy.restart(new NoOpRestarter(), executor); + } + + assertFalse(strategy.canRestart()); + } + @Test - public void testFixedDelayRestartStrategy() { - int numberRestarts = 10; - long restartDelay = 10; + public void testDelay() throws Exception { + final long restartDelay = 10; + final int numberRestarts = 10; + + final FixedDelayRestartStrategy strategy = + new FixedDelayRestartStrategy(numberRestarts, restartDelay); + + for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) { + assertTrue(strategy.canRestart()); - FixedDelayRestartStrategy fixedDelayRestartStrategy = new FixedDelayRestartStrategy( - numberRestarts, - restartDelay); + final OneShotLatch sync = new OneShotLatch(); + final RestartCallback restarter = new LatchedRestarter(sync); - ExecutionGraph executionGraph = mock(ExecutionGraph.class); - when(executionGraph.getFutureExecutor()) - .thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor())); + final long time = System.nanoTime(); + strategy.restart(restarter, executor); + sync.await(); - while(fixedDelayRestartStrategy.canRestart()) { - fixedDelayRestartStrategy.restart(executionGraph); + final long elapsed = System.nanoTime() - time; + assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000); } - Mockito.verify(executionGraph, Mockito.times(numberRestarts)).restart(); + assertFalse(strategy.canRestart()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java index c1cbdd302ce30..d1452981fd507 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph.restart; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +28,7 @@ * Actually {@link ExecutionGraph} will never be restarted. No additional threads will be used. */ public class InfiniteDelayRestartStrategy implements RestartStrategy { + private static final Logger LOG = LoggerFactory.getLogger(InfiniteDelayRestartStrategy.class); private final int maxRestartAttempts; @@ -43,15 +45,11 @@ public InfiniteDelayRestartStrategy(int maxRestartAttempts) { @Override public boolean canRestart() { - if (maxRestartAttempts >= 0) { - return restartAttemptCounter < maxRestartAttempts; - } else { - return true; - } + return maxRestartAttempts < 0 || restartAttemptCounter < maxRestartAttempts; } @Override - public void restart(ExecutionGraph executionGraph) { + public void restart(RestartCallback restarter, ScheduledExecutor executor) { LOG.info("Delaying retry of job execution forever"); if (maxRestartAttempts >= 0) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java new file mode 100644 index 0000000000000..7682faba0adda --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +import org.apache.flink.core.testutils.OneShotLatch; + +/** + * A testing RestartCallback that triggers a latch when restart is triggered. + */ +class LatchedRestarter implements RestartCallback { + + private final OneShotLatch latch; + + LatchedRestarter(OneShotLatch latch) { + this.latch = latch; + } + + @Override + public void triggerFullRecovery() { + latch.trigger(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java new file mode 100644 index 0000000000000..04a7c10a5ba26 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.restart; + +/** + * A testing RestartCallback that does nothing. + */ +class NoOpRestarter implements RestartCallback { + + @Override + public void triggerFullRecovery() {} +} From 11e2144892a57c58ffe919ac228c702595f34025 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 18 Jul 2017 19:49:56 +0200 Subject: [PATCH 3/3] [FLINK-7216] [distr. coordination] Guard against concurrent global failover --- .../executiongraph/ExecutionGraph.java | 35 ++-- .../ExecutionGraphRestartCallback.java | 23 ++- .../restart/RestartStrategy.java | 2 +- .../ExecutionGraphRestartTest.java | 158 +++++++++++++++++- .../ExecutionGraphTestUtils.java | 29 ++++ 5 files changed, 225 insertions(+), 22 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 9f29fafed7a8a..4c28f9b98bcb1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -966,7 +966,7 @@ public void cancel() { if (transitionState(current, JobStatus.CANCELLING)) { // make sure no concurrent local actions interfere with the cancellation - incrementGlobalModVersion(); + final long globalVersionForRestart = incrementGlobalModVersion(); final ArrayList> futures = new ArrayList<>(verticesInCreationOrder.size()); @@ -980,7 +980,9 @@ public void cancel() { allTerminal.thenAccept(new AcceptFunction() { @Override public void accept(Void value) { - allVerticesInTerminalState(); + // cancellations may currently be overridden by failures which trigger + // restarts, so we need to pass a proper restart global version here + allVerticesInTerminalState(globalVersionForRestart); } }); @@ -1085,17 +1087,20 @@ public void failGlobal(Throwable t) { return; } else if (current == JobStatus.RESTARTING) { + // we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something + // has gone wrong in 'RESTARTING' and we need to re-attempt the restarts this.failureCause = t; - if (tryRestartOrFail()) { + final long globalVersionForRestart = incrementGlobalModVersion(); + if (tryRestartOrFail(globalVersionForRestart)) { return; } } else if (transitionState(current, JobStatus.FAILING, t)) { this.failureCause = t; - // make sure no concurrent local actions interfere with the cancellation - incrementGlobalModVersion(); + // make sure no concurrent local or global actions interfere with the failover + final long globalVersionForRestart = incrementGlobalModVersion(); // we build a future that is complete once all vertices have reached a terminal state final ArrayList> futures = new ArrayList<>(verticesInCreationOrder.size()); @@ -1109,7 +1114,7 @@ else if (transitionState(current, JobStatus.FAILING, t)) { allTerminal.thenAccept(new AcceptFunction() { @Override public void accept(Void value) { - allVerticesInTerminalState(); + allVerticesInTerminalState(globalVersionForRestart); } }); @@ -1120,10 +1125,16 @@ public void accept(Void value) { } } - public void restart() { + public void restart(long expectedGlobalVersion) { try { synchronized (progressLock) { - JobStatus current = state; + // check and increment the global version to move this recovery up + if (globalModVersion != expectedGlobalVersion) { + LOG.info("Concurrent full restart subsumed this restart."); + return; + } + + final JobStatus current = state; if (current == JobStatus.CANCELED) { LOG.info("Canceled job during restart. Aborting restart."); @@ -1329,7 +1340,7 @@ void vertexUnFinished() { * This method is a callback during cancellation/failover and called when all tasks * have reached a terminal state (cancelled/failed/finished). */ - private void allVerticesInTerminalState() { + private void allVerticesInTerminalState(long expectedGlobalVersionForRestart) { // we are done, transition to the final state JobStatus current; while (true) { @@ -1345,7 +1356,7 @@ else if (current == JobStatus.CANCELLING) { } } else if (current == JobStatus.FAILING) { - if (tryRestartOrFail()) { + if (tryRestartOrFail(expectedGlobalVersionForRestart)) { break; } // concurrent job status change, let's check again @@ -1374,7 +1385,7 @@ else if (current.isGloballyTerminalState()) { * * @return true if the operation could be executed; false if a concurrent job status change occurred */ - private boolean tryRestartOrFail() { + private boolean tryRestartOrFail(long globalModVersionForRestart) { JobStatus currentState = state; if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) { @@ -1392,7 +1403,7 @@ private boolean tryRestartOrFail() { if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) { LOG.info("Restarting the job {} ({}).", getJobName(), getJobID()); - RestartCallback restarter = new ExecutionGraphRestartCallback(this); + RestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart); restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(futureExecutor)); return true; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java index 5874f91f7a898..7f98110f5eb17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java @@ -25,27 +25,38 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. - * + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. + * *

This callback implementation is one-shot; it can only be used once. */ public class ExecutionGraphRestartCallback implements RestartCallback { - /** The ExecutionGraph to restart */ + /** The ExecutionGraph to restart. */ private final ExecutionGraph execGraph; - /** Atomic flag to make sure this is used only once */ + /** Atomic flag to make sure this is used only once. */ private final AtomicBoolean used; - public ExecutionGraphRestartCallback(ExecutionGraph execGraph) { + /** The globalModVersion that the ExecutionGraph needs to have for the restart to go through. */ + private final long expectedGlobalModVersion; + + /** + * Creates a new ExecutionGraphRestartCallback. + * + * @param execGraph The ExecutionGraph to restart + * @param expectedGlobalModVersion The globalModVersion that the ExecutionGraph needs to have + * for the restart to go through + */ + public ExecutionGraphRestartCallback(ExecutionGraph execGraph, long expectedGlobalModVersion) { this.execGraph = checkNotNull(execGraph); this.used = new AtomicBoolean(false); + this.expectedGlobalModVersion = expectedGlobalModVersion; } @Override public void triggerFullRecovery() { if (used.compareAndSet(false, true)) { - execGraph.restart(); + execGraph.restart(expectedGlobalModVersion); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java index 60e2e8bed219d..ffa2777ae8c0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java @@ -37,7 +37,7 @@ public interface RestartStrategy { * Called by the ExecutionGraph to eventually trigger a full recovery. * The recovery must be triggered on the given callback object, and may be delayed * with the help of the given scheduled executor. - * + * *

The thread that calls this method is not supposed to block/sleep. * * @param restarter The hook to restart the ExecutionGraph diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 81702a2f8a883..a4bfa77b29b52 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -36,10 +36,13 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -60,9 +63,16 @@ import java.io.IOException; import java.util.Iterator; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilDeployedAndSwitchToRunning; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -90,7 +100,7 @@ public void testNoManualRestart() throws Exception { assertEquals(JobStatus.FAILED, eg.getState()); // This should not restart the graph. - eg.restart(); + eg.restart(eg.getGlobalModVersion()); assertEquals(JobStatus.FAILED, eg.getState()); } @@ -187,7 +197,7 @@ public void testCancelWhileRestarting() throws Exception { assertEquals(JobStatus.CANCELED, executionGraph.getState()); // The restart has been aborted - executionGraph.restart(); + executionGraph.restart(executionGraph.getGlobalModVersion()); assertEquals(JobStatus.CANCELED, executionGraph.getState()); } @@ -254,7 +264,7 @@ public void testFailWhileRestarting() throws Exception { assertEquals(JobStatus.FAILED, executionGraph.getState()); // The restart has been aborted - executionGraph.restart(); + executionGraph.restart(executionGraph.getGlobalModVersion()); assertEquals(JobStatus.FAILED, executionGraph.getState()); } @@ -555,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception { assertEquals(JobStatus.SUSPENDED, eg.getState()); } + @Test + public void testConcurrentLocalFailAndRestart() throws Exception { + final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L)); + eg.setScheduleMode(ScheduleMode.EAGER); + eg.scheduleForExecution(); + + waitUntilDeployedAndSwitchToRunning(eg, 1000); + + final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next(); + final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt(); + final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt(); + + final OneShotLatch failTrigger = new OneShotLatch(); + final CountDownLatch readyLatch = new CountDownLatch(2); + + Thread failure1 = new Thread() { + @Override + public void run() { + readyLatch.countDown(); + try { + failTrigger.await(); + } catch (InterruptedException ignored) {} + + first.fail(new Exception("intended test failure 1")); + } + }; + + Thread failure2 = new Thread() { + @Override + public void run() { + readyLatch.countDown(); + try { + failTrigger.await(); + } catch (InterruptedException ignored) {} + + last.fail(new Exception("intended test failure 2")); + } + }; + + // make sure both threads start simultaneously + failure1.start(); + failure2.start(); + readyLatch.await(); + failTrigger.trigger(); + + waitUntilJobStatus(eg, JobStatus.FAILING, 1000); + completeCancellingForAllVertices(eg); + + waitUntilJobStatus(eg, JobStatus.RUNNING, 1000); + waitUntilDeployedAndSwitchToRunning(eg, 1000); + finishAllVertices(eg); + + eg.waitUntilTerminal(); + assertEquals(JobStatus.FINISHED, eg.getState()); + } + + @Test + public void testConcurrentGlobalFailAndRestarts() throws Exception { + final OneShotLatch restartTrigger = new OneShotLatch(); + + final int parallelism = 10; + final JobID jid = new JobID(); + final JobVertex vertex = createNoOpVertex(parallelism); + final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, new NotCancelAckingTaskGateway()); + final TriggeredRestartStrategy restartStrategy = new TriggeredRestartStrategy(restartTrigger); + + final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex); + eg.setScheduleMode(ScheduleMode.EAGER); + eg.scheduleForExecution(); + + waitUntilDeployedAndSwitchToRunning(eg, 1000); + + // fail into 'RESTARTING' + eg.failGlobal(new Exception("intended test failure 1")); + assertEquals(JobStatus.FAILING, eg.getState()); + completeCancellingForAllVertices(eg); + waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000); + + eg.failGlobal(new Exception("intended test failure 2")); + assertEquals(JobStatus.RESTARTING, eg.getState()); + + // trigger both restart strategies to kick in concurrently + restartTrigger.trigger(); + + waitUntilJobStatus(eg, JobStatus.RUNNING, 1000); + waitUntilDeployedAndSwitchToRunning(eg, 1000); + finishAllVertices(eg); + + eg.waitUntilTerminal(); + assertEquals(JobStatus.FINISHED, eg.getState()); + + if (eg.getNumberOfFullRestarts() > 2) { + fail("Too many restarts: " + eg.getNumberOfFullRestarts()); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + private static class ControllableRestartStrategy implements RestartStrategy { private Promise reachedCanRestart = new Promise.DefaultPromise<>(); @@ -727,4 +837,46 @@ private static void haltExecution(ExecutionGraph eg) { assertEquals(JobStatus.FINISHED, eg.getState()); } + + // ------------------------------------------------------------------------ + + /** + * A TaskManager gateway that does not ack cancellations. + */ + private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway { + + @Override + public org.apache.flink.runtime.concurrent.Future cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { + return new FlinkCompletableFuture<>(); + } + } + + private static final class TriggeredRestartStrategy implements RestartStrategy { + + private final OneShotLatch latch; + + TriggeredRestartStrategy(OneShotLatch latch) { + this.latch = latch; + } + + @Override + public boolean canRestart() { + return true; + } + + @Override + public void restart(final RestartCallback restarter, ScheduledExecutor executor) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + restarter.triggerFullRecovery(); + } + }); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 51b5c7fd9759f..01f3bbaf0e054 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -189,6 +189,35 @@ public static void finishAllVertices(ExecutionGraph eg) { } } + /** + * Turns a newly scheduled execution graph into a state where all vertices run. + * This waits until all executions have reached state 'DEPLOYING' and then switches them to running. + */ + public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout) throws TimeoutException { + // wait until everything is running + for (ExecutionVertex ev : eg.getAllExecutionVertices()) { + final Execution exec = ev.getCurrentExecutionAttempt(); + waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout); + } + + // Note: As ugly as it is, we need this minor sleep, because between switching + // to 'DEPLOYED' and when the 'switchToRunning()' may be called lies a race check + // against concurrent modifications (cancel / fail). We can only switch this to running + // once that check is passed. For the actual runtime, this switch is triggered by a callback + // from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers + // which cannot easily tell us when that condition has happened, unfortunately. + try { + Thread.sleep(2); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + for (ExecutionVertex ev : eg.getAllExecutionVertices()) { + final Execution exec = ev.getCurrentExecutionAttempt(); + exec.switchToRunning(); + } + } + // ------------------------------------------------------------------------ // state modifications // ------------------------------------------------------------------------