diff --git a/docs/static/monitoring/monitoring-apis.asciidoc b/docs/static/monitoring/monitoring-apis.asciidoc index bd3e45ca095..63c2961f442 100644 --- a/docs/static/monitoring/monitoring-apis.asciidoc +++ b/docs/static/monitoring/monitoring-apis.asciidoc @@ -502,6 +502,10 @@ Example response: Flow rates provide visibility into how a Logstash instance or an individual pipeline is _currently_ performing relative to _itself_ over time. This allows us to attach _meaning_ to the cumulative-value metrics that are also presented by this API, and to determine whether an instance or pipeline is behaving better or worse than it has in the past. +The following flow rates are available for the logstash process as a whole and for each of its pipelines individually. +In addition, pipelines may have <> depending on their configuration. + + [%autowidth.stretch] |=== |Flow Rate | Definition @@ -533,7 +537,6 @@ It cannot be used to compare one pipeline to another or even one process to _its A pipeline with only one single-threaded input may contribute up to 1.00, a pipeline whose inputs have hundreds of inbound connections may contribute much higher numbers to this combined value. Additionally, some amount of back-pressure is both _normal_ and _expected_ for pipelines that are _pulling_ data, as this back-pressure allows them to slow down and pull data at a rate its downstream pipeline can tolerate. - |=== Each flow stat includes rates for one or more recent windows of time: @@ -571,8 +574,7 @@ including: * stats for each configured filter or output stage * info about config reload successes and failures (when <> is enabled) -* info about the persistent queue (when -<> are enabled) +* info about the persistent queue (when <> are enabled) [source,js] -------------------------------------------------- @@ -613,6 +615,14 @@ Example response: "worker_concurrency" : { "current": 1.973, "lifetime": 1.721 + }, + "queue_persisted_growth_bytes" : { + "current": 783100, + "lifetime": 17 + }, + "queue_persisted_growth_events" : { + "current": 11, + "lifetime": 0.003 } }, "plugins" : { @@ -771,6 +781,14 @@ Example response: "worker_concurrency" : { "current": 1.973, "lifetime": 1.721 + }, + "queue_persisted_growth_bytes" : { + "current": 783100, + "lifetime": 17 + }, + "queue_persisted_growth_events" : { + "current": 11, + "lifetime": 0.003 } }, "plugins" : { @@ -820,14 +838,53 @@ Example response: "last_failure_timestamp" : null, "failures" : 0 }, - "queue" : { - "type" : "memory" + "queue": { + "type" : "persisted", + "capacity": { + "max_unread_events": 0, + "page_capacity_in_bytes": 67108864, + "max_queue_size_in_bytes": 1073741824, + "queue_size_in_bytes": 3885 + }, + "data": { + "path": "/pipeline/queue/path", + "free_space_in_bytes": 936886480896, + "storage_type": "apfs" + }, + "events": 0, + "events_count": 0, + "queue_size_in_bytes": 3885, + "max_queue_size_in_bytes": 1073741824 } } } } -------------------------------------------------- +[discrete] +[[pipeline-flow-rates]] +===== Pipeline flow rates + +Each pipeline's entry in the API response includes a number of pipeline-scoped <> such as `input_throughput`, `worker_concurrency`, and `queue_backpressure` to provide visibility into the flow of events through the pipeline. + +When configured with a <>, the pipeline's `flow` will include additional rates to provide visibility into the health of the pipeline's persistent queue: + +[%autowidth.stretch] +|=== +|Flow Rate | Definition + +| `queue_persisted_growth_events` | +This metric is expressed in events-per-second, and is the rate of change of the number of unacknowleged events in the queue, relative to wall-clock time (`queue.events_count` / second). +A positive number indicates that the queue's event-count is growing, and a negative number indicates that the queue is shrinking. + +| `queue_persisted_growth_bytes` | +This metric is expressed in bytes-per-second, and is the rate of change of the size of the persistent queue on disk, relative to wall-clock time (`queue.queue_size_in_bytes` / second). +A positive number indicates that the queue size-on-disk is growing, and a negative number indicates that the queue is shrinking. + +NOTE: The size of a PQ on disk includes both unacknowledged events and previously-acknowledged events from pages that contain one or more unprocessed events. + This means it grows gradually as individual events are added, but shrinks in large chunks each time a whole page of processed events is reclaimed (read more: <>). +|=== + [discrete] [[reload-stats]] ==== Reload stats diff --git a/logstash-core/spec/logstash/api/commands/stats_spec.rb b/logstash-core/spec/logstash/api/commands/stats_spec.rb index f7ecb0ebe7d..cd8936266be 100644 --- a/logstash-core/spec/logstash/api/commands/stats_spec.rb +++ b/logstash-core/spec/logstash/api/commands/stats_spec.rb @@ -18,7 +18,8 @@ require "spec_helper" describe LogStash::Api::Commands::Stats do - include_context "api setup" + # enable PQ to ensure PQ-related metrics are present + include_context "api setup", {"queue.type" => "persisted"} let(:report_method) { :run } let(:extended_pipeline) { nil } @@ -178,9 +179,27 @@ :filter_throughput, :queue_backpressure, :worker_concurrency, - :input_throughput + :input_throughput, + :queue_persisted_growth_bytes, + :queue_persisted_growth_events ) end + it "returns queue metric information" do + expect(report[:main][:queue].keys).to include( + :capacity, + :events, + :type, + :data) + expect(report[:main][:queue][:capacity].keys).to include( + :page_capacity_in_bytes, + :max_queue_size_in_bytes, + :queue_size_in_bytes, + :max_unread_events) + expect(report[:main][:queue][:data].keys).to include( + :storage_type, + :path, + :free_space_in_bytes) + end end context "when using multiple pipelines" do before(:each) do diff --git a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb index ef3c0e50328..a41f7b40b72 100644 --- a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb +++ b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb @@ -21,7 +21,8 @@ require "logstash/api/modules/node_stats" describe LogStash::Api::Modules::NodeStats do - include_context "api setup" + # enable PQ to ensure PQ-related metrics are present + include_context "api setup", {"queue.type" => "persisted"} include_examples "not found" extend ResourceDSLMethods @@ -116,7 +117,9 @@ "filter_throughput" => Hash, "queue_backpressure" => Hash, "worker_concurrency" => Hash, - "input_throughput" => Hash + "input_throughput" => Hash, + "queue_persisted_growth_bytes" => Hash, + "queue_persisted_growth_events" => Hash }, "plugins" => { "inputs" => Array, @@ -124,6 +127,21 @@ "filters" => Array, "outputs" => Array, }, + "queue" => { + "capacity" => { + "page_capacity_in_bytes" => Numeric, + "max_queue_size_in_bytes" => Numeric, + "queue_size_in_bytes" => Numeric, + "max_unread_events" => Numeric + }, + "events" => Numeric, + "type" => String, + "data" => { + "storage_type" => String, + "path" => String, + "free_space_in_bytes" => Numeric + } + } } }, "reloads" => { diff --git a/logstash-core/spec/support/shared_contexts.rb b/logstash-core/spec/support/shared_contexts.rb index 5eb3f16a56b..c685313ac28 100644 --- a/logstash-core/spec/support/shared_contexts.rb +++ b/logstash-core/spec/support/shared_contexts.rb @@ -35,7 +35,7 @@ end end -shared_context "api setup" do +shared_context "api setup" do |settings_overrides={}| ## # blocks until the condition returns true, or the limit has passed @@ -56,7 +56,7 @@ def block_until(limit_seconds, &condition) before :all do clear_data_dir - settings = mock_settings("config.reload.automatic" => true) + settings = mock_settings({"config.reload.automatic" => true}.merge(settings_overrides)) config_source = make_config_source(settings) config_source.add_pipeline('main', "input { generator {id => 'api-generator-pipeline' count => 100 } } output { dummyoutput {} }") diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java index 9aa468c3a08..457d19ae4db 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -42,6 +42,21 @@ @JRubyClass(name = "QueueFactory") public final class QueueFactoryExt extends RubyBasicObject { + /** + * A static value to indicate Persistent Queue is enabled. + */ + public static String PERSISTED_TYPE = "persisted"; + + /** + * A static value to indicate Memory Queue is enabled. + */ + public static String MEMORY_TYPE = "memory"; + + /** + * A contextual name to expose the queue type. + */ + public static String CONTEXT_NAME = "queue.type"; + private static final long serialVersionUID = 1L; public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) { @@ -51,8 +66,8 @@ public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) { @JRubyMethod(meta = true) public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv, final IRubyObject settings) throws IOException { - final String type = getSetting(context, settings, "queue.type").asJavaString(); - if ("persisted".equals(type)) { + final String type = getSetting(context, settings, CONTEXT_NAME).asJavaString(); + if (PERSISTED_TYPE.equals(type)) { final Path queuePath = Paths.get( getSetting(context, settings, "path.queue").asJavaString(), getSetting(context, settings, "pipeline.id").asJavaString() @@ -77,7 +92,7 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final getSetting(context, settings, "queue.max_bytes") } ); - } else if ("memory".equals(type)) { + } else if (MEMORY_TYPE.equals(type)) { return new JrubyWrappedSynchronousQueueExt( context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS ).initialize( diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index ccc7cbdc701..873d796ad83 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -33,7 +33,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.UUID; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -76,9 +79,12 @@ import org.logstash.instrument.metrics.AbstractMetricExt; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.Metric; +import org.logstash.instrument.metrics.MetricType; import org.logstash.instrument.metrics.NullMetricExt; import org.logstash.instrument.metrics.UptimeMetric; import org.logstash.instrument.metrics.counter.LongCounter; +import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge; +import org.logstash.instrument.metrics.gauge.NumberGauge; import org.logstash.instrument.metrics.FlowMetric; import org.logstash.plugins.ConfigVariableExpander; import org.logstash.plugins.factory.ExecutionContextFactoryExt; @@ -105,7 +111,7 @@ public class AbstractPipelineExt extends RubyBasicObject { private static final Logger LOGGER = LogManager.getLogger(AbstractPipelineExt.class); private static final @SuppressWarnings("rawtypes") RubyArray CAPACITY_NAMESPACE = - RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("capacity")); + RubyArray.newArray(RubyUtil.RUBY, CAPACITY_KEY); private static final @SuppressWarnings("rawtypes") RubyArray DATA_NAMESPACE = RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("data")); @@ -515,6 +521,23 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) { this.flowMetrics.add(concurrencyFlow); storeMetric(context, flowNamespace, concurrencyFlow); + // collect the queue_persisted_growth_events & queue_persisted_growth_bytes metrics if only persisted queue is enabled. + if (getSetting(context, QueueFactoryExt.CONTEXT_NAME).asJavaString() + .equals(QueueFactoryExt.PERSISTED_TYPE)) { + + final RubySymbol[] queueNamespace = buildNamespace(QUEUE_KEY); + final RubySymbol[] queueCapacityNamespace = buildNamespace(QUEUE_KEY, CAPACITY_KEY); + + final Supplier eventsGaugeMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY).orElse(null); + final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_EVENTS_KEY, eventsGaugeMetricSupplier, () -> uptimeInPreciseSeconds); + this.flowMetrics.add(growthEventsFlow); + storeMetric(context, flowNamespace, growthEventsFlow); + + final Supplier queueSizeInBytesMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY).orElse(null); + final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_BYTES_KEY, queueSizeInBytesMetricSupplier, () -> uptimeInPreciseSeconds); + this.flowMetrics.add(growthBytesFlow); + storeMetric(context, flowNamespace, growthBytesFlow); + } return context.nil; } @@ -529,6 +552,11 @@ private static FlowMetric createFlowMetric(final RubySymbol name, final Metric denominatorMetric) { return FlowMetric.create(name.asJavaString(), numeratorMetric, denominatorMetric); } + private static FlowMetric createFlowMetric(final RubySymbol name, + final Supplier> numeratorMetricSupplier, + final Supplier> denominatorMetricSupplier) { + return FlowMetric.create(name.asJavaString(), numeratorMetricSupplier, denominatorMetricSupplier); + } private LongCounter initOrGetCounterMetric(final ThreadContext context, final RubySymbol[] subPipelineNamespacePath, @@ -540,6 +568,21 @@ private LongCounter initOrGetCounterMetric(final ThreadContext context, return retrievedMetric.toJava(LongCounter.class); } + private Optional initOrGetNumberGaugeMetric(final ThreadContext context, + final RubySymbol[] subPipelineNamespacePath, + final RubySymbol metricName) { + final IRubyObject collector = this.metric.collector(context); + final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath); + final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")}); + + LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class); + if (Objects.isNull(delegatingGauge.getType()) || delegatingGauge.getType() != MetricType.GAUGE_NUMBER) { + return Optional.empty(); + } + + return Optional.of((NumberGauge) delegatingGauge.getMetric().get()); + } + private UptimeMetric initOrGetUptimeMetric(final ThreadContext context, final RubySymbol[] subPipelineNamespacePath, final RubySymbol uptimeMetricName) { diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetric.java index c17c69a8fc1..ad11f04087c 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/FlowMetric.java @@ -42,9 +42,9 @@ static FlowMetric create(final String name, } } - static FlowMetric create(final String name, - final Supplier> numeratorSupplier, - final Supplier> denominatorSupplier) { - return new LazyInstantiatedFlowMetric<>(name, numeratorSupplier, denominatorSupplier); + static FlowMetric create(final String name, + final Supplier> numeratorSupplier, + final Supplier> denominatorSupplier) { + return new LazyInstantiatedFlowMetric(name, numeratorSupplier, denominatorSupplier); } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java index 14ef7dee7ba..61c47f26b6d 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/LazyInstantiatedFlowMetric.java @@ -16,27 +16,22 @@ * and fully initializes when both return non-null values. * * @see FlowMetric#create(String, Supplier, Supplier) - * - * @param the numerator metric's value type - * @param the denominator metric's value type */ -public class LazyInstantiatedFlowMetric implements FlowMetric { +public class LazyInstantiatedFlowMetric extends AbstractMetric> implements FlowMetric { static final Logger LOGGER = LogManager.getLogger(LazyInstantiatedFlowMetric.class); - private final String name; - - private final AtomicReference>> numeratorSupplier; - private final AtomicReference>> denominatorSupplier; + private final AtomicReference>> numeratorSupplier; + private final AtomicReference>> denominatorSupplier; private final SetOnceReference inner = SetOnceReference.unset(); private static final Map EMPTY_MAP = Map.of(); LazyInstantiatedFlowMetric(final String name, - final Supplier> numeratorSupplier, - final Supplier> denominatorSupplier) { - this.name = name; + final Supplier> numeratorSupplier, + final Supplier> denominatorSupplier) { + super(name); this.numeratorSupplier = new AtomicReference<>(numeratorSupplier); this.denominatorSupplier = new AtomicReference<>(denominatorSupplier); } @@ -46,11 +41,6 @@ public void capture() { getInner().ifPresentOrElse(FlowMetric::capture, this::warnNotInitialized); } - @Override - public String getName() { - return this.name; - } - @Override public MetricType getType() { return MetricType.FLOW_RATE; @@ -68,10 +58,10 @@ private Optional getInner() { private Optional attemptCreateInner() { if (inner.isSet()) { return inner.asOptional(); } - final Metric numeratorMetric = numeratorSupplier.getAcquire().get(); + final Metric numeratorMetric = numeratorSupplier.getAcquire().get(); if (Objects.isNull(numeratorMetric)) { return Optional.empty(); } - final Metric denominatorMetric = denominatorSupplier.getAcquire().get(); + final Metric denominatorMetric = denominatorSupplier.getAcquire().get(); if (Objects.isNull(denominatorMetric)) { return Optional.empty(); } final FlowMetric flowMetric = FlowMetric.create(this.name, numeratorMetric, denominatorMetric); @@ -91,7 +81,7 @@ private void warnNotInitialized() { LOGGER.warn("Underlying metrics for `{}` not yet instantiated, could not capture their rates", this.name); } - private static Supplier> constantMetricSupplierFor(final Metric mm) { + private static Supplier> constantMetricSupplierFor(final Metric mm) { return () -> mm; } } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java index 21e8ca84885..89a04945e5c 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/MetricKeys.java @@ -65,6 +65,8 @@ private MetricKeys() { public static final RubySymbol QUEUE_KEY = RubyUtil.RUBY.newSymbol("queue"); + public static final RubySymbol CAPACITY_KEY = RubyUtil.RUBY.newSymbol("capacity"); + public static final RubySymbol DLQ_KEY = RubyUtil.RUBY.newSymbol("dlq"); public static final RubySymbol STORAGE_POLICY_KEY = RubyUtil.RUBY.newSymbol("storage_policy"); @@ -92,7 +94,9 @@ private MetricKeys() { public static final RubySymbol WORKER_CONCURRENCY_KEY = RubyUtil.RUBY.newSymbol("worker_concurrency"); - public static final RubySymbol UPTIME_IN_SECONDS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_seconds"); - public static final RubySymbol UPTIME_IN_MILLIS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_millis"); + + public static final RubySymbol QUEUE_PERSISTED_GROWTH_EVENTS_KEY = RubyUtil.RUBY.newSymbol("queue_persisted_growth_events"); + + public static final RubySymbol QUEUE_PERSISTED_GROWTH_BYTES_KEY = RubyUtil.RUBY.newSymbol("queue_persisted_growth_bytes"); } diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java index 1e1d995a20d..0afaea618e8 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/gauge/LazyDelegatingGauge.java @@ -27,6 +27,8 @@ import org.logstash.instrument.metrics.AbstractMetric; import org.logstash.instrument.metrics.MetricType; +import java.util.Optional; + /** * A lazy proxy to a more specific typed {@link GaugeMetric}. The metric will only be initialized if the initial value is set, or once the {@code set} operation is called. *

Intended only for use with Ruby's duck typing, Java consumers should use the specific typed {@link GaugeMetric}

@@ -71,6 +73,11 @@ public Object get() { return lazyMetric == null ? null : lazyMetric.get(); } + @SuppressWarnings("rawtypes") + public Optional getMetric() { + return Optional.ofNullable(lazyMetric); + } + @Override public MetricType getType() { return lazyMetric == null ? null : lazyMetric.getType(); diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index 386bfcfc37e..e727462fcc8 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -274,6 +274,15 @@ 'filter_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0), 'output_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0) ) + if logstash_service.settings.feature_flag == "persistent_queues" + expect(flow_status).to include( + 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)), + 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)) + ) + else + expect(flow_status).to_not include('queue_persisted_growth_bytes') + expect(flow_status).to_not include('queue_persisted_growth_events') + end end end diff --git a/qa/integration/specs/reload_config_spec.rb b/qa/integration/specs/reload_config_spec.rb index 71c38e55ef9..07ae7481ae0 100644 --- a/qa/integration/specs/reload_config_spec.rb +++ b/qa/integration/specs/reload_config_spec.rb @@ -124,6 +124,16 @@ 'filter_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0), 'output_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0) ) + + if logstash_service.settings.feature_flag == "persistent_queues" + expect(pipeline_flow_stats).to include( + 'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)), + 'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)) + ) + else + expect(pipeline_flow_stats).to_not include('queue_persisted_growth_bytes') + expect(pipeline_flow_stats).to_not include('queue_persisted_growth_events') + end end # check reload stats