From 731537ef99dfd7e01a8f764b7f821a894ce3c90a Mon Sep 17 00:00:00 2001 From: Reid Chan Date: Thu, 22 Oct 2020 15:39:19 +0800 Subject: [PATCH 1/4] HBASE-25217 [Metrics] Add metrics for Call in IPC response queue --- .../hbase/ipc/MetricsHBaseServerSource.java | 8 ++++++++ .../ipc/MetricsHBaseServerSourceImpl.java | 20 +++++++++++++++++-- .../apache/hadoop/hbase/ipc/BufferChain.java | 4 ++++ .../hadoop/hbase/ipc/MetricsHBaseServer.java | 8 ++++++++ .../apache/hadoop/hbase/ipc/RpcServer.java | 5 +++++ 5 files changed, 43 insertions(+), 2 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index 21f457216edc..2ccf75bb2b51 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -91,6 +91,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches"; String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in general queue which " + "were served from the tail of the queue"; + String NUM_CALL_RESPONSE_QUEUE_NAME = "numCallsInResponseQueue"; + String NUM_CALL_RESPONSE_QUEUE_DESC = "Number of calls in response queue."; + String NUM_SIZE_RESPONSE_QUEUE_NAME = "numSizeInResponseQueue"; + String NUM_SIZE_RESPONSE_QUEUE_DESC = "Size in response queue."; void authorizationSuccess(); @@ -115,4 +119,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { void processedCall(int processingTime); void queuedAndProcessedCall(int totalTime); + + void addCallToResponseQueue(long size); + + void removeCallFromResponseQueue(long size); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index 97cc31af8cbe..a861821bdac2 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.metrics.BaseSourceImpl; import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl; import org.apache.hadoop.hbase.metrics.Interns; import org.apache.hadoop.metrics2.MetricHistogram; @@ -41,7 +40,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl private final MutableFastCounter authenticationFallbacks; private final MutableFastCounter sentBytes; private final MutableFastCounter receivedBytes; - + private final MutableFastCounter numCallsInResponseQueue; + private final MutableFastCounter numSizeInResponseQueue; private MetricHistogram queueCallTime; private MetricHistogram processCallTime; @@ -81,6 +81,10 @@ public MetricsHBaseServerSourceImpl(String metricsName, REQUEST_SIZE_DESC); this.responseSize = this.getMetricsRegistry().newSizeHistogram(RESPONSE_SIZE_NAME, RESPONSE_SIZE_DESC); + this.numCallsInResponseQueue = this.getMetricsRegistry().newCounter(NUM_CALL_RESPONSE_QUEUE_NAME, + NUM_CALL_RESPONSE_QUEUE_DESC, 0L); + this.numSizeInResponseQueue = this.getMetricsRegistry().newCounter(NUM_SIZE_RESPONSE_QUEUE_NAME, + NUM_SIZE_RESPONSE_QUEUE_DESC, 0L); } @Override @@ -139,6 +143,18 @@ public void queuedAndProcessedCall(int totalTime) { totalCallTime.add(totalTime); } + @Override + public void addCallToResponseQueue(long size) { + numCallsInResponseQueue.incr(); + numSizeInResponseQueue.incr(size); + } + + @Override + public void removeCallFromResponseQueue(long size) { + numCallsInResponseQueue.incr(-1); + numSizeInResponseQueue.incr(-size); + } + @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) { MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java index d1c43d546cdd..e93ff422452d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java @@ -67,6 +67,10 @@ boolean hasRemaining() { return remaining > 0; } + int getRemaining() { + return remaining; + } + /** * Write out our chain of buffers in chunks * @param channel Where to write diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index fe03d4f4211d..2c05fc523686 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -86,6 +86,14 @@ void totalCall(int totalTime) { source.queuedAndProcessedCall(totalTime); } + void addCallToResponseQueue(int size) { + source.addCallToResponseQueue(size); + } + + void removeCallFromResponseQueue(int size) { + source.removeCallFromResponseQueue(size); + } + public void exception(Throwable throwable) { source.exception(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 31bb32649aad..8a9c0fe8d727 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1159,6 +1159,7 @@ private long purge(long lastPurgeTime) { } Call call = connection.responseQueue.peekFirst(); if (call != null && now > call.timestamp + purgeTimeout) { + metrics.removeCallFromResponseQueue(call.response.getRemaining()); conWithOldCalls.add(call.connection); } } @@ -1254,8 +1255,10 @@ private boolean processAllResponses(final Connection connection) throws IOExcept if (call == null) { return true; } + metrics.removeCallFromResponseQueue(call.response.getRemaining()); if (!processResponse(call)) { connection.responseQueue.addFirst(call); + metrics.addCallToResponseQueue(call.response.getRemaining()); return false; } } @@ -1286,6 +1289,7 @@ void doRespond(Call call) throws IOException { } // Too big to fit, putting ahead. call.connection.responseQueue.addFirst(call); + metrics.addCallToResponseQueue(call.response.getRemaining()); added = true; // We will register to the selector later, outside of the lock. } } finally { @@ -1295,6 +1299,7 @@ void doRespond(Call call) throws IOException { if (!added) { call.connection.responseQueue.addLast(call); + metrics.addCallToResponseQueue(call.response.getRemaining()); } call.responder.registerForWrite(call.connection); From 409c09cc856329d32d8e03de438fbfb1a41328d9 Mon Sep 17 00:00:00 2001 From: Reid Chan Date: Mon, 26 Oct 2020 16:12:31 +0800 Subject: [PATCH 2/4] Checkstyle: line length --- .../hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index a861821bdac2..785238b3ff53 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -40,7 +40,7 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl private final MutableFastCounter authenticationFallbacks; private final MutableFastCounter sentBytes; private final MutableFastCounter receivedBytes; - private final MutableFastCounter numCallsInResponseQueue; + private final MutableFastCounter numCallInResponseQueue; private final MutableFastCounter numSizeInResponseQueue; private MetricHistogram queueCallTime; @@ -81,7 +81,7 @@ public MetricsHBaseServerSourceImpl(String metricsName, REQUEST_SIZE_DESC); this.responseSize = this.getMetricsRegistry().newSizeHistogram(RESPONSE_SIZE_NAME, RESPONSE_SIZE_DESC); - this.numCallsInResponseQueue = this.getMetricsRegistry().newCounter(NUM_CALL_RESPONSE_QUEUE_NAME, + this.numCallInResponseQueue = this.getMetricsRegistry().newCounter(NUM_CALL_RESPONSE_QUEUE_NAME, NUM_CALL_RESPONSE_QUEUE_DESC, 0L); this.numSizeInResponseQueue = this.getMetricsRegistry().newCounter(NUM_SIZE_RESPONSE_QUEUE_NAME, NUM_SIZE_RESPONSE_QUEUE_DESC, 0L); @@ -145,13 +145,13 @@ public void queuedAndProcessedCall(int totalTime) { @Override public void addCallToResponseQueue(long size) { - numCallsInResponseQueue.incr(); + numCallInResponseQueue.incr(); numSizeInResponseQueue.incr(size); } @Override public void removeCallFromResponseQueue(long size) { - numCallsInResponseQueue.incr(-1); + numCallInResponseQueue.incr(-1); numSizeInResponseQueue.incr(-size); } From 1f4c90404f9102927dd6ce38038a54aa0a3f3c3a Mon Sep 17 00:00:00 2001 From: Reid Chan Date: Thu, 29 Oct 2020 11:38:13 +0800 Subject: [PATCH 3/4] Address review comments --- .../hbase/ipc/MetricsHBaseServerSource.java | 4 ++-- .../ipc/MetricsHBaseServerSourceImpl.java | 19 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index 2ccf75bb2b51..330685a803aa 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -93,8 +93,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { "were served from the tail of the queue"; String NUM_CALL_RESPONSE_QUEUE_NAME = "numCallsInResponseQueue"; String NUM_CALL_RESPONSE_QUEUE_DESC = "Number of calls in response queue."; - String NUM_SIZE_RESPONSE_QUEUE_NAME = "numSizeInResponseQueue"; - String NUM_SIZE_RESPONSE_QUEUE_DESC = "Size in response queue."; + String SIZE_RESPONSE_QUEUE_NAME = "sizeOfResponseQueue"; + String SIZE_RESPONSE_QUEUE_DESC = "Size of response queue."; void authorizationSuccess(); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index 785238b3ff53..6e90f57369ab 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -26,6 +26,7 @@ import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.lib.MutableFastCounter; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; @InterfaceAudience.Private public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl @@ -40,8 +41,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl private final MutableFastCounter authenticationFallbacks; private final MutableFastCounter sentBytes; private final MutableFastCounter receivedBytes; - private final MutableFastCounter numCallInResponseQueue; - private final MutableFastCounter numSizeInResponseQueue; + private final MutableGaugeLong numCallsInResponseQueue; + private final MutableGaugeLong sizeOfResponseQueue; private MetricHistogram queueCallTime; private MetricHistogram processCallTime; @@ -81,10 +82,10 @@ public MetricsHBaseServerSourceImpl(String metricsName, REQUEST_SIZE_DESC); this.responseSize = this.getMetricsRegistry().newSizeHistogram(RESPONSE_SIZE_NAME, RESPONSE_SIZE_DESC); - this.numCallInResponseQueue = this.getMetricsRegistry().newCounter(NUM_CALL_RESPONSE_QUEUE_NAME, + this.numCallsInResponseQueue = this.getMetricsRegistry().newGauge(NUM_CALL_RESPONSE_QUEUE_NAME, NUM_CALL_RESPONSE_QUEUE_DESC, 0L); - this.numSizeInResponseQueue = this.getMetricsRegistry().newCounter(NUM_SIZE_RESPONSE_QUEUE_NAME, - NUM_SIZE_RESPONSE_QUEUE_DESC, 0L); + this.sizeOfResponseQueue = this.getMetricsRegistry().newGauge(SIZE_RESPONSE_QUEUE_NAME, + SIZE_RESPONSE_QUEUE_DESC, 0L); } @Override @@ -145,14 +146,14 @@ public void queuedAndProcessedCall(int totalTime) { @Override public void addCallToResponseQueue(long size) { - numCallInResponseQueue.incr(); - numSizeInResponseQueue.incr(size); + numCallsInResponseQueue.incr(); + sizeOfResponseQueue.incr(size); } @Override public void removeCallFromResponseQueue(long size) { - numCallInResponseQueue.incr(-1); - numSizeInResponseQueue.incr(-size); + numCallsInResponseQueue.decr(); + sizeOfResponseQueue.decr(size); } @Override From 7cd1a83a7002335c29d21ec6b64cc71998d81d3d Mon Sep 17 00:00:00 2001 From: Reid Chan Date: Fri, 30 Oct 2020 18:53:48 +0800 Subject: [PATCH 4/4] Separate methods for updating responseQueueSize and numCallsInResponseQueue --- .../hbase/ipc/MetricsHBaseServerSource.java | 7 +++++-- .../hbase/ipc/MetricsHBaseServerSourceImpl.java | 11 +++++++---- .../hadoop/hbase/ipc/MetricsHBaseServer.java | 12 ++++++++---- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 17 ++++++++++++----- 4 files changed, 32 insertions(+), 15 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index 330685a803aa..b44aeb790de4 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -120,7 +120,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { void queuedAndProcessedCall(int totalTime); - void addCallToResponseQueue(long size); + void addCallToResponseQueue(); + + void removeCallFromResponseQueue(); + + void updateResponseQueueSize(long size); - void removeCallFromResponseQueue(long size); } diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index 6e90f57369ab..77893c9b25cb 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -145,15 +145,18 @@ public void queuedAndProcessedCall(int totalTime) { } @Override - public void addCallToResponseQueue(long size) { + public void addCallToResponseQueue() { numCallsInResponseQueue.incr(); - sizeOfResponseQueue.incr(size); } @Override - public void removeCallFromResponseQueue(long size) { + public void removeCallFromResponseQueue() { numCallsInResponseQueue.decr(); - sizeOfResponseQueue.decr(size); + } + + @Override + public void updateResponseQueueSize(long size) { + sizeOfResponseQueue.incr(size); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index 2c05fc523686..0d6914b5b392 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -86,12 +86,16 @@ void totalCall(int totalTime) { source.queuedAndProcessedCall(totalTime); } - void addCallToResponseQueue(int size) { - source.addCallToResponseQueue(size); + void addCallToResponseQueue() { + source.addCallToResponseQueue(); } - void removeCallFromResponseQueue(int size) { - source.removeCallFromResponseQueue(size); + void removeCallFromResponseQueue() { + source.removeCallFromResponseQueue(); + } + + void updateResponseQueueSize(long size) { + source.updateResponseQueueSize(size); } public void exception(Throwable throwable) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 8a9c0fe8d727..e29719e6a993 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -1159,7 +1159,8 @@ private long purge(long lastPurgeTime) { } Call call = connection.responseQueue.peekFirst(); if (call != null && now > call.timestamp + purgeTimeout) { - metrics.removeCallFromResponseQueue(call.response.getRemaining()); + metrics.removeCallFromResponseQueue(); + metrics.updateResponseQueueSize(-call.response.getRemaining()); conWithOldCalls.add(call.connection); } } @@ -1255,11 +1256,15 @@ private boolean processAllResponses(final Connection connection) throws IOExcept if (call == null) { return true; } - metrics.removeCallFromResponseQueue(call.response.getRemaining()); + long beforeProcess = call.response.getRemaining(); if (!processResponse(call)) { connection.responseQueue.addFirst(call); - metrics.addCallToResponseQueue(call.response.getRemaining()); + long afterProcess = call.response.getRemaining(); + metrics.updateResponseQueueSize(afterProcess - beforeProcess); return false; + } else { + metrics.removeCallFromResponseQueue(); + metrics.updateResponseQueueSize(-beforeProcess); } } // Check that state within the lock to be consistent @@ -1289,7 +1294,8 @@ void doRespond(Call call) throws IOException { } // Too big to fit, putting ahead. call.connection.responseQueue.addFirst(call); - metrics.addCallToResponseQueue(call.response.getRemaining()); + metrics.addCallToResponseQueue(); + metrics.updateResponseQueueSize(call.response.getRemaining()); added = true; // We will register to the selector later, outside of the lock. } } finally { @@ -1299,7 +1305,8 @@ void doRespond(Call call) throws IOException { if (!added) { call.connection.responseQueue.addLast(call); - metrics.addCallToResponseQueue(call.response.getRemaining()); + metrics.addCallToResponseQueue(); + metrics.updateResponseQueueSize(call.response.getRemaining()); } call.responder.registerForWrite(call.connection);