@@ -1938,16 +1938,56 @@ int startReconfigurationUtil(final String nodeType, final String address, final
if (!"livenodes".equals(address)) {
return startReconfiguration(nodeType, address, out, err);
}
if (!"datanode".equals(nodeType)) {
err.println("Only datanode type supports reconfiguration in bulk.");
return 1;
}
ExecutorService executorService = Executors.newFixedThreadPool(5);
DistributedFileSystem dfs = getDFS();
DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
if (nodes != null) {
if ("namenode".equals(nodeType)) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Configuration config = getConf();
List<DFSUtil.ConfiguredNNAddress> cnnlist =
DFSUtil.flattenAddressMap(DFSUtil.getNNServiceRpcAddressesForCluster(config));
if (cnnlist.isEmpty()) {
err.println("DFS namenode stats could not be retrieved.");
return 1;
}
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
for (DFSUtil.ConfiguredNNAddress cnn : cnnlist) {
executorService.submit(() -> {
InetSocketAddress rpc = cnn.getAddress();
String nnAddress = rpc.getHostName() + ":" + rpc.getPort();
int status = startReconfiguration(nodeType, nnAddress, out, err);
if (status == 0) {
successCount.incrementAndGet();
} else {
failCount.incrementAndGet();
}
});
}
while ((successCount.get() + failCount.get()) < cnnlist.size()) {
Thread.sleep(1000);
}
executorService.shutdown();
if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
err.println("Executor service could not be terminated in 60s. Please wait for"
+ " sometime before the system cools down.");
}
String formattedString = String.format(
"Starting of reconfiguration task successful on %d nodes, failed on %d nodes.",
successCount.get(), failCount.get());
out.println(formattedString);
if (failCount.get() == 0) {
return 0;
} else {
return 1;
}
} else if ("datanode".equals(nodeType)) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
DistributedFileSystem dfs = getDFS();
DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
if (nodes == null) {
err.println("DFS datanode stats could not be retrieved.");
return 1;
}
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
for (DatanodeInfo node : nodes) {
executorService.submit(() -> {
int status = startReconfiguration(nodeType, node.getIpcAddr(false), out, err);
@@ -1973,9 +2013,10 @@ int startReconfigurationUtil(final String nodeType, final String address, final
} else {
return 1;
}
} else {
err.println("Only namenode and datanode type supports reconfiguration in bulk.");
return 1;
}
err.println("DFS datanode stats could not be retrieved.");
return 1;
}

int startReconfiguration(final String nodeType, final String address,
@@ -2025,16 +2066,56 @@ int getReconfigurationStatusUtil(final String nodeType, final String address,
if (!"livenodes".equals(address)) {
return getReconfigurationStatus(nodeType, address, out, err);
}
if (!"datanode".equals(nodeType)) {
err.println("Only datanode type supports reconfiguration in bulk.");
return 1;
}
ExecutorService executorService = Executors.newFixedThreadPool(5);
DistributedFileSystem dfs = getDFS();
DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
if (nodes != null) {
if ("namenode".equals(nodeType)) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Configuration config = getConf();
List<DFSUtil.ConfiguredNNAddress> cnnlist =
DFSUtil.flattenAddressMap(DFSUtil.getNNServiceRpcAddressesForCluster(config));
if (cnnlist.isEmpty()) {
err.println("DFS namenode stats could not be retrieved.");
return 1;
}
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
for (DFSUtil.ConfiguredNNAddress cnn : cnnlist) {
executorService.submit(() -> {
InetSocketAddress rpc = cnn.getAddress();
String nnAddress = rpc.getHostName() + ":" + rpc.getPort();
int status = getReconfigurationStatus(nodeType, nnAddress, out, err);
if (status == 0) {
successCount.incrementAndGet();
} else {
failCount.incrementAndGet();
}
});
}
while ((successCount.get() + failCount.get()) < cnnlist.size()) {
Thread.sleep(1000);
}
executorService.shutdown();
if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
err.println("Executor service could not be terminated in 60s. Please wait for"
+ " sometime before the system cools down.");
}
String formattedString = String.format(
"Retrieval of reconfiguration status successful on %d nodes, failed on %d nodes.",
successCount.get(), failCount.get());
out.println(formattedString);
if (failCount.get() == 0) {
return 0;
} else {
return 1;
}
} else if ("datanode".equals(nodeType)) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
DistributedFileSystem dfs = getDFS();
DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
if (nodes == null) {
err.println("DFS datanode stats could not be retrieved.");
return 1;
}
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
for (DatanodeInfo node : nodes) {
executorService.submit(() -> {
int status = getReconfigurationStatus(nodeType, node.getIpcAddr(false), out, err);
@@ -2060,9 +2141,10 @@ int getReconfigurationStatusUtil(final String nodeType, final String address,
} else {
return 1;
}
} else {
err.println("Only namenode and datanode type supports reconfiguration in bulk.");
return 1;
}
err.println("DFS datanode stats could not be retrieved.");
return 1;
}

int getReconfigurationStatus(final String nodeType, final String address, final PrintStream out,
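Both `startReconfigurationUtil` and `getReconfigurationStatusUtil` now handle the `livenodes` case the same way: resolve the target addresses (namenode RPC addresses from the configuration, or live datanodes from a `DatanodeReportType.LIVE` report), fan the per-node call out over a small thread pool, and tally the outcomes with `AtomicInteger` counters. Below is a minimal, self-contained sketch of that fan-out pattern; the class and method names are illustrative only, and it waits on the submitted `Future`s rather than using the one-second sleep-poll loop from the patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;

/** Illustrative sketch only; not part of DFSAdmin. */
public final class BulkAdminSketch {

  /**
   * Runs perNodeCall against every address in parallel and returns 0 only if
   * every call returned 0, mirroring the bulk "livenodes" handling above.
   */
  static int runOnAllNodes(List<String> addresses, ToIntFunction<String> perNodeCall)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    AtomicInteger success = new AtomicInteger();
    AtomicInteger failure = new AtomicInteger();
    List<Future<?>> futures = new ArrayList<>();
    for (String address : addresses) {
      futures.add(pool.submit(() -> {
        // Any non-zero exit status from the per-node call counts as a failure.
        if (perNodeCall.applyAsInt(address) == 0) {
          success.incrementAndGet();
        } else {
          failure.incrementAndGet();
        }
      }));
    }
    // Block on the futures instead of sleep-polling the counters; a task that
    // throws is still counted, so the caller cannot hang on a lost update.
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (ExecutionException e) {
        failure.incrementAndGet();
      }
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.printf("Task successful on %d nodes, failed on %d nodes.%n",
        success.get(), failure.get());
    return failure.get() == 0 ? 0 : 1;
  }
}
```

With `hosts` holding the flattened namenode RPC addresses, the namenode branch above is essentially `runOnAllNodes(hosts, h -> startReconfiguration("namenode", h, out, err))`; the datanode branch is the same call over the live datanode IPC addresses.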
@@ -419,7 +419,7 @@ Usage:
| `-refreshSuperUserGroupsConfiguration` | Refresh superuser proxy groups mappings |
| `-refreshCallQueue` | Reload the call queue from config. |
| `-refresh` \<host:ipc\_port\> \<key\> [arg1..argn] | Triggers a runtime-refresh of the resource specified by \<key\> on \<host:ipc\_port\>. All other args after are sent to the host. |
| `-reconfig` \<datanode \|namenode\> \<host:ipc\_port\|livenodes> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, datanode supports livenodes as third parameter, which will start or retrieve reconfiguration on all live datanodes. |
| `-reconfig` \<datanode \|namenode\> \<host:ipc\_port\|livenodes> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies the host address. For start or status, both the namenode and datanode types support livenodes as the third parameter, which starts or retrieves reconfiguration on all live namenodes (active and standby) or datanodes. |
| `-printTopology` | Print a tree of the racks and their nodes as reported by the Namenode |
| `-refreshNamenodes` datanodehost:port | For the given datanode, reloads the configuration files, stops serving the removed block-pools and starts serving new block-pools. |
| `-getVolumeReport` datanodehost:port | For the given datanode, get the volume report. |
@@ -352,12 +352,12 @@ Datanode supports hot swappable drives. The user can add or replace HDFS data vo
the reconfiguration process. The user can use
`dfsadmin -reconfig datanode HOST:PORT status`
to query the running status of the reconfiguration task. In place of
HOST:PORT, we can also specify livenodes for datanode. It would allow
start or query reconfiguration on all live datanodes, whereas specifying
HOST:PORT, we can also specify livenodes for namenode or datanode. This allows
starting or querying reconfiguration on all live namenodes (active and standby) or datanodes, whereas specifying
HOST:PORT would only allow start or query of reconfiguration on the
particular datanode represented by HOST:PORT. The examples for livenodes
queries are `dfsadmin -reconfig datanode livenodes start` and
`dfsadmin -reconfig datanode livenodes status`.
particular namenode or datanode represented by HOST:PORT. The examples for livenodes
queries are `dfsadmin -reconfig <namenode|datanode> livenodes start` and
`dfsadmin -reconfig <namenode|datanode> livenodes status`.

* Once the reconfiguration task has completed, the user can safely `umount`
the removed data volume directories and physically remove the disks.
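For completeness, the documented `livenodes` commands go through the same `DFSAdmin` entry point as any other `dfsadmin` subcommand, so they can also be driven from Java via `ToolRunner`. A hedged sketch, assuming the client `Configuration` on the classpath already points at the target cluster and that the class name is just a placeholder:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.ToolRunner;

/** Illustrative driver for the bulk reconfiguration commands documented above. */
public class ReconfigLiveNodesDriver {
  public static void main(String[] args) throws Exception {
    // Picks up core-site.xml / hdfs-site.xml from the classpath.
    Configuration conf = new Configuration();

    // Equivalent to: hdfs dfsadmin -reconfig namenode livenodes start
    int startExit = ToolRunner.run(conf, new DFSAdmin(), new String[] {
        "-reconfig", "namenode", "livenodes", "start"});

    // Equivalent to: hdfs dfsadmin -reconfig namenode livenodes status
    int statusExit = ToolRunner.run(conf, new DFSAdmin(), new String[] {
        "-reconfig", "namenode", "livenodes", "status"});

    System.exit(startExit == 0 && statusExit == 0 ? 0 : 1);
  }
}
```

Replace `namenode` with `datanode` to target all live datanodes instead; in either case the exit code is 0 only when the operation succeeded on every node.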
@@ -64,6 +64,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1283,4 +1284,70 @@ public void testAllDatanodesReconfig()
Assertions.assertThat(outs.subList(1, 5)).containsSubsequence(success, from, to);
Assertions.assertThat(outs.subList(5, 9)).containsSubsequence(success, from, to, retrieval);
}

@Test
public void testAllNamenodesReconfig() throws Exception {
final HdfsConfiguration clusterConf = new HdfsConfiguration();
clusterConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 5);
try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(clusterConf).nnTopology(
MiniDFSNNTopology.simpleHAFederatedTopology(2)).build()) {
miniCluster.waitActive();

final HdfsConfiguration clientConf = new HdfsConfiguration();
// Populate the client configuration with the mini cluster's federated HA settings.
DFSTestUtil.setFederatedHAConfiguration(miniCluster, clientConf);
admin = new DFSAdmin(clientConf);

ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
// simpleHAFederatedTopology(2) yields 2 nameservices with 2 NameNodes each (active and standby), 4 in total.
for (int i = 0; i < 4; i++) {
NameNode nn = miniCluster.getNameNode(i);
nn.setReconfigurationUtil(reconfigurationUtil);
}

List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
changes.add(new ReconfigurationUtil.PropertyChange(DFS_HEARTBEAT_INTERVAL_KEY, "3",
miniCluster.getNameNode(0).getConf().get(DFS_HEARTBEAT_INTERVAL_KEY)));
when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
any(Configuration.class))).thenReturn(changes);

int result = admin.startReconfiguration("namenode", "livenodes");
Assertions.assertThat(result).isEqualTo(0);

List<String> outsForStartReconfig = new ArrayList<>();
List<String> errsForStartReconfig = new ArrayList<>();
reconfigurationOutErrFormatter("startReconfiguration", "namenode", "livenodes",
outsForStartReconfig, errsForStartReconfig);

String started = "Started reconfiguration task on node";
String starting =
"Starting of reconfiguration task successful on 4 nodes, failed on 0 nodes.";
Assertions.assertThat(outsForStartReconfig).hasSize(5);
Assertions.assertThat(errsForStartReconfig).hasSize(0);
Assertions.assertThat(outsForStartReconfig.get(0)).startsWith(started);
Assertions.assertThat(outsForStartReconfig.get(1)).startsWith(started);
Assertions.assertThat(outsForStartReconfig.get(2)).startsWith(started);
Assertions.assertThat(outsForStartReconfig.get(3)).startsWith(started);
Assertions.assertThat(outsForStartReconfig.get(4)).startsWith(starting);

List<String> outs = new ArrayList<>();
List<String> errs = new ArrayList<>();
awaitReconfigurationFinished("namenode", "livenodes", outs, errs);
Assertions.assertThat(outs).hasSize(17);
Assertions.assertThat(errs).hasSize(0);
LOG.info("dfsadmin -reconfig namenode livenodes status output:");
outs.forEach(s -> LOG.info("{}", s));

Assertions.assertThat(outs.get(0)).startsWith("Reconfiguring status for node");

String success = "SUCCESS: Changed property dfs.heartbeat.interval";
String from = "\tFrom: \"5\"";
String to = "\tTo: \"3\"";
String retrieval =
"Retrieval of reconfiguration status successful on 4 nodes, failed on 0 nodes.";

Assertions.assertThat(outs.subList(1, 5)).containsSubsequence(success, from, to);
Assertions.assertThat(outs.subList(5, 17)).containsSubsequence(success, from, to, retrieval);
}
}
}