Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String NUM_LIFO_MODE_SWITCHES_NAME = "numLifoModeSwitches";
String NUM_LIFO_MODE_SWITCHES_DESC = "Total number of calls in general queue which " +
"were served from the tail of the queue";
String NUM_CALL_RESPONSE_QUEUE_NAME = "numCallsInResponseQueue";
String NUM_CALL_RESPONSE_QUEUE_DESC = "Number of calls in response queue.";
String SIZE_RESPONSE_QUEUE_NAME = "sizeOfResponseQueue";
String SIZE_RESPONSE_QUEUE_DESC = "Size of response queue.";

void authorizationSuccess();

Expand All @@ -115,4 +119,11 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
void processedCall(int processingTime);

void queuedAndProcessedCall(int totalTime);

void addCallToResponseQueue();

void removeCallFromResponseQueue();

void updateResponseQueueSize(long size);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl;
import org.apache.hadoop.hbase.metrics.Interns;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

@InterfaceAudience.Private
public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
Expand All @@ -41,7 +41,8 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
private final MutableFastCounter authenticationFallbacks;
private final MutableFastCounter sentBytes;
private final MutableFastCounter receivedBytes;

private final MutableGaugeLong numCallsInResponseQueue;
private final MutableGaugeLong sizeOfResponseQueue;

private MetricHistogram queueCallTime;
private MetricHistogram processCallTime;
Expand Down Expand Up @@ -81,6 +82,10 @@ public MetricsHBaseServerSourceImpl(String metricsName,
REQUEST_SIZE_DESC);
this.responseSize = this.getMetricsRegistry().newSizeHistogram(RESPONSE_SIZE_NAME,
RESPONSE_SIZE_DESC);
this.numCallsInResponseQueue = this.getMetricsRegistry().newGauge(NUM_CALL_RESPONSE_QUEUE_NAME,
NUM_CALL_RESPONSE_QUEUE_DESC, 0L);
this.sizeOfResponseQueue = this.getMetricsRegistry().newGauge(SIZE_RESPONSE_QUEUE_NAME,
SIZE_RESPONSE_QUEUE_DESC, 0L);
}

@Override
Expand Down Expand Up @@ -139,6 +144,21 @@ public void queuedAndProcessedCall(int totalTime) {
totalCallTime.add(totalTime);
}

@Override
public void addCallToResponseQueue() {
numCallsInResponseQueue.incr();
}

@Override
public void removeCallFromResponseQueue() {
numCallsInResponseQueue.decr();
}

@Override
public void updateResponseQueueSize(long size) {
sizeOfResponseQueue.incr(size);
}

@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ boolean hasRemaining() {
return remaining > 0;
}

int getRemaining() {
return remaining;
}

/**
* Write out our chain of buffers in chunks
* @param channel Where to write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ void totalCall(int totalTime) {
source.queuedAndProcessedCall(totalTime);
}

void addCallToResponseQueue() {
source.addCallToResponseQueue();
}

void removeCallFromResponseQueue() {
source.removeCallFromResponseQueue();
}

void updateResponseQueueSize(long size) {
source.updateResponseQueueSize(size);
}

public void exception(Throwable throwable) {
source.exception();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,8 @@ private long purge(long lastPurgeTime) {
}
Call call = connection.responseQueue.peekFirst();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, peekFirst doesn't remove element.

if (call != null && now > call.timestamp + purgeTimeout) {
metrics.removeCallFromResponseQueue();
metrics.updateResponseQueueSize(-call.response.getRemaining());
conWithOldCalls.add(call.connection);
}
}
Expand Down Expand Up @@ -1254,9 +1256,15 @@ private boolean processAllResponses(final Connection connection) throws IOExcept
if (call == null) {
return true;
}
long beforeProcess = call.response.getRemaining();
if (!processResponse(call)) {
connection.responseQueue.addFirst(call);
long afterProcess = call.response.getRemaining();
metrics.updateResponseQueueSize(afterProcess - beforeProcess);
return false;
} else {
metrics.removeCallFromResponseQueue();
metrics.updateResponseQueueSize(-beforeProcess);
}
}
// Check that state within the lock to be consistent
Expand Down Expand Up @@ -1286,6 +1294,8 @@ void doRespond(Call call) throws IOException {
}
// Too big to fit, putting ahead.
call.connection.responseQueue.addFirst(call);
metrics.addCallToResponseQueue();
metrics.updateResponseQueueSize(call.response.getRemaining());
added = true; // We will register to the selector later, outside of the lock.
}
} finally {
Expand All @@ -1295,6 +1305,8 @@ void doRespond(Call call) throws IOException {

if (!added) {
call.connection.responseQueue.addLast(call);
metrics.addCallToResponseQueue();
metrics.updateResponseQueueSize(call.response.getRemaining());
}
call.responder.registerForWrite(call.connection);

Expand Down