Skip to content

Commit

Permalink
[FLINK-1478] [jobmanager] Scheduler support for external location con…
Browse files Browse the repository at this point in the history
…straints
  • Loading branch information
StephanEwen committed Feb 5, 2015
1 parent a9ac7aa commit 970b2b7
Show file tree
Hide file tree
Showing 7 changed files with 586 additions and 59 deletions.
Expand Up @@ -259,6 +259,15 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
//--------------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------------


public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException { public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {

// ExecutionVertex[] vertices = this.taskVertices;
//
// for (int i = 0; i < vertices.length; i++) {
// ExecutionVertex v = vertices[i];
//
// if (v.get
// }

for (ExecutionVertex ev : getTaskVertices()) { for (ExecutionVertex ev : getTaskVertices()) {
ev.scheduleForExecution(scheduler, queued); ev.scheduleForExecution(scheduler, queued);
} }
Expand Down
Expand Up @@ -43,6 +43,7 @@


import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
Expand Down Expand Up @@ -82,6 +83,11 @@ public class ExecutionVertex implements Serializable {


private volatile Execution currentExecution; // this field must never be null private volatile Execution currentExecution; // this field must never be null



private volatile List<Instance> locationConstraintInstances;

private volatile boolean scheduleLocalOnly;

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------


public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex, public ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex,
Expand Down Expand Up @@ -294,10 +300,22 @@ else if (numSources < parallelism) {
} }
} }


public void setTargetHostConstraint(String hostname) { public void setLocationConstraintHosts(List<Instance> instances) {
this.locationConstraintInstances = instances;
}

public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) {
throw new IllegalArgumentException("Strictly local scheduling is only supported for sources.");
}


this.scheduleLocalOnly = scheduleLocalOnly;
} }


public boolean isScheduleLocalOnly() {
return scheduleLocalOnly;
}

/** /**
* Gets the location preferences of this task, determined by the locations of the predecessors from which * Gets the location preferences of this task, determined by the locations of the predecessors from which
* it receives input data. * it receives input data.
Expand All @@ -307,23 +325,37 @@ public void setTargetHostConstraint(String hostname) {
* @return The preferred locations for this vertex execution, or null, if there is no preference. * @return The preferred locations for this vertex execution, or null, if there is no preference.
*/ */
public Iterable<Instance> getPreferredLocations() { public Iterable<Instance> getPreferredLocations() {
HashSet<Instance> locations = new HashSet<Instance>(); // if we have hard location constraints, use those
{
List<Instance> constraintInstances = this.locationConstraintInstances;
if (constraintInstances != null && !constraintInstances.isEmpty()) {
return constraintInstances;
}
}

// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
}
else {
HashSet<Instance> locations = new HashSet<Instance>();


for (int i = 0; i < inputEdges.length; i++) { for (int i = 0; i < inputEdges.length; i++) {
ExecutionEdge[] sources = inputEdges[i]; ExecutionEdge[] sources = inputEdges[i];
if (sources != null) { if (sources != null) {
for (int k = 0; k < sources.length; k++) { for (int k = 0; k < sources.length; k++) {
SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
if (sourceSlot != null) { if (sourceSlot != null) {
locations.add(sourceSlot.getInstance()); locations.add(sourceSlot.getInstance());
if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
return null; return null;
}
} }
} }
} }
} }
return locations;
} }
return locations;
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -407,6 +439,7 @@ public void prepareForArchiving() {
// clear the unnecessary fields in this class // clear the unnecessary fields in this class
this.resultPartitions = null; this.resultPartitions = null;
this.inputEdges = null; this.inputEdges = null;
this.locationConstraintInstances = null;
} }


// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;


import akka.dispatch.Futures; import akka.dispatch.Futures;

import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {


private int nonLocalizedAssignments; private int nonLocalizedAssignments;



public Scheduler() { public Scheduler() {
this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>(); this.newlyAvailableInstances = new LinkedBlockingQueue<Instance>();
} }
Expand Down Expand Up @@ -164,6 +166,10 @@ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throw
} }


final ExecutionVertex vertex = task.getTaskToExecute().getVertex(); final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

final Iterable<Instance> preferredLocations = vertex.getPreferredLocations();
final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
preferredLocations != null && preferredLocations.iterator().hasNext();


synchronized (globalLock) { synchronized (globalLock) {


Expand All @@ -179,6 +185,12 @@ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throw
final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment(); final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
final CoLocationConstraint constraint = task.getLocationConstraint(); final CoLocationConstraint constraint = task.getLocationConstraint();


// sanity check that we do not use an externally forced location and a co-location constraint together
if (constraint != null && forceExternalLocation) {
throw new IllegalArgumentException("The scheduling cannot be contrained simultaneously by a "
+ "co-location constriaint and an external location constraint.");
}

// get a slot from the group, if the group has one for us (and can fulfill the constraint) // get a slot from the group, if the group has one for us (and can fulfill the constraint)
SimpleSlot slotFromGroup; SimpleSlot slotFromGroup;
if (constraint == null) { if (constraint == null) {
Expand Down Expand Up @@ -206,16 +218,28 @@ private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throw
vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation()); vertex.getPreferredLocations() : Collections.singleton(constraint.getLocation());


// get a new slot, since we could not place it into the group, or we could not place it locally // get a new slot, since we could not place it into the group, or we could not place it locally
newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint); newSlot = getFreeSubSlotForTask(vertex, locations, assignment, constraint, forceExternalLocation);


SimpleSlot toUse; SimpleSlot toUse;


if (newSlot == null) { if (newSlot == null) {
if (slotFromGroup == null) { if (slotFromGroup == null) {
// both null // both null
if (constraint == null || constraint.isUnassigned()) { if (constraint == null || constraint.isUnassigned()) {
throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots()); if (forceExternalLocation) {
} else { // could not satisfy the external location constraint
String hosts = getHostnamesFromInstances(preferredLocations);
throw new NoResourceAvailableException("Could not schedule task " + vertex
+ " to any of the required hosts: " + hosts);
}
else {
// simply nothing is available
throw new NoResourceAvailableException(task, getNumberOfAvailableInstances(),
getTotalNumberOfSlots(), getNumberOfAvailableSlots());
}
}
else {
// nothing is available on the node where the co-location constraint pushes us
throw new NoResourceAvailableException("Could not allocate a slot on instance " + throw new NoResourceAvailableException("Could not allocate a slot on instance " +
constraint.getLocation() + ", as required by the co-location constraint."); constraint.getLocation() + ", as required by the co-location constraint.");
} }
Expand Down Expand Up @@ -269,26 +293,49 @@ else if (slotFromGroup == null || newSlot.getLocality() == Locality.LOCAL) {
} }


// 2) === schedule without hints and sharing === // 2) === schedule without hints and sharing ===

{
SimpleSlot slot = getFreeSlotForTask(vertex, vertex.getPreferredLocations()); SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation);
if (slot != null) { if (slot != null) {
updateLocalityCounters(slot.getLocality()); updateLocalityCounters(slot.getLocality());
return slot; return slot;
}
else {
// no resource available now, so queue the request
if (queueIfNoResource) {
SlotAllocationFuture future = new SlotAllocationFuture();
this.taskQueue.add(new QueuedTask(task, future));
return future;
} }
else { else {
throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots()); // no resource available now, so queue the request
if (queueIfNoResource) {
SlotAllocationFuture future = new SlotAllocationFuture();
this.taskQueue.add(new QueuedTask(task, future));
return future;
}
else if (forceExternalLocation) {
String hosts = getHostnamesFromInstances(preferredLocations);
throw new NoResourceAvailableException("Could not schedule task " + vertex
+ " to any of the required hosts: " + hosts);
}
else {
throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
}
} }
} }
} }
} }

private String getHostnamesFromInstances(Iterable<Instance> instances) {
StringBuilder bld = new StringBuilder();

for (Instance i : instances) {
bld.append(i.getInstanceConnectionInfo().getHostname());
bld.append(", ");
}


if (bld.length() == 0) {
return "";
}
else {
bld.setLength(bld.length() - 2);
return bld.toString();
}
}

/** /**
* Gets a suitable instance to schedule the vertex execution to. * Gets a suitable instance to schedule the vertex execution to.
* <p> * <p>
Expand All @@ -297,21 +344,21 @@ else if (slotFromGroup == null || newSlot.getLocality() == Locality.LOCAL) {
* @param vertex The task to run. * @param vertex The task to run.
* @return The instance to run the vertex on, it {@code null}, if no instance is available. * @return The instance to run the vertex on, it {@code null}, if no instance is available.
*/ */
protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations) { protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instance> requestedLocations, boolean localOnly) {


// we need potentially to loop multiple times, because there may be false positives // we need potentially to loop multiple times, because there may be false positives
// in the set-with-available-instances // in the set-with-available-instances
while (true) { while (true) {
Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations); Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);


if(instanceLocalityPair == null){ if (instanceLocalityPair == null){
return null; return null;
} }


Instance instanceToUse = instanceLocalityPair.getLeft(); Instance instanceToUse = instanceLocalityPair.getLeft();
Locality locality = instanceLocalityPair.getRight(); Locality locality = instanceLocalityPair.getRight();


if(LOG.isDebugEnabled()){ if (LOG.isDebugEnabled()){
if(locality == Locality.LOCAL){ if(locality == Locality.LOCAL){
LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
}else if(locality == Locality.NON_LOCAL){ }else if(locality == Locality.NON_LOCAL){
Expand Down Expand Up @@ -348,25 +395,26 @@ protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex, Iterable<Instanc
protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex, protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
Iterable<Instance> requestedLocations, Iterable<Instance> requestedLocations,
SlotSharingGroupAssignment groupAssignment, SlotSharingGroupAssignment groupAssignment,
CoLocationConstraint constraint) { CoLocationConstraint constraint,
boolean localOnly) {
// we need potentially to loop multiple times, because there may be false positives // we need potentially to loop multiple times, because there may be false positives
// in the set-with-available-instances // in the set-with-available-instances
while (true) { while (true) {
Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations); Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly);


if(instanceLocalityPair == null){ if (instanceLocalityPair == null) {
return null; return null;
} }


Instance instanceToUse = instanceLocalityPair.getLeft(); Instance instanceToUse = instanceLocalityPair.getLeft();
Locality locality = instanceLocalityPair.getRight(); Locality locality = instanceLocalityPair.getRight();


if(LOG.isDebugEnabled()){ if (LOG.isDebugEnabled()) {
if(locality == Locality.LOCAL){ if (locality == Locality.LOCAL) {
LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); LOG.debug("Local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
}else if(locality == Locality.NON_LOCAL){ } else if(locality == Locality.NON_LOCAL) {
LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); LOG.debug("Non-local assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
}else if(locality == Locality.UNCONSTRAINED) { } else if(locality == Locality.UNCONSTRAINED) {
LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse); LOG.debug("Unconstrained assignment: " + vertex.getSimpleName() + " --> " + instanceToUse);
} }
} }
Expand Down Expand Up @@ -409,7 +457,8 @@ protected SimpleSlot getFreeSubSlotForTask(ExecutionVertex vertex,
* *
* @param requestedLocations * @param requestedLocations
*/ */
private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations){ private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations, boolean localOnly){

if (this.instancesWithAvailableResources.isEmpty()) { if (this.instancesWithAvailableResources.isEmpty()) {
// check if the asynchronous calls did not yet return the queues // check if the asynchronous calls did not yet return the queues
Instance queuedInstance = this.newlyAvailableInstances.poll(); Instance queuedInstance = this.newlyAvailableInstances.poll();
Expand All @@ -434,14 +483,18 @@ private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocati
if (location != null && this.instancesWithAvailableResources.remove(location)) { if (location != null && this.instancesWithAvailableResources.remove(location)) {
instanceToUse = location; instanceToUse = location;
locality = Locality.LOCAL; locality = Locality.LOCAL;

break; break;
} }
} }


if (instanceToUse == null) { if (instanceToUse == null) {
instanceToUse = this.instancesWithAvailableResources.poll(); if (localOnly) {
locality = Locality.NON_LOCAL; return null;
}
else {
instanceToUse = this.instancesWithAvailableResources.poll();
locality = Locality.NON_LOCAL;
}
} }
} }
else { else {
Expand Down Expand Up @@ -603,8 +656,8 @@ public void instanceDied(Instance instance) {
public int getNumberOfAvailableInstances() { public int getNumberOfAvailableInstances() {
int numberAvailableInstances = 0; int numberAvailableInstances = 0;
synchronized (this.globalLock) { synchronized (this.globalLock) {
for(Instance instance: allInstances){ for (Instance instance: allInstances ){
if(instance.isAlive()){ if (instance.isAlive()){
numberAvailableInstances++; numberAvailableInstances++;
} }
} }
Expand Down
Expand Up @@ -18,7 +18,6 @@


package org.apache.flink.runtime.jobmanager.scheduler; package org.apache.flink.runtime.jobmanager.scheduler;


import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
Expand All @@ -41,13 +40,11 @@
import org.slf4j.Logger; import org.slf4j.Logger;




public class SlotSharingGroupAssignment implements Serializable { public class SlotSharingGroupAssignment {

static final long serialVersionUID = 42L;


private static final Logger LOG = Scheduler.LOG; private static final Logger LOG = Scheduler.LOG;


private transient final Object lock = new Object(); private final Object lock = new Object();


/** All slots currently allocated to this sharing group */ /** All slots currently allocated to this sharing group */
private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>(); private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
Expand Down
Expand Up @@ -101,9 +101,8 @@ public static Instance getInstance(final ActorRef taskManager) throws
return getInstance(taskManager, 1); return getInstance(taskManager, 1);
} }


public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws public static Instance getInstance(final ActorRef taskManager, final int numberOfSlots) throws Exception {
Exception { HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024);
InetAddress address = InetAddress.getByName("127.0.0.1"); InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001);


Expand Down

0 comments on commit 970b2b7

Please sign in to comment.