@@ -2228,5 +2228,10 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
     }
     return reports;
   }
+
+  public void setMaxSlowPeersToReport(int maxSlowPeersToReport) {
+    Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
+    slowPeerTracker.setMaxSlowPeersToReport(maxSlowPeersToReport);
+  }
 }

@@ -80,7 +80,7 @@ public class SlowPeerTracker {
    * Number of nodes to include in JSON report. We will return nodes with
    * the highest number of votes from peers.
    */
-  private final int maxNodesToReport;
+  private volatile int maxNodesToReport;
 
   /**
    * Information about peers that have reported a node as being slow.
@@ -104,9 +104,8 @@ public SlowPeerTracker(Configuration conf, Timer timer) {
         DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
         DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS) * 3;
-    this.maxNodesToReport = conf.getInt(
-        DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
-        DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT);
+    this.setMaxSlowPeersToReport(conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY,
+        DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT));
   }
 
   /**
@@ -282,6 +281,10 @@ long getReportValidityMs() {
     return reportValidityMs;
   }
 
+  public synchronized void setMaxSlowPeersToReport(int maxSlowPeersToReport) {
+    this.maxNodesToReport = maxSlowPeersToReport;
+  }
+
   private static class LatencyWithLastReportTime {
     private final Long time;
     private final OutlierMetrics latency;
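The field above changes from final to volatile because it can now be rewritten at runtime: writes are serialized by the synchronized setter, while the volatile read lets the threads assembling the JSON report pick up the new limit without taking a lock. A minimal standalone sketch of the pattern (hypothetical class, not part of this change):

    // Hypothetical sketch of the visibility pattern used in SlowPeerTracker:
    // writes go through a synchronized setter; the volatile field guarantees
    // that lock-free readers observe the most recently written value.
    class ReconfigurableLimit {
      private volatile int maxNodesToReport;

      ReconfigurableLimit(int initial) {
        setMaxSlowPeersToReport(initial);
      }

      synchronized void setMaxSlowPeersToReport(int maxSlowPeersToReport) {
        this.maxNodesToReport = maxSlowPeersToReport;
      }

      int currentLimit() {
        return maxNodesToReport; // volatile read; no lock needed
      }
    }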
@@ -123,6 +123,8 @@
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE;
@@ -343,7 +345,8 @@ public enum OperationCategory {
       DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY,
       DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY,
       DFS_BLOCK_INVALIDATE_LIMIT_KEY,
-      DFS_DATANODE_PEER_STATS_ENABLED_KEY));
+      DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+      DFS_DATANODE_MAX_NODES_TO_REPORT_KEY));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2212,7 +2215,8 @@ protected String reconfigurePropertyImpl(String property, String newVal)
     } else if (property.equals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY) || (property.equals(
         DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY)) || (property.equals(
         DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY)) || (property.equals(
-        DFS_DATANODE_PEER_STATS_ENABLED_KEY))) {
+        DFS_DATANODE_PEER_STATS_ENABLED_KEY)) || property.equals(
+        DFS_DATANODE_MAX_NODES_TO_REPORT_KEY)) {
       return reconfigureSlowNodesParameters(datanodeManager, property, newVal);
     } else if (property.equals(DFS_BLOCK_INVALIDATE_LIMIT_KEY)) {
       return reconfigureBlockInvalidateLimit(datanodeManager, property, newVal);
@@ -2446,6 +2450,13 @@ String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
       datanodeManager.initSlowPeerTracker(getConf(), timer, peerStatsEnabled);
       break;
     }
+    case DFS_DATANODE_MAX_NODES_TO_REPORT_KEY: {
+      int maxSlowPeersToReport = (newVal == null
+          ? DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT : Integer.parseInt(newVal));
+      result = Integer.toString(maxSlowPeersToReport);
+      datanodeManager.setMaxSlowPeersToReport(maxSlowPeersToReport);
+      break;
+    }
     default: {
       throw new IllegalArgumentException(
           "Unexpected property " + property + " in reconfigureSlowNodesParameters");
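Taken together, these hunks make the property hot-swappable: reconfigurePropertyImpl routes the key to reconfigureSlowNodesParameters, which parses the new value (falling back to the compile-time default when the property is unset) and pushes it through DatanodeManager.setMaxSlowPeersToReport into the tracker. A sketch of exercising this path directly (assumes a NameNode handle in a caller with package access, since reconfigurePropertyImpl is protected):

    // Sketch only: cap the slow-peer report at 2 entries, then restore the
    // default by passing null, which reconfigureSlowNodesParameters maps to
    // DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT.
    nameNode.reconfigurePropertyImpl(DFS_DATANODE_MAX_NODES_TO_REPORT_KEY, "2");
    nameNode.reconfigurePropertyImpl(DFS_DATANODE_MAX_NODES_TO_REPORT_KEY, null);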
@@ -19,11 +19,14 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
 import static org.junit.Assert.*;

@@ -40,7 +43,9 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager;
+import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;

@@ -509,6 +514,55 @@ public void testSlowPeerTrackerEnabled() throws Exception {
 
   }
 
+  @Test
+  public void testSlowPeerMaxNodesToReportReconf() throws Exception {
+    final NameNode nameNode = cluster.getNameNode();
+    final DatanodeManager datanodeManager = nameNode.namesystem.getBlockManager()
+        .getDatanodeManager();
+    nameNode.reconfigurePropertyImpl(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
+    assertTrue("SlowNode tracker is still disabled. Reconfiguration could not be successful",
+        datanodeManager.getSlowPeerTracker().isSlowPeerTrackerEnabled());
+
+    SlowPeerTracker tracker = datanodeManager.getSlowPeerTracker();
+
+    OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.1);
+    tracker.addReport("node1", "node70", outlierMetrics1);
+    OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.23);
+    tracker.addReport("node2", "node71", outlierMetrics2);
+    OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.13);
+    tracker.addReport("node3", "node72", outlierMetrics3);
+    OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
+    tracker.addReport("node4", "node73", outlierMetrics4);
+    OutlierMetrics outlierMetrics5 = new OutlierMetrics(0.0, 0.0, 0.0, 0.2);
+    tracker.addReport("node5", "node74", outlierMetrics5);
+    OutlierMetrics outlierMetrics6 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
+    tracker.addReport("node6", "node75", outlierMetrics6);
+
+    String jsonReport = tracker.getJson();
+    LOG.info("Retrieved slow peer json report: {}", jsonReport);
+
+    List<Boolean> containReport = validatePeerReport(jsonReport);
+    assertEquals(1, containReport.stream().filter(reportVal -> !reportVal).count());
+
+    nameNode.reconfigurePropertyImpl(DFS_DATANODE_MAX_NODES_TO_REPORT_KEY, "2");
+    jsonReport = tracker.getJson();
+    LOG.info("Retrieved slow peer json report: {}", jsonReport);
+
+    containReport = validatePeerReport(jsonReport);
+    assertEquals(4, containReport.stream().filter(reportVal -> !reportVal).count());
+  }
+
+  private List<Boolean> validatePeerReport(String jsonReport) {
+    List<Boolean> containReport = new ArrayList<>();
+    containReport.add(jsonReport.contains("node1"));
+    containReport.add(jsonReport.contains("node2"));
+    containReport.add(jsonReport.contains("node3"));
+    containReport.add(jsonReport.contains("node4"));
+    containReport.add(jsonReport.contains("node5"));
+    containReport.add(jsonReport.contains("node6"));
+    return containReport;
+  }
+
   @After
   public void shutDown() throws IOException {
     if (cluster != null) {
@@ -20,6 +20,7 @@
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
@@ -429,18 +430,19 @@ public void testNameNodeGetReconfigurableProperties() throws IOException, Interr
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(18, outs.size());
+    assertEquals(19, outs.size());
     assertTrue(outs.get(0).contains("Reconfigurable properties:"));
     assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
     assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));
     assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(3));
-    assertEquals(DFS_DATANODE_PEER_STATS_ENABLED_KEY, outs.get(4));
-    assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(5));
-    assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(6));
-    assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(7));
-    assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(8));
-    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(9));
-    assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(10));
+    assertEquals(DFS_DATANODE_MAX_NODES_TO_REPORT_KEY, outs.get(4));
+    assertEquals(DFS_DATANODE_PEER_STATS_ENABLED_KEY, outs.get(5));
+    assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(6));
+    assertEquals(DFS_IMAGE_PARALLEL_LOAD_KEY, outs.get(7));
+    assertEquals(DFS_NAMENODE_AVOID_SLOW_DATANODE_FOR_READ_KEY, outs.get(8));
+    assertEquals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_EXCLUDE_SLOW_NODES_ENABLED_KEY, outs.get(9));
+    assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(10));
+    assertEquals(DFS_NAMENODE_MAX_SLOWPEER_COLLECT_NODES_KEY, outs.get(11));
     assertEquals(errs.size(), 0);
   }
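For reference, the indices above track what appears to be the lexicographically sorted property list that `hdfs dfsadmin -reconfig namenode <host:ipc_port> properties` prints (the banner line occupies index 0). With this change, the key named by DFS_DATANODE_MAX_NODES_TO_REPORT_KEY slots in at index 4 and the total grows from 18 to 19 entries.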
