Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -97,17 +96,19 @@ protected void serviceStart() throws Exception {

/**
* Normally we should finish all applications before stop. If there are still
* UAMs running, force kill all of them. Do parallel kill because of
* performance reasons.
* UAMs running, force kill all of them. The kill is performed in an asynchronous thread.
*
*/
@Override
protected void serviceStop() throws Exception {

if (!this.unmanagedAppMasterMap.isEmpty()) {
finishApplicationThread = new Thread(createForceFinishApplicationThread());
// Save a local copy of the map so that clearing the original map below does not affect it
finishApplicationThread =
new Thread(createForceFinishApplicationThread(new HashMap(this.unmanagedAppMasterMap)));
finishApplicationThread.setName(dispatcherThreadName);
finishApplicationThread.start();
this.unmanagedAppMasterMap.clear();
}

super.serviceStop();
Expand Down Expand Up @@ -485,45 +486,29 @@ public Map<String, FinishApplicationMasterResponse> batchFinishApplicationMaster
return responseMap;
}

Runnable createForceFinishApplicationThread() {
Runnable createForceFinishApplicationThread(
Map<String, UnmanagedApplicationManager> unmanagedAppToFinish) {
return () -> {

ExecutorCompletionService<Pair<String, KillApplicationResponse>> completionService =
new ExecutorCompletionService<>(threadpool);

// Save a local copy of the key set so that it won't change with the map
Set<String> addressList = new HashSet<>(unmanagedAppMasterMap.keySet());

LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", addressList.size());

for (final String uamId : addressList) {
completionService.submit(() -> {
try {
ApplicationId appId = appIdMap.get(uamId);
LOG.info("Force-killing UAM id {} for application {}", uamId, appId);
UnmanagedApplicationManager applicationManager = unmanagedAppMasterMap.remove(uamId);
KillApplicationResponse response = applicationManager.forceKillApplication();
return Pair.of(uamId, response);
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
return Pair.of(uamId, null);
}
});
}

for (int i = 0; i < addressList.size(); ++i) {
LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map",
unmanagedAppToFinish.size());

// We should not use the threadpool to execute the operation 'forceKillApplication'.
// Because the force-finish runs in an asynchronous thread, the threadpool may already be destroyed.
// Since we kill the apps in an asynchronous thread, we can kill them sequentially;
// no operations will get stuck.
for (Map.Entry<String, UnmanagedApplicationManager> entry : unmanagedAppToFinish.entrySet()) {
String uamId = entry.getKey();
UnmanagedApplicationManager applicationManager = entry.getValue();
try {
Future<Pair<String, KillApplicationResponse>> future = completionService.take();
Pair<String, KillApplicationResponse> pairs = future.get();
String uamId = pairs.getLeft();
ApplicationId appId = appIdMap.get(uamId);
KillApplicationResponse response = pairs.getRight();
LOG.info("Force-killing UAM id {} for application {}", uamId, appId);
KillApplicationResponse response = applicationManager.forceKillApplication();
if (response == null) {
throw new YarnException(
"Failed Force-killing UAM id " + uamId + " for application " + appId);
LOG.error("Failed Force-killing UAM id {} for application {}", uamId, appId);
} else {
LOG.info("Force-killing UAM id = {} for application {} KillCompleted {}.", uamId, appId,
response.getIsKillCompleted());
}
LOG.info("Force-killing UAM id = {} for application {} KillCompleted {}.",
uamId, appId, response.getIsKillCompleted());
} catch (Exception e) {
LOG.error("Failed to kill unmanaged application master", e);
}
Expand All @@ -548,7 +533,7 @@ protected Map<String, UnmanagedApplicationManager> getUnmanagedAppMasterMap() {
}

@VisibleForTesting
protected Thread getFinishApplicationThread() {
public Thread getFinishApplicationThread() {
return finishApplicationThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ public KillApplicationResponse forceKillApplication()
KillApplicationRequest request =
KillApplicationRequest.newInstance(this.applicationId);
if (this.rmClient == null) {
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
UserGroupInformation.createRemoteUser(this.submitter), null);
this.rmClient = createRMClient();
}
return this.rmClient.forceKillApplication(request);
}
Expand Down Expand Up @@ -384,15 +383,7 @@ protected <T> T createRMProxy(Class<T> protocol, Configuration config,
protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
ApplicationId appId) throws IOException, YarnException {
try {
UserGroupInformation appSubmitter;
if (UserGroupInformation.isSecurityEnabled()) {
appSubmitter = UserGroupInformation.createProxyUser(this.submitter,
UserGroupInformation.getLoginUser());
} else {
appSubmitter = UserGroupInformation.createRemoteUser(this.submitter);
}
this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf,
appSubmitter, null);
this.rmClient = createRMClient();

// Submit the application
submitUnmanagedApp(appId);
Expand All @@ -409,6 +400,17 @@ protected Token<AMRMTokenIdentifier> initializeUnmanagedAM(
}
}

/**
 * Builds the {@link ApplicationClientProtocol} proxy used to talk to the RM
 * on behalf of {@code this.submitter}.
 *
 * <p>When security is enabled the submitter is impersonated through a proxy
 * user backed by the current login user; otherwise a plain remote user is
 * created for the submitter.
 *
 * @return the proxy returned by {@code createRMProxy}
 * @throws IOException propagated from the underlying UGI calls
 */
private ApplicationClientProtocol createRMClient() throws IOException {
  final UserGroupInformation submitterUgi = UserGroupInformation.isSecurityEnabled()
      ? UserGroupInformation.createProxyUser(this.submitter,
          UserGroupInformation.getLoginUser())
      : UserGroupInformation.createRemoteUser(this.submitter);
  return createRMProxy(ApplicationClientProtocol.class, this.conf, submitterUgi, null);
}

private void submitUnmanagedApp(ApplicationId appId) throws YarnException, IOException {

SubmitApplicationRequest submitRequest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -570,7 +571,16 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request)

validateRunning();

return GetApplicationsResponse.newInstance(null);
List<ApplicationReport> applications = applicationMap.stream().map(applicationId -> {
ApplicationReport report = Records.newRecord(ApplicationReport.class);
report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
report.setApplicationId(applicationId);
report.setCurrentApplicationAttemptId(ApplicationAttemptId.newInstance(applicationId, 1));
report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], ""));
return report;
}).collect(Collectors.toList());

return GetApplicationsResponse.newInstance(applications);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,9 @@ protected void stopApplication(ApplicationId applicationId) {
this.secretManager.applicationMasterFinished(pipeline.getApplicationAttemptId());
LOG.info("Stopping the request processing pipeline for application: {}.", applicationId);
try {
pipeline.getRootInterceptor().shutdown();
RequestInterceptor interceptor = pipeline.getRootInterceptor();
stopUnmanagedApplicaiton(interceptor);
interceptor.shutdown();
} catch (Throwable ex) {
LOG.warn("Failed to shutdown the request processing pipeline for app: {}.",
applicationId, ex);
Expand Down Expand Up @@ -595,6 +597,16 @@ protected void stopApplication(ApplicationId applicationId) {
}
}

/**
 * Walks the interceptor chain starting at {@code interceptor} and stops the
 * unmanaged AM pool of every {@link FederationInterceptor} found in it.
 *
 * @param interceptor head of the chain to walk; {@code null} is a no-op
 */
private void stopUnmanagedApplicaiton(RequestInterceptor interceptor) {
  for (RequestInterceptor current = interceptor; current != null;
      current = current.getNextInterceptor()) {
    if (current instanceof FederationInterceptor) {
      FederationInterceptor federation = (FederationInterceptor) current;
      federation.getUnmanagedAMPool().stop();
    }
  }
}

private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
RequestInterceptorChainWrapper pipeline,
AllocateResponse allocateResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
Expand Down Expand Up @@ -1449,4 +1452,57 @@ public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception {

Assert.assertEquals(0, interceptor.getRetryCount());
}

/**
 * Verifies that stopping the unmanaged AM pool force-kills the UAMs: after
 * the pool is stopped and its finish thread has ended, each secondary RM
 * must report no applications.
 */
@Test
public void testKillUnManagedApplication() throws IOException, InterruptedException {
  UserGroupInformation ugi =
      interceptor.getUGIWithToken(interceptor.getAttemptId());
  ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
    // Register the application
    RegisterApplicationMasterRequest registerReq =
        Records.newRecord(RegisterApplicationMasterRequest.class);
    registerReq.setHost(Integer.toString(testAppId));
    registerReq.setRpcPort(0);
    registerReq.setTrackingUrl("");

    RegisterApplicationMasterResponse registerResponse =
        interceptor.registerApplicationMaster(registerReq);
    Assert.assertNotNull(registerResponse);
    lastResponseId = 0;

    Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());

    // Allocate the first batch of containers, with sc1 and sc2 active
    registerSubCluster(SubClusterId.newInstance("SC-1"));
    registerSubCluster(SubClusterId.newInstance("SC-2"));
    int numberOfContainers = 3;
    List<Container> containers =
        getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
    Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
    Assert.assertEquals(numberOfContainers * 2, containers.size());

    // Check that the application exists in each secondary RM
    GetApplicationsRequest request = GetApplicationsRequest.newInstance();
    Map<String, MockResourceManagerFacade> secondaries = interceptor.getSecondaryRMs();
    Assert.assertEquals(2, secondaries.size());
    for (MockResourceManagerFacade rm : secondaries.values()) {
      GetApplicationsResponse response = rm.getApplications(request);
      Assert.assertEquals(1, response.getApplicationList().size());
    }

    // Force kill the application and wait for the finish thread to end
    interceptor.getUnmanagedAMPool().stop();
    Thread finishApplicationThread =
        interceptor.getUnmanagedAMPool().getFinishApplicationThread();
    GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(), 100, 2000);

    // Check that the application no longer exists in any secondary RM
    for (MockResourceManagerFacade rm : secondaries.values()) {
      GetApplicationsResponse response = rm.getApplications(request);
      Assert.assertEquals(0, response.getApplicationList().size());
    }
    return null;
  });
}
}