Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-6648. BackPort [GPG] Add SubClusterCleaner in Global Policy Generator. #5676

Merged
merged 9 commits on Jun 12, 2023
Expand Up @@ -387,6 +387,11 @@
<Method name="initAndStartNodeManager" />
<Bug pattern="DM_EXIT" />
</Match>
<!-- GPG deliberately calls System.exit on fatal startup failure -->
<Match>
  <Class name="org.apache.hadoop.yarn.server.globalpolicygenerator.GlobalPolicyGenerator" />
  <!-- Fixed typo: element must be "Method", otherwise the filter never
       matches and the DM_EXIT warning is not suppressed -->
  <Method name="startGPG" />
  <Bug pattern="DM_EXIT" />
</Match>

<!-- Ignore heartbeat exception when killing localizer -->
<Match>
Expand Down
Expand Up @@ -541,7 +541,7 @@ public static boolean isAclEnabled(Configuration conf) {
*/
public static final String GLOBAL_RM_AM_MAX_ATTEMPTS =
RM_PREFIX + "am.global.max-attempts";

/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =
RM_PREFIX + "keytab";
Expand Down Expand Up @@ -597,7 +597,7 @@ public static boolean isAclEnabled(Configuration conf) {
RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms";
public static final int
DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0;

/** Path to file with nodes to exclude.*/
public static final String RM_NODES_EXCLUDE_FILE_PATH =
RM_PREFIX + "nodes.exclude-path";
Expand Down Expand Up @@ -1537,7 +1537,7 @@ public static boolean isAclEnabled(Configuration conf) {
+ "log-aggregation.debug.filesize";
public static final long DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE
= 100 * 1024 * 1024;

/**
* How long to wait between aggregated log retention checks. If set to
* a value {@literal <=} 0 then the value is computed as one-tenth of the
Expand Down Expand Up @@ -2187,7 +2187,7 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_NM_HEALTH_CHECK_TIMEOUT_MS =
2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS;

/** Health check script time out period.*/
/** Health check script time out period.*/
public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE =
NM_PREFIX + "health-checker.%s.timeout-ms";

Expand Down Expand Up @@ -2908,7 +2908,7 @@ public static boolean isAclEnabled(Configuration conf) {
/** Binding address for the web proxy. */
public static final String PROXY_BIND_HOST =
PROXY_PREFIX + "bind-host";

/**
* YARN Service Level Authorization
*/
Expand Down Expand Up @@ -4326,6 +4326,24 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
false;

private static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg.";

// The number of threads to use for the GPG scheduled executor service
public static final String GPG_SCHEDULED_EXECUTOR_THREADS =
FEDERATION_GPG_PREFIX + "scheduled.executor.threads";
public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10;

// The interval at which the subcluster cleaner runs, -1 means disabled
public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS =
goiri marked this conversation as resolved.
Show resolved Hide resolved
FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms";
public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS =
TimeUnit.MILLISECONDS.toMillis(-1);

// The expiration time for a subcluster heartbeat, default is 30 minutes
public static final String GPG_SUBCLUSTER_EXPIRATION_MS =
FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms";
public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(30);

/**
* Connection and Read timeout from the Router to RM.
*/
Expand Down
Expand Up @@ -5261,6 +5261,32 @@
<value>false</value>
</property>

<property>
  <description>
    The number of threads to use for the GPG scheduled executor service.
    default is 10.
  </description>
  <name>yarn.federation.gpg.scheduled.executor.threads</name>
  <value>10</value>
</property>

<property>
  <description>
    The interval at which the subcluster cleaner runs, -1 means disabled.
    default is -1.
  </description>
  <name>yarn.federation.gpg.subcluster.cleaner.interval-ms</name>
  <!-- plain -1 matches the documented "disabled" sentinel; a unit suffix on a
       negative sentinel is misleading and the key already names the unit -->
  <value>-1</value>
</property>

<property>
  <description>
    The expiration time for a subcluster heartbeat, default is 30 minutes.
  </description>
  <name>yarn.federation.gpg.subcluster.heartbeat.expiration-ms</name>
  <value>30m</value>
</property>

Check failure on line 5289 in hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml#L5289

blanks: end of line
<property>
<name>yarn.apps.cache.enable</name>
<value>false</value>
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.conf;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -247,4 +248,9 @@ void testUpdateConnectAddr() throws Exception {
assertNull(conf.get(
HAUtil.addSuffix(YarnConfiguration.NM_LOCALIZER_ADDRESS, "rm1")));
}

@Test
public void testCheck() throws Exception {
  // Replaces a leftover debug println with a real assertion:
  // TimeUnit.MILLISECONDS.toMillis is an identity conversion, so -1 survives
  // it unchanged — the GPG subcluster cleaner relies on -1 meaning "disabled".
  if (TimeUnit.MILLISECONDS.toMillis(-1) != -1L) {
    throw new AssertionError("TimeUnit.MILLISECONDS.toMillis(-1) must be -1");
  }
}
}
Expand Up @@ -215,6 +215,17 @@ public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatReques
return SubClusterHeartbeatResponse.newInstance();
}

/**
 * Test helper that overwrites the stored last-heartbeat timestamp of a
 * registered sub-cluster.
 *
 * @param subClusterId id of the sub-cluster to update
 * @param lastHeartbeat new heartbeat timestamp in milliseconds
 * @throws YarnException if the sub-cluster is not in the membership table
 */
@VisibleForTesting
public void setSubClusterLastHeartbeat(SubClusterId subClusterId,
    long lastHeartbeat) throws YarnException {
  SubClusterInfo info = membership.get(subClusterId);
  if (info == null) {
    throw new YarnException(
        "Subcluster " + subClusterId.toString() + " does not exist");
  }
  info.setLastHeartBeat(lastHeartbeat);
}

@Override
public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request)
throws YarnException {
Expand Down
Expand Up @@ -266,6 +266,26 @@ public Map<SubClusterId, SubClusterInfo> getSubClusters(final boolean filterInac
}
}

/**
 * Returns the {@link SubClusterInfo} of all active sub-clusters, optionally
 * forcing a refresh from the central {@link FederationStateStore} first.
 *
 * @param filterInactiveSubClusters whether to filter out inactive
 *          sub-clusters
 * @param flushCache flag to indicate if the cache should be flushed or not
 * @return the sub cluster information
 * @throws YarnException if the call to the state store is unsuccessful
 */
public Map<SubClusterId, SubClusterInfo> getSubClusters(
    final boolean filterInactiveSubClusters, final boolean flushCache)
    throws YarnException {
  // Only invalidate when caching is actually on; otherwise there is
  // nothing to flush and the store is hit anyway.
  boolean invalidate = flushCache && federationCache.isCachingEnabled();
  if (invalidate) {
    LOG.info("Flushing subClusters from cache and rehydrating from store.");
    federationCache.removeSubCluster(flushCache);
  }
  return getSubClusters(filterInactiveSubClusters);
}

/**
* Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
*
Expand Down
Expand Up @@ -18,8 +18,11 @@

package org.apache.hadoop.yarn.server.globalpolicygenerator;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.CompositeService;
Expand All @@ -28,6 +31,7 @@
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,6 +59,10 @@ public class GlobalPolicyGenerator extends CompositeService {
// Federation Variables
private GPGContext gpgContext;

// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;

public GlobalPolicyGenerator() {
super(GlobalPolicyGenerator.class.getName());
this.gpgContext = new GPGContextImpl();
Expand All @@ -78,6 +86,11 @@ protected void serviceInit(Configuration conf) throws Exception {
this.gpgContext
.setStateStoreFacade(FederationStateStoreFacade.getInstance());

this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);

DefaultMetricsSystem.initialize(METRICS_NAME);

// super.serviceInit after all services are added
Expand All @@ -87,10 +100,33 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
  super.serviceStart();

  // Schedule the SubClusterCleaner unless its interval disables it (<= 0).
  Configuration conf = getConfig();
  long cleanerIntervalMs = conf.getTimeDuration(
      YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
      YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS);
  if (cleanerIntervalMs > 0) {
    this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner,
        0, cleanerIntervalMs, TimeUnit.MILLISECONDS);
    LOG.info("Scheduled sub-cluster cleaner with interval: {}",
        DurationFormatUtils.formatDurationISO(cleanerIntervalMs));
  }
}

@Override
protected void serviceStop() throws Exception {
try {
if (this.scheduledExecutorService != null
&& !this.scheduledExecutorService.isShutdown()) {
this.scheduledExecutorService.shutdown();
LOG.info("Stopped ScheduledExecutorService");
}
} catch (Exception e) {
LOG.error("Failed to shutdown ScheduledExecutorService", e);
throw e;
}

if (this.isStopping.getAndSet(true)) {
return;
}
Expand Down
@@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The sub-cluster cleaner is one of the GPG's services that periodically checks
* the membership table in FederationStateStore and mark sub-clusters that have
* not sent a heartbeat in certain amount of time as LOST.
*/
public class SubClusterCleaner implements Runnable {

  private static final Logger LOG =
      LoggerFactory.getLogger(SubClusterCleaner.class);

  private GPGContext gpgContext;
  private long heartbeatExpirationMillis;

  /**
   * Periodic task that scans the membership table in the
   * FederationStateStore and deregisters (as SC_LOST) every usable
   * sub-cluster whose last heartbeat is older than the configured
   * expiration.
   *
   * @param conf configuration.
   * @param gpgContext GPGContext.
   */
  public SubClusterCleaner(Configuration conf, GPGContext gpgContext) {
    this.heartbeatExpirationMillis = conf.getTimeDuration(
        YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS,
        YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS, TimeUnit.MILLISECONDS);
    this.gpgContext = gpgContext;
    LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}",
        DurationFormatUtils.formatDurationISO(this.heartbeatExpirationMillis));
  }

  @Override
  public void run() {
    try {
      Date now = new Date();
      LOG.info("SubClusterCleaner at {}", now);

      // Fetch a fresh membership view, bypassing the facade cache.
      Map<SubClusterId, SubClusterInfo> membership =
          this.gpgContext.getStateStoreFacade().getSubClusters(false, true);

      for (Map.Entry<SubClusterId, SubClusterInfo> entry : membership.entrySet()) {
        SubClusterInfo info = entry.getValue();
        Date lastHeartBeat = new Date(info.getLastHeartBeat());

        if (LOG.isDebugEnabled()) {
          LOG.debug("Checking subcluster {} in state {}, last heartbeat at {}",
              info.getSubClusterId(), info.getState(), lastHeartBeat);
        }

        // Sub-clusters that are already in a terminal/unusable state are
        // left alone.
        if (!info.getState().isUsable()) {
          continue;
        }

        long timeUntilDeregister = this.heartbeatExpirationMillis
            - (now.getTime() - lastHeartBeat.getTime());
        if (timeUntilDeregister < 0) {
          // Heartbeat is older than the expiration window: mark as lost.
          LOG.warn(
              "Deregistering subcluster {} in state {} last heartbeat at {}",
              info.getSubClusterId(), info.getState(),
              new Date(info.getLastHeartBeat()));
          try {
            this.gpgContext.getStateStoreFacade().deregisterSubCluster(
                info.getSubClusterId(), SubClusterState.SC_LOST);
          } catch (Exception e) {
            // Keep scanning the remaining sub-clusters on a per-entry failure.
            LOG.error("deregisterSubCluster failed on subcluster "
                + info.getSubClusterId(), e);
          }
        } else if (LOG.isDebugEnabled()) {
          LOG.debug("Time until deregister for subcluster {}: {}",
              entry.getKey(),
              DurationFormatUtils.formatDurationISO(timeUntilDeregister));
        }
      }
    } catch (Throwable e) {
      // Never propagate: an uncaught exception would silently cancel the
      // task in ScheduledThreadPoolExecutor.
      LOG.error("Subcluster cleaner fails: ", e);
    }
  }

}
@@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner;