From e0a4ee07084bc6ab56a20fbc4a18863462da93eb Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Sun, 7 Dec 2014 22:28:22 +0100 Subject: [PATCH] [FLINK-1287] LocalizableSplitAssigner prefers splits with less degrees of freedom This closes #258 --- .../io/LocatableInputSplitAssigner.java | 343 +++++++++++++----- .../core/io/LocatableSplitAssignerTest.java | 178 ++++++++- 2 files changed, 430 insertions(+), 91 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java index 6fbde4924763e..92fbdcaded349 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java @@ -18,12 +18,9 @@ package org.apache.flink.api.common.io; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; +import java.util.LinkedList; import java.util.Locale; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -36,89 +33,111 @@ /** * The locatable input split assigner assigns to each host splits that are local, before assigning - * splits that are not local. + * splits that are not local. 
*/ public final class LocatableInputSplitAssigner implements InputSplitAssigner { private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class); + // unassigned input splits + private final Set unassigned = new HashSet(); + + // input splits indexed by host for local assignment + private final ConcurrentHashMap localPerHost = new ConcurrentHashMap(); + + // unassigned splits for remote assignment + private final LocatableInputSplitChooser remoteSplitChooser; - private final Set unassigned = new HashSet(); - - private final ConcurrentHashMap> localPerHost = new ConcurrentHashMap>(); - private int localAssignments; // lock protected by the unassigned set lock - + private int remoteAssignments; // lock protected by the unassigned set lock // -------------------------------------------------------------------------------------------- - + public LocatableInputSplitAssigner(Collection splits) { - this.unassigned.addAll(splits); + for(LocatableInputSplit split : splits) { + this.unassigned.add(new LocatableInputSplitWithCount(split)); + } + this.remoteSplitChooser = new LocatableInputSplitChooser(unassigned); } - + public LocatableInputSplitAssigner(LocatableInputSplit[] splits) { - Collections.addAll(this.unassigned, splits); + for(LocatableInputSplit split : splits) { + this.unassigned.add(new LocatableInputSplitWithCount(split)); + } + this.remoteSplitChooser = new LocatableInputSplitChooser(unassigned); } - + // -------------------------------------------------------------------------------------------- @Override public LocatableInputSplit getNextInputSplit(String host) { - // for a null host, we return an arbitrary split + + // for a null host, we return a remote split if (host == null) { - - synchronized (this.unassigned) { - Iterator iter = this.unassigned.iterator(); - if (iter.hasNext()) { - LocatableInputSplit next = iter.next(); - iter.remove(); - - if (LOG.isInfoEnabled()) { - LOG.info("Assigning split to null host (random 
assignment)."); - } - - remoteAssignments++; - return next; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No more unassigned input splits remaining."); + synchronized (this.remoteSplitChooser) { + synchronized (this.unassigned) { + + LocatableInputSplitWithCount split = this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned); + + if (split != null) { + // got a split to assign. Double check that it hasn't been assigned before. + if (this.unassigned.remove(split)) { + if (LOG.isInfoEnabled()) { + LOG.info("Assigning split to null host (random assignment)."); + } + + remoteAssignments++; + return split.getSplit(); + } else { + throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!"); + } + } else { + // all splits consumed + if (LOG.isDebugEnabled()) { + LOG.debug("No more unassigned input splits remaining."); + } + return null; } - return null; } } } - + host = host.toLowerCase(Locale.US); - + // for any non-null host, we take the list of non-null splits - List localSplits = this.localPerHost.get(host); - + LocatableInputSplitChooser localSplits = this.localPerHost.get(host); + // if we have no list for this host yet, create one if (localSplits == null) { - localSplits = new ArrayList(16); - + localSplits = new LocatableInputSplitChooser(); + // lock the list, to be sure that others have to wait for that host's local list synchronized (localSplits) { - List prior = this.localPerHost.putIfAbsent(host, localSplits); - + LocatableInputSplitChooser prior = this.localPerHost.putIfAbsent(host, localSplits); + // if someone else beat us in the case to create this list, then we do not populate this one, but // simply work with that other list if (prior == null) { // we are the first, we populate - + // first, copy the remaining splits to release the lock on the set early // because that is shared among threads - LocatableInputSplit[] remaining; + LocatableInputSplitWithCount[] remaining; 
synchronized (this.unassigned) { - remaining = (LocatableInputSplit[]) this.unassigned.toArray(new LocatableInputSplit[this.unassigned.size()]); + remaining = this.unassigned.toArray(new LocatableInputSplitWithCount[this.unassigned.size()]); } - - for (LocatableInputSplit is : remaining) { - if (isLocal(host, is.getHostnames())) { - localSplits.add(is); + + for (LocatableInputSplitWithCount isw : remaining) { + if (isLocal(host, isw.getSplit().getHostnames())) { + // Split is local on host. + // Increment local count + isw.incrementLocalCount(); + // and add to local split list + localSplits.addInputSplit(isw); } } + } else { // someone else was faster @@ -126,55 +145,61 @@ public LocatableInputSplit getNextInputSplit(String host) { } } } - - + + // at this point, we have a list of local splits (possibly empty) // we need to make sure no one else operates in the current list (that protects against // list creation races) and that the unassigned set is consistent // NOTE: we need to obtain the locks in this order, strictly!!! synchronized (localSplits) { - int size = localSplits.size(); - if (size > 0) { - synchronized (this.unassigned) { - do { - --size; - LocatableInputSplit split = localSplits.remove(size); - if (this.unassigned.remove(split)) { - - if (LOG.isInfoEnabled()) { - LOG.info("Assigning local split to host " + host); - } - - localAssignments++; - return split; + synchronized (this.unassigned) { + + LocatableInputSplitWithCount split = localSplits.getNextUnassignedMinLocalCountSplit(this.unassigned); + + if (split != null) { + // found a valid split. Double check that it hasn't been assigned before. + if (this.unassigned.remove(split)) { + if (LOG.isInfoEnabled()) { + LOG.info("Assigning local split to host " + host); } - } while (size > 0); + + localAssignments++; + return split.getSplit(); + } else { + throw new IllegalStateException("Chosen InputSplit has already been assigned. 
This should not happen!"); + } } } } - - // we did not find a local split, return any - synchronized (this.unassigned) { - Iterator iter = this.unassigned.iterator(); - if (iter.hasNext()) { - LocatableInputSplit next = iter.next(); - iter.remove(); - - if (LOG.isInfoEnabled()) { - LOG.info("Assigning remote split to host " + host); - } - - remoteAssignments++; - return next; - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("No more input splits remaining."); + + // we did not find a local split, return a remote split + synchronized (this.remoteSplitChooser) { + synchronized (this.unassigned) { + LocatableInputSplitWithCount split = this.remoteSplitChooser.getNextUnassignedMinLocalCountSplit(this.unassigned); + + if (split != null) { + // found a valid split. Double check that it hasn't been assigned yet. + if (this.unassigned.remove(split)) { + if (LOG.isInfoEnabled()) { + LOG.info("Assigning remote split to host " + host); + } + + remoteAssignments++; + return split.getSplit(); + } else { + throw new IllegalStateException("Chosen InputSplit has already been assigned. This should not happen!"); + } + } else { + // all splits consumed + if (LOG.isDebugEnabled()) { + LOG.debug("No more input splits remaining."); + } + return null; } - return null; } } } - + private static final boolean isLocal(String flinkHost, String[] hosts) { if (flinkHost == null || hosts == null) { return false; @@ -184,15 +209,159 @@ private static final boolean isLocal(String flinkHost, String[] hosts) { return true; } } - + return false; } - + public int getNumberOfLocalAssignments() { return localAssignments; } - + public int getNumberOfRemoteAssignments() { return remoteAssignments; } + + /** + * Wraps a LocatableInputSplit and adds a count for the number of observed hosts + * that can access the split locally. 
+ */ + private static class LocatableInputSplitWithCount { + + private final LocatableInputSplit split; + private int localCount; + + public LocatableInputSplitWithCount(LocatableInputSplit split) { + this.split = split; + this.localCount = 0; + } + + public void incrementLocalCount() { + this.localCount++; + } + + public int getLocalCount() { + return this.localCount; + } + + public LocatableInputSplit getSplit() { + return this.split; + } + + } + + /** + * Holds a list of LocatableInputSplits and returns the split with the lowest local count. + * The rationale is that splits which are local on few hosts should be preferred over others which + * have more degrees of freedom for local assignment. + * + * Internally, the splits are stored in a linked list. Sorting the list is not a good solution, + * as local counts are updated whenever a previously unseen host requests a split. + * Instead, we track the minimum local count and iteratively look for splits with that minimum count. + */ + private static class LocatableInputSplitChooser { + + // list of input splits + private final LinkedList splits; + + // the current minimum local count. We look for splits with this local count. + private int minLocalCount = -1; + // the second smallest count observed so far. + private int nextMinLocalCount = -1; + // number of elements we need to inspect for the minimum local count. 
+ private int elementCycleCount = 0; + + public LocatableInputSplitChooser() { + this.splits = new LinkedList(); + } + + public LocatableInputSplitChooser(Collection splits) { + this.splits = new LinkedList(); + for(LocatableInputSplitWithCount isw : splits) { + this.addInputSplit(isw); + } + } + + /** + * Adds a single input split + * + * @param split The input split to add + */ + public void addInputSplit(LocatableInputSplitWithCount split) { + int localCount = split.getLocalCount(); + + if (minLocalCount == -1) { + // first split to add + this.minLocalCount = localCount; + this.elementCycleCount = 1; + this.splits.offerFirst(split); + } else if (localCount < minLocalCount) { + // split with new min local count + this.nextMinLocalCount = this.minLocalCount; + this.minLocalCount = localCount; + // all other splits have more local hosts than this one + this.elementCycleCount = 1; + splits.offerFirst(split); + } else if (localCount == minLocalCount ) { + this.elementCycleCount++; + this.splits.offerFirst(split); + } else { + if (localCount < nextMinLocalCount) { + nextMinLocalCount = localCount; + } + splits.offerLast(split); + } + } + + /** + * Retrieves a LocatableInputSplit with minimum local count. + * InputSplits which have already been assigned (i.e., which are not contained in the provided set) are filtered out. + * The returned input split is NOT removed from the provided set. + * + * @param unassignedSplits Set of unassigned input splits. + * @return An input split with minimum local count or null if all splits have been assigned. 
+ */ + public LocatableInputSplitWithCount getNextUnassignedMinLocalCountSplit(Set unassignedSplits) { + + if(splits.size() == 0) { + return null; + } + + do { + elementCycleCount--; + // take first split of the list + LocatableInputSplitWithCount split = splits.pollFirst(); + if (unassignedSplits.contains(split)) { + int localCount = split.getLocalCount(); + // still unassigned, check local count + if (localCount > minLocalCount) { + // re-insert at end of the list and continue to look for split with smaller local count + splits.offerLast(split); + // check and update second smallest local count + if (nextMinLocalCount == -1 || split.getLocalCount() < nextMinLocalCount) { + nextMinLocalCount = split.getLocalCount(); + } + split = null; + } + } else { + // split was already assigned + split = null; + } + if(elementCycleCount == 0) { + // one full cycle, but no split with min local count found + // update minLocalCnt and element cycle count for next pass over the splits + minLocalCount = nextMinLocalCount; + nextMinLocalCount = -1; + elementCycleCount = splits.size(); + } + if (split != null) { + // found a split to assign + return split; + } + } while (elementCycleCount > 0); + + // no split left + return null; + } + + } } diff --git a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java index a962129648ed2..7edad43cc3f45 100644 --- a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java @@ -20,7 +20,10 @@ import static org.junit.Assert.*; +import java.util.Arrays; +import java.util.Calendar; import java.util.HashSet; +import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -131,18 +134,141 @@ public void testSerialSplitAssignmentAllForRemoteHost() { } } + @Test + public void 
testSerialSplitAssignmentSomeForRemoteHost() { + try { + + // host1 reads all local + // host2 reads 10 local and 10 remote + // host3 reads all remote + final String[] hosts = { "host1", "host2", "host3" }; + final int NUM_LOCAL_HOST1_SPLITS = 20; + final int NUM_LOCAL_HOST2_SPLITS = 10; + final int NUM_REMOTE_SPLITS = 30; + final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + NUM_LOCAL_HOST2_SPLITS; + + // load local splits + int splitCnt = 0; + Set splits = new HashSet(); + // host1 splits + for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) { + splits.add(new LocatableInputSplit(splitCnt++, "host1")); + } + // host2 splits + for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) { + splits.add(new LocatableInputSplit(splitCnt++, "host2")); + } + // load remote splits + for (int i = 0; i < NUM_REMOTE_SPLITS; i++) { + splits.add(new LocatableInputSplit(splitCnt++, "remoteHost")); + } + + // get all available splits + LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + InputSplit is = null; + int i = 0; + while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) { + assertTrue(splits.remove(is)); + } + + // check we had all + assertTrue(splits.isEmpty()); + assertNull(ia.getNextInputSplit("anotherHost")); + + assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments()); + assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments()); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSerialSplitAssignmentMultiLocalHost() { + try { + + final String[] localHosts = { "local1", "local2", "local3" }; + final String[] remoteHosts = { "remote1", "remote2", "remote3" }; + final String[] requestingHosts = { "local3", "local2", "local1", "other" }; + + final int NUM_THREE_LOCAL_SPLITS = 10; + final int NUM_TWO_LOCAL_SPLITS = 10; + final int NUM_ONE_LOCAL_SPLITS = 10; + final int NUM_LOCAL_SPLITS = 30; + final int NUM_REMOTE_SPLITS = 10; + final int NUM_SPLITS = 40; + + 
String[] threeLocalHosts = localHosts; + String[] twoLocalHosts = {localHosts[0], localHosts[1], remoteHosts[0]}; + String[] oneLocalHost = {localHosts[0], remoteHosts[0], remoteHosts[1]}; + String[] noLocalHost = remoteHosts; + + int splitCnt = 0; + Set splits = new HashSet(); + // add splits with three local hosts + for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) { + splits.add(new LocatableInputSplit(splitCnt++, threeLocalHosts)); + } + // add splits with two local hosts + for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) { + splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts)); + } + // add splits with one local host + for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) { + splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost)); + } + // add splits with no local host + for (int i = 0; i < NUM_REMOTE_SPLITS; i++) { + splits.add(new LocatableInputSplit(splitCnt++, noLocalHost)); + } + + // get all available splits + LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + LocatableInputSplit is = null; + for (int i = 0; i < NUM_SPLITS; i++) { + String host = requestingHosts[i % requestingHosts.length]; + is = ia.getNextInputSplit(host); + // check valid split + assertTrue(is != null); + // check unassigned split + assertTrue(splits.remove(is)); + // check priority of split + if (host.equals(localHosts[0])) { + assertTrue(Arrays.equals(is.getHostnames(), oneLocalHost)); + } else if (host.equals(localHosts[1])) { + assertTrue(Arrays.equals(is.getHostnames(), twoLocalHosts)); + } else if (host.equals(localHosts[2])) { + assertTrue(Arrays.equals(is.getHostnames(), threeLocalHosts)); + } else { + assertTrue(Arrays.equals(is.getHostnames(), noLocalHost)); + } + } + // check we had all + assertTrue(splits.isEmpty()); + assertNull(ia.getNextInputSplit("anotherHost")); + + assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments()); + assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments()); + } + catch (Exception e) { + 
e.printStackTrace(); + fail(e.getMessage()); + } + } + @Test public void testSerialSplitAssignmentMixedLocalHost() { try { final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" }; final int NUM_SPLITS = 10 * hosts.length; - + // load some splits Set splits = new HashSet(); for (int i = 0; i < NUM_SPLITS; i++) { splits.add(new LocatableInputSplit(i, hosts[i % hosts.length])); } - + // get all available splits LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); InputSplit is = null; @@ -150,11 +276,11 @@ public void testSerialSplitAssignmentMixedLocalHost() { while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) { assertTrue(splits.remove(is)); } - + // check we had all assertTrue(splits.isEmpty()); assertNull(ia.getNextInputSplit("anotherHost")); - + assertEquals(0, ia.getNumberOfRemoteAssignments()); assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments()); } @@ -380,4 +506,48 @@ public void run() { fail(e.getMessage()); } } + + @Test + public void testAssignmentOfManySplitsRandomly() { + + long seed = Calendar.getInstance().getTimeInMillis(); + + final int NUM_SPLITS = 65536; + final String[] splitHosts = new String[256]; + final String[] requestingHosts = new String[256]; + final Random rand = new Random(seed); + + for (int i = 0; i < splitHosts.length; i++) { + splitHosts[i] = "localHost" + i; + } + for (int i = 0; i < requestingHosts.length; i++) { + if (i % 2 == 0) { + requestingHosts[i] = "localHost" + i; + } else { + requestingHosts[i] = "remoteHost" + i; + } + } + + String[] stringArray = {}; + Set hosts = new HashSet(); + Set splits = new HashSet(); + for (int i = 0; i < NUM_SPLITS; i++) { + while (hosts.size() < 3) { + hosts.add(splitHosts[rand.nextInt(splitHosts.length)]); + } + splits.add(new LocatableInputSplit(i, hosts.toArray(stringArray))); + hosts.clear(); + } + + final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); + + for (int i = 0; i < NUM_SPLITS; i++) { 
+ LocatableInputSplit split = ia.getNextInputSplit(requestingHosts[rand.nextInt(requestingHosts.length)]); + assertTrue(split != null); + assertTrue(splits.remove(split)); + } + + assertTrue(splits.isEmpty()); + assertNull(ia.getNextInputSplit("testHost")); + } }