Skip to content

Commit

Permalink
YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler. (Anu…
Browse files Browse the repository at this point in the history
…bhav Dhoot via kasha)
  • Loading branch information
kambatla committed Jan 5, 2015
1 parent 53caeaa commit 0c4b112
Show file tree
Hide file tree
Showing 19 changed files with 574 additions and 59 deletions.
2 changes: 2 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -70,6 +70,8 @@ Release 2.7.0 - UNRELEASED
YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler.
(Anubhav Dhoot via kasha) (Anubhav Dhoot via kasha)


YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler.
(Anubhav Dhoot via kasha)


IMPROVEMENTS IMPROVEMENTS


Expand Down
Expand Up @@ -204,6 +204,8 @@ private String getDefaultPlanFollower() {
// currently only capacity scheduler is supported // currently only capacity scheduler is supported
if (scheduler instanceof CapacityScheduler) { if (scheduler instanceof CapacityScheduler) {
return CapacitySchedulerPlanFollower.class.getName(); return CapacitySchedulerPlanFollower.class.getName();
} else if (scheduler instanceof FairScheduler) {
return FairSchedulerPlanFollower.class.getName();
} }
return null; return null;
} }
Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;


import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
Expand Down Expand Up @@ -99,7 +98,7 @@ public synchronized void synchronizePlan(Plan plan) {


// create the default reservation queue if it doesnt exist // create the default reservation queue if it doesnt exist
String defReservationId = getReservationIdFromQueueName(planQueueName) + String defReservationId = getReservationIdFromQueueName(planQueueName) +
PlanQueue.DEFAULT_QUEUE_SUFFIX; ReservationConstants.DEFAULT_QUEUE_SUFFIX;
String defReservationQueue = getReservationQueueName(planQueueName, String defReservationQueue = getReservationQueueName(planQueueName,
defReservationId); defReservationId);
createDefaultReservationQueue(planQueueName, planQueue, createDefaultReservationQueue(planQueueName, planQueue,
Expand Down
@@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

import java.util.Collection;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FairSchedulerPlanFollower extends AbstractSchedulerPlanFollower {
private static final Logger LOG = LoggerFactory
.getLogger(FairSchedulerPlanFollower.class);

private FairScheduler fs;

@Override
public void init(Clock clock, ResourceScheduler sched,
Collection<Plan> plans) {
super.init(clock, sched, plans);
fs = (FairScheduler)sched;
LOG.info("Initializing Plan Follower Policy:"
+ this.getClass().getCanonicalName());
}

@Override
protected Queue getPlanQueue(String planQueueName) {
Queue planQueue = fs.getQueueManager().getParentQueue(planQueueName, false);
if (planQueue == null) {
LOG.error("The queue " + planQueueName + " cannot be found or is not a " +
"ParentQueue");
}
return planQueue;
}

@Override
protected float calculateReservationToPlanRatio(Resource clusterResources,
Resource planResources, Resource capToAssign) {
return Resources.divide(fs.getResourceCalculator(),
clusterResources, capToAssign, planResources);
}

@Override
protected boolean arePlanResourcesLessThanReservations(Resource
clusterResources, Resource planResources, Resource reservedResources) {
return Resources.greaterThan(fs.getResourceCalculator(),
clusterResources, reservedResources, planResources);
}

@Override
protected List<? extends Queue> getChildReservationQueues(Queue queue) {
FSQueue planQueue = (FSQueue)queue;
List<FSQueue> childQueues = planQueue.getChildQueues();
return childQueues;
}


@Override
protected void addReservationQueue(String planQueueName, Queue queue,
String currResId) {
String leafQueueName = getReservationQueueName(planQueueName, currResId);
fs.getQueueManager().getLeafQueue(leafQueueName, true);
}

@Override
protected void createDefaultReservationQueue(String planQueueName,
Queue queue, String defReservationId) {
String defReservationQueueName = getReservationQueueName(planQueueName,
defReservationId);
if (!fs.getQueueManager().exists(defReservationQueueName)) {
fs.getQueueManager().getLeafQueue(defReservationQueueName, true);
}
}

@Override
protected Resource getPlanResources(Plan plan, Queue queue,
Resource clusterResources) {
FSParentQueue planQueue = (FSParentQueue)queue;
Resource planResources = planQueue.getSteadyFairShare();
return planResources;
}

@Override
protected Resource getReservationQueueResourceIfExists(Plan plan,
ReservationId reservationId) {
String reservationQueueName = getReservationQueueName(plan.getQueueName(),
reservationId.toString());
FSLeafQueue reservationQueue =
fs.getQueueManager().getLeafQueue(reservationQueueName, false);
Resource reservationResource = null;
if (reservationQueue != null) {
reservationResource = reservationQueue.getSteadyFairShare();
}
return reservationResource;
}

@Override
protected String getReservationQueueName(String planQueueName,
String reservationQueueName) {
String planQueueNameFullPath = fs.getQueueManager().getQueue
(planQueueName).getName();

if (!reservationQueueName.startsWith(planQueueNameFullPath)) {
// If name is not a path we need full path for FairScheduler. See
// YARN-2773 for the root cause
return planQueueNameFullPath + "." + reservationQueueName;
}
return reservationQueueName;
}

@Override
protected String getReservationIdFromQueueName(String resQueueName) {
return resQueueName.substring(resQueueName.lastIndexOf(".") + 1);
}
}
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.reservation;

public interface ReservationConstants {

/**
* The suffix used for a queue under a reservable queue that will be used
* as a default queue whenever no reservation is used
*/
String DEFAULT_QUEUE_SUFFIX = "-default";
}
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.EnumSet; import java.util.EnumSet;
Expand Down Expand Up @@ -66,6 +65,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
Expand Down Expand Up @@ -1419,7 +1419,7 @@ private synchronized String resolveReservationQueueName(String queueName,
queueName = resQName; queueName = resQName;
} else { } else {
// use the default child queue of the plan for unreserved apps // use the default child queue of the plan for unreserved apps
queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
} }
return queueName; return queueName;
} }
Expand Down Expand Up @@ -1583,7 +1583,7 @@ private String handleMoveToPlanQueue(String targetQueueName) {
CSQueue dest = getQueue(targetQueueName); CSQueue dest = getQueue(targetQueueName);
if (dest != null && dest instanceof PlanQueue) { if (dest != null && dest instanceof PlanQueue) {
// use the default child reservation queue of the plan // use the default child reservation queue of the plan
targetQueueName = targetQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX; targetQueueName = targetQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
} }
return targetQueueName; return targetQueueName;
} }
Expand Down
Expand Up @@ -37,8 +37,6 @@
*/ */
public class PlanQueue extends ParentQueue { public class PlanQueue extends ParentQueue {


public static final String DEFAULT_QUEUE_SUFFIX = "-default";

private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class); private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);


private int maxAppsForReservation; private int maxAppsForReservation;
Expand Down
Expand Up @@ -207,6 +207,10 @@ public ResourceWeights getQueueWeight(String queue) {
ResourceWeights weight = queueWeights.get(queue); ResourceWeights weight = queueWeights.get(queue);
return (weight == null) ? ResourceWeights.NEUTRAL : weight; return (weight == null) ? ResourceWeights.NEUTRAL : weight;
} }

public void setQueueWeight(String queue, ResourceWeights weight) {
queueWeights.put(queue, weight);
}


public int getUserMaxApps(String user) { public int getUserMaxApps(String user) {
Integer maxApps = userMaxApps.get(user); Integer maxApps = userMaxApps.get(user);
Expand Down Expand Up @@ -323,4 +327,14 @@ public boolean getMoveOnExpiry(String queue) {
public long getEnforcementWindow(String queue) { public long getEnforcementWindow(String queue) {
return globalReservationQueueConfig.getEnforcementWindowMsec(); return globalReservationQueueConfig.getEnforcementWindowMsec();
} }

@VisibleForTesting
public void setReservationWindow(long window) {
globalReservationQueueConfig.setReservationWindow(window);
}

@VisibleForTesting
public void setAverageCapacity(int avgCapacity) {
globalReservationQueueConfig.setAverageCapacity(avgCapacity);
}
} }
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
Expand Down Expand Up @@ -516,6 +517,15 @@ public void updateStarvationStats() {
} }
} }


/** Allows setting weight for a dynamically created queue
* Currently only used for reservation based queues
* @param weight queue weight
*/
public void setWeights(float weight) {
scheduler.getAllocationConfiguration().setQueueWeight(getName(),
new ResourceWeights(weight));
}

/** /**
* Helper method to check if the queue should preempt containers * Helper method to check if the queue should preempt containers
* *
Expand Down

0 comments on commit 0c4b112

Please sign in to comment.