diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java index 6d4fdfc3dc9a3..9938b370b24ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyApplicationContextImpl.java @@ -45,12 +45,13 @@ public class AMRMProxyApplicationContextImpl implements /** * Create an instance of the AMRMProxyApplicationContext. - * - * @param nmContext - * @param conf - * @param applicationAttemptId - * @param user - * @param amrmToken + * + * @param nmContext NM context + * @param conf configuration + * @param applicationAttemptId attempt id + * @param user user name of the application + * @param amrmToken amrmToken issued by RM + * @param localToken amrmToken issued by AMRMProxy */ public AMRMProxyApplicationContextImpl(Context nmContext, Configuration conf, ApplicationAttemptId applicationAttemptId, @@ -86,6 +87,8 @@ public synchronized Token getAMRMToken() { /** * Sets the application's AMRMToken. + * + * @param amrmToken amrmToken issued by RM */ public synchronized void setAMRMToken( Token amrmToken) { @@ -99,6 +102,8 @@ public synchronized Token getLocalAMRMToken() { /** * Sets the application's AMRMToken. + * + * @param localToken amrmToken issued by AMRMProxy */ public synchronized void setLocalAMRMToken( Token localToken) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index aeb3be8aa627a..d63b2cf589b5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -64,7 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; - +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.scheduler.DistributedScheduler; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -86,6 +86,10 @@ public class AMRMProxyService extends AbstractService implements ApplicationMasterProtocol { private static final Logger LOG = LoggerFactory .getLogger(AMRMProxyService.class); + + private static final String NMSS_USER_KEY = "user"; + private static final String NMSS_AMRMTOKEN_KEY = "amrmtoken"; + private Server server; private final Context nmContext; private final AsyncDispatcher dispatcher; @@ -95,9 +99,9 @@ public class AMRMProxyService extends AbstractService implements /** * Creates an instance of the service. - * - * @param nmContext - * @param dispatcher + * + * @param nmContext NM context + * @param dispatcher NM dispatcher */ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { super(AMRMProxyService.class.getName()); @@ -112,6 +116,14 @@ public AMRMProxyService(Context nmContext, AsyncDispatcher dispatcher) { new ApplicationEventHandler()); } + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + this.secretManager = + new AMRMProxyTokenSecretManager(this.nmContext.getNMStateStore()); + this.secretManager.init(conf); + } + @Override protected void serviceStart() throws Exception { LOG.info("Starting AMRMProxyService"); @@ -134,7 +146,6 @@ protected void serviceStart() throws Exception { YarnConfiguration.AMRM_PROXY_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_AMRM_PROXY_CLIENT_THREAD_COUNT); - this.secretManager = new AMRMProxyTokenSecretManager(serverConf); this.secretManager.start(); this.server = @@ -160,6 +171,62 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + /** + * Recover from NM state store. Called after serviceInit before serviceStart. + * + * @throws IOException if recover fails + */ + public void recover() throws IOException { + LOG.info("Recovering AMRMProxyService"); + + RecoveredAMRMProxyState state = + this.nmContext.getNMStateStore().loadAMRMProxyState(); + + this.secretManager.recover(state); + + LOG.info("Recovering {} running applications for AMRMProxy", + state.getAppContexts().size()); + for (Map.Entry> entry : state + .getAppContexts().entrySet()) { + ApplicationAttemptId attemptId = entry.getKey(); + LOG.info("Recovering app attempt {}", attemptId); + + // Try recover for the running application attempt + try { + String user = null; + Token amrmToken = null; + for (Map.Entry contextEntry : entry.getValue() + .entrySet()) { + if (contextEntry.getKey().equals(NMSS_USER_KEY)) { + user = new String(contextEntry.getValue(), "UTF-8"); + } else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) { + amrmToken = new Token<>(); + amrmToken.decodeFromUrlString( + new String(contextEntry.getValue(), "UTF-8")); + } + } + + if (amrmToken == null) { + throw new IOException( + "No amrmToken found for app attempt " + attemptId); + } + if (user == null) { + throw new IOException("No user found for app attempt " + attemptId); + } + + Token localToken = + this.secretManager.createAndGetAMRMToken(attemptId); + + initializePipeline(attemptId, user, amrmToken, localToken, + entry.getValue(), true); + } catch (Exception e) { + LOG.error("Exception when recovering " + attemptId + + ", removing it from NMStateStore and move on", e); + this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId); + } + } + } + /** * This is called by the AMs started on this node to register with the RM. * This method does the initial authorization and then forwards the request to @@ -221,8 +288,8 @@ public AllocateResponse allocate(AllocateRequest request) * application request processing pipeline. * * @param request - encapsulates information for starting an AM - * @throws IOException - * @throws YarnException + * @throws IOException if fails + * @throws YarnException if fails */ public void processApplicationStartRequest(StartContainerRequest request) throws IOException, YarnException { @@ -257,22 +324,25 @@ public void processApplicationStartRequest(StartContainerRequest request) request.getContainerLaunchContext().setTokens( ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); - initializePipeline(containerTokenIdentifierForKey.getContainerID() - .getApplicationAttemptId(), - containerTokenIdentifierForKey.getApplicationSubmitter(), - amrmToken, localToken); + initializePipeline(appAttemptId, + containerTokenIdentifierForKey.getApplicationSubmitter(), amrmToken, + localToken, null, false); } /** * Initializes the request intercepter pipeline for the specified application. - * - * @param applicationAttemptId - * @param user - * @param amrmToken + * + * @param applicationAttemptId attempt id + * @param user user name + * @param amrmToken amrmToken issued by RM + * @param localToken amrmToken issued by AMRMProxy + * @param recoveredDataMap the recovered states for AMRMProxy from NMSS + * @param isRecovery whether this is to recover a previously existing pipeline */ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, - Token localToken) { + Token localToken, + Map recoveredDataMap, boolean isRecovery) { RequestInterceptorChainWrapper chainWrapper = null; synchronized (applPipelineMap) { if (applPipelineMap @@ -288,11 +358,23 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, && chainWrapperBackup.getApplicationAttemptId() != null && !chainWrapperBackup.getApplicationAttemptId() .equals(applicationAttemptId)) { + // TODO: revisit in AMRMProxy HA in YARN-6128 // Remove the existing pipeline LOG.info("Remove the previous pipeline for ApplicationId: " + applicationAttemptId.toString()); RequestInterceptorChainWrapper pipeline = applPipelineMap.remove(applicationAttemptId.getApplicationId()); + + if (!isRecovery && this.nmContext.getNMStateStore() != null) { + try { + this.nmContext.getNMStateStore() + .removeAMRMProxyAppContext(applicationAttemptId); + } catch (IOException e) { + LOG.error("Error removing AMRMProxy application context for " + + applicationAttemptId, e); + } + } + try { pipeline.getRootInterceptor().shutdown(); } catch (Throwable ex) { @@ -324,7 +406,27 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, this.createRequestInterceptorChain(); interceptorChain.init(createApplicationMasterContext(this.nmContext, applicationAttemptId, user, amrmToken, localToken)); + if (isRecovery) { + if (recoveredDataMap == null) { + throw new YarnRuntimeException( + "null recoveredDataMap recieved for recover"); + } + interceptorChain.recover(recoveredDataMap); + } chainWrapper.init(interceptorChain, applicationAttemptId); + + if (!isRecovery && this.nmContext.getNMStateStore() != null) { + try { + this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( + applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8")); + this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( + applicationAttemptId, NMSS_AMRMTOKEN_KEY, + amrmToken.encodeToUrlString().getBytes("UTF-8")); + } catch (IOException e) { + LOG.error("Error storing AMRMProxy application context entry for " + + applicationAttemptId, e); + } + } } catch (Exception e) { this.applPipelineMap.remove(applicationAttemptId.getApplicationId()); throw e; @@ -335,7 +437,7 @@ protected void initializePipeline(ApplicationAttemptId applicationAttemptId, * Shuts down the request processing pipeline for the specified application * attempt id. * - * @param applicationId + * @param applicationId application id */ protected void stopApplication(ApplicationId applicationId) { Preconditions.checkArgument(applicationId != null, @@ -362,6 +464,17 @@ protected void stopApplication(ApplicationId applicationId) { "Failed to shutdown the request processing pipeline for app:" + applicationId, ex); } + + // Remove the app context from NMSS after the interceptors are shutdown + if (this.nmContext.getNMStateStore() != null) { + try { + this.nmContext.getNMStateStore() + .removeAMRMProxyAppContext(pipeline.getApplicationAttemptId()); + } catch (IOException e) { + LOG.error("Error removing AMRMProxy application context for " + + applicationId, e); + } + } } } @@ -383,12 +496,24 @@ private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, // Do not propagate this info back to AM allocateResponse.setAMRMToken(null); - org.apache.hadoop.security.token.Token newTokenId = + org.apache.hadoop.security.token.Token newToken = new org.apache.hadoop.security.token.Token( token.getIdentifier().array(), token.getPassword().array(), new Text(token.getKind()), new Text(token.getService())); - context.setAMRMToken(newTokenId); + context.setAMRMToken(newToken); + + // Update the AMRMToken in context map in NM state store + if (this.nmContext.getNMStateStore() != null) { + try { + this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry( + context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY, + newToken.encodeToUrlString().getBytes("UTF-8")); + } catch (IOException e) { + LOG.error("Error storing AMRMProxy application context entry for " + + context.getApplicationAttemptId(), e); + } + } } // Check if the local AMRMToken is rolled up and update the context and @@ -431,7 +556,7 @@ private AMRMProxyApplicationContext createApplicationMasterContext( /** * Gets the Request intercepter chains for all the applications. - * + * * @return the request intercepter chains. */ protected Map getPipelines() { @@ -489,7 +614,7 @@ protected RequestInterceptor createRequestInterceptorChain() { /** * Returns the comma separated intercepter class names from the configuration. * - * @param conf + * @param conf configuration * @return the intercepter class names as an instance of ArrayList */ private List getInterceptorClassNames(Configuration conf) { @@ -518,7 +643,7 @@ private List getInterceptorClassNames(Configuration conf) { * processing pipeline. * * @return the the intercepter wrapper instance - * @throws YarnException + * @throws YarnException if fails */ private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain() throws YarnException { @@ -612,8 +737,8 @@ public static class RequestInterceptorChainWrapper { /** * Initializes the wrapper with the specified parameters. * - * @param rootInterceptor - * @param applicationAttemptId + * @param rootInterceptor the root request intercepter + * @param applicationAttemptId attempt id */ public synchronized void init(RequestInterceptor rootInterceptor, ApplicationAttemptId applicationAttemptId) { @@ -623,7 +748,7 @@ public synchronized void init(RequestInterceptor rootInterceptor, /** * Gets the root request intercepter. - * + * * @return the root request intercepter */ public synchronized RequestInterceptor getRootInterceptor() { @@ -632,7 +757,7 @@ public synchronized RequestInterceptor getRootInterceptor() { /** * Gets the application attempt identifier. - * + * * @return the application attempt identifier */ public synchronized ApplicationAttemptId getApplicationAttemptId() { @@ -641,7 +766,7 @@ public synchronized ApplicationAttemptId getApplicationAttemptId() { /** * Gets the application identifier. - * + * * @return the application identifier */ public synchronized ApplicationId getApplicationId() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java index d09ce41d08239..aa3c70f069666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyTokenSecretManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; +import java.io.IOException; import java.security.SecureRandom; import java.util.HashSet; import java.util.Set; @@ -37,6 +38,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.security.MasterKeyData; import com.google.common.annotations.VisibleForTesting; @@ -60,17 +64,24 @@ public class AMRMProxyTokenSecretManager extends private final Lock writeLock = readWriteLock.writeLock(); private final Timer timer; - private final long rollingInterval; - private final long activationDelay; + private long rollingInterval; + private long activationDelay; + + private NMStateStoreService nmStateStore; private final Set appAttemptSet = new HashSet(); /** * Create an {@link AMRMProxyTokenSecretManager}. + * @param nmStateStoreService NM state store */ - public AMRMProxyTokenSecretManager(Configuration conf) { + public AMRMProxyTokenSecretManager(NMStateStoreService nmStateStoreService) { this.timer = new Timer(); + this.nmStateStore = nmStateStoreService; + } + + public void init(Configuration conf) { this.rollingInterval = conf.getLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, @@ -94,6 +105,14 @@ public AMRMProxyTokenSecretManager(Configuration conf) { public void start() { if (this.currentMasterKey == null) { this.currentMasterKey = createNewMasterKey(); + if (this.nmStateStore != null) { + try { + this.nmStateStore.storeAMRMProxyCurrentMasterKey( + this.currentMasterKey.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update current master key in state store", e); + } + } } this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, rollingInterval); @@ -103,6 +122,11 @@ public void stop() { this.timer.cancel(); } + @VisibleForTesting + public void setNMStateStoreService(NMStateStoreService nmStateStoreService) { + this.nmStateStore = nmStateStoreService; + } + public void applicationMasterFinished(ApplicationAttemptId appAttemptId) { this.writeLock.lock(); try { @@ -122,11 +146,21 @@ public void run() { } @Private - void rollMasterKey() { + @VisibleForTesting + public void rollMasterKey() { this.writeLock.lock(); try { LOG.info("Rolling master-key for amrm-tokens"); this.nextMasterKey = createNewMasterKey(); + if (this.nmStateStore != null) { + try { + this.nmStateStore + .storeAMRMProxyNextMasterKey(this.nextMasterKey.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update next master key in state store", e); + } + } + this.timer.schedule(new NextKeyActivator(), this.activationDelay); } finally { this.writeLock.unlock(); @@ -140,6 +174,8 @@ public void run() { } } + @Private + @VisibleForTesting public void activateNextMasterKey() { this.writeLock.lock(); try { @@ -147,6 +183,15 @@ public void activateNextMasterKey() { + this.nextMasterKey.getMasterKey().getKeyId()); this.currentMasterKey = this.nextMasterKey; this.nextMasterKey = null; + if (this.nmStateStore != null) { + try { + this.nmStateStore.storeAMRMProxyCurrentMasterKey( + this.currentMasterKey.getMasterKey()); + this.nmStateStore.storeAMRMProxyNextMasterKey(null); + } catch (IOException e) { + LOG.error("Unable to update current master key in state store", e); + } + } } finally { this.writeLock.unlock(); } @@ -237,6 +282,17 @@ public AMRMTokenIdentifier createIdentifier() { return new AMRMTokenIdentifier(); } + @Private + @VisibleForTesting + public MasterKeyData getCurrentMasterKeyData() { + this.readLock.lock(); + try { + return this.currentMasterKey; + } finally { + this.readLock.unlock(); + } + } + @Private @VisibleForTesting public MasterKeyData getNextMasterKeyData() { @@ -262,4 +318,33 @@ protected byte[] createPassword(AMRMTokenIdentifier identifier) { this.readLock.unlock(); } } + + /** + * Recover secretManager from state store. Called after serviceInit before + * serviceStart. + * + * @param state the state to recover from + */ + public void recover(RecoveredAMRMProxyState state) { + if (state != null) { + // recover the current master key + MasterKey currentKey = state.getCurrentMasterKey(); + if (currentKey != null) { + this.currentMasterKey = new MasterKeyData(currentKey, + createSecretKey(currentKey.getBytes().array())); + } else { + LOG.warn("No current master key recovered from NM StateStore" + + " for AMRMProxyTokenSecretManager"); + } + + // recover the next master key if not null + MasterKey nextKey = state.getNextMasterKey(); + if (nextKey != null) { + this.nextMasterKey = new MasterKeyData(nextKey, + createSecretKey(nextKey.getBytes().array())); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java index e6c9bbdce6110..93b42cb5e560b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AbstractRequestInterceptor.java @@ -18,16 +18,17 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; -import org.apache.hadoop.conf.Configuration; +import java.io.IOException; +import java.util.Map; import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse; - -import java.io.IOException; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; /** * Implements the RequestInterceptor interface and provides common functionality @@ -82,6 +83,16 @@ public void init(AMRMProxyApplicationContext appContext) { } } + /** + * Recover {@link RequestInterceptor} state from store. + */ + @Override + public void recover(Map recoveredDataMap) { + if (this.nextInterceptor != null) { + this.nextInterceptor.recover(recoveredDataMap); + } + } + /** * Disposes the {@link RequestInterceptor}. */ @@ -113,8 +124,8 @@ public AMRMProxyApplicationContext getApplicationContext() { * * @param request ApplicationMaster allocate request * @return Distribtued Scheduler Allocate Response - * @throws YarnException - * @throws IOException + * @throws YarnException if fails + * @throws IOException if fails */ @Override public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( @@ -130,8 +141,8 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( * * @param request ApplicationMaster registration request * @return Distributed Scheduler Register Response - * @throws YarnException - * @throws IOException + * @throws YarnException if fails + * @throws IOException if fails */ @Override public RegisterDistributedSchedulingAMResponse @@ -141,4 +152,16 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling( return (this.nextInterceptor != null) ? this.nextInterceptor .registerApplicationMasterForDistributedScheduling(request) : null; } + + /** + * A helper method for getting NM state store. + * + * @return the NMSS instance + */ + public NMStateStoreService getNMStateStore() { + if (this.appContext == null || this.appContext.getNMCotext() == null) { + return null; + } + return this.appContext.getNMCotext().getNMStateStore(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java index 5995af16e10ca..950bdcc043048 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/RequestInterceptor.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy; +import java.util.Map; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol; @@ -32,10 +34,24 @@ public interface RequestInterceptor extends DistributedSchedulingAMProtocol, * This method is called for initializing the intercepter. This is guaranteed * to be called only once in the lifetime of this instance. * - * @param ctx + * @param ctx AMRMProxy application context */ void init(AMRMProxyApplicationContext ctx); + /** + * Recover intercepter state when NM recovery is enabled. AMRMProxy will + * recover the data map into + * AMRMProxyApplicationContext.getRecoveredDataMap(). All intercepters should + * recover state from it. + * + * For example, registerRequest has to be saved by the last intercepter (i.e. + * the one that actually connects to RM), in order to re-register when RM + * fails over. + * + * @param recoveredDataMap states for all intercepters recovered from NMSS + */ + void recover(Map recoveredDataMap); + /** * This method is called to release the resources held by the intercepter. * This will be called when the application pipeline is being destroyed. The @@ -51,7 +67,7 @@ public interface RequestInterceptor extends DistributedSchedulingAMProtocol, * send the messages to the resource manager service and so the last * intercepter will not receive this method call. * - * @param nextInterceptor + * @param nextInterceptor the next intercepter to set */ void setNextInterceptor(RequestInterceptor nextInterceptor); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index cbf617b6d1eb9..b1d634aaebf0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -341,6 +341,10 @@ private void recover() throws IOException, URISyntaxException { rsrcLocalizationSrvc.recoverLocalizedResources( stateStore.loadLocalizationState()); + if (this.amrmProxyEnabled) { + this.getAMRMProxyService().recover(); + } + RecoveredApplicationsState appsState = stateStore.loadApplicationsState(); for (ContainerManagerApplicationProto proto : appsState.getApplications()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index d1b9d65ac8d26..a0502df1f6fdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -24,12 +24,15 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Timer; import java.util.TimerTask; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -81,9 +84,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String DB_NAME = "yarn-nm-state"; private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; - - private static final Version CURRENT_VERSION_INFO = Version - .newInstance(2, 0); + + private static final Version CURRENT_VERSION_INFO = Version.newInstance(3, 0); private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; @@ -122,6 +124,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; + private static final String NEXT_MASTER_KEY_SUFFIX = "NextMasterKey"; private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/"; private static final String NM_TOKENS_CURRENT_MASTER_KEY = NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX; @@ -136,6 +139,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/"; + private static final String AMRMPROXY_KEY_PREFIX = "AMRMProxy/"; + private static final byte[] EMPTY_VALUE = new byte[0]; private DB db; @@ -1125,6 +1130,177 @@ private String getLogDeleterKey(ApplicationId appId) { return LOG_DELETER_KEY_PREFIX + appId; } + @Override + public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { + RecoveredAMRMProxyState result = new RecoveredAMRMProxyState(); + Set unknownKeys = new HashSet<>(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(AMRMPROXY_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(AMRMPROXY_KEY_PREFIX)) { + break; + } + + String suffix = key.substring(AMRMPROXY_KEY_PREFIX.length()); + if (suffix.equals(CURRENT_MASTER_KEY_SUFFIX)) { + iter.next(); + result.setCurrentMasterKey(parseMasterKey(entry.getValue())); + LOG.info("Recovered for AMRMProxy: current master key id " + + result.getCurrentMasterKey().getKeyId()); + + } else if (suffix.equals(NEXT_MASTER_KEY_SUFFIX)) { + iter.next(); + result.setNextMasterKey(parseMasterKey(entry.getValue())); + LOG.info("Recovered for AMRMProxy: next master key id " + + result.getNextMasterKey().getKeyId()); + + } else { // Load AMRMProxy application context map for an app attempt + // Parse appAttemptId, also handle the unknown keys + int idEndPos; + ApplicationAttemptId attemptId; + try { + idEndPos = key.indexOf('/', AMRMPROXY_KEY_PREFIX.length()); + if (idEndPos < 0) { + throw new IOException( + "Unable to determine attemptId in key: " + key); + } + attemptId = ApplicationAttemptId.fromString( + key.substring(AMRMPROXY_KEY_PREFIX.length(), idEndPos)); + } catch (Exception e) { + // Try to move on for back-forward compatibility + LOG.warn("Unknown key " + key + ", remove and move on", e); + // Do this because iter.remove() is not supported here + unknownKeys.add(key); + continue; + } + // Parse the context map for the appAttemptId + Map appContext = + loadAMRMProxyAppContextMap(iter, key.substring(0, idEndPos + 1)); + result.getAppContexts().put(attemptId, appContext); + + LOG.info("Recovered for AMRMProxy: " + attemptId + ", map size " + + appContext.size()); + } + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + + // Delete all unknown keys + try { + for (String key : unknownKeys) { + db.delete(bytes(key)); + } + } catch (DBException e) { + throw new IOException(e); + } + + return result; + } + + private Map loadAMRMProxyAppContextMap(LeveldbIterator iter, + String keyPrefix) throws IOException { + Map appContextMap = new HashMap<>(); + while (iter.hasNext()) { + Entry entry = iter.peekNext(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + iter.next(); + String suffix = key.substring(keyPrefix.length()); + byte[] data = entry.getValue(); + appContextMap.put(suffix, Arrays.copyOf(data, data.length)); + } + return appContextMap; + } + + @Override + public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException { + storeMasterKey(AMRMPROXY_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX, key); + } + + @Override + public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { + String dbkey = AMRMPROXY_KEY_PREFIX + NEXT_MASTER_KEY_SUFFIX; + if (key == null) { + // When key is null, delete the entry instead + try { + db.delete(bytes(dbkey)); + } catch (DBException e) { + throw new IOException(e); + } + return; + } + storeMasterKey(dbkey, key); + } + + @Override + public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, + String key, byte[] data) throws IOException { + String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key; + try { + db.put(bytes(fullkey), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, + String key) throws IOException { + String fullkey = AMRMPROXY_KEY_PREFIX + attempt + "/" + key; + try { + db.delete(bytes(fullkey)); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) + throws IOException { + Set candidates = new HashSet<>(); + String keyPrefix = AMRMPROXY_KEY_PREFIX + attempt + "/"; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(keyPrefix)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + if (!key.startsWith(keyPrefix)) { + break; + } + // Do this because iter.remove() is not supported here + candidates.add(key); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + + // Delete all candidate keys + try { + for (String key : candidates) { + db.delete(bytes(key)); + } + } catch (DBException e) { + throw new IOException(e); + } + } + @Override protected void initStorage(Configuration conf) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 4bcdf5cd7c569..96c3f9e5c6f60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -228,6 +228,35 @@ public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) public void removeLogDeleter(ApplicationId appId) throws IOException { } + @Override + public RecoveredAMRMProxyState loadAMRMProxyState() throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeAMRMProxyCurrentMasterKey(MasterKey key) throws IOException { + } + + @Override + public void storeAMRMProxyNextMasterKey(MasterKey key) throws IOException { + } + + @Override + public void storeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, + String key, byte[] data) throws IOException { + } + + @Override + public void removeAMRMProxyAppContextEntry(ApplicationAttemptId attempt, + String key) throws IOException { + } + + @Override + public void removeAMRMProxyAppContext(ApplicationAttemptId attempt) + throws IOException { + } + @Override protected void initStorage(Configuration conf) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 9dd1eb06fe961..9f87279fb250b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -266,6 +266,41 @@ public Map getLogDeleterMap() { } } + /** + * Recovered states for AMRMProxy. + */ + public static class RecoveredAMRMProxyState { + private MasterKey currentMasterKey; + private MasterKey nextMasterKey; + // For each app, stores amrmToken, user name, as well as various AMRMProxy + // intercepter states + private Map> appContexts; + + public RecoveredAMRMProxyState() { + appContexts = new HashMap<>(); + } + + public MasterKey getCurrentMasterKey() { + return currentMasterKey; + } + + public MasterKey getNextMasterKey() { + return nextMasterKey; + } + + public Map> getAppContexts() { + return appContexts; + } + + public void setCurrentMasterKey(MasterKey currentKey) { + currentMasterKey = currentKey; + } + + public void setNextMasterKey(MasterKey nextKey) { + nextMasterKey = nextKey; + } + } + /** Initialize the state storage */ @Override public void serviceInit(Configuration conf) throws IOException { @@ -601,6 +636,57 @@ public abstract void storeLogDeleter(ApplicationId appId, public abstract void removeLogDeleter(ApplicationId appId) throws IOException; + /** + * Load the state of AMRMProxy. + * @return recovered state of AMRMProxy + * @throws IOException if fails + */ + public abstract RecoveredAMRMProxyState loadAMRMProxyState() + throws IOException; + + /** + * Record the current AMRMProxyTokenSecretManager master key. + * @param key the current master key + * @throws IOException if fails + */ + public abstract void storeAMRMProxyCurrentMasterKey(MasterKey key) + throws IOException; + + /** + * Record the next AMRMProxyTokenSecretManager master key. + * @param key the next master key + * @throws IOException if fails + */ + public abstract void storeAMRMProxyNextMasterKey(MasterKey key) + throws IOException; + + /** + * Add a context entry for an application attempt in AMRMProxyService. + * @param attempt app attempt ID + * @param key key string + * @param data state data to store + * @throws IOException if fails + */ + public abstract void storeAMRMProxyAppContextEntry( + ApplicationAttemptId attempt, String key, byte[] data) throws IOException; + + /** + * Remove a context entry for an application attempt in AMRMProxyService. + * @param attempt attempt ID + * @param key key string + * @throws IOException if fails + */ + public abstract void removeAMRMProxyAppContextEntry( + ApplicationAttemptId attempt, String key) throws IOException; + + /** + * Remove the entire context map for an application attempt in + * AMRMProxyService. + * @param attempt attempt ID + * @throws IOException if fails + */ + public abstract void removeAMRMProxyAppContext(ApplicationAttemptId attempt) + throws IOException; protected abstract void initStorage(Configuration conf) throws IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 6f5009eebf11f..a24c83b4cb782 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; @@ -62,12 +63,13 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; - +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; @@ -87,17 +89,15 @@ public abstract class BaseAMRMProxyTest { private static final Log LOG = LogFactory .getLog(BaseAMRMProxyTest.class); - /** - * The AMRMProxyService instance that will be used by all the test cases - */ + // The AMRMProxyService instance that will be used by all the test cases private MockAMRMProxyService amrmProxyService; - /** - * Thread pool used for asynchronous operations - */ + + // Thread pool used for asynchronous operations private static ExecutorService threadpool = Executors .newCachedThreadPool(); private Configuration conf; private AsyncDispatcher dispatcher; + private Context nmContext; protected MockAMRMProxyService getAMRMProxyService() { Assert.assertNotNull(this.amrmProxyService); @@ -105,32 +105,40 @@ protected MockAMRMProxyService getAMRMProxyService() { } @Before - public void setUp() { - this.conf = new YarnConfiguration(); - this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + public void setUp() throws IOException { + this.conf = createConfiguration(); + this.dispatcher = new AsyncDispatcher(); + this.dispatcher.init(this.conf); + this.dispatcher.start(); + createAndStartAMRMProxyService(this.conf); + } + + protected YarnConfiguration createConfiguration() { + YarnConfiguration config = new YarnConfiguration(); + config.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); String mockPassThroughInterceptorClass = PassThroughRequestInterceptor.class.getName(); // Create a request intercepter pipeline for testing. The last one in the // chain will call the mock resource manager. The others in the chain will // simply forward it to the next one in the chain - this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, - mockPassThroughInterceptorClass + "," - + mockPassThroughInterceptorClass + "," - + mockPassThroughInterceptorClass + "," + config.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + mockPassThroughInterceptorClass + "," + MockRequestInterceptor.class.getName()); - this.dispatcher = new AsyncDispatcher(); - this.dispatcher.init(this.conf); - this.dispatcher.start(); - createAndStartAMRMProxyService(this.conf); + config.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + return config; } @After public void tearDown() { - amrmProxyService.stop(); - amrmProxyService = null; + this.amrmProxyService.stop(); + this.amrmProxyService = null; this.dispatcher.stop(); + if (this.nmContext.getNMStateStore() != null) { + this.nmContext.getNMStateStore().stop(); + } } protected ExecutorService getThreadPool() { @@ -141,17 +149,33 @@ protected Configuration getConf() { return this.conf; } - protected void createAndStartAMRMProxyService(Configuration config) { + protected AsyncDispatcher getDispatcher() { + return this.dispatcher; + } + + protected void createAndStartAMRMProxyService(Configuration config) + throws IOException { // Stop the existing instance first if not null if (this.amrmProxyService != null) { this.amrmProxyService.stop(); } + if (this.nmContext == null) { + this.nmContext = createContext(); + } this.amrmProxyService = - new MockAMRMProxyService(new NullContext(), dispatcher); + new MockAMRMProxyService(this.nmContext, this.dispatcher); this.amrmProxyService.init(config); + this.amrmProxyService.recover(); this.amrmProxyService.start(); } + protected Context createContext() { + NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(this.conf); + stateStore.start(); + return new NMContext(null, null, null, null, stateStore, false, this.conf); + } + /** * This helper method will invoke the specified function in parallel for each * end point in the specified list using a thread pool and return the @@ -579,6 +603,13 @@ public MockAMRMProxyService(Context nmContext, super(nmContext, dispatcher); } + @Override + protected void serviceStart() throws Exception { + // Override this method and do nothing to avoid the base class from + // listening to server end point + getSecretManager().start(); + } + /** * This method is used by the test code to initialize the pipeline. In the * actual service, the initialization is called by the @@ -588,7 +619,8 @@ public MockAMRMProxyService(Context nmContext, * @param user */ public void initApp(ApplicationAttemptId applicationId, String user) { - super.initializePipeline(applicationId, user, null, null); + super.initializePipeline(applicationId, user, + new Token(), null, null, false); } public void stopApp(ApplicationId applicationId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java index fa17f2668e67b..e734bdd63d1a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java @@ -26,9 +26,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -38,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -48,6 +52,8 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest { private static final Log LOG = LogFactory .getLog(TestAMRMProxyService.class); + private static MockResourceManagerFacade mockRM; + /** * Test if the pipeline is created properly. */ @@ -99,9 +105,11 @@ public void testRegisterOneApplicationMaster() throws Exception { /** * Tests the case when interceptor pipeline initialization fails. + * + * @throws IOException */ @Test - public void testInterceptorInitFailure() { + public void testInterceptorInitFailure() throws IOException { Configuration conf = this.getConf(); // Override with a bad interceptor configuration conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, @@ -434,8 +442,8 @@ public void testMultipleAttemptsSameNode() // Second Attempt applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2); - getAMRMProxyService().initializePipeline(applicationAttemptId, user, null, - null); + getAMRMProxyService().initializePipeline(applicationAttemptId, user, + new Token(), null, null, false); RequestInterceptorChainWrapper chain2 = getAMRMProxyService().getPipelines().get(appId); @@ -559,4 +567,109 @@ private void releaseContainersAndAssert(int appId, Assert.assertEquals(relList.size(), containersForReleasedContainerIds.size()); } + + /** + * Test AMRMProxy restart with recovery. + */ + @Test + public void testRecovery() throws YarnException, Exception { + + Configuration conf = createConfiguration(); + // Use the MockRequestInterceptorAcrossRestart instead for the chain + conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, + MockRequestInterceptorAcrossRestart.class.getName()); + + mockRM = new MockResourceManagerFacade(new YarnConfiguration(conf), 0); + + createAndStartAMRMProxyService(conf); + + int testAppId1 = 1; + RegisterApplicationMasterResponse registerResponse = + registerApplicationMaster(testAppId1); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId1), + registerResponse.getQueue()); + + int testAppId2 = 2; + registerResponse = registerApplicationMaster(testAppId2); + Assert.assertNotNull(registerResponse); + Assert.assertEquals(Integer.toString(testAppId2), + registerResponse.getQueue()); + + AllocateResponse allocateResponse = allocate(testAppId2); + Assert.assertNotNull(allocateResponse); + + // At the time of kill, app1 just registerAM, app2 already did one allocate. + // Both application should be recovered + createAndStartAMRMProxyService(conf); + Assert.assertTrue(getAMRMProxyService().getPipelines().size() == 2); + + allocateResponse = allocate(testAppId1); + Assert.assertNotNull(allocateResponse); + + FinishApplicationMasterResponse finshResponse = + finishApplicationMaster(testAppId1, FinalApplicationStatus.SUCCEEDED); + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + allocateResponse = allocate(testAppId2); + Assert.assertNotNull(allocateResponse); + + finshResponse = + finishApplicationMaster(testAppId2, FinalApplicationStatus.SUCCEEDED); + + Assert.assertNotNull(finshResponse); + Assert.assertEquals(true, finshResponse.getIsUnregistered()); + + int testAppId3 = 3; + try { + // Try to finish an application master that is not registered. + finishApplicationMaster(testAppId3, FinalApplicationStatus.SUCCEEDED); + Assert + .fail("The Mock RM should complain about not knowing the third app"); + } catch (Throwable ex) { + } + + mockRM = null; + } + + /** + * A mock intercepter implementation that uses the same mockRM instance across + * restart. + */ + public static class MockRequestInterceptorAcrossRestart + extends AbstractRequestInterceptor { + + public MockRequestInterceptorAcrossRestart() { + } + + @Override + public void init(AMRMProxyApplicationContext appContext) { + super.init(appContext); + if (mockRM == null) { + throw new RuntimeException("mockRM not initialized yet"); + } + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) + throws YarnException, IOException { + return mockRM.registerApplicationMaster(request); + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) + throws YarnException, IOException { + return mockRM.finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) + throws YarnException, IOException { + return mockRM.allocate(request); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java index 927563ecce7b3..db7167d4ebced 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyTokenSecretManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -40,11 +41,19 @@ public class TestAMRMProxyTokenSecretManager { private YarnConfiguration conf; private AMRMProxyTokenSecretManager secretManager; + private NMMemoryStateStoreService stateStore; @Before public void setup() { conf = new YarnConfiguration(); - secretManager = new AMRMProxyTokenSecretManager(conf); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + + stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + + secretManager = new AMRMProxyTokenSecretManager(stateStore); + secretManager.init(conf); secretManager.start(); } @@ -53,6 +62,9 @@ public void breakdown() { if (secretManager != null) { secretManager.stop(); } + if (stateStore != null) { + stateStore.stop(); + } } @Test @@ -78,4 +90,52 @@ public void testNormalCase() throws IOException { } } + @Test + public void testRecovery() throws IOException { + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + + Token localToken = + secretManager.createAndGetAMRMToken(attemptId); + + AMRMTokenIdentifier identifier = secretManager.createIdentifier(); + identifier.readFields(new DataInputStream( + new ByteArrayInputStream(localToken.getIdentifier()))); + + secretManager.retrievePassword(identifier); + + // Generate next master key + secretManager.rollMasterKey(); + + // Restart and recover + secretManager.stop(); + secretManager = new AMRMProxyTokenSecretManager(stateStore); + secretManager.init(conf); + secretManager.recover(stateStore.loadAMRMProxyState()); + secretManager.start(); + // Recover the app + secretManager.createAndGetAMRMToken(attemptId); + + // Current master key should be recovered, and thus pass here + secretManager.retrievePassword(identifier); + + // Roll key, current master key will be replaced + secretManager.activateNextMasterKey(); + + // Restart and recover + secretManager.stop(); + secretManager = new AMRMProxyTokenSecretManager(stateStore); + secretManager.init(conf); + secretManager.recover(stateStore.loadAMRMProxyState()); + secretManager.start(); + // Recover the app + secretManager.createAndGetAMRMToken(attemptId); + + try { + secretManager.retrievePassword(identifier); + Assert.fail("Expect InvalidToken exception because the " + + "old master key should have expired"); + } catch (InvalidToken e) { + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 15c0e84eaae9a..0e039947c491d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -20,11 +20,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -50,6 +49,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { private RecoveredNMTokensState nmTokenState; private RecoveredContainerTokensState containerTokenState; private Map logDeleterState; + private RecoveredAMRMProxyState amrmProxyState; public NMMemoryStateStoreService() { super(NMMemoryStateStoreService.class.getName()); @@ -67,6 +67,7 @@ protected void initStorage(Configuration conf) { trackerStates = new HashMap(); deleteTasks = new HashMap(); logDeleterState = new HashMap(); + amrmProxyState = new RecoveredAMRMProxyState(); } @Override @@ -417,6 +418,66 @@ public synchronized void removeLogDeleter(ApplicationId appId) logDeleterState.remove(appId); } + @Override + public synchronized RecoveredAMRMProxyState loadAMRMProxyState() + throws IOException { + // return a copy so caller can't modify our state + RecoveredAMRMProxyState result = new RecoveredAMRMProxyState(); + result.setCurrentMasterKey(amrmProxyState.getCurrentMasterKey()); + result.setNextMasterKey(amrmProxyState.getNextMasterKey()); + for (Map.Entry> entry : + amrmProxyState.getAppContexts().entrySet()) { + result.getAppContexts().put(entry.getKey(), + new HashMap(entry.getValue())); + } + return result; + } + + @Override + public synchronized void storeAMRMProxyCurrentMasterKey(MasterKey key) + throws IOException { + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + amrmProxyState.setCurrentMasterKey(new MasterKeyPBImpl(keypb.getProto())); + } + + @Override + public synchronized void storeAMRMProxyNextMasterKey(MasterKey key) + throws IOException { + if (key == null) { + amrmProxyState.setNextMasterKey(null); + return; + } + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + amrmProxyState.setNextMasterKey(new MasterKeyPBImpl(keypb.getProto())); + } + + @Override + public synchronized void storeAMRMProxyAppContextEntry( + ApplicationAttemptId attempt, String key, byte[] data) + throws IOException { + Map entryMap = amrmProxyState.getAppContexts().get(attempt); + if (entryMap == null) { + entryMap = new HashMap<>(); + amrmProxyState.getAppContexts().put(attempt, entryMap); + } + entryMap.put(key, Arrays.copyOf(data, data.length)); + } + + @Override + public synchronized void removeAMRMProxyAppContextEntry( + ApplicationAttemptId attempt, String key) throws IOException { + Map entryMap = amrmProxyState.getAppContexts().get(attempt); + if (entryMap != null) { + entryMap.remove(key); + } + } + + @Override + public synchronized void removeAMRMProxyAppContext( + ApplicationAttemptId attempt) throws IOException { + amrmProxyState.getAppContexts().remove(attempt); + } + private static class TrackerState { Map inProgressMap = new HashMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 537f849372687..01331560e6cb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -20,10 +20,11 @@ import static org.fusesource.leveldbjni.JniDBFactory.bytes; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -33,6 +34,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,7 +67,9 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; @@ -1016,6 +1020,105 @@ public void testUnexpectedKeyDoesntThrowException() throws IOException { assertNull(stateStore.getDB().get(invalidKey)); } + @Test + public void testAMRMProxyStorage() throws IOException { + RecoveredAMRMProxyState state = stateStore.loadAMRMProxyState(); + assertEquals(state.getCurrentMasterKey(), null); + assertEquals(state.getNextMasterKey(), null); + assertEquals(state.getAppContexts().size(), 0); + + ApplicationId appId1 = ApplicationId.newInstance(1, 1); + ApplicationId appId2 = ApplicationId.newInstance(1, 2); + ApplicationAttemptId attemptId1 = + ApplicationAttemptId.newInstance(appId1, 1); + ApplicationAttemptId attemptId2 = + ApplicationAttemptId.newInstance(appId2, 2); + String key1 = "key1"; + String key2 = "key2"; + byte[] data1 = "data1".getBytes(); + byte[] data2 = "data2".getBytes(); + + AMRMProxyTokenSecretManager secretManager = + new AMRMProxyTokenSecretManager(stateStore); + secretManager.init(conf); + // Generate currentMasterKey + secretManager.start(); + + try { + // Add two applications, each with two data entries + stateStore.storeAMRMProxyAppContextEntry(attemptId1, key1, data1); + stateStore.storeAMRMProxyAppContextEntry(attemptId2, key1, data1); + stateStore.storeAMRMProxyAppContextEntry(attemptId1, key2, data2); + stateStore.storeAMRMProxyAppContextEntry(attemptId2, key2, data2); + + // restart state store and verify recovered + restartStateStore(); + secretManager.setNMStateStoreService(stateStore); + state = stateStore.loadAMRMProxyState(); + assertEquals(state.getCurrentMasterKey(), + secretManager.getCurrentMasterKeyData().getMasterKey()); + assertEquals(state.getNextMasterKey(), null); + assertEquals(state.getAppContexts().size(), 2); + // app1 + Map map = state.getAppContexts().get(attemptId1); + assertNotEquals(map, null); + assertEquals(map.size(), 2); + assertTrue(Arrays.equals(map.get(key1), data1)); + assertTrue(Arrays.equals(map.get(key2), data2)); + // app2 + map = state.getAppContexts().get(attemptId2); + assertNotEquals(map, null); + assertEquals(map.size(), 2); + assertTrue(Arrays.equals(map.get(key1), data1)); + assertTrue(Arrays.equals(map.get(key2), data2)); + + // Generate next master key and remove one entry of app2 + secretManager.rollMasterKey(); + stateStore.removeAMRMProxyAppContextEntry(attemptId2, key1); + + // restart state store and verify recovered + restartStateStore(); + secretManager.setNMStateStoreService(stateStore); + state = stateStore.loadAMRMProxyState(); + assertEquals(state.getCurrentMasterKey(), + secretManager.getCurrentMasterKeyData().getMasterKey()); + assertEquals(state.getNextMasterKey(), + secretManager.getNextMasterKeyData().getMasterKey()); + assertEquals(state.getAppContexts().size(), 2); + // app1 + map = state.getAppContexts().get(attemptId1); + assertNotEquals(map, null); + assertEquals(map.size(), 2); + assertTrue(Arrays.equals(map.get(key1), data1)); + assertTrue(Arrays.equals(map.get(key2), data2)); + // app2 + map = state.getAppContexts().get(attemptId2); + assertNotEquals(map, null); + assertEquals(map.size(), 1); + assertTrue(Arrays.equals(map.get(key2), data2)); + + // Activate next master key and remove all entries of app1 + secretManager.activateNextMasterKey(); + stateStore.removeAMRMProxyAppContext(attemptId1); + + // restart state store and verify recovered + restartStateStore(); + secretManager.setNMStateStoreService(stateStore); + state = stateStore.loadAMRMProxyState(); + assertEquals(state.getCurrentMasterKey(), + secretManager.getCurrentMasterKeyData().getMasterKey()); + assertEquals(state.getNextMasterKey(), null); + assertEquals(state.getAppContexts().size(), 1); + // app2 only + map = state.getAppContexts().get(attemptId2); + assertNotEquals(map, null); + assertEquals(map.size(), 1); + assertTrue(Arrays.equals(map.get(key2), data2)); + } finally { + secretManager.stop(); + } + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java index 77240c63f644e..329d57ebfca8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -938,9 +939,10 @@ public ShortCircuitedAMRMProxy(Context context, @Override protected void initializePipeline(ApplicationAttemptId applicationAttemptId, String user, Token amrmToken, - Token localToken) { + Token localToken, + Map recoveredDataMap, boolean isRecovery) { super.initializePipeline(applicationAttemptId, user, amrmToken, - localToken); + localToken, recoveredDataMap, isRecovery); RequestInterceptor rt = getPipelines() .get(applicationAttemptId.getApplicationId()).getRootInterceptor(); // The DefaultRequestInterceptor will generally be the last