From 00bf51c565efe38083327aee09be215f9f590461 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 26 Sep 2022 11:16:17 -0700 Subject: [PATCH 01/19] Collect growth events and bytes metrics if PQ is enabled: Java changes. --- .../logstash/ackedqueue/QueueFactoryExt.java | 21 ++++++-- .../execution/AbstractPipelineExt.java | 52 ++++++++++++++++++- .../instrument/metrics/MetricKeys.java | 8 ++- .../metrics/gauge/LazyDelegatingGauge.java | 7 +++ 4 files changed, 82 insertions(+), 6 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java index 9aa468c3a08..457d19ae4db 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -42,6 +42,21 @@ @JRubyClass(name = "QueueFactory") public final class QueueFactoryExt extends RubyBasicObject { + /** + * A static value to indicate Persistent Queue is enabled. + */ + public static String PERSISTED_TYPE = "persisted"; + + /** + * A static value to indicate Memory Queue is enabled. + */ + public static String MEMORY_TYPE = "memory"; + + /** + * A contextual name to expose the queue type. + */ + public static String CONTEXT_NAME = "queue.type"; + private static final long serialVersionUID = 1L; public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) { @@ -51,8 +66,8 @@ public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) { @JRubyMethod(meta = true) public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv, final IRubyObject settings) throws IOException { - final String type = getSetting(context, settings, "queue.type").asJavaString(); - if ("persisted".equals(type)) { + final String type = getSetting(context, settings, CONTEXT_NAME).asJavaString(); + if (PERSISTED_TYPE.equals(type)) { final Path queuePath = Paths.get( getSetting(context, settings, "path.queue").asJavaString(), getSetting(context, settings, "pipeline.id").asJavaString() @@ -77,7 +92,7 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final getSetting(context, settings, "queue.max_bytes") } ); - } else if ("memory".equals(type)) { + } else if (MEMORY_TYPE.equals(type)) { return new JrubyWrappedSynchronousQueueExt( context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS ).initialize( diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index ccc7cbdc701..a665454d2a3 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -33,9 +33,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; @@ -76,9 +78,12 @@ import org.logstash.instrument.metrics.AbstractMetricExt; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.Metric; +import org.logstash.instrument.metrics.MetricType; import org.logstash.instrument.metrics.NullMetricExt; import org.logstash.instrument.metrics.UptimeMetric; import org.logstash.instrument.metrics.counter.LongCounter; +import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; +import org.logstash.instrument.metrics.gauge.NumberGauge; import org.logstash.instrument.metrics.FlowMetric; import org.logstash.plugins.ConfigVariableExpander; import org.logstash.plugins.factory.ExecutionContextFactoryExt; @@ -105,7 +110,7 @@ public class AbstractPipelineExt extends RubyBasicObject { private static final Logger LOGGER = LogManager.getLogger(AbstractPipelineExt.class); private static final @SuppressWarnings("rawtypes") RubyArray CAPACITY_NAMESPACE = - RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("capacity")); + RubyArray.newArray(RubyUtil.RUBY, CAPACITY_KEY); private static final @SuppressWarnings("rawtypes") RubyArray DATA_NAMESPACE = RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("data")); @@ -489,6 +494,8 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { final RubySymbol[] flowNamespace = buildNamespace(FLOW_KEY); final RubySymbol[] eventsNamespace = buildNamespace(EVENTS_KEY); + final RubySymbol[] queueNamespace = buildNamespace(QUEUE_KEY); + final RubySymbol[] queueCapacityNamespace = buildNamespace(QUEUE_KEY, CAPACITY_KEY); final LongCounter eventsInCounter = initOrGetCounterMetric(context, eventsNamespace, IN_KEY); final FlowMetric inputThroughput = createFlowMetric(INPUT_THROUGHPUT_KEY, eventsInCounter, uptimeInPreciseSeconds); @@ -515,6 +522,33 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { this.flowMetrics.add(concurrencyFlow); storeMetric(context, flowNamespace, concurrencyFlow); + // collect the growth_events & growth_bytes metrics if only persisted queue is enabled. + if (getSetting(context, QueueFactoryExt.CONTEXT_NAME).asJavaString() + .equals(QueueFactoryExt.PERSISTED_TYPE)) { + final Optional events = initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY); + if (events.isPresent()) { + final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_GROWTH_EVENTS_KEY, events.get(), uptimeInPreciseMillis); + this.flowMetrics.add(growthEventsFlow); + storeMetric(context, flowNamespace, growthEventsFlow); + } else { + LOGGER.warn(String.format("Cannot initialize `%s` metric. Retrieving `%s` number gauge from `%s` namespace(s) failed.", + QUEUE_GROWTH_EVENTS_KEY, + EVENTS_KEY, + Arrays.stream(queueNamespace).map(RubySymbol::asString).collect(Collectors.toList()))); + } + + final Optional queueSizeInBytes = initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY); + if (queueSizeInBytes.isPresent()) { + final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_GROWTH_BYTES_KEY, queueSizeInBytes.get(), uptimeInPreciseMillis); + this.flowMetrics.add(growthBytesFlow); + storeMetric(context, flowNamespace, growthBytesFlow); + } else { + LOGGER.warn(String.format("Cannot initialize `%s` metric. Retrieving `%s` number gauge from `%s` namespace(s) failed.", + QUEUE_GROWTH_BYTES_KEY, + QUEUE_SIZE_IN_BYTES_KEY, + Arrays.stream(queueCapacityNamespace).map(RubySymbol::asString).collect(Collectors.toList()))); + } + } return context.nil; } @@ -540,6 +574,22 @@ private LongCounter initOrGetCounterMetric(final ThreadContext context, return retrievedMetric.toJava(LongCounter.class); } + private Optional initOrGetNumberGaugeMetric(final ThreadContext context, + final RubySymbol[] subPipelineNamespacePath, + final RubySymbol metricName) { + final IRubyObject collector = this.metric.collector(context); + final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath); + final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")}); + + LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class); + if (MetricType.GAUGE_NUMBER.asString().equals(delegatingGauge.getType().asString()) == false) { + return Optional.empty(); + } + + NumberGauge castedNumberGauge = (NumberGauge) delegatingGauge.getMetric().get(); + return Optional.of(castedNumberGauge); + } + private UptimeMetric initOrGetUptimeMetric(final ThreadContext context, final RubySymbol[] subPipelineNamespacePath, final RubySymbol uptimeMetricName) { diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java index 21e8ca84885..bec17e34066 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java @@ -65,6 +65,8 @@ private MetricKeys() { public static final RubySymbol QUEUE_KEY = RubyUtil.RUBY.newSymbol("queue"); + public static final RubySymbol CAPACITY_KEY = RubyUtil.RUBY.newSymbol("capacity"); + public static final RubySymbol DLQ_KEY = RubyUtil.RUBY.newSymbol("dlq"); public static final RubySymbol STORAGE_POLICY_KEY = RubyUtil.RUBY.newSymbol("storage_policy"); @@ -92,7 +94,9 @@ private MetricKeys() { public static final RubySymbol WORKER_CONCURRENCY_KEY = RubyUtil.RUBY.newSymbol("worker_concurrency"); - public static final RubySymbol UPTIME_IN_SECONDS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_seconds"); - public static final RubySymbol UPTIME_IN_MILLIS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_millis"); + + public static final RubySymbol QUEUE_GROWTH_EVENTS_KEY = RubyUtil.RUBY.newSymbol("growth_events"); + + public static final RubySymbol QUEUE_GROWTH_BYTES_KEY = RubyUtil.RUBY.newSymbol("growth_bytes"); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java index 1e1d995a20d..acfa9208b5b 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java @@ -27,6 +27,8 @@ import org.logstash.instrument.metrics.AbstractMetric; import org.logstash.instrument.metrics.MetricType; +import java.util.Optional; + /** * A lazy proxy to a more specific typed {@link GaugeMetric}. The metric will only be initialized if the initial value is set, or once the {@code set} operation is called. *

Intended only for use with Ruby's duck typing, Java consumers should use the specific typed {@link GaugeMetric}

@@ -71,6 +73,11 @@ public Object get() { return lazyMetric == null ? null : lazyMetric.get(); } + @SuppressWarnings("rawtypes") + public Optional getMetric() { + return Optional.of(lazyMetric); + } + @Override public MetricType getType() { return lazyMetric == null ? null : lazyMetric.getType(); From 79f6f2580194ba0a7818cc16c8fc2a34679d3d57 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Mon, 26 Sep 2022 13:36:18 -0700 Subject: [PATCH 02/19] Move queue flow under queue namespace. --- .../java/org/logstash/execution/AbstractPipelineExt.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index a665454d2a3..b5b29a5d6c5 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -496,6 +496,7 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { final RubySymbol[] eventsNamespace = buildNamespace(EVENTS_KEY); final RubySymbol[] queueNamespace = buildNamespace(QUEUE_KEY); final RubySymbol[] queueCapacityNamespace = buildNamespace(QUEUE_KEY, CAPACITY_KEY); + final RubySymbol[] queueFlowNamespace = buildNamespace(QUEUE_KEY, FLOW_KEY); final LongCounter eventsInCounter = initOrGetCounterMetric(context, eventsNamespace, IN_KEY); final FlowMetric inputThroughput = createFlowMetric(INPUT_THROUGHPUT_KEY, eventsInCounter, uptimeInPreciseSeconds); @@ -529,7 +530,7 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { if (events.isPresent()) { final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_GROWTH_EVENTS_KEY, events.get(), uptimeInPreciseMillis); this.flowMetrics.add(growthEventsFlow); - storeMetric(context, flowNamespace, growthEventsFlow); + storeMetric(context, queueFlowNamespace, growthEventsFlow); } else { LOGGER.warn(String.format("Cannot initialize `%s` metric. Retrieving `%s` number gauge from `%s` namespace(s) failed.", QUEUE_GROWTH_EVENTS_KEY, @@ -541,7 +542,7 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { if (queueSizeInBytes.isPresent()) { final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_GROWTH_BYTES_KEY, queueSizeInBytes.get(), uptimeInPreciseMillis); this.flowMetrics.add(growthBytesFlow); - storeMetric(context, flowNamespace, growthBytesFlow); + storeMetric(context, queueFlowNamespace, growthBytesFlow); } else { LOGGER.warn(String.format("Cannot initialize `%s` metric. Retrieving `%s` number gauge from `%s` namespace(s) failed.", QUEUE_GROWTH_BYTES_KEY, From efa6bbbff82cadc36c5c697529b6b10943f9f207 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Tue, 27 Sep 2022 14:09:12 -0700 Subject: [PATCH 03/19] Pipeline level PQ flow metrics: add unit & integration tests. --- .../spec/logstash/api/commands/stats_spec.rb | 20 +++++++++++++++++++ .../logstash/api/modules/node_stats_spec.rb | 19 ++++++++++++++++++ logstash-core/spec/support/shared_contexts.rb | 4 +++- qa/integration/specs/monitoring_api_spec.rb | 6 ++++++ 4 files changed, 48 insertions(+), 1 deletion(-) diff --git a/logstash-core/spec/logstash/api/commands/stats_spec.rb b/logstash-core/spec/logstash/api/commands/stats_spec.rb index f7ecb0ebe7d..a9a3861bd5f 100644 --- a/logstash-core/spec/logstash/api/commands/stats_spec.rb +++ b/logstash-core/spec/logstash/api/commands/stats_spec.rb @@ -181,6 +181,26 @@ :input_throughput ) end + it "returns queue metric information" do + expect(report[:main][:queue].keys).to include( + :capacity, + :events, + :type, + :data, + :flow) + expect(report[:main][:queue][:capacity].keys).to include( + :page_capacity_in_bytes, + :max_queue_size_in_bytes, + :queue_size_in_bytes, + :max_unread_events) + expect(report[:main][:queue][:data].keys).to include( + :storage_type, + :path, + :free_space_in_bytes) + expect(report[:main][:queue][:flow].keys).to include( + :growth_bytes, + :growth_events) + end end context "when using multiple pipelines" do before(:each) do diff --git a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb index ef3c0e50328..83cb8d714fd 100644 --- a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb +++ b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb @@ -124,6 +124,25 @@ "filters" => Array, "outputs" => Array, }, + "queue" => { + "capacity" => { + "page_capacity_in_bytes" => Numeric, + "max_queue_size_in_bytes" => Numeric, + "queue_size_in_bytes" => Numeric, + "max_unread_events" => Numeric + }, + "events" => Numeric, + "type" => String, + "data" => { + "storage_type" => String, + "path" => String, + "free_space_in_bytes" => Numeric + }, + "flow" => { + "growth_bytes" => Hash, + "growth_events" => Hash + } + } } }, "reloads" => { diff --git a/logstash-core/spec/support/shared_contexts.rb b/logstash-core/spec/support/shared_contexts.rb index 042869df574..ab1aa345045 100644 --- a/logstash-core/spec/support/shared_contexts.rb +++ b/logstash-core/spec/support/shared_contexts.rb @@ -43,7 +43,9 @@ @agent = make_test_agent(settings) @agent.execute @pipelines_registry = LogStash::PipelinesRegistry.new - pipeline_config = mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") + main_pipeline_settings = mock_settings + main_pipeline_settings.set("queue.type", "persisted") + pipeline_config = mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }", main_pipeline_settings) pipeline_creator = LogStash::PipelineAction::Create.new(pipeline_config, @agent.metric) expect(pipeline_creator.execute(@agent, @pipelines_registry)).to be_truthy pipeline_config = mock_pipeline_config(:secondary, "input { generator { id => '123' } } output { null {} }") diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index 386bfcfc37e..63556fdaa13 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -190,6 +190,12 @@ expect(queue_capacity_stats["page_capacity_in_bytes"]).not_to be_nil expect(queue_capacity_stats["max_queue_size_in_bytes"]).not_to be_nil expect(queue_capacity_stats["max_unread_events"]).not_to be_nil + + queue_flow_stats = queue_stats.dig("flow") + expect(queue_flow_stats).to_not be_nil + expect(queue_flow_stats).to include( + 'growth_bytes' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0), + 'growth_events' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0)) else expect(queue_stats["type"]).to eq("memory") end From 0aee22b6688f52a5e07631805b487cde171f3f00 Mon Sep 17 00:00:00 2001 From: Mashhur Date: Wed, 28 Sep 2022 13:40:56 -0700 Subject: [PATCH 04/19] Include queue info in node stats sample. --- .../monitoring/monitoring-apis.asciidoc | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/docs/static/monitoring/monitoring-apis.asciidoc b/docs/static/monitoring/monitoring-apis.asciidoc index bd3e45ca095..16c54aa0ab9 100644 --- a/docs/static/monitoring/monitoring-apis.asciidoc +++ b/docs/static/monitoring/monitoring-apis.asciidoc @@ -820,9 +820,33 @@ Example response: "last_failure_timestamp" : null, "failures" : 0 }, - "queue" : { - "type" : "memory" - } + "queue": { + "type" : "persisted", + "capacity": { + "max_unread_events": 0, + "page_capacity_in_bytes": 67108864, + "max_queue_size_in_bytes": 1073741824, + "queue_size_in_bytes": 3885 + }, + "flow": { + "growth_bytes": { + "lifetime": 0.032, + "current": 0.0 + }, + "growth_events": { + "lifetime": 0.234, + "current": 0.012 + } + }, + "data": { + "path": "/pipeline/queue/path", + "free_space_in_bytes": 936886480896, + "storage_type": "apfs" + }, + "events": 0, + "events_count": 0, + "queue_size_in_bytes": 3885, + "max_queue_size_in_bytes": 1073741824 } } } From ab2cd72b4eb8b43ead4942952dec6367690290e6 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Tue, 4 Oct 2022 12:07:05 -0700 Subject: [PATCH 05/19] Apply suggestions from code review Change uptime precision for PQ growth metrics to uptime seconds since PQ events are based on seconds. Co-authored-by: Ry Biesemeyer --- .../main/java/org/logstash/execution/AbstractPipelineExt.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index b5b29a5d6c5..7f622db4615 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -528,7 +528,7 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { .equals(QueueFactoryExt.PERSISTED_TYPE)) { final Optional events = initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY); if (events.isPresent()) { - final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_GROWTH_EVENTS_KEY, events.get(), uptimeInPreciseMillis); + final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_GROWTH_EVENTS_KEY, events.get(), uptimeInPreciseSeconds); this.flowMetrics.add(growthEventsFlow); storeMetric(context, queueFlowNamespace, growthEventsFlow); } else { @@ -540,7 +540,7 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { final Optional queueSizeInBytes = initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY); if (queueSizeInBytes.isPresent()) { - final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_GROWTH_BYTES_KEY, queueSizeInBytes.get(), uptimeInPreciseMillis); + final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_GROWTH_BYTES_KEY, queueSizeInBytes.get(), uptimeInPreciseSeconds); this.flowMetrics.add(growthBytesFlow); storeMetric(context, queueFlowNamespace, growthBytesFlow); } else { From ebbd611cd95a0950945712168c1382d2e46ee15d Mon Sep 17 00:00:00 2001 From: mashhurs Date: Mon, 10 Oct 2022 10:08:46 -0700 Subject: [PATCH 06/19] Add safeguard when using lazy delegating gauge type. --- .../main/java/org/logstash/execution/AbstractPipelineExt.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 7f622db4615..48646017044 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.regex.Matcher; @@ -583,7 +584,8 @@ private Optional initOrGetNumberGaugeMetric(final ThreadContext con final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")}); LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class); - if (MetricType.GAUGE_NUMBER.asString().equals(delegatingGauge.getType().asString()) == false) { + if (Objects.isNull(delegatingGauge.getType()) || + MetricType.GAUGE_NUMBER.asString().equals(delegatingGauge.getType().asString()) == false) { return Optional.empty(); } From ac12193f0ec82d35395c7676036e571d92229c31 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Mon, 10 Oct 2022 19:15:43 +0000 Subject: [PATCH 07/19] flow metrics: simplify generics of lazy implementation Enables interface `FlowMetrics::create` to take suppliers that _implement_ a `Metric` instead of requiring them to be pre-cast, and avoid unnecessary exposure of the metrics value-type into our lazy init. --- .../instrument/metrics/FlowMetric.java | 8 ++++---- .../metrics/LazyInstantiatedFlowMetric.java | 19 ++++++++----------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetric.java index c17c69a8fc1..ad11f04087c 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetric.java @@ -42,9 +42,9 @@ static FlowMetric create(final String name, } } - static FlowMetric create(final String name, - final Supplier> numeratorSupplier, - final Supplier> denominatorSupplier) { - return new LazyInstantiatedFlowMetric<>(name, numeratorSupplier, denominatorSupplier); + static FlowMetric create(final String name, + final Supplier> numeratorSupplier, + final Supplier> denominatorSupplier) { + return new LazyInstantiatedFlowMetric(name, numeratorSupplier, denominatorSupplier); } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java index 14ef7dee7ba..f6dbdc25837 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java @@ -16,26 +16,23 @@ * and fully initializes when both return non-null values. * * @see FlowMetric#create(String, Supplier, Supplier) - * - * @param the numerator metric's value type - * @param the denominator metric's value type */ -public class LazyInstantiatedFlowMetric implements FlowMetric { +public class LazyInstantiatedFlowMetric implements FlowMetric { static final Logger LOGGER = LogManager.getLogger(LazyInstantiatedFlowMetric.class); private final String name; - private final AtomicReference>> numeratorSupplier; - private final AtomicReference>> denominatorSupplier; + private final AtomicReference>> numeratorSupplier; + private final AtomicReference>> denominatorSupplier; private final SetOnceReference inner = SetOnceReference.unset(); private static final Map EMPTY_MAP = Map.of(); LazyInstantiatedFlowMetric(final String name, - final Supplier> numeratorSupplier, - final Supplier> denominatorSupplier) { + final Supplier> numeratorSupplier, + final Supplier> denominatorSupplier) { this.name = name; this.numeratorSupplier = new AtomicReference<>(numeratorSupplier); this.denominatorSupplier = new AtomicReference<>(denominatorSupplier); @@ -68,10 +65,10 @@ private Optional getInner() { private Optional attemptCreateInner() { if (inner.isSet()) { return inner.asOptional(); } - final Metric numeratorMetric = numeratorSupplier.getAcquire().get(); + final Metric numeratorMetric = numeratorSupplier.getAcquire().get(); if (Objects.isNull(numeratorMetric)) { return Optional.empty(); } - final Metric denominatorMetric = denominatorSupplier.getAcquire().get(); + final Metric denominatorMetric = denominatorSupplier.getAcquire().get(); if (Objects.isNull(denominatorMetric)) { return Optional.empty(); } final FlowMetric flowMetric = FlowMetric.create(this.name, numeratorMetric, denominatorMetric); @@ -91,7 +88,7 @@ private void warnNotInitialized() { LOGGER.warn("Underlying metrics for `{}` not yet instantiated, could not capture their rates", this.name); } - private static Supplier> constantMetricSupplierFor(final Metric mm) { + private static Supplier> constantMetricSupplierFor(final Metric mm) { return () -> mm; } } From 57e4f415be74b27ba858153cbe8cca3744c39985 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Mon, 10 Oct 2022 19:16:46 +0000 Subject: [PATCH 08/19] flow metrics: use lazy init for PQ gauge-based metrics --- .../execution/AbstractPipelineExt.java | 44 ++++++++----------- 1 file changed, 19 insertions(+), 25 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 48646017044..396d1b70b90 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -36,6 +36,7 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -495,9 +496,6 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { final RubySymbol[] flowNamespace = buildNamespace(FLOW_KEY); final RubySymbol[] eventsNamespace = buildNamespace(EVENTS_KEY); - final RubySymbol[] queueNamespace = buildNamespace(QUEUE_KEY); - final RubySymbol[] queueCapacityNamespace = buildNamespace(QUEUE_KEY, CAPACITY_KEY); - final RubySymbol[] queueFlowNamespace = buildNamespace(QUEUE_KEY, FLOW_KEY); final LongCounter eventsInCounter = initOrGetCounterMetric(context, eventsNamespace, IN_KEY); final FlowMetric inputThroughput = createFlowMetric(INPUT_THROUGHPUT_KEY, eventsInCounter, uptimeInPreciseSeconds); @@ -527,29 +525,20 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { // collect the growth_events & growth_bytes metrics if only persisted queue is enabled. if (getSetting(context, QueueFactoryExt.CONTEXT_NAME).asJavaString() .equals(QueueFactoryExt.PERSISTED_TYPE)) { - final Optional events = initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY); - if (events.isPresent()) { - final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_GROWTH_EVENTS_KEY, events.get(), uptimeInPreciseSeconds); - this.flowMetrics.add(growthEventsFlow); - storeMetric(context, queueFlowNamespace, growthEventsFlow); - } else { - LOGGER.warn(String.format("Cannot initialize `%s` metric. Retrieving `%s` number gauge from `%s` namespace(s) failed.", - QUEUE_GROWTH_EVENTS_KEY, - EVENTS_KEY, - Arrays.stream(queueNamespace).map(RubySymbol::asString).collect(Collectors.toList()))); - } - final Optional queueSizeInBytes = initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY); - if (queueSizeInBytes.isPresent()) { - final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_GROWTH_BYTES_KEY, queueSizeInBytes.get(), uptimeInPreciseSeconds); - this.flowMetrics.add(growthBytesFlow); - storeMetric(context, queueFlowNamespace, growthBytesFlow); - } else { - LOGGER.warn(String.format("Cannot initialize `%s` metric. Retrieving `%s` number gauge from `%s` namespace(s) failed.", - QUEUE_GROWTH_BYTES_KEY, - QUEUE_SIZE_IN_BYTES_KEY, - Arrays.stream(queueCapacityNamespace).map(RubySymbol::asString).collect(Collectors.toList()))); - } + final RubySymbol[] queueNamespace = buildNamespace(QUEUE_KEY); + final RubySymbol[] queueCapacityNamespace = buildNamespace(QUEUE_KEY, CAPACITY_KEY); + final RubySymbol[] queueFlowNamespace = buildNamespace(QUEUE_KEY, FLOW_KEY); + + final Supplier eventsGaugeMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY).orElse(null); + final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_GROWTH_EVENTS_KEY, eventsGaugeMetricSupplier, () -> uptimeInPreciseSeconds); + this.flowMetrics.add(growthEventsFlow); + storeMetric(context, queueFlowNamespace, growthEventsFlow); + + final Supplier queueSizeInBytesMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY).orElse(null); + final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_GROWTH_BYTES_KEY, queueSizeInBytesMetricSupplier, () -> uptimeInPreciseSeconds); + this.flowMetrics.add(growthBytesFlow); + storeMetric(context, queueFlowNamespace, growthBytesFlow); } return context.nil; } @@ -565,6 +554,11 @@ private static FlowMetric createFlowMetric(final RubySymbol name, final Metric denominatorMetric) { return FlowMetric.create(name.asJavaString(), numeratorMetric, denominatorMetric); } + private static FlowMetric createFlowMetric(final RubySymbol name, + final Supplier> numeratorMetricSupplier, + final Supplier> denominatorMetricSupplier) { + return FlowMetric.create(name.asJavaString(), numeratorMetricSupplier, denominatorMetricSupplier); + } private LongCounter initOrGetCounterMetric(final ThreadContext context, final RubySymbol[] subPipelineNamespacePath, From 1fcb75bc5aa5f1a78d6f3f58e0fe0e5e6d7e95c4 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Mon, 10 Oct 2022 19:18:31 +0000 Subject: [PATCH 09/19] noop: use enum equality Avoids routing two enum values through `MetricType#toString()` and `String#equals()` when they can be compared directly. --- .../main/java/org/logstash/execution/AbstractPipelineExt.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 396d1b70b90..0a39275bbc3 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -578,8 +578,7 @@ private Optional initOrGetNumberGaugeMetric(final ThreadContext con final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")}); LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class); - if (Objects.isNull(delegatingGauge.getType()) || - MetricType.GAUGE_NUMBER.asString().equals(delegatingGauge.getType().asString()) == false) { + if (Objects.isNull(delegatingGauge.getType()) || delegatingGauge.getType() != MetricType.GAUGE_NUMBER) { return Optional.empty(); } From b9d5ae74f7414b45146ebbc89b2eee344a848df5 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Tue, 11 Oct 2022 08:06:23 -0700 Subject: [PATCH 10/19] Apply suggestions from code review Optional.ofNullable used for safe return. Doc includes real tested expected metric values. Co-authored-by: Ry Biesemeyer --- docs/static/monitoring/monitoring-apis.asciidoc | 4 ++-- .../instrument/metrics/gauge/LazyDelegatingGauge.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/static/monitoring/monitoring-apis.asciidoc b/docs/static/monitoring/monitoring-apis.asciidoc index 16c54aa0ab9..f8c0fbec0cc 100644 --- a/docs/static/monitoring/monitoring-apis.asciidoc +++ b/docs/static/monitoring/monitoring-apis.asciidoc @@ -834,8 +834,8 @@ Example response: "current": 0.0 }, "growth_events": { - "lifetime": 0.234, - "current": 0.012 + "lifetime": -1.201, + "current": -0.9913 } }, "data": { diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java index acfa9208b5b..0afaea618e8 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java @@ -75,7 +75,7 @@ public Object get() { @SuppressWarnings("rawtypes") public Optional getMetric() { - return Optional.of(lazyMetric); + return Optional.ofNullable(lazyMetric); } @Override From 78d51108a518075edad641288385b83d049732a4 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Wed, 12 Oct 2022 17:22:28 +0000 Subject: [PATCH 11/19] flow metrics: make lazy-init wraper inherit from AbstractMetric this allows the Jackson serialization annotations to work --- .../metrics/LazyInstantiatedFlowMetric.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java index f6dbdc25837..61c47f26b6d 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java @@ -17,12 +17,10 @@ * * @see FlowMetric#create(String, Supplier, Supplier) */ -public class LazyInstantiatedFlowMetric implements FlowMetric { +public class LazyInstantiatedFlowMetric extends AbstractMetric> implements FlowMetric { static final Logger LOGGER = LogManager.getLogger(LazyInstantiatedFlowMetric.class); - private final String name; - private final AtomicReference>> numeratorSupplier; private final AtomicReference>> denominatorSupplier; @@ -33,7 +31,7 @@ public class LazyInstantiatedFlowMetric implements FlowMetric { LazyInstantiatedFlowMetric(final String name, final Supplier> numeratorSupplier, final Supplier> denominatorSupplier) { - this.name = name; + super(name); this.numeratorSupplier = new AtomicReference<>(numeratorSupplier); this.denominatorSupplier = new AtomicReference<>(denominatorSupplier); } @@ -43,11 +41,6 @@ public void capture() { getInner().ifPresentOrElse(FlowMetric::capture, this::warnNotInitialized); } - @Override - public String getName() { - return this.name; - } - @Override public MetricType getType() { return MetricType.FLOW_RATE; From 34a22a3384b5b12a2d5b9a3803ced66a0dab9591 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Wed, 12 Oct 2022 17:58:49 +0000 Subject: [PATCH 12/19] flow metrics: move pipeline queue-based flows into pipeline flow namespace --- .../logstash/execution/AbstractPipelineExt.java | 10 ++++------ .../logstash/instrument/metrics/MetricKeys.java | 4 ++-- qa/integration/specs/monitoring_api_spec.rb | 15 +++++++++------ 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 0a39275bbc3..55aaa5ebdf6 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -39,7 +39,6 @@ import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.annotations.VisibleForTesting; @@ -528,17 +527,16 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { final RubySymbol[] queueNamespace = buildNamespace(QUEUE_KEY); final RubySymbol[] queueCapacityNamespace = buildNamespace(QUEUE_KEY, CAPACITY_KEY); - final RubySymbol[] queueFlowNamespace = buildNamespace(QUEUE_KEY, FLOW_KEY); final Supplier eventsGaugeMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY).orElse(null); - final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_GROWTH_EVENTS_KEY, eventsGaugeMetricSupplier, () -> uptimeInPreciseSeconds); + final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_EVENTS_KEY, eventsGaugeMetricSupplier, () -> uptimeInPreciseSeconds); this.flowMetrics.add(growthEventsFlow); - storeMetric(context, queueFlowNamespace, growthEventsFlow); + storeMetric(context, flowNamespace, growthEventsFlow); final Supplier queueSizeInBytesMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY).orElse(null); - final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_GROWTH_BYTES_KEY, queueSizeInBytesMetricSupplier, () -> uptimeInPreciseSeconds); + final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_BYTES_KEY, queueSizeInBytesMetricSupplier, () -> uptimeInPreciseSeconds); this.flowMetrics.add(growthBytesFlow); - storeMetric(context, queueFlowNamespace, growthBytesFlow); + storeMetric(context, flowNamespace, growthBytesFlow); } return context.nil; } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java index bec17e34066..89a04945e5c 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java @@ -96,7 +96,7 @@ private MetricKeys() { public static final RubySymbol UPTIME_IN_MILLIS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_millis"); - public static final RubySymbol QUEUE_GROWTH_EVENTS_KEY = RubyUtil.RUBY.newSymbol("growth_events"); + public static final RubySymbol QUEUE_PERSISTED_GROWTH_EVENTS_KEY = RubyUtil.RUBY.newSymbol("queue_persisted_growth_events"); - public static final RubySymbol QUEUE_GROWTH_BYTES_KEY = RubyUtil.RUBY.newSymbol("growth_bytes"); + public static final RubySymbol QUEUE_PERSISTED_GROWTH_BYTES_KEY = RubyUtil.RUBY.newSymbol("queue_persisted_growth_bytes"); } diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index 63556fdaa13..c4b50835add 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -190,12 +190,6 @@ expect(queue_capacity_stats["page_capacity_in_bytes"]).not_to be_nil expect(queue_capacity_stats["max_queue_size_in_bytes"]).not_to be_nil expect(queue_capacity_stats["max_unread_events"]).not_to be_nil - - queue_flow_stats = queue_stats.dig("flow") - expect(queue_flow_stats).to_not be_nil - expect(queue_flow_stats).to include( - 'growth_bytes' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0), - 'growth_events' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0)) else expect(queue_stats["type"]).to eq("memory") end @@ -280,6 +274,15 @@ 'filter_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0), 'output_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0) ) + if logstash_service.settings.feature_flag == "persistent_queues" + expect(flow_status).to include( + 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)), + 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)) + ) + else + expect(flow_status).to_not include('queue_persisted_growth_bytes') + expect(flow_status).to_not include('queue_persisted_growth_events') + end end end From 57848bcffa98bd82fecf0e9edd7a9258e75d737a Mon Sep 17 00:00:00 2001 From: mashhurs Date: Wed, 12 Oct 2022 14:14:26 -0700 Subject: [PATCH 13/19] Follow up for moving PQ growth metrics under pipeline.*.flow. - Unit and integration tests are added or fixed. - Documentation added along with sample response data --- .../monitoring/monitoring-apis.asciidoc | 55 +++++++++++++------ .../spec/logstash/api/commands/stats_spec.rb | 7 +-- .../logstash/api/modules/node_stats_spec.rb | 8 +-- .../execution/AbstractPipelineExt.java | 5 +- qa/integration/specs/reload_config_spec.rb | 10 ++++ 5 files changed, 55 insertions(+), 30 deletions(-) diff --git a/docs/static/monitoring/monitoring-apis.asciidoc b/docs/static/monitoring/monitoring-apis.asciidoc index f8c0fbec0cc..1dd10db463a 100644 --- a/docs/static/monitoring/monitoring-apis.asciidoc +++ b/docs/static/monitoring/monitoring-apis.asciidoc @@ -534,6 +534,18 @@ A pipeline with only one single-threaded input may contribute up to 1.00, a pipe Additionally, some amount of back-pressure is both _normal_ and _expected_ for pipelines that are _pulling_ data, as this back-pressure allows them to slow down and pull data at a rate its downstream pipeline can tolerate. +| `queue_persisted_growth_bytes` | +Only available when pipeline is using persisted queue. +This metric is the growth rate of bytes being pushed into the pipeline's persisted queue relative to wall-clock time (`queue.queue_size_in_bytes` / second). + +For the ideal condition, this metric should trend toward zero (or negative). + +| `queue_persisted_growth_events` | +Only available when pipeline is using persisted queue. +This metric is the growth rate of events being pushed into the pipeline's persisted queue relative to wall-clock time (`queue.events_count` / second). + +For the ideal condition, this metric should trend toward zero (or negative). + |=== Each flow stat includes rates for one or more recent windows of time: @@ -613,6 +625,14 @@ Example response: "worker_concurrency" : { "current": 1.973, "lifetime": 1.721 + }, + "queue_persisted_growth_bytes": { + "lifetime": 0.032, + "current": 0.0 + }, + "queue_persisted_growth_events": { + "lifetime": -1.201, + "current": -0.9913 } }, "plugins" : { @@ -771,6 +791,14 @@ Example response: "worker_concurrency" : { "current": 1.973, "lifetime": 1.721 + }, + "queue_persisted_growth_bytes": { + "lifetime": 0.032, + "current": 0.0 + }, + "queue_persisted_growth_events": { + "lifetime": -1.201, + "current": -0.9913 } }, "plugins" : { @@ -828,25 +856,16 @@ Example response: "max_queue_size_in_bytes": 1073741824, "queue_size_in_bytes": 3885 }, - "flow": { - "growth_bytes": { - "lifetime": 0.032, - "current": 0.0 + "data": { + "path": "/pipeline/queue/path", + "free_space_in_bytes": 936886480896, + "storage_type": "apfs" }, - "growth_events": { - "lifetime": -1.201, - "current": -0.9913 - } - }, - "data": { - "path": "/pipeline/queue/path", - "free_space_in_bytes": 936886480896, - "storage_type": "apfs" - }, - "events": 0, - "events_count": 0, - "queue_size_in_bytes": 3885, - "max_queue_size_in_bytes": 1073741824 + "events": 0, + "events_count": 0, + "queue_size_in_bytes": 3885, + "max_queue_size_in_bytes": 1073741824 + } } } } diff --git a/logstash-core/spec/logstash/api/commands/stats_spec.rb b/logstash-core/spec/logstash/api/commands/stats_spec.rb index a9a3861bd5f..f3a0b98fe95 100644 --- a/logstash-core/spec/logstash/api/commands/stats_spec.rb +++ b/logstash-core/spec/logstash/api/commands/stats_spec.rb @@ -178,7 +178,9 @@ :filter_throughput, :queue_backpressure, :worker_concurrency, - :input_throughput + :input_throughput, + :queue_persisted_growth_bytes, + :queue_persisted_growth_events ) end it "returns queue metric information" do @@ -197,9 +199,6 @@ :storage_type, :path, :free_space_in_bytes) - expect(report[:main][:queue][:flow].keys).to include( - :growth_bytes, - :growth_events) end end context "when using multiple pipelines" do diff --git a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb index 83cb8d714fd..bd930df0cc8 100644 --- a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb +++ b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb @@ -116,7 +116,9 @@ "filter_throughput" => Hash, "queue_backpressure" => Hash, "worker_concurrency" => Hash, - "input_throughput" => Hash + "input_throughput" => Hash, + "queue_persisted_growth_bytes" => Hash, + "queue_persisted_growth_events" => Hash }, "plugins" => { "inputs" => Array, @@ -137,10 +139,6 @@ "storage_type" => String, "path" => String, "free_space_in_bytes" => Numeric - }, - "flow" => { - "growth_bytes" => Hash, - "growth_events" => Hash } } } diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index 55aaa5ebdf6..873d796ad83 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -521,7 +521,7 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { this.flowMetrics.add(concurrencyFlow); storeMetric(context, flowNamespace, concurrencyFlow); - // collect the growth_events & growth_bytes metrics if only persisted queue is enabled. + // collect the queue_persisted_growth_events & queue_persisted_growth_bytes metrics if only persisted queue is enabled. if (getSetting(context, QueueFactoryExt.CONTEXT_NAME).asJavaString() .equals(QueueFactoryExt.PERSISTED_TYPE)) { @@ -580,8 +580,7 @@ private Optional initOrGetNumberGaugeMetric(final ThreadContext con return Optional.empty(); } - NumberGauge castedNumberGauge = (NumberGauge) delegatingGauge.getMetric().get(); - return Optional.of(castedNumberGauge); + return Optional.of((NumberGauge) delegatingGauge.getMetric().get()); } private UptimeMetric initOrGetUptimeMetric(final ThreadContext context, diff --git a/qa/integration/specs/reload_config_spec.rb b/qa/integration/specs/reload_config_spec.rb index 71c38e55ef9..4e34a65c5ef 100644 --- a/qa/integration/specs/reload_config_spec.rb +++ b/qa/integration/specs/reload_config_spec.rb @@ -124,6 +124,16 @@ 'filter_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0), 'output_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0) ) + + if logstash_service.settings.feature_flag == "persistent_queues" + expect(flow_status).to include( + 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)), + 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)) + ) + else + expect(flow_status).to_not include('queue_persisted_growth_bytes') + expect(flow_status).to_not include('queue_persisted_growth_events') + end end # check reload stats From 5332935772031e49cc77033713240a03390e5374 Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Wed, 12 Oct 2022 22:03:46 +0000 Subject: [PATCH 14/19] flow: pipeline pq flow rates docs --- .../monitoring/monitoring-apis.asciidoc | 68 +++++++++++-------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/docs/static/monitoring/monitoring-apis.asciidoc b/docs/static/monitoring/monitoring-apis.asciidoc index 1dd10db463a..63c2961f442 100644 --- a/docs/static/monitoring/monitoring-apis.asciidoc +++ b/docs/static/monitoring/monitoring-apis.asciidoc @@ -502,6 +502,10 @@ Example response: Flow rates provide visibility into how a Logstash instance or an individual pipeline is _currently_ performing relative to _itself_ over time. This allows us to attach _meaning_ to the cumulative-value metrics that are also presented by this API, and to determine whether an instance or pipeline is behaving better or worse than it has in the past. +The following flow rates are available for the logstash process as a whole and for each of its pipelines individually. +In addition, pipelines may have <> depending on their configuration. + + [%autowidth.stretch] |=== |Flow Rate | Definition @@ -533,19 +537,6 @@ It cannot be used to compare one pipeline to another or even one process to _its A pipeline with only one single-threaded input may contribute up to 1.00, a pipeline whose inputs have hundreds of inbound connections may contribute much higher numbers to this combined value. Additionally, some amount of back-pressure is both _normal_ and _expected_ for pipelines that are _pulling_ data, as this back-pressure allows them to slow down and pull data at a rate its downstream pipeline can tolerate. - -| `queue_persisted_growth_bytes` | -Only available when pipeline is using persisted queue. -This metric is the growth rate of bytes being pushed into the pipeline's persisted queue relative to wall-clock time (`queue.queue_size_in_bytes` / second). - -For the ideal condition, this metric should trend toward zero (or negative). - -| `queue_persisted_growth_events` | -Only available when pipeline is using persisted queue. -This metric is the growth rate of events being pushed into the pipeline's persisted queue relative to wall-clock time (`queue.events_count` / second). - -For the ideal condition, this metric should trend toward zero (or negative). - |=== Each flow stat includes rates for one or more recent windows of time: @@ -583,8 +574,7 @@ including: * stats for each configured filter or output stage * info about config reload successes and failures (when <> is enabled) -* info about the persistent queue (when -<> are enabled) +* info about the persistent queue (when <> are enabled) [source,js] -------------------------------------------------- @@ -626,13 +616,13 @@ Example response: "current": 1.973, "lifetime": 1.721 }, - "queue_persisted_growth_bytes": { - "lifetime": 0.032, - "current": 0.0 + "queue_persisted_growth_bytes" : { + "current": 783100, + "lifetime": 17 }, - "queue_persisted_growth_events": { - "lifetime": -1.201, - "current": -0.9913 + "queue_persisted_growth_events" : { + "current": 11, + "lifetime": 0.003 } }, "plugins" : { @@ -792,13 +782,13 @@ Example response: "current": 1.973, "lifetime": 1.721 }, - "queue_persisted_growth_bytes": { - "lifetime": 0.032, - "current": 0.0 + "queue_persisted_growth_bytes" : { + "current": 783100, + "lifetime": 17 }, - "queue_persisted_growth_events": { - "lifetime": -1.201, - "current": -0.9913 + "queue_persisted_growth_events" : { + "current": 11, + "lifetime": 0.003 } }, "plugins" : { @@ -871,6 +861,30 @@ Example response: } -------------------------------------------------- +[discrete] +[[pipeline-flow-rates]] +===== Pipeline flow rates + +Each pipeline's entry in the API response includes a number of pipeline-scoped <> such as `input_throughput`, `worker_concurrency`, and `queue_backpressure` to provide visibility into the flow of events through the pipeline. + +When configured with a <>, the pipeline's `flow` will include additional rates to provide visibility into the health of the pipeline's persistent queue: + +[%autowidth.stretch] +|=== +|Flow Rate | Definition + +| `queue_persisted_growth_events` | +This metric is expressed in events-per-second, and is the rate of change of the number of unacknowleged events in the queue, relative to wall-clock time (`queue.events_count` / second). +A positive number indicates that the queue's event-count is growing, and a negative number indicates that the queue is shrinking. + +| `queue_persisted_growth_bytes` | +This metric is expressed in bytes-per-second, and is the rate of change of the size of the persistent queue on disk, relative to wall-clock time (`queue.queue_size_in_bytes` / second). +A positive number indicates that the queue size-on-disk is growing, and a negative number indicates that the queue is shrinking. + +NOTE: The size of a PQ on disk includes both unacknowledged events and previously-acknowledged events from pages that contain one or more unprocessed events. + This means it grows gradually as individual events are added, but shrinks in large chunks each time a whole page of processed events is reclaimed (read more: <>). +|=== + [discrete] [[reload-stats]] ==== Reload stats From 2b074a890ec42e11eae4b8eb40edba62de50cc88 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Wed, 12 Oct 2022 15:40:18 -0700 Subject: [PATCH 15/19] Do not expect flow in the queue section of API. Metrics moved to flow section. Update logstash-core/spec/logstash/api/commands/stats_spec.rb Co-authored-by: Ry Biesemeyer --- logstash-core/spec/logstash/api/commands/stats_spec.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/logstash-core/spec/logstash/api/commands/stats_spec.rb b/logstash-core/spec/logstash/api/commands/stats_spec.rb index f3a0b98fe95..52ffc3b73f2 100644 --- a/logstash-core/spec/logstash/api/commands/stats_spec.rb +++ b/logstash-core/spec/logstash/api/commands/stats_spec.rb @@ -188,8 +188,7 @@ :capacity, :events, :type, - :data, - :flow) + :data) expect(report[:main][:queue][:capacity].keys).to include( :page_capacity_in_bytes, :max_queue_size_in_bytes, From 3c72ff3a19844af0aebf3324bd084e56455f4130 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Wed, 12 Oct 2022 15:44:29 -0700 Subject: [PATCH 16/19] Integration test failure fix. Mistake: `flow_status` should be `pipeline_flow_stats` Co-authored-by: Ry Biesemeyer --- qa/integration/specs/reload_config_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/qa/integration/specs/reload_config_spec.rb b/qa/integration/specs/reload_config_spec.rb index 4e34a65c5ef..315fc31d610 100644 --- a/qa/integration/specs/reload_config_spec.rb +++ b/qa/integration/specs/reload_config_spec.rb @@ -126,13 +126,13 @@ ) if logstash_service.settings.feature_flag == "persistent_queues" - expect(flow_status).to include( + expect(pipeline_flow_stats).to include( 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)), 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)) ) else - expect(flow_status).to_not include('queue_persisted_growth_bytes') - expect(flow_status).to_not include('queue_persisted_growth_events') + expect(pipeline_flow_stats).to_not include('queue_persisted_growth_bytes') + expect(pipeline_flow_stats).to_not include('queue_persisted_growth_events') end end From a188ab611c2e739326c9f02c122a667e900c6772 Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Wed, 12 Oct 2022 16:25:47 -0700 Subject: [PATCH 17/19] Integration test failures fix. Number should be Numeric in the ruby specs. Co-authored-by: Ry Biesemeyer --- qa/integration/specs/monitoring_api_spec.rb | 4 ++-- qa/integration/specs/reload_config_spec.rb | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index c4b50835add..e92538a27d2 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -276,8 +276,8 @@ ) if logstash_service.settings.feature_flag == "persistent_queues" expect(flow_status).to include( - 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)), - 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)) + 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Number)), + 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Number)) ) else expect(flow_status).to_not include('queue_persisted_growth_bytes') diff --git a/qa/integration/specs/reload_config_spec.rb b/qa/integration/specs/reload_config_spec.rb index 315fc31d610..07ae7481ae0 100644 --- a/qa/integration/specs/reload_config_spec.rb +++ b/qa/integration/specs/reload_config_spec.rb @@ -127,8 +127,8 @@ if logstash_service.settings.feature_flag == "persistent_queues" expect(pipeline_flow_stats).to include( - 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)), - 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Number), 'lifetime' => a_kind_of(Number)) + 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)), + 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)) ) else expect(pipeline_flow_stats).to_not include('queue_persisted_growth_bytes') From 724852ddd2deb32e3828ebcfe40d0a037b7ae983 Mon Sep 17 00:00:00 2001 From: mashhurs Date: Wed, 12 Oct 2022 17:05:14 -0700 Subject: [PATCH 18/19] Make CI happy. --- qa/integration/specs/monitoring_api_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index e92538a27d2..e727462fcc8 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -276,8 +276,8 @@ ) if logstash_service.settings.feature_flag == "persistent_queues" expect(flow_status).to include( - 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Number)), - 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Number)) + 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)), + 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)) ) else expect(flow_status).to_not include('queue_persisted_growth_bytes') From c89e3692f87ed4c8d44ecae16cce2e6340a2e17b Mon Sep 17 00:00:00 2001 From: Ry Biesemeyer Date: Thu, 13 Oct 2022 19:51:06 +0000 Subject: [PATCH 19/19] api specs: use PQ only where needed --- logstash-core/spec/logstash/api/commands/stats_spec.rb | 3 ++- logstash-core/spec/logstash/api/modules/node_stats_spec.rb | 3 ++- logstash-core/spec/support/shared_contexts.rb | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/logstash-core/spec/logstash/api/commands/stats_spec.rb b/logstash-core/spec/logstash/api/commands/stats_spec.rb index 52ffc3b73f2..cd8936266be 100644 --- a/logstash-core/spec/logstash/api/commands/stats_spec.rb +++ b/logstash-core/spec/logstash/api/commands/stats_spec.rb @@ -18,7 +18,8 @@ require "spec_helper" describe LogStash::Api::Commands::Stats do - include_context "api setup" + # enable PQ to ensure PQ-related metrics are present + include_context "api setup", {"queue.type" => "persisted"} let(:report_method) { :run } let(:extended_pipeline) { nil } diff --git a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb index bd930df0cc8..a41f7b40b72 100644 --- a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb +++ b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb @@ -21,7 +21,8 @@ require "logstash/api/modules/node_stats" describe LogStash::Api::Modules::NodeStats do - include_context "api setup" + # enable PQ to ensure PQ-related metrics are present + include_context "api setup", {"queue.type" => "persisted"} include_examples "not found" extend ResourceDSLMethods diff --git a/logstash-core/spec/support/shared_contexts.rb b/logstash-core/spec/support/shared_contexts.rb index d31208e82f3..c685313ac28 100644 --- a/logstash-core/spec/support/shared_contexts.rb +++ b/logstash-core/spec/support/shared_contexts.rb @@ -35,7 +35,7 @@ end end -shared_context "api setup" do +shared_context "api setup" do |settings_overrides={}| ## # blocks until the condition returns true, or the limit has passed @@ -56,7 +56,7 @@ def block_until(limit_seconds, &condition) before :all do clear_data_dir - settings = mock_settings("config.reload.automatic" => true, "queue.type" => "persisted") + settings = mock_settings({"config.reload.automatic" => true}.merge(settings_overrides)) config_source = make_config_source(settings) config_source.add_pipeline('main', "input { generator {id => 'api-generator-pipeline' count => 100 } } output { dummyoutput {} }")