Skip to content

Commit

Permalink
Adding some tests for many nested pipeline processors (#105291) (#105471)
Browse files Browse the repository at this point in the history
  • Loading branch information
masseyke committed Feb 14, 2024
1 parent d1117ae commit d57de7d
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
import org.elasticsearch.action.ingest.SimulateDocumentVerboseResult;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

/*
 * This test is meant to make sure that we can handle ingesting a document with a reasonably large number of nested pipeline processors.
 */
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ManyNestedPipelinesIT extends ESIntegTestCase {
    // Randomized depth of the chain: pipeline_0 delegates to pipeline_1, and so on, until the last pipeline.
    private final int manyPipelinesCount = randomIntBetween(2, 20);

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(IngestCommonPlugin.class);
    }

    @Before
    public void loadManyPipelines() {
        internalCluster().ensureAtLeastNumDataNodes(1);
        internalCluster().startMasterOnlyNode();
        createChainedPipelines(manyPipelinesCount);
    }

    /*
     * Indexes a document through the head of the pipeline chain and verifies that the set processor in the last pipeline ran,
     * i.e. that the nested pipeline processors were all executed.
     */
    public void testIngestManyPipelines() {
        String index = "index";
        DocWriteResponse response = client().prepareIndex(index, "_doc")
            .setSource(Collections.singletonMap("foo", "bar"))
            .setPipeline("pipeline_0")
            .get();
        assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
        // Fixed local name: was "getREsponse", which violated lowerCamelCase.
        GetResponse getResponse = client().prepareGet(index, "_doc", response.getId()).get();
        assertThat(getResponse.getSource().get("foo"), equalTo("baz"));
    }

    /*
     * Runs a non-verbose simulate against the head pipeline and verifies that the chain produced the expected final document.
     */
    public void testSimulateManyPipelines() throws IOException {
        List<SimulateDocumentResult> results = executeSimulate(false);
        assertThat(results.size(), equalTo(1));
        assertThat(results.get(0), instanceOf(SimulateDocumentBaseResult.class));
        SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) results.get(0);
        assertNull(result.getFailure());
        IngestDocument resultDoc = result.getIngestDocument();
        assertThat(resultDoc.getFieldValue("foo", String.class), equalTo("baz"));
    }

    /*
     * Runs a verbose simulate against the head pipeline and verifies one processor result per pipeline in the chain, with the
     * expected final document produced by the last processor.
     */
    public void testSimulateVerboseManyPipelines() throws IOException {
        List<SimulateDocumentResult> results = executeSimulate(true);
        assertThat(results.size(), equalTo(1));
        assertThat(results.get(0), instanceOf(SimulateDocumentVerboseResult.class));
        SimulateDocumentVerboseResult result = (SimulateDocumentVerboseResult) results.get(0);
        assertThat(result.getProcessorResults().size(), equalTo(manyPipelinesCount));
        List<SimulateProcessorResult> simulateProcessorResults = result.getProcessorResults();
        SimulateProcessorResult lastResult = simulateProcessorResults.get(simulateProcessorResults.size() - 1);
        IngestDocument resultDoc = lastResult.getIngestDocument();
        assertThat(resultDoc.getFieldValue("foo", String.class), equalTo("baz"));
    }

    /*
     * Simulates a single document ({"foo": "bar"}) through the head pipeline, returning the per-document results.
     */
    private List<SimulateDocumentResult> executeSimulate(boolean verbose) throws IOException {
        BytesReference simulateRequestBytes = BytesReference.bytes(
            jsonBuilder().startObject()
                .startArray("docs")
                .startObject()
                .field("_index", "foo")
                .field("_id", "id")
                .startObject("_source")
                .field("foo", "bar")
                .endObject()
                .endObject()
                .endArray()
                .endObject()
        );
        SimulatePipelineResponse simulatePipelineResponse = clusterAdmin().prepareSimulatePipeline(simulateRequestBytes, XContentType.JSON)
            .setId("pipeline_0")
            .setVerbose(verbose)
            .get();
        return simulatePipelineResponse.getResults();
    }

    /*
     * Creates `count` pipelines: each of the first count - 1 delegates to the next via a pipeline processor, and the last one
     * sets "foo" to "baz".
     */
    private void createChainedPipelines(int count) {
        for (int i = 0; i < count - 1; i++) {
            createChainedPipeline(i);
        }
        createLastPipeline(count - 1);
    }

    // Creates pipeline_<number>, whose only processor delegates to pipeline_<number + 1>.
    private void createChainedPipeline(int number) {
        String pipelineId = "pipeline_" + number;
        String nextPipelineId = "pipeline_" + (number + 1);
        String pipelineTemplate = "{\n"
            + "  \"processors\": [\n"
            + "    {\n"
            + "      \"pipeline\": {\n"
            + "        \"name\": \"%s\"\n"
            + "      }\n"
            + "    }\n"
            + "  ]\n"
            + "  }";
        // Locale.ROOT keeps the formatting locale-independent:
        String pipeline = String.format(Locale.ROOT, pipelineTemplate, nextPipelineId);
        clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
    }

    // Creates the tail pipeline, which sets "foo" to "baz" instead of delegating further.
    private void createLastPipeline(int number) {
        String pipelineId = "pipeline_" + number;
        String pipeline = " {\n"
            + "  \"processors\": [\n"
            + "    {\n"
            + "      \"set\": {\n"
            + "        \"field\": \"foo\",\n"
            + "        \"value\": \"baz\"\n"
            + "       }\n"
            + "    }\n"
            + "  ]\n"
            + "  }";
        clusterAdmin().preparePutPipeline(pipelineId, new BytesArray(pipeline), XContentType.JSON).get();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,59 @@ public void testActualPipelineProcessorWithCycle() throws Exception {
assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1"));
}

public void testActualPipelineProcessorNested() throws Exception {
    /*
     * This test creates a pipeline made up of many nested pipeline processors, ending in a processor that counts both how many times
     * it is called for a given document (by updating a field on that document) and how many times it is called overall.
     */
    IngestService service = createIngestService();
    PipelineProcessor.Factory pipelineProcessorFactory = new PipelineProcessor.Factory(service);
    int chainLength = randomIntBetween(2, 20);
    // Every pipeline except the last simply delegates to the next pipeline in the chain:
    for (int index = 0; index < chainLength - 1; index++) {
        String id = "pipeline" + index;
        Map<String, Object> delegateConfig = new HashMap<>();
        delegateConfig.put("name", "pipeline" + (index + 1));
        CompoundProcessor delegating = new CompoundProcessor(
            pipelineProcessorFactory.create(Collections.emptyMap(), null, null, delegateConfig)
        );
        when(service.getPipeline(id)).thenReturn(new Pipeline(id, null, null, null, delegating));
    }

    // The last pipeline calls the CountCallsProcessor rather than yet another pipeline processor:
    String tailPipelineId = "pipeline" + (chainLength - 1);
    CountCallsProcessor countingProcessor = new CountCallsProcessor();
    when(service.getPipeline(tailPipelineId)).thenReturn(
        new Pipeline(tailPipelineId, null, null, null, new CompoundProcessor(countingProcessor))
    );

    // Kick off execution at the head of the chain, decorated so that simulate-style results are collected:
    Map<String, Object> headConfig = new HashMap<>();
    headConfig.put("name", "pipeline0");
    PipelineProcessor headProcessor = pipelineProcessorFactory.create(Collections.emptyMap(), null, null, headConfig);
    CompoundProcessor trackingProcessor = decorate(new CompoundProcessor(headProcessor), null, resultList);

    IngestDocument[] resultHolder = new IngestDocument[1];
    trackingProcessor.execute(ingestDocument, (result, e) -> resultHolder[0] = result);
    IngestDocument document = resultHolder[0];
    assertNotNull(document);
    // Make sure that the final processor was called exactly once on this document:
    assertThat(document.getFieldValue(countingProcessor.getCountFieldName(), Integer.class), equalTo(1));
    // But it was called exactly one other time during the pipeline cycle check (to be enabled after a fix) :
    // assertThat(countingProcessor.getTotalCount(), equalTo(2));
    assertThat(resultList.size(), equalTo(chainLength + 1)); // one result per pipeline, plus the "count_calls" processor
    for (int index = 0; index < resultList.size() - 1; index++) {
        assertThat(resultList.get(index).getType(), equalTo(headProcessor.getType()));
    }
    assertThat(resultList.get(resultList.size() - 1).getType(), equalTo(countingProcessor.getType()));
}

public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = createIngestService();
Expand Down Expand Up @@ -767,4 +820,43 @@ public void testActualPipelineProcessorRepeatedInvocation() throws Exception {
resultList.get(3).getIngestDocument().getSourceAndMetadata().get(key1)
);
}

/*
 * This test processor keeps track of how many times it has been called. It also creates/updates the "count" field on documents with
 * the number of times this processor has been called for that document.
 */
private static final class CountCallsProcessor extends AbstractProcessor {
    // Single source of truth for the processor type/tag (was duplicated in the constructor and getType()):
    private static final String TYPE = "count_calls";
    // Name of the per-document field holding how many times this processor ran on that document:
    private static final String COUNT_FIELD_NAME = "count";
    // Total number of invocations across all documents (including any extra runs, e.g. during cycle checks):
    private int totalCount = 0;

    private CountCallsProcessor() {
        super(TYPE, "counts calls");
    }

    @Override
    public String getType() {
        return TYPE;
    }

    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        // Per-document count defaults to 0 when the field does not exist yet; use a primitive int to avoid boxing:
        int count = ingestDocument.hasField(COUNT_FIELD_NAME)
            ? ingestDocument.getFieldValue(COUNT_FIELD_NAME, Integer.class)
            : 0;
        ingestDocument.setFieldValue(COUNT_FIELD_NAME, count + 1);
        totalCount++;
        return ingestDocument;
    }

    public String getCountFieldName() {
        return COUNT_FIELD_NAME;
    }

    public int getTotalCount() {
        return totalCount;
    }
}
}

0 comments on commit d57de7d

Please sign in to comment.