
Commit

HDFS-8826. In Balancer, add an option to specify the source node list so that balancer only selects blocks to move from those nodes.
Tsz-Wo Nicholas Sze committed Aug 19, 2015
1 parent 30e342a commit 7ecbfd4
Showing 6 changed files with 228 additions and 112 deletions.
@@ -80,13 +80,14 @@ public static void readFileToSetWithFileInputStream(String type,
       String[] nodes = line.split("[ \t\n\f\r]+");
       if (nodes != null) {
         for (int i = 0; i < nodes.length; i++) {
-          if (nodes[i].trim().startsWith("#")) {
+          nodes[i] = nodes[i].trim();
+          if (nodes[i].startsWith("#")) {
            // Everything from now on is a comment
             break;
           }
           if (!nodes[i].isEmpty()) {
-            LOG.info("Adding " + nodes[i] + " to the list of " + type +
-                " hosts from " + filename);
+            LOG.info("Adding a node \"" + nodes[i] + "\" to the list of "
+                + type + " hosts from " + filename);
             set.add(nodes[i]);
           }
         }
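
For readers skimming the HostsFileReader change above, here is a small, self-contained sketch of the new per-line parsing behavior: each whitespace-separated token is trimmed before the comment check, a token starting with '#' ends the line, and only non-empty tokens are kept. The class and method names below are made up for illustration; only the parsing logic mirrors the patch.

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class HostsLineParser {
      /** Parse one line of a hosts file: whitespace-separated names, '#' starts a comment. */
      static Set<String> parseLine(String line) {
        final Set<String> hosts = new LinkedHashSet<>();
        final String[] nodes = line.split("[ \t\n\f\r]+");
        for (int i = 0; i < nodes.length; i++) {
          nodes[i] = nodes[i].trim();          // trim before the comment check, as in the patch
          if (nodes[i].startsWith("#")) {
            break;                             // everything from here on is a comment
          }
          if (!nodes[i].isEmpty()) {
            hosts.add(nodes[i]);
          }
        }
        return hosts;
      }

      public static void main(String[] args) {
        // "dn3" is ignored because it follows the '#' comment marker.
        System.out.println(parseLine("dn1 dn2  # decommissioned: dn3"));  // prints [dn1, dn2]
      }
    }
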
@@ -18,7 +18,6 @@

 package org.apache.hadoop.util;

-import com.google.common.base.Preconditions;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
@@ -28,7 +27,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -45,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;

+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;

 /**
@@ -379,19 +378,6 @@ public static String[] getTrimmedStrings(String str){
     return str.trim().split("\\s*,\\s*");
   }

-  /**
-   * Trims all the strings in a Collection<String> and returns a Set<String>.
-   * @param strings
-   * @return
-   */
-  public static Set<String> getTrimmedStrings(Collection<String> strings) {
-    Set<String> trimmedStrings = new HashSet<String>();
-    for (String string: strings) {
-      trimmedStrings.add(string.trim());
-    }
-    return trimmedStrings;
-  }
-
   final public static String[] emptyStringArray = {};
   final public static char COMMA = ',';
   final public static String COMMA_STR = ",";
3 changes: 3 additions & 0 deletions hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -808,6 +808,9 @@ Release 2.8.0 - UNRELEASED

     HDFS-8435. Support CreateFlag in WebHDFS. (Jakob Homan via cdouglas)

+    HDFS-8826. In Balancer, add an option to specify the source node list
+    so that balancer only selects blocks to move from those nodes. (szetszwo)
+
   OPTIMIZATIONS

     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
@@ -27,6 +27,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,6 +55,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -188,6 +190,7 @@ public class Balancer {
   private final Dispatcher dispatcher;
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final Set<String> sourceNodes;
   private final boolean runDuringUpgrade;
   private final double threshold;
   private final long maxSizeToMove;
@@ -260,11 +263,12 @@ static int getInt(Configuration conf, String key, int defaultValue) {
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);

     this.nnc = theblockpool;
-    this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
-        p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
+    this.dispatcher = new Dispatcher(theblockpool, p.includedNodes,
+        p.excludedNodes, movedWinWidth, moverThreads, dispatcherThreads,
         maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.sourceNodes = p.sourceNodes;
     this.runDuringUpgrade = p.runDuringUpgrade;

     this.maxSizeToMove = getLong(conf,
@@ -318,14 +322,23 @@ private long init(List<DatanodeStorageReport> reports) {
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for(DatanodeStorageReport r : reports) {
       final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+      final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
       for(StorageType t : StorageType.getMovableTypes()) {
         final Double utilization = policy.getUtilization(r, t);
         if (utilization == null) { // datanode does not have such storage type
           continue;
         }

+        final double average = policy.getAvgUtilization(t);
+        if (utilization >= average && !isSource) {
+          LOG.info(dn + "[" + t + "] has utilization=" + utilization
+              + " >= average=" + average
+              + " but it is not specified as a source; skipping it.");
+          continue;
+        }
+
+        final double utilizationDiff = utilization - average;
         final long capacity = getCapacity(r, t);
-        final double utilizationDiff = utilization - policy.getAvgUtilization(t);
         final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
         final long maxSize2Move = computeMaxSize2Move(capacity,
             getRemaining(r, t), utilizationDiff, maxSizeToMove);
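
The new skip rule in init() above can be read as follows: a datanode at or above the average utilization for a storage type can only ever donate blocks, so if it is not named in the source list there is nothing for it to do and it is skipped. A minimal standalone sketch of that decision, with made-up host names and the threshold and storage-type handling omitted:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class SourceFilterDemo {
      /** True if the node should be considered at all; mirrors the check added to init(). */
      static boolean consider(double utilization, double average,
          Set<String> sourceNodes, String host) {
        final boolean isSource = sourceNodes.isEmpty() || sourceNodes.contains(host);
        if (utilization >= average && !isSource) {
          // An over-average node could only act as a source; if it is not listed,
          // the balancer has nothing to do with it, so skip it.
          return false;
        }
        return true;
      }

      public static void main(String[] args) {
        Set<String> sources = new HashSet<>(Arrays.asList("dn1.example.com"));
        System.out.println(consider(0.80, 0.60, sources, "dn1.example.com"));  // true: listed source
        System.out.println(consider(0.80, 0.60, sources, "dn2.example.com"));  // false: over average, not a source
        System.out.println(consider(0.40, 0.60, sources, "dn2.example.com"));  // true: under average, may be a target
      }
    }
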
@@ -623,6 +636,9 @@ static int run(Collection<URI> namenodes, final Parameters p,
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
     LOG.info("namenodes = " + namenodes);
     LOG.info("parameters = " + p);
+    LOG.info("included nodes = " + p.includedNodes);
+    LOG.info("excluded nodes = " + p.excludedNodes);
+    LOG.info("source nodes = " + p.sourceNodes);

     System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");

@@ -686,43 +702,50 @@ static class Parameters {
   static final Parameters DEFAULT = new Parameters(
       BalancingPolicy.Node.INSTANCE, 10.0,
       NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
-      Collections.<String> emptySet(), Collections.<String> emptySet(),
+      Collections.<String>emptySet(), Collections.<String>emptySet(),
+      Collections.<String>emptySet(),
       false);

   final BalancingPolicy policy;
   final double threshold;
   final int maxIdleIteration;
-  // exclude the nodes in this set from balancing operations
-  Set<String> nodesToBeExcluded;
-  //include only these nodes in balancing operations
-  Set<String> nodesToBeIncluded;
+  /** Exclude the nodes in this set. */
+  final Set<String> excludedNodes;
+  /** If empty, include any node; otherwise, include only these nodes. */
+  final Set<String> includedNodes;
+  /** If empty, any node can be a source;
+   * otherwise, use only these nodes as source nodes.
+   */
+  final Set<String> sourceNodes;
   /**
    * Whether to run the balancer during upgrade.
    */
   final boolean runDuringUpgrade;

   Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
-      Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded,
-      boolean runDuringUpgrade) {
+      Set<String> excludedNodes, Set<String> includedNodes,
+      Set<String> sourceNodes, boolean runDuringUpgrade) {
     this.policy = policy;
     this.threshold = threshold;
     this.maxIdleIteration = maxIdleIteration;
-    this.nodesToBeExcluded = nodesToBeExcluded;
-    this.nodesToBeIncluded = nodesToBeIncluded;
+    this.excludedNodes = excludedNodes;
+    this.includedNodes = includedNodes;
+    this.sourceNodes = sourceNodes;
     this.runDuringUpgrade = runDuringUpgrade;
   }

   @Override
   public String toString() {
     return String.format("%s.%s [%s,"
         + " threshold = %s,"
-        + " max idle iteration = %s, "
-        + "number of nodes to be excluded = %s,"
-        + " number of nodes to be included = %s,"
+        + " max idle iteration = %s,"
+        + " #excluded nodes = %s,"
+        + " #included nodes = %s,"
+        + " #source nodes = %s,"
         + " run during upgrade = %s]",
         Balancer.class.getSimpleName(), getClass().getSimpleName(),
         policy, threshold, maxIdleIteration,
-        nodesToBeExcluded.size(), nodesToBeIncluded.size(),
+        excludedNodes.size(), includedNodes.size(), sourceNodes.size(),
         runDuringUpgrade);
   }
 }
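
For orientation, a hedged sketch of how the extended Parameters might be constructed with a restricted source set. It assumes the calling code lives in the same package (the constructor carries no access modifier in the diff), the Hadoop classes shown are on the classpath, and the host names are invented; this is not Balancer code, just an illustration of the new argument order.

    // Fragment, not a complete program: Balancer.Parameters, BalancingPolicy and
    // NameNodeConnector come from the diff above and are assumed to be available.
    Set<String> sources = new HashSet<>(
        Arrays.asList("dn1.example.com", "dn2.example.com:50010"));

    Balancer.Parameters p = new Balancer.Parameters(
        BalancingPolicy.Node.INSTANCE,                  // balance by per-node utilization
        10.0,                                           // threshold, in percent
        NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
        Collections.<String>emptySet(),                 // excludedNodes: exclude nothing
        Collections.<String>emptySet(),                 // includedNodes: include every node
        sources,                                        // sourceNodes: only these may donate blocks
        false);                                         // do not run during upgrade
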
@@ -763,8 +786,9 @@ static Parameters parse(String[] args) {
     BalancingPolicy policy = Parameters.DEFAULT.policy;
     double threshold = Parameters.DEFAULT.threshold;
     int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
-    Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
-    Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
+    Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
+    Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
+    Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
     boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;

     if (args != null) {
@@ -796,29 +820,14 @@ static Parameters parse(String[] args) {
             throw e;
           }
         } else if ("-exclude".equalsIgnoreCase(args[i])) {
-          checkArgument(++i < args.length,
-              "List of nodes to exclude | -f <filename> is missing: args = "
-              + Arrays.toString(args));
-          if ("-f".equalsIgnoreCase(args[i])) {
-            checkArgument(++i < args.length,
-                "File containing nodes to exclude is not specified: args = "
-                + Arrays.toString(args));
-            nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude");
-          } else {
-            nodesTobeExcluded = Util.parseHostList(args[i]);
-          }
+          excludedNodes = new HashSet<>();
+          i = processHostList(args, i, "exclude", excludedNodes);
         } else if ("-include".equalsIgnoreCase(args[i])) {
-          checkArgument(++i < args.length,
-              "List of nodes to include | -f <filename> is missing: args = "
-              + Arrays.toString(args));
-          if ("-f".equalsIgnoreCase(args[i])) {
-            checkArgument(++i < args.length,
-                "File containing nodes to include is not specified: args = "
-                + Arrays.toString(args));
-            nodesTobeIncluded = Util.getHostListFromFile(args[i], "include");
-          } else {
-            nodesTobeIncluded = Util.parseHostList(args[i]);
-          }
+          includedNodes = new HashSet<>();
+          i = processHostList(args, i, "include", includedNodes);
+        } else if ("-source".equalsIgnoreCase(args[i])) {
+          sourceNodes = new HashSet<>();
+          i = processHostList(args, i, "source", sourceNodes);
         } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
           checkArgument(++i < args.length,
               "idleiterations value is missing: args = " + Arrays
@@ -836,7 +845,7 @@ static Parameters parse(String[] args) {
               + Arrays.toString(args));
         }
       }
-      checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(),
+      checkArgument(excludedNodes.isEmpty() || includedNodes.isEmpty(),
          "-exclude and -include options cannot be specified together.");
     } catch(RuntimeException e) {
       printUsage(System.err);
@@ -845,7 +854,31 @@ static Parameters parse(String[] args) {
     }

     return new Parameters(policy, threshold, maxIdleIteration,
-        nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade);
+        excludedNodes, includedNodes, sourceNodes, runDuringUpgrade);
+  }
+
+  private static int processHostList(String[] args, int i, String type,
+      Set<String> nodes) {
+    Preconditions.checkArgument(++i < args.length,
+        "List of %s nodes | -f <filename> is missing: args=%s",
+        type, Arrays.toString(args));
+    if ("-f".equalsIgnoreCase(args[i])) {
+      Preconditions.checkArgument(++i < args.length,
+          "File containing %s nodes is not specified: args=%s",
+          type, Arrays.toString(args));
+
+      final String filename = args[i];
+      try {
+        HostsFileReader.readFileToSet(type, filename, nodes);
+      } catch (IOException e) {
+        throw new IllegalArgumentException(
+            "Failed to read " + type + " node list from file: " + filename);
+      }
+    } else {
+      final String[] addresses = StringUtils.getTrimmedStrings(args[i]);
+      nodes.addAll(Arrays.asList(addresses));
+    }
+    return i;
   }

   private static void printUsage(PrintStream out) {
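
The new -source option accepts the same two shapes as -include and -exclude: an inline comma-separated host list, or -f followed by a hosts file (for example, something like hdfs balancer -source dn1,dn2 or hdfs balancer -source -f sources.txt; the exact usage text lives in the updated printUsage, which is not shown in this excerpt). Below is a simplified, self-contained sketch of the inline-list case only, without the file branch or the Preconditions checks; names and hosts are invented.

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class SourceArgsDemo {
      /** Simplified stand-in for Balancer#processHostList: inline list only, no -f support. */
      static int readHostList(String[] args, int i, Set<String> out) {
        i++;                                    // advance to the option value
        for (String s : args[i].split(",")) {
          s = s.trim();
          if (!s.isEmpty()) {
            out.add(s);
          }
        }
        return i;                               // caller resumes scanning after the value
      }

      public static void main(String[] args) {
        String[] cli = {"-threshold", "5", "-source", "dn1.example.com, dn2.example.com:50010"};
        Set<String> sources = new LinkedHashSet<>();
        for (int i = 0; i < cli.length; i++) {
          if ("-source".equalsIgnoreCase(cli[i])) {
            i = readHostList(cli, i, sources);
          }
        }
        System.out.println(sources);            // prints [dn1.example.com, dn2.example.com:50010]
      }
    }
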
@@ -28,7 +28,6 @@
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.HashMap;
@@ -70,7 +69,6 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;

@@ -796,7 +794,11 @@ private void dispatchBlocks() {
         if (shouldFetchMoreBlocks()) {
           // fetch new blocks
           try {
-            blocksToReceive -= getBlockList();
+            final long received = getBlockList();
+            if (received == 0) {
+              return;
+            }
+            blocksToReceive -= received;
             continue;
           } catch (IOException e) {
             LOG.warn("Exception while getting block list", e);
@@ -925,8 +927,11 @@ private boolean shouldIgnore(DatanodeInfo dn) {

     if (decommissioned || decommissioning || excluded || notIncluded) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
-            + decommissioning + ", " + excluded + ", " + notIncluded);
+        LOG.trace("Excluding datanode " + dn
+            + ": decommissioned=" + decommissioned
+            + ", decommissioning=" + decommissioning
+            + ", excluded=" + excluded
+            + ", notIncluded=" + notIncluded);
       }
       return true;
     }
@@ -1213,31 +1218,5 @@ private static boolean isIn(Set<String> nodes, String host, int port) {
     }
     return (nodes.contains(host) || nodes.contains(host + ":" + port));
   }
-
-  /**
-   * Parse a comma separated string to obtain set of host names
-   *
-   * @return set of host names
-   */
-  static Set<String> parseHostList(String string) {
-    String[] addrs = StringUtils.getTrimmedStrings(string);
-    return new HashSet<String>(Arrays.asList(addrs));
-  }
-
-  /**
-   * Read set of host names from a file
-   *
-   * @return set of host names
-   */
-  static Set<String> getHostListFromFile(String fileName, String type) {
-    Set<String> nodes = new HashSet<String>();
-    try {
-      HostsFileReader.readFileToSet(type, fileName, nodes);
-      return StringUtils.getTrimmedStrings(nodes);
-    } catch (IOException e) {
-      throw new IllegalArgumentException(
-          "Failed to read host list from file: " + fileName);
-    }
-  }
 }
 }
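
The matching rule kept in isIn above (and reached through Util.isIncluded for the new source list) accepts an entry either as a bare host name or as host:port, and the Parameters javadoc states that an empty list matches every node. A self-contained sketch of those two rules, with invented host names; the isIncluded helper here is a simplified stand-in for the Util method, not a copy of it.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class HostMatchDemo {
      /** Same containment rule as Util.isIn: an entry matches as "host" or "host:port". */
      static boolean isIn(Set<String> nodes, String host, int port) {
        return nodes.contains(host) || nodes.contains(host + ":" + port);
      }

      /** Mirrors the documented -source semantics: an empty list means any node qualifies. */
      static boolean isIncluded(Set<String> nodes, String host, int port) {
        return nodes.isEmpty() || isIn(nodes, host, port);
      }

      public static void main(String[] args) {
        Set<String> sources = new HashSet<>(
            Arrays.asList("dn1.example.com", "dn2.example.com:50010"));

        System.out.println(isIn(sources, "dn1.example.com", 50010));  // true: bare host entry
        System.out.println(isIn(sources, "dn2.example.com", 50010));  // true: host:port entry
        System.out.println(isIn(sources, "dn2.example.com", 50020));  // false: port differs
        System.out.println(isIncluded(java.util.Collections.<String>emptySet(),
            "dn3.example.com", 50010));                               // true: empty list matches all
      }
    }
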
