From 2ed110957ef1731ea7dd04827d894311311a8a8d Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 19 May 2023 14:26:19 +0800 Subject: [PATCH 1/5] YARN-6648. BackPort [GPG] Add SubClusterCleaner in Global Policy Generator. --- .../dev-support/findbugs-exclude.xml | 5 + .../hadoop/yarn/conf/YarnConfiguration.java | 28 ++++- .../src/main/resources/yarn-default.xml | 24 ++++ .../impl/MemoryFederationStateStore.java | 11 ++ .../utils/FederationStateStoreFacade.java | 20 +++ .../GlobalPolicyGenerator.java | 35 ++++++ .../subclustercleaner/SubClusterCleaner.java | 109 ++++++++++++++++ .../subclustercleaner/package-info.java | 19 +++ .../TestSubClusterCleaner.java | 118 ++++++++++++++++++ 9 files changed, 364 insertions(+), 5 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 81e888472d8a3..cf457c23eb1d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -387,6 +387,11 @@ + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java 
index e8189a2b945b5..06647542f8a36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -541,7 +541,7 @@ public static boolean isAclEnabled(Configuration conf) { */ public static final String GLOBAL_RM_AM_MAX_ATTEMPTS = RM_PREFIX + "am.global.max-attempts"; - + /** The keytab for the resource manager.*/ public static final String RM_KEYTAB = RM_PREFIX + "keytab"; @@ -597,7 +597,7 @@ public static boolean isAclEnabled(Configuration conf) { RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms"; public static final int DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0; - + /** Path to file with nodes to exclude.*/ public static final String RM_NODES_EXCLUDE_FILE_PATH = RM_PREFIX + "nodes.exclude-path"; @@ -1537,7 +1537,7 @@ public static boolean isAclEnabled(Configuration conf) { + "log-aggregation.debug.filesize"; public static final long DEFAULT_LOG_AGGREGATION_DEBUG_FILESIZE = 100 * 1024 * 1024; - + /** * How long to wait between aggregated log retention checks. If set to * a value {@literal <=} 0 then the value is computed as one-tenth of the @@ -2187,7 +2187,7 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_NM_HEALTH_CHECK_TIMEOUT_MS = 2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS; - /** Health check script time out period.*/ + /** Health check script time out period.*/ public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE = NM_PREFIX + "health-checker.%s.timeout-ms"; @@ -2908,7 +2908,7 @@ public static boolean isAclEnabled(Configuration conf) { /** Binding address for the web proxy. 
*/ public static final String PROXY_BIND_HOST = PROXY_PREFIX + "bind-host"; - + /** * YARN Service Level Authorization */ @@ -4326,6 +4326,24 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; + private static final String FEDERATION_GPG_PREFIX = + FEDERATION_PREFIX + "gpg."; + + // The number of threads to use for the GPG scheduled executor service + public static final String GPG_SCHEDULED_EXECUTOR_THREADS = + FEDERATION_GPG_PREFIX + "scheduled.executor.threads"; + public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10; + + // The interval at which the subcluster cleaner runs, -1 means disabled + public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = + FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms"; + public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = -1; + + // The expiration time for a subcluster heartbeat, default is 30 minutes + public static final String GPG_SUBCLUSTER_EXPIRATION_MS = + FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; + public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000; + /** * Connection and Read timeout from the Router to RM. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 5e11c6526e898..e1c57befb2b1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -4370,6 +4370,30 @@ 0.0.0.0:8091 + + + The number of threads to use for the GPG scheduled executor service. + + yarn.federation.gpg.scheduled.executor.threads + 10 + + + + + The interval at which the subcluster cleaner runs, -1 means disabled. 
+ + yarn.federation.gpg.subcluster.cleaner.interval-ms + -1 + + + + + The expiration time for a subcluster heartbeat, default is 30 minutes. + + yarn.federation.gpg.subcluster.heartbeat.expiration-ms + 1800000 + + It is TimelineClient 1.5 configuration whether to store active diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 959edafcf28b7..b3847b20d55bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -215,6 +215,17 @@ public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatReques return SubClusterHeartbeatResponse.newInstance(); } + @VisibleForTesting + public void setSubClusterLastHeartbeat(SubClusterId subClusterId, + long lastHeartbeat) throws YarnException { + SubClusterInfo subClusterInfo = membership.get(subClusterId); + if (subClusterInfo == null) { + throw new YarnException( + "Subcluster " + subClusterId.toString() + " does not exist"); + } + subClusterInfo.setLastHeartBeat(lastHeartbeat); + } + @Override public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request) throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index b2de68b506661..9f83cea0489d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -266,6 +266,26 @@ public Map getSubClusters(final boolean filterInac } } + /** + * Updates the cache with the central {@link FederationStateStore} and returns + * the {@link SubClusterInfo} of all active sub cluster(s). + * + * @param filterInactiveSubClusters whether to filter out inactive + * sub-clusters + * @param flushCache flag to indicate if the cache should be flushed or not + * @return the sub cluster information + * @throws YarnException if the call to the state store is unsuccessful + */ + public Map getSubClusters( + final boolean filterInactiveSubClusters, final boolean flushCache) + throws YarnException { + if (flushCache && federationCache.isCachingEnabled()) { + LOG.info("Flushing subClusters from cache and rehydrating from store."); + federationCache.removeSubCluster(flushCache); + } + return getSubClusters(filterInactiveSubClusters); + } + /** * Returns the {@link SubClusterPolicyConfiguration} for the specified queue. 
* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index 01f4a4c41a6cf..3eec5b76376a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -18,8 +18,11 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang.time.DurationFormatUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.service.CompositeService; @@ -28,6 +31,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +59,10 @@ public class GlobalPolicyGenerator extends CompositeService { // Federation Variables private GPGContext gpgContext; + // Scheduler service that runs tasks periodically + private ScheduledThreadPoolExecutor scheduledExecutorService; + private SubClusterCleaner subClusterCleaner; + public GlobalPolicyGenerator() { super(GlobalPolicyGenerator.class.getName()); 
this.gpgContext = new GPGContextImpl(); @@ -78,6 +86,11 @@ protected void serviceInit(Configuration conf) throws Exception { this.gpgContext .setStateStoreFacade(FederationStateStoreFacade.getInstance()); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor( + conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS, + YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS)); + this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext); + DefaultMetricsSystem.initialize(METRICS_NAME); // super.serviceInit after all services are added @@ -87,10 +100,32 @@ protected void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { super.serviceStart(); + + // Scheduler SubClusterCleaner service + long scCleanerIntervalMs = getConfig().getLong( + YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, + YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS); + if (scCleanerIntervalMs > 0) { + this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner, + 0, scCleanerIntervalMs, TimeUnit.MILLISECONDS); + LOG.info("Scheduled sub-cluster cleaner with interval: {}", + DurationFormatUtils.formatDurationISO(scCleanerIntervalMs)); + } } @Override protected void serviceStop() throws Exception { + try { + if (this.scheduledExecutorService != null + && !this.scheduledExecutorService.isShutdown()) { + this.scheduledExecutorService.shutdown(); + LOG.info("Stopped ScheduledExecutorService"); + } + } catch (Exception e) { + LOG.error("Failed to shutdown ScheduledExecutorService", e); + throw e; + } + if (this.isStopping.getAndSet(true)) { return; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java new file mode 100644 index 0000000000000..e65bba7a59b61 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner; + +import java.util.Date; +import java.util.Map; + +import org.apache.commons.lang.time.DurationFormatUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The sub-cluster cleaner is one of the GPG's services that periodically checks + * the membership table in FederationStateStore and mark sub-clusters that have + * not sent a heartbeat in certain amount of time as LOST. + */ +public class SubClusterCleaner implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(SubClusterCleaner.class); + + private GPGContext gpgContext; + private long heartbeatExpirationMillis; + + /** + * The sub-cluster cleaner runnable is invoked by the sub cluster cleaner + * service to check the membership table and remove sub clusters that have not + * sent a heart beat in some amount of time. 
+ */ + public SubClusterCleaner(Configuration conf, GPGContext gpgContext) { + this.heartbeatExpirationMillis = + conf.getLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, + YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS); + this.gpgContext = gpgContext; + LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}", + DurationFormatUtils.formatDurationISO(this.heartbeatExpirationMillis)); + } + + @Override + public void run() { + try { + Date now = new Date(); + LOG.info("SubClusterCleaner at {}", now); + + Map infoMap = + this.gpgContext.getStateStoreFacade().getSubClusters(false, true); + + // Iterate over each sub cluster and check last heartbeat + for (Map.Entry entry : infoMap.entrySet()) { + SubClusterInfo subClusterInfo = entry.getValue(); + + Date lastHeartBeat = new Date(subClusterInfo.getLastHeartBeat()); + if (LOG.isDebugEnabled()) { + LOG.debug("Checking subcluster {} in state {}, last heartbeat at {}", + subClusterInfo.getSubClusterId(), subClusterInfo.getState(), + lastHeartBeat); + } + + if (subClusterInfo.getState().isUsable()) { + long timeUntilDeregister = this.heartbeatExpirationMillis + - (now.getTime() - lastHeartBeat.getTime()); + // Deregister sub-cluster as SC_LOST if last heartbeat too old + if (timeUntilDeregister < 0) { + LOG.warn( + "Deregistering subcluster {} in state {} last heartbeat at {}", + subClusterInfo.getSubClusterId(), subClusterInfo.getState(), + new Date(subClusterInfo.getLastHeartBeat())); + try { + this.gpgContext.getStateStoreFacade().deregisterSubCluster( + subClusterInfo.getSubClusterId(), SubClusterState.SC_LOST); + } catch (Exception e) { + LOG.error("deregisterSubCluster failed on subcluster " + + subClusterInfo.getSubClusterId(), e); + } + } else if (LOG.isDebugEnabled()) { + LOG.debug("Time until deregister for subcluster {}: {}", + entry.getKey(), + DurationFormatUtils.formatDurationISO(timeUntilDeregister)); + } + } + } + } catch (Throwable e) { + LOG.error("Subcluster cleaner fails: ", 
e); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java new file mode 100644 index 0000000000000..f65444aa3663d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java new file mode 100644 index 0000000000000..19b8802b00d4f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner; + +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext; +import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit test for Sub-cluster Cleaner in GPG. 
+ */ +public class TestSubClusterCleaner { + + private Configuration conf; + private MemoryFederationStateStore stateStore; + private FederationStateStoreFacade facade; + private SubClusterCleaner cleaner; + private GPGContext gpgContext; + + private ArrayList subClusterIds; + + @Before + public void setup() throws YarnException { + conf = new YarnConfiguration(); + + // subcluster expires in one second + conf.setLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, 1000); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(conf); + + facade = FederationStateStoreFacade.getInstance(); + facade.reinitialize(stateStore, conf); + + gpgContext = new GPGContextImpl(); + gpgContext.setStateStoreFacade(facade); + + cleaner = new SubClusterCleaner(conf, gpgContext); + + // Create and register six sub clusters + subClusterIds = new ArrayList(); + for (int i = 0; i < 3; i++) { + // Create sub cluster id and info + SubClusterId subClusterId = + SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i)); + + SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, + "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4", + SubClusterState.SC_RUNNING, System.currentTimeMillis(), ""); + // Register the sub cluster + stateStore.registerSubCluster( + SubClusterRegisterRequest.newInstance(subClusterInfo)); + // Append the id to a local list + subClusterIds.add(subClusterId); + } + } + + @After + public void breakDown() throws Exception { + stateStore.close(); + } + + @Test + public void testSubClusterRegisterHeartBeatTime() throws YarnException { + cleaner.run(); + Assert.assertEquals(3, facade.getSubClusters(true, true).size()); + } + + /** + * Test the base use case. 
+ */ + @Test + public void testSubClusterHeartBeat() throws YarnException { + // The first subcluster reports as Unhealthy + SubClusterId subClusterId = subClusterIds.get(0); + stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest + .newInstance(subClusterId, SubClusterState.SC_UNHEALTHY, "capacity")); + + // The second subcluster didn't heartbeat for two seconds, should mark lost + subClusterId = subClusterIds.get(1); + stateStore.setSubClusterLastHeartbeat(subClusterId, + System.currentTimeMillis() - 2000); + + cleaner.run(); + Assert.assertEquals(1, facade.getSubClusters(true, true).size()); + } +} \ No newline at end of file From 91484a0a50db781772b1f66ca4a02059bb8b8464 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 19 May 2023 23:43:44 +0800 Subject: [PATCH 2/5] YARN-6648. Fix CheckStyle. --- .../subclustercleaner/SubClusterCleaner.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java index e65bba7a59b61..a22f9aa61f1ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java @@ -48,6 +48,9 @@ public class SubClusterCleaner implements Runnable { * The sub-cluster cleaner runnable is invoked by the sub cluster cleaner * service to check the membership table and remove sub clusters that have not * sent a 
heart beat in some amount of time. + * + * @param conf configuration. + * @param gpgContext GPGContext. */ public SubClusterCleaner(Configuration conf, GPGContext gpgContext) { this.heartbeatExpirationMillis = From 9742423dd4aaad62d8c0d24eac33206ef2ad2367 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sun, 21 May 2023 18:47:27 +0800 Subject: [PATCH 3/5] YARN-6648. Fix CheckStyle. --- .../hadoop/yarn/conf/YarnConfiguration.java | 11 ++++---- .../src/main/resources/yarn-default.xml | 26 +++++++++++++++++++ .../subclustercleaner/SubClusterCleaner.java | 7 ++--- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 06647542f8a36..44be264db4158 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4326,23 +4326,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED = false; - private static final String FEDERATION_GPG_PREFIX = - FEDERATION_PREFIX + "gpg."; + private static final String FEDERATION_GPG_PREFIX = FEDERATION_PREFIX + "gpg."; // The number of threads to use for the GPG scheduled executor service public static final String GPG_SCHEDULED_EXECUTOR_THREADS = - FEDERATION_GPG_PREFIX + "scheduled.executor.threads"; + FEDERATION_GPG_PREFIX + "scheduled.executor.threads"; public static final int DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS = 10; // The interval at which the subcluster cleaner runs, -1 means disabled public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = - FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms"; + FEDERATION_GPG_PREFIX + 
"subcluster.cleaner.interval-ms"; public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = -1; // The expiration time for a subcluster heartbeat, default is 30 minutes public static final String GPG_SUBCLUSTER_EXPIRATION_MS = - FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; - public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = 1800000; + FEDERATION_GPG_PREFIX + "subcluster.heartbeat.expiration-ms"; + public static final long DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS = TimeUnit.MINUTES.toMillis(30); /** * Connection and Read timeout from the Router to RM. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e1c57befb2b1f..85a86fa7ccd93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5285,4 +5285,30 @@ false + + + The number of threads to use for the GPG scheduled executor service. + default is 10. + + yarn.federation.gpg.scheduled.executor.threads + 10 + + + + + The interval at which the subcluster cleaner runs, -1 means disabled. + default is -1. + + yarn.federation.gpg.subcluster.cleaner.interval-ms + -1 + + + + + The expiration time for a subcluster heartbeat, default is 30 minutes. 
+ + yarn.federation.gpg.subcluster.heartbeat.expiration-ms + 30m + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java index a22f9aa61f1ee..aa1dda7464c36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/SubClusterCleaner.java @@ -20,6 +20,7 @@ import java.util.Date; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.DurationFormatUtils; import org.apache.hadoop.conf.Configuration; @@ -53,9 +54,9 @@ public class SubClusterCleaner implements Runnable { * @param gpgContext GPGContext. */ public SubClusterCleaner(Configuration conf, GPGContext gpgContext) { - this.heartbeatExpirationMillis = - conf.getLong(YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, - YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS); + this.heartbeatExpirationMillis = conf.getTimeDuration( + YarnConfiguration.GPG_SUBCLUSTER_EXPIRATION_MS, + YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_EXPIRATION_MS, TimeUnit.MILLISECONDS); this.gpgContext = gpgContext; LOG.info("Initialized SubClusterCleaner with heartbeat expiration of {}", DurationFormatUtils.formatDurationISO(this.heartbeatExpirationMillis)); From b02819605c58eb409f5b87e9528fcb00aa3a1997 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Thu, 25 May 2023 05:53:33 +0800 Subject: [PATCH 4/5] YARN-6648. 
Fix CheckStyle. --- .../hadoop/yarn/conf/YarnConfiguration.java | 3 ++- .../src/main/resources/yarn-default.xml | 26 +------------------ .../yarn/conf/TestYarnConfiguration.java | 6 +++++ .../GlobalPolicyGenerator.java | 5 ++-- .../TestSubClusterCleaner.java | 5 +++- 5 files changed, 16 insertions(+), 29 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 44be264db4158..b0184ce7e9dac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4336,7 +4336,8 @@ public static boolean isAclEnabled(Configuration conf) { // The interval at which the subcluster cleaner runs, -1 means disabled public static final String GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = FEDERATION_GPG_PREFIX + "subcluster.cleaner.interval-ms"; - public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = -1; + public static final long DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS = + TimeUnit.MILLISECONDS.toMillis(-1); // The expiration time for a subcluster heartbeat, default is 30 minutes public static final String GPG_SUBCLUSTER_EXPIRATION_MS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 85a86fa7ccd93..544343fadcfbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -4370,30 +4370,6 @@ 0.0.0.0:8091 - - - The number of threads to use for the GPG scheduled executor service. 
- - yarn.federation.gpg.scheduled.executor.threads - 10 - - - - - The interval at which the subcluster cleaner runs, -1 means disabled. - - yarn.federation.gpg.subcluster.cleaner.interval-ms - -1 - - - - - The expiration time for a subcluster heartbeat, default is 30 minutes. - - yarn.federation.gpg.subcluster.heartbeat.expiration-ms - 1800000 - - It is TimelineClient 1.5 configuration whether to store active @@ -5300,7 +5276,7 @@ default is -1. yarn.federation.gpg.subcluster.cleaner.interval-ms - -1 + -1ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java index e4547a9163dda..59fc31b0db977 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.conf; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -247,4 +248,9 @@ void testUpdateConnectAddr() throws Exception { assertNull(conf.get( HAUtil.addSuffix(YarnConfiguration.NM_LOCALIZER_ADDRESS, "rm1"))); } + + @Test + public void testCheck() throws Exception { + System.out.println(TimeUnit.MILLISECONDS.toMillis(-1)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java index 3eec5b76376a0..5ba47dcd57c53 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java @@ -102,9 +102,10 @@ protected void serviceStart() throws Exception { super.serviceStart(); // Scheduler SubClusterCleaner service - long scCleanerIntervalMs = getConfig().getLong( + Configuration config = getConfig(); + long scCleanerIntervalMs = config.getTimeDuration( YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, - YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS); + YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS, TimeUnit.MILLISECONDS); if (scCleanerIntervalMs > 0) { this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner, 0, scCleanerIntervalMs, TimeUnit.MILLISECONDS); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java index 19b8802b00d4f..202733940d294 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/subclustercleaner/TestSubClusterCleaner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner; import 
java.util.ArrayList; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -48,6 +49,8 @@ public class TestSubClusterCleaner { private SubClusterCleaner cleaner; private GPGContext gpgContext; + private static final long TWO_SECONDS = TimeUnit.SECONDS.toMillis(2); + private ArrayList subClusterIds; @Before @@ -110,7 +113,7 @@ public void testSubClusterHeartBeat() throws YarnException { // The second subcluster didn't heartbeat for two seconds, should mark lost subClusterId = subClusterIds.get(1); stateStore.setSubClusterLastHeartbeat(subClusterId, - System.currentTimeMillis() - 2000); + System.currentTimeMillis() - TWO_SECONDS); cleaner.run(); Assert.assertEquals(1, facade.getSubClusters(true, true).size()); From 37b649099e4bda300df75e6c41dc58904d354c94 Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Tue, 30 May 2023 23:40:52 +0800 Subject: [PATCH 5/5] YARN-6648. Fix CheckStyle. --- .../hadoop-yarn-common/src/main/resources/yarn-default.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 851a4a1f398a5..aee9c52c9807f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5289,7 +5289,7 @@ yarn.federation.gpg.subcluster.heartbeat.expiration-ms 30m - + yarn.apps.cache.enable false