Skip to content

Commit

Permalink
HDFS-8549. Abort the balancer if an upgrade is in progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
umbrant committed Jun 10, 2015
1 parent c7729ef commit a7a7768
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 30 deletions.
2 changes: 2 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Expand Up @@ -618,6 +618,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8568. TestClusterId#testFormatWithEmptyClusterIdOption is failing. HDFS-8568. TestClusterId#testFormatWithEmptyClusterIdOption is failing.
(Rakesh R. via xyao) (Rakesh R. via xyao)


HDFS-8549. Abort the balancer if an upgrade is in progress. (wang)

OPTIMIZATIONS OPTIMIZATIONS


HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
Expand Down
Expand Up @@ -180,12 +180,19 @@ public class Balancer {
+ "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]" + "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tIncludes only the specified datanodes." + "\tIncludes only the specified datanodes."
+ "\n\t[-idleiterations <idleiterations>]" + "\n\t[-idleiterations <idleiterations>]"
+ "\tNumber of consecutive idle iterations (-1 for Infinite) before exit."; + "\tNumber of consecutive idle iterations (-1 for Infinite) before "

+ "exit."
+ "\n\t[-runDuringUpgrade]"
+ "\tWhether to run the balancer during an ongoing HDFS upgrade."
+ "This is usually not desired since it will not affect used space "
+ "on over-utilized machines.";

private final Dispatcher dispatcher; private final Dispatcher dispatcher;
private final NameNodeConnector nnc;
private final BalancingPolicy policy; private final BalancingPolicy policy;
private final boolean runDuringUpgrade;
private final double threshold; private final double threshold;

// all data node lists // all data node lists
private final Collection<Source> overUtilized = new LinkedList<Source>(); private final Collection<Source> overUtilized = new LinkedList<Source>();
private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>(); private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
Expand Down Expand Up @@ -227,11 +234,13 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);


this.nnc = theblockpool;
this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
maxConcurrentMovesPerNode, conf); maxConcurrentMovesPerNode, conf);
this.threshold = p.threshold; this.threshold = p.threshold;
this.policy = p.policy; this.policy = p.policy;
this.runDuringUpgrade = p.runDuringUpgrade;
} }


private static long getCapacity(DatanodeStorageReport report, StorageType t) { private static long getCapacity(DatanodeStorageReport report, StorageType t) {
Expand Down Expand Up @@ -293,7 +302,7 @@ private long init(List<DatanodeStorageReport> reports) {
if (thresholdDiff <= 0) { // within threshold if (thresholdDiff <= 0) { // within threshold
aboveAvgUtilized.add(s); aboveAvgUtilized.add(s);
} else { } else {
overLoadedBytes += precentage2bytes(thresholdDiff, capacity); overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
overUtilized.add(s); overUtilized.add(s);
} }
g = s; g = s;
Expand All @@ -302,7 +311,7 @@ private long init(List<DatanodeStorageReport> reports) {
if (thresholdDiff <= 0) { // within threshold if (thresholdDiff <= 0) { // within threshold
belowAvgUtilized.add(g); belowAvgUtilized.add(g);
} else { } else {
underLoadedBytes += precentage2bytes(thresholdDiff, capacity); underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
underUtilized.add(g); underUtilized.add(g);
} }
} }
Expand All @@ -324,17 +333,17 @@ private long init(List<DatanodeStorageReport> reports) {
private static long computeMaxSize2Move(final long capacity, final long remaining, private static long computeMaxSize2Move(final long capacity, final long remaining,
final double utilizationDiff, final double threshold) { final double utilizationDiff, final double threshold) {
final double diff = Math.min(threshold, Math.abs(utilizationDiff)); final double diff = Math.min(threshold, Math.abs(utilizationDiff));
long maxSizeToMove = precentage2bytes(diff, capacity); long maxSizeToMove = percentage2bytes(diff, capacity);
if (utilizationDiff < 0) { if (utilizationDiff < 0) {
maxSizeToMove = Math.min(remaining, maxSizeToMove); maxSizeToMove = Math.min(remaining, maxSizeToMove);
} }
return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
} }


private static long precentage2bytes(double precentage, long capacity) { private static long percentage2bytes(double percentage, long capacity) {
Preconditions.checkArgument(precentage >= 0, Preconditions.checkArgument(percentage >= 0, "percentage = %s < 0",
"precentage = " + precentage + " < 0"); percentage);
return (long)(precentage * capacity / 100.0); return (long)(percentage * capacity / 100.0);
} }


/* log the over utilized & under utilized nodes */ /* log the over utilized & under utilized nodes */
Expand Down Expand Up @@ -516,7 +525,13 @@ Result runOneIteration() {
LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+ " to make the cluster balanced." ); + " to make the cluster balanced." );
} }


// Should not run the balancer during an unfinalized upgrade, since moved
// blocks are not deleted on the source datanode.
if (!runDuringUpgrade && nnc.isUpgrading()) {
return newResult(ExitStatus.UNFINALIZED_UPGRADE, bytesLeftToMove, -1);
}

/* Decide all the nodes that will participate in the block move and /* Decide all the nodes that will participate in the block move and
* the number of bytes that need to be moved from one node to another * the number of bytes that need to be moved from one node to another
* in this iteration. Maximum bytes to be moved per node is * in this iteration. Maximum bytes to be moved per node is
Expand All @@ -530,7 +545,7 @@ Result runOneIteration() {
LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) + LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) +
" in this iteration"); " in this iteration");
} }

/* For each pair of <source, target>, start a thread that repeatedly /* For each pair of <source, target>, start a thread that repeatedly
* decide a block to be moved and its proxy source, * decide a block to be moved and its proxy source,
* then initiates the move until all bytes are moved or no more block * then initiates the move until all bytes are moved or no more block
Expand Down Expand Up @@ -634,7 +649,8 @@ static class Parameters {
static final Parameters DEFAULT = new Parameters( static final Parameters DEFAULT = new Parameters(
BalancingPolicy.Node.INSTANCE, 10.0, BalancingPolicy.Node.INSTANCE, 10.0,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet()); Collections.<String> emptySet(), Collections.<String> emptySet(),
false);


final BalancingPolicy policy; final BalancingPolicy policy;
final double threshold; final double threshold;
Expand All @@ -643,23 +659,34 @@ static class Parameters {
Set<String> nodesToBeExcluded; Set<String> nodesToBeExcluded;
//include only these nodes in balancing operations //include only these nodes in balancing operations
Set<String> nodesToBeIncluded; Set<String> nodesToBeIncluded;
/**
* Whether to run the balancer during upgrade.
*/
final boolean runDuringUpgrade;


Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration, Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) { Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded,
boolean runDuringUpgrade) {
this.policy = policy; this.policy = policy;
this.threshold = threshold; this.threshold = threshold;
this.maxIdleIteration = maxIdleIteration; this.maxIdleIteration = maxIdleIteration;
this.nodesToBeExcluded = nodesToBeExcluded; this.nodesToBeExcluded = nodesToBeExcluded;
this.nodesToBeIncluded = nodesToBeIncluded; this.nodesToBeIncluded = nodesToBeIncluded;
this.runDuringUpgrade = runDuringUpgrade;
} }


@Override @Override
public String toString() { public String toString() {
return Balancer.class.getSimpleName() + "." + getClass().getSimpleName() return String.format("%s.%s [%s,"
+ "[" + policy + ", threshold=" + threshold + + " threshold = %s,"
", max idle iteration = " + maxIdleIteration + + " max idle iteration = %s, "
", number of nodes to be excluded = "+ nodesToBeExcluded.size() + + "number of nodes to be excluded = %s,"
", number of nodes to be included = "+ nodesToBeIncluded.size() +"]"; + " number of nodes to be included = %s,"
+ " run during upgrade = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(),
policy, threshold, maxIdleIteration,
nodesToBeExcluded.size(), nodesToBeIncluded.size(),
runDuringUpgrade);
} }
} }


Expand Down Expand Up @@ -701,6 +728,7 @@ static Parameters parse(String[] args) {
int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration; int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded; Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded; Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;


if (args != null) { if (args != null) {
try { try {
Expand Down Expand Up @@ -756,9 +784,16 @@ static Parameters parse(String[] args) {
} }
} else if ("-idleiterations".equalsIgnoreCase(args[i])) { } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length, checkArgument(++i < args.length,
"idleiterations value is missing: args = " + Arrays.toString(args)); "idleiterations value is missing: args = " + Arrays
.toString(args));
maxIdleIteration = Integer.parseInt(args[i]); maxIdleIteration = Integer.parseInt(args[i]);
LOG.info("Using a idleiterations of " + maxIdleIteration); LOG.info("Using a idleiterations of " + maxIdleIteration);
} else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) {
runDuringUpgrade = true;
LOG.info("Will run the balancer even during an ongoing HDFS "
+ "upgrade. Most users will not want to run the balancer "
+ "during an upgrade since it will not affect used space "
+ "on over-utilized machines.");
} else { } else {
throw new IllegalArgumentException("args = " throw new IllegalArgumentException("args = "
+ Arrays.toString(args)); + Arrays.toString(args));
Expand All @@ -772,7 +807,8 @@ static Parameters parse(String[] args) {
} }
} }


return new Parameters(policy, threshold, maxIdleIteration, nodesTobeExcluded, nodesTobeIncluded); return new Parameters(policy, threshold, maxIdleIteration,
nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade);
} }


private static void printUsage(PrintStream out) { private static void printUsage(PrintStream out) {
Expand Down
Expand Up @@ -29,7 +29,8 @@ public enum ExitStatus {
NO_MOVE_PROGRESS(-3), NO_MOVE_PROGRESS(-3),
IO_EXCEPTION(-4), IO_EXCEPTION(-4),
ILLEGAL_ARGUMENTS(-5), ILLEGAL_ARGUMENTS(-5),
INTERRUPTED(-6); INTERRUPTED(-6),
UNFINALIZED_UPGRADE(-7);


private final int code; private final int code;


Expand Down
Expand Up @@ -43,7 +43,9 @@
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
Expand Down Expand Up @@ -163,6 +165,20 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
return namenode.getBlocks(datanode, size); return namenode.getBlocks(datanode, size);
} }


/**
* @return true if an upgrade is in progress, false if not.
* @throws IOException
*/
public boolean isUpgrading() throws IOException {
// fsimage upgrade
final boolean isUpgrade = !namenode.isUpgradeFinalized();
// rolling upgrade
RollingUpgradeInfo info = fs.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY);
final boolean isRollingUpgrade = (info != null && !info.isFinalized());
return (isUpgrade || isRollingUpgrade);
}

/** @return live datanode storage reports. */ /** @return live datanode storage reports. */
public DatanodeStorageReport[] getLiveDatanodeStorageReport() public DatanodeStorageReport[] getLiveDatanodeStorageReport()
throws IOException { throws IOException {
Expand Down
Expand Up @@ -64,6 +64,7 @@
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
Expand Down Expand Up @@ -627,7 +628,8 @@ private void doTest(Configuration conf, long[] capacities,
Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.policy,
Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration, Balancer.Parameters.DEFAULT.maxIdleIteration,
nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded()); nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
false);
} }


int expectedExcludedNodes = 0; int expectedExcludedNodes = 0;
Expand Down Expand Up @@ -866,7 +868,8 @@ public void testUnknownDatanode() throws Exception {
Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.policy,
Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration, Balancer.Parameters.DEFAULT.maxIdleIteration,
datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded,
false);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} finally { } finally {
Expand Down Expand Up @@ -1296,12 +1299,7 @@ public void testBalancerWithRamDisk() throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);


// Run Balancer // Run Balancer
Balancer.Parameters p = new Balancer.Parameters( final Balancer.Parameters p = Parameters.DEFAULT;
Parameters.DEFAULT.policy,
Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration,
Parameters.DEFAULT.nodesToBeExcluded,
Parameters.DEFAULT.nodesToBeIncluded);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);


// Validate no RAM_DISK block should be moved // Validate no RAM_DISK block should be moved
Expand All @@ -1315,6 +1313,75 @@ public void testBalancerWithRamDisk() throws Exception {
} }
} }


/**
* Check that the balancer exits when there is an unfinalized upgrade.
*/
@Test(timeout=300000)
public void testBalancerDuringUpgrade() throws Exception {
final int SEED = 0xFADED;
Configuration conf = new HdfsConfiguration();
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);

final int BLOCK_SIZE = 1024*1024;
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(1)
.storageCapacities(new long[] { BLOCK_SIZE * 10 })
.storageTypes(new StorageType[] { DEFAULT })
.storagesPerDatanode(1)
.build();

try {
cluster.waitActive();
// Create a file on the single DN
final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");

DistributedFileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
(short) 1, SEED);

// Add another DN with the same capacity, cluster is now unbalanced
cluster.startDataNodes(conf, 1, true, null, null);
cluster.triggerHeartbeats();
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);

// Run balancer
final Balancer.Parameters p = Parameters.DEFAULT;

fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);

// Rolling upgrade should abort the balancer
assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
Balancer.run(namenodes, p, conf));

// Should work with the -runDuringUpgrade flag.
final Balancer.Parameters runDuringUpgrade =
new Balancer.Parameters(Parameters.DEFAULT.policy,
Parameters.DEFAULT.threshold,
Parameters.DEFAULT.maxIdleIteration,
Parameters.DEFAULT.nodesToBeExcluded,
Parameters.DEFAULT.nodesToBeIncluded,
true);
assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, runDuringUpgrade, conf));

// Finalize the rolling upgrade
fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);

// Should also work after finalization.
assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, p, conf));

} finally {
cluster.shutdown();
}
}

/** /**
* Test special case. Two replicas belong to same block should not in same node. * Test special case. Two replicas belong to same block should not in same node.
* We have 2 nodes. * We have 2 nodes.
Expand Down

0 comments on commit a7a7768

Please sign in to comment.