Skip to content

Commit

Permalink
HDFS-17075. Reconfig disk balancer parameters for datanode (#5823). C…
Browse files Browse the repository at this point in the history
…ontributed by Haiyang Hu.

Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
  • Loading branch information
haiyang1987 committed Jul 16, 2023
1 parent b955951 commit c44823d
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
Expand Down Expand Up @@ -356,7 +360,9 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY,
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY,
FS_GETSPACEUSED_CLASSNAME));
FS_GETSPACEUSED_CLASSNAME,
DFS_DISK_BALANCER_ENABLED,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL));

public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";

Expand Down Expand Up @@ -706,6 +712,9 @@ public String reconfigurePropertyImpl(String property, String newVal)
case FS_GETSPACEUSED_JITTER_KEY:
case FS_GETSPACEUSED_CLASSNAME:
return reconfDfsUsageParameters(property, newVal);
case DFS_DISK_BALANCER_ENABLED:
case DFS_DISK_BALANCER_PLAN_VALID_INTERVAL:
return reconfDiskBalancerParameters(property, newVal);
default:
break;
}
Expand Down Expand Up @@ -951,6 +960,44 @@ private String reconfDfsUsageParameters(String property, String newVal)
}
}

private String reconfDiskBalancerParameters(String property, String newVal)
throws ReconfigurationException {
String result = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
if (property.equals(DFS_DISK_BALANCER_ENABLED)) {
if (newVal != null && !newVal.equalsIgnoreCase("true")
&& !newVal.equalsIgnoreCase("false")) {
throw new IllegalArgumentException("Not a valid Boolean value for " + property);
}
boolean enable = (newVal == null ? DFS_DISK_BALANCER_ENABLED_DEFAULT :
Boolean.parseBoolean(newVal));
getDiskBalancer().setDiskBalancerEnabled(enable);
result = Boolean.toString(enable);
} else if (property.equals(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL)) {
if (newVal == null) {
// set to default
long defaultInterval = getConf().getTimeDuration(
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
getDiskBalancer().setPlanValidityInterval(defaultInterval);
result = DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
} else {
long newInterval = getConf()
.getTimeDurationHelper(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
newVal, TimeUnit.MILLISECONDS);
getDiskBalancer().setPlanValidityInterval(newInterval);
result = newVal;
}
}
LOG.info("RECONFIGURE* changed {} to {}", property, result);
return result;
} catch (IllegalArgumentException | IOException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
Expand Down Expand Up @@ -4201,7 +4248,8 @@ public List<DatanodeVolumeInfo> getVolumeReport() throws IOException {
return volumeInfoList;
}

private DiskBalancer getDiskBalancer() throws IOException {
@VisibleForTesting
public DiskBalancer getDiskBalancer() throws IOException {
if (this.diskBalancer == null) {
throw new IOException("DiskBalancer is not initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ public class DiskBalancer {
private final BlockMover blockMover;
private final ReentrantLock lock;
private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
private boolean isDiskBalancerEnabled = false;
private volatile boolean isDiskBalancerEnabled = false;
private ExecutorService scheduler;
private Future future;
private String planID;
private String planFile;
private DiskBalancerWorkStatus.Result currentResult;
private long bandwidth;
private long planValidityInterval;
private volatile long planValidityInterval;
private final Configuration config;

/**
Expand Down Expand Up @@ -341,6 +341,58 @@ private void checkDiskBalancerEnabled()
}
}

/**
* Sets Disk balancer is to enable or not to enable.
*
* @param diskBalancerEnabled
* true, enable diskBalancer, otherwise false to disable it.
*/
public void setDiskBalancerEnabled(boolean diskBalancerEnabled) {
isDiskBalancerEnabled = diskBalancerEnabled;
}

/**
* Returns the value indicating if diskBalancer is enabled.
*
* @return boolean.
*/
@VisibleForTesting
public boolean isDiskBalancerEnabled() {
return isDiskBalancerEnabled;
}

/**
* Sets maximum amount of time disk balancer plan is valid.
*
* @param planValidityInterval - maximum amount of time in the unit of milliseconds.
*/
public void setPlanValidityInterval(long planValidityInterval) {
this.config.setTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
planValidityInterval, TimeUnit.MILLISECONDS);
this.planValidityInterval = planValidityInterval;
}

/**
* Gets maximum amount of time disk balancer plan is valid.
*
* @return the maximum amount of time in milliseconds.
*/
@VisibleForTesting
public long getPlanValidityInterval() {
return planValidityInterval;
}

/**
* Gets maximum amount of time disk balancer plan is valid in config.
*
* @return the maximum amount of time in milliseconds.
*/
@VisibleForTesting
public long getPlanValidityIntervalInConfig() {
return config.getTimeDuration(DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
}

/**
* Verifies that user provided plan is valid.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
Expand All @@ -57,6 +61,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
Expand Down Expand Up @@ -794,4 +799,56 @@ public void testDfsUsageKlass() throws ReconfigurationException, InterruptedExce
Thread.sleep(5000);
assertTrue(counter > lastCounter);
}

@Test
public void testDiskBalancerParameters() throws Exception {
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);

// Verify DFS_DISK_BALANCER_ENABLED.
// Try invalid values.
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.disk.balancer.enabled from 'true' to 'text'",
() -> dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "text"));

// Set default value.
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, null);
assertEquals(dn.getConf().getBoolean(DFS_DISK_BALANCER_ENABLED,
DFS_DISK_BALANCER_ENABLED_DEFAULT), dn.getDiskBalancer().isDiskBalancerEnabled());

// Set DFS_DISK_BALANCER_ENABLED to false.
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "false");
assertFalse(dn.getDiskBalancer().isDiskBalancerEnabled());

// Set DFS_DISK_BALANCER_ENABLED to true.
dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "true");
assertTrue(dn.getDiskBalancer().isDiskBalancerEnabled());

// Verify DFS_DISK_BALANCER_PLAN_VALID_INTERVAL.
// Try invalid values.
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property dfs.disk.balancer.plan.valid.interval from " +
"'1d' to 'text'",
() -> dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "text"));

// Set default value.
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, null);
assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS),
dn.getDiskBalancer().getPlanValidityInterval());
assertEquals(dn.getConf().getTimeDuration(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS),
dn.getDiskBalancer().getPlanValidityIntervalInConfig());

// Set value is 6 then 6 milliseconds.
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "" + 6);
assertEquals(6, dn.getDiskBalancer().getPlanValidityInterval());
assertEquals(6, dn.getDiskBalancer().getPlanValidityIntervalInConfig());

// Set value with time unit.
dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "1m");
assertEquals(60000, dn.getDiskBalancer().getPlanValidityInterval());
assertEquals(60000, dn.getDiskBalancer().getPlanValidityIntervalInConfig());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void testGetDiskBalancerInvalidSetting() throws Exception {
}

@Test
public void testgetDiskBalancerBandwidth() throws Exception {
public void testGetDiskBalancerBandwidth() throws Exception {
RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
DataNode dataNode = rpcTestHelper.getDataNode();
String planHash = rpcTestHelper.getPlanHash();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(20, outs.size());
assertEquals(22, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}

Expand Down

0 comments on commit c44823d

Please sign in to comment.