YARN-2331. Distinguish shutdown during supervision vs. shutdown for
rolling upgrade. Contributed by Jason Lowe

xgong committed May 8, 2015
1 parent d0e75e6 commit 088156d
Showing 6 changed files with 124 additions and 23 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
@@ -206,6 +206,9 @@ Release 2.8.0 - UNRELEASED
yarn.scheduler.capacity.node-locality-delay in code and default xml file.
(Nijel SF via vinodkv)


YARN-2331. Distinguish shutdown during supervision vs. shutdown for
rolling upgrade. (Jason Lowe via xgong)

OPTIMIZATIONS


YARN-3339. TestDockerContainerExecutor should pull a single image and not
@@ -1158,6 +1158,10 @@ private static void addDeprecatedKeys() {


public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";


public static final String NM_RECOVERY_SUPERVISED =
NM_RECOVERY_PREFIX + "supervised";
public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;

////////////////////////////////
// Web Proxy Configs
////////////////////////////////
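The two new constants above sit in YarnConfiguration (the surrounding addDeprecatedKeys() and NM_RECOVERY_PREFIX context makes that clear) and resolve to the property documented in the yarn-default.xml hunk that follows. A tiny, hypothetical sanity sketch, not part of the patch:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Hypothetical check: NM_RECOVERY_SUPERVISED is built from NM_RECOVERY_PREFIX,
// so it should print the property name shown in yarn-default.xml below, along
// with its compiled-in default.
public class PrintSupervisedRecoveryKey {
  public static void main(String[] args) {
    System.out.println(YarnConfiguration.NM_RECOVERY_SUPERVISED);         // yarn.nodemanager.recovery.supervised
    System.out.println(YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED); // false
  }
}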
@@ -1192,6 +1192,15 @@
<value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
</property>


<property>
<description>Whether the nodemanager is running under supervision. A
nodemanager that supports recovery and is running under supervision
will not try to clean up containers as it exits, with the assumption
that it will be immediately restarted and recover containers.</description>
<name>yarn.nodemanager.recovery.supervised</name>
<value>false</value>
</property>

<!--Docker configuration-->


<property>
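For context, a minimal sketch (not part of the patch; the class name is made up) of how the two settings would be enabled together — the new test at the bottom of this commit flips the same pair. Note the flag only tells the NM it is supervised; promptly restarting the process is the job of an external supervisor such as systemd or monit.

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class SupervisedNodeManagerConf {
  // Enable work-preserving NM restart and mark the NM as supervised, so
  // containers and pending log aggregations are left in place across a
  // shutdown. Equivalent to setting yarn.nodemanager.recovery.enabled and
  // yarn.nodemanager.recovery.supervised to true in yarn-site.xml.
  public static YarnConfiguration create() {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
    return conf;
  }
}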
@@ -530,8 +530,11 @@ public void cleanUpApplicationsOnNMShutDown() {


    if (this.context.getNMStateStore().canRecover()
        && !this.context.getDecommissioned()) {
      if (getConfig().getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
          YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED)) {
        // do not cleanup apps as they can be recovered on restart
        return;
      }
    }


    List<ApplicationId> appIds =
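Distilled into a standalone predicate (an illustrative sketch with hypothetical names, not code from the patch), the cleanUpApplicationsOnNMShutDown() change above makes shutdown behavior hinge on three conditions; the new test later in this commit exercises exactly these combinations.

// Illustrative: applications are preserved across an NM shutdown only when
// state can be recovered, the node is not being decommissioned, and the NM is
// supervised (i.e. something will restart it right away); otherwise the NM
// cleans up applications as it did before this patch.
static boolean preserveApplicationsOnShutdown(
    boolean canRecover, boolean decommissioned, boolean supervised) {
  return canRecover && !decommissioned && supervised;
}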
@@ -145,10 +145,13 @@ protected void serviceStop() throws Exception {


  private void stopAggregators() {
    threadPool.shutdown();
    boolean supervised = getConfig().getBoolean(
        YarnConfiguration.NM_RECOVERY_SUPERVISED,
        YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
    // if recovery on restart is supported then leave outstanding aggregations
    // to the next restart
    boolean shouldAbort = context.getNMStateStore().canRecover()
        && !context.getDecommissioned() && supervised;
    // politely ask to finish
    for (AppLogAggregator aggregator : appLogAggregators.values()) {
      if (shouldAbort) {
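Log aggregation gets the same gating in stopAggregators(). The loop body is cut off in this view; a sketch of its likely shape follows (the AppLogAggregator method names are assumptions, not taken from the diff):

// When shouldAbort is true (recoverable, not decommissioned, and supervised),
// outstanding per-app aggregations are abandoned for the restarted NM to
// resume; otherwise the NM finishes uploading logs before it exits.
for (AppLogAggregator aggregator : appLogAggregators.values()) {
  if (shouldAbort) {
    aggregator.abortLogAggregation();   // hand off to the restarted NM
  } else {
    aggregator.finishLogAggregation();  // upload now, before shutdown
  }
}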
@@ -22,7 +22,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;


import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@@ -68,6 +72,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -82,27 +87,18 @@ public class TestContainerManagerRecovery {
  public void testApplicationRecovery() throws Exception {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
    conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
    conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
    conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
    NMStateStoreService stateStore = new NMMemoryStateStoreService();
    stateStore.init(conf);
    stateStore.start();
    Context context = createContext(conf, stateStore);
    ContainerManagerImpl cm = createContainerManager(context);
    cm.init(conf);
    cm.start();

    // add an application by starting a container
    String appUser = "app_user1";
    String modUser = "modify_user1";
@@ -155,9 +151,7 @@ public void testApplicationRecovery() throws Exception {


    // reset container manager and verify app recovered with proper acls
    cm.stop();
    context = createContext(conf, stateStore);
    cm = createContainerManager(context);
    cm.init(conf);
    cm.start();
@@ -201,9 +195,7 @@ public void testApplicationRecovery() throws Exception {


    // restart and verify app is marked for finishing
    cm.stop();
    context = createContext(conf, stateStore);
    cm = createContainerManager(context);
    cm.init(conf);
    cm.start();
@@ -233,16 +225,103 @@ public void testApplicationRecovery() throws Exception {


    // restart and verify app is no longer present after recovery
    cm.stop();
    context = createContext(conf, stateStore);
    cm = createContainerManager(context);
    cm.init(conf);
    cm.start();
    assertTrue(context.getApplications().isEmpty());
    cm.stop();
  }


@Test
public void testContainerCleanupOnShutdown() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
Map<String, LocalResource> localResources = Collections.emptyMap();
Map<String, String> containerEnv = Collections.emptyMap();
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
containerCreds.writeTokenStorageToStream(dob);
ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
dob.getLength());
Map<ApplicationAccessType, String> acls = Collections.emptyMap();
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, containerCmds, serviceData,
containerTokens, acls);
// create the logAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance("includePattern", "excludePattern");

// verify containers are stopped on shutdown without recovery
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
Context context = createContext(conf, new NMNullStateStoreService());
ContainerManagerImpl cm = spy(createContainerManager(context));
cm.init(conf);
cm.start();
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
verify(cm).handle(isA(CMgrCompletedAppsEvent.class));

// verify containers are stopped on shutdown with unsupervised recovery
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
NMMemoryStateStoreService memStore = new NMMemoryStateStoreService();
memStore.init(conf);
memStore.start();
context = createContext(conf, memStore);
cm = spy(createContainerManager(context));
cm.init(conf);
cm.start();
startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
memStore.close();
verify(cm).handle(isA(CMgrCompletedAppsEvent.class));

// verify containers are not stopped on shutdown with supervised recovery
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
memStore = new NMMemoryStateStoreService();
memStore.init(conf);
memStore.start();
context = createContext(conf, memStore);
cm = spy(createContainerManager(context));
cm.init(conf);
cm.start();
startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
memStore.close();
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
}

private NMContext createContext(YarnConfiguration conf,
NMStateStoreService stateStore) {
NMContext context = new NMContext(new NMContainerTokenSecretManager(
conf), new NMTokenSecretManagerInNM(), null,
new ApplicationACLsManager(conf), stateStore);

// simulate registration with RM
MasterKey masterKey = new MasterKeyPBImpl();
masterKey.setKeyId(123);
masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
.byteValue() }));
context.getContainerTokenSecretManager().setMasterKey(masterKey);
context.getNMTokenSecretManager().setMasterKey(masterKey);
return context;
}

  private StartContainersResponse startContainer(Context context,
      final ContainerManagerImpl cm, ContainerId cid,
      ContainerLaunchContext clc, LogAggregationContext logAggregationContext)
