From 8538af4638bdd4120c2aa0c0e2803a085e5ced74 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Thu, 14 Sep 2023 21:28:49 +0800 Subject: [PATCH] YARN-7599. [BackPort][GPG] ApplicationCleaner in Global Policy Generator. (#5934) Contributed by Botong Huang, Shilun Fan. Co-authored-by: Botong Huang Co-authored-by: slfan1989 Reviewed-by: Inigo Goiri Signed-off-by: Shilun Fan --- .../hadoop/yarn/conf/YarnConfiguration.java | 25 +++ .../src/main/resources/yarn-default.xml | 28 ++++ .../utils/FederationStateStoreFacade.java | 30 ++++ .../globalpolicygenerator/GPGUtils.java | 24 ++- .../GlobalPolicyGenerator.java | 22 ++- .../ApplicationCleaner.java | 153 ++++++++++++++++++ .../DefaultApplicationCleaner.java | 77 +++++++++ .../applicationcleaner/package-info.java | 19 +++ .../policygenerator/PolicyGenerator.java | 4 +- .../TestDefaultApplicationCleaner.java | 131 +++++++++++++++ .../policygenerator/TestPolicyGenerator.java | 2 +- 11 files changed, 510 insertions(+), 5 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 874ee9d08d9ad..ef06299fcfd8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4432,6 +4432,31 @@ public static boolean isAclEnabled(Configuration conf) { public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = FEDERATION_GPG_PREFIX + "kerberos.principal.hostname"; + // The application cleaner class to use + public static final String GPG_APPCLEANER_CLASS = + FEDERATION_GPG_PREFIX + "application.cleaner.class"; + public static final String DEFAULT_GPG_APPCLEANER_CLASS = + "org.apache.hadoop.yarn.server.globalpolicygenerator" + + ".applicationcleaner.DefaultApplicationCleaner"; + + // The interval at which the application cleaner runs, -1 means disabled + public static final String GPG_APPCLEANER_INTERVAL_MS = + FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms"; + public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = TimeUnit.SECONDS.toMillis(-1); + + /** + * Specifications on how (many times) to contact Router for apps. We need to + * do this because Router might return partial application list because some + * sub-cluster RM is not responsive (e.g. failing over). + * + * Should have three values separated by comma: minimal success retries, + * maximum total retry, retry interval (ms). 
+ */ + public static final String GPG_APPCLEANER_CONTACT_ROUTER_SPEC = + FEDERATION_GPG_PREFIX + "application.cleaner.contact.router.spec"; + public static final String DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC = + "3,10,600000"; + public static final String FEDERATION_GPG_POLICY_PREFIX = FEDERATION_GPG_PREFIX + "policy.generator."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index f82540b8f46cb..9697f7aa88c8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5538,6 +5538,14 @@ LINEAR + + + The Application Cleaner implementation class for GPG to use. + + yarn.federation.gpg.application.cleaner.class + org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.DefaultApplicationCleaner + + Flag to enable cross-origin (CORS) support in the GPG. This flag requires the CORS filter initializer to be added to the filter initializers @@ -5546,6 +5554,14 @@ false + + + The interval at which the application cleaner runs, -1 means disabled. + + yarn.federation.gpg.application.cleaner.interval-ms + -1s + + The http address of the GPG web application. @@ -5556,6 +5572,18 @@ 0.0.0.0:8069 + + + Specifications on how (many times) to contact Router for apps. We need to + do this because Router might return partial application list because some + sub-cluster RM is not responsive (e.g. failing over). + Should have three values separated by comma: minimal success retries, + maximum total retry, retry interval (ms). + + yarn.federation.gpg.application.cleaner.contact.router.spec + 3,10,600000 + + The https address of the GPG web application. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 26136b11de6f4..d4c259b51605e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -83,6 +83,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.webapp.NotFoundException; import org.slf4j.Logger; @@ -884,6 +887,33 @@ public void addApplicationHomeSubCluster(ApplicationId applicationId, } } + /** + * Get the {@code ApplicationHomeSubCluster} list representing the mapping of + * all submitted applications to their home sub-clusters. 
+ * + * @return the mapping of all submitted applications to their home sub-clusters + * @throws YarnException if the request is invalid/fails + */ + public List getApplicationsHomeSubCluster() throws YarnException { + GetApplicationsHomeSubClusterResponse response = stateStore.getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest.newInstance()); + return response.getAppsHomeSubClusters(); + } + + /** + * Delete the mapping of home {@code SubClusterId} of a previously submitted + * {@code ApplicationId}. Currently the response is empty if the operation is + * successful; if not, an exception reporting the reason for the failure is thrown. + * + * @param applicationId the application to delete the home sub-cluster of + * @throws YarnException if the request is invalid/fails + */ + public void deleteApplicationHomeSubCluster(ApplicationId applicationId) + throws YarnException { + stateStore.deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest.newInstance(applicationId)); + } + /** * Update ApplicationHomeSubCluster to FederationStateStore. 
* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java index a802e37979bb7..02344a51493c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java @@ -40,6 +40,7 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; /** * GPGUtils contains utility functions for the GPG. @@ -58,11 +59,12 @@ private GPGUtils() { * @param webAddr WebAddress. * @param path url path. * @param returnType return type. + * @param selectParam query parameters. * @param conf configuration. * @return response entity. */ public static T invokeRMWebService(String webAddr, String path, final Class returnType, - Configuration conf) { + Configuration conf, String selectParam) { Client client = Client.create(); T obj; @@ -72,6 +74,11 @@ public static T invokeRMWebService(String webAddr, String path, final Class< String scheme = YarnConfiguration.useHttps(conf) ? 
HTTPS_PREFIX : HTTP_PREFIX; String webAddress = scheme + socketAddress.getHostName() + ":" + socketAddress.getPort(); WebResource webResource = client.resource(webAddress); + + if (selectParam != null) { + webResource = webResource.queryParam(RMWSConsts.DESELECTS, selectParam); + } + ClientResponse response = null; try { response = webResource.path(RM_WEB_SERVICE_PATH).path(path) @@ -92,6 +99,21 @@ public static T invokeRMWebService(String webAddr, String path, final Class< } } + /** + * Performs an invocation of the remote RMWebService. + * + * @param <T> Generic T. + * @param webAddr WebAddress. + * @param path url path. + * @param returnType return type. + * @param config configuration. + * @return response entity. + */ + public static T invokeRMWebService(String webAddr, + String path, final Class returnType, Configuration config) { + return invokeRMWebService(webAddr, path, returnType, config, null); + } + /** * Creates a uniform weighting of 1.0 for each sub cluster. 
* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index 81a999d76a28e..ba8ce856cdaa5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner; import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator; import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp; @@ -84,6 +85,7 @@ public class GlobalPolicyGenerator extends CompositeService { // Scheduler service that runs tasks periodically private ScheduledThreadPoolExecutor scheduledExecutorService; private SubClusterCleaner subClusterCleaner; + private ApplicationCleaner applicationCleaner; private PolicyGenerator policyGenerator; private String webAppAddress; private JvmPauseMonitor pauseMonitor; @@ -125,6 +127,12 @@ protected void serviceInit(Configuration conf) throws Exception { conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); + + this.applicationCleaner = FederationStateStoreFacade.createInstance(conf, + YarnConfiguration.GPG_APPCLEANER_CLASS, + YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS, ApplicationCleaner.class); + this.applicationCleaner.init(conf, this.gpgContext); + this.policyGenerator = new PolicyGenerator(conf, this.gpgContext); this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf); @@ -149,7 +157,7 @@ protected void serviceStart() throws Exception { super.serviceStart(); - // Scheduler SubClusterCleaner service + // Schedule SubClusterCleaner service Configuration config = getConfig(); long scCleanerIntervalMs = config.getTimeDuration( YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, @@ -161,6 +169,18 @@ protected void serviceStart() throws Exception { DurationFormatUtils.formatDurationISO(scCleanerIntervalMs)); } + // Schedule ApplicationCleaner service + long 
appCleanerIntervalMs = config.getTimeDuration( + YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS, + YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS); + + if (appCleanerIntervalMs > 0) { + this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner, + 0, appCleanerIntervalMs, TimeUnit.MILLISECONDS); + LOG.info("Scheduled application cleaner with interval: {}", + DurationFormatUtils.formatDurationISO(appCleanerIntervalMs)); + } + // Schedule PolicyGenerator // We recommend using yarn.federation.gpg.policy.generator.interval // instead of yarn.federation.gpg.policy.generator.interval-ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java new file mode 100644 index 0000000000000..cd3f7618558e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The ApplicationCleaner is a runnable that cleans up old applications from + * table applicationsHomeSubCluster in FederationStateStore. 
+ */ +public abstract class ApplicationCleaner implements Runnable { + private static final Logger LOG = + LoggerFactory.getLogger(ApplicationCleaner.class); + + private Configuration conf; + private GPGContext gpgContext; + + private int minRouterSuccessCount; + private int maxRouterRetry; + private long routerQueryIntevalMillis; + + public void init(Configuration config, GPGContext context) + throws YarnException { + + this.gpgContext = context; + this.conf = config; + + String routerSpecString = + this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, + YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC); + String[] specs = routerSpecString.split(","); + if (specs.length != 3) { + throw new YarnException("Expect three comma separated values in " + + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get " + + routerSpecString); + } + this.minRouterSuccessCount = Integer.parseInt(specs[0]); + this.maxRouterRetry = Integer.parseInt(specs[1]); + this.routerQueryIntevalMillis = Long.parseLong(specs[2]); + + if (this.minRouterSuccessCount > this.maxRouterRetry) { + throw new YarnException("minRouterSuccessCount " + + this.minRouterSuccessCount + + " should not be larger than maxRouterRetry" + this.maxRouterRetry); + } + if (this.minRouterSuccessCount <= 0) { + throw new YarnException("minRouterSuccessCount " + + this.minRouterSuccessCount + " should be positive"); + } + + LOG.info( + "Initialized AppCleaner with Router query with min success {}, " + + "max retry {}, retry interval {}", + this.minRouterSuccessCount, this.maxRouterRetry, + DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis)); + } + + public GPGContext getGPGContext() { + return this.gpgContext; + } + + /** + * Query router for applications. 
+ * + * @return the set of applications + * @throws YarnRuntimeException when router call fails + */ + public Set getAppsFromRouter() throws YarnRuntimeException { + String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf); + + LOG.info(String.format("Contacting router at: %s", webAppAddress)); + AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf, + DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString()); + + Set appSet = new HashSet<>(); + for (AppInfo appInfo : appsInfo.getApps()) { + appSet.add(ApplicationId.fromString(appInfo.getAppId())); + } + return appSet; + } + + /** + * Get the set of applications known to the Router, merged across successful queries. + * + * @return the union of the applications returned by the successful Router queries + * @throws YarnException if too few Router queries succeed within the retry limit + */ + public Set getRouterKnownApplications() throws YarnException { + int successCount = 0, totalAttemptCount = 0; + Set resultSet = new HashSet<>(); + while (totalAttemptCount < this.maxRouterRetry) { + try { + Set routerApps = getAppsFromRouter(); + resultSet.addAll(routerApps); + LOG.info("Attempt {}: {} known apps from Router, {} in total", + totalAttemptCount, routerApps.size(), resultSet.size()); + + successCount++; + if (successCount >= this.minRouterSuccessCount) { + return resultSet; + } + + // Wait for the next attempt + try { + Thread.sleep(this.routerQueryIntevalMillis); + } catch (InterruptedException e) { + LOG.warn("Sleep interrupted after attempt {}.", totalAttemptCount); + } + } catch (Exception e) { + LOG.warn("Router query attempt {} failed.", totalAttemptCount, e); + } finally { + totalAttemptCount++; + } + } + throw new YarnException("Only " + successCount + + " success Router queries after " + totalAttemptCount + " retries"); + } + + @Override + public abstract void run(); +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java new file mode 100644 index 0000000000000..857d2e645d4c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; + +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The default ApplicationCleaner that cleans up old applications from table + * applicationsHomeSubCluster in FederationStateStore. + */ +public class DefaultApplicationCleaner extends ApplicationCleaner { + private static final Logger LOG = + LoggerFactory.getLogger(DefaultApplicationCleaner.class); + + @Override + public void run() { + Date now = new Date(); + LOG.info("Application cleaner run at time {}", now); + + FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade(); + Set candidates = new HashSet<>(); + try { + List response = + facade.getApplicationsHomeSubCluster(); + for (ApplicationHomeSubCluster app : response) { + candidates.add(app.getApplicationId()); + } + LOG.info("{} app entries in FederationStateStore", candidates.size()); + + Set routerApps = getRouterKnownApplications(); + LOG.info("{} known applications from Router", routerApps.size()); + + candidates.removeAll(routerApps); + LOG.info("Deleting {} applications from statestore", candidates.size()); + if (LOG.isDebugEnabled()) { + LOG.debug("Apps to delete: {}.", candidates.stream().map(Object::toString) + .collect(Collectors.joining(","))); + } + for (ApplicationId appId : candidates) { + try { + facade.deleteApplicationHomeSubCluster(appId); + } catch (Exception e) { + LOG.error("deleteApplicationHomeSubCluster failed at application {}.", appId, e); + } + } + } catch (Throwable e) { + LOG.error("Application cleaner started at time {} fails. 
", now, e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java new file mode 100644 index 0000000000000..dd302c81f45ea --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java index df28192a0c668..1f0fbd11a741c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java @@ -159,7 +159,7 @@ protected Map> getInfos( clusterInfo.put(sci.getSubClusterId(), new HashMap<>()); } Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(), - e.getValue(), e.getKey(), getConf()); + e.getValue(), e.getKey(), conf); clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret); } } @@ -181,7 +181,7 @@ protected Map getSchedulerInfo( for (SubClusterInfo sci : activeSubClusters.values()) { SchedulerTypeInfo sti = GPGUtils .invokeRMWebService(sci.getRMWebServiceAddress(), - RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, getConf()); + RMWSConsts.SCHEDULER, SchedulerTypeInfo.class, conf); if(sti != null){ schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo()); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java new file mode 100644 index 0000000000000..2d63c48236fb5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for DefaultApplicationCleaner in GPG. 
+ */ +public class TestDefaultApplicationCleaner { + private Configuration conf; + private MemoryFederationStateStore stateStore; + private FederationStateStoreFacade facade; + private ApplicationCleaner appCleaner; + private GPGContext gpgContext; + + private List appIds; + // The list of applications returned by mocked router + private Set routerAppIds; + + @Before + public void setup() throws Exception { + conf = new YarnConfiguration(); + + // No Router query retry + conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "1,1,0"); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + + facade = FederationStateStoreFacade.getInstance(); + facade.reinitialize(stateStore, conf); + + gpgContext = new GPGContextImpl(); + gpgContext.setStateStoreFacade(facade); + + appCleaner = new TestableDefaultApplicationCleaner(); + appCleaner.init(conf, gpgContext); + + routerAppIds = new HashSet<>(); + + appIds = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + ApplicationId appId = ApplicationId.newInstance(0, i); + appIds.add(appId); + + SubClusterId subClusterId = + SubClusterId.newInstance("SUBCLUSTER-" + i); + + stateStore.addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest.newInstance( + ApplicationHomeSubCluster.newInstance(appId, subClusterId))); + } + } + + @After + public void breakDown() { + if (stateStore != null) { + stateStore.close(); + stateStore = null; + } + } + + @Test + public void testFederationStateStoreAppsCleanUp() throws YarnException { + // Set first app to be still known by Router + ApplicationId appId = appIds.get(0); + routerAppIds.add(appId); + + // Another random app not in stateStore known by Router + appId = ApplicationId.newInstance(100, 200); + routerAppIds.add(appId); + + appCleaner.run(); + + // Only one app should be left + Assert.assertEquals(1, + stateStore + .getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest.newInstance()) + .getAppsHomeSubClusters().size()); + } + + 
/** + * Testable version of DefaultApplicationCleaner. + */ + public class TestableDefaultApplicationCleaner + extends DefaultApplicationCleaner { + @Override + public Set getAppsFromRouter() throws YarnRuntimeException { + return routerAppIds; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java index 72e97f8a75087..446eeee2cd922 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java @@ -299,7 +299,7 @@ public void testCallRM() { String webAppAddress = getServiceAddress(NetUtils.createSocketAddr(rmAddress)); SchedulerTypeInfo sti = GPGUtils.invokeRMWebService(webAppAddress, RMWSConsts.SCHEDULER, - SchedulerTypeInfo.class, this.conf); + SchedulerTypeInfo.class, conf); Assert.assertNotNull(sti); SchedulerInfo schedulerInfo = sti.getSchedulerInfo();