From 019ddf811154af9b7b67afcd9f08dccd30bb4bc2 Mon Sep 17 00:00:00 2001 From: Paris Carbone Date: Wed, 20 Jan 2016 03:03:41 +0100 Subject: [PATCH] [FLINK-3256] Fix colocation group re-instantiation --- .../executiongraph/ExecutionGraph.java | 16 +- .../executiongraph/ExecutionJobVertex.java | 3 - .../ExecutionGraphRestartTest.java | 166 +++++++++++++----- 3 files changed, 135 insertions(+), 50 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 90854836b9378..a03f0bf7a1018 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -76,6 +77,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Collection; +import java.util.HashSet; import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.Callable; @@ -147,7 +150,7 @@ public class ExecutionGraph implements Serializable { /** All vertices, in the order in which they were created **/ private final List verticesInCreationOrder; - + /** All intermediate results that are part of this graph */ private final ConcurrentHashMap intermediateResults; @@ -719,7 +722,7 @@ public void attachJobGraph(List topologiallySorted) throws JobExcepti res.getId(), res, previousDataSet)); } } - + this.verticesInCreationOrder.add(ejv); } } @@ -849,7 +852,16 @@ else if (current != JobStatus.RESTARTING) { this.currentExecutions.clear(); + Collection colGroups = new HashSet<>(); + for (ExecutionJobVertex jv : this.verticesInCreationOrder) { + + CoLocationGroup cgroup = jv.getCoLocationGroup(); + if(cgroup != null && !colGroups.contains(cgroup)){ + cgroup.resetConstraints(); + colGroups.add(cgroup); + } + jv.resetForNewExecution(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 93ae7c1dedc4b..bc368abdaa14f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -362,9 +362,6 @@ public void resetForNewExecution() { if (slotSharingGroup != null) { slotSharingGroup.clearTaskAssignment(); } - if (coLocationGroup != null) { - coLocationGroup.resetConstraints(); - } // reset vertices one by one. if one reset fails, the "vertices in final state" // fields will be consistent to handle triggered cancel calls diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 127ae339b5c08..0c3af8f535873 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -28,7 +28,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.Tasks; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; import scala.concurrent.duration.Deadline; @@ -38,7 +40,8 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doNothing; @@ -93,6 +96,72 @@ public void testNoManualRestart() throws Exception { assertEquals(JobStatus.FAILED, eg.getState()); } + @Test + public void testConstraintsAfterRestart() throws Exception { + + //setting up + Instance instance = ExecutionGraphTestUtils.getInstance( + new SimpleActorGateway(TestingUtils.directExecutionContext()), + NUM_TASKS); + + Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); + scheduler.newInstanceAvailable(instance); + + JobVertex groupVertex = new JobVertex("Task1"); + groupVertex.setInvokableClass(Tasks.NoOpInvokable.class); + groupVertex.setParallelism(NUM_TASKS); + + JobVertex groupVertex2 = new JobVertex("Task2"); + groupVertex2.setInvokableClass(Tasks.NoOpInvokable.class); + groupVertex2.setParallelism(NUM_TASKS); + + SlotSharingGroup sharingGroup = new SlotSharingGroup(); + groupVertex.setSlotSharingGroup(sharingGroup); + groupVertex2.setSlotSharingGroup(sharingGroup); + groupVertex.setStrictlyCoLocatedWith(groupVertex2); + + //initiate and schedule job + JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2); + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + new JobID(), + "test job", + new Configuration(), + AkkaUtils.getDefaultTimeout()); + eg.setNumberOfRetriesLeft(1); + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + assertEquals(JobStatus.CREATED, eg.getState()); + + eg.scheduleForExecution(scheduler); + assertEquals(JobStatus.RUNNING, eg.getState()); + + //sanity checks + validateConstraints(eg); + + //restart automatically + restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), false); + + //checking execution vertex properties + validateConstraints(eg); + + haltExecution(eg); + } + + private void validateConstraints(ExecutionGraph eg) { + + ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]); + + for(int i=0; i