Skip to content

Commit

Permalink
Better names and types for ingest stats (#93533)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Feb 7, 2023
1 parent b61f60a commit 815e6d7
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 14 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/93533.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 93533
summary: Better names and types for ingest stats
area: Ingest Node
type: bug
issues:
- 80763
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void testIngestStatsNamesAndTypes() throws IOException {

IngestStats.ProcessorStat setB = processorStats.get(1);
assertThat(setB.getName(), equalTo("set:set-b"));
assertThat(setB.getType(), equalTo("conditional"));
assertThat(setB.getType(), equalTo("set"));
assertThat(setB.getStats().getIngestCount(), equalTo(0L)); // see false_script above

IngestStats.ProcessorStat setC = processorStats.get(2);
Expand All @@ -138,8 +138,8 @@ public void testIngestStatsNamesAndTypes() throws IOException {
assertThat(setC.getStats().getIngestCount(), equalTo(1L));

IngestStats.ProcessorStat setD = processorStats.get(3);
assertThat(setD.getName(), equalTo("compound:CompoundProcessor-set-d"));
assertThat(setD.getType(), equalTo("conditional"));
assertThat(setD.getName(), equalTo("set:set-d"));
assertThat(setD.getType(), equalTo("set"));
assertThat(setD.getStats().getIngestCount(), equalTo(1L));
}

Expand All @@ -154,9 +154,7 @@ public void testIngestStatsNamesAndTypes() throws IOException {
Map<String, Object> stats = createParser(JsonXContent.jsonXContent, Strings.toString(builder)).map();

int setProcessorCount = path(stats, "nodes.ingest.processor_stats.set.count");
assertThat(setProcessorCount, equalTo(2));
int conditionalProcessorCount = path(stats, "nodes.ingest.processor_stats.conditional.count");
assertThat(conditionalProcessorCount, equalTo(1));
assertThat(setProcessorCount, equalTo(3));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ public static Processor readProcessor(
);
}
if (onFailureProcessors.size() > 0 || ignoreFailure) {
processor = new CompoundProcessor(ignoreFailure, List.of(processor), onFailureProcessors);
processor = new OnFailureProcessor(ignoreFailure, processor, onFailureProcessors);
}
if (conditionalScript != null) {
processor = new ConditionalProcessor(tag, description, conditionalScript, scriptService, processor);
Expand Down
30 changes: 24 additions & 6 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,13 @@ Map<String, PipelineHolder> pipelines() {
}

/**
* Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as
* wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric.
* Recursive method to obtain all the non-failure processors for given compoundProcessor.
* <p>
* 'if' and 'ignore_failure'/'on_failure' are implemented as wrappers around the actual processor (via {@link ConditionalProcessor}
* and {@link OnFailureProcessor}, respectively), so we unwrap these processors internally in order to expose the underlying
* 'actual' processor via the metrics. This corresponds best to the customer intent -- e.g. they used a 'set' processor that has an
* 'on_failure', so we report metrics for the set processor, not an on_failure processor.
*
* @param compoundProcessor The compound processor to start walking the non-failure processors
* @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
* @return the processorMetrics for all non-failure processor that belong to the original compoundProcessor
Expand All @@ -550,13 +555,26 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(
for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();

// unwrap 'if' and 'ignore_failure/on_failure' wrapping, so that we expose the underlying actual processor
boolean unwrapped;
do {
unwrapped = false;
if (processor instanceof ConditionalProcessor conditional) {
processor = conditional.getInnerProcessor();
metric = conditional.getMetric(); // prefer the conditional's metric, it only covers when the conditional was true
unwrapped = true;
}
if (processor instanceof OnFailureProcessor onFailure) {
processor = onFailure.getInnerProcessor();
metric = onFailure.getInnerMetric(); // the wrapped processor records the failure count
unwrapped = true;
}
} while (unwrapped);

if (processor instanceof CompoundProcessor cp) {
getProcessorMetrics(cp, processorMetrics);
} else {
// Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
if (processor instanceof ConditionalProcessor cp) {
metric = (cp.getMetric());
}
processorMetrics.add(new Tuple<>(processor, metric));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.core.Tuple;

import java.util.List;

/**
* A wrapping processor that adds failure handling logic around the wrapped processor.
*/
public class OnFailureProcessor extends CompoundProcessor implements WrappingProcessor {

static final String TYPE = "on_failure";

public OnFailureProcessor(boolean ignoreFailure, Processor processor, List<Processor> onFailureProcessors) {
super(ignoreFailure, List.of(processor), onFailureProcessors);
}

@Override
public Processor getInnerProcessor() {
List<Processor> processors = this.getProcessors();
assert processors.size() == 1;
return processors.get(0);
}

IngestMetric getInnerMetric() {
List<Tuple<Processor, IngestMetric>> processorsAndMetrics = this.getProcessorsWithMetrics();
assert processorsAndMetrics.size() == 1;
return processorsAndMetrics.get(0).v2();
}

@Override
public String getType() {
return TYPE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.test.ESTestCase;

import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;

public class OnFailureProcessorTests extends ESTestCase {

public void testOnFailureProcessor() {
TestProcessor processor = new TestProcessor(new RuntimeException("error"));
OnFailureProcessor onFailureProcessor = new OnFailureProcessor(false, processor, List.of());

assertThat(onFailureProcessor.isIgnoreFailure(), is(false));

assertThat(onFailureProcessor.getProcessors().size(), equalTo(1));
assertThat(onFailureProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(onFailureProcessor.getInnerProcessor(), sameInstance(processor));

assertThat(onFailureProcessor.getOnFailureProcessors().isEmpty(), is(true));

assertThat(onFailureProcessor.getInnerMetric(), sameInstance(onFailureProcessor.getProcessorsWithMetrics().get(0).v2()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void testCreateProcessorsWithOnFailureProperties() throws Exception {
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound"));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("on_failure"));
}

public void testFlattenProcessors() throws Exception {
Expand Down

0 comments on commit 815e6d7

Please sign in to comment.