From 045e2187f6d43679115909b10aaeb78c627b758c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 28 Feb 2017 17:27:03 +0100 Subject: [PATCH 1/2] [FLINK-5938] Replace ExecutionContext by Executor in Scheduler In order to remove the Scheduler's dependency on Scala's ExecutionContext and Akka's futures, this PR replaces the ExecutionContext by an Executor which is used to execute the concurrent handleNewSlot call. --- .../jobmanager/scheduler/Scheduler.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index b839e0e5f6c7f..700cc8f557449 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -30,11 +30,9 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; -import akka.dispatch.Futures; - import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -54,9 +52,9 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; /** * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots. @@ -102,16 +100,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl /** The number of slot allocations where locality could not be respected */ private int nonLocalizedAssignments; - /** The ExecutionContext which is used to execute newSlotAvailable futures. */ - private final ExecutionContext executionContext; + /** The Executor which is used to execute newSlotAvailable futures. */ + private final Executor executor; // ------------------------------------------------------------------------ /** * Creates a new scheduler. */ - public Scheduler(ExecutionContext executionContext) { - this.executionContext = executionContext; + public Scheduler(Executor executor) { + this.executor = Preconditions.checkNotNull(executor); } /** @@ -527,15 +525,14 @@ public void newSlotAvailable(final Instance instance) { // // that leads with a high probability to deadlocks, when scheduling fast - this.newlyAvailableInstances.add(instance); + newlyAvailableInstances.add(instance); - Futures.future(new Callable() { + executor.execute(new Runnable() { @Override - public Object call() throws Exception { + public void run() { handleNewSlot(); - return null; } - }, executionContext); + }); } private void handleNewSlot() { From 2525fb2a0e08418f09319568f58ed7daa9284376 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Tue, 28 Feb 2017 15:20:47 +0100 Subject: [PATCH 2/2] [FLINK-5934] Set the Scheduler in the ExecutionGraph via its constructor Before the scheduler was set when calling ExecutionGraph.scheduleForExecution(). This has the disadvantage that the ExecutionGraph has not scheduler set if something else went wrong before the scheduleForExecution call. Consequently, the job will be stuck in a restart loop because the recovery will fail if there is no Scheduler set. In order to solve the problem, the Scheduler is not passed to the ExecutionGraph when it is created. --- .../executiongraph/ExecutionGraph.java | 66 +++++++++---------- .../executiongraph/ExecutionGraphBuilder.java | 3 + .../flink/runtime/jobmanager/JobManager.scala | 3 +- ...ecutionGraphCheckpointCoordinatorTest.java | 2 + .../ArchivedExecutionGraphTest.java | 5 +- .../ExecutionGraphConstructionTest.java | 25 ++++--- .../ExecutionGraphDeploymentTest.java | 47 +++++++------ .../ExecutionGraphMetricsTest.java | 3 +- .../ExecutionGraphRestartTest.java | 39 ++++++----- .../ExecutionGraphSignalsTest.java | 4 +- .../ExecutionGraphTestUtils.java | 5 +- .../ExecutionStateProgressTest.java | 4 +- .../executiongraph/LegacyJobVertexIdTest.java | 18 ++--- .../executiongraph/PointwisePatternTest.java | 22 +++++-- .../TerminalStateDeadlockTest.java | 5 +- .../executiongraph/VertexSlotSharingTest.java | 4 +- .../TaskManagerLossFailsTasksTest.scala | 5 +- .../partitioner/RescalePartitionerTest.java | 2 + 18 files changed, 156 insertions(+), 106 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 66aec8b30ff33..99b390b5d7f28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -128,6 +128,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable serializedJobInformation; + /** The executor which is used to execute futures. */ + private final Executor futureExecutor; + + /** The executor which is used to execute blocking io operations */ + private final Executor ioExecutor; + /** {@code true} if all source tasks are stoppable. */ private boolean isStoppable = true; @@ -159,6 +165,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable serializedConfig, Time timeout, - RestartStrategy restartStrategy) throws IOException { + RestartStrategy restartStrategy, + SlotProvider slotProvider) throws IOException { this( futureExecutor, ioExecutor, @@ -239,6 +241,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable(), new ArrayList(), + slotProvider, ExecutionGraph.class.getClassLoader(), new UnregisteredMetricsGroup() ); @@ -255,6 +258,7 @@ public ExecutionGraph( RestartStrategy restartStrategy, List requiredJarFiles, List requiredClasspaths, + SlotProvider slotProvider, ClassLoader userClassLoader, MetricGroup metricGroup) throws IOException { @@ -262,7 +266,6 @@ public ExecutionGraph( checkNotNull(jobId); checkNotNull(jobName); checkNotNull(jobConfig); - checkNotNull(userClassLoader); this.jobInformation = new JobInformation( jobId, @@ -278,12 +281,13 @@ public ExecutionGraph( this.futureExecutor = Preconditions.checkNotNull(futureExecutor); this.ioExecutor = Preconditions.checkNotNull(ioExecutor); - this.userClassLoader = userClassLoader; + this.slotProvider = Preconditions.checkNotNull(slotProvider, "scheduler"); + this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader"); - this.tasks = new ConcurrentHashMap(); - this.intermediateResults = new ConcurrentHashMap(); - this.verticesInCreationOrder = new ArrayList(); - this.currentExecutions = new ConcurrentHashMap(); + this.tasks = new ConcurrentHashMap<>(16); + this.intermediateResults = new ConcurrentHashMap<>(16); + this.verticesInCreationOrder = new ArrayList<>(16); + this.currentExecutions = new ConcurrentHashMap<>(16); this.jobStatusListeners = new CopyOnWriteArrayList<>(); this.executionListeners = new CopyOnWriteArrayList<>(); @@ -700,17 +704,9 @@ public void attachJobGraph(List topologiallySorted) throws JobExcepti } } - public void scheduleForExecution(SlotProvider slotProvider) throws JobException { - if (slotProvider == null) { - throw new IllegalArgumentException("Scheduler must not be null."); - } - - if (this.slotProvider != null && this.slotProvider != slotProvider) { - throw new IllegalArgumentException("Cannot use different slot providers for the same job"); - } + public void scheduleForExecution() throws JobException { if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) { - this.slotProvider = slotProvider; switch (scheduleMode) { @@ -914,7 +910,7 @@ public void restart() { } } - scheduleForExecution(slotProvider); + scheduleForExecution(); } catch (Throwable t) { LOG.warn("Failed to restart the job.", t); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 386f202920301..45457771230e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -62,6 +63,7 @@ public static ExecutionGraph buildGraph( Configuration jobManagerConfig, Executor futureExecutor, Executor ioExecutor, + SlotProvider slotProvider, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, Time timeout, @@ -92,6 +94,7 @@ public static ExecutionGraph buildGraph( restartStrategy, jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths(), + slotProvider, classLoader, metrics); } catch (IOException e) { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 302572735ea5a..00128a0ae1a2e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1287,6 +1287,7 @@ class JobManager( flinkConfiguration, futureExecutor, ioExecutor, + scheduler, userCodeLoader, checkpointRecoveryFactory, Time.of(timeout.length, timeout.unit), @@ -1405,7 +1406,7 @@ class JobManager( // the job. log.info(s"Scheduling job $jobId ($jobName).") - executionGraph.scheduleForExecution(scheduler) + executionGraph.scheduleForExecution() } else { // Remove the job graph. Otherwise it will be lingering around and possibly removed from // ZooKeeper by this JM. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index bc95de7ddc058..99305e0700ff3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; import org.junit.AfterClass; @@ -103,6 +104,7 @@ private ExecutionGraph createExecutionGraphAndEnableCheckpointing( new NoRestartStrategy(), Collections.emptyList(), Collections.emptyList(), + new Scheduler(TestingUtils.defaultExecutionContext()), ClassLoader.getSystemClassLoader(), new UnregisteredMetricsGroup()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 9b1064dcc9bca..ae1a6d5d6bedd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -103,7 +104,9 @@ public static void setupExecutionGraph() throws Exception { new Configuration(), new SerializedValue<>(config), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + mock(SlotProvider.class)); + runtimeGraph.attachJobGraph(vertices); CheckpointStatsTracker statsTracker = new CheckpointStatsTracker( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index bf3a17c86d701..ea25dca23332b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -119,7 +120,8 @@ public void testCreateSimpleGraphBipartite() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -169,7 +171,8 @@ public void testAttachViaDataSets() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -244,7 +247,8 @@ public void testAttachViaIds() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -504,7 +508,8 @@ public void testCannotConnectMissingId() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -569,7 +574,8 @@ public void testCannotConnectWrongOrder() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); fail("Attached wrong jobgraph"); @@ -638,7 +644,8 @@ public void testSetupInputSplits() { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -685,7 +692,8 @@ public void testMoreThanOneConsumerForIntermediateResult() { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); @@ -767,7 +775,8 @@ public void testCoLocationConstraintCreation() { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index ef4f74ca3323c..bff45841f95b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -96,7 +96,8 @@ public void testBuildDeploymentDescriptor() { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); List ordered = Arrays.asList(v1, v2, v3, v4); @@ -311,6 +312,15 @@ public void testNoResourceAvailableFailure() throws Exception { v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); + for (int i = 0; i < dop1; i++) { + scheduler.newInstanceAvailable( + ExecutionGraphTestUtils.getInstance( + new ActorTaskManagerGateway( + new ExecutionGraphTestUtils.SimpleActorGateway( + TestingUtils.directExecutionContext())))); + } + // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( TestingUtils.directExecutionContext(), @@ -320,25 +330,18 @@ public void testNoResourceAvailableFailure() throws Exception { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + scheduler); eg.setQueuedSchedulingAllowed(false); List ordered = Arrays.asList(v1, v2); eg.attachJobGraph(ordered); - Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext()); - for (int i = 0; i < dop1; i++) { - scheduler.newInstanceAvailable( - ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new ExecutionGraphTestUtils.SimpleActorGateway( - TestingUtils.directExecutionContext())))); - } assertEquals(dop1, scheduler.getNumberOfAvailableSlots()); // schedule, this triggers mock deployment - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId(); eg.updateState(new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING)); @@ -356,6 +359,15 @@ private Map setupExecution(JobVertex v1, int dop1 v1.setInvokableClass(BatchTask.class); v2.setInvokableClass(BatchTask.class); + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + for (int i = 0; i < dop1 + dop2; i++) { + scheduler.newInstanceAvailable( + ExecutionGraphTestUtils.getInstance( + new ActorTaskManagerGateway( + new ExecutionGraphTestUtils.SimpleActorGateway( + TestingUtils.directExecutionContext())))); + } + // execution graph that executes actions synchronously ExecutionGraph eg = new ExecutionGraph( TestingUtils.directExecutionContext(), @@ -365,25 +377,18 @@ private Map setupExecution(JobVertex v1, int dop1 new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + scheduler); eg.setQueuedSchedulingAllowed(false); List ordered = Arrays.asList(v1, v2); eg.attachJobGraph(ordered); - Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); - for (int i = 0; i < dop1 + dop2; i++) { - scheduler.newInstanceAvailable( - ExecutionGraphTestUtils.getInstance( - new ActorTaskManagerGateway( - new ExecutionGraphTestUtils.SimpleActorGateway( - TestingUtils.directExecutionContext())))); - } assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots()); // schedule, this triggers mock deployment - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); Map executions = eg.getRegisteredExecutions(); assertEquals(dop1 + dop2, executions.size()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index d8d8e246e7967..02e2d38f4c873 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -156,6 +156,7 @@ public void testExecutionGraphRestartTimeMetric() throws JobException, IOExcepti testingRestartStrategy, Collections.emptyList(), Collections.emptyList(), + scheduler, getClass().getClassLoader(), metricGroup); @@ -174,7 +175,7 @@ public void testExecutionGraphRestartTimeMetric() throws JobException, IOExcepti executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); // start execution - executionGraph.scheduleForExecution(scheduler); + executionGraph.scheduleForExecution(); assertTrue(0L == restartingTime.getValue()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 52bfc965c53f4..9ef72a91065b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -115,12 +115,12 @@ public void testConstraintsAfterRestart() throws Exception { //initiate and schedule job JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2); - ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L)); + ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); //sanity checks @@ -239,7 +239,8 @@ public void testFailWhileRestarting() throws Exception { new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), // We want to manually control the restart and delay - new InfiniteDelayRestartStrategy()); + new InfiniteDelayRestartStrategy(), + scheduler); JobVertex jobVertex = new JobVertex("NoOpInvokable"); jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); @@ -251,7 +252,7 @@ public void testFailWhileRestarting() throws Exception { assertEquals(JobStatus.CREATED, executionGraph.getState()); - executionGraph.scheduleForExecution(scheduler); + executionGraph.scheduleForExecution(); assertEquals(JobStatus.RUNNING, executionGraph.getState()); @@ -383,12 +384,12 @@ public void testFailingExecutionAfterRestart() throws Exception { JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class); JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class); JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); - ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000)); + ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); Iterator executionVertices = eg.getAllExecutionVertices().iterator(); @@ -452,13 +453,13 @@ public void testFailExecutionAfterCancel() throws Exception { JobGraph jobGraph = new JobGraph("Test Job", vertex); jobGraph.setExecutionConfig(executionConfig); - ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy()); + ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); // Fail right after cancel (for example with concurrent slot release) @@ -498,13 +499,13 @@ public void testFailExecutionGraphAfterCancel() throws Exception { JobGraph jobGraph = new JobGraph("Test Job", vertex); jobGraph.setExecutionConfig(executionConfig); - ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy()); + ExecutionGraph eg = newExecutionGraph(new InfiniteDelayRestartStrategy(), scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); // Fail right after cancel (for example with concurrent slot release) @@ -554,13 +555,14 @@ public void testSuspendWhileRestarting() throws Exception { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - controllableRestartStrategy); + controllableRestartStrategy, + scheduler); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); @@ -659,7 +661,7 @@ private static Tuple2 createExecutionGraph(RestartStra JobGraph jobGraph = new JobGraph("Pointwise job", sender); - ExecutionGraph eg = newExecutionGraph(restartStrategy); + ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler); if (isSpy) { eg = spy(eg); } @@ -667,7 +669,7 @@ private static Tuple2 createExecutionGraph(RestartStra assertEquals(JobStatus.CREATED, eg.getState()); - eg.scheduleForExecution(scheduler); + eg.scheduleForExecution(); assertEquals(JobStatus.RUNNING, eg.getState()); return new Tuple2<>(eg, instance); } @@ -679,7 +681,7 @@ private static JobVertex newJobVertex(String task1, int numTasks, Class(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - restartStrategy); + restartStrategy, + scheduler); + } + + private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException { + return newExecutionGraph(restartStrategy, new Scheduler(TestingUtils.defaultExecutionContext())); } private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java index fde967e519553..fab6505c7e4b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -139,7 +140,8 @@ public void prepare() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(ordered); f = eg.getClass().getDeclaredField("state"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 71ae3b6dcbd9e..ef4926e1e453d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.TaskMessages.SubmitTask; import org.apache.flink.runtime.messages.TaskMessages.FailIntermediateResultPartitions; import org.apache.flink.runtime.messages.TaskMessages.CancelTask; @@ -55,6 +56,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; public class ExecutionGraphTestUtils { @@ -180,7 +182,8 @@ public static ExecutionJobVertex getExecutionVertex(JobVertexID id, Executor exe new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor))); ExecutionJobVertex ejv = spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 19e2d6d537e86..e458d0282b5fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -59,7 +60,8 @@ public void testAccumulatedStateFinished() { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); graph.attachJobGraph(Collections.singletonList(ajv)); setGraphStatus(graph, JobStatus.RUNNING); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java index 44dc0a400349e..a4da0bf935fde 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -49,13 +50,14 @@ public void testIntroduceLegacyJobVertexIds() throws Exception { ExecutionGraph executionGraph = new ExecutionGraph( mock(Executor.class), - mock(Executor.class), - new JobID(), - "test", - mock(Configuration.class), - mock(SerializedValue.class), - Time.seconds(1), - mock(RestartStrategy.class)); + mock(Executor.class), + new JobID(), + "test", + mock(Configuration.class), + mock(SerializedValue.class), + Time.seconds(1), + mock(RestartStrategy.class), + mock(SlotProvider.class)); ExecutionJobVertex executionJobVertex = new ExecutionJobVertex(executionGraph, jobVertex, 1, Time.seconds(1)); @@ -74,4 +76,4 @@ public void testIntroduceLegacyJobVertexIds() throws Exception { Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId1)); Assert.assertEquals(executionJobVertex, idToVertex.get(legacyId2)); } -} \ No newline at end of file +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index 0e147e3e05b73..b59dcda8b6555 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -73,7 +74,8 @@ public void testNToN() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -119,7 +121,8 @@ public void test2NToN() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -166,7 +169,8 @@ public void test3NToN() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -214,7 +218,8 @@ public void testNTo2N() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -260,7 +265,8 @@ public void testNTo7N() throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -326,7 +332,8 @@ private void testLowToHigh(int lowDop, int highDop) throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } @@ -383,7 +390,8 @@ private void testHighToLow(int highDop, int lowDop) throws Exception { cfg, new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); try { eg.attachJobGraph(ordered); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index 5b1a03e4730ec..32ae093cc5243 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -187,11 +187,12 @@ static class TestExecGraph extends ExecutionGraph { EMPTY_CONFIG, new SerializedValue<>(new ExecutionConfig()), TIMEOUT, - new FixedDelayRestartStrategy(1, 0)); + new FixedDelayRestartStrategy(1, 0), + new Scheduler(TestingUtils.defaultExecutionContext())); } @Override - public void scheduleForExecution(SlotProvider slotProvider) { + public void scheduleForExecution() { // notify that we are done with the "restarting" synchronized (this) { done = true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 27708a23ac258..5c8559245d4bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; @@ -87,7 +88,8 @@ public void testAssignSlotSharingGroup() { new Configuration(), new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), - new NoRestartStrategy()); + new NoRestartStrategy(), + new Scheduler(TestingUtils.defaultExecutionContext())); eg.attachJobGraph(vertices); // verify that the vertices are all in the same slot sharing group diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index 1cbd605202e86..5e4a80209d3bc 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -64,13 +64,14 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { new Configuration(), new SerializedValue(new ExecutionConfig()), AkkaUtils.getDefaultTimeout, - new NoRestartStrategy()) + new NoRestartStrategy(), + scheduler) eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources) eg.getState should equal(JobStatus.CREATED) - eg.scheduleForExecution(scheduler) + eg.scheduleForExecution() eg.getState should equal(JobStatus.RUNNING) instance1.markDead() diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java index e77cbb3214167..2e05101d49077 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.streaming.api.datastream.DataStream; @@ -148,6 +149,7 @@ public void flatMap(String value, new NoRestartStrategy(), new ArrayList(), new ArrayList(), + new Scheduler(TestingUtils.defaultExecutionContext()), ExecutionGraph.class.getClassLoader(), new UnregisteredMetricsGroup()); try {