diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index a594d2be01ccb..5dba40f6ff5c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2937,8 +2937,12 @@ private void processRpcRequest(RpcRequestHeaderProto header, if (alignmentContext.isCoordinatedCall(protoName, methodName)) { call.markCallCoordinated(true); long stateId; - stateId = alignmentContext.receiveRequestState( - header, getMaxIdleTime()); + try { + stateId = alignmentContext.receiveRequestState(header, getMaxIdleTime()); + } catch (RetriableException re) { + rpcMetrics.incrRpcCallsRejectedByObserver(); + throw re; + } call.setClientStateId(stateId); if (header.hasRouterFederatedState()) { call.setFederatedNamespaceState(header.getRouterFederatedState()); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index b9be973204d21..44ae4ff8cd11c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -147,6 +147,8 @@ public static RpcMetrics create(Server server, Configuration conf) { MutableCounterLong rpcRequeueCalls; @Metric("Number of successful RPC calls") MutableCounterLong rpcCallSuccesses; + @Metric("Number of observer namenode rejected RPC calls") + MutableCounterLong rpcCallsRejectedByObserver; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -363,6 +365,13 @@ public void incrRpcCallSuccesses() { rpcCallSuccesses.incr(); } + /** + * Increments the Observer NameNode rejected RPC Calls Counter. + */ + public void incrRpcCallsRejectedByObserver() { + rpcCallsRejectedByObserver.incr(); + } + /** * Returns a MutableRate Counter. * @return Mutable Rate @@ -412,6 +421,15 @@ public long getRpcRequeueCalls() { return rpcRequeueCalls.value(); } + /** + * Returns the number of observer namenode rejected RPC calls. + * @return long + */ + @VisibleForTesting + public long getRpcCallsRejectedByObserver() { + return rpcCallsRejectedByObserver.value(); + } + public MutableRate getDeferredRpcProcessingTime() { return deferredRpcProcessingTime; } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 01d89b81356e4..01f35c3507624 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -90,6 +90,7 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc | `RpcSlowCalls` | Total number of slow RPC calls | | `RpcRequeueCalls` | Total number of requeue RPC calls | | `RpcCallsSuccesses` | Total number of RPC calls that are successfully processed | +| `RpcCallsRejectedByObserver` | Total number of RPC calls that are observer namenode rejected | | `NumOpenConnections` | Current number of open connections | | `NumInProcessHandler` | Current number of handlers on working | | `CallQueueLength` | Current length of the call queue | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java index a0913e4c5e447..cb6ffd60f5b1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -28,6 +29,9 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -145,13 +149,29 @@ public void testMultiObserver() throws Exception { public void testObserverFallBehind() throws Exception { dfs.mkdir(testPath, FsPermission.getDefault()); assertSentTo(0); + RPC.Server clientRpcServer2 = ((NameNodeRpcServer)dfsCluster + .getNameNodeRpc(2)).getClientRpcServer(); + RpcMetrics rpcMetrics2 = clientRpcServer2.getRpcMetrics(); + assertEquals(0, rpcMetrics2.getRpcCallsRejectedByObserver()); + RPC.Server clientRpcServer3 = ((NameNodeRpcServer)dfsCluster + .getNameNodeRpc(3)).getClientRpcServer(); + RpcMetrics rpcMetrics3 = clientRpcServer3.getRpcMetrics(); + assertEquals(0, rpcMetrics3.getRpcCallsRejectedByObserver()); - // Set large state Id on the client + dfsCluster.rollEditLogAndTail(0); + dfs.getFileStatus(testPath); + assertSentTo(2, 3); + + // Set large state Id on the client. long realStateId = HATestUtil.setACStateId(dfs, 500000); dfs.getFileStatus(testPath); - // Should end up on ANN + // Should end up on ANN. assertSentTo(0); HATestUtil.setACStateId(dfs, realStateId); + + // Validate rpcCallsRejectedByObserver metric. + assertEquals(1, rpcMetrics2.getRpcCallsRejectedByObserver()); + assertEquals(1, rpcMetrics3.getRpcCallsRejectedByObserver()); } private void assertSentTo(int... nnIndices) throws IOException {