Skip to content

Commit

Permalink
Merge branch 'trunk' into YARN-11252
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Aug 15, 2022
2 parents f75a555 + 622ca0d commit 0c23966
Show file tree
Hide file tree
Showing 17 changed files with 344 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public abstract class Shell {
* {@value}
*/
private static final String WINDOWS_PROBLEMS =
"https://wiki.apache.org/hadoop/WindowsProblems";
"https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems";

/**
* Name of the windows utils binary: {@value}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;

Expand All @@ -25,6 +27,7 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
Expand Down Expand Up @@ -85,6 +88,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
private NNHAServiceTarget localTarget;
/** Cache HA protocol. */
private HAServiceProtocol localTargetHAProtocol;
/** Cache NN protocol. */
private NamenodeProtocol namenodeProtocol;
/** Cache Client protocol. */
private ClientProtocol clientProtocol;
/** RPC address for the namenode. */
private String rpcAddress;
/** Service RPC address for the namenode. */
Expand All @@ -100,6 +107,9 @@ public class NamenodeHeartbeatService extends PeriodicService {

private String resolvedHost;
private String originalNnId;

private int healthMonitorTimeoutMs = (int) DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;

/**
* Create a new Namenode status updater.
* @param resolver Namenode resolver service to handle NN registration.
Expand Down Expand Up @@ -211,6 +221,15 @@ protected void serviceInit(Configuration configuration) throws Exception {
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));

long timeoutMs = conf.getTimeDuration(DFS_ROUTER_HEALTH_MONITOR_TIMEOUT,
DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
if (timeoutMs < 0) {
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
"Using value of : 0ms instead.", timeoutMs, DFS_ROUTER_HEALTH_MONITOR_TIMEOUT);
this.healthMonitorTimeoutMs = 0;
} else {
this.healthMonitorTimeoutMs = (int) timeoutMs;
}

super.serviceInit(configuration);
}
Expand Down Expand Up @@ -309,66 +328,26 @@ protected NamenodeStatusReport getNamenodeStatusReport() {
LOG.debug("Probing NN at service address: {}", serviceAddress);

URI serviceURI = new URI("hdfs://" + serviceAddress);
// Read the filesystem info from RPC (required)
NamenodeProtocol nn = NameNodeProxies
.createProxy(this.conf, serviceURI, NamenodeProtocol.class)
.getProxy();

if (nn != null) {
NamespaceInfo info = nn.versionRequest();
if (info != null) {
report.setNamespaceInfo(info);
}
}
// Read the filesystem info from RPC (required)
updateNameSpaceInfoParameters(serviceURI, report);
if (!report.registrationValid()) {
return report;
}

// Check for safemode from the client protocol. Currently optional, but
// should be required at some point for QoS
try {
ClientProtocol client = NameNodeProxies
.createProxy(this.conf, serviceURI, ClientProtocol.class)
.getProxy();
if (client != null) {
boolean isSafeMode = client.setSafeMode(
SafeModeAction.SAFEMODE_GET, false);
report.setSafeMode(isSafeMode);
}
} catch (Exception e) {
LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
}
updateSafeModeParameters(serviceURI, report);

// Read the stats from JMX (optional)
updateJMXParameters(webAddress, report);

if (localTarget != null) {
// Try to get the HA status
try {
// Determine if NN is active
// TODO: dynamic timeout
if (localTargetHAProtocol == null) {
localTargetHAProtocol = localTarget.getHealthMonitorProxy(conf, 30*1000);
LOG.debug("Get HA status with address {}", lifelineAddress);
}
HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
report.setHAServiceState(status.getState());
} catch (Throwable e) {
if (e.getMessage().startsWith("HA for namenode is not enabled")) {
LOG.error("HA for {} is not enabled", getNamenodeDesc());
localTarget = null;
} else {
// Failed to fetch HA status, ignoring failure
LOG.error("Cannot fetch HA status for {}: {}",
getNamenodeDesc(), e.getMessage(), e);
}
localTargetHAProtocol = null;
}
}
} catch(IOException e) {
// Try to get the HA status
updateHAStatusParameters(report);
} catch (IOException e) {
LOG.error("Cannot communicate with {}: {}",
getNamenodeDesc(), e.getMessage());
} catch(Throwable e) {
} catch (Throwable e) {
// Generic error that we don't know about
LOG.error("Unexpected exception while communicating with {}: {}",
getNamenodeDesc(), e.getMessage(), e);
Expand Down Expand Up @@ -399,6 +378,59 @@ private static String getNnHeartBeatServiceName(String nsId, String nnId) {
(nnId == null ? "" : " " + nnId);
}

/**
 * Fetches the namespace information of a Namenode via RPC and stores it in the report.
 *
 * <p>The {@link NamenodeProtocol} proxy is created on first use and cached in
 * {@code namenodeProtocol}; the cached proxy is dropped when an IOException occurs
 * so that the next invocation builds a new one.
 *
 * @param serviceURI Server address of the Namenode to monitor.
 * @param report Namenode status report to update with the namespace information.
 * @throws IOException Propagated to the caller on purpose: RBF needs the namespace
 *                     info to identify this nameservice, so when it cannot be
 *                     fetched there is no point in collecting any other
 *                     information from the Namenode.
 */
private void updateNameSpaceInfoParameters(URI serviceURI,
    NamenodeStatusReport report) throws IOException {
  try {
    NamenodeProtocol proxy = this.namenodeProtocol;
    if (proxy == null) {
      // First call, or the previous call failed: build and cache a new proxy.
      proxy = NameNodeProxies
          .createProxy(this.conf, serviceURI, NamenodeProtocol.class)
          .getProxy();
      this.namenodeProtocol = proxy;
    }
    if (proxy == null) {
      return;
    }
    NamespaceInfo namespaceInfo = proxy.versionRequest();
    if (namespaceInfo != null) {
      report.setNamespaceInfo(namespaceInfo);
    }
  } catch (IOException ioe) {
    // Forget the cached proxy so that the next heartbeat reconnects.
    this.namenodeProtocol = null;
    throw ioe;
  }
}

/**
 * Fetches the safemode state of a Namenode via RPC and stores it in the report.
 *
 * <p>Safemode is only one piece of Namenode state and is not needed by RBF to
 * identify a Namenode. Any exception is therefore logged and swallowed here so
 * that the caller can continue collecting the remaining information. The
 * {@link ClientProtocol} proxy is created on first use and cached in
 * {@code clientProtocol}; the cached proxy is dropped on failure.
 *
 * @param serviceURI Server address of the Namenode to monitor.
 * @param report Namenode status report to update with the safemode state.
 */
private void updateSafeModeParameters(URI serviceURI, NamenodeStatusReport report) {
  try {
    ClientProtocol proxy = this.clientProtocol;
    if (proxy == null) {
      // First call, or the previous call failed: build and cache a new proxy.
      proxy = NameNodeProxies
          .createProxy(this.conf, serviceURI, ClientProtocol.class)
          .getProxy();
      this.clientProtocol = proxy;
    }
    if (proxy == null) {
      return;
    }
    // SAFEMODE_GET only queries the current state.
    report.setSafeMode(proxy.setSafeMode(SafeModeAction.SAFEMODE_GET, false));
  } catch (Exception e) {
    LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
    // Forget the cached proxy so that the next heartbeat reconnects.
    this.clientProtocol = null;
  }
}

/**
* Get the parameters for a Namenode from JMX and add them to the report.
* @param address Web interface of the Namenode to monitor.
Expand All @@ -415,6 +447,34 @@ private void updateJMXParameters(
}
}

/**
 * Fetches the HA status of a Namenode via RPC and stores it in the report.
 *
 * <p>Does nothing when {@code localTarget} is null. The health monitor proxy is
 * created on first use with a timeout of {@code healthMonitorTimeoutMs} and cached
 * in {@code localTargetHAProtocol}. Any failure is logged and swallowed, and the
 * cached proxy is dropped so that the next invocation builds a new one. When the
 * failure message reports that HA is not enabled, {@code localTarget} is cleared
 * as well, which turns later invocations into no-ops.
 *
 * @param report Namenode status report to update with the HA service state.
 */
private void updateHAStatusParameters(NamenodeStatusReport report) {
  if (localTarget == null) {
    return;
  }
  try {
    // Determine if NN is active
    if (localTargetHAProtocol == null) {
      localTargetHAProtocol = localTarget.getHealthMonitorProxy(
          conf, this.healthMonitorTimeoutMs);
      LOG.debug("Get HA status with address {}", lifelineAddress);
    }
    HAServiceStatus status = localTargetHAProtocol.getServiceStatus();
    report.setHAServiceState(status.getState());
  } catch (Throwable e) {
    // Throwable#getMessage() may return null (e.g. a bare NullPointerException).
    // Guard against it so that this handler cannot itself throw and skip the
    // proxy reset below.
    String message = e.getMessage();
    if (message != null && message.startsWith("HA for namenode is not enabled")) {
      LOG.error("HA for {} is not enabled", getNamenodeDesc());
      localTarget = null;
    } else {
      // Failed to fetch HA status, ignoring failure
      LOG.error("Cannot fetch HA status for {}", getNamenodeDesc(), e);
    }
    // Always drop the cached proxy after a failure.
    localTargetHAProtocol = null;
  }
}

/**
* Fetches NamenodeInfo metrics from namenode.
* @param address Web interface of the Namenode to monitor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
public static final String DFS_ROUTER_HEALTH_MONITOR_TIMEOUT =
FEDERATION_ROUTER_PREFIX + "health.monitor.timeout";
public static final long DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT =
TimeUnit.SECONDS.toMillis(30);
public static final String DFS_ROUTER_MONITOR_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.namenode";
public static final String DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,14 @@
</description>
</property>

<property>
<name>dfs.federation.router.health.monitor.timeout</name>
<value>30s</value>
<description>
Time out for Router to obtain HAServiceStatus from NameNode.
</description>
</property>

<property>
<name>dfs.federation.router.heartbeat-state.interval</name>
<value>5s</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,4 @@ public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
@VisibleForTesting
public void mockAnException() {
}

@VisibleForTesting
public void mockJNStreams() throws IOException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3628,7 +3628,10 @@ public String getBPServiceActorInfo() {
*/
@Override // DataNodeMXBean
public String getVolumeInfo() {
Preconditions.checkNotNull(data, "Storage not yet initialized");
if (data == null) {
LOG.debug("Storage not yet initialized.");
return "";
}
return JSON.toString(data.getVolumeInfoMap());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@

import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;

import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotDeletionGc;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
Expand Down Expand Up @@ -1390,8 +1389,6 @@ void startActiveServices() throws IOException {
editLog.initJournalsForWrite();
// May need to recover
editLog.recoverUnclosedStreams();

BlockManagerFaultInjector.getInstance().mockJNStreams();

LOG.info("Catching up to latest edits from old active before " +
"taking over writer role in edits logs");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void stop() throws IOException {
}

@VisibleForTesting
public FSEditLog getEditLog() {
FSEditLog getEditLog() {
return editLog;
}

Expand Down Expand Up @@ -311,7 +311,7 @@ public Void run() throws Exception {
startTime - lastLoadTimeMs);
// It is already under the name system lock and the checkpointer
// thread is already stopped. No need to acquire any other lock.
editsTailed = doTailEdits(false);
editsTailed = doTailEdits();
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
Expand All @@ -326,10 +326,6 @@ public Void run() throws Exception {

@VisibleForTesting
public long doTailEdits() throws IOException, InterruptedException {
return doTailEdits(true);
}

private long doTailEdits(boolean onlyDurableTxns) throws IOException, InterruptedException {
Collection<EditLogInputStream> streams;
FSImage image = namesystem.getFSImage();

Expand All @@ -338,7 +334,7 @@ private long doTailEdits(boolean onlyDurableTxns) throws IOException, Interrupte
long startTime = timer.monotonicNow();
try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0,
null, inProgressOk, onlyDurableTxns);
null, inProgressOk, true);
} catch (IOException ioe) {
// This is acceptable. If we try to tail edits in the middle of an edits
// log roll, i.e. the last one has been finalized but the new inprogress
Expand Down
Loading

0 comments on commit 0c23966

Please sign in to comment.