Skip to content

Commit

Permalink
Improving the performance of the ingest simulate verbose API (#105265) (
Browse files Browse the repository at this point in the history
#105513)

This updates the simulate verbose API to run in O(N) (for number of pipelines)
time and memory like the simulate and ingest APIs rather than O(N^2).
  • Loading branch information
masseyke committed Feb 14, 2024
1 parent 9b6cf27 commit 4d09f96
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 54 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/105265.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 105265
summary: Improving the performance of the ingest simulate verbose API
area: "Ingest Node"
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* This test is meant to make sure that we can handle ingesting a document with a reasonably large number of nested pipeline processors.
*/
public class ManyNestedPipelinesIT extends ESIntegTestCase {
private final int manyPipelinesCount = randomIntBetween(2, 20);
private final int manyPipelinesCount = randomIntBetween(2, 50);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Expand Down Expand Up @@ -121,6 +121,7 @@ private void createChainedPipeline(int number) {
+ " \"processors\": [\n"
+ " {\n"
+ " \"pipeline\": {\n"
+ " \"tag\": \"tag\",\n"
+ " \"name\": \"%s\"\n"
+ " }\n"
+ " }\n"
Expand All @@ -136,6 +137,7 @@ private void createLastPipeline(int number) {
+ " \"processors\": [\n"
+ " {\n"
+ " \"set\": {\n"
+ " \"tag\": \"tag\",\n"
+ " \"field\": \"foo\",\n"
+ " \"value\": \"baz\"\n"
+ " }\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@ public final class TrackingResultProcessor implements Processor {
private final ConditionalProcessor conditionalProcessor;
private final List<SimulateProcessorResult> processorResultList;
private final boolean ignoreFailure;
// This field indicates that this processor needs to run the initial check for cycles in pipeline processors:
private final boolean performCycleCheck;

TrackingResultProcessor(
boolean ignoreFailure,
Processor actualProcessor,
ConditionalProcessor conditionalProcessor,
List<SimulateProcessorResult> processorResultList
List<SimulateProcessorResult> processorResultList,
boolean performCycleCheck
) {
this.ignoreFailure = ignoreFailure;
this.processorResultList = processorResultList;
this.actualProcessor = actualProcessor;
this.conditionalProcessor = conditionalProcessor;
this.performCycleCheck = performCycleCheck;
}

@Override
Expand Down Expand Up @@ -86,54 +90,43 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
);
throw e;
}
ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
// special handling for pipeline cycle errors
if (e instanceof ElasticsearchException && e.getCause() instanceof GraphStructureException) {
if (ignoreFailure) {
processorResultList.add(
new SimulateProcessorResult(
pipelineProcessor.getType(),
pipelineProcessor.getTag(),
pipelineProcessor.getDescription(),
ingestDocument,
e,
conditionalWithResult
)
);
if (performCycleCheck) {
ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
// special handling for pipeline cycle errors
if (e instanceof ElasticsearchException && e.getCause() instanceof GraphStructureException) {
if (ignoreFailure) {
processorResultList.add(
new SimulateProcessorResult(
pipelineProcessor.getType(),
pipelineProcessor.getTag(),
pipelineProcessor.getDescription(),
ingestDocument,
e,
conditionalWithResult
)
);
} else {
processorResultList.add(
new SimulateProcessorResult(
pipelineProcessor.getType(),
pipelineProcessor.getTag(),
pipelineProcessor.getDescription(),
e,
conditionalWithResult
)
);
}
handler.accept(null, e);
} else {
processorResultList.add(
new SimulateProcessorResult(
pipelineProcessor.getType(),
pipelineProcessor.getTag(),
pipelineProcessor.getDescription(),
e,
conditionalWithResult
)
);
// now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and
// execute it
decorateAndExecutePipeline(pipeline, ingestDocument, conditionalWithResult, handler);
}
handler.accept(null, e);
} else {
// now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
// add the pipeline process to the results
processorResultList.add(
new SimulateProcessorResult(
actualProcessor.getType(),
actualProcessor.getTag(),
actualProcessor.getDescription(),
conditionalWithResult
)
);
Pipeline verbosePipeline = new Pipeline(
pipeline.getId(),
pipeline.getDescription(),
pipeline.getVersion(),
pipeline.getMetadata(),
verbosePipelineProcessor
);
ingestDocument.executePipeline(verbosePipeline, handler);
}
});
});
} else {
// The cycle check has been done before, so we can just decorate the pipeline with our instrumentation and execute it:
decorateAndExecutePipeline(pipeline, ingestDocument, conditionalWithResult, handler);
}
return;
}

Expand Down Expand Up @@ -189,6 +182,36 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
});
}

/*
* This method decorates the pipeline's compound processor with a new TrackingResultProcessor that does not do cycle checking, and
* executes the pipeline using that TrackingResultProcessor.
*/
private void decorateAndExecutePipeline(
Pipeline pipeline,
IngestDocument ingestDocument,
Tuple<String, Boolean> conditionalWithResult,
BiConsumer<IngestDocument, Exception> handler
) {
CompoundProcessor verbosePipelineProcessor = decorateNoCycleCheck(pipeline.getCompoundProcessor(), null, processorResultList);
// add the pipeline process to the results
processorResultList.add(
new SimulateProcessorResult(
actualProcessor.getType(),
actualProcessor.getTag(),
actualProcessor.getDescription(),
conditionalWithResult
)
);
Pipeline verbosePipeline = new Pipeline(
pipeline.getId(),
pipeline.getDescription(),
pipeline.getVersion(),
pipeline.getMetadata(),
verbosePipelineProcessor
);
ingestDocument.executePipeline(verbosePipeline, handler);
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();
Expand All @@ -209,10 +232,31 @@ public String getDescription() {
return actualProcessor.getDescription();
}

/*
* This decorates the existing processor with a new TrackingResultProcessor that does _not_ do checks for cycles in pipeline
* processors, with the assumption that that check has already been done.
*/
private static CompoundProcessor decorateNoCycleCheck(
CompoundProcessor compoundProcessor,
ConditionalProcessor parentCondition,
List<SimulateProcessorResult> processorResultList
) {
return decorate(compoundProcessor, parentCondition, processorResultList, false);
}

public static CompoundProcessor decorate(
CompoundProcessor compoundProcessor,
ConditionalProcessor parentCondition,
List<SimulateProcessorResult> processorResultList
) {
return decorate(compoundProcessor, parentCondition, processorResultList, true);
}

private static CompoundProcessor decorate(
CompoundProcessor compoundProcessor,
ConditionalProcessor parentCondition,
List<SimulateProcessorResult> processorResultList,
boolean performCycleCheck
) {
List<Processor> processors = new ArrayList<>();
for (Processor processor : compoundProcessor.getProcessors()) {
Expand All @@ -225,7 +269,13 @@ public static CompoundProcessor decorate(
processors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList));
} else {
processors.add(
new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList)
new TrackingResultProcessor(
compoundProcessor.isIgnoreFailure(),
processor,
conditionalProcessor,
processorResultList,
performCycleCheck
)
);
}
}
Expand All @@ -240,7 +290,13 @@ public static CompoundProcessor decorate(
onFailureProcessors.add(decorate((CompoundProcessor) processor, conditionalProcessor, processorResultList));
} else {
onFailureProcessors.add(
new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, conditionalProcessor, processorResultList)
new TrackingResultProcessor(
compoundProcessor.isIgnoreFailure(),
processor,
conditionalProcessor,
processorResultList,
performCycleCheck
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void init() {

public void testActualProcessor() throws Exception {
TestProcessor actualProcessor = new TestProcessor(ingestDocument -> {});
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList);
TrackingResultProcessor trackingProcessor = new TrackingResultProcessor(false, actualProcessor, null, resultList, true);
trackingProcessor.execute(ingestDocument, (result, e) -> {});

SimulateProcessorResult expectedResult = new SimulateProcessorResult(
Expand Down Expand Up @@ -714,7 +714,7 @@ public void testActualPipelineProcessorNested() throws Exception {
*/
IngestService ingestService = createIngestService();
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
int pipelineCount = randomIntBetween(2, 20);
int pipelineCount = randomIntBetween(2, 150);
for (int i = 0; i < pipelineCount - 1; i++) {
String pipelineId = "pipeline" + i;
String nextPipelineId = "pipeline" + (i + 1);
Expand Down Expand Up @@ -750,8 +750,8 @@ public void testActualPipelineProcessorNested() throws Exception {
assertNotNull(document);
// Make sure that the final processor was called exactly once on this document:
assertThat(document.getFieldValue(countCallsProcessor.getCountFieldName(), Integer.class), equalTo(1));
// But it was called exactly one other time during the pipeline cycle check (to be enabled after a fix) :
// assertThat(countCallsProcessor.getTotalCount(), equalTo(2));
// But it was called exactly one other time during the pipeline cycle check:
assertThat(countCallsProcessor.getTotalCount(), equalTo(2));
assertThat(resultList.size(), equalTo(pipelineCount + 1)); // one result per pipeline, plus the "count_calls" processor
for (int i = 0; i < resultList.size() - 1; i++) {
SimulateProcessorResult result = resultList.get(i);
Expand Down

0 comments on commit 4d09f96

Please sign in to comment.