Skip to content

Commit

Permalink
YARN-6127. Add support for work preserving NM restart when AMRMProxy …
Browse files Browse the repository at this point in the history
…is enabled. (Botong Huang via asuresh).
  • Loading branch information
xslogic committed Jun 22, 2017
1 parent 8dbd53e commit 49aa60e
Show file tree
Hide file tree
Showing 15 changed files with 1,001 additions and 81 deletions.
Expand Up @@ -45,12 +45,13 @@ public class AMRMProxyApplicationContextImpl implements

/**
* Create an instance of the AMRMProxyApplicationContext.
*
* @param nmContext
* @param conf
* @param applicationAttemptId
* @param user
* @param amrmToken
*
* @param nmContext NM context
* @param conf configuration
* @param applicationAttemptId attempt id
* @param user user name of the application
* @param amrmToken amrmToken issued by RM
* @param localToken amrmToken issued by AMRMProxy
*/
public AMRMProxyApplicationContextImpl(Context nmContext,
Configuration conf, ApplicationAttemptId applicationAttemptId,
Expand Down Expand Up @@ -86,6 +87,8 @@ public synchronized Token<AMRMTokenIdentifier> getAMRMToken() {

/**
* Sets the application's AMRMToken.
*
* @param amrmToken amrmToken issued by RM
*/
public synchronized void setAMRMToken(
Token<AMRMTokenIdentifier> amrmToken) {
Expand All @@ -99,6 +102,8 @@ public synchronized Token<AMRMTokenIdentifier> getLocalAMRMToken() {

/**
* Sets the application's local AMRMToken.
*
* @param localToken amrmToken issued by AMRMProxy
*/
public synchronized void setLocalAMRMToken(
Token<AMRMTokenIdentifier> localToken) {
Expand Down
Expand Up @@ -64,7 +64,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;

import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
Expand All @@ -86,6 +86,10 @@ public class AMRMProxyService extends AbstractService implements
ApplicationMasterProtocol {
private static final Logger LOG = LoggerFactory
.getLogger(AMRMProxyService.class);

private static final String NMSS_USER_KEY = "user";
private static final String NMSS_AMRMTOKEN_KEY = "amrmtoken";

private Server server;
private final Context nmContext;
private final AsyncDispatcher dispatcher;
Expand All @@ -95,9 +99,9 @@ public class AMRMProxyService extends AbstractService implements

/**
* Creates an instance of the service.
*
* @param nmContext
* @param dispatcher
*
* @param nmContext NM context
* @param dispatcher NM dispatcher
*/
public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
super(AMRMProxyService.class.getName());
Expand All @@ -112,6 +116,14 @@ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) {
new ApplicationEventHandler());
}

/**
* Initializes the service. After the parent class initialization, creates the
* AMRMProxyTokenSecretManager from the NM state store obtained through the NM
* context, and initializes it with the given configuration.
*
* @param conf configuration
* @throws Exception if initialization fails
*/
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
// The secret manager is built here (not in serviceStart) so that it already
// exists when recover() is invoked between serviceInit and serviceStart.
this.secretManager =
new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore());
this.secretManager.init(conf);
}

@Override
protected void serviceStart() throws Exception {
LOG.info("Starting AMRMProxyService");
Expand All @@ -134,7 +146,6 @@ protected void serviceStart() throws Exception {
YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT);

this.secretManager = new AMRMProxyTokenSecretManager(serverConf);
this.secretManager.start();

this.server =
Expand All @@ -160,6 +171,62 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}

/**
 * Restores the AMRMProxy state that was persisted in the NM state store.
 * Intended to run after serviceInit and before serviceStart.
 *
 * The secret manager is recovered first, then a pipeline is re-created for
 * every application attempt found in the store. An attempt whose recovery
 * fails is logged and its context is removed from the state store, after
 * which recovery continues with the next attempt.
 *
 * @throws IOException if loading the state, or removing a failed attempt
 *           from the state store, fails
 */
public void recover() throws IOException {
  LOG.info("Recovering AMRMProxyService");

  RecoveredAMRMProxyState recoveredState =
      this.nmContext.getNMStateStore().loadAMRMProxyState();
  this.secretManager.recover(recoveredState);

  Map<ApplicationAttemptId, Map<String, byte[]>> appContexts =
      recoveredState.getAppContexts();
  LOG.info("Recovering {} running applications for AMRMProxy",
      appContexts.size());

  for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> appEntry
      : appContexts.entrySet()) {
    ApplicationAttemptId attemptId = appEntry.getKey();
    LOG.info("Recovering app attempt {}", attemptId);

    try {
      recoverAppAttempt(attemptId, appEntry.getValue());
    } catch (Exception e) {
      LOG.error("Exception when recovering " + attemptId
          + ", removing it from NMStateStore and move on", e);
      this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
    }
  }
}

/**
 * Rebuilds the pipeline of one running application attempt from the
 * key/value entries stored for it.
 *
 * @param attemptId the attempt being recovered
 * @param contextData the entries stored in the NM state store for the attempt
 * @throws Exception if the user or the amrmToken entry is missing, or if
 *           decoding the token or re-creating the pipeline fails
 */
private void recoverAppAttempt(ApplicationAttemptId attemptId,
    Map<String, byte[]> contextData) throws Exception {
  String user = null;
  Token<AMRMTokenIdentifier> amrmToken = null;

  // Pick the two entries owned by AMRMProxyService out of the stored map;
  // the whole map is still handed to the interceptors below.
  for (Map.Entry<String, byte[]> item : contextData.entrySet()) {
    String key = item.getKey();
    if (key.equals(NMSS_USER_KEY)) {
      user = new String(item.getValue(), "UTF-8");
    } else if (key.equals(NMSS_AMRMTOKEN_KEY)) {
      String encodedToken = new String(item.getValue(), "UTF-8");
      amrmToken = new Token<>();
      amrmToken.decodeFromUrlString(encodedToken);
    }
  }

  if (amrmToken == null) {
    throw new IOException(
        "No amrmToken found for app attempt " + attemptId);
  }
  if (user == null) {
    throw new IOException("No user found for app attempt " + attemptId);
  }

  // A fresh local token is issued instead of being read back from the store.
  Token<AMRMTokenIdentifier> localToken =
      this.secretManager.createAndGetAMRMToken(attemptId);

  initializePipeline(attemptId, user, amrmToken, localToken, contextData,
      true);
}

/**
* This is called by the AMs started on this node to register with the RM.
* This method does the initial authorization and then forwards the request to
Expand Down Expand Up @@ -221,8 +288,8 @@ public AllocateResponse allocate(AllocateRequest request)
* application request processing pipeline.
*
* @param request - encapsulates information for starting an AM
* @throws IOException
* @throws YarnException
* @throws IOException if fails
* @throws YarnException if fails
*/
public void processApplicationStartRequest(StartContainerRequest request)
throws IOException, YarnException {
Expand Down Expand Up @@ -257,22 +324,25 @@ public void processApplicationStartRequest(StartContainerRequest request)
request.getContainerLaunchContext().setTokens(
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));

initializePipeline(containerTokenIdentifierForKey.getContainerID()
.getApplicationAttemptId(),
containerTokenIdentifierForKey.getApplicationSubmitter(),
amrmToken, localToken);
initializePipeline(appAttemptId,
containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken,
localToken, null, false);
}

/**
* Initializes the request intercepter pipeline for the specified application.
*
* @param applicationAttemptId
* @param user
* @param amrmToken
*
* @param applicationAttemptId attempt id
* @param user user name
* @param amrmToken amrmToken issued by RM
* @param localToken amrmToken issued by AMRMProxy
* @param recoveredDataMap the recovered states for AMRMProxy from NMSS
* @param isRecovery whether this is to recover a previously existing pipeline
*/
protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
String user, Token<AMRMTokenIdentifier> amrmToken,
Token<AMRMTokenIdentifier> localToken) {
Token<AMRMTokenIdentifier> localToken,
Map<String, byte[]> recoveredDataMap, boolean isRecovery) {
RequestInterceptorChainWrapper chainWrapper = null;
synchronized (applPipelineMap) {
if (applPipelineMap
Expand All @@ -288,11 +358,23 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
&& chainWrapperBackup.getApplicationAttemptId() != null
&& !chainWrapperBackup.getApplicationAttemptId()
.equals(applicationAttemptId)) {
// TODO: revisit in AMRMProxy HA in YARN-6128
// Remove the existing pipeline
LOG.info("Remove the previous pipeline for ApplicationId: "
+ applicationAttemptId.toString());
RequestInterceptorChainWrapper pipeline =
applPipelineMap.remove(applicationAttemptId.getApplicationId());

if (!isRecovery && this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore()
.removeAMRMProxyAppContext(applicationAttemptId);
} catch (IOException e) {
LOG.error("Error removing AMRMProxy application context for "
+ applicationAttemptId, e);
}
}

try {
pipeline.getRootInterceptor().shutdown();
} catch (Throwable ex) {
Expand Down Expand Up @@ -324,7 +406,27 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
this.createRequestInterceptorChain();
interceptorChain.init(createApplicationMasterContext(this.nmContext,
applicationAttemptId, user, amrmToken, localToken));
if (isRecovery) {
if (recoveredDataMap == null) {
throw new YarnRuntimeException(
"null recoveredDataMap recieved for recover");
}
interceptorChain.recover(recoveredDataMap);
}
chainWrapper.init(interceptorChain, applicationAttemptId);

if (!isRecovery && this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8"));
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
applicationAttemptId, NMSS_AMRMTOKEN_KEY,
amrmToken.encodeToUrlString().getBytes("UTF-8"));
} catch (IOException e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ applicationAttemptId, e);
}
}
} catch (Exception e) {
this.applPipelineMap.remove(applicationAttemptId.getApplicationId());
throw e;
Expand All @@ -335,7 +437,7 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId,
* Shuts down the request processing pipeline for the specified application
* id.
*
* @param applicationId
* @param applicationId application id
*/
protected void stopApplication(ApplicationId applicationId) {
Preconditions.checkArgument(applicationId != null,
Expand All @@ -362,6 +464,17 @@ protected void stopApplication(ApplicationId applicationId) {
"Failed to shutdown the request processing pipeline for app:"
+ applicationId, ex);
}

// Remove the app context from NMSS after the interceptors are shutdown
if (this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore()
.removeAMRMProxyAppContext(pipeline.getApplicationAttemptId());
} catch (IOException e) {
LOG.error("Error removing AMRMProxy application context for "
+ applicationId, e);
}
}
}
}

Expand All @@ -383,12 +496,24 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
// Do not propagate this info back to AM
allocateResponse.setAMRMToken(null);

org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newTokenId =
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(
token.getIdentifier().array(), token.getPassword().array(),
new Text(token.getKind()), new Text(token.getService()));

context.setAMRMToken(newTokenId);
context.setAMRMToken(newToken);

// Update the AMRMToken in context map in NM state store
if (this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY,
newToken.encodeToUrlString().getBytes("UTF-8"));
} catch (IOException e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ context.getApplicationAttemptId(), e);
}
}
}

// Check if the local AMRMToken is rolled up and update the context and
Expand Down Expand Up @@ -431,7 +556,7 @@ private AMRMProxyApplicationContext createApplicationMasterContext(

/**
* Gets the Request intercepter chains for all the applications.
*
*
* @return the request intercepter chains.
*/
protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
Expand Down Expand Up @@ -489,7 +614,7 @@ protected RequestInterceptor createRequestInterceptorChain() {
/**
* Returns the comma separated intercepter class names from the configuration.
*
* @param conf
* @param conf configuration
* @return the intercepter class names as an instance of ArrayList
*/
private List<String> getInterceptorClassNames(Configuration conf) {
Expand Down Expand Up @@ -518,7 +643,7 @@ private List<String> getInterceptorClassNames(Configuration conf) {
* processing pipeline.
*
* @return the intercepter wrapper instance
* @throws YarnException
* @throws YarnException if fails
*/
private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
throws YarnException {
Expand Down Expand Up @@ -612,8 +737,8 @@ public static class RequestInterceptorChainWrapper {
/**
* Initializes the wrapper with the specified parameters.
*
* @param rootInterceptor
* @param applicationAttemptId
* @param rootInterceptor the root request intercepter
* @param applicationAttemptId attempt id
*/
public synchronized void init(RequestInterceptor rootInterceptor,
ApplicationAttemptId applicationAttemptId) {
Expand All @@ -623,7 +748,7 @@ public synchronized void init(RequestInterceptor rootInterceptor,

/**
* Gets the root request intercepter.
*
*
* @return the root request intercepter
*/
public synchronized RequestInterceptor getRootInterceptor() {
Expand All @@ -632,7 +757,7 @@ public synchronized RequestInterceptor getRootInterceptor() {

/**
* Gets the application attempt identifier.
*
*
* @return the application attempt identifier
*/
public synchronized ApplicationAttemptId getApplicationAttemptId() {
Expand All @@ -641,7 +766,7 @@ public synchronized ApplicationAttemptId getApplicationAttemptId() {

/**
* Gets the application identifier.
*
*
* @return the application identifier
*/
public synchronized ApplicationId getApplicationId() {
Expand Down

0 comments on commit 49aa60e

Please sign in to comment.