From 07a9a56c1726b4a7aeb3e682b887d77ab1b0e440 Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Tue, 10 Mar 2015 19:19:27 +0100 Subject: [PATCH] =?UTF-8?q?[FLINK-1683]=20[jobmanager]=C2=A0Fix=20scheduli?= =?UTF-8?q?ng=20preference=20choice=20for=20non-unary=20execution=20tasks.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This closes #476 --- .../executiongraph/ExecutionVertex.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 41b78f897d3ae..794ca21733591 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -47,6 +47,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import static com.google.common.base.Preconditions.checkElementIndex; @@ -348,22 +349,39 @@ public Iterable getPreferredLocations() { return Collections.emptySet(); } else { - HashSet locations = new HashSet(); - + + Set locations = new HashSet(); + Set inputLocations = new HashSet(); + + // go over all inputs for (int i = 0; i < inputEdges.length; i++) { + inputLocations.clear(); ExecutionEdge[] sources = inputEdges[i]; if (sources != null) { + // go over all input sources for (int k = 0; k < sources.length; k++) { + // look-up assigned slot of input source SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); if (sourceSlot != null) { - locations.add(sourceSlot.getInstance()); - if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { - return null; + // add input location + inputLocations.add(sourceSlot.getInstance()); + // inputs which have too many distinct sources are not considered + if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { + inputLocations.clear(); + break; } } } } + // keep the locations of the input with the least preferred locations + if(locations.isEmpty() || // nothing assigned yet + (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) { + // current input has fewer preferred locations + locations.clear(); + locations.addAll(inputLocations); + } } + return locations; } }