Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "namenode.heartbeat.jmx.interval";
public static final long DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT = 0;

// HDFS Router Msync
public static final String DFS_ROUTER_AUTO_MSYNC_ENABLE =
FEDERATION_ROUTER_PREFIX + "auto.msync.enable";
public static final boolean DFS_ROUTER_AUTO_MSYNC_ENABLE_DEFAULT = false;
public static final String DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS =
FEDERATION_ROUTER_PREFIX + "auto.msync.interval";
public static final long DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);

// HDFS Router NN client
public static final String
DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public class Router extends CompositeService implements
private ActiveNamenodeResolver namenodeResolver;
/** Updates the namenode status in the namenode resolver. */
private Collection<NamenodeHeartbeatService> namenodeHeartbeatServices;
/** Router peridically send msync to all nameservices. */
private RouterAutoMsyncService routerAutoMsyncService;

/** Router metrics. */
private RouterMetricsService metrics;
Expand Down Expand Up @@ -239,6 +241,14 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(this.routerHeartbeatService);
}

boolean isRouterAutoMsyncEnable = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_ENABLE,
RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_ENABLE_DEFAULT);
if (isRouterAutoMsyncEnable) {
this.routerAutoMsyncService = new RouterAutoMsyncService(this.rpcServer);
addService(this.routerAutoMsyncService);
}

// Router metrics system
if (conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE,
Expand Down Expand Up @@ -842,6 +852,14 @@ Collection<NamenodeHeartbeatService> getNamenodeHeartbeatServices() {
return this.namenodeHeartbeatServices;
}

/**
* Get this router msync service.
*/
@VisibleForTesting
RouterAutoMsyncService getRouterAutoMsyncService() {
return this.routerAutoMsyncService;
}

/**
* Get this router heartbeat service.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS_DEFAULT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* The {@link Router} periodically send msync to all nameservices.
*/
public class RouterAutoMsyncService extends PeriodicService {

private static final Logger LOG =
LoggerFactory.getLogger(RouterAutoMsyncService.class);

private RouterRpcServer rpcServer;
private Configuration conf;

public RouterAutoMsyncService(RouterRpcServer rpcServer) {
super(RouterAutoMsyncService.class.getSimpleName());
this.rpcServer = rpcServer;
}

@Override
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = DFSHAAdmin.addSecurityConfiguration(configuration);

this.setIntervalMs(conf.getLong(
DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS,
DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS_DEFAULT));
super.serviceInit(this.conf);
}

@Override
public void periodicInvoke() {
try {
this.rpcServer.msync();
} catch (IOException e) {
LOG.warn("RouterMsyncService msync failed: {}", e.getMessage());
}
}

@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping RouterMsyncService.");
super.serviceStop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -650,4 +650,8 @@ public void incInvokedConcurrent(Method method){
concurrentOtherOps.incr();
}
}

public long getMsyncOps(){
return msyncOps.value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,22 @@
</description>
</property>

<property>
<name>dfs.federation.router.auto.msync.enable</name>
<value>false</value>
<description>
If true, router auto msync to nameservices for update stateid.
</description>
</property>

<property>
<name>dfs.federation.router.auto.msync.interval</name>
<value>5000</value>
<description>
The interval in milliseconds for router auto msync to nameservices.
</description>
</property>

<property>
<name>dfs.federation.router.store.router.expiration</name>
<value>5m</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class RouterConfigBuilder {
private boolean enableMetrics = false;
private boolean enableQuota = false;
private boolean enableSafemode = false;
private boolean enableAutoMsync = false;
private RouterRenameOption routerRenameOption = RouterRenameOption.NONE;
private boolean enableCacheRefresh;
private Map<String, String> innerMap = new HashMap<>();
Expand All @@ -65,6 +66,7 @@ public RouterConfigBuilder all() {
this.enableStateStore = true;
this.enableMetrics = true;
this.enableSafemode = true;
this.enableAutoMsync = true;
return this;
}

Expand Down Expand Up @@ -118,6 +120,11 @@ public RouterConfigBuilder safemode(boolean enable) {
return this;
}

public RouterConfigBuilder autoMsync(boolean enable) {
this.enableAutoMsync = enable;
return this;
}

public RouterConfigBuilder refreshCache(boolean enable) {
this.enableCacheRefresh = enable;
return this;
Expand Down Expand Up @@ -163,6 +170,10 @@ public RouterConfigBuilder safemode() {
return this.safemode(true);
}

public RouterConfigBuilder autoMsync() {
return this.autoMsync(true);
}

public RouterConfigBuilder refreshCache() {
return this.refreshCache(true);
}
Expand Down Expand Up @@ -207,6 +218,7 @@ public Configuration build() {
this.enableSafemode);
conf.setBoolean(RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE,
this.enableCacheRefresh);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_ENABLE, this.enableAutoMsync);
conf.set(DFS_ROUTER_FEDERATION_RENAME_OPTION, routerRenameOption.name());
for (Map.Entry<String, String> kv : innerMap.entrySet()) {
conf.set(kv.getKey(), kv.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ public void testRouterService() throws InterruptedException, IOException {
// Safemode only
testRouterStartup(new RouterConfigBuilder(conf).rpc().safemode().build());

// Msync only
testRouterStartup(new RouterConfigBuilder(conf).rpc().autoMsync().build());

// Metrics only
testRouterStartup(new RouterConfigBuilder(conf).metrics().build());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Test the service that msync to all nameservices.
*/
public class TestRouterAutoMsyncService {

private static MiniRouterDFSCluster cluster;
private static Router router;
private static RouterAutoMsyncService service;
private static long msyncInterval = 1000;

@Rule
public TestName name = new TestName();

@BeforeClass
public static void globalSetUp() throws Exception {
Configuration conf = new Configuration();
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_ENABLE, true);
conf.setLong(RBFConfigKeys.DFS_ROUTER_AUTO_MSYNC_INTERVAL_MS, msyncInterval);
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true);

cluster = new MiniRouterDFSCluster(true, 1, conf);

// Start NNs and DNs and wait until ready
cluster.startCluster(conf);
cluster.startRouters();
cluster.waitClusterUp();

// Making one Namenodes active per nameservice
if (cluster.isHighAvailability()) {
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
cluster.switchToStandby(ns, NAMENODES[1]);
}
}
cluster.waitActiveNamespaces();

router = cluster.getRandomRouter().getRouter();
service = router.getRouterAutoMsyncService();
}

@AfterClass
public static void tearDown() throws IOException {
cluster.shutdown();
service.stop();
service.close();
}

@Test
public void testMsync() throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(() -> {
long ops = router.getRouterClientMetrics().getMsyncOps();
return ops >= 1;
}, 500, msyncInterval);
}
}