Skip to content

Commit

Permalink
YARN-3739. Add reservation system recovery to RM recovery process. Contributed by Subru Krishnan.
Browse files Browse the repository at this point in the history
  • Loading branch information
Anubhav Dhoot committed Oct 22, 2015
1 parent 381610d commit 2798723
Show file tree
Hide file tree
Showing 21 changed files with 556 additions and 185 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -238,6 +238,9 @@ Release 2.8.0 - UNRELEASED
YARN-4262. Allow whitelisted users to run privileged docker containers. YARN-4262. Allow whitelisted users to run privileged docker containers.
(Sidharta Seethana via vvasudev) (Sidharta Seethana via vvasudev)


YARN-3739. Add reservation system recovery to RM recovery process.
(Subru Krishnan via adhoot)

IMPROVEMENTS IMPROVEMENTS


YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before
Expand Down
Expand Up @@ -1363,7 +1363,7 @@ private void refreshScheduler(String planName,
.format( .format(
"Reservation {0} is within threshold so attempting to create synchronously.", "Reservation {0} is within threshold so attempting to create synchronously.",
reservationId)); reservationId));
reservationSystem.synchronizePlan(planName); reservationSystem.synchronizePlan(planName, true);
LOG.info(MessageFormat.format("Created reservation {0} synchronously.", LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
reservationId)); reservationId));
} }
Expand Down
Expand Up @@ -1186,6 +1186,10 @@ public void recover(RMState state) throws Exception {
// recover AMRMTokenSecretManager // recover AMRMTokenSecretManager
rmContext.getAMRMTokenSecretManager().recover(state); rmContext.getAMRMTokenSecretManager().recover(state);


// recover reservations
if (reservationSystem != null) {
reservationSystem.recover(state);
}
// recover applications // recover applications
rmAppManager.recover(state); rmAppManager.recover(state);


Expand Down
Expand Up @@ -18,16 +18,6 @@


package org.apache.hadoop.yarn.server.resourcemanager.reservation; package org.apache.hadoop.yarn.server.resourcemanager.reservation;


import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
Expand All @@ -38,8 +28,11 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
Expand All @@ -52,6 +45,17 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** /**
* This is the implementation of {@link ReservationSystem} based on the * This is the implementation of {@link ReservationSystem} based on the
* {@link ResourceScheduler} * {@link ResourceScheduler}
Expand Down Expand Up @@ -94,6 +98,8 @@ public abstract class AbstractReservationSystem extends AbstractService


private PlanFollower planFollower; private PlanFollower planFollower;


private boolean isRecoveryEnabled = false;

/** /**
* Construct the service. * Construct the service.
* *
Expand Down Expand Up @@ -149,6 +155,49 @@ private void initialize(Configuration conf) throws YarnException {
Plan plan = initializePlan(planQueueName); Plan plan = initializePlan(planQueueName);
plans.put(planQueueName, plan); plans.put(planQueueName, plan);
} }
isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
}

/**
 * Re-populates a single plan with the reservations that were stored for it.
 *
 * <p>Each stored entry is converted with
 * {@code ReservationSystemUtil.toInMemoryAllocation} and added to the plan
 * registered under {@code planName}; the reservation id is also mapped to
 * the plan name in {@code resQMap}.
 *
 * <p>The plan is looked up in {@code plans} without a null check. The only
 * caller visible here passes names taken from {@code plans.keySet()}.
 *
 * @param planName name of the plan the reservations belong to
 * @param reservations stored reservation state, keyed by reservation id
 * @throws PlanningException declared by the signature; presumably raised
 *           when a reservation cannot be added to the plan (confirm against
 *           {@code Plan#addReservation})
 */
private void loadPlan(String planName,
    Map<ReservationId, ReservationAllocationStateProto> reservations)
    throws PlanningException {
  final Plan targetPlan = plans.get(planName);
  final Resource minAlloc = getMinAllocation();
  final ResourceCalculator calculator = getResourceCalculator();
  for (Entry<ReservationId, ReservationAllocationStateProto> stored
      : reservations.entrySet()) {
    final ReservationId reservationId = stored.getKey();
    // NOTE(review): the trailing 'true' presumably marks the add as part of
    // recovery; confirm against Plan#addReservation.
    targetPlan.addReservation(
        ReservationSystemUtil.toInMemoryAllocation(planName, reservationId,
            stored.getValue(), minAlloc, calculator),
        true);
    resQMap.put(reservationId, planName);
  }
  LOG.info("Recovered reservations for Plan: {}", planName);
}

/**
 * Recovers the reservation system from previously stored ResourceManager
 * state.
 *
 * While holding the write lock, this reads the per-plan reservation map from
 * {@code state}. Only when a plan follower is configured does it then walk
 * every known plan, reload any reservations stored for that plan,
 * synchronize the plan with {@code shouldReplan = false}, and finally
 * schedule the plan follower.
 *
 * @param state the recovered RM state; read via {@code getReservationState()}
 * @throws Exception declared by the signature; {@code loadPlan} can raise a
 *           {@code PlanningException}
 */
@Override
public void recover(RMState state) throws Exception {
LOG.info("Recovering Reservation system");
writeLock.lock();
try {
// Outer key: plan name; inner map: reservation id -> stored allocation.
Map<String, Map<ReservationId, ReservationAllocationStateProto>> reservationSystemState =
state.getReservationState();
// Without a plan follower nothing is loaded, synchronized or scheduled.
if (planFollower != null) {
for (String plan : plans.keySet()) {
// recover reservations if any from state store
if (reservationSystemState.containsKey(plan)) {
loadPlan(plan, reservationSystemState.get(plan));
}
// Synchronize with shouldReplan = false, even for plans that had no
// stored reservations.
// NOTE(review): synchronizePlan acquires writeLock again, so this relies
// on the lock being reentrant; confirm against the lock's declaration.
synchronizePlan(plan, false);
}
// The first plan follower run is delayed by the configured
// work-preserving-recovery scheduling wait.
// NOTE(review): 'conf' is not declared in this method; presumably a
// field or accessor of the enclosing service; confirm.
startPlanFollower(conf.getLong(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS));
}
} finally {
writeLock.unlock();
}
} }


private void initializeNewPlans(Configuration conf) { private void initializeNewPlans(Configuration conf) {
Expand All @@ -162,7 +211,7 @@ private void initializeNewPlans(Configuration conf) {
Plan plan = initializePlan(planQueueName); Plan plan = initializePlan(planQueueName);
plans.put(planQueueName, plan); plans.put(planQueueName, plan);
} else { } else {
LOG.warn("Plan based on reservation queue {0} already exists.", LOG.warn("Plan based on reservation queue {} already exists.",
planQueueName); planQueueName);
} }
} }
Expand Down Expand Up @@ -236,18 +285,26 @@ public long getPlanFollowerTimeStep() {
} }


@Override @Override
public void synchronizePlan(String planName) { public void synchronizePlan(String planName, boolean shouldReplan) {
writeLock.lock(); writeLock.lock();
try { try {
Plan plan = plans.get(planName); Plan plan = plans.get(planName);
if (plan != null) { if (plan != null) {
planFollower.synchronizePlan(plan); planFollower.synchronizePlan(plan, shouldReplan);
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
} }


/**
 * Schedules the plan follower on a fresh single-threaded scheduled executor.
 *
 * <p>Does nothing when no plan follower is configured. Otherwise the
 * follower first runs after {@code initialDelay} milliseconds and is then
 * re-run with a fixed delay of {@code planStepSize} milliseconds between
 * runs.
 *
 * @param initialDelay delay before the first run, in milliseconds
 */
private void startPlanFollower(long initialDelay) {
  if (planFollower == null) {
    return;
  }
  scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
  scheduledExecutorService.scheduleWithFixedDelay(
      planFollower, initialDelay, planStepSize, TimeUnit.MILLISECONDS);
}

@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
Configuration configuration = new Configuration(conf); Configuration configuration = new Configuration(conf);
Expand All @@ -262,10 +319,8 @@ public void serviceInit(Configuration conf) throws Exception {


@Override @Override
public void serviceStart() throws Exception { public void serviceStart() throws Exception {
if (planFollower != null) { if (!isRecoveryEnabled) {
scheduledExecutorService = new ScheduledThreadPoolExecutor(1); startPlanFollower(planStepSize);
scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L,
planStepSize, TimeUnit.MILLISECONDS);
} }
super.serviceStart(); super.serviceStart();
} }
Expand Down Expand Up @@ -350,7 +405,7 @@ protected Plan initializePlan(String planQueueName) throws YarnException {
minAllocation, maxAllocation, planQueueName, minAllocation, maxAllocation, planQueueName,
getReplanner(planQueuePath), getReservationSchedulerConfiguration() getReplanner(planQueuePath), getReservationSchedulerConfiguration()
.getMoveOnExpiry(planQueuePath), rmContext); .getMoveOnExpiry(planQueuePath), rmContext);
LOG.info("Intialized plan {0} based on reservable queue {1}", LOG.info("Intialized plan {} based on reservable queue {}",
plan.toString(), planQueueName); plan.toString(), planQueueName);
return plan; return plan;
} }
Expand Down
Expand Up @@ -27,8 +27,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;

import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -43,7 +43,7 @@


public abstract class AbstractSchedulerPlanFollower implements PlanFollower { public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(CapacitySchedulerPlanFollower.class); .getLogger(AbstractSchedulerPlanFollower.class);


protected Collection<Plan> plans = new ArrayList<Plan>(); protected Collection<Plan> plans = new ArrayList<Plan>();
protected YarnScheduler scheduler; protected YarnScheduler scheduler;
Expand All @@ -59,7 +59,7 @@ public void init(Clock clock, ResourceScheduler sched, Collection<Plan> plans) {
@Override @Override
public synchronized void run() { public synchronized void run() {
for (Plan plan : plans) { for (Plan plan : plans) {
synchronizePlan(plan); synchronizePlan(plan, true);
} }
} }


Expand All @@ -70,7 +70,7 @@ public synchronized void setPlans(Collection<Plan> plans) {
} }


@Override @Override
public synchronized void synchronizePlan(Plan plan) { public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
String planQueueName = plan.getQueueName(); String planQueueName = plan.getQueueName();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Running plan follower edit policy for plan: " + planQueueName); LOG.debug("Running plan follower edit policy for plan: " + planQueueName);
Expand All @@ -88,14 +88,12 @@ public synchronized void synchronizePlan(Plan plan) {
Resource clusterResources = scheduler.getClusterResource(); Resource clusterResources = scheduler.getClusterResource();
Resource planResources = getPlanResources(plan, planQueue, Resource planResources = getPlanResources(plan, planQueue,
clusterResources); clusterResources);

Set<ReservationAllocation> currentReservations = Set<ReservationAllocation> currentReservations =
plan.getReservationsAtTime(now); plan.getReservationsAtTime(now);
Set<String> curReservationNames = new HashSet<String>(); Set<String> curReservationNames = new HashSet<String>();
Resource reservedResources = Resource.newInstance(0, 0); Resource reservedResources = Resource.newInstance(0, 0);
int numRes = getReservedResources(now, currentReservations, int numRes = getReservedResources(now, currentReservations,
curReservationNames, reservedResources); curReservationNames, reservedResources);

// create the default reservation queue if it doesnt exist // create the default reservation queue if it doesnt exist
String defReservationId = getReservationIdFromQueueName(planQueueName) + String defReservationId = getReservationIdFromQueueName(planQueueName) +
ReservationConstants.DEFAULT_QUEUE_SUFFIX; ReservationConstants.DEFAULT_QUEUE_SUFFIX;
Expand All @@ -104,14 +102,18 @@ public synchronized void synchronizePlan(Plan plan) {
createDefaultReservationQueue(planQueueName, planQueue, createDefaultReservationQueue(planQueueName, planQueue,
defReservationId); defReservationId);
curReservationNames.add(defReservationId); curReservationNames.add(defReservationId);

// if the resources dedicated to this plan has shrunk invoke replanner // if the resources dedicated to this plan has shrunk invoke replanner
if (arePlanResourcesLessThanReservations(clusterResources, planResources, boolean shouldResize = false;
reservedResources)) { if (arePlanResourcesLessThanReservations(plan.getResourceCalculator(),
try { clusterResources, planResources, reservedResources)) {
plan.getReplanner().plan(plan, null); if (shouldReplan) {
} catch (PlanningException e) { try {
LOG.warn("Exception while trying to replan: {}", planQueueName, e); plan.getReplanner().plan(plan, null);
} catch (PlanningException e) {
LOG.warn("Exception while trying to replan: {}", planQueueName, e);
}
} else {
shouldResize = true;
} }
} }
// identify the reservations that have expired and new reservations that // identify the reservations that have expired and new reservations that
Expand All @@ -133,7 +135,6 @@ public synchronized void synchronizePlan(Plan plan) {
// garbage collect expired reservations // garbage collect expired reservations
cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired, cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired,
defReservationQueue); defReservationQueue);

// Add new reservations and update existing ones // Add new reservations and update existing ones
float totalAssignedCapacity = 0f; float totalAssignedCapacity = 0f;
if (currentReservations != null) { if (currentReservations != null) {
Expand All @@ -146,9 +147,8 @@ public synchronized void synchronizePlan(Plan plan) {
planQueueName, e); planQueueName, e);
} }
// sort allocations from the one giving up the most resources, to the // sort allocations from the one giving up the most resources, to the
// one asking for the most // one asking for the most avoid order-of-operation errors that
// avoid order-of-operation errors that temporarily violate 100% // temporarily violate 100% capacity bound
// capacity bound
List<ReservationAllocation> sortedAllocations = List<ReservationAllocation> sortedAllocations =
sortByDelta( sortByDelta(
new ArrayList<ReservationAllocation>(currentReservations), now, new ArrayList<ReservationAllocation>(currentReservations), now,
Expand All @@ -162,10 +162,15 @@ public synchronized void synchronizePlan(Plan plan) {
float targetCapacity = 0f; float targetCapacity = 0f;
if (planResources.getMemory() > 0 if (planResources.getMemory() > 0
&& planResources.getVirtualCores() > 0) { && planResources.getVirtualCores() > 0) {
if (shouldResize) {
capToAssign =
calculateReservationToPlanProportion(
plan.getResourceCalculator(), planResources,
reservedResources, capToAssign);
}
targetCapacity = targetCapacity =
calculateReservationToPlanRatio(clusterResources, calculateReservationToPlanRatio(plan.getResourceCalculator(),
planResources, clusterResources, planResources, capToAssign);
capToAssign);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug( LOG.debug(
Expand Down Expand Up @@ -211,7 +216,6 @@ public synchronized void synchronizePlan(Plan plan) {
} }
LOG.info("Finished iteration of plan follower edit policy for plan: " LOG.info("Finished iteration of plan follower edit policy for plan: "
+ planQueueName); + planQueueName);

// Extension: update plan with app states, // Extension: update plan with app states,
// useful to support smart replanning // useful to support smart replanning
} }
Expand Down Expand Up @@ -323,19 +327,35 @@ protected List<ReservationAllocation> sortByDelta(
*/ */
protected abstract Queue getPlanQueue(String planQueueName); protected abstract Queue getPlanQueue(String planQueueName);


/**
 * Resizes a reservation based on the currently available plan resources.
 *
 * <p>The result is {@code availablePlanResources} multiplied by the ratio of
 * {@code reservationResources} to {@code totalReservationResources}.
 *
 * @param rescCalculator calculator used to compute the ratio
 * @param availablePlanResources resources currently available to the plan
 * @param totalReservationResources total resources of the current
 *          reservations
 * @param reservationResources resources of the reservation being resized
 * @return the plan resources scaled by this reservation's share of the total
 */
private Resource calculateReservationToPlanProportion(
    ResourceCalculator rescCalculator, Resource availablePlanResources,
    Resource totalReservationResources, Resource reservationResources) {
  final float reservationShare = Resources.ratio(
      rescCalculator, reservationResources, totalReservationResources);
  return Resources.multiply(availablePlanResources, reservationShare);
}

/** /**
* Calculates ratio of reservationResources to planResources * Calculates ratio of reservationResources to planResources
*/ */
protected abstract float calculateReservationToPlanRatio( private float calculateReservationToPlanRatio(
Resource clusterResources, Resource planResources, ResourceCalculator rescCalculator, Resource clusterResources,
Resource reservationResources); Resource planResources, Resource reservationResources) {
return Resources.divide(rescCalculator, clusterResources,
reservationResources, planResources);
}


/** /**
* Check if plan resources are less than expected reservation resources * Check if plan resources are less than expected reservation resources
*/ */
protected abstract boolean arePlanResourcesLessThanReservations( private boolean arePlanResourcesLessThanReservations(
Resource clusterResources, Resource planResources, ResourceCalculator rescCalculator, Resource clusterResources,
Resource reservedResources); Resource planResources, Resource reservedResources) {
return Resources.greaterThan(rescCalculator, clusterResources,
reservedResources, planResources);
}


/** /**
* Get a list of reservation queues for this planQueue * Get a list of reservation queues for this planQueue
Expand Down Expand Up @@ -363,7 +383,7 @@ protected abstract Resource getPlanResources(
Plan plan, Queue queue, Resource clusterResources); Plan plan, Queue queue, Resource clusterResources);


/** /**
* Get reservation queue resources if it exists otherwise return null * Get reservation queue resources if it exists otherwise return null.
*/ */
protected abstract Resource getReservationQueueResourceIfExists(Plan plan, protected abstract Resource getReservationQueueResourceIfExists(Plan plan,
ReservationId reservationId); ReservationId reservationId);
Expand Down
Expand Up @@ -80,22 +80,6 @@ protected Queue getPlanQueue(String planQueueName) {
return queue; return queue;
} }


@Override
protected float calculateReservationToPlanRatio(
Resource clusterResources, Resource planResources,
Resource reservationResources) {
return Resources.divide(cs.getResourceCalculator(),
clusterResources, reservationResources, planResources);
}

@Override
protected boolean arePlanResourcesLessThanReservations(
Resource clusterResources, Resource planResources,
Resource reservedResources) {
return Resources.greaterThan(cs.getResourceCalculator(),
clusterResources, reservedResources, planResources);
}

@Override @Override
protected List<? extends Queue> getChildReservationQueues(Queue queue) { protected List<? extends Queue> getChildReservationQueues(Queue queue) {
PlanQueue planQueue = (PlanQueue)queue; PlanQueue planQueue = (PlanQueue)queue;
Expand Down

0 comments on commit 2798723

Please sign in to comment.