From f681caa7e869c4b85f7f966bfcbb0ff9522bcda3 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Mon, 20 Jul 2020 16:20:12 +0200 Subject: [PATCH] [FLINK-18533][coordination] Tolerate pending deployments being reported --- .../runtime/executiongraph/Execution.java | 3 +- .../ExecutionDeploymentListener.java | 6 +- .../executiongraph/ExecutionGraphBuilder.java | 2 +- .../executiongraph/ExecutionVertex.java | 14 ++++- .../NoOpExecutionDeploymentListener.java | 37 ++++++++++++ .../DefaultExecutionDeploymentReconciler.java | 21 +++---- .../DefaultExecutionDeploymentTracker.java | 19 ++++++- .../ExecutionDeploymentReconciler.java | 9 ++- .../jobmaster/ExecutionDeploymentState.java | 28 +++++++++ .../jobmaster/ExecutionDeploymentTracker.java | 16 ++++-- ...ymentTrackerDeploymentListenerAdapter.java | 43 ++++++++++++++ .../flink/runtime/jobmaster/JobMaster.java | 7 ++- .../runtime/scheduler/SchedulerBase.java | 3 +- .../TestingExecutionGraphBuilder.java | 2 +- ...aultExecutionDeploymentReconcilerTest.java | 30 ++++++++-- ...DefaultExecutionDeploymentTrackerTest.java | 46 ++++++++++++--- ...ExecutionDeploymentReconciliationTest.java | 57 +++++++++++++++++-- 17 files changed, 294 insertions(+), 49 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/NoOpExecutionDeploymentListener.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentState.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerDeploymentListenerAdapter.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 734d0bc9e61fb..fc3166ade2bff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -747,6 +747,7 @@ public void deploy() throws JobException { final ComponentMainThreadExecutor jobMasterMainThreadExecutor = vertex.getExecutionGraph().getJobMasterMainThreadExecutor(); + getVertex().notifyPendingDeployment(this); // We run the submission in the future executor so that the serialization of large TDDs does not block // the main thread and sync back to the main thread once submission is completed. CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor) @@ -754,7 +755,7 @@ public void deploy() throws JobException { .whenCompleteAsync( (ack, failure) -> { if (failure == null) { - vertex.notifyDeployment(this); + vertex.notifyCompletedDeployment(this); } else { if (failure instanceof TimeoutException) { String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')'; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionDeploymentListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionDeploymentListener.java index 6f8b0477838fc..e9983cde42417 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionDeploymentListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionDeploymentListener.java @@ -20,8 +20,10 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; /** - * A listener that is called when an execution has been deployed. + * A listener that is called when the deployment of an execution has been started/completed. */ public interface ExecutionDeploymentListener { - void onCompletedDeployment(ExecutionAttemptID execution, ResourceID host); + void onStartedDeployment(ExecutionAttemptID execution, ResourceID host); + + void onCompletedDeployment(ExecutionAttemptID execution); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 9ca4754ce09d5..f4c4dd874a6ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -122,7 +122,7 @@ public static ExecutionGraph buildGraph( shuffleMaster, partitionTracker, failoverStrategy, - (execution, host) -> {}, + NoOpExecutionDeploymentListener.get(), (execution, newState) -> {}); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 8407b342cd71b..35023eecafb87 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -874,16 +874,26 @@ void executionFinished(Execution execution) { // Miscellaneous // -------------------------------------------------------------------------------------------- - void notifyDeployment(Execution execution) { + void notifyPendingDeployment(Execution execution) { // only forward this notification if the execution is still the current execution // otherwise we have an outdated execution if (isCurrentExecution(execution)) { - getExecutionGraph().getExecutionDeploymentListener().onCompletedDeployment( + getExecutionGraph().getExecutionDeploymentListener().onStartedDeployment( execution.getAttemptId(), execution.getAssignedResourceLocation().getResourceID()); } } + void notifyCompletedDeployment(Execution execution) { + // only forward this notification if the execution is still the current execution + // otherwise we have an outdated execution + if (isCurrentExecution(execution)) { + getExecutionGraph().getExecutionDeploymentListener().onCompletedDeployment( + execution.getAttemptId() + ); + } + } + /** * Simply forward this notification. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/NoOpExecutionDeploymentListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/NoOpExecutionDeploymentListener.java new file mode 100644 index 0000000000000..101b2fc641dc4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/NoOpExecutionDeploymentListener.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; + +/** + * No-op implementation of {@link ExecutionDeploymentListener}. + */ +public enum NoOpExecutionDeploymentListener implements ExecutionDeploymentListener { + INSTANCE; + + @Override + public void onStartedDeployment(ExecutionAttemptID execution, ResourceID host) {} + + @Override + public void onCompletedDeployment(ExecutionAttemptID execution) {} + + public static ExecutionDeploymentListener get() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentReconciler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentReconciler.java index 9756fca189c40..8a8cbd27279a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentReconciler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentReconciler.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; import java.util.HashSet; +import java.util.Map; import java.util.Set; /** @@ -37,21 +38,21 @@ public DefaultExecutionDeploymentReconciler(ExecutionDeploymentReconciliationHan } @Override - public void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Set expectedDeployedExecutions) { - final Set unknownExecutions = new HashSet<>(); - final Set expectedExecutions = new HashSet<>(expectedDeployedExecutions); - - for (ExecutionAttemptID executionAttemptID : executionDeploymentReport.getExecutions()) { - boolean isTracked = expectedExecutions.remove(executionAttemptID); - if (!isTracked) { - unknownExecutions.add(executionAttemptID); + public void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Map expectedDeployedExecutions) { + final Set unknownExecutions = new HashSet<>(executionDeploymentReport.getExecutions()); + final Set missingExecutions = new HashSet<>(); + + for (Map.Entry execution : expectedDeployedExecutions.entrySet()) { + boolean deployed = unknownExecutions.remove(execution.getKey()); + if (!deployed && execution.getValue() != ExecutionDeploymentState.PENDING) { + missingExecutions.add(execution.getKey()); } } if (!unknownExecutions.isEmpty()) { handler.onUnknownDeploymentsOf(unknownExecutions, taskExecutorHost); } - if (!expectedExecutions.isEmpty()) { - handler.onMissingDeploymentsOf(expectedExecutions, taskExecutorHost); + if (!missingExecutions.isEmpty()) { + handler.onMissingDeploymentsOf(missingExecutions, taskExecutorHost); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentTracker.java index c985316b2c9de..a586368697d17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentTracker.java @@ -25,23 +25,32 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Default {@link ExecutionDeploymentTracker} implementation. */ public class DefaultExecutionDeploymentTracker implements ExecutionDeploymentTracker { + private final Set pendingDeployments = new HashSet<>(); private final Map> executionsByHost = new HashMap<>(); private final Map hostByExecution = new HashMap<>(); @Override - public void startTrackingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host) { + public void startTrackingPendingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host) { + pendingDeployments.add(executionAttemptId); hostByExecution.put(executionAttemptId, host); executionsByHost.computeIfAbsent(host, ignored -> new HashSet<>()).add(executionAttemptId); } + @Override + public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) { + pendingDeployments.remove(executionAttemptId); + } + @Override public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) { + pendingDeployments.remove(executionAttemptId); ResourceID host = hostByExecution.remove(executionAttemptId); if (host != null) { executionsByHost.computeIfPresent(host, (resourceID, executionAttemptIds) -> { @@ -55,7 +64,11 @@ public void stopTrackingDeploymentOf(ExecutionAttemptID executionAttemptId) { } @Override - public Set getExecutionsOn(ResourceID host) { - return executionsByHost.getOrDefault(host, Collections.emptySet()); + public Map getExecutionsOn(ResourceID host) { + return executionsByHost.getOrDefault(host, Collections.emptySet()) + .stream() + .collect(Collectors.toMap( + x -> x, + x -> pendingDeployments.contains(x) ? ExecutionDeploymentState.PENDING : ExecutionDeploymentState.DEPLOYED)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciler.java index 4372fb8b810d2..5219908bb594b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentReconciler.java @@ -21,7 +21,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport; -import java.util.Set; +import java.util.Map; /** * Component for reconciling the deployment state of executions. @@ -40,8 +40,11 @@ interface Factory { * * @param taskExecutorHost hosting task executor * @param executionDeploymentReport task executor report for deployed executions - * @param expectedDeployedExecutionIds set of expected deployed executions + * @param expectedDeployedExecutionIds map of expected executions and their current deployment status */ - void reconcileExecutionDeployments(ResourceID taskExecutorHost, ExecutionDeploymentReport executionDeploymentReport, Set expectedDeployedExecutionIds); + void reconcileExecutionDeployments( + ResourceID taskExecutorHost, + ExecutionDeploymentReport executionDeploymentReport, + Map expectedDeployedExecutionIds); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentState.java new file mode 100644 index 0000000000000..6b94f4f45b55f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentState.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +/** + * Possible states for the deployment of an execution. + */ +public enum ExecutionDeploymentState { + /** The deployment has or is about to be started. */ + PENDING, + /** The deployment has been acknowledged by the TaskExecutor. */ + DEPLOYED +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java index 5cadf010a2f4d..181ca4a4c8946 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTracker.java @@ -20,7 +20,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import java.util.Set; +import java.util.Map; /** * A tracker for deployed executions. @@ -28,12 +28,19 @@ public interface ExecutionDeploymentTracker { /** - * Starts tracking the given execution that was deployed on the given host. + * Starts tracking the given execution that is being deployed on the given host. * * @param executionAttemptId execution to start tracking * @param host hosting task executor */ - void startTrackingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host); + void startTrackingPendingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host); + + /** + * Marks the deployment of the given execution as complete. + * + * @param executionAttemptId execution whose deployment to mark as complete + */ + void completeDeploymentOf(ExecutionAttemptID executionAttemptId); /** * Stops tracking the given execution. @@ -48,6 +55,5 @@ public interface ExecutionDeploymentTracker { * @param host hosting task executor * @return tracked executions */ - Set getExecutionsOn(ResourceID host); - + Map getExecutionsOn(ResourceID host); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerDeploymentListenerAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerDeploymentListenerAdapter.java new file mode 100644 index 0000000000000..da0aa9fbc9521 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ExecutionDeploymentTrackerDeploymentListenerAdapter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener; + +/** + * An adapter for using an {@link ExecutionDeploymentTracker} as an {@link ExecutionDeploymentListener}. + */ +public class ExecutionDeploymentTrackerDeploymentListenerAdapter implements ExecutionDeploymentListener { + private final ExecutionDeploymentTracker executionDeploymentTracker; + + public ExecutionDeploymentTrackerDeploymentListenerAdapter(ExecutionDeploymentTracker executionDeploymentTracker) { + this.executionDeploymentTracker = executionDeploymentTracker; + } + + @Override + public void onStartedDeployment(ExecutionAttemptID execution, ResourceID host) { + executionDeploymentTracker.startTrackingPendingDeploymentOf(execution, host); + } + + @Override + public void onCompletedDeployment(ExecutionAttemptID execution) { + executionDeploymentTracker.completeDeploymentOf(execution); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 7e54f3a5dec3c..d29a0d03d2ae6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -254,7 +254,7 @@ public void onUnknownDeploymentsOf(Collection executionAttem for (ExecutionAttemptID executionAttemptId : executionAttemptIds) { Tuple2 taskManagerInfo = registeredTaskManagers.get(host); if (taskManagerInfo != null) { - //taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout); + taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout); } } } @@ -1234,7 +1234,10 @@ public void notifyHeartbeatTimeout(ResourceID resourceID) { @Override public void reportPayload(ResourceID resourceID, TaskExecutorToJobManagerHeartbeatPayload payload) { validateRunsInMainThread(); - executionDeploymentReconciler.reconcileExecutionDeployments(resourceID, payload.getExecutionDeploymentReport(), executionDeploymentTracker.getExecutionsOn(resourceID)); + executionDeploymentReconciler.reconcileExecutionDeployments( + resourceID, + payload.getExecutionDeploymentReport(), + executionDeploymentTracker.getExecutionsOn(resourceID)); for (AccumulatorSnapshot snapshot : payload.getAccumulatorReport().getAccumulatorSnapshots()) { schedulerNG.updateAccumulators(snapshot); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 8537e57c701a2..92734e5940122 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -72,6 +72,7 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; +import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter; import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; @@ -272,7 +273,7 @@ private ExecutionGraph createExecutionGraph( final JobMasterPartitionTracker partitionTracker, ExecutionDeploymentTracker executionDeploymentTracker) throws JobExecutionException, JobException { - ExecutionDeploymentListener executionDeploymentListener = executionDeploymentTracker::startTrackingDeploymentOf; + ExecutionDeploymentListener executionDeploymentListener = new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker); ExecutionStateUpdateListener executionStateUpdateListener = (execution, newState) -> { if (newState.isTerminal()) { executionDeploymentTracker.stopTrackingDeploymentOf(execution); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java index 4112ce544395c..4740d1efe395b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingExecutionGraphBuilder.java @@ -75,7 +75,7 @@ public static TestingExecutionGraphBuilder newBuilder() { private JobGraph jobGraph = new JobGraph(); private MetricGroup metricGroup = new UnregisteredMetricsGroup(); private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory(); - private ExecutionDeploymentListener executionDeploymentListener = (execution, host) -> {}; + private ExecutionDeploymentListener executionDeploymentListener = NoOpExecutionDeploymentListener.get(); private ExecutionStateUpdateListener executionStateUpdateListener = (execution, newState) -> {}; private TestingExecutionGraphBuilder() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentReconcilerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentReconcilerTest.java index 8a1cac9807a9b..3dc13cacc2c02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentReconcilerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentReconcilerTest.java @@ -29,6 +29,8 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.runtime.clusterframework.types.ResourceID.generate; import static org.hamcrest.Matchers.empty; @@ -52,7 +54,7 @@ public void testMatchingDeployments() { reconciler.reconcileExecutionDeployments( resourceId, new ExecutionDeploymentReport(Collections.singleton(attemptId)), - Collections.singleton(attemptId)); + Collections.singletonMap(attemptId, ExecutionDeploymentState.DEPLOYED)); assertThat(handler.getMissingExecutions(), empty()); assertThat(handler.getUnknownExecutions(), empty()); @@ -70,7 +72,7 @@ public void testMissingDeployments() { reconciler.reconcileExecutionDeployments( resourceId, new ExecutionDeploymentReport(Collections.emptySet()), - Collections.singleton(attemptId)); + Collections.singletonMap(attemptId, ExecutionDeploymentState.DEPLOYED)); assertThat(handler.getUnknownExecutions(), empty()); assertThat(handler.getMissingExecutions(), hasItem(attemptId)); @@ -88,7 +90,7 @@ public void testUnknownDeployments() { reconciler.reconcileExecutionDeployments( resourceId, new ExecutionDeploymentReport(Collections.singleton(attemptId)), - Collections.emptySet()); + Collections.emptyMap()); assertThat(handler.getMissingExecutions(), empty()); assertThat(handler.getUnknownExecutions(), hasItem(attemptId)); @@ -108,12 +110,32 @@ public void testMissingAndUnknownDeployments() { reconciler.reconcileExecutionDeployments( resourceId, new ExecutionDeploymentReport(new HashSet<>(Arrays.asList(unknownId, matchingId))), - new HashSet<>(Arrays.asList(missingId, matchingId))); + Stream.of(missingId, matchingId).collect(Collectors.toMap(x -> x, x -> ExecutionDeploymentState.DEPLOYED))); assertThat(handler.getMissingExecutions(), hasItem(missingId)); assertThat(handler.getUnknownExecutions(), hasItem(unknownId)); } + @Test + public void testPendingDeployments() { + TestingExecutionDeploymentReconciliationHandler handler = new TestingExecutionDeploymentReconciliationHandler(); + + DefaultExecutionDeploymentReconciler reconciler = new DefaultExecutionDeploymentReconciler(handler); + + ResourceID resourceId = generate(); + ExecutionAttemptID matchingId = new ExecutionAttemptID(); + ExecutionAttemptID unknownId = new ExecutionAttemptID(); + ExecutionAttemptID missingId = new ExecutionAttemptID(); + + reconciler.reconcileExecutionDeployments( + resourceId, + new ExecutionDeploymentReport(new HashSet<>(Arrays.asList(matchingId, unknownId))), + Stream.of(matchingId, missingId).collect(Collectors.toMap(x -> x, x -> ExecutionDeploymentState.PENDING))); + + assertThat(handler.getMissingExecutions(), empty()); + assertThat(handler.getUnknownExecutions(), hasItem(unknownId)); + } + private static class TestingExecutionDeploymentReconciliationHandler implements ExecutionDeploymentReconciliationHandler { private final Collection missingExecutions = new ArrayList<>(); private final Collection unknownExecutions = new ArrayList<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentTrackerTest.java index 17243777f370c..f6065fc27b86f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultExecutionDeploymentTrackerTest.java @@ -25,8 +25,9 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.core.IsCollectionContaining.hasItems; import static org.junit.Assert.assertThat; /** @@ -40,22 +41,41 @@ public void testStartTracking() { final ExecutionAttemptID attemptId1 = new ExecutionAttemptID(); final ResourceID resourceId1 = ResourceID.generate(); - tracker.startTrackingDeploymentOf(attemptId1, resourceId1); + tracker.startTrackingPendingDeploymentOf(attemptId1, resourceId1); - assertThat(tracker.getExecutionsOn(resourceId1), hasItems(attemptId1)); + assertThat(tracker.getExecutionsOn(resourceId1), hasEntry(attemptId1, ExecutionDeploymentState.PENDING)); + + tracker.completeDeploymentOf(attemptId1); + + assertThat(tracker.getExecutionsOn(resourceId1), hasEntry(attemptId1, ExecutionDeploymentState.DEPLOYED)); + } + + @Test + public void testStopTrackingCompletedDeployment() { + final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker(); + + final ExecutionAttemptID attemptId1 = new ExecutionAttemptID(); + final ResourceID resourceId1 = ResourceID.generate(); + tracker.startTrackingPendingDeploymentOf(attemptId1, resourceId1); + + tracker.completeDeploymentOf(attemptId1); + + tracker.stopTrackingDeploymentOf(attemptId1); + + assertThat(tracker.getExecutionsOn(resourceId1).entrySet(), empty()); } @Test - public void testStopTracking() { + public void testStopTrackingPendingDeployment() { final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker(); final ExecutionAttemptID attemptId1 = new ExecutionAttemptID(); final ResourceID resourceId1 = ResourceID.generate(); - tracker.startTrackingDeploymentOf(attemptId1, resourceId1); + tracker.startTrackingPendingDeploymentOf(attemptId1, resourceId1); tracker.stopTrackingDeploymentOf(attemptId1); - assertThat(tracker.getExecutionsOn(resourceId1), empty()); + assertThat(tracker.getExecutionsOn(resourceId1).entrySet(), empty()); } @Test @@ -64,11 +84,19 @@ public void testStopTrackingDoesNotAffectOtherIds() { final ExecutionAttemptID attemptId1 = new ExecutionAttemptID(); final ResourceID resourceId1 = ResourceID.generate(); - tracker.startTrackingDeploymentOf(attemptId1, resourceId1); + tracker.startTrackingPendingDeploymentOf(attemptId1, resourceId1); + tracker.completeDeploymentOf(attemptId1); tracker.stopTrackingDeploymentOf(new ExecutionAttemptID()); - assertThat(tracker.getExecutionsOn(resourceId1), hasItems(attemptId1)); + assertThat(tracker.getExecutionsOn(resourceId1), hasKey(attemptId1)); + } + + @Test + public void testCompleteDeploymentUnknownExecutionDoesNotThrowException() { + final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker(); + + tracker.completeDeploymentOf(new ExecutionAttemptID()); } @Test @@ -83,6 +111,6 @@ public void testStopTrackingUnknownExecutionDoesNotThrowException() { public void testGetExecutionsReturnsEmptySetForUnknownHost() { final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker(); - assertThat(tracker.getExecutionsOn(ResourceID.generate()), allOf(notNullValue(), empty())); + assertThat(tracker.getExecutionsOn(ResourceID.generate()).entrySet(), allOf(notNullValue(), empty())); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java index b9fddacea487f..0260df3115b5e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterExecutionDeploymentReconciliationTest.java @@ -21,7 +21,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; @@ -48,7 +47,6 @@ import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -88,8 +86,10 @@ public void setup() { haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); } + /** + * Tests how the job master handles unknown/missing executions. + */ @Test - @Ignore public void testExecutionDeploymentReconciliation() throws Exception { JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions(); @@ -122,11 +122,50 @@ public void testExecutionDeploymentReconciliation() throws Exception { assertThat(onCompletionActions.getJobReachedGloballyTerminalStateFuture().get().getState(), is(JobStatus.FAILED)); } + /** + * Tests that the job master does not issue a cancel call if the heartbeat reports an execution for which the + * deployment was not yet acknowledged. + */ + @Test + public void testExecutionDeploymentReconciliationForPendingExecution() throws Exception { + final CompletableFuture taskSubmissionFuture = new CompletableFuture<>(); + final CompletableFuture taskDeploymentAcknowledgedFuture = new CompletableFuture<>(); + JobMaster jobMaster = createAndStartJobMaster(taskDeploymentAcknowledgedFuture); + JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); + + final AllocationIdsExposingResourceManagerGateway resourceManagerGateway = createResourceManagerGateway(); + + final CompletableFuture taskCancellationFuture = new CompletableFuture<>(); + final CompletableFuture taskSubmissionAcknowledgeFuture = new CompletableFuture<>(); + TaskExecutorGateway taskExecutorGateway = createTaskExecutorGateway(taskCancellationFuture, taskSubmissionFuture, taskSubmissionAcknowledgeFuture); + LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation(); + + registerTaskExecutorAndOfferSlots(resourceManagerGateway, jobMasterGateway, taskExecutorGateway, localUnresolvedTaskManagerLocation); + + ExecutionAttemptID pendingExecutionId = taskSubmissionFuture.get(); + + // the execution has not been acknowledged yet by the TaskExecutor, but we already allow the ID to be in the heartbeat payload + jobMasterGateway.heartbeatFromTaskManager(localUnresolvedTaskManagerLocation.getResourceID(), new TaskExecutorToJobManagerHeartbeatPayload( + new AccumulatorReport(Collections.emptyList()), + new ExecutionDeploymentReport(Collections.singleton(pendingExecutionId)) + )); + + taskSubmissionAcknowledgeFuture.complete(Acknowledge.get()); + + taskDeploymentAcknowledgedFuture.get(); + assertFalse(taskCancellationFuture.isDone()); + } + + private JobMaster createAndStartJobMaster(CompletableFuture taskDeploymentFuture) throws Exception { + return createAndStartJobMaster(new JobMasterBuilder.TestingOnCompletionActions(), taskDeploymentFuture); + } + private JobMaster createAndStartJobMaster(OnCompletionActions onCompletionActions, CompletableFuture taskDeploymentFuture) throws Exception { ExecutionDeploymentTracker executionDeploymentTracker = new DefaultExecutionDeploymentTracker() { @Override - public void startTrackingDeploymentOf(ExecutionAttemptID executionAttemptId, ResourceID host) { - super.startTrackingDeploymentOf(executionAttemptId, host); + public void completeDeploymentOf(ExecutionAttemptID executionAttemptId) { + super.completeDeploymentOf(executionAttemptId); taskDeploymentFuture.complete(null); } }; @@ -152,12 +191,20 @@ private AllocationIdsExposingResourceManagerGateway createResourceManagerGateway } private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture taskCancellationFuture) { + return createTaskExecutorGateway(taskCancellationFuture, new CompletableFuture<>(), CompletableFuture.completedFuture(Acknowledge.get())); + } + + private TaskExecutorGateway createTaskExecutorGateway(CompletableFuture taskCancellationFuture, CompletableFuture taskSubmissionFuture, CompletableFuture taskSubmissionResponse) { TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() .setAddress(UUID.randomUUID().toString()) .setCancelTaskFunction(executionAttemptId -> { taskCancellationFuture.complete(executionAttemptId); return CompletableFuture.completedFuture(Acknowledge.get()); }) + .setSubmitTaskConsumer((tdd, ignored) -> { + taskSubmissionFuture.complete(tdd.getExecutionAttemptId()); + return taskSubmissionResponse; + }) .createTestingTaskExecutorGateway(); RPC_SERVICE_RESOURCE.getTestingRpcService().registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);