From 02dfd71efa80bb992b4c04231c9c841b7b2be73f Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 16 Jan 2020 10:50:47 +0100 Subject: [PATCH] Backport: Add pipeline name to ingest metadata (#51050) Backport: #50467 This commit adds the name of the current pipeline to ingest metadata. This pipeline name is accessible under the following key: '_ingest.pipeline'. Example usage in pipeline: PUT /_ingest/pipeline/2 { "processors": [ { "set": { "field": "pipeline_name", "value": "{{_ingest.pipeline}}" } } ] } Closes #42106 --- .../ingest/apis/simulate-pipeline.asciidoc | 12 ++- .../ingest/processors/pipeline.asciidoc | 2 + .../test/ingest/210_pipeline_processor.yml | 78 +++++++++++++++++++ .../rest-api-spec/test/ingest/90_simulate.yml | 18 +++-- .../elasticsearch/ingest/IngestDocument.java | 6 ++ .../ingest/PipelineProcessor.java | 2 +- .../ingest/SimulateExecutionServiceTests.java | 56 +++++++------ .../ingest/CompoundProcessorTests.java | 3 +- .../ingest/PipelineProcessorTests.java | 57 ++++++++++++++ .../ingest/TrackingResultProcessorTests.java | 5 ++ 10 files changed, 198 insertions(+), 41 deletions(-) diff --git a/docs/reference/ingest/apis/simulate-pipeline.asciidoc b/docs/reference/ingest/apis/simulate-pipeline.asciidoc index c05bc2fa3ceaa..dbb38fa9b7d1b 100644 --- a/docs/reference/ingest/apis/simulate-pipeline.asciidoc +++ b/docs/reference/ingest/apis/simulate-pipeline.asciidoc @@ -355,7 +355,8 @@ The API returns the following response: "foo": "bar" }, "_ingest": { - "timestamp": "2017-05-04T22:46:09.674Z" + "timestamp": "2017-05-04T22:46:09.674Z", + "pipeline": "_simulate_pipeline" } } }, @@ -370,7 +371,8 @@ The API returns the following response: "foo": "bar" }, "_ingest": { - "timestamp": "2017-05-04T22:46:09.675Z" + "timestamp": "2017-05-04T22:46:09.675Z", + "pipeline": "_simulate_pipeline" } } } @@ -388,7 +390,8 @@ The API returns the following response: "foo": "rab" }, "_ingest": { - "timestamp": "2017-05-04T22:46:09.676Z" + "timestamp": "2017-05-04T22:46:09.676Z", + "pipeline": "_simulate_pipeline" } } }, @@ -403,7 +406,8 @@ The API returns the following response: "foo": "rab" }, "_ingest": { - "timestamp": "2017-05-04T22:46:09.677Z" + "timestamp": "2017-05-04T22:46:09.677Z", + "pipeline": "_simulate_pipeline" } } } diff --git a/docs/reference/ingest/processors/pipeline.asciidoc b/docs/reference/ingest/processors/pipeline.asciidoc index aecec9bdf1fd6..8dc6114071d35 100644 --- a/docs/reference/ingest/processors/pipeline.asciidoc +++ b/docs/reference/ingest/processors/pipeline.asciidoc @@ -21,6 +21,8 @@ include::common-options.asciidoc[] -------------------------------------------------- // NOTCONSOLE +The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key. + An example of using this processor for nesting pipelines would be: Define an inner pipeline: diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml index a012536c53bb4..bc82b7f1ca7e1 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -202,3 +202,81 @@ teardown: } - match: { error.root_cause.0.type: "illegal_state_exception" } - match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [legal-department]" } + +--- +"Test _ingest.pipeline metadata": + - do: + ingest.put_pipeline: + id: "pipeline1" + body: > + { + "processors" : [ + { + "append" : { + "field": "pipelines", + "value": "{{_ingest.pipeline}}" + } + }, + { + "pipeline" : { + "name": "another_pipeline" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: "another_pipeline" + body: > + { + "processors" : [ + { + "append" : { + "field": "pipelines", + "value": "{{_ingest.pipeline}}" + } + }, + { + "pipeline" : { + "name": "another_pipeline2" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: "another_pipeline2" + body: > + { + "processors" : [ + { + "append" : { + "field": "pipelines", + "value": "{{_ingest.pipeline}}" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "pipeline1" + body: > + { + } + + - do: + get: + index: test + id: 1 + - length: { _source.pipelines: 3 } + - match: { _source.pipelines.0: "pipeline1" } + - match: { _source.pipelines.1: "another_pipeline" } + - match: { _source.pipelines.2: "another_pipeline2" } diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index 84048e4679d0f..8bf7c06b567f9 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -291,26 +291,30 @@ teardown: - length: { docs.0.processor_results.0.doc._source: 2 } - match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" } - match: { docs.0.processor_results.0.doc._source.field2.value: "_value" } - - length: { docs.0.processor_results.0.doc._ingest: 1 } + - length: { docs.0.processor_results.0.doc._ingest: 2 } - is_true: docs.0.processor_results.0.doc._ingest.timestamp + - is_true: docs.0.processor_results.0.doc._ingest.pipeline - length: { docs.0.processor_results.1.doc._source: 3 } - match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" } - match: { docs.0.processor_results.1.doc._source.field2.value: "_value" } - match: { docs.0.processor_results.1.doc._source.field3: "third_val" } - - length: { docs.0.processor_results.1.doc._ingest: 1 } + - length: { docs.0.processor_results.1.doc._ingest: 2 } - is_true: docs.0.processor_results.1.doc._ingest.timestamp + - is_true: docs.0.processor_results.1.doc._ingest.pipeline - length: { docs.0.processor_results.2.doc._source: 3 } - match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" } - match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" } - match: { docs.0.processor_results.2.doc._source.field3: "third_val" } - - length: { docs.0.processor_results.2.doc._ingest: 1 } + - length: { docs.0.processor_results.2.doc._ingest: 2 } - is_true: docs.0.processor_results.2.doc._ingest.timestamp + - is_true: docs.0.processor_results.2.doc._ingest.pipeline - length: { docs.0.processor_results.3.doc._source: 3 } - match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" } - match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" } - match: { docs.0.processor_results.3.doc._source.field3: "third_val" } - - length: { docs.0.processor_results.3.doc._ingest: 1 } + - length: { docs.0.processor_results.3.doc._ingest: 2 } - is_true: docs.0.processor_results.3.doc._ingest.timestamp + - is_true: docs.0.processor_results.3.doc._ingest.pipeline --- "Test simulate with exception thrown": @@ -404,12 +408,14 @@ teardown: - match: { docs.1.processor_results.0.doc._index: "index" } - match: { docs.1.processor_results.0.doc._source.foo: 5 } - match: { docs.1.processor_results.0.doc._source.bar: "hello" } - - length: { docs.1.processor_results.0.doc._ingest: 1 } + - length: { docs.1.processor_results.0.doc._ingest: 2 } - is_true: docs.1.processor_results.0.doc._ingest.timestamp + - is_true: docs.1.processor_results.0.doc._ingest.pipeline - match: { docs.1.processor_results.1.doc._source.foo: 5 } - match: { docs.1.processor_results.1.doc._source.bar: "HELLO" } - - length: { docs.1.processor_results.1.doc._ingest: 1 } + - length: { docs.1.processor_results.1.doc._ingest: 2 } - is_true: docs.1.processor_results.1.doc._ingest.timestamp + - is_true: docs.1.processor_results.1.doc._ingest.pipeline --- "Test verbose simulate with on_failure": diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 4183b48dc5923..6f159b679b3ad 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -648,8 +648,14 @@ private static Object deepCopy(Object value) { */ public void executePipeline(Pipeline pipeline, BiConsumer handler) { if (executedPipelines.add(pipeline.getId())) { + Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId()); pipeline.execute(this, (result, e) -> { executedPipelines.remove(pipeline.getId()); + if (previousPipeline != null) { + ingestMetadata.put("pipeline", previousPipeline); + } else { + ingestMetadata.remove("pipeline"); + } handler.accept(result, e); }); } else { diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java index be02fe24752c1..2ee99d6e5d48f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java @@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor { private final TemplateScript.Factory pipelineTemplate; private final IngestService ingestService; - private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) { + PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) { super(tag); this.pipelineTemplate = pipelineTemplate; this.ingestService = ingestService; diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java index 43eaad41728fe..ae951356b1875 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulateExecutionServiceTests.java @@ -91,23 +91,17 @@ public void testExecuteVerboseItem() throws Exception { assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class)); SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse; assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2)); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id")); - IngestDocument firstProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(); - assertThat(firstProcessorIngestDocument, not(sameInstance(this.ingestDocument))); - assertIngestDocument(firstProcessorIngestDocument, this.ingestDocument); - assertThat(firstProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata()))); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id")); + assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue()); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("test-id")); - IngestDocument secondProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(); - assertThat(secondProcessorIngestDocument, not(sameInstance(this.ingestDocument))); - assertIngestDocument(secondProcessorIngestDocument, this.ingestDocument); - assertThat(secondProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata()))); - assertThat(secondProcessorIngestDocument.getSourceAndMetadata(), - not(sameInstance(firstProcessorIngestDocument.getSourceAndMetadata()))); + assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(), ingestDocument); + assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument().getSourceAndMetadata(), + not(sameInstance(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata()))); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue()); } - public void testExecuteItem() throws Exception { TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {}); Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor)); @@ -147,10 +141,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception { assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2)); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0")); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue()); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument))); - assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), - not(sameInstance(ingestDocument.getSourceAndMetadata()))); + assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1")); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue()); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class)); @@ -191,14 +182,12 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception { metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock"); metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0"); metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed"); - assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), - ingestDocumentWithOnFailureMetadata); - + assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(), + ingestDocumentWithOnFailureMetadata); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue()); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2")); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument))); - assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), ingestDocument); + assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(2), pipeline.getId(), ingestDocument); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue()); } @@ -221,10 +210,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1)); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0")); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception)); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument))); - assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), - not(sameInstance(ingestDocument.getSourceAndMetadata()))); + assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument); } public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception { @@ -245,10 +231,7 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1)); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0")); assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue()); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument))); - assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument); - assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(), - not(sameInstance(ingestDocument.getSourceAndMetadata()))); + assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument); } public void testExecuteItemWithFailure() throws Exception { @@ -392,4 +375,19 @@ public String getType() { } } + private static void assertVerboseResult(SimulateProcessorResult result, + String expectedPipelineId, + IngestDocument expectedIngestDocument) { + IngestDocument simulateVerboseIngestDocument = result.getIngestDocument(); + // Remove and compare pipeline key. It is always in the verbose result, + // since that is a snapshot of how the ingest doc looks during pipeline execution, but not in the final ingestDocument. + // The key gets added and removed during pipeline execution. + String actualPipelineId = (String) simulateVerboseIngestDocument.getIngestMetadata().remove("pipeline"); + assertThat(actualPipelineId, equalTo(expectedPipelineId)); + + assertThat(simulateVerboseIngestDocument, not(sameInstance(expectedIngestDocument))); + assertIngestDocument(simulateVerboseIngestDocument, expectedIngestDocument); + assertThat(simulateVerboseIngestDocument.getSourceAndMetadata(), not(sameInstance(expectedIngestDocument.getSourceAndMetadata()))); + } + } diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 09d995adc6593..d225cf1c18128 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -285,11 +285,12 @@ public void testBreakOnFailure() throws Exception { public void testFailureProcessorIsInvokedOnFailure() { TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> { Map ingestMetadata = ingestDocument.getIngestMetadata(); - assertThat(ingestMetadata.entrySet(), hasSize(4)); + assertThat(ingestMetadata.entrySet(), hasSize(5)); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor")); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue()); assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2")); + assertThat(ingestMetadata.get("pipeline"), equalTo("1")); }); Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!")))); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index aebcc28e77d5e..8e947534f8b09 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -20,17 +20,22 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -205,6 +210,58 @@ pipeline3Id, null, null, new CompoundProcessor( assertThat(pipeline3Stats.getIngestFailedCount(), equalTo(1L)); } + public void testIngestPipelineMetadata() { + IngestService ingestService = createIngestService(); + + final int numPipelines = 16; + Pipeline firstPipeline = null; + for (int i = 0; i < numPipelines; i++) { + String pipelineId = Integer.toString(i); + List processors = new ArrayList<>(); + processors.add(new AbstractProcessor(null) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.appendFieldValue("pipelines", ingestDocument.getIngestMetadata().get("pipeline")); + return ingestDocument; + } + + @Override + public String getType() { + return null; + } + + }); + if (i < (numPipelines - 1)) { + TemplateScript.Factory pipelineName = new TestTemplateService.MockTemplateScript.Factory(Integer.toString(i + 1)); + processors.add(new PipelineProcessor(null, pipelineName, ingestService)); + } + + + Pipeline pipeline = new Pipeline(pipelineId, null, null, new CompoundProcessor(false, processors, Collections.emptyList())); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + if (firstPipeline == null) { + firstPipeline = pipeline; + } + } + + IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + IngestDocument[] docHolder = new IngestDocument[1]; + Exception[] errorHolder = new Exception[1]; + testIngestDocument.executePipeline(firstPipeline, (doc, e) -> { + docHolder[0] = doc; + errorHolder[0] = e; + }); + assertThat(docHolder[0], notNullValue()); + assertThat(errorHolder[0], nullValue()); + + IngestDocument ingestDocument = docHolder[0]; + List pipelines = ingestDocument.getFieldValue("pipelines", List.class); + assertThat(pipelines.size(), equalTo(numPipelines)); + for (int i = 0; i < numPipelines; i++) { + assertThat(pipelines.get(i), equalTo(Integer.toString(i))); + } + } + static IngestService createIngestService() { IngestService ingestService = mock(IngestService.class); ScriptService scriptService = mock(ScriptService.class); diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index c66d4742b991b..ef4613ce2ffd1 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -220,6 +220,7 @@ pipelineId, null, null, new CompoundProcessor( trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(3)); @@ -287,6 +288,7 @@ pipelineId2, null, null, new CompoundProcessor( SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2); @@ -355,6 +357,7 @@ pipelineId2, null, null, new CompoundProcessor( SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId1); verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); verify(ingestService, Mockito.never()).getPipeline(pipelineId2); @@ -406,6 +409,7 @@ pipelineId, null, null, new CompoundProcessor( trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(4)); @@ -482,6 +486,7 @@ pipelineId, null, null, new CompoundProcessor( trackingProcessor.execute(ingestDocument, (result, e) -> {}); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId); verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(2));