HDFS-10737. disk balancer add volume path to report command. Contributed by Yuanbo Liu.
anuengineer committed Aug 15, 2016
1 parent d677b68 commit 9f29f42
Showing 4 changed files with 86 additions and 35 deletions.
Command.java
@@ -33,13 +33,17 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -420,6 +424,37 @@ protected int parseTopNodes(final CommandLine cmd, final StrBuilder result) {
return Math.min(nodes, cluster.getNodes().size());
}

/**
* Reads the Physical path of the disks we are balancing. This is needed to
* make the disk balancer human friendly and not used in balancing.
*
* @param node - Disk Balancer Node.
*/
protected void populatePathNames(
DiskBalancerDataNode node) throws IOException {
// if the cluster is a local file system, there is no need to
// invoke rpc call to dataNode.
if (getClusterURI().getScheme().startsWith("file")) {
return;
}
String dnAddress = node.getDataNodeIP() + ":" + node.getDataNodePort();
ClientDatanodeProtocol dnClient = getDataNodeProxy(dnAddress);
String volumeNameJson = dnClient.getDiskBalancerSetting(
DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
ObjectMapper mapper = new ObjectMapper();

@SuppressWarnings("unchecked")
Map<String, String> volumeMap =
mapper.readValue(volumeNameJson, HashMap.class);
for (DiskBalancerVolumeSet set : node.getVolumeSets().values()) {
for (DiskBalancerVolume vol : set.getVolumes()) {
if (volumeMap.containsKey(vol.getUuid())) {
vol.setPath(volumeMap.get(vol.getUuid()));
}
}
}
}

/**
* Set top number of nodes to be processed.
* */
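
The populatePathNames() helper added above asks the DataNode, through the ClientDatanodeProtocol proxy, for a JSON map of volume UUID to physical path and stamps each DiskBalancerVolume with its path. Below is a minimal, self-contained sketch (not part of this commit) of just that JSON round-trip, using the same Jackson 1.x ObjectMapper call; the sample UUID and path are borrowed from the tests further down and are illustrative only.

import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class VolumePathJsonSketch {
  public static void main(String[] args) throws IOException {
    // A DataNode's reply to getDiskBalancerSetting(DISKBALANCER_VOLUME_NAME)
    // is a JSON object mapping volume UUID -> physical path; this literal is
    // a made-up stand-in for that reply.
    String volumeNameJson =
        "{\"a87654a9-54c7-4693-8dd9-c9c7021dc340\":\"/dfs/data/data1\"}";

    ObjectMapper mapper = new ObjectMapper();
    @SuppressWarnings("unchecked")
    Map<String, String> volumeMap =
        mapper.readValue(volumeNameJson, HashMap.class);

    // populatePathNames() walks the node's volume sets and calls
    // vol.setPath(volumeMap.get(vol.getUuid())); here we just print the map.
    for (Map.Entry<String, String> entry : volumeMap.entrySet()) {
      System.out.println("volume " + entry.getKey() + " -> " + entry.getValue());
    }
  }
}
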
PlanCommand.java
@@ -24,23 +24,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerVolumeSet;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.apache.hadoop.hdfs.tools.DiskBalancer;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Class that implements Plan Command.
@@ -158,30 +148,6 @@ public void execute(CommandLine cmd) throws Exception {
}
}

/**
* Reads the Physical path of the disks we are balancing. This is needed to
* make the disk balancer human friendly and not used in balancing.
*
* @param node - Disk Balancer Node.
*/
private void populatePathNames(DiskBalancerDataNode node) throws IOException {
String dnAddress = node.getDataNodeIP() + ":" + node.getDataNodePort();
ClientDatanodeProtocol dnClient = getDataNodeProxy(dnAddress);
String volumeNameJson = dnClient.getDiskBalancerSetting(
DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
ObjectMapper mapper = new ObjectMapper();

@SuppressWarnings("unchecked")
Map<String, String> volumeMap =
mapper.readValue(volumeNameJson, HashMap.class);
for (DiskBalancerVolumeSet set : node.getVolumeSets().values()) {
for (DiskBalancerVolume vol : set.getVolumes()) {
if (volumeMap.containsKey(vol.getUuid())) {
vol.setPath(volumeMap.get(vol.getUuid()));
}
}
}
}

/**
* Gets extended help for this command.
ReportCommand.java
@@ -130,7 +130,7 @@ private void handleTopReport(final CommandLine cmd, final StrBuilder result,
}

private void handleNodeReport(final CommandLine cmd, StrBuilder result,
final String nodeFormat, final String volumeFormat) {
final String nodeFormat, final String volumeFormat) throws Exception {
String outputLine = "";
/*
* get value that identifies a DataNode from command line, it could be UUID,
@@ -152,6 +152,8 @@ private void handleNodeReport(final CommandLine cmd, StrBuilder result,
final String trueStr = "True";
final String falseStr = "False";
DiskBalancerDataNode dbdn = getNode(nodeVal);
// get storage path of datanode
populatePathNames(dbdn);

if (dbdn == null) {
outputLine = String.format(
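
With populatePathNames() promoted to the shared Command base class, handleNodeReport() (now declared throws Exception) resolves each volume's physical path before the per-volume report rows are formatted, which the new testReportNodeWithoutJson below asserts via strings such as /dfs/data/data1. The following is a hedged sketch, not the committed formatting code, of how the resolved paths become visible; it assumes DiskBalancerVolume exposes a getPath() getter mirroring the setPath() call made in populatePathNames().

import org.apache.commons.lang.text.StrBuilder;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;

final class VolumePathReportSketch {
  /**
   * Appends one "uuid -> path" line per volume, assuming populatePathNames()
   * has already filled in the paths. The surrounding flow is exercised end to
   * end by "hdfs diskbalancer -report -node <DataNode UUID>" in the tests below.
   */
  static void appendVolumePaths(DiskBalancerDataNode dbdn, StrBuilder result) {
    for (DiskBalancerVolumeSet set : dbdn.getVolumeSets().values()) {
      for (DiskBalancerVolume vol : set.getVolumes()) {
        // getPath() is assumed to mirror setPath(); with the change above it
        // would return e.g. "/dfs/data/data1" instead of null.
        result.appendln(vol.getUuid() + " -> " + vol.getPath());
      }
    }
  }
}
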
TestDiskBalancerCommand.java
@@ -17,6 +17,7 @@

package org.apache.hadoop.hdfs.server.diskbalancer.command;


import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
@@ -50,6 +51,7 @@
import static org.apache.hadoop.hdfs.tools.DiskBalancer.NODE;
import static org.apache.hadoop.hdfs.tools.DiskBalancer.PLAN;
import static org.apache.hadoop.hdfs.tools.DiskBalancer.QUERY;
import static org.apache.hadoop.hdfs.tools.DiskBalancer.REPORT;

import org.junit.Rule;
import org.junit.rules.ExpectedException;
@@ -261,6 +263,41 @@ public void testReportNode() throws Exception {
containsString("0.25 free: 490407853993/2000000000000"))));
}

@Test(timeout = 60000)
public void testReportNodeWithoutJson() throws Exception {
String dataNodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid();
final String planArg = String.format("-%s -%s %s",
REPORT, NODE, dataNodeUuid);
final String cmdLine = String
.format(
"hdfs diskbalancer %s", planArg);
List<String> outputs = runCommand(cmdLine, cluster);

assertThat(
outputs.get(0),
containsString("Processing report command"));
assertThat(
outputs.get(1),
is(allOf(containsString("Reporting volume information for DataNode"),
containsString(dataNodeUuid))));
assertThat(
outputs.get(2),
is(allOf(containsString(dataNodeUuid),
containsString("2 volumes with node data density 0.00"))));
assertThat(
outputs.get(3),
is(allOf(containsString("DISK"),
containsString("/dfs/data/data1"),
containsString("0.00"),
containsString("1.00"))));
assertThat(
outputs.get(4),
is(allOf(containsString("DISK"),
containsString("/dfs/data/data2"),
containsString("0.00"),
containsString("1.00"))));
}

@Test(timeout = 60000)
public void testReadClusterFromJson() throws Exception {
ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
@@ -283,6 +320,17 @@ public void testPlanNode() throws Exception {
runCommand(cmdLine, cluster);
}

/* test -plan DataNodeID */
@Test(timeout = 60000)
public void testPlanJsonNode() throws Exception {
final String planArg = String.format("-%s %s", PLAN,
"a87654a9-54c7-4693-8dd9-c9c7021dc340");
final String cmdLine = String
.format(
"hdfs diskbalancer %s", planArg);
runCommand(cmdLine);
}

/* Test that illegal arguments are handled correctly*/
@Test(timeout = 60000)
public void testIllegalArgument() throws Exception {
