Skip to content

Commit

Permalink
YARN-3738. Add support for recovery of reserved apps running under dy…
Browse files Browse the repository at this point in the history
…namic queues (subru via asuresh)
  • Loading branch information
xslogic committed Oct 25, 2015
1 parent 446212a commit ab8eb87
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 13 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -537,6 +537,9 @@ Release 2.8.0 - UNRELEASED
YARN-4296. DistributedShell Log.info is not friendly. YARN-4296. DistributedShell Log.info is not friendly.
(Xiaowei Wang via stevel) (Xiaowei Wang via stevel)


YARN-3738. Add support for recovery of reserved apps running under dynamic
queues (subru via asuresh)

OPTIMIZATIONS OPTIMIZATIONS


YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not
Expand Down
Expand Up @@ -1320,10 +1320,9 @@ public void handle(SchedulerEvent event) {
case APP_ADDED: case APP_ADDED:
{ {
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
String queueName = String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
resolveReservationQueueName(appAddedEvent.getQueue(), appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
appAddedEvent.getApplicationId(), appAddedEvent.getIsAppRecovering());
appAddedEvent.getReservationID());
if (queueName != null) { if (queueName != null) {
if (!appAddedEvent.getIsAppRecovering()) { if (!appAddedEvent.getIsAppRecovering()) {
addApplication(appAddedEvent.getApplicationId(), queueName, addApplication(appAddedEvent.getApplicationId(), queueName,
Expand Down Expand Up @@ -1664,8 +1663,13 @@ private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration(
} }
} }


private String getDefaultReservationQueueName(String planQueueName) {
return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
}

private synchronized String resolveReservationQueueName(String queueName, private synchronized String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID) { ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
CSQueue queue = getQueue(queueName); CSQueue queue = getQueue(queueName);
// Check if the queue is a plan queue // Check if the queue is a plan queue
if ((queue == null) || !(queue instanceof PlanQueue)) { if ((queue == null) || !(queue instanceof PlanQueue)) {
Expand All @@ -1675,10 +1679,15 @@ private synchronized String resolveReservationQueueName(String queueName,
String resQName = reservationID.toString(); String resQName = reservationID.toString();
queue = getQueue(resQName); queue = getQueue(resQName);
if (queue == null) { if (queue == null) {
// reservation has terminated during failover
if (isRecovering
&& conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) {
// move to the default child queue of the plan
return getDefaultReservationQueueName(queueName);
}
String message = String message =
"Application " "Application " + applicationId
+ applicationId + " submitted to a reservation which is not currently active: "
+ " submitted to a reservation which is not yet currently active: "
+ resQName; + resQName;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, .handle(new RMAppEvent(applicationId,
Expand All @@ -1699,7 +1708,7 @@ private synchronized String resolveReservationQueueName(String queueName,
queueName = resQName; queueName = resQName;
} else { } else {
// use the default child queue of the plan for unreserved apps // use the default child queue of the plan for unreserved apps
queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; queueName = getDefaultReservationQueueName(queueName);
} }
return queueName; return queueName;
} }
Expand Down
Expand Up @@ -1244,7 +1244,8 @@ public void handle(SchedulerEvent event) {
String queueName = String queueName =
resolveReservationQueueName(appAddedEvent.getQueue(), resolveReservationQueueName(appAddedEvent.getQueue(),
appAddedEvent.getApplicationId(), appAddedEvent.getApplicationId(),
appAddedEvent.getReservationID()); appAddedEvent.getReservationID(),
appAddedEvent.getIsAppRecovering());
if (queueName != null) { if (queueName != null) {
addApplication(appAddedEvent.getApplicationId(), addApplication(appAddedEvent.getApplicationId(),
queueName, appAddedEvent.getUser(), queueName, appAddedEvent.getUser(),
Expand Down Expand Up @@ -1317,7 +1318,8 @@ public void handle(SchedulerEvent event) {
} }


private synchronized String resolveReservationQueueName(String queueName, private synchronized String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID) { ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
FSQueue queue = queueMgr.getQueue(queueName); FSQueue queue = queueMgr.getQueue(queueName);
if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) { if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
return queueName; return queueName;
Expand All @@ -1328,6 +1330,11 @@ private synchronized String resolveReservationQueueName(String queueName,
String resQName = queueName + "." + reservationID.toString(); String resQName = queueName + "." + reservationID.toString();
queue = queueMgr.getQueue(resQName); queue = queueMgr.getQueue(resQName);
if (queue == null) { if (queue == null) {
// reservation has terminated during failover
if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
// move to the default child queue of the plan
return getDefaultQueueForPlanQueue(queueName);
}
String message = String message =
"Application " "Application "
+ applicationId + applicationId
Expand Down
Expand Up @@ -25,14 +25,17 @@
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;


import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
Expand All @@ -57,6 +60,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
Expand All @@ -66,6 +70,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
Expand All @@ -77,6 +82,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
Expand All @@ -94,8 +101,6 @@
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.mortbay.log.Log;

import com.google.common.base.Supplier; import com.google.common.base.Supplier;




Expand Down Expand Up @@ -132,6 +137,7 @@ public void tearDown() {
if (rm2 != null) { if (rm2 != null) {
rm2.stop(); rm2.stop();
} }
conf = null;
} }


// Test common scheduler state including SchedulerAttempt, SchedulerNode, // Test common scheduler state including SchedulerAttempt, SchedulerNode,
Expand Down Expand Up @@ -257,6 +263,152 @@ public void testSchedulerRecovery() throws Exception {
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId()); assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
} }


private Configuration getSchedulerDynamicConfiguration() throws IOException {
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
conf.setTimeDuration(
YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 1L,
TimeUnit.SECONDS);
if (getSchedulerType() == SchedulerType.CAPACITY) {
CapacitySchedulerConfiguration schedulerConf =
new CapacitySchedulerConfiguration(conf);
ReservationSystemTestUtil.setupDynamicQueueConfiguration(schedulerConf);
return schedulerConf;
} else {
String allocFile = new File(FairSchedulerTestBase.TEST_DIR,
TestWorkPreservingRMRestart.class.getSimpleName() + ".xml")
.getAbsolutePath();
ReservationSystemTestUtil.setupFSAllocationFile(allocFile);
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
return conf;
}
}

// Test work preserving recovery of apps running under reservation.
// This involves:
// 1. Setting up a dynamic reservable queue,
// 2. Submitting an app to it,
// 3. Failing over RM,
// 4. Validating that the app is recovered post failover,
// 5. Check if all running containers are recovered,
// 6. Verify the scheduler state like attempt info,
// 7. Verify the queue/user metrics for the dynamic reservable queue.
@Test(timeout = 30000)
public void testDynamicQueueRecovery() throws Exception {
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());

// 1. Set up dynamic reservable queue.
Configuration schedulerConf = getSchedulerDynamicConfiguration();
int containerMemory = 1024;
Resource containerResource = Resource.newInstance(containerMemory, 1);

MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(schedulerConf);
rm1 = new MockRM(schedulerConf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
// 2. Run plan follower to update the added node & then submit app to
// dynamic queue.
rm1.getRMContext().getReservationSystem()
.synchronizePlan(ReservationSystemTestUtil.reservationQ, true);
RMApp app1 = rm1.submitApp(200, "dynamicQApp",
UserGroupInformation.getCurrentUser().getShortUserName(), null,
ReservationSystemTestUtil.getReservationQueueName());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

// clear queue metrics
rm1.clearQueueMetrics(app1);

// 3. Fail over (restart) RM.
rm2 = new MockRM(schedulerConf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// 4. Validate app is recovered post failover.
RMApp recoveredApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
NMContainerStatus completedContainer =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
ContainerState.COMPLETE);

nm1.registerNode(
Arrays.asList(amContainer, runningContainer, completedContainer), null);

// Wait for RM to settle down on recovering containers.
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
Set<ContainerId> launchedContainers =
((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
.getLaunchedContainers();
assertTrue(launchedContainers.contains(amContainer.getContainerId()));
assertTrue(launchedContainers.contains(runningContainer.getContainerId()));

// 5. Check RMContainers are re-recreated and the container state is
// correct.
rm2.waitForState(nm1, amContainer.getContainerId(),
RMContainerState.RUNNING);
rm2.waitForState(nm1, runningContainer.getContainerId(),
RMContainerState.RUNNING);
rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);

AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm2.getResourceScheduler();
SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());

// ********* check scheduler node state.*******
// 2 running containers.
Resource usedResources = Resources.multiply(containerResource, 2);
Resource nmResource =
Resource.newInstance(nm1.getMemory(), nm1.getvCores());

assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
assertTrue(
schedulerNode1.isValidContainer(runningContainer.getContainerId()));
assertFalse(
schedulerNode1.isValidContainer(completedContainer.getContainerId()));
// 2 launched containers, 1 completed container
assertEquals(2, schedulerNode1.getNumContainers());

assertEquals(Resources.subtract(nmResource, usedResources),
schedulerNode1.getAvailableResource());
assertEquals(usedResources, schedulerNode1.getUsedResource());
Resource availableResources = Resources.subtract(nmResource, usedResources);

// 6. Verify the scheduler state like attempt info.
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> sa =
((AbstractYarnScheduler) rm2.getResourceScheduler())
.getSchedulerApplications();
SchedulerApplication<SchedulerApplicationAttempt> schedulerApp =
sa.get(recoveredApp1.getApplicationId());

// 7. Verify the queue/user metrics for the dynamic reservable queue.
if (getSchedulerType() == SchedulerType.CAPACITY) {
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
} else {
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
}

// *********** check scheduler attempt state.********
SchedulerApplicationAttempt schedulerAttempt =
schedulerApp.getCurrentAppAttempt();
assertTrue(schedulerAttempt.getLiveContainers()
.contains(scheduler.getRMContainer(amContainer.getContainerId())));
assertTrue(schedulerAttempt.getLiveContainers()
.contains(scheduler.getRMContainer(runningContainer.getContainerId())));
assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);

// *********** check appSchedulingInfo state ***********
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
}

private void checkCSQueue(MockRM rm, private void checkCSQueue(MockRM rm,
SchedulerApplication<SchedulerApplicationAttempt> app, SchedulerApplication<SchedulerApplicationAttempt> app,
Resource clusterResource, Resource queueResource, Resource usedResource, Resource clusterResource, Resource queueResource, Resource usedResource,
Expand Down
Expand Up @@ -304,6 +304,18 @@ public static void setupQueueConfiguration(
conf.setCapacity(A2, 70); conf.setCapacity(A2, 70);
} }


public static void setupDynamicQueueConfiguration(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { reservationQ });
final String dedicated = CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;
conf.setCapacity(dedicated, 100);
// Set as reservation queue
conf.setReservable(dedicated, true);
}

public static String getFullReservationQueueName() { public static String getFullReservationQueueName() {
return CapacitySchedulerConfiguration.ROOT return CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ; + CapacitySchedulerConfiguration.DOT + reservationQ;
Expand Down

0 comments on commit ab8eb87

Please sign in to comment.