Skip to content

Commit

Permalink
YARN-7899. [AMRMProxy] Stateful FederationInterceptor for pending req…
Browse files Browse the repository at this point in the history
…uests. Contributed by Botong Huang.
  • Loading branch information
gifuma committed Jul 9, 2018
1 parent e12d93b commit ea9b608
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 142 deletions.
Expand Up @@ -36,19 +36,9 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -67,87 +57,6 @@ public final class AMRMClientUtils {
private AMRMClientUtils() { private AMRMClientUtils() {
} }


/**
* Handle ApplicationNotRegistered exception and re-register.
*
* @param appId application Id
* @param rmProxy RM proxy instance
* @param registerRequest the AM re-register request
* @throws YarnException if re-register fails
*/
public static void handleNotRegisteredExceptionAndReRegister(
ApplicationId appId, ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest) throws YarnException {
LOG.info("App attempt {} not registered, most likely due to RM failover. "
+ " Trying to re-register.", appId);
try {
rmProxy.registerApplicationMaster(registerRequest);
} catch (Exception e) {
if (e instanceof InvalidApplicationMasterRequestException
&& e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
LOG.info("Concurrent thread successfully registered, moving on.");
} else {
LOG.error("Error trying to re-register AM", e);
throw new YarnException(e);
}
}
}

/**
* Helper method for client calling ApplicationMasterProtocol.allocate that
* handles re-register if RM fails over.
*
* @param request allocate request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
* @param appId application id
* @return allocate response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static AllocateResponse allocateWithReRegister(AllocateRequest request,
ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
throws YarnException, IOException {
try {
return rmProxy.allocate(request);
} catch (ApplicationMasterNotRegisteredException e) {
handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
registerRequest);
// reset responseId after re-register
request.setResponseId(0);
// retry allocate
return allocateWithReRegister(request, rmProxy, registerRequest, appId);
}
}

/**
* Helper method for client calling
* ApplicationMasterProtocol.finishApplicationMaster that handles re-register
* if RM fails over.
*
* @param request finishApplicationMaster request
* @param rmProxy RM proxy
* @param registerRequest the register request for re-register
* @param appId application id
* @return finishApplicationMaster response
* @throws YarnException if RM call fails
* @throws IOException if RM call fails
*/
public static FinishApplicationMasterResponse finishAMWithReRegister(
FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
throws YarnException, IOException {
try {
return rmProxy.finishApplicationMaster(request);
} catch (ApplicationMasterNotRegisteredException ex) {
handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
registerRequest);
// retry finishAM after re-register
return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
}
}

/** /**
* Create a proxy for the specified protocol. * Create a proxy for the specified protocol.
* *
Expand Down
Expand Up @@ -147,6 +147,11 @@ protected void serviceStop() throws Exception {
super.serviceStop(); super.serviceStop();
} }


public void setAMRegistrationRequest(
RegisterApplicationMasterRequest registerRequest) {
this.amRegistrationRequest = registerRequest;
}

@Override @Override
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) RegisterApplicationMasterRequest request)
Expand Down Expand Up @@ -259,8 +264,10 @@ public AllocateResponse allocate(AllocateRequest allocateRequest)
} }
} }


// re register with RM, then retry allocate recursively // re-register with RM, then retry allocate recursively
registerApplicationMaster(this.amRegistrationRequest); registerApplicationMaster(this.amRegistrationRequest);
// Reset responseId after re-register
allocateRequest.setResponseId(0);
return allocate(allocateRequest); return allocate(allocateRequest);
} }


Expand Down
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.AsyncCallback;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -385,4 +386,19 @@ public boolean hasUAMId(String uamId) {
return this.unmanagedAppMasterMap.containsKey(uamId); return this.unmanagedAppMasterMap.containsKey(uamId);
} }


/**
* Return the rmProxy relayer of an UAM.
*
* @param uamId uam Id
* @return the rmProxy relayer
* @throws YarnException if fails
*/
public AMRMClientRelayer getAMRMClientRelayer(String uamId)
throws YarnException {
if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
throw new YarnException("UAM " + uamId + " does not exist");
}
return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
}

} }
Expand Up @@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.AsyncCallback; import org.apache.hadoop.yarn.util.AsyncCallback;
Expand Down Expand Up @@ -90,7 +91,7 @@ public class UnmanagedApplicationManager {


private BlockingQueue<AsyncAllocateRequestInfo> requestQueue; private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
private AMRequestHandlerThread handlerThread; private AMRequestHandlerThread handlerThread;
private ApplicationMasterProtocol rmProxy; private AMRMClientRelayer rmProxyRelayer;
private ApplicationId applicationId; private ApplicationId applicationId;
private String submitter; private String submitter;
private String appNameSuffix; private String appNameSuffix;
Expand Down Expand Up @@ -138,7 +139,7 @@ public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
this.appNameSuffix = appNameSuffix; this.appNameSuffix = appNameSuffix;
this.handlerThread = new AMRequestHandlerThread(); this.handlerThread = new AMRequestHandlerThread();
this.requestQueue = new LinkedBlockingQueue<>(); this.requestQueue = new LinkedBlockingQueue<>();
this.rmProxy = null; this.rmProxyRelayer = null;
this.connectionInitiated = false; this.connectionInitiated = false;
this.registerRequest = null; this.registerRequest = null;
this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
Expand Down Expand Up @@ -190,8 +191,9 @@ protected void createUAMProxy(Token<AMRMTokenIdentifier> amrmToken)
throws IOException { throws IOException {
this.userUgi = UserGroupInformation.createProxyUser( this.userUgi = UserGroupInformation.createProxyUser(
this.applicationId.toString(), UserGroupInformation.getCurrentUser()); this.applicationId.toString(), UserGroupInformation.getCurrentUser());
this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, this.rmProxyRelayer =
this.userUgi, amrmToken); new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
this.conf, this.userUgi, amrmToken));
} }


/** /**
Expand All @@ -209,19 +211,18 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
// Save the register request for re-register later // Save the register request for re-register later
this.registerRequest = request; this.registerRequest = request;


// Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
// We do not expect application already registered exception here
LOG.info("Registering the Unmanaged application master {}", LOG.info("Registering the Unmanaged application master {}",
this.applicationId); this.applicationId);
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
this.rmProxy.registerApplicationMaster(this.registerRequest); this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
this.lastResponseId = 0;


for (Container container : response.getContainersFromPreviousAttempts()) { for (Container container : response.getContainersFromPreviousAttempts()) {
LOG.info("RegisterUAM returned existing running container " LOG.debug("RegisterUAM returned existing running container "
+ container.getId()); + container.getId());
} }
for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
LOG.info("RegisterUAM returned existing NM token for node " LOG.debug("RegisterUAM returned existing NM token for node "
+ nmToken.getNodeId()); + nmToken.getNodeId());
} }


Expand Down Expand Up @@ -249,7 +250,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(


this.handlerThread.shutdown(); this.handlerThread.shutdown();


if (this.rmProxy == null) { if (this.rmProxyRelayer == null) {
if (this.connectionInitiated) { if (this.connectionInitiated) {
// This is possible if the async launchUAM is still // This is possible if the async launchUAM is still
// blocked and retrying. Return a dummy response in this case. // blocked and retrying. Return a dummy response in this case.
Expand All @@ -261,8 +262,7 @@ public FinishApplicationMasterResponse finishApplicationMaster(
+ "be called before createAndRegister"); + "be called before createAndRegister");
} }
} }
return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, return this.rmProxyRelayer.finishApplicationMaster(request);
this.registerRequest, this.applicationId);
} }


/** /**
Expand Down Expand Up @@ -308,7 +308,7 @@ public void allocateAsync(AllocateRequest request,
// //
// In case 2, we have already save the allocate request above, so if the // In case 2, we have already save the allocate request above, so if the
// registration succeed later, no request is lost. // registration succeed later, no request is lost.
if (this.rmProxy == null) { if (this.rmProxyRelayer == null) {
if (this.connectionInitiated) { if (this.connectionInitiated) {
LOG.info("Unmanaged AM still not successfully launched/registered yet." LOG.info("Unmanaged AM still not successfully launched/registered yet."
+ " Saving the allocate request and send later."); + " Saving the allocate request and send later.");
Expand All @@ -328,6 +328,15 @@ public ApplicationId getAppId() {
return this.applicationId; return this.applicationId;
} }


/**
* Returns the rmProxy relayer of this UAM.
*
* @return rmProxy relayer of the UAM
*/
public AMRMClientRelayer getAMRMClientRelayer() {
return this.rmProxyRelayer;
}

/** /**
* Returns RM proxy for the specified protocol type. Unit test cases can * Returns RM proxy for the specified protocol type. Unit test cases can
* override this method and return mock proxy instances. * override this method and return mock proxy instances.
Expand Down Expand Up @@ -592,10 +601,7 @@ public void run() {
} }


request.setResponseId(lastResponseId); request.setResponseId(lastResponseId);

AllocateResponse response = rmProxyRelayer.allocate(request);
AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
request, rmProxy, registerRequest, applicationId);

if (response == null) { if (response == null) {
throw new YarnException("Null allocateResponse from allocate"); throw new YarnException("Null allocateResponse from allocate");
} }
Expand Down
Expand Up @@ -251,8 +251,6 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
ApplicationAttemptId attemptId = getAppIdentifier(); ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Registering application attempt: " + attemptId); LOG.info("Registering application attempt: " + attemptId);


shouldReRegisterNext = false;

List<Container> containersFromPreviousAttempt = null; List<Container> containersFromPreviousAttempt = null;


synchronized (applicationContainerIdMap) { synchronized (applicationContainerIdMap) {
Expand All @@ -266,7 +264,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
containersFromPreviousAttempt.add(Container.newInstance(containerId, containersFromPreviousAttempt.add(Container.newInstance(containerId,
null, null, null, null, null)); null, null, null, null, null));
} }
} else { } else if (!shouldReRegisterNext) {
throw new InvalidApplicationMasterRequestException( throw new InvalidApplicationMasterRequestException(
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
} }
Expand All @@ -276,6 +274,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
} }
} }


shouldReRegisterNext = false;

// Make sure we wait for certain test cases last in the method // Make sure we wait for certain test cases last in the method
synchronized (syncObj) { synchronized (syncObj) {
syncObj.notifyAll(); syncObj.notifyAll();
Expand Down Expand Up @@ -339,13 +339,6 @@ public AllocateResponse allocate(AllocateRequest request)


validateRunning(); validateRunning();


if (request.getAskList() != null && request.getAskList().size() > 0
&& request.getReleaseList() != null
&& request.getReleaseList().size() > 0) {
Assert.fail("The mock RM implementation does not support receiving "
+ "askList and releaseList in the same heartbeat");
}

ApplicationAttemptId attemptId = getAppIdentifier(); ApplicationAttemptId attemptId = getAppIdentifier();
LOG.info("Allocate from application attempt: " + attemptId); LOG.info("Allocate from application attempt: " + attemptId);


Expand Down

0 comments on commit ea9b608

Please sign in to comment.