From 846f7e7ad7675da3a9ef3a06de459a5b601ae546 Mon Sep 17 00:00:00 2001 From: TessaIO Date: Sat, 27 Apr 2024 18:06:42 +0200 Subject: [PATCH] feat: add the number of merge buffers used for druid emitter Signed-off-by: TessaIO --- docs/operations/metrics.md | 3 +++ .../druid/collections/BlockingPool.java | 5 ++++ .../collections/DefaultBlockingPool.java | 6 +++++ .../druid/collections/DummyBlockingPool.java | 6 +++++ .../MetricsEmittingMergingBlockingPool.java | 25 +++++++++++++++++++ .../apache/druid/query/TestBufferPool.java | 6 +++++ .../druid/guice/DruidProcessingModule.java | 10 +++++--- 7 files changed, 58 insertions(+), 3 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/MetricsEmittingMergingBlockingPool.java diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index a877d8b8522d..6dd8010cc9c0 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -61,6 +61,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/success/count`|Number of queries successfully processed.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`query/merge/buffersUsed`|number of merge buffers allocated to broker while performing groupBy merge queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies| @@ -106,6 +107,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/success/count`|Number of queries successfully processed.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`query/merge/buffersUsed`|number of merge buffers allocated to historical while performing groupBy merge queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| @@ -123,6 +125,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`query/merge/buffersUsed`|number of merge buffers allocated to historical while performing groupBy merge queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| ### Jetty diff --git a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java index 4fb3ff66d8bf..2ea9e5a5244c 100644 --- a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java @@ -49,4 +49,9 @@ public interface BlockingPool * @return count of pending requests */ long getPendingRequests(); + + /** + * @return number of buffers used/polled from the pool at that time. + */ + int getUsedBufferCount(); } diff --git a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index e41a9e5d75d4..ef81ecb2295d 100644 --- a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -212,4 +212,10 @@ private void offer(T theObject) lock.unlock(); } } + + @Override + public int getUsedBufferCount() + { + return maxSize - objects.size(); + } } diff --git a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java index 2553f9ab425f..2d70d8cadd18 100644 --- a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java @@ -61,4 +61,10 @@ public long getPendingRequests() { return 0; } + + @Override + public int getUsedBufferCount() + { + return 0; + } } diff --git a/processing/src/main/java/org/apache/druid/query/MetricsEmittingMergingBlockingPool.java b/processing/src/main/java/org/apache/druid/query/MetricsEmittingMergingBlockingPool.java new file mode 100644 index 000000000000..b58b6a8efaa9 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/MetricsEmittingMergingBlockingPool.java @@ -0,0 +1,25 @@ +package org.apache.druid.query; + +import com.google.common.base.Supplier; +import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; + +public class MetricsEmittingMergingBlockingPool extends DefaultBlockingPool + implements ExecutorServiceMonitor.MetricEmitter{ + + public MetricsEmittingMergingBlockingPool( + Supplier generator, + int limit, + ExecutorServiceMonitor executorServiceMonitor + ) { + super(generator, limit); + executorServiceMonitor.add(this); + } + + @Override + public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder) + { + emitter.emit(metricBuilder.build("query/merge/buffersUsed", getUsedBufferCount())); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java index a650437f83f0..ee137ee56f89 100644 --- a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java +++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java @@ -147,4 +147,10 @@ public Collection getOutstandingExceptionsCreated() { return takenFromMap.values(); } + + @Override + public int getUsedBufferCount() + { + return Integer.MAX_VALUE; + } } diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index a2daa25e214a..d87367a651c4 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -126,12 +126,16 @@ public NonBlockingPool getIntermediateResultsPool(DruidProcessingCon @Provides @LazySingleton @Merging - public BlockingPool getMergeBufferPool(DruidProcessingConfig config) + public BlockingPool getMergeBufferPool( + DruidProcessingConfig config, + ExecutorServiceMonitor executorServiceMonitor + ) { verifyDirectMemory(config); - return new DefaultBlockingPool<>( + return new MetricsEmittingMergingBlockingPool<>( new OffheapBufferGenerator("result merging", config.intermediateComputeSizeBytes()), - config.getNumMergeBuffers() + config.getNumMergeBuffers(), + executorServiceMonitor ); }