Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17075. Reconfig disk balancer parameters for datanode #5823

Merged
merged 4 commits into from
Jul 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
Expand Down Expand Up @@ -356,7 +360,9 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY,
FS_DU_INTERVAL_KEY,
FS_GETSPACEUSED_JITTER_KEY,
FS_GETSPACEUSED_CLASSNAME));
FS_GETSPACEUSED_CLASSNAME,
DFS_DISK_BALANCER_ENABLED,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL));

public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";

Expand Down Expand Up @@ -706,6 +712,9 @@ public String reconfigurePropertyImpl(String property, String newVal)
case FS_GETSPACEUSED_JITTER_KEY:
case FS_GETSPACEUSED_CLASSNAME:
return reconfDfsUsageParameters(property, newVal);
case DFS_DISK_BALANCER_ENABLED:
case DFS_DISK_BALANCER_PLAN_VALID_INTERVAL:
return reconfDiskBalancerParameters(property, newVal);
default:
break;
}
Expand Down Expand Up @@ -951,6 +960,45 @@ private String reconfDfsUsageParameters(String property, String newVal)
}
}

private String reconfDiskBalancerParameters(String property, String newVal)
throws ReconfigurationException {
String result = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
if (property.equals(DFS_DISK_BALANCER_ENABLED)) {
if (newVal != null && !newVal.equalsIgnoreCase("true")
&& !newVal.equalsIgnoreCase("false")) {
throw new IllegalArgumentException("Not a valid Boolean value for " + property +
" in reconfDiskBalancerParameters");
ayushtkn marked this conversation as resolved.
Show resolved Hide resolved
}
boolean enable = (newVal == null ? DFS_DISK_BALANCER_ENABLED_DEFAULT :
Boolean.parseBoolean(newVal));
ayushtkn marked this conversation as resolved.
Show resolved Hide resolved
getDiskBalancer().setDiskBalancerEnabled(enable);
result = Boolean.toString(enable);
} else if (property.equals(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL)) {
if (newVal == null) {
// set to default
long defaultInterval = getConf().getTimeDuration(
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
getDiskBalancer().setPlanValidityInterval(defaultInterval);
result = DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
} else {
long newInterval = getConf()
.getTimeDurationHelper(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
newVal, TimeUnit.MILLISECONDS);
getDiskBalancer().setPlanValidityInterval(newInterval);
result = newVal;
}
}
LOG.info("RECONFIGURE* changed {} to {}", property, result);
return result;
} catch (IllegalArgumentException | IOException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
Expand Down Expand Up @@ -4201,7 +4249,7 @@ public List<DatanodeVolumeInfo> getVolumeReport() throws IOException {
return volumeInfoList;
}

private DiskBalancer getDiskBalancer() throws IOException {
public DiskBalancer getDiskBalancer() throws IOException {
ayushtkn marked this conversation as resolved.
Show resolved Hide resolved
if (this.diskBalancer == null) {
throw new IOException("DiskBalancer is not initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ public class DiskBalancer {
private final BlockMover blockMover;
private final ReentrantLock lock;
private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
private boolean isDiskBalancerEnabled = false;
private volatile boolean isDiskBalancerEnabled = false;
private ExecutorService scheduler;
private Future future;
private String planID;
private String planFile;
private DiskBalancerWorkStatus.Result currentResult;
private long bandwidth;
private long planValidityInterval;
private volatile long planValidityInterval;
private final Configuration config;

/**
Expand Down Expand Up @@ -341,6 +341,59 @@ private void checkDiskBalancerEnabled()
}
}

/**
 * Enables or disables the disk balancer.
 *
 * @param diskBalancerEnabled
 *          true to enable the disk balancer, false to disable it.
 */
public void setDiskBalancerEnabled(boolean diskBalancerEnabled) {
  this.isDiskBalancerEnabled = diskBalancerEnabled;
}

/**
 * Reports whether the disk balancer is currently enabled.
 *
 * @return true if the disk balancer is enabled, false otherwise.
 */
@VisibleForTesting
public boolean isDiskBalancerEnabled() {
  return this.isDiskBalancerEnabled;
}

/**
 * Sets the maximum amount of time a disk balancer plan stays valid.
 * The value is written both to this balancer's configuration and to its
 * in-memory field.
 *
 * @param planValidityInterval
 *          the new validity interval, in milliseconds.
 */
public void setPlanValidityInterval(long planValidityInterval) {
  // Record the value in the configuration first, then publish it to the field.
  config.setTimeDuration(
      DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
      planValidityInterval,
      TimeUnit.MILLISECONDS);
  this.planValidityInterval = planValidityInterval;
}

/**
 * Gets the maximum amount of time a disk balancer plan stays valid, as
 * currently held by this balancer.
 *
 * @return the validity interval, in milliseconds.
 */
@VisibleForTesting
public long getPlanValidityInterval() {
  return this.planValidityInterval;
}

/**
 * Gets the maximum amount of time a disk balancer plan stays valid, as
 * recorded in this balancer's configuration. Falls back to the default
 * constant when the key is not set.
 *
 * @return the configured validity interval, in milliseconds.
 */
@VisibleForTesting
public long getPlanValidityIntervalInConfig() {
  final long configuredMs = config.getTimeDuration(
      DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
      DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT,
      TimeUnit.MILLISECONDS);
  return configuredMs;
}

/**
* Verifies that user provided plan is valid.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_SLOWDISKS_TO_EXCLUDE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
Expand All @@ -57,6 +61,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
Expand Down Expand Up @@ -794,4 +799,56 @@ public void testDfsUsageKlass() throws ReconfigurationException, InterruptedExce
Thread.sleep(5000);
assertTrue(counter > lastCounter);
}

@Test
public void testDiskBalancerParameters() throws Exception {
  for (int idx = 0; idx < NUM_DATA_NODE; idx++) {
    final DataNode dn = cluster.getDataNodes().get(idx);

    // ---- DFS_DISK_BALANCER_ENABLED ----
    // A value that is neither "true" nor "false" must be rejected.
    LambdaTestUtils.intercept(ReconfigurationException.class,
        "Could not change property dfs.disk.balancer.enabled from 'true' to 'text'",
        () -> dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "text"));

    // Reconfiguring with null falls back to the default.
    dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, null);
    boolean expectedEnabled = dn.getConf().getBoolean(
        DFS_DISK_BALANCER_ENABLED, DFS_DISK_BALANCER_ENABLED_DEFAULT);
    assertEquals(expectedEnabled, dn.getDiskBalancer().isDiskBalancerEnabled());

    // Explicitly disable.
    dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "false");
    assertFalse(dn.getDiskBalancer().isDiskBalancerEnabled());

    // Explicitly enable.
    dn.reconfigureProperty(DFS_DISK_BALANCER_ENABLED, "true");
    assertTrue(dn.getDiskBalancer().isDiskBalancerEnabled());

    // ---- DFS_DISK_BALANCER_PLAN_VALID_INTERVAL ----
    // A value that is not a duration must be rejected.
    LambdaTestUtils.intercept(ReconfigurationException.class,
        "Could not change property dfs.disk.balancer.plan.valid.interval from " +
            "'1d' to 'text'",
        () -> dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "text"));

    // Reconfiguring with null falls back to the default; both the
    // in-memory value and the balancer's configuration must agree with it.
    dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, null);
    long expectedInterval = dn.getConf().getTimeDuration(
        DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
        DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
    assertEquals(expectedInterval, dn.getDiskBalancer().getPlanValidityInterval());
    long expectedIntervalInConfig = dn.getConf().getTimeDuration(
        DFS_DISK_BALANCER_PLAN_VALID_INTERVAL,
        DFS_DISK_BALANCER_PLAN_VALID_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
    assertEquals(expectedIntervalInConfig,
        dn.getDiskBalancer().getPlanValidityIntervalInConfig());

    // A bare number carries no unit and is read as milliseconds.
    dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "6");
    assertEquals(6, dn.getDiskBalancer().getPlanValidityInterval());
    assertEquals(6, dn.getDiskBalancer().getPlanValidityIntervalInConfig());

    // A value with an explicit unit: one minute is 60000 milliseconds.
    final long oneMinuteMs = 60000;
    dn.reconfigureProperty(DFS_DISK_BALANCER_PLAN_VALID_INTERVAL, "1m");
    assertEquals(oneMinuteMs, dn.getDiskBalancer().getPlanValidityInterval());
    assertEquals(oneMinuteMs, dn.getDiskBalancer().getPlanValidityIntervalInConfig());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void testGetDiskBalancerInvalidSetting() throws Exception {
}

@Test
public void testgetDiskBalancerBandwidth() throws Exception {
public void testGetDiskBalancerBandwidth() throws Exception {
RpcTestHelper rpcTestHelper = new RpcTestHelper().invoke();
DataNode dataNode = rpcTestHelper.getDataNode();
String planHash = rpcTestHelper.getPlanHash();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(20, outs.size());
assertEquals(22, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}

Expand Down