YARN-7599. [BackPort][GPG] ApplicationCleaner in Global Policy Generator. (#5934) Contributed by Botong Huang, Shilun Fan.

Co-authored-by: Botong Huang <botong@apache.org>
Co-authored-by: slfan1989 <slfan1989@apache.org>
Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
3 people committed Sep 14, 2023
1 parent 56b928b commit 8538af4
Showing 11 changed files with 510 additions and 5 deletions.
@@ -4432,6 +4432,31 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = FEDERATION_GPG_PREFIX +
"kerberos.principal.hostname";

// The application cleaner class to use
public static final String GPG_APPCLEANER_CLASS =
FEDERATION_GPG_PREFIX + "application.cleaner.class";
public static final String DEFAULT_GPG_APPCLEANER_CLASS =
"org.apache.hadoop.yarn.server.globalpolicygenerator"
+ ".applicationcleaner.DefaultApplicationCleaner";

// The interval at which the application cleaner runs; -1 means disabled
public static final String GPG_APPCLEANER_INTERVAL_MS =
FEDERATION_GPG_PREFIX + "application.cleaner.interval-ms";
public static final long DEFAULT_GPG_APPCLEANER_INTERVAL_MS = TimeUnit.SECONDS.toMillis(-1);

/**
* Specification of how many times to contact the Router for applications.
* This is needed because the Router may return a partial application list
* when some sub-cluster RM is not responsive (e.g. failing over).
*
* Should contain three comma-separated values: the minimum number of
* successful queries, the maximum number of total attempts, and the retry
* interval (ms). For example, the default "3,10,600000" requires three
* successful Router queries, allows at most ten attempts, and waits
* 600000 ms (ten minutes) between queries.
*/
public static final String GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
FEDERATION_GPG_PREFIX + "application.cleaner.contact.router.spec";
public static final String DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC =
"3,10,600000";

public static final String FEDERATION_GPG_POLICY_PREFIX =
FEDERATION_GPG_PREFIX + "policy.generator.";
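
For illustration only (not part of this patch), a minimal sketch of how the new application cleaner keys above could be set programmatically; the class name and the 30-minute interval are arbitrary examples:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class GpgAppCleanerConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Use the default cleaner implementation shipped with the GPG.
    conf.set(YarnConfiguration.GPG_APPCLEANER_CLASS,
        YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS);
    // A positive interval enables the cleaner; here it runs every 30 minutes.
    conf.setTimeDuration(YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
        TimeUnit.MINUTES.toMillis(30), TimeUnit.MILLISECONDS);
    // Require 3 successful Router queries out of at most 10 attempts,
    // waiting 600000 ms (10 minutes) between successful queries.
    conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "3,10,600000");
  }
}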

@@ -5538,6 +5538,14 @@
<value>LINEAR</value>
</property>

<property>
<description>
The Application Cleaner implementation class for GPG to use.
</description>
<name>yarn.federation.gpg.application.cleaner.class</name>
<value>org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.DefaultApplicationCleaner</value>
</property>

<property>
<description>Flag to enable cross-origin (CORS) support in the GPG. This flag
requires the CORS filter initializer to be added to the filter initializers
@@ -5546,6 +5554,14 @@
<value>false</value>
</property>

<property>
<description>
The interval at which the application cleaner runs; -1 means disabled.
</description>
<name>yarn.federation.gpg.application.cleaner.interval-ms</name>
<value>-1s</value>
</property>

<property>
<description>
The http address of the GPG web application.
@@ -5556,6 +5572,18 @@
<value>0.0.0.0:8069</value>
</property>

<property>
<description>
Specification of how many times to contact the Router for applications.
This is needed because the Router may return a partial application list
when some sub-cluster RM is not responsive (e.g. failing over).
Should contain three comma-separated values: the minimum number of
successful queries, the maximum number of total attempts, and the retry
interval (ms).
</description>
<name>yarn.federation.gpg.application.cleaner.contact.router.spec</name>
<value>3,10,600000</value>
</property>

<property>
<description>
The https address of the GPG web application.
@@ -83,6 +83,9 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
@@ -884,6 +887,33 @@ public void addApplicationHomeSubCluster(ApplicationId applicationId,
}
}

/**
* Get the {@code ApplicationHomeSubCluster} list representing the mapping of
* all submitted applications to their home sub-clusters.
*
* @return the mapping of all submitted applications to their home sub-clusters
* @throws YarnException if the request is invalid/fails
*/
public List<ApplicationHomeSubCluster> getApplicationsHomeSubCluster() throws YarnException {
GetApplicationsHomeSubClusterResponse response = stateStore.getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest.newInstance());
return response.getAppsHomeSubClusters();
}

/**
* Delete the mapping of the home {@code SubClusterId} of a previously
* submitted {@code ApplicationId}. The response is currently empty if the
* operation is successful; otherwise an exception reports the reason for
* the failure.
*
* @param applicationId the application whose home sub-cluster mapping to delete
* @throws YarnException if the request is invalid/fails
*/
public void deleteApplicationHomeSubCluster(ApplicationId applicationId)
throws YarnException {
stateStore.deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest.newInstance(applicationId));
}
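
As a hedged usage sketch (not part of this diff) of the two facade methods above, a hypothetical helper that deletes the tracked home sub-cluster mapping of a single application if it is still present; the class and method names are illustrative only:

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;

public final class HomeSubClusterMappingHelper {

  private HomeSubClusterMappingHelper() {
  }

  /**
   * Deletes the home sub-cluster mapping of the given application if it is
   * currently tracked in the FederationStateStore.
   *
   * @return true if a mapping was found and deleted, false otherwise
   */
  public static boolean deleteIfTracked(FederationStateStoreFacade facade,
      ApplicationId appId) throws YarnException {
    for (ApplicationHomeSubCluster entry : facade.getApplicationsHomeSubCluster()) {
      if (entry.getApplicationId().equals(appId)) {
        facade.deleteApplicationHomeSubCluster(appId);
        return true;
      }
    }
    return false;
  }
}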

/**
* Update ApplicationHomeSubCluster to FederationStateStore.
*
@@ -40,6 +40,7 @@
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;

/**
* GPGUtils contains utility functions for the GPG.
@@ -58,11 +59,12 @@ private GPGUtils() {
* @param webAddr WebAddress.
* @param path url path.
* @param returnType return type.
* @param conf configuration.
* @param selectParam the deSelects query parameter value, may be null.
* @return response entity.
*/
public static <T> T invokeRMWebService(String webAddr, String path, final Class<T> returnType,
Configuration conf) {
Configuration conf, String selectParam) {
Client client = Client.create();
T obj;

@@ -72,6 +74,11 @@ public static <T> T invokeRMWebService(String webAddr, String path, final Class<
String scheme = YarnConfiguration.useHttps(conf) ? HTTPS_PREFIX : HTTP_PREFIX;
String webAddress = scheme + socketAddress.getHostName() + ":" + socketAddress.getPort();
WebResource webResource = client.resource(webAddress);

if (selectParam != null) {
webResource = webResource.queryParam(RMWSConsts.DESELECTS, selectParam);
}

ClientResponse response = null;
try {
response = webResource.path(RM_WEB_SERVICE_PATH).path(path)
@@ -92,6 +99,21 @@ public static <T> T invokeRMWebService(String webAddr, String path, final Class<
}
}

/**
* Performs an invocation of the remote RMWebService.
*
* @param <T> Generic T.
* @param webAddr WebAddress.
* @param path url path.
* @param returnType return type.
* @param config configuration.
* @return response entity.
*/
public static <T> T invokeRMWebService(String webAddr,
String path, final Class<T> returnType, Configuration config) {
return invokeRMWebService(webAddr, path, returnType, config, null);
}

/**
* Creates a uniform weighting of 1.0 for each sub cluster.
*
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
@@ -84,6 +85,7 @@ public class GlobalPolicyGenerator extends CompositeService {
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;
private ApplicationCleaner applicationCleaner;
private PolicyGenerator policyGenerator;
private String webAppAddress;
private JvmPauseMonitor pauseMonitor;
@@ -125,6 +127,12 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);

this.applicationCleaner = FederationStateStoreFacade.createInstance(conf,
YarnConfiguration.GPG_APPCLEANER_CLASS,
YarnConfiguration.DEFAULT_GPG_APPCLEANER_CLASS, ApplicationCleaner.class);
this.applicationCleaner.init(conf, this.gpgContext);

this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);

this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf);
@@ -149,7 +157,7 @@ protected void serviceStart() throws Exception {

super.serviceStart();

// Scheduler SubClusterCleaner service
// Schedule SubClusterCleaner service
Configuration config = getConfig();
long scCleanerIntervalMs = config.getTimeDuration(
YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
@@ -161,6 +169,18 @@
DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
}

// Schedule ApplicationCleaner service
long appCleanerIntervalMs = config.getTimeDuration(
YarnConfiguration.GPG_APPCLEANER_INTERVAL_MS,
YarnConfiguration.DEFAULT_GPG_APPCLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS);

if (appCleanerIntervalMs > 0) {
this.scheduledExecutorService.scheduleAtFixedRate(this.applicationCleaner,
0, appCleanerIntervalMs, TimeUnit.MILLISECONDS);
LOG.info("Scheduled application cleaner with interval: {}",
DurationFormatUtils.formatDurationISO(appCleanerIntervalMs));
}

// Schedule PolicyGenerator
// We recommend using yarn.federation.gpg.policy.generator.interval
// instead of yarn.federation.gpg.policy.generator.interval-ms
@@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The ApplicationCleaner is a runnable that cleans up old applications from
* table applicationsHomeSubCluster in FederationStateStore.
*/
public abstract class ApplicationCleaner implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationCleaner.class);

private Configuration conf;
private GPGContext gpgContext;

private int minRouterSuccessCount;
private int maxRouterRetry;
private long routerQueryIntevalMillis;

public void init(Configuration config, GPGContext context)
throws YarnException {

this.gpgContext = context;
this.conf = config;

String routerSpecString =
this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC);
String[] specs = routerSpecString.split(",");
if (specs.length != 3) {
throw new YarnException("Expected three comma-separated values in "
+ YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but got "
+ routerSpecString);
}
this.minRouterSuccessCount = Integer.parseInt(specs[0]);
this.maxRouterRetry = Integer.parseInt(specs[1]);
this.routerQueryIntevalMillis = Long.parseLong(specs[2]);

if (this.minRouterSuccessCount > this.maxRouterRetry) {
throw new YarnException("minRouterSuccessCount "
+ this.minRouterSuccessCount
+ " should not be larger than maxRouterRetry " + this.maxRouterRetry);
}
if (this.minRouterSuccessCount <= 0) {
throw new YarnException("minRouterSuccessCount "
+ this.minRouterSuccessCount + " should be positive");
}

LOG.info(
"Initialized ApplicationCleaner with Router query spec: min success {}, "
+ "max retry {}, retry interval {}",
this.minRouterSuccessCount, this.maxRouterRetry,
DurationFormatUtils.formatDurationISO(this.routerQueryIntevalMillis));
}

public GPGContext getGPGContext() {
return this.gpgContext;
}

/**
* Query router for applications.
*
* @return the set of applications
* @throws YarnRuntimeException when router call fails
*/
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);

LOG.info("Contacting router at: {}", webAppAddress);
AppsInfo appsInfo = GPGUtils.invokeRMWebService(webAppAddress, "apps", AppsInfo.class, conf,
DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());

Set<ApplicationId> appSet = new HashSet<>();
for (AppInfo appInfo : appsInfo.getApps()) {
appSet.add(ApplicationId.fromString(appInfo.getAppId()));
}
return appSet;
}

/**
* Get the set of applications known to the cluster, as reported by the Router.
*
* @return the set of known applications
* @throws YarnException if too few Router queries succeed
*/
public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
int successCount = 0, totalAttemptCount = 0;
Set<ApplicationId> resultSet = new HashSet<>();
while (totalAttemptCount < this.maxRouterRetry) {
try {
Set<ApplicationId> routerApps = getAppsFromRouter();
resultSet.addAll(routerApps);
LOG.info("Attempt {}: {} known apps from Router, {} in total",
totalAttemptCount, routerApps.size(), resultSet.size());

successCount++;
if (successCount >= this.minRouterSuccessCount) {
return resultSet;
}

// Wait for the next attempt
try {
Thread.sleep(this.routerQueryIntevalMillis);
} catch (InterruptedException e) {
LOG.warn("Sleep interrupted after attempt {}.", totalAttemptCount);
}
} catch (Exception e) {
LOG.warn("Router query attempt {} failed.", totalAttemptCount, e);
} finally {
totalAttemptCount++;
}
}
throw new YarnException("Only " + successCount
+ " successful Router queries after " + totalAttemptCount + " attempts");
}

@Override
public abstract void run();
}
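
A concrete subclass must supply run(). As a rough sketch (not the DefaultApplicationCleaner shipped with this commit), one plausible implementation asks the Router for the applications it still knows about and deletes state-store mappings for everything else; the class name is illustrative, and it assumes the GPG context exposes the FederationStateStoreFacade via getStateStoreFacade():

package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;

import java.util.Set;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Illustrative cleaner: removes applicationsHomeSubCluster entries for
 * applications the Router no longer reports.
 */
public class ExampleApplicationCleaner extends ApplicationCleaner {

  private static final Logger LOG =
      LoggerFactory.getLogger(ExampleApplicationCleaner.class);

  @Override
  public void run() {
    try {
      // Applications the Router still reports, gathered with the retry policy
      // configured via yarn.federation.gpg.application.cleaner.contact.router.spec.
      Set<ApplicationId> routerApps = getRouterKnownApplications();

      FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
      int deleted = 0;
      for (ApplicationHomeSubCluster entry : facade.getApplicationsHomeSubCluster()) {
        ApplicationId appId = entry.getApplicationId();
        if (!routerApps.contains(appId)) {
          facade.deleteApplicationHomeSubCluster(appId);
          deleted++;
        }
      }
      LOG.info("Deleted {} stale application home sub-cluster entries.", deleted);
    } catch (Exception e) {
      LOG.error("Application cleaner run failed.", e);
    }
  }
}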
