From f8183f83bbf556ca248f755e7c364881b4f5275d Mon Sep 17 00:00:00 2001 From: lgh Date: Wed, 3 Jan 2024 21:13:55 +0800 Subject: [PATCH 1/2] RBF: Add RouterAutoMsyncService for auto msync to all nameservices. --- .../federation/router/RBFConfigKeys.java | 9 ++ .../hdfs/server/federation/router/Router.java | 18 ++++ .../router/RouterAutoMsyncService.java | 69 +++++++++++++++ .../router/RouterClientMetrics.java | 4 + .../src/main/resources/hdfs-rbf-default.xml | 16 ++++ .../federation/RouterConfigBuilder.java | 12 +++ .../server/federation/router/TestRouter.java | 3 + .../router/TestRouterAutoMsyncService.java | 86 +++++++++++++++++++ 8 files changed, 217 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAutoMsyncService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAutoMsyncService.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 7000a72b3a058..a48a6cb4c194a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -119,6 +119,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_ROUTER_PREFIX + "namenode.heartbeat.jmx.interval"; public static final long DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT = 0; + // HDFS Router Msync + public static final String DFS_ROUTER_AUTO_MSYNC_ENABLE = + FEDERATION_ROUTER_PREFIX + "auto.msync.enable"; + public static final boolean DFS_ROUTER_AUTO_MSYNC_ENABLE_DEFAULT = false; + public static final String 
DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS = + FEDERATION_ROUTER_PREFIX + "auto.msync.interval"; + public static final long DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS_DEFAULT = + TimeUnit.SECONDS.toMillis(5); + // HDFS Router NN client public static final String DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE = diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 3d996b3e849f8..3ea6c2534aa03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -118,6 +118,8 @@ public class Router extends CompositeService implements private ActiveNamenodeResolver namenodeResolver; /** Updates the namenode status in the namenode resolver. */ private Collection namenodeHeartbeatServices; + /** Router periodically sends msync to all nameservices. */ + private RouterAutoMsyncService routerAutoMsyncService; /** Router metrics. */ private RouterMetricsService metrics; @@ -239,6 +241,14 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(this.routerHeartbeatService); } + boolean isRouterAutoMsyncEnable = conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_ENABLE, + RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_ENABLE_DEFAULT); + if (isRouterAutoMsyncEnable) { + this.routerAutoMsyncService = new RouterAutoMsyncService(this.rpcServer); + addService(this.routerAutoMsyncService); + } + // Router metrics system if (conf.getBoolean( RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, @@ -842,6 +852,14 @@ Collection getNamenodeHeartbeatServices() { return this.namenodeHeartbeatServices; } + /** + * Get this router msync service. 
+ */ + @VisibleForTesting + RouterAutoMsyncService getRouterAutoMsyncService() { + return this.routerAutoMsyncService; + } + /** * Get this router heartbeat service. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAutoMsyncService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAutoMsyncService.java new file mode 100644 index 0000000000000..470e759a63596 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAutoMsyncService.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS_DEFAULT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.tools.DFSHAAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * The {@link Router} periodically send msync to all nameservices. + */ +public class RouterAutoMsyncService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterAutoMsyncService.class); + + private RouterRpcServer rpcServer; + private Configuration conf; + + public RouterAutoMsyncService(RouterRpcServer rpcServer) { + super(RouterAutoMsyncService.class.getSimpleName()); + this.rpcServer = rpcServer; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.conf = DFSHAAdmin.addSecurityConfiguration(configuration); + + this.setIntervalMs(conf.getLong( + DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS, + DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS_DEFAULT)); + super.serviceInit(this.conf); + } + + @Override + public void periodicInvoke() { + try { + this.rpcServer.msync(); + } catch (IOException e) { + LOG.warn("RouterMsyncService msync failed: {}", e.getMessage()); + } + } + + @Override + protected void serviceStop() throws Exception { + LOG.info("Stopping RouterMsyncService."); + super.serviceStop(); + } +} \ No newline at end of file diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientMetrics.java index 5c4b8ff022736..3b11b35f6b2da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientMetrics.java @@ -650,4 +650,8 @@ public void incInvokedConcurrent(Method method){ concurrentOtherOps.incr(); } } + + public long getMsyncOps(){ + return msyncOps.value(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 43bd17d75f479..c35dca74f52f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -481,6 +481,22 @@ + + dfs.federation.router.auto.msync.enable + false + + If true, the router periodically sends msync to all nameservices to update the state id. + + + + + dfs.federation.router.auto.msync.interval + 5000 + + The interval in milliseconds between the router's automatic msync calls to the nameservices. 
+ + + dfs.federation.router.store.router.expiration 5m diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index 8b9ff106306b1..8baa659b6f764 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -44,6 +44,7 @@ public class RouterConfigBuilder { private boolean enableMetrics = false; private boolean enableQuota = false; private boolean enableSafemode = false; + private boolean enableAutoMsync = false; private RouterRenameOption routerRenameOption = RouterRenameOption.NONE; private boolean enableCacheRefresh; private Map innerMap = new HashMap<>(); @@ -65,6 +66,7 @@ public RouterConfigBuilder all() { this.enableStateStore = true; this.enableMetrics = true; this.enableSafemode = true; + this.enableAutoMsync = true; return this; } @@ -118,6 +120,11 @@ public RouterConfigBuilder safemode(boolean enable) { return this; } + public RouterConfigBuilder autoMsync(boolean enable) { + this.enableAutoMsync = enable; + return this; + } + public RouterConfigBuilder refreshCache(boolean enable) { this.enableCacheRefresh = enable; return this; @@ -163,6 +170,10 @@ public RouterConfigBuilder safemode() { return this.safemode(true); } + public RouterConfigBuilder autoMsync() { + return this.autoMsync(true); + } + public RouterConfigBuilder refreshCache() { return this.refreshCache(true); } @@ -207,6 +218,7 @@ public Configuration build() { this.enableSafemode); conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE, this.enableCacheRefresh); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_ENABLE, this.enableAutoMsync); conf.set(DFS_ROUTER_FEDERATION_RENAME_OPTION, 
routerRenameOption.name()); for (Map.Entry kv : innerMap.entrySet()) { conf.set(kv.getKey(), kv.getValue()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java index 9b8fb67e68122..c223bebf75016 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -128,6 +128,9 @@ public void testRouterService() throws InterruptedException, IOException { // Safemode only testRouterStartup(new RouterConfigBuilder(conf).rpc().safemode().build()); + // Msync only + testRouterStartup(new RouterConfigBuilder(conf).rpc().autoMsync().build()); + // Metrics only testRouterStartup(new RouterConfigBuilder(conf).metrics().build()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAutoMsyncService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAutoMsyncService.java new file mode 100644 index 0000000000000..95dbcdbfb7eda --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAutoMsyncService.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.io.IOException; + +/** + * Test the service that msync to all nameservices. + */ +public class TestRouterAutoMsyncService { + + private static MiniRouterDFSCluster cluster; + private static Router router; + private static RouterAutoMsyncService service; + private static long msyncInterval = 1000; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void globalSetUp() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_ENABLE, true); + conf.setLong(RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS, msyncInterval); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + + cluster = new MiniRouterDFSCluster(true, 1, conf); + + // Start NNs and DNs and wait until ready + cluster.startCluster(conf); + cluster.startRouters(); + cluster.waitClusterUp(); + + // Making one Namenodes active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + cluster.waitActiveNamespaces(); + + router = cluster.getRandomRouter().getRouter(); + service = 
router.getRouterAutoMsyncService(); + } + + @AfterClass + public static void tearDown() throws IOException { + cluster.shutdown(); + service.stop(); + service.close(); + } + + @Test + public void testMsync() throws InterruptedException, IOException { + Thread.sleep(msyncInterval); + long ops = router.getRouterClientMetrics().getMsyncOps(); + // For a interval, router send one msync to active namenode + Assert.assertTrue(ops >= 1); + } +} From efcc74feef4208036e29f21129682ad6155ccd89 Mon Sep 17 00:00:00 2001 From: lgh Date: Fri, 5 Jan 2024 14:06:23 +0800 Subject: [PATCH 2/2] modify test case --- .../router/TestRouterAutoMsyncService.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAutoMsyncService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAutoMsyncService.java index 95dbcdbfb7eda..ba5ddc755271b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAutoMsyncService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAutoMsyncService.java @@ -20,14 +20,15 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import java.io.IOException; +import java.util.concurrent.TimeoutException; /** * Test the service that msync to all nameservices. 
@@ -77,10 +78,10 @@ public static void tearDown() throws IOException { } @Test - public void testMsync() throws InterruptedException, IOException { - Thread.sleep(msyncInterval); - long ops = router.getRouterClientMetrics().getMsyncOps(); - // For a interval, router send one msync to active namenode - Assert.assertTrue(ops >= 1); + public void testMsync() throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> { + long ops = router.getRouterClientMetrics().getMsyncOps(); + return ops >= 1; + }, 500, msyncInterval); } }