diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 02004f337c10a..972217ec8d9c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -193,6 +193,8 @@ public class Balancer {
       + "\tExcludes the specified datanodes."
       + "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
       + "\tIncludes only the specified datanodes."
+      + "\n\t[-includeRack <rack>]"
+      + "\tSpecifies the rack to be balanced."
       + "\n\t[-source [-f <hosts-file> | <comma-separated list of hosts>]]"
       + "\tPick only the specified datanodes as source nodes."
       + "\n\t[-blockpools <comma-separated list of blockpool ids>]"
@@ -352,6 +354,7 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
     this.sourceNodes = p.getSourceNodes();
     this.runDuringUpgrade = p.getRunDuringUpgrade();
     this.sortTopNodes = p.getSortTopNodes();
+    dispatcher.setRack(p.getRack());

     this.maxSizeToMove = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
@@ -721,6 +724,9 @@ Result runOneIteration() {
     try {
       metrics.setIterateRunning(true);
      final List<DatanodeStorageReport> reports = dispatcher.init();
+      if (reports.isEmpty()) {
+        return newResult(ExitStatus.ILLEGAL_ARGUMENTS, 0, 0);
+      }
       final long bytesLeftToMove = init(reports);
       metrics.setBytesLeftToMove(bytesLeftToMove);
       if (bytesLeftToMove == 0) {
@@ -804,6 +810,8 @@ static private int doBalance(Collection<URI> namenodes,
     LOG.info("included nodes = " + p.getIncludedNodes());
     LOG.info("excluded nodes = " + p.getExcludedNodes());
     LOG.info("source nodes = " + p.getSourceNodes());
+    LOG.info("included rack = " + p.getRack());
+
     checkKeytabAndInit(conf);
     System.out.println("Time Stamp               Iteration#"
         + "  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved"
@@ -818,11 +826,29 @@ static private int doBalance(Collection<URI> namenodes,
     for(int iteration = 0; !done; iteration++) {
       done = true;
       Collections.shuffle(connectors);
+      int connectorCount = connectors.size();
       for(NameNodeConnector nnc : connectors) {
         if (p.getBlockPools().size() == 0
             || p.getBlockPools().contains(nnc.getBlockpoolID())) {
           final Balancer b = new Balancer(nnc, p, conf);
           final Result r = b.runOneIteration();
+          if (r.exitStatus == ExitStatus.ILLEGAL_ARGUMENTS) {
+            if (LOG.isWarnEnabled()) {
+              LOG.warn("No datanodes matched for NameNode URI = "
+                  + nnc.getNameNodeUri() + ", includeRack = " + p.getRack()
+                  + "; skipping this NameNode.");
+            }
+            done = false;
+            // Decrement the connector count; once every connector has been
+            // skipped, the balancer can stop.
+            if (--connectorCount <= 0) {
+              // Invalid configuration: no data was identified to move
+              // across nodes.
+              return r.exitStatus.getExitCode();
+            }
+            // No matching datanodes for the current NameNode URI.
+            continue;
+          }
           r.print(iteration, nnc, System.out);

           // clean all lists
@@ -1034,6 +1060,11 @@ static BalancerParameters parse(String[] args) {
             Set<String> sourceNodes = new HashSet<>();
             i = processHostList(args, i, "source", sourceNodes);
             b.setSourceNodes(sourceNodes);
+          } else if ("-includeRack".equalsIgnoreCase(args[i])) {
+            String rackName = parseRack(args, i, "includeRack");
+            i++;
+            // Rack / Availability Zone name.
+            b.setRack(rackName);
           } else if ("-blockpools".equalsIgnoreCase(args[i])) {
             Preconditions.checkArgument(
                 ++i < args.length,
@@ -1110,6 +1141,12 @@ private static int processHostList(String[] args, int i, String type,
     return i;
   }

+  private static String parseRack(String[] args, int i, String type) {
+    Preconditions.checkArgument(++i < args.length,
+        "Rack name is missing in %s: args=%s", type, Arrays.toString(args));
+    return args[i];
+  }
+
   private static Set<String> parseBlockPoolList(String string) {
     String[] addrs = StringUtils.getTrimmedStrings(string);
     return new HashSet<String>(Arrays.asList(addrs));
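A quick way to see the new option end to end is through the parser itself. The sketch below is illustrative and not part of the patch; `Balancer.Cli.parse` is package-private, so code like this has to live in `org.apache.hadoop.hdfs.server.balancer`, as the tests further down do:

```java
package org.apache.hadoop.hdfs.server.balancer;

// Illustrative sketch (not part of the patch): the -includeRack argument
// flows through Balancer.Cli.parse into BalancerParameters, and from there
// into Dispatcher.setRack() when the Balancer is constructed.
public class IncludeRackParseSketch {
  public static void main(String[] args) {
    BalancerParameters p =
        Balancer.Cli.parse(new String[] {"-includeRack", "/dc1/rack1"});
    System.out.println(p.getRack()); // prints /dc1/rack1
  }
}
```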
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
index 2b53c15d1deee..61cc7689c368a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java
@@ -52,6 +52,11 @@ final class BalancerParameters {

   static final BalancerParameters DEFAULT = new BalancerParameters();

+  /**
+   * Rack / Availability Zone name.
+   */
+  private String rack;
+
   private BalancerParameters() {
     this(new Builder());
   }
@@ -65,6 +70,7 @@ private BalancerParameters(Builder builder) {
     this.sourceNodes = builder.sourceNodes;
     this.blockpools = builder.blockpools;
     this.runDuringUpgrade = builder.runDuringUpgrade;
+    this.rack = builder.rack;
     this.runAsService = builder.runAsService;
     this.sortTopNodes = builder.sortTopNodes;
     this.hotBlockTimeInterval = builder.hotBlockTimeInterval;
@@ -119,13 +125,18 @@ public String toString() {
     return String.format("%s.%s [%s," + " threshold = %s,"
         + " max idle iteration = %s," + " #excluded nodes = %s,"
         + " #included nodes = %s," + " #source nodes = %s,"
+        + " includeRack = %s,"
         + " #blockpools = %s," + " run during upgrade = %s,"
         + " sort top nodes = %s," + " hot block time interval = %s]",
         Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
         threshold, maxIdleIteration, excludedNodes.size(),
-        includedNodes.size(), sourceNodes.size(), blockpools.size(),
-        runDuringUpgrade, sortTopNodes, hotBlockTimeInterval);
+        includedNodes.size(), sourceNodes.size(), rack, blockpools.size(),
+        runDuringUpgrade, sortTopNodes, hotBlockTimeInterval);
+  }
+
+  public String getRack() {
+    return rack;
   }

   static class Builder {
@@ -142,6 +153,8 @@ static class Builder {
     private boolean runAsService = false;
     private boolean sortTopNodes = false;
     private long hotBlockTimeInterval = 0;
+    // Rack / Availability Zone name.
+    private String rack;

     Builder() {
     }
@@ -181,6 +194,11 @@ Builder setSourceNodes(Set<String> nodes) {
       return this;
     }

+    Builder setRack(String rackName) {
+      this.rack = rackName;
+      return this;
+    }
+
     Builder setBlockpools(Set<String> pools) {
       this.blockpools = pools;
       return this;
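For programmatic callers, the new `Builder.setRack` slots in next to the existing options. A minimal sketch, mirroring how the tests below construct parameters; the rack string is an example, and since `BalancerParameters` is package-private this fragment, too, belongs in the balancer package:

```java
// Minimal sketch: build parameters targeting one rack, leaving everything
// else at its default, as the new tests do with setRack(RACK0).
BalancerParameters params = new BalancerParameters.Builder()
    .setRunDuringUpgrade(false)   // existing option, shown for context
    .setRack("/dc1/rack1")        // option added by this patch
    .build();
```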
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 612f4c028eff5..5a932aebeef86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -142,7 +142,8 @@ public class Dispatcher {
   private BlockPlacementPolicies placementPolicies;

   private long maxIterationTime;
-
+  // Rack / Availability Zone name.
+  private String rack;

   static class Allocator {
     private final int max;
     private int count = 0;
@@ -881,17 +882,33 @@ private long getBlockList() throws IOException, IllegalArgumentException {
       List<Integer> adjustList = new ArrayList<>();
       final String[] datanodeUuids = blkLocs.getDatanodeUuids();
       final StorageType[] storageTypes = blkLocs.getStorageTypes();
+      ArrayList<Byte> indicesList = new ArrayList<>();
       for (int i = 0; i < datanodeUuids.length; i++) {
         final StorageGroup g = storageGroupMap.get(
             datanodeUuids[i], storageTypes[i]);
         if (g != null) { // not unknown
           block.addLocation(g);
+          if (blkLocs instanceof StripedBlockWithLocations) {
+            indicesList.add(
+                ((StripedBlockWithLocations) blkLocs).getIndices()[i]);
+          }
         } else if (blkLocs instanceof StripedBlockWithLocations) {
           // some datanode may not in storageGroupMap due to decommission operation
           // or balancer cli with "-exclude" parameter
           adjustList.add(i);
         }
       }
+      if (blkLocs instanceof StripedBlockWithLocations) {
+        byte[] indices =
+            ((StripedBlockWithLocations) blkLocs).getIndices();
+        // Rack-level balancing may drop some locations, so re-assign the
+        // correct indices to the locations that were kept.
+        if (indicesList.size() != indices.length) {
+          for (int i = 0; i < indicesList.size(); i++) {
+            indices[i] = indicesList.get(i);
+          }
+        }
+      }

       if (!adjustList.isEmpty()) {
         // block.locations mismatch with block.indices
@@ -1164,12 +1181,18 @@ private boolean shouldIgnore(DatanodeInfo dn) {
     // ignore nodes not in the include list (if include list is not empty)
     final boolean notIncluded = !Util.isIncluded(includedNodes, dn);

-    if (outOfService || excluded || notIncluded) {
+    // ignore datanodes that belong to another rack / network location when
+    // rack-based balancing is enabled in the cluster
+    String loc = dn.getNetworkLocation();
+    final boolean partOfRack = (null == this.rack) || loc.contains(this.rack);
+
+    if (outOfService || excluded || notIncluded || !partOfRack) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Excluding datanode " + dn
             + ": outOfService=" + outOfService
             + ", excluded=" + excluded
-            + ", notIncluded=" + notIncluded);
+            + ", notIncluded=" + notIncluded
+            + ", partOfRack=" + partOfRack);
       }
       return true;
     }
@@ -1433,6 +1456,10 @@ public void shutdownNow() {
     }
   }

+  public void setRack(String rack) {
+    this.rack = rack;
+  }
+
   static class Util {
     /** @return true if data node is part of the excludedNodes. */
     static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
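One behavior worth noting in `shouldIgnore`: `partOfRack` is a plain substring test on the datanode's network location, not a path-segment match, so a rack argument that is a prefix of another rack's name matches both. A standalone sketch of the consequence (plain Java, no HDFS types involved):

```java
// Standalone demonstration of String.contains() matching on network
// locations, as used by the partOfRack check above.
public class RackMatchSketch {
  public static void main(String[] args) {
    String rack = "/rack1";
    System.out.println("/dc1/rack1".contains(rack));  // true - intended
    System.out.println("/dc1/rack12".contains(rack)); // also true: "/rack1"
                                                      // is a substring of
                                                      // "/rack12"
  }
}
```

Passing a rack name that is not a prefix of any other rack in the cluster avoids the collision.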
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index a11f209870550..19d5610949eec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -289,6 +289,7 @@ Usage:
           [-idleiterations <idleiterations>]
           [-runDuringUpgrade]
           [-asService]
+          [-includeRack <rack>]

 | COMMAND\_OPTION | Description |
 |:---- |:---- |
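The documented CLI flag corresponds one-to-one to the programmatic entry point used by the tests. A sketch of the equivalent in-process invocation, assuming `conf` points at the target cluster; `Balancer.run` is package-private, so as with the earlier sketches this belongs in the balancer package:

```java
// Equivalent of `hdfs balancer -includeRack /rack0`, following the pattern
// in testBalancerWithRackNameInvalid below; conf is assumed to be configured
// for the target cluster.
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
int exitCode = Balancer.run(namenodes,
    new BalancerParameters.Builder().setRack("/rack0").build(), conf);
// ILLEGAL_ARGUMENTS is returned when no datanode matches the rack on any
// NameNode.
```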
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 49d7a7ea97b33..ea29435190852 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -553,6 +553,7 @@ static abstract class NewNodeInfo {

     Set<String> nodesToBeExcluded = new HashSet<String>();
     Set<String> nodesToBeIncluded = new HashSet<String>();
+    private String rack;

     abstract String[] getNames();
     abstract int getNumberofNewNodes();
@@ -565,6 +566,14 @@ public Set<String> getNodesToBeIncluded() {
     public Set<String> getNodesToBeExcluded() {
       return nodesToBeExcluded;
     }
+
+    public String getRack() {
+      return rack;
+    }
+
+    public void setRack(String dataCenter) {
+      this.rack = dataCenter;
+    }
   }

   /**
@@ -640,10 +649,18 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
   private void doTest(Configuration conf, long[] capacities, String[] racks,
       long newCapacity, String newRack, NewNodeInfo nodes,
       boolean useTool, boolean useFile) throws Exception {
-    doTest(conf, capacities, racks, newCapacity, newRack, nodes,
-        useTool, useFile, false, 0.3);
+    doTest(conf, capacities, racks, newCapacity, newRack, nodes, useTool,
+        useFile, false, 0.3, false);
   }

+  private void doTest(Configuration conf, long[] capacities, String[] racks,
+      long newCapacity, String newRack, NewNodeInfo nodes, boolean useTool,
+      boolean useFile, boolean useNamesystemSpy, double clusterUtilization,
+      boolean enableRackBasedBalance) throws Exception {
+    doTest(conf, capacities, racks, newCapacity, newRack, nodes, useTool,
+        useFile, useNamesystemSpy, clusterUtilization, null,
+        enableRackBasedBalance);
+  }
+
   /** This test start a cluster with specified number of nodes,
    * and fills it to be 30% full (with a single file replicated identically
    * to all datanodes);
@@ -666,8 +683,9 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
   private void doTest(Configuration conf, long[] capacities, String[] racks,
       long newCapacity, String newRack, NewNodeInfo nodes,
-      boolean useTool, boolean useFile,
-      boolean useNamesystemSpy, double clusterUtilization) throws Exception {
+      boolean useTool, boolean useFile, boolean useNamesystemSpy,
+      double clusterUtilization, String includeRack,
+      boolean enableRackBasedBalance) throws Exception {
     LOG.info("capacities = " + long2String(capacities));
     LOG.info("racks      = " + Arrays.asList(racks));
     LOG.info("newCapacity= " + newCapacity);
@@ -758,6 +776,7 @@ private void doTest(Configuration conf, long[] capacities,
         pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded());
         pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded());
         pBuilder.setRunDuringUpgrade(false);
+        pBuilder.setRack(nodes.getRack());
       }
       BalancerParameters p = pBuilder.build();
@@ -773,9 +792,13 @@ private void doTest(Configuration conf, long[] capacities,

       // run balancer and validate results
       if (useTool) {
-        runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes);
+        runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile,
+            expectedExcludedNodes, includeRack);
       } else {
-        runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
+        runBalancer(conf, totalUsedSpace, totalCapacity, p,
+            expectedExcludedNodes, enableRackBasedBalance,
+            null == nodes || StringUtils
+                .equalsIgnoreCase(nodes.getRack(), newRack));
       }
     } finally {
       if(cluster != null) {
@@ -787,18 +810,21 @@ private void doTest(Configuration conf, long[] capacities,
   private void runBalancer(Configuration conf, long totalUsedSpace,
       long totalCapacity) throws Exception {
     runBalancer(conf, totalUsedSpace, totalCapacity,
-        BalancerParameters.DEFAULT, 0);
+        BalancerParameters.DEFAULT, 0, false, true);
   }

   private void runBalancer(Configuration conf, long totalUsedSpace,
-      long totalCapacity, BalancerParameters p, int excludedNodes)
+      long totalCapacity, BalancerParameters p, int excludedNodes,
+      boolean enableRackBasedBalance, boolean isSameRack)
       throws Exception {
-    runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true);
+    runBalancer(conf, totalUsedSpace, totalCapacity, p, excludedNodes, true,
+        enableRackBasedBalance, isSameRack);
   }

   private void runBalancer(Configuration conf, long totalUsedSpace,
       long totalCapacity, BalancerParameters p, int excludedNodes,
-      boolean checkExcludeNodesUtilization) throws Exception {
+      boolean checkExcludeNodesUtilization, boolean enableRackBasedBalance,
+      boolean isSameRack) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

     int retry = 5;
@@ -812,6 +838,9 @@ private void runBalancer(Configuration conf, long totalUsedSpace,
           == 0) {
         assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), run);
         return;
+      } else if (enableRackBasedBalance && !isSameRack) {
+        assertEquals(ExitStatus.ILLEGAL_ARGUMENTS.getExitCode(), run);
+        break;
       } else {
         assertEquals(ExitStatus.SUCCESS.getExitCode(), run);
       }
@@ -893,11 +922,15 @@ private static int runBalancer(Collection<URI> namenodes,
   private void runBalancerCli(Configuration conf, long totalUsedSpace,
       long totalCapacity, BalancerParameters p, boolean useFile,
-      int expectedExcludedNodes) throws Exception {
+      int expectedExcludedNodes, String includeRack) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     List<String> args = new ArrayList<String>();
     args.add("-policy");
     args.add("datanode");
+    if (null != includeRack) {
+      args.add("-includeRack");
+      args.add(includeRack);
+    }

     File excludeHostsFile = null;
     if (!p.getExcludedNodes().isEmpty()) {
@@ -1685,7 +1718,7 @@ private void doTestBalancerWithStripedFile(Configuration conf) throws Exception

     // run balancer and validate results
     BalancerParameters p = BalancerParameters.DEFAULT;
-    runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);
+    runBalancer(conf, totalUsedSpace, totalCapacity, p, 0, false, true);

     // namenode will ask datanode to delete replicas in heartbeat response
     cluster.triggerHeartbeats();
@@ -1773,7 +1806,7 @@ private void doTestBalancerWithExcludeListWithStripedFile(Configuration conf) th

     // start balancer and check the failed num of moving task
     runBalancer(conf, totalUsedSpace, totalCapacity, pBuilder.build(),
-        excludedList.size(), false);
+        excludedList.size(), false, false, true);

     // check total blocks, max wait time 60s
     final long blocksBeforeBalancer = totalBlocks;
@@ -1891,13 +1924,13 @@ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
       racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
     }
     doTest(conf, capacities, racks, CAPACITY, RACK2,
         // Use only 1 node and set the starting capacity to 50% to allow the
         // balancing to complete in only one iteration. This is necessary
         // because the startGetBlocksTime and endGetBlocksTime measures across
         // all get block calls, so if two iterations are performed, the duration
         // also includes the time it took to perform the block move ops in the
         // first iteration
-        new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5);
+        new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5, false);
     assertTrue("Number of getBlocks should be not less than "
         + getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps);
     long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get();
@@ -1909,6 +1942,87 @@ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
       getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
   }

+  @Test(timeout = 100000)
+  public void testBalancerWithRackSameLocation() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+
+    Set<String> excludeHosts = new HashSet<String>();
+
+    String[] hostnames = new String[] {"datanodeX", "datanodeY", "datanodeZ" };
+    HostNameBasedNodes nodes = new HostNameBasedNodes(hostnames, excludeHosts,
+        BalancerParameters.DEFAULT.getIncludedNodes());
+    nodes.setRack(RACK0);
+    doTest(conf, new long[] {CAPACITY }, new String[] {RACK0 }, CAPACITY / 2,
+        RACK0, nodes, false, false, true, 0.3, false);
+  }
+
+  @Test(timeout = 100000)
+  public void testBalancerWithRackDiffLocation() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+
+    String[] hostnames = new String[] {"datanodeX", "datanodeY", "datanodeZ" };
+    HostNameBasedNodes nodes =
+        new HostNameBasedNodes(hostnames, Collections.emptySet(),
+            BalancerParameters.DEFAULT.getIncludedNodes());
+    nodes.setRack(RACK1);
+    doTest(conf, new long[] {CAPACITY }, new String[] {RACK0 }, CAPACITY / 2,
+        RACK0, nodes, false, false, true, 0.3, true);
+  }
+
+  @Test
+  public void testBalancerCliParseRackConf() {
+    String[] parameters = new String[] {"-includeRack", "/rack" };
+    BalancerParameters p = Balancer.Cli.parse(parameters);
+    assertEquals("/rack", p.getRack());
+
+    parameters = new String[] {"-includeRack", "/dc1/rack1" };
+    p = Balancer.Cli.parse(parameters);
+    assertEquals("/dc1/rack1", p.getRack());
+
+    parameters = new String[] {"-includeRack" };
+    try {
+      p = Balancer.Cli.parse(parameters);
+      fail("-includeRack without a rack name should be rejected");
+    } catch (IllegalArgumentException e) {
+      // expected: the rack name is missing
+    }
+  }
+
+  /**
+   * Test a cluster with multiple racks where a specific rack is passed to
+   * the balancer. Only the rack given via -includeRack should be balanced;
+   * all other racks should be ignored.
+   */
+  @Test(timeout = 100000)
+  public void testBalancerWithRackNameInvalid() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    // The only datanode lives on RACK1 and the balancer is asked to balance
+    // RACK0, which has no datanodes, so it must exit with ILLEGAL_ARGUMENTS.
+    long[] capacities = new long[] {4 * CAPACITY };
+    String[] racks = new String[] {RACK1 };
+    long newCapacity = 2 * CAPACITY;
+    String newRack = RACK2;
+    LOG.info("capacities = " + long2String(capacities));
+    LOG.info("racks      = " + Arrays.asList(racks));
+    LOG.info("newCapacity= " + newCapacity);
+    LOG.info("newRack    = " + newRack);
+    LOG.info("useTool    = " + false);
+    assertEquals(capacities.length, racks.length);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+        .racks(racks).simulatedCapacities(capacities).build();
+    cluster.waitActive();
+
+    // start rebalancing
+    Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    int exitCode = Balancer.run(namenodes,
+        new BalancerParameters.Builder().setRack(RACK0).build(), conf);
+    assertEquals(ExitStatus.ILLEGAL_ARGUMENTS.getExitCode(), exitCode);
+  }
+
   /**
    * @param args
    */
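Finally, the striped-block index fix-up added to `Dispatcher#getBlockList` is easiest to see in isolation. A self-contained sketch of the compaction it performs, assuming two of five locations are dropped by the rack filter; the arrays here are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StripedIndexCompactionSketch {
  public static void main(String[] args) {
    byte[] indices = {0, 1, 2, 3, 4};                   // one per location
    boolean[] kept = {true, false, true, false, true};  // rack filter result

    // Mirrors the loop in getBlockList: remember the index of every
    // location that was kept.
    List<Byte> indicesList = new ArrayList<>();
    for (int i = 0; i < indices.length; i++) {
      if (kept[i]) {
        indicesList.add(indices[i]);
      }
    }
    // Mirrors the fix-up: compact the kept indices to the front of the
    // array so they stay aligned with the surviving locations.
    if (indicesList.size() != indices.length) {
      for (int i = 0; i < indicesList.size(); i++) {
        indices[i] = indicesList.get(i);
      }
    }
    System.out.println(Arrays.toString(indices)); // [0, 2, 4, 3, 4]
  }
}
```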