Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect queue growth events and bytes metrics when PQ is enabled. #14554

Merged
merged 24 commits into from Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
00bf51c
Collect growth events and bytes metrics if PQ is enabled: Java changes.
mashhurs Sep 26, 2022
79f6f25
Move queue flow under queue namespace.
mashhurs Sep 26, 2022
efa6bbb
Pipeline level PQ flow metrics: add unit & integration tests.
mashhurs Sep 27, 2022
0aee22b
Include queue info in node stats sample.
mashhurs Sep 28, 2022
ab2cd72
Apply suggestions from code review
mashhurs Oct 4, 2022
ebbd611
Add safeguard when using lazy delegating gauge type.
mashhurs Oct 10, 2022
ac12193
flow metrics: simplify generics of lazy implementation
yaauie Oct 10, 2022
57e4f41
flow metrics: use lazy init for PQ gauge-based metrics
yaauie Oct 10, 2022
1fcb75b
noop: use enum equality
yaauie Oct 10, 2022
5dbf383
Merge pull request #1 from yaauie/flow-metrics-pq
mashhurs Oct 11, 2022
b9d5ae7
Apply suggestions from code review
mashhurs Oct 11, 2022
2074d68
Merge branch 'main' into feature/flow-metrics-integration-pq
mashhurs Oct 11, 2022
78d5110
flow metrics: make lazy-init wraper inherit from AbstractMetric
yaauie Oct 12, 2022
34a22a3
flow metrics: move pipeline queue-based flows into pipeline flow name…
yaauie Oct 12, 2022
358864a
Merge pull request #2 from yaauie/flow-metrics-pq-in-pipeline-namespace
mashhurs Oct 12, 2022
57848bc
Follow up for moving PQ growth metrics under pipeline.*.flow.
mashhurs Oct 12, 2022
5332935
flow: pipeline pq flow rates docs
yaauie Oct 12, 2022
2b074a8
Do not expect flow in the queue section of API. Metrics moved to flow…
mashhurs Oct 12, 2022
3c72ff3
Integration test failure fix.
mashhurs Oct 12, 2022
a188ab6
Integration test failures fix.
mashhurs Oct 12, 2022
724852d
Make CI happy.
mashhurs Oct 13, 2022
c6a772c
Merge pull request #3 from yaauie/flow-metrics-pq-docs
mashhurs Oct 13, 2022
c89e369
api specs: use PQ only where needed
yaauie Oct 13, 2022
816ea2c
Merge pull request #4 from yaauie/flow-metrics-fewer-shared-pq-tests
mashhurs Oct 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 45 additions & 2 deletions docs/static/monitoring/monitoring-apis.asciidoc
Expand Up @@ -534,6 +534,18 @@ A pipeline with only one single-threaded input may contribute up to 1.00, a pipe

Additionally, some amount of back-pressure is both _normal_ and _expected_ for pipelines that are _pulling_ data, as this back-pressure allows them to slow down and pull data at a rate its downstream pipeline can tolerate.

| `queue_persisted_growth_bytes` |
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yaauie, I need your review these important lines. Want to be sure every single point we are going to deliver to the end-user shouldn't be missed. Should be concise!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sweet. I had been working on the docs and have just rebased and made a PR against your branch -> mashhurs#3

Only available when pipeline is using persisted queue.
This metric is the growth rate of bytes being pushed into the pipeline's persisted queue relative to wall-clock time (`queue.queue_size_in_bytes` / second).

For the ideal condition, this metric should trend toward zero (or negative).

| `queue_persisted_growth_events` |
Only available when pipeline is using persisted queue.
This metric is the growth rate of events being pushed into the pipeline's persisted queue relative to wall-clock time (`queue.events_count` / second).

For the ideal condition, this metric should trend toward zero (or negative).

|===

Each flow stat includes rates for one or more recent windows of time:
Expand Down Expand Up @@ -613,6 +625,14 @@ Example response:
"worker_concurrency" : {
"current": 1.973,
"lifetime": 1.721
},
"queue_persisted_growth_bytes": {
"lifetime": 0.032,
"current": 0.0
},
"queue_persisted_growth_events": {
"lifetime": -1.201,
"current": -0.9913
}
},
"plugins" : {
Expand Down Expand Up @@ -771,6 +791,14 @@ Example response:
"worker_concurrency" : {
"current": 1.973,
"lifetime": 1.721
},
"queue_persisted_growth_bytes": {
"lifetime": 0.032,
"current": 0.0
},
"queue_persisted_growth_events": {
"lifetime": -1.201,
"current": -0.9913
}
},
"plugins" : {
Expand Down Expand Up @@ -820,8 +848,23 @@ Example response:
"last_failure_timestamp" : null,
"failures" : 0
},
"queue" : {
"type" : "memory"
"queue": {
"type" : "persisted",
"capacity": {
"max_unread_events": 0,
"page_capacity_in_bytes": 67108864,
"max_queue_size_in_bytes": 1073741824,
"queue_size_in_bytes": 3885
},
"data": {
"path": "/pipeline/queue/path",
"free_space_in_bytes": 936886480896,
"storage_type": "apfs"
},
"events": 0,
"events_count": 0,
"queue_size_in_bytes": 3885,
"max_queue_size_in_bytes": 1073741824
}
}
}
Expand Down
21 changes: 20 additions & 1 deletion logstash-core/spec/logstash/api/commands/stats_spec.rb
Expand Up @@ -178,9 +178,28 @@
:filter_throughput,
:queue_backpressure,
:worker_concurrency,
:input_throughput
:input_throughput,
:queue_persisted_growth_bytes,
:queue_persisted_growth_events
)
end
it "returns queue metric information" do
expect(report[:main][:queue].keys).to include(
:capacity,
:events,
:type,
:data,
:flow)
mashhurs marked this conversation as resolved.
Show resolved Hide resolved
expect(report[:main][:queue][:capacity].keys).to include(
:page_capacity_in_bytes,
:max_queue_size_in_bytes,
:queue_size_in_bytes,
:max_unread_events)
expect(report[:main][:queue][:data].keys).to include(
:storage_type,
:path,
:free_space_in_bytes)
end
end
context "when using multiple pipelines" do
before(:each) do
Expand Down
19 changes: 18 additions & 1 deletion logstash-core/spec/logstash/api/modules/node_stats_spec.rb
Expand Up @@ -116,14 +116,31 @@
"filter_throughput" => Hash,
"queue_backpressure" => Hash,
"worker_concurrency" => Hash,
"input_throughput" => Hash
"input_throughput" => Hash,
"queue_persisted_growth_bytes" => Hash,
"queue_persisted_growth_events" => Hash
},
"plugins" => {
"inputs" => Array,
"codecs" => Array,
"filters" => Array,
"outputs" => Array,
},
"queue" => {
"capacity" => {
"page_capacity_in_bytes" => Numeric,
"max_queue_size_in_bytes" => Numeric,
"queue_size_in_bytes" => Numeric,
"max_unread_events" => Numeric
},
"events" => Numeric,
"type" => String,
"data" => {
"storage_type" => String,
"path" => String,
"free_space_in_bytes" => Numeric
}
}
}
},
"reloads" => {
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/spec/support/shared_contexts.rb
Expand Up @@ -56,7 +56,7 @@ def block_until(limit_seconds, &condition)

before :all do
clear_data_dir
settings = mock_settings("config.reload.automatic" => true)
settings = mock_settings("config.reload.automatic" => true, "queue.type" => "persisted")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have separate PQ and MQ tests in CI. Will this override them so that we only test PQ? Should we instead guard our specs to only validate the presence of our PQ-related metrics when the PQ is active?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only helps us to validate the structure shape defined in node_stats_spec.rb node stats API expects.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: don't enable the PQ for all of the shared tests in order to validate behavior of the PQ in just one of those tests. I recently worked with this shared context so I can see if I can find a way to limit the scope in which we have to incur the cost of spooling up PQ's.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR here -> mashhurs#4

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great approach to apply specific setting through the context:

include_context "api setup", {"queue.type" => "persisted"}

config_source = make_config_source(settings)
config_source.add_pipeline('main', "input { generator {id => 'api-generator-pipeline' count => 100 } } output { dummyoutput {} }")

Expand Down
Expand Up @@ -42,6 +42,21 @@
@JRubyClass(name = "QueueFactory")
public final class QueueFactoryExt extends RubyBasicObject {

/**
* A static value to indicate Persistent Queue is enabled.
*/
public static String PERSISTED_TYPE = "persisted";

/**
* A static value to indicate Memory Queue is enabled.
*/
public static String MEMORY_TYPE = "memory";

/**
* A contextual name to expose the queue type.
*/
public static String CONTEXT_NAME = "queue.type";

private static final long serialVersionUID = 1L;

public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
Expand All @@ -51,8 +66,8 @@ public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
@JRubyMethod(meta = true)
public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
final IRubyObject settings) throws IOException {
final String type = getSetting(context, settings, "queue.type").asJavaString();
if ("persisted".equals(type)) {
final String type = getSetting(context, settings, CONTEXT_NAME).asJavaString();
if (PERSISTED_TYPE.equals(type)) {
final Path queuePath = Paths.get(
getSetting(context, settings, "path.queue").asJavaString(),
getSetting(context, settings, "pipeline.id").asJavaString()
Expand All @@ -77,7 +92,7 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
getSetting(context, settings, "queue.max_bytes")
}
);
} else if ("memory".equals(type)) {
} else if (MEMORY_TYPE.equals(type)) {
return new JrubyWrappedSynchronousQueueExt(
context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
).initialize(
Expand Down
Expand Up @@ -33,7 +33,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
Expand Down Expand Up @@ -76,9 +79,12 @@
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.Metric;
import org.logstash.instrument.metrics.MetricType;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.instrument.metrics.gauge.LazyDelegatingGauge;
import org.logstash.instrument.metrics.gauge.NumberGauge;
import org.logstash.instrument.metrics.FlowMetric;
import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
Expand All @@ -105,7 +111,7 @@ public class AbstractPipelineExt extends RubyBasicObject {
private static final Logger LOGGER = LogManager.getLogger(AbstractPipelineExt.class);

private static final @SuppressWarnings("rawtypes") RubyArray CAPACITY_NAMESPACE =
RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("capacity"));
RubyArray.newArray(RubyUtil.RUBY, CAPACITY_KEY);

private static final @SuppressWarnings("rawtypes") RubyArray DATA_NAMESPACE =
RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("data"));
Expand Down Expand Up @@ -515,6 +521,23 @@ public final IRubyObject initializeFlowMetrics(final ThreadContext context) {
this.flowMetrics.add(concurrencyFlow);
storeMetric(context, flowNamespace, concurrencyFlow);

// collect the queue_persisted_growth_events & queue_persisted_growth_bytes metrics if only persisted queue is enabled.
if (getSetting(context, QueueFactoryExt.CONTEXT_NAME).asJavaString()
.equals(QueueFactoryExt.PERSISTED_TYPE)) {

final RubySymbol[] queueNamespace = buildNamespace(QUEUE_KEY);
final RubySymbol[] queueCapacityNamespace = buildNamespace(QUEUE_KEY, CAPACITY_KEY);

final Supplier<NumberGauge> eventsGaugeMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueNamespace, EVENTS_KEY).orElse(null);
final FlowMetric growthEventsFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_EVENTS_KEY, eventsGaugeMetricSupplier, () -> uptimeInPreciseSeconds);
this.flowMetrics.add(growthEventsFlow);
storeMetric(context, flowNamespace, growthEventsFlow);

final Supplier<NumberGauge> queueSizeInBytesMetricSupplier = () -> initOrGetNumberGaugeMetric(context, queueCapacityNamespace, QUEUE_SIZE_IN_BYTES_KEY).orElse(null);
final FlowMetric growthBytesFlow = createFlowMetric(QUEUE_PERSISTED_GROWTH_BYTES_KEY, queueSizeInBytesMetricSupplier, () -> uptimeInPreciseSeconds);
this.flowMetrics.add(growthBytesFlow);
storeMetric(context, flowNamespace, growthBytesFlow);
}
return context.nil;
}

Expand All @@ -529,6 +552,11 @@ private static FlowMetric createFlowMetric(final RubySymbol name,
final Metric<? extends Number> denominatorMetric) {
return FlowMetric.create(name.asJavaString(), numeratorMetric, denominatorMetric);
}
private static FlowMetric createFlowMetric(final RubySymbol name,
final Supplier<? extends Metric<? extends Number>> numeratorMetricSupplier,
final Supplier<? extends Metric<? extends Number>> denominatorMetricSupplier) {
return FlowMetric.create(name.asJavaString(), numeratorMetricSupplier, denominatorMetricSupplier);
}

private LongCounter initOrGetCounterMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
Expand All @@ -540,6 +568,21 @@ private LongCounter initOrGetCounterMetric(final ThreadContext context,
return retrievedMetric.toJava(LongCounter.class);
}

private Optional<NumberGauge> initOrGetNumberGaugeMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
final RubySymbol metricName) {
final IRubyObject collector = this.metric.collector(context);
final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath);
final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("gauge")});

LazyDelegatingGauge delegatingGauge = retrievedMetric.toJava(LazyDelegatingGauge.class);
if (Objects.isNull(delegatingGauge.getType()) || delegatingGauge.getType() != MetricType.GAUGE_NUMBER) {
return Optional.empty();
}

return Optional.of((NumberGauge) delegatingGauge.getMetric().get());
}

private UptimeMetric initOrGetUptimeMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
final RubySymbol uptimeMetricName) {
Expand Down
Expand Up @@ -42,9 +42,9 @@ static FlowMetric create(final String name,
}
}

static <N extends Number, D extends Number> FlowMetric create(final String name,
final Supplier<Metric<N>> numeratorSupplier,
final Supplier<Metric<D>> denominatorSupplier) {
return new LazyInstantiatedFlowMetric<>(name, numeratorSupplier, denominatorSupplier);
static FlowMetric create(final String name,
final Supplier<? extends Metric<? extends Number>> numeratorSupplier,
final Supplier<? extends Metric<? extends Number>> denominatorSupplier) {
return new LazyInstantiatedFlowMetric(name, numeratorSupplier, denominatorSupplier);
}
}
Expand Up @@ -16,27 +16,22 @@
* and fully initializes when <em>both</em> return non-null values.
*
* @see FlowMetric#create(String, Supplier, Supplier)
*
* @param <N> the numerator metric's value type
* @param <D> the denominator metric's value type
*/
public class LazyInstantiatedFlowMetric<N extends Number, D extends Number> implements FlowMetric {
public class LazyInstantiatedFlowMetric extends AbstractMetric<Map<String, Double>> implements FlowMetric {

static final Logger LOGGER = LogManager.getLogger(LazyInstantiatedFlowMetric.class);

private final String name;

private final AtomicReference<Supplier<Metric<N>>> numeratorSupplier;
private final AtomicReference<Supplier<Metric<D>>> denominatorSupplier;
private final AtomicReference<Supplier<? extends Metric<? extends Number>>> numeratorSupplier;
private final AtomicReference<Supplier<? extends Metric<? extends Number>>> denominatorSupplier;

private final SetOnceReference<FlowMetric> inner = SetOnceReference.unset();

private static final Map<String,Double> EMPTY_MAP = Map.of();

LazyInstantiatedFlowMetric(final String name,
final Supplier<Metric<N>> numeratorSupplier,
final Supplier<Metric<D>> denominatorSupplier) {
this.name = name;
final Supplier<? extends Metric<? extends Number>> numeratorSupplier,
final Supplier<? extends Metric<? extends Number>> denominatorSupplier) {
super(name);
this.numeratorSupplier = new AtomicReference<>(numeratorSupplier);
this.denominatorSupplier = new AtomicReference<>(denominatorSupplier);
}
Expand All @@ -46,11 +41,6 @@ public void capture() {
getInner().ifPresentOrElse(FlowMetric::capture, this::warnNotInitialized);
}

@Override
public String getName() {
return this.name;
}

@Override
public MetricType getType() {
return MetricType.FLOW_RATE;
Expand All @@ -68,10 +58,10 @@ private Optional<FlowMetric> getInner() {
private Optional<FlowMetric> attemptCreateInner() {
if (inner.isSet()) { return inner.asOptional(); }

final Metric<N> numeratorMetric = numeratorSupplier.getAcquire().get();
final Metric<? extends Number> numeratorMetric = numeratorSupplier.getAcquire().get();
if (Objects.isNull(numeratorMetric)) { return Optional.empty(); }

final Metric<D> denominatorMetric = denominatorSupplier.getAcquire().get();
final Metric<? extends Number> denominatorMetric = denominatorSupplier.getAcquire().get();
if (Objects.isNull(denominatorMetric)) { return Optional.empty(); }

final FlowMetric flowMetric = FlowMetric.create(this.name, numeratorMetric, denominatorMetric);
Expand All @@ -91,7 +81,7 @@ private void warnNotInitialized() {
LOGGER.warn("Underlying metrics for `{}` not yet instantiated, could not capture their rates", this.name);
}

private static <TT extends Number> Supplier<Metric<TT>> constantMetricSupplierFor(final Metric<TT> mm) {
private static Supplier<Metric<? extends Number>> constantMetricSupplierFor(final Metric<? extends Number> mm) {
return () -> mm;
}
}
Expand Up @@ -65,6 +65,8 @@ private MetricKeys() {

public static final RubySymbol QUEUE_KEY = RubyUtil.RUBY.newSymbol("queue");

public static final RubySymbol CAPACITY_KEY = RubyUtil.RUBY.newSymbol("capacity");

public static final RubySymbol DLQ_KEY = RubyUtil.RUBY.newSymbol("dlq");

public static final RubySymbol STORAGE_POLICY_KEY = RubyUtil.RUBY.newSymbol("storage_policy");
Expand Down Expand Up @@ -92,7 +94,9 @@ private MetricKeys() {

public static final RubySymbol WORKER_CONCURRENCY_KEY = RubyUtil.RUBY.newSymbol("worker_concurrency");

public static final RubySymbol UPTIME_IN_SECONDS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_seconds");

public static final RubySymbol UPTIME_IN_MILLIS_KEY = RubyUtil.RUBY.newSymbol("uptime_in_millis");

public static final RubySymbol QUEUE_PERSISTED_GROWTH_EVENTS_KEY = RubyUtil.RUBY.newSymbol("queue_persisted_growth_events");

public static final RubySymbol QUEUE_PERSISTED_GROWTH_BYTES_KEY = RubyUtil.RUBY.newSymbol("queue_persisted_growth_bytes");
}