Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2937,8 +2937,12 @@ private void processRpcRequest(RpcRequestHeaderProto header,
if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
call.markCallCoordinated(true);
long stateId;
stateId = alignmentContext.receiveRequestState(
header, getMaxIdleTime());
try {
stateId = alignmentContext.receiveRequestState(header, getMaxIdleTime());
} catch (RetriableException re) {
rpcMetrics.incrRpcCallsRejectedByObserver();
throw re;
}
call.setClientStateId(stateId);
if (header.hasRouterFederatedState()) {
call.setFederatedNamespaceState(header.getRouterFederatedState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
MutableCounterLong rpcRequeueCalls;
@Metric("Number of successful RPC calls")
MutableCounterLong rpcCallSuccesses;
@Metric("Number of observer namenode rejected RPC calls")
MutableCounterLong rpcCallsRejectedByObserver;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @haiyang1987 for your report. And thanks @simbadzina @goiri for your review.

I'm just concern if it's reasonable that rpcCallsRejectedByObserver is in RpcMetrics.java. I think rpcCallsRejectedByObserver is a logic of namenode, so how about changing this logic as a HDFS issue, not common issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ZanderXu for your comment.
@goiri @simbadzina What do you think? If necessary, I will modify it into an HDFS issue to achieve this requirement.
thanks~


@Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections();
Expand Down Expand Up @@ -363,6 +365,13 @@ public void incrRpcCallSuccesses() {
rpcCallSuccesses.incr();
}

/**
* Increments the Observer NameNode rejected RPC Calls Counter.
*/
public void incrRpcCallsRejectedByObserver() {
rpcCallsRejectedByObserver.incr();
}

/**
* Returns a MutableRate Counter.
* @return Mutable Rate
Expand Down Expand Up @@ -412,6 +421,15 @@ public long getRpcRequeueCalls() {
return rpcRequeueCalls.value();
}

/**
* Returns the number of observer namenode rejected RPC calls.
* @return long
*/
@VisibleForTesting
public long getRpcCallsRejectedByObserver() {
return rpcCallsRejectedByObserver.value();
}

public MutableRate getDeferredRpcProcessingTime() {
return deferredRpcProcessingTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
| `RpcSlowCalls` | Total number of slow RPC calls |
| `RpcRequeueCalls` | Total number of requeue RPC calls |
| `RpcCallsSuccesses` | Total number of RPC calls that are successfully processed |
| `RpcCallsRejectedByObserver` | Total number of RPC calls that are observer namenode rejected |
| `NumOpenConnections` | Current number of open connections |
| `NumInProcessHandler` | Current number of handlers on working |
| `CallQueueLength` | Current length of the call queue |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
Expand All @@ -28,6 +29,9 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -145,13 +149,29 @@ public void testMultiObserver() throws Exception {
public void testObserverFallBehind() throws Exception {
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
RPC.Server clientRpcServer2 = ((NameNodeRpcServer)dfsCluster
.getNameNodeRpc(2)).getClientRpcServer();
RpcMetrics rpcMetrics2 = clientRpcServer2.getRpcMetrics();
assertEquals(0, rpcMetrics2.getRpcCallsRejectedByObserver());
RPC.Server clientRpcServer3 = ((NameNodeRpcServer)dfsCluster
.getNameNodeRpc(3)).getClientRpcServer();
RpcMetrics rpcMetrics3 = clientRpcServer3.getRpcMetrics();
assertEquals(0, rpcMetrics3.getRpcCallsRejectedByObserver());

// Set large state Id on the client
dfsCluster.rollEditLogAndTail(0);
dfs.getFileStatus(testPath);
assertSentTo(2, 3);

// Set large state Id on the client.
long realStateId = HATestUtil.setACStateId(dfs, 500000);
dfs.getFileStatus(testPath);
// Should end up on ANN
// Should end up on ANN.
assertSentTo(0);
HATestUtil.setACStateId(dfs, realStateId);

// Validate rpcCallsRejectedByObserver metric.
assertEquals(1, rpcMetrics2.getRpcCallsRejectedByObserver());
assertEquals(1, rpcMetrics3.getRpcCallsRejectedByObserver());
}

private void assertSentTo(int... nnIndices) throws IOException {
Expand Down