diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index ad4292a6b79fe..ed681dd23b530 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -1938,16 +1938,57 @@ int startReconfigurationUtil(final String nodeType, final String address, final
     if (!"livenodes".equals(address)) {
       return startReconfiguration(nodeType, address, out, err);
     }
-    if (!"datanode".equals(nodeType)) {
-      err.println("Only datanode type supports reconfiguration in bulk.");
-      return 1;
-    }
-    ExecutorService executorService = Executors.newFixedThreadPool(5);
-    DistributedFileSystem dfs = getDFS();
-    DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
-    AtomicInteger successCount = new AtomicInteger();
-    AtomicInteger failCount = new AtomicInteger();
-    if (nodes != null) {
+    if ("namenode".equals(nodeType)) {
+      ExecutorService executorService = Executors.newFixedThreadPool(2);
+      Configuration config = getConf();
+      List<DFSUtil.ConfiguredNNAddress> cnnlist =
+          DFSUtil.flattenAddressMap(DFSUtil.getNNServiceRpcAddressesForCluster(config));
+      if (cnnlist.isEmpty()) {
+        err.println("DFS namenode stats could not be retrieved.");
+        return 1;
+      }
+      AtomicInteger successCount = new AtomicInteger();
+      AtomicInteger failCount = new AtomicInteger();
+      for (DFSUtil.ConfiguredNNAddress cnn : cnnlist) {
+        executorService.submit(() -> {
+          InetSocketAddress rpc = cnn.getAddress();
+          String nnAddress = rpc.getHostName() + ":" + rpc.getPort();
+          int status = startReconfiguration(nodeType, nnAddress, out, err);
+          if (status == 0) {
+            successCount.incrementAndGet();
+          } else {
+            failCount.incrementAndGet();
+          }
+        });
+      }
+      while ((successCount.get() + failCount.get()) < cnnlist.size()) {
+        Thread.sleep(1000);
+      }
+      executorService.shutdown();
+      if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+        err.println("Executor service could not be terminated in 60s. Please wait for"
+            + " some time before the system cools down.");
+      }
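+      // Summarize how many namenodes accepted the start request and how many failed.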
Please wait for" + + " sometime before the system cools down."); + } + String formattedString = String.format( + "Starting of reconfiguration task successful on %d nodes, failed on %d nodes.", + successCount.get(), failCount.get()); + out.println(formattedString); + if (failCount.get() == 0) { + return 0; + } else { + return 1; + } + } else if ("datanode".equals(nodeType)) { + ExecutorService executorService = Executors.newFixedThreadPool(5); + DistributedFileSystem dfs = getDFS(); + DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE); + if (nodes == null) { + err.println("DFS datanode stats could not be retrieved."); + return 1; + } + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failCount = new AtomicInteger(); for (DatanodeInfo node : nodes) { executorService.submit(() -> { int status = startReconfiguration(nodeType, node.getIpcAddr(false), out, err); @@ -1973,9 +2013,10 @@ int startReconfigurationUtil(final String nodeType, final String address, final } else { return 1; } + } else { + err.println("Only namenode and datanode type supports reconfiguration in bulk."); + return 1; } - err.println("DFS datanode stats could not be retrieved."); - return 1; } int startReconfiguration(final String nodeType, final String address, @@ -2025,16 +2066,56 @@ int getReconfigurationStatusUtil(final String nodeType, final String address, if (!"livenodes".equals(address)) { return getReconfigurationStatus(nodeType, address, out, err); } - if (!"datanode".equals(nodeType)) { - err.println("Only datanode type supports reconfiguration in bulk."); - return 1; - } - ExecutorService executorService = Executors.newFixedThreadPool(5); - DistributedFileSystem dfs = getDFS(); - DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE); - AtomicInteger successCount = new AtomicInteger(); - AtomicInteger failCount = new AtomicInteger(); - if (nodes != null) { + if ("namenode".equals(nodeType)) { + ExecutorService executorService = Executors.newFixedThreadPool(2); + Configuration config = getConf(); + List cnnlist = + DFSUtil.flattenAddressMap(DFSUtil.getNNServiceRpcAddressesForCluster(config)); + if (cnnlist.isEmpty()) { + err.println("DFS namenode stats could not be retrieved."); + return 1; + } + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failCount = new AtomicInteger(); + for (DFSUtil.ConfiguredNNAddress cnn : cnnlist) { + executorService.submit(() -> { + InetSocketAddress rpc = cnn.getAddress(); + String nnAddress = rpc.getHostName() + ":" + rpc.getPort(); + int status = getReconfigurationStatus(nodeType, nnAddress, out, err); + if (status == 0) { + successCount.incrementAndGet(); + } else { + failCount.incrementAndGet(); + } + }); + } + while ((successCount.get() + failCount.get()) < cnnlist.size()) { + Thread.sleep(1000); + } + executorService.shutdown(); + if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { + err.println("Executor service could not be terminated in 60s. 
Please wait for" + + " sometime before the system cools down."); + } + String formattedString = String.format( + "Retrieval of reconfiguration status successful on %d nodes, failed on %d nodes.", + successCount.get(), failCount.get()); + out.println(formattedString); + if (failCount.get() == 0) { + return 0; + } else { + return 1; + } + } else if ("datanode".equals(nodeType)) { + ExecutorService executorService = Executors.newFixedThreadPool(5); + DistributedFileSystem dfs = getDFS(); + DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE); + if (nodes == null) { + err.println("DFS datanode stats could not be retrieved."); + return 1; + } + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failCount = new AtomicInteger(); for (DatanodeInfo node : nodes) { executorService.submit(() -> { int status = getReconfigurationStatus(nodeType, node.getIpcAddr(false), out, err); @@ -2060,9 +2141,10 @@ int getReconfigurationStatusUtil(final String nodeType, final String address, } else { return 1; } + } else { + err.println("Only namenode and datanode type supports reconfiguration in bulk."); + return 1; } - err.println("DFS datanode stats could not be retrieved."); - return 1; } int getReconfigurationStatus(final String nodeType, final String address, final PrintStream out, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index acaf96948a0fb..a6d4cbee86804 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -419,7 +419,7 @@ Usage: | `-refreshSuperUserGroupsConfiguration` | Refresh superuser proxy groups mappings | | `-refreshCallQueue` | Reload the call queue from config. | | `-refresh` \ \ [arg1..argn] | Triggers a runtime-refresh of the resource specified by \ on \. All other args after are sent to the host. | -| `-reconfig` \ \ \ | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, datanode supports livenodes as third parameter, which will start or retrieve reconfiguration on all live datanodes. | +| `-reconfig` \ \ \ | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, namenode or datanode supports livenodes as third parameter, which will start or retrieve reconfiguration on all live namenodes(active and standby) or datanodes. | | `-printTopology` | Print a tree of the racks and their nodes as reported by the Namenode | | `-refreshNamenodes` datanodehost:port | For the given datanode, reloads the configuration files, stops serving the removed block-pools and starts serving new block-pools. | | `-getVolumeReport` datanodehost:port | For the given datanode, get the volume report. | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md index 3aa41b4dd8b73..d7041fea7c18e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md @@ -352,12 +352,12 @@ Datanode supports hot swappable drives. 
       for (DatanodeInfo node : nodes) {
         executorService.submit(() -> {
           int status = getReconfigurationStatus(nodeType, node.getIpcAddr(false), out, err);
@@ -2060,9 +2144,10 @@ int getReconfigurationStatusUtil(final String nodeType, final String address,
       } else {
         return 1;
       }
+    } else {
+      err.println("Only namenode and datanode types support reconfiguration in bulk.");
+      return 1;
     }
-    err.println("DFS datanode stats could not be retrieved.");
-    return 1;
   }
 
   int getReconfigurationStatus(final String nodeType, final String address, final PrintStream out,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index acaf96948a0fb..a6d4cbee86804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -419,7 +419,7 @@ Usage:
 | `-refreshSuperUserGroupsConfiguration` | Refresh superuser proxy groups mappings |
 | `-refreshCallQueue` | Reload the call queue from config. |
 | `-refresh` \<host:ipc\_port\> \<key\> [arg1..argn] | Triggers a runtime-refresh of the resource specified by \<key\> on \<host:ipc\_port\>. All other args after are sent to the host. |
-| `-reconfig` \<datanode\|namenode\> \<host:ipc\_port\|livenodes\> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, datanode supports livenodes as third parameter, which will start or retrieve reconfiguration on all live datanodes. |
+| `-reconfig` \<datanode\|namenode\> \<host:ipc\_port\|livenodes\> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies the host address. For start or status, namenode and datanode support livenodes as the third parameter, which will start or retrieve reconfiguration on all live namenodes (active and standby) or datanodes. |
 | `-printTopology` | Print a tree of the racks and their nodes as reported by the Namenode |
 | `-refreshNamenodes` datanodehost:port | For the given datanode, reloads the configuration files, stops serving the removed block-pools and starts serving new block-pools. |
 | `-getVolumeReport` datanodehost:port | For the given datanode, get the volume report. |
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
index 3aa41b4dd8b73..d7041fea7c18e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
@@ -352,12 +352,12 @@ Datanode supports hot swappable drives. The user can add or replace HDFS data vo
   the reconfiguration process. The user can use `dfsadmin -reconfig
   datanode HOST:PORT status` to query the running status of the
   reconfiguration task. In place of
-  HOST:PORT, we can also specify livenodes for datanode. It would allow
-  start or query reconfiguration on all live datanodes, whereas specifying
+  HOST:PORT, we can also specify livenodes for namenode or datanode. It would allow
+  start or query reconfiguration on all live namenodes (active and standby) or datanodes, whereas specifying
   HOST:PORT would only allow start or query of reconfiguration on the
-  particular datanode represented by HOST:PORT. The examples for livenodes
+  particular namenode or datanode represented by HOST:PORT. The examples for livenodes
   queries are `dfsadmin -reconfig datanode livenodes start` and
   `dfsadmin -reconfig datanode livenodes status`.
 
 * Once the reconfiguration task has completed, the user can safely `umount`
   the removed data volume directories and physically remove the disks.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index f1a108bde2f33..3389368bb7a63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -64,6 +64,7 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1283,4 +1284,71 @@ public void testAllDatanodesReconfig()
     Assertions.assertThat(outs.subList(1, 5)).containsSubsequence(success, from, to);
     Assertions.assertThat(outs.subList(5, 9)).containsSubsequence(success, from, to, retrieval);
   }
+
+  @Test
+  public void testAllNamenodesReconfig() throws Exception {
+    final HdfsConfiguration clusterConf = new HdfsConfiguration();
+    clusterConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 5);
+    try (MiniDFSCluster miniCluster = new MiniDFSCluster.Builder(clusterConf).nnTopology(
+        MiniDFSNNTopology.simpleHAFederatedTopology(2)).build()) {
+      miniCluster.waitActive();
+
+      final HdfsConfiguration clientConf = new HdfsConfiguration();
+      // Populate the client configuration with the cluster's federated HA settings.
+      DFSTestUtil.setFederatedHAConfiguration(miniCluster, clientConf);
+      admin = new DFSAdmin(clientConf);
+
+      ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
+      // Two nameservices, each with an active and a standby: four namenodes in total.
+      for (int i = 0; i < 4; i++) {
+        NameNode nn = miniCluster.getNameNode(i);
+        nn.setReconfigurationUtil(reconfigurationUtil);
+      }
+
+      List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
+      changes.add(new ReconfigurationUtil.PropertyChange(DFS_HEARTBEAT_INTERVAL_KEY, "3",
+          miniCluster.getNameNode(0).getConf().get(DFS_HEARTBEAT_INTERVAL_KEY)));
+      when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
+          any(Configuration.class))).thenReturn(changes);
+
+      int result = admin.startReconfiguration("namenode", "livenodes");
+      Assertions.assertThat(result).isEqualTo(0);
+
+      List<String> outsForStartReconfig = new ArrayList<>();
+      List<String> errsForStartReconfig = new ArrayList<>();
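+      // Re-run the start command through the output-capturing helper for assertions.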
+      reconfigurationOutErrFormatter("startReconfiguration", "namenode", "livenodes",
+          outsForStartReconfig, errsForStartReconfig);
+
+      String started = "Started reconfiguration task on node";
+      String starting =
+          "Starting of reconfiguration task successful on 4 nodes, failed on 0 nodes.";
+      Assertions.assertThat(outsForStartReconfig).hasSize(5);
+      Assertions.assertThat(errsForStartReconfig).hasSize(0);
+      Assertions.assertThat(outsForStartReconfig.get(0)).startsWith(started);
+      Assertions.assertThat(outsForStartReconfig.get(1)).startsWith(started);
+      Assertions.assertThat(outsForStartReconfig.get(2)).startsWith(started);
+      Assertions.assertThat(outsForStartReconfig.get(3)).startsWith(started);
+      Assertions.assertThat(outsForStartReconfig.get(4)).startsWith(starting);
+
+      List<String> outs = new ArrayList<>();
+      List<String> errs = new ArrayList<>();
+      awaitReconfigurationFinished("namenode", "livenodes", outs, errs);
+      Assertions.assertThat(outs).hasSize(17);
+      Assertions.assertThat(errs).hasSize(0);
+      LOG.info("dfsadmin -reconfig namenode livenodes status output:");
+      outs.forEach(s -> LOG.info("{}", s));
+
+      Assertions.assertThat(outs.get(0)).startsWith("Reconfiguring status for node");
+
+      String success = "SUCCESS: Changed property dfs.heartbeat.interval";
+      String from = "\tFrom: \"5\"";
+      String to = "\tTo: \"3\"";
+      String retrieval =
+          "Retrieval of reconfiguration status successful on 4 nodes, failed on 0 nodes.";
+
+      Assertions.assertThat(outs.subList(1, 5)).containsSubsequence(success, from, to);
+      Assertions.assertThat(outs.subList(5, 17)).containsSubsequence(success, from, to, retrieval);
+    }
+  }
 }