From b995fa83375ff517783503e64da3c1cff6ae5a1c Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Fri, 15 Sep 2023 18:23:21 +0800 Subject: [PATCH 1/2] YARN-11566. Yarn app kill command can not kill the application in secondary sub cluster. --- .../server/uam/UnmanagedAMPoolManager.java | 62 +++++++------------ .../uam/UnmanagedApplicationManager.java | 24 +++---- .../server/MockResourceManagerFacade.java | 12 +++- .../amrmproxy/AMRMProxyService.java | 14 ++++- .../amrmproxy/TestFederationInterceptor.java | 56 +++++++++++++++++ 5 files changed, 117 insertions(+), 51 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 0ff4260c5e358..81e6d58f8b2ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -97,17 +97,19 @@ protected void serviceStart() throws Exception { /** * Normally we should finish all applications before stop. If there are still - * UAMs running, force kill all of them. Do parallel kill because of - * performance reasons. + * UAMs running, force kill all of them. Do kill in asynchronous thread. 
* */ @Override protected void serviceStop() throws Exception { if (!this.unmanagedAppMasterMap.isEmpty()) { - finishApplicationThread = new Thread(createForceFinishApplicationThread()); + // Save a local copy of the map so that clearing the map below won't affect it + finishApplicationThread = + new Thread(createForceFinishApplicationThread(new HashMap(this.unmanagedAppMasterMap))); finishApplicationThread.setName(dispatcherThreadName); finishApplicationThread.start(); + this.unmanagedAppMasterMap.clear(); } super.serviceStop(); @@ -485,45 +487,29 @@ public Map batchFinishApplicationMaster return responseMap; } - Runnable createForceFinishApplicationThread() { + Runnable createForceFinishApplicationThread( + Map unmanagedAppToFinish) { return () -> { - - ExecutorCompletionService> completionService = - new ExecutorCompletionService<>(threadpool); - - // Save a local copy of the key set so that it won't change with the map - Set addressList = new HashSet<>(unmanagedAppMasterMap.keySet()); - - LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", addressList.size()); - - for (final String uamId : addressList) { - completionService.submit(() -> { - try { - ApplicationId appId = appIdMap.get(uamId); - LOG.info("Force-killing UAM id {} for application {}", uamId, appId); - UnmanagedApplicationManager applicationManager = unmanagedAppMasterMap.remove(uamId); - KillApplicationResponse response = applicationManager.forceKillApplication(); - return Pair.of(uamId, response); - } catch (Exception e) { - LOG.error("Failed to kill unmanaged application master", e); - return Pair.of(uamId, null); - } - }); - } - - for (int i = 0; i < addressList.size(); ++i) { + LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", + unmanagedAppToFinish.size()); + + // We should not use threadpool to execute the operation 'forceKillApplication'. + // ForceFinishApplication runs in an asynchronous thread, so threadpool may be destroyed.
+ // Since we kill apps in an asynchronous thread, we can kill them sequentially; + // no operations will get stuck. + for (Map.Entry entry : unmanagedAppToFinish.entrySet()) { + String uamId = entry.getKey(); + UnmanagedApplicationManager applicationManager = entry.getValue(); try { - Future> future = completionService.take(); - Pair pairs = future.get(); - String uamId = pairs.getLeft(); ApplicationId appId = appIdMap.get(uamId); - KillApplicationResponse response = pairs.getRight(); + LOG.info("Force-killing UAM id {} for application {}", uamId, appId); + KillApplicationResponse response = applicationManager.forceKillApplication(); if (response == null) { - throw new YarnException( - "Failed Force-killing UAM id " + uamId + " for application " + appId); + LOG.error("Failed Force-killing UAM id {} for application {}", uamId, appId); + } else { + LOG.info("Force-killing UAM id = {} for application {} KillCompleted {}.", uamId, appId, + response.getIsKillCompleted()); } - LOG.info("Force-killing UAM id = {} for application {} KillCompleted {}.", - uamId, appId, response.getIsKillCompleted()); } catch (Exception e) { LOG.error("Failed to kill unmanaged application master", e); } @@ -548,7 +534,7 @@ protected Map getUnmanagedAppMasterMap() { } @VisibleForTesting - protected Thread getFinishApplicationThread() { + public Thread getFinishApplicationThread() { return finishApplicationThread; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 66cc8f02790bb..0721dcebc4519 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -292,8 +292,7 @@ public KillApplicationResponse forceKillApplication() KillApplicationRequest request = KillApplicationRequest.newInstance(this.applicationId); if (this.rmClient == null) { - this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, - UserGroupInformation.createRemoteUser(this.submitter), null); + this.rmClient = createRMClient(); } return this.rmClient.forceKillApplication(request); } @@ -384,15 +383,7 @@ protected T createRMProxy(Class protocol, Configuration config, protected Token initializeUnmanagedAM( ApplicationId appId) throws IOException, YarnException { try { - UserGroupInformation appSubmitter; - if (UserGroupInformation.isSecurityEnabled()) { - appSubmitter = UserGroupInformation.createProxyUser(this.submitter, - UserGroupInformation.getLoginUser()); - } else { - appSubmitter = UserGroupInformation.createRemoteUser(this.submitter); - } - this.rmClient = createRMProxy(ApplicationClientProtocol.class, this.conf, - appSubmitter, null); + this.rmClient = createRMClient(); // Submit the application submitUnmanagedApp(appId); @@ -409,6 +400,17 @@ protected Token initializeUnmanagedAM( } } + private ApplicationClientProtocol createRMClient() throws IOException { + UserGroupInformation appSubmitter; + if (UserGroupInformation.isSecurityEnabled()) { + appSubmitter = UserGroupInformation.createProxyUser(this.submitter, + UserGroupInformation.getLoginUser()); + } else { + appSubmitter = UserGroupInformation.createRemoteUser(this.submitter); + } + return createRMProxy(ApplicationClientProtocol.class, this.conf, appSubmitter, null); + } + private void submitUnmanagedApp(ApplicationId appId) throws YarnException, IOException { SubmitApplicationRequest submitRequest = diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index ed66dadefb497..a4c1ed3000220 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; @@ -570,7 +571,16 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request) validateRunning(); - return GetApplicationsResponse.newInstance(null); + List applications = applicationMap.stream().map(applicationId -> { + ApplicationReport report = Records.newRecord(ApplicationReport.class); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + report.setApplicationId(applicationId); + report.setCurrentApplicationAttemptId(ApplicationAttemptId.newInstance(applicationId, 1)); + report.setAMRMToken(Token.newInstance(new byte[0], "", new byte[0], "")); + return report; + }).collect(Collectors.toList()); + + return GetApplicationsResponse.newInstance(applications); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index 
efdd8d66e2821..45350dba02292 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -567,7 +567,9 @@ protected void stopApplication(ApplicationId applicationId) { this.secretManager.applicationMasterFinished(pipeline.getApplicationAttemptId()); LOG.info("Stopping the request processing pipeline for application: {}.", applicationId); try { - pipeline.getRootInterceptor().shutdown(); + RequestInterceptor interceptor = pipeline.getRootInterceptor(); + stopUnmanagedApplicaiton(interceptor); + interceptor.shutdown(); } catch (Throwable ex) { LOG.warn("Failed to shutdown the request processing pipeline for app: {}.", applicationId, ex); @@ -595,6 +597,16 @@ protected void stopApplication(ApplicationId applicationId) { } } + private void stopUnmanagedApplicaiton(RequestInterceptor interceptor) { + if (interceptor == null) { + return; + } + if (interceptor instanceof FederationInterceptor) { + ((FederationInterceptor) interceptor).getUnmanagedAMPool().stop(); + } + stopUnmanagedApplicaiton(interceptor.getNextInterceptor()); + } + private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier, RequestInterceptorChainWrapper pipeline, AllocateResponse allocateResponse) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index c60fda5f72bfc..a70854d1d03a0 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -38,12 +38,15 @@ import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -1449,4 +1452,57 @@ public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception { Assert.assertEquals(0, interceptor.getRetryCount()); } + + @Test + public void testKillUnManagedApplication() throws IOException, InterruptedException { + UserGroupInformation ugi = + interceptor.getUGIWithToken(interceptor.getAttemptId()); + ugi.doAs((PrivilegedExceptionAction) () -> { + // Register the application + RegisterApplicationMasterRequest registerReq = + Records.newRecord(RegisterApplicationMasterRequest.class); + registerReq.setHost(Integer.toString(testAppId)); + 
registerReq.setRpcPort(0); + registerReq.setTrackingUrl(""); + + RegisterApplicationMasterResponse registerResponse = + interceptor.registerApplicationMaster(registerReq); + Assert.assertNotNull(registerResponse); + lastResponseId = 0; + + Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize()); + + // Allocate the first batch of containers, with sc1 and sc2 active + registerSubCluster(SubClusterId.newInstance("SC-1")); + registerSubCluster(SubClusterId.newInstance("SC-2")); + int numberOfContainers = 3; + List containers = + getContainersAndAssert(numberOfContainers, numberOfContainers * 2); + Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(numberOfContainers * 2, containers.size()); + + // check application whether exist in RMs or not + GetApplicationsRequest request = GetApplicationsRequest.newInstance(); + Map secondaries = interceptor.getSecondaryRMs(); + Assert.assertEquals(2, secondaries.size()); + for (MockResourceManagerFacade rm : secondaries.values()) { + GetApplicationsResponse response = rm.getApplications(request); + Assert.assertEquals(1, response.getApplicationList().size()); + } + + // Force kill application + interceptor.getUnmanagedAMPool().stop(); + Thread finishApplicationThread = + interceptor.getUnmanagedAMPool().getFinishApplicationThread(); + GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(), 100, 2000); + + // check application whether exist in RMs or not + for (MockResourceManagerFacade rm : secondaries.values()) { + GetApplicationsResponse response = rm.getApplications(request); + Assert.assertEquals(0, response.getApplicationList().size()); + } + return null; + }); + } } From 2549ed7a210dbc661ca9c12dea7039454187319b Mon Sep 17 00:00:00 2001 From: zhengchenyu Date: Fri, 15 Sep 2023 19:06:29 +0800 Subject: [PATCH 2/2] remove unused import --- .../apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java | 1 - 1 file 
changed, 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 81e6d58f8b2ff..39ea8df5d0e34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -30,7 +30,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration;