From 60738d19aa4ed13a29141a5d446074203d815809 Mon Sep 17 00:00:00 2001 From: hzyuqi1 Date: Mon, 9 Apr 2018 21:39:17 +0800 Subject: [PATCH] [FLINK-9056] [jobmanager] Job submission fails with AskTimeoutException if not enough slots are available --- .../runtime/executiongraph/Execution.java | 9 +++- .../ExecutionVertexSchedulingTest.java | 41 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 64e602f4fbfe2..83e64b1ea8f65 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; @@ -409,7 +410,13 @@ public CompletableFuture scheduleForExecution( // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so // that we directly deploy the tasks if the slot allocation future is completed. This is // necessary for immediate deployment. - final CompletableFuture deploymentFuture = allocationFuture.handle( + final CompletableFuture deploymentFuture = allocationFuture.exceptionally((Throwable t) -> { + if (t.getCause() instanceof TimeoutException) { + throw new CompletionException(new NoResourceAvailableException("Can't allocated resource for " + + this.vertex + ", possibly there is no more slot available...")); + } + throw new CompletionException(t.getCause()); + }).handle( (Execution ignored, Throwable throwable) -> { if (throwable != null) { markFailed(ExceptionUtils.stripCompletionException(throwable)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 51d1827e66834..379e931136ca0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; @@ -37,6 +38,8 @@ import org.junit.Test; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; @@ -146,4 +149,42 @@ public void testScheduleToDeploying() { fail(e.getMessage()); } } + + + @Test + public void testScheduleToDeployingAndTestTimeOutException() { + try { + final ExecutionJobVertex ejv = getExecutionVertex(new JobVertexID()); + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + + final Instance instance = getInstance(new ActorTaskManagerGateway( + new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext()))); + final SimpleSlot slot = instance.allocateSimpleSlot(); + + Scheduler scheduler = mock(Scheduler.class); + CompletableFuture future = new CompletableFuture<>(); + future.complete(slot); + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenReturn(future); + + assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); + + //throw TimeOutException intentially + + when(scheduler.allocateSlot(any(SlotRequestId.class), any(ScheduledUnit.class), anyBoolean(), any(SlotProfile.class), any(Time.class))).thenThrow(new CompletionException(new TimeoutException())); + + vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL); + assertEquals(ExecutionState.FAILED, vertex.getExecutionState()); + + try { + future.get(); + } catch (Throwable e) { + assertTrue(e.getCause() instanceof NoResourceAvailableException); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } }