diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fad2707bc6704..3c857734db4e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -618,6 +618,8 @@ Release 2.8.0 - UNRELEASED HDFS-8568. TestClusterId#testFormatWithEmptyClusterIdOption is failing. (Rakesh R. via xyao) + HDFS-8549. Abort the balancer if an upgrade is in progress. (wang) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index bc7e4489e0bc6..8b7d802de6d03 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -180,12 +180,19 @@ public class Balancer { + "\n\t[-include [-f | ]]" + "\tIncludes only the specified datanodes." + "\n\t[-idleiterations ]" - + "\tNumber of consecutive idle iterations (-1 for Infinite) before exit."; - + + "\tNumber of consecutive idle iterations (-1 for Infinite) before " + + "exit." + + "\n\t[-runDuringUpgrade]" + + "\tWhether to run the balancer during an ongoing HDFS upgrade." + + "This is usually not desired since it will not affect used space " + + "on over-utilized machines."; + private final Dispatcher dispatcher; + private final NameNodeConnector nnc; private final BalancingPolicy policy; + private final boolean runDuringUpgrade; private final double threshold; - + // all data node lists private final Collection overUtilized = new LinkedList(); private final Collection aboveAvgUtilized = new LinkedList(); @@ -227,11 +234,13 @@ private static void checkReplicationPolicyCompatibility(Configuration conf DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + this.nnc = theblockpool; this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, conf); this.threshold = p.threshold; this.policy = p.policy; + this.runDuringUpgrade = p.runDuringUpgrade; } private static long getCapacity(DatanodeStorageReport report, StorageType t) { @@ -293,7 +302,7 @@ private long init(List reports) { if (thresholdDiff <= 0) { // within threshold aboveAvgUtilized.add(s); } else { - overLoadedBytes += precentage2bytes(thresholdDiff, capacity); + overLoadedBytes += percentage2bytes(thresholdDiff, capacity); overUtilized.add(s); } g = s; @@ -302,7 +311,7 @@ private long init(List reports) { if (thresholdDiff <= 0) { // within threshold belowAvgUtilized.add(g); } else { - underLoadedBytes += precentage2bytes(thresholdDiff, capacity); + underLoadedBytes += percentage2bytes(thresholdDiff, capacity); underUtilized.add(g); } } @@ -324,17 +333,17 @@ private long init(List reports) { private static long computeMaxSize2Move(final long capacity, final long remaining, final double utilizationDiff, final double threshold) { final double diff = Math.min(threshold, Math.abs(utilizationDiff)); - long maxSizeToMove = precentage2bytes(diff, capacity); + long maxSizeToMove = percentage2bytes(diff, capacity); if (utilizationDiff < 0) { maxSizeToMove = Math.min(remaining, maxSizeToMove); } return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); } - private static long precentage2bytes(double precentage, long capacity) { - Preconditions.checkArgument(precentage >= 0, - "precentage = " + precentage + " < 0"); - return (long)(precentage * capacity / 100.0); + private static long percentage2bytes(double percentage, long capacity) { + Preconditions.checkArgument(percentage >= 0, "percentage = %s < 0", + percentage); + return (long)(percentage * capacity / 100.0); } /* log the over utilized & under utilized nodes */ @@ -516,7 +525,13 @@ Result runOneIteration() { LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) + " to make the cluster balanced." ); } - + + // Should not run the balancer during an unfinalized upgrade, since moved + // blocks are not deleted on the source datanode. + if (!runDuringUpgrade && nnc.isUpgrading()) { + return newResult(ExitStatus.UNFINALIZED_UPGRADE, bytesLeftToMove, -1); + } + /* Decide all the nodes that will participate in the block move and * the number of bytes that need to be moved from one node to another * in this iteration. Maximum bytes to be moved per node is @@ -530,7 +545,7 @@ Result runOneIteration() { LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) + " in this iteration"); } - + /* For each pair of , start a thread that repeatedly * decide a block to be moved and its proxy source, * then initiates the move until all bytes are moved or no more block @@ -634,7 +649,8 @@ static class Parameters { static final Parameters DEFAULT = new Parameters( BalancingPolicy.Node.INSTANCE, 10.0, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections. emptySet(), Collections. emptySet()); + Collections. emptySet(), Collections. emptySet(), + false); final BalancingPolicy policy; final double threshold; @@ -643,23 +659,34 @@ static class Parameters { Set nodesToBeExcluded; //include only these nodes in balancing operations Set nodesToBeIncluded; + /** + * Whether to run the balancer during upgrade. + */ + final boolean runDuringUpgrade; Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration, - Set nodesToBeExcluded, Set nodesToBeIncluded) { + Set nodesToBeExcluded, Set nodesToBeIncluded, + boolean runDuringUpgrade) { this.policy = policy; this.threshold = threshold; this.maxIdleIteration = maxIdleIteration; this.nodesToBeExcluded = nodesToBeExcluded; this.nodesToBeIncluded = nodesToBeIncluded; + this.runDuringUpgrade = runDuringUpgrade; } @Override public String toString() { - return Balancer.class.getSimpleName() + "." + getClass().getSimpleName() - + "[" + policy + ", threshold=" + threshold + - ", max idle iteration = " + maxIdleIteration + - ", number of nodes to be excluded = "+ nodesToBeExcluded.size() + - ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]"; + return String.format("%s.%s [%s," + + " threshold = %s," + + " max idle iteration = %s, " + + "number of nodes to be excluded = %s," + + " number of nodes to be included = %s," + + " run during upgrade = %s]", + Balancer.class.getSimpleName(), getClass().getSimpleName(), + policy, threshold, maxIdleIteration, + nodesToBeExcluded.size(), nodesToBeIncluded.size(), + runDuringUpgrade); } } @@ -701,6 +728,7 @@ static Parameters parse(String[] args) { int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration; Set nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded; Set nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded; + boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade; if (args != null) { try { @@ -756,9 +784,16 @@ static Parameters parse(String[] args) { } } else if ("-idleiterations".equalsIgnoreCase(args[i])) { checkArgument(++i < args.length, - "idleiterations value is missing: args = " + Arrays.toString(args)); + "idleiterations value is missing: args = " + Arrays + .toString(args)); maxIdleIteration = Integer.parseInt(args[i]); LOG.info("Using a idleiterations of " + maxIdleIteration); + } else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) { + runDuringUpgrade = true; + LOG.info("Will run the balancer even during an ongoing HDFS " + + "upgrade. Most users will not want to run the balancer " + + "during an upgrade since it will not affect used space " + + "on over-utilized machines."); } else { throw new IllegalArgumentException("args = " + Arrays.toString(args)); @@ -772,7 +807,8 @@ static Parameters parse(String[] args) { } } - return new Parameters(policy, threshold, maxIdleIteration, nodesTobeExcluded, nodesTobeIncluded); + return new Parameters(policy, threshold, maxIdleIteration, + nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade); } private static void printUsage(PrintStream out) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java index e36258ffca712..6bf298640f859 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java @@ -29,7 +29,8 @@ public enum ExitStatus { NO_MOVE_PROGRESS(-3), IO_EXCEPTION(-4), ILLEGAL_ARGUMENTS(-5), - INTERRUPTED(-6); + INTERRUPTED(-6), + UNFINALIZED_UPGRADE(-7); private final int code; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 2e4f21475fb95..e62dd08afbfd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -43,7 +43,9 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; @@ -163,6 +165,20 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) return namenode.getBlocks(datanode, size); } + /** + * @return true if an upgrade is in progress, false if not. + * @throws IOException + */ + public boolean isUpgrading() throws IOException { + // fsimage upgrade + final boolean isUpgrade = !namenode.isUpgradeFinalized(); + // rolling upgrade + RollingUpgradeInfo info = fs.rollingUpgrade( + HdfsConstants.RollingUpgradeAction.QUERY); + final boolean isRollingUpgrade = (info != null && !info.isFinalized()); + return (isUpgrade || isRollingUpgrade); + } + /** @return live datanode storage reports. */ public DatanodeStorageReport[] getLiveDatanodeStorageReport() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 92d31d00af85a..1f7bade5df0a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; @@ -627,7 +628,8 @@ private void doTest(Configuration conf, long[] capacities, Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.maxIdleIteration, - nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded()); + nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(), + false); } int expectedExcludedNodes = 0; @@ -866,7 +868,8 @@ public void testUnknownDatanode() throws Exception { Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.maxIdleIteration, - datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); + datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded, + false); final int r = Balancer.run(namenodes, p, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } finally { @@ -1296,12 +1299,7 @@ public void testBalancerWithRamDisk() throws Exception { Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); // Run Balancer - Balancer.Parameters p = new Balancer.Parameters( - Parameters.DEFAULT.policy, - Parameters.DEFAULT.threshold, - Balancer.Parameters.DEFAULT.maxIdleIteration, - Parameters.DEFAULT.nodesToBeExcluded, - Parameters.DEFAULT.nodesToBeIncluded); + final Balancer.Parameters p = Parameters.DEFAULT; final int r = Balancer.run(namenodes, p, conf); // Validate no RAM_DISK block should be moved @@ -1315,6 +1313,75 @@ public void testBalancerWithRamDisk() throws Exception { } } + /** + * Check that the balancer exits when there is an unfinalized upgrade. + */ + @Test(timeout=300000) + public void testBalancerDuringUpgrade() throws Exception { + final int SEED = 0xFADED; + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + + final int BLOCK_SIZE = 1024*1024; + cluster = new MiniDFSCluster + .Builder(conf) + .numDataNodes(1) + .storageCapacities(new long[] { BLOCK_SIZE * 10 }) + .storageTypes(new StorageType[] { DEFAULT }) + .storagesPerDatanode(1) + .build(); + + try { + cluster.waitActive(); + // Create a file on the single DN + final String METHOD_NAME = GenericTestUtils.getMethodName(); + final Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); + + DistributedFileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE, + (short) 1, SEED); + + // Add another DN with the same capacity, cluster is now unbalanced + cluster.startDataNodes(conf, 1, true, null, null); + cluster.triggerHeartbeats(); + Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + + // Run balancer + final Balancer.Parameters p = Parameters.DEFAULT; + + fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); + fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE); + fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + + // Rolling upgrade should abort the balancer + assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(), + Balancer.run(namenodes, p, conf)); + + // Should work with the -runDuringUpgrade flag. + final Balancer.Parameters runDuringUpgrade = + new Balancer.Parameters(Parameters.DEFAULT.policy, + Parameters.DEFAULT.threshold, + Parameters.DEFAULT.maxIdleIteration, + Parameters.DEFAULT.nodesToBeExcluded, + Parameters.DEFAULT.nodesToBeIncluded, + true); + assertEquals(ExitStatus.SUCCESS.getExitCode(), + Balancer.run(namenodes, runDuringUpgrade, conf)); + + // Finalize the rolling upgrade + fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE); + + // Should also work after finalization. + assertEquals(ExitStatus.SUCCESS.getExitCode(), + Balancer.run(namenodes, p, conf)); + + } finally { + cluster.shutdown(); + } + } + /** * Test special case. Two replicas belong to same block should not in same node. * We have 2 nodes.