diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestProcessorPool.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestProcessorPool.java index 463ebf1ec8..a997380035 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestProcessorPool.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestProcessorPool.java @@ -38,6 +38,8 @@ final class RequestProcessorPool { private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorPool.class); + private static final String PROCESSOR_INDEX = "processor_index"; + private final RequestChannel[] requestChannels; private final RequestProcessor[] processors; @@ -54,10 +56,16 @@ public RequestProcessorPool( RequestHandler[] requestHandlers = initializeRequestHandlers(protocols, service); for (int i = 0; i < numProcessors; i++) { - requestChannels[i] = new RequestChannel(totalQueueCapacity / numProcessors); + RequestChannel requestChannel = new RequestChannel(totalQueueCapacity / numProcessors); + requestChannels[i] = requestChannel; // bind processor to a single channel to make requests from the // same channel processed serializable - processors[i] = new RequestProcessor(i, requestChannels[i], service, requestHandlers); + processors[i] = new RequestProcessor(i, requestChannel, service, requestHandlers); + requestsMetrics.gauge( + PROCESSOR_INDEX, + String.valueOf(i), + MetricNames.REQUEST_QUEUE_SIZE, + requestChannel::requestsCount); } // register requestQueueSize metrics requestsMetrics.gauge(MetricNames.REQUEST_QUEUE_SIZE, this::getRequestQueueSize); diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestsMetrics.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestsMetrics.java index 5bf9ab0d72..592eaf30d9 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestsMetrics.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/RequestsMetrics.java @@ -85,6 +85,11 @@ > void gauge(String name, G gauge) { requestMetricGroup.gauge(name, gauge); } + /** Create a gauge metric in a child group of the request metric group. */ + > void gauge(String groupKey, String groupValue, String name, G gauge) { + requestMetricGroup.addGroup(groupKey, groupValue).gauge(name, gauge); + } + /** Add a metric group for given request name. */ private void addMetrics(MetricGroup parentMetricGroup, String requestName) { metricsByRequest.put( diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md index 961d0bc8f3..d40e666422 100644 --- a/website/docs/maintenance/observability/monitor-metrics.md +++ b/website/docs/maintenance/observability/monitor-metrics.md @@ -639,19 +639,31 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM - coordinator + coordinator request requestQueueSize The CoordinatorServer node network waiting queue size. Gauge - tabletserver + request_processor_index + requestQueueSize + The CoordinatorServer node network waiting queue size labeled with processor_index. + Gauge + + + tabletserver request requestQueueSize The TabletServer node network waiting queue size. Gauge + + request_processor_index + requestQueueSize + The TabletServer node network waiting queue size labeled with processor_index. + Gauge + request_produceLog @@ -1172,4 +1184,4 @@ All metrics are registered under the `fluss.tieringService` metric group, which Meter - \ No newline at end of file +