Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ public class Balancer {
+ "\tExcludes the specified datanodes."
+ "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tIncludes only the specified datanodes."
+ "\n\t[-includeRack <Rackname> ]"
+ "\tSpecify the rack to be balanced."
+ "\n\t[-source [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tPick only the specified datanodes as source nodes."
+ "\n\t[-blockpools <comma-separated list of blockpool ids>]"
Expand Down Expand Up @@ -352,6 +354,7 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
this.sourceNodes = p.getSourceNodes();
this.runDuringUpgrade = p.getRunDuringUpgrade();
this.sortTopNodes = p.getSortTopNodes();
dispatcher.setRack(p.getRack());

this.maxSizeToMove = getLongBytes(conf,
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
Expand Down Expand Up @@ -721,6 +724,9 @@ Result runOneIteration() {
try {
metrics.setIterateRunning(true);
final List<DatanodeStorageReport> reports = dispatcher.init();
if (reports.size() == 0) {
return newResult(ExitStatus.ILLEGAL_ARGUMENTS, 0, 0);
}
final long bytesLeftToMove = init(reports);
metrics.setBytesLeftToMove(bytesLeftToMove);
if (bytesLeftToMove == 0) {
Expand Down Expand Up @@ -804,6 +810,8 @@ static private int doBalance(Collection<URI> namenodes,
LOG.info("included nodes = " + p.getIncludedNodes());
LOG.info("excluded nodes = " + p.getExcludedNodes());
LOG.info("source nodes = " + p.getSourceNodes());
LOG.info("included rack = "+ p.getRack());

checkKeytabAndInit(conf);
System.out.println("Time Stamp Iteration#"
+ " Bytes Already Moved Bytes Left To Move Bytes Being Moved"
Expand All @@ -818,11 +826,29 @@ static private int doBalance(Collection<URI> namenodes,
for(int iteration = 0; !done; iteration++) {
done = true;
Collections.shuffle(connectors);
int connectorCount = connectors.size();
for(NameNodeConnector nnc : connectors) {
if (p.getBlockPools().size() == 0
|| p.getBlockPools().contains(nnc.getBlockpoolID())) {
final Balancer b = new Balancer(nnc, p, conf);
final Result r = b.runOneIteration();
if (r.exitStatus == ExitStatus.ILLEGAL_ARGUMENTS) {
if (LOG.isWarnEnabled()) {
LOG.warn("No datanodes matched for the NamenodeUri = "
+ nnc.getNameNodeUri() + ", includeRack = " + p.getRack()
+ ", balancer ignored.");
}
done = false;
// Decrement the connector count, once all connectors identified
// to be ignored then balancer can be stopped.
if (--connectorCount <= 0) {
// Invalid configuration, there is no data identified to move
// across nodes
return r.exitStatus.getExitCode();
}
//There is no matching data nodes for the current NamenodeURI
continue;
}
r.print(iteration, nnc, System.out);

// clean all lists
Expand Down Expand Up @@ -1034,6 +1060,11 @@ static BalancerParameters parse(String[] args) {
Set<String> sourceNodes = new HashSet<>();
i = processHostList(args, i, "source", sourceNodes);
b.setSourceNodes(sourceNodes);
} else if ("-includeRack".equalsIgnoreCase(args[i])) {
String rackName = parseRack(args, i, "includeRack");
i++;
//Rack name / Available Zone name.
b.setRack(rackName);
} else if ("-blockpools".equalsIgnoreCase(args[i])) {
Preconditions.checkArgument(
++i < args.length,
Expand Down Expand Up @@ -1110,6 +1141,12 @@ private static int processHostList(String[] args, int i, String type,
return i;
}

private static String parseRack(String[] args, int i, String type) {
Preconditions.checkArgument(++i < args.length,
"Rack name is missing: args=%s", type, Arrays.toString(args));
return args[i];
}

private static Set<String> parseBlockPoolList(String string) {
String[] addrs = StringUtils.getTrimmedStrings(string);
return new HashSet<String>(Arrays.asList(addrs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ final class BalancerParameters {

static final BalancerParameters DEFAULT = new BalancerParameters();

/**
* Rack / Available Zone name
*/
private String rack;

private BalancerParameters() {
this(new Builder());
}
Expand All @@ -65,6 +70,7 @@ private BalancerParameters(Builder builder) {
this.sourceNodes = builder.sourceNodes;
this.blockpools = builder.blockpools;
this.runDuringUpgrade = builder.runDuringUpgrade;
this.rack = builder.rack;
this.runAsService = builder.runAsService;
this.sortTopNodes = builder.sortTopNodes;
this.hotBlockTimeInterval = builder.hotBlockTimeInterval;
Expand Down Expand Up @@ -119,13 +125,18 @@ public String toString() {
return String.format("%s.%s [%s," + " threshold = %s,"
+ " max idle iteration = %s," + " #excluded nodes = %s,"
+ " #included nodes = %s," + " #source nodes = %s,"
+ " #includeRack = %s,"
+ " #blockpools = %s," + " run during upgrade = %s,"
+ " sort top nodes = %s,"
+ " hot block time interval = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
runDuringUpgrade, sortTopNodes, hotBlockTimeInterval);
runDuringUpgrade, sortTopNodes, hotBlockTimeInterval, rack);
}

public String getRack() {
return rack;
}

static class Builder {
Expand All @@ -142,6 +153,8 @@ static class Builder {
private boolean runAsService = false;
private boolean sortTopNodes = false;
private long hotBlockTimeInterval = 0;
//Rack / Available Zone name
private String rack;

Builder() {
}
Expand Down Expand Up @@ -181,6 +194,11 @@ Builder setSourceNodes(Set<String> nodes) {
return this;
}

Builder setRack(String rackName) {
this.rack = rackName;
return this;
}

Builder setBlockpools(Set<String> pools) {
this.blockpools = pools;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public class Dispatcher {
private BlockPlacementPolicies placementPolicies;

private long maxIterationTime;

//Rack / Available Zone name
private String rack;
static class Allocator {
private final int max;
private int count = 0;
Expand Down Expand Up @@ -881,17 +882,33 @@ private long getBlockList() throws IOException, IllegalArgumentException {
List<Integer> adjustList = new ArrayList<>();
final String[] datanodeUuids = blkLocs.getDatanodeUuids();
final StorageType[] storageTypes = blkLocs.getStorageTypes();
ArrayList<Byte> indicesList = new ArrayList<Byte>();
for (int i = 0; i < datanodeUuids.length; i++) {
final StorageGroup g = storageGroupMap.get(
datanodeUuids[i], storageTypes[i]);
if (g != null) { // not unknown
block.addLocation(g);
if (blkLocs instanceof StripedBlockWithLocations) {
indicesList.add(
((StripedBlockWithLocations) blkLocs).getIndices()[i]);
}
} else if (blkLocs instanceof StripedBlockWithLocations) {
// some datanode may not in storageGroupMap due to decommission operation
// or balancer cli with "-exclude" parameter
adjustList.add(i);
}
}
if (blkLocs instanceof StripedBlockWithLocations) {
byte[] indices =
((StripedBlockWithLocations) blkLocs).getIndices();
// Rack level balancing has some locations null, so re-assign the
// correct indices
if (indicesList.size() != indices.length) {
for (int i = 0; i < indicesList.size(); i++) {
indices[i] = indicesList.get(i);
}
}
}

if (!adjustList.isEmpty()) {
// block.locations mismatch with block.indices
Expand Down Expand Up @@ -1164,12 +1181,18 @@ private boolean shouldIgnore(DatanodeInfo dn) {
// ignore nodes not in the include list (if include list is not empty)
final boolean notIncluded = !Util.isIncluded(includedNodes, dn);

if (outOfService || excluded || notIncluded) {
// ignore the datanodes belongs to other Rack / Network location AND data
// center feature is enabled in the cluster
String loc = dn.getNetworkLocation();
final boolean partOfRack = (null == this.rack) || loc.contains(this.rack);

if (outOfService || excluded || notIncluded || !partOfRack) {
if (LOG.isTraceEnabled()) {
LOG.trace("Excluding datanode " + dn
+ ": outOfService=" + outOfService
+ ", excluded=" + excluded
+ ", notIncluded=" + notIncluded);
+ ", notIncluded=" + notIncluded
+ ", not part of rack " + this.rack);
}
return true;
}
Expand Down Expand Up @@ -1433,6 +1456,10 @@ public void shutdownNow() {
}
}

public void setRack(String rack) {
this.rack = rack;
}

static class Util {
/** @return true if data node is part of the excludedNodes. */
static boolean isExcluded(Set<String> excludedNodes, DatanodeInfo dn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ Usage:
[-idleiterations <idleiterations>]
[-runDuringUpgrade]
[-asService]
[-includeRack <RackName>]

| COMMAND\_OPTION | Description |
|:---- |:---- |
Expand Down
Loading