diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index 21f457216edc..b44aeb790de4 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -91,6 +91,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches"; String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in general queue which " + "were served from the tail of the queue"; + String NUM_CALL_RESPONSE_QUEUE_NAME = "numCallsInResponseQueue"; + String NUM_CALL_RESPONSE_QUEUE_DESC = "Number of calls in response queue."; + String SIZE_RESPONSE_QUEUE_NAME = "sizeOfResponseQueue"; + String SIZE_RESPONSE_QUEUE_DESC = "Size of response queue."; void authorizationSuccess(); @@ -115,4 +119,11 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { void processedCall(int processingTime); void queuedAndProcessedCall(int totalTime); + + void addCallToResponseQueue(); + + void removeCallFromResponseQueue(); + + void updateResponseQueueSize(long size); + } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index 97cc31af8cbe..77893c9b25cb 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.BaseSourceImpl; import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl; import org.apache.hadoop.hbase.metrics.Interns; import org.apache.hadoop.metrics2.MetricHistogram; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; @InterfaceAudience.Private public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl @@ -41,7 +41,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl private final MutableFastCounter authenticationFallbacks; private final MutableFastCounter sentBytes; private final MutableFastCounter receivedBytes; - + private final MutableGaugeLong numCallsInResponseQueue; + private final MutableGaugeLong sizeOfResponseQueue; private MetricHistogram queueCallTime; private MetricHistogram processCallTime; @@ -81,6 +82,10 @@ public MetricsHBaseServerSourceImpl(String metricsName, REQUEST_SIZE_DESC); this.responseSize = this.getMetricsRegistry().newSizeHistogram(RESPONSE_SIZE_NAME, RESPONSE_SIZE_DESC); + this.numCallsInResponseQueue = this.getMetricsRegistry().newGauge(NUM_CALL_RESPONSE_QUEUE_NAME, + NUM_CALL_RESPONSE_QUEUE_DESC, 0L); + this.sizeOfResponseQueue = this.getMetricsRegistry().newGauge(SIZE_RESPONSE_QUEUE_NAME, + SIZE_RESPONSE_QUEUE_DESC, 0L); } @Override @@ -139,6 +144,21 @@ public void queuedAndProcessedCall(int totalTime) { totalCallTime.add(totalTime); } + @Override + public void addCallToResponseQueue() { + numCallsInResponseQueue.incr(); + } + + @Override + public void removeCallFromResponseQueue() { + numCallsInResponseQueue.decr(); + } + + @Override + public void updateResponseQueueSize(long size) { + sizeOfResponseQueue.incr(size); + } + @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) { MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java index d1c43d546cdd..e93ff422452d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java @@ -67,6 +67,10 @@ boolean hasRemaining() { return remaining > 0; } + int getRemaining() { + return remaining; + } + /** * Write out our chain of buffers in chunks * @param channel Where to write diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index fe03d4f4211d..0d6914b5b392 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -86,6 +86,18 @@ void totalCall(int totalTime) { source.queuedAndProcessedCall(totalTime); } + void addCallToResponseQueue() { + source.addCallToResponseQueue(); + } + + void removeCallFromResponseQueue() { + source.removeCallFromResponseQueue(); + } + + void updateResponseQueueSize(long size) { + source.updateResponseQueueSize(size); + } + public void exception(Throwable throwable) { source.exception(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 31bb32649aad..e29719e6a993 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1159,6 +1159,8 @@ private long purge(long lastPurgeTime) { } Call call = connection.responseQueue.peekFirst(); if (call != null && now > call.timestamp + purgeTimeout) { + metrics.removeCallFromResponseQueue(); + metrics.updateResponseQueueSize(-call.response.getRemaining()); conWithOldCalls.add(call.connection); } } @@ -1254,9 +1256,15 @@ private boolean processAllResponses(final Connection connection) throws IOExcept if (call == null) { return true; } + long beforeProcess = call.response.getRemaining(); if (!processResponse(call)) { connection.responseQueue.addFirst(call); + long afterProcess = call.response.getRemaining(); + metrics.updateResponseQueueSize(afterProcess - beforeProcess); return false; + } else { + metrics.removeCallFromResponseQueue(); + metrics.updateResponseQueueSize(-beforeProcess); } } // Check that state within the lock to be consistent @@ -1286,6 +1294,8 @@ void doRespond(Call call) throws IOException { } // Too big to fit, putting ahead. call.connection.responseQueue.addFirst(call); + metrics.addCallToResponseQueue(); + metrics.updateResponseQueueSize(call.response.getRemaining()); added = true; // We will register to the selector later, outside of the lock. } } finally { @@ -1295,6 +1305,8 @@ void doRespond(Call call) throws IOException { if (!added) { call.connection.responseQueue.addLast(call); + metrics.addCallToResponseQueue(); + metrics.updateResponseQueueSize(call.response.getRemaining()); } call.responder.registerForWrite(call.connection);