Skip to content

Commit

Permalink
HDFS-16974. Consider volumes average load of each DataNode when choosing target. (apache#5541). Contributed by Shuyan Zhang.
Browse files Browse the repository at this point in the history

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
  • Loading branch information
zhangshuyan0 authored and ferdelyi committed May 26, 2023
1 parent d9d610e commit eb8b11d
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.redundancy.considerLoad.factor";
public static final double
DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT = 2.0;
public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY =
"dfs.namenode.redundancy.considerLoadByVolume";
public static final boolean
DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_DEFAULT
= false;
public static final String DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
public static final int DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private enum NodeNotChosenReason {
NOT_IN_SERVICE("the node is not in service"),
NODE_STALE("the node is stale"),
NODE_TOO_BUSY("the node is too busy"),
NODE_TOO_BUSY_BY_VOLUME("the node is too busy based on volume load"),
TOO_MANY_NODES_ON_RACK("the rack has too many chosen nodes"),
NOT_ENOUGH_STORAGE_SPACE("not enough storage space to place the block"),
NO_REQUIRED_STORAGE_TYPE("required storage types are unavailable"),
Expand All @@ -101,6 +102,7 @@ private String getText() {
protected boolean considerLoad;
private boolean considerLoadByStorageType;
protected double considerLoadFactor;
private boolean considerLoadByVolume = false;
private boolean preferLocalNode;
private boolean dataNodePeerStatsEnabled;
private volatile boolean excludeSlowNodesEnabled;
Expand Down Expand Up @@ -131,6 +133,10 @@ public void initialize(Configuration conf, FSClusterStats stats,
this.considerLoadFactor = conf.getDouble(
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
this.considerLoadByVolume = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_KEY,
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYVOLUME_DEFAULT
);
this.stats = stats;
this.clusterMap = clusterMap;
this.host2datanodeMap = host2datanodeMap;
Expand Down Expand Up @@ -1007,6 +1013,16 @@ boolean excludeNodeByLoad(DatanodeDescriptor node){
"(load: " + nodeLoad + " > " + maxLoad + ")");
return true;
}
if (considerLoadByVolume) {
final int numVolumesAvailable = node.getNumVolumesAvailable();
final double maxLoadForVolumes = considerLoadFactor * numVolumesAvailable *
stats.getInServiceXceiverAverageForVolume();
if (maxLoadForVolumes > 0.0 && nodeLoad > maxLoadForVolumes) {
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY_BY_VOLUME,
"(load: " + nodeLoad + " > " + maxLoadForVolumes + ") ");
return true;
}
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ public Type getType() {
// HB processing can use it to tell if it is the first HB since DN restarted
private boolean heartbeatedSinceRegistration = false;

/** The number of volumes that can be written.*/
private int numVolumesAvailable = 0;

/**
* DatanodeDescriptor constructor
* @param nodeID id of the data node
Expand Down Expand Up @@ -411,6 +414,7 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
long totalNonDfsUsed = 0;
Set<String> visitedMount = new HashSet<>();
Set<DatanodeStorageInfo> failedStorageInfos = null;
int volumesAvailable = 0;

// Decide if we should check for any missing StorageReport and mark it as
// failed. There are different scenarios.
Expand Down Expand Up @@ -489,7 +493,11 @@ private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
visitedMount.add(mount);
}
}
if (report.getRemaining() > 0 && storage.getState() != State.FAILED) {
volumesAvailable += 1;
}
}
this.numVolumesAvailable = volumesAvailable;

// Update total metrics for the node.
setCapacity(totalCapacity);
Expand Down Expand Up @@ -981,6 +989,14 @@ public VolumeFailureSummary getVolumeFailureSummary() {
return volumeFailureSummary;
}

/**
 * Return the number of volumes that can be written.
 * The value is recomputed on each storage report: a volume counts as
 * available when its storage is not FAILED and has remaining capacity
 * (see updateStorageStats).
 * @return the number of volumes that can be written.
 */
public int getNumVolumesAvailable() {
return numVolumesAvailable;
}

/**
* @param nodeReg DatanodeID to update registration for.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2101,6 +2101,17 @@ public double getInServiceXceiverAverage() {
return avgLoad;
}

@Override
public double getInServiceXceiverAverageForVolume() {
// Average xceiver load per writable volume across in-service nodes;
// defined as 0 when no writable volumes are reported.
final int availableVolumes = heartbeatManager.getInServiceAvailableVolumeCount();
if (availableVolumes <= 0) {
return 0;
}
final long totalXceivers = heartbeatManager.getInServiceXceiverCount();
return ((double) totalXceivers) / availableVolumes;
}

@Override
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
return heartbeatManager.getStorageTypeStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public interface DatanodeStatistics {

/** @return number of non-decommission(ing|ed) nodes */
public int getNumDatanodesInService();


/** @return the number of volumes on in-service nodes that are available for writes. */
int getInServiceAvailableVolumeCount();
/**
* @return the total used space by data nodes for non-DFS purposes
* such as storing temporary files on the local file system
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class DatanodeStats {

private int nodesInService = 0;
private int nodesInServiceXceiverCount = 0;
private int nodesInServiceAvailableVolumeCount = 0;
private int expiredHeartbeats = 0;

synchronized void add(final DatanodeDescriptor node) {
Expand All @@ -58,6 +59,7 @@ synchronized void add(final DatanodeDescriptor node) {
capacityRemaining += node.getRemaining();
cacheCapacity += node.getCacheCapacity();
cacheUsed += node.getCacheUsed();
nodesInServiceAvailableVolumeCount += node.getNumVolumesAvailable();
} else if (node.isDecommissionInProgress() ||
node.isEnteringMaintenance()) {
cacheCapacity += node.getCacheCapacity();
Expand Down Expand Up @@ -87,6 +89,7 @@ synchronized void subtract(final DatanodeDescriptor node) {
capacityRemaining -= node.getRemaining();
cacheCapacity -= node.getCacheCapacity();
cacheUsed -= node.getCacheUsed();
nodesInServiceAvailableVolumeCount -= node.getNumVolumesAvailable();
} else if (node.isDecommissionInProgress() ||
node.isEnteringMaintenance()) {
cacheCapacity -= node.getCacheCapacity();
Expand Down Expand Up @@ -149,6 +152,10 @@ synchronized int getNodesInServiceXceiverCount() {
return nodesInServiceXceiverCount;
}

/** @return total writable volumes across all in-service datanodes. */
synchronized int getNodesInServiceAvailableVolumeCount() {
return nodesInServiceAvailableVolumeCount;
}

synchronized int getExpiredHeartbeats() {
return expiredHeartbeats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,24 @@ public interface FSClusterStats {
public int getNumDatanodesInService();

/**
* an indication of the average load of non-decommission(ing|ed) nodes
* eligible for block placement
* An indication of the average load of non-decommission(ing|ed) nodes
* eligible for block placement.
*
* @return average of the in service number of block transfers and block
* writes that are currently occurring on the cluster.
*/
public double getInServiceXceiverAverage();

/**
* An indication of the average load of volumes at non-decommission(ing|ed)
* nodes eligible for block placement.
*
* @return average of in service number of block transfers and block
* writes that are currently occurring on the volumes of the
* cluster.
*/
double getInServiceXceiverAverageForVolume();

/**
* Indicates the storage statistics per storage type.
* @return storage statistics per storage type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ public int getInServiceXceiverCount() {
public int getNumDatanodesInService() {
return stats.getNodesInService();
}

@Override
public int getInServiceAvailableVolumeCount() {
// Delegates to the aggregate DatanodeStats maintained by heartbeats.
return stats.getNodesInServiceAvailableVolumeCount();
}

@Override
public long getCacheCapacity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,14 @@
</description>
</property>

<property>
<name>dfs.namenode.redundancy.considerLoadByVolume</name>
<value>false</value>
<description>Whether chooseTarget considers the average volume load of
each candidate DataNode when selecting targets for block placement.
</description>
</property>

<property>
<name>dfs.namenode.read.considerLoad</name>
<value>false</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ abstract public class BaseReplicationPolicyTest {
protected String blockPlacementPolicy;
protected NamenodeProtocols nameNodeRpc = null;

static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
void updateHeartbeatWithUsage(DatanodeDescriptor dn,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
int volFailures) {
dn.getStorageInfos()[0].setUtilizationForTesting(
capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(
dnManager.getHeartbeatManager().updateHeartbeat(dn,
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
}
Expand Down
Loading

0 comments on commit eb8b11d

Please sign in to comment.