Skip to content

Commit

Permalink
Backport: Add pipeline name to ingest metadata (#51050)
Browse files Browse the repository at this point in the history
Backport: #50467

This commit adds the name of the current pipeline to ingest metadata.
This pipeline name is accessible under the following key: '_ingest.pipeline'.

Example usage in pipeline:
PUT /_ingest/pipeline/2
{
    "processors": [
        {
            "set": {
                "field": "pipeline_name",
                "value": "{{_ingest.pipeline}}"
            }
        }
    ]
}

Closes #42106
  • Loading branch information
martijnvg committed Jan 16, 2020
1 parent 45d7bdc commit 02dfd71
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 41 deletions.
12 changes: 8 additions & 4 deletions docs/reference/ingest/apis/simulate-pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ The API returns the following response:
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.674Z"
"timestamp": "2017-05-04T22:46:09.674Z",
"pipeline": "_simulate_pipeline"
}
}
},
Expand All @@ -370,7 +371,8 @@ The API returns the following response:
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.675Z"
"timestamp": "2017-05-04T22:46:09.675Z",
"pipeline": "_simulate_pipeline"
}
}
}
Expand All @@ -388,7 +390,8 @@ The API returns the following response:
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.676Z"
"timestamp": "2017-05-04T22:46:09.676Z",
"pipeline": "_simulate_pipeline"
}
}
},
Expand All @@ -403,7 +406,8 @@ The API returns the following response:
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.677Z"
"timestamp": "2017-05-04T22:46:09.677Z",
"pipeline": "_simulate_pipeline"
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/ingest/processors/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ include::common-options.asciidoc[]
--------------------------------------------------
// NOTCONSOLE

The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key.

An example of using this processor for nesting pipelines would be:

Define an inner pipeline:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,81 @@ teardown:
}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [legal-department]" }

---
"Test _ingest.pipeline metadata":
- do:
ingest.put_pipeline:
id: "pipeline1"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
},
{
"pipeline" : {
"name": "another_pipeline"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "another_pipeline"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
},
{
"pipeline" : {
"name": "another_pipeline2"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "another_pipeline2"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "pipeline1"
body: >
{
}
- do:
get:
index: test
id: 1
- length: { _source.pipelines: 3 }
- match: { _source.pipelines.0: "pipeline1" }
- match: { _source.pipelines.1: "another_pipeline" }
- match: { _source.pipelines.2: "another_pipeline2" }
Original file line number Diff line number Diff line change
Expand Up @@ -291,26 +291,30 @@ teardown:
- length: { docs.0.processor_results.0.doc._source: 2 }
- match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.0.doc._source.field2.value: "_value" }
- length: { docs.0.processor_results.0.doc._ingest: 1 }
- length: { docs.0.processor_results.0.doc._ingest: 2 }
- is_true: docs.0.processor_results.0.doc._ingest.timestamp
- is_true: docs.0.processor_results.0.doc._ingest.pipeline
- length: { docs.0.processor_results.1.doc._source: 3 }
- match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.1.doc._source.field2.value: "_value" }
- match: { docs.0.processor_results.1.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.1.doc._ingest: 1 }
- length: { docs.0.processor_results.1.doc._ingest: 2 }
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
- is_true: docs.0.processor_results.1.doc._ingest.pipeline
- length: { docs.0.processor_results.2.doc._source: 3 }
- match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.2.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.2.doc._ingest: 1 }
- length: { docs.0.processor_results.2.doc._ingest: 2 }
- is_true: docs.0.processor_results.2.doc._ingest.timestamp
- is_true: docs.0.processor_results.2.doc._ingest.pipeline
- length: { docs.0.processor_results.3.doc._source: 3 }
- match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" }
- match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.3.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.3.doc._ingest: 1 }
- length: { docs.0.processor_results.3.doc._ingest: 2 }
- is_true: docs.0.processor_results.3.doc._ingest.timestamp
- is_true: docs.0.processor_results.3.doc._ingest.pipeline

---
"Test simulate with exception thrown":
Expand Down Expand Up @@ -404,12 +408,14 @@ teardown:
- match: { docs.1.processor_results.0.doc._index: "index" }
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
- length: { docs.1.processor_results.0.doc._ingest: 1 }
- length: { docs.1.processor_results.0.doc._ingest: 2 }
- is_true: docs.1.processor_results.0.doc._ingest.timestamp
- is_true: docs.1.processor_results.0.doc._ingest.pipeline
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
- length: { docs.1.processor_results.1.doc._ingest: 1 }
- length: { docs.1.processor_results.1.doc._ingest: 2 }
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
- is_true: docs.1.processor_results.1.doc._ingest.pipeline

---
"Test verbose simulate with on_failure":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,14 @@ private static Object deepCopy(Object value) {
*/
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
ingestMetadata.remove("pipeline");
}
handler.accept(result, e);
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor {
private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;

private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
super(tag);
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,17 @@ public void testExecuteVerboseItem() throws Exception {
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
IngestDocument firstProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument();
assertThat(firstProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
assertIngestDocument(firstProcessorIngestDocument, this.ingestDocument);
assertThat(firstProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("test-id"));
IngestDocument secondProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument();
assertThat(secondProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
assertIngestDocument(secondProcessorIngestDocument, this.ingestDocument);
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(),
not(sameInstance(firstProcessorIngestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata())));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}

public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Expand Down Expand Up @@ -147,10 +141,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class));
Expand Down Expand Up @@ -191,14 +182,12 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(),
ingestDocumentWithOnFailureMetadata);

assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(),
ingestDocumentWithOnFailureMetadata);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), ingestDocument);
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(2), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
}

Expand All @@ -221,10 +210,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
}

public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
Expand All @@ -245,10 +231,7 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
}

public void testExecuteItemWithFailure() throws Exception {
Expand Down Expand Up @@ -392,4 +375,19 @@ public String getType() {
}
}

private static void assertVerboseResult(SimulateProcessorResult result,
String expectedPipelineId,
IngestDocument expectedIngestDocument) {
IngestDocument simulateVerboseIngestDocument = result.getIngestDocument();
// Remove and compare pipeline key. It is always in the verbose result,
// since that is a snapshot of how the ingest doc looks during pipeline execution, but not in the final ingestDocument.
// The key gets added and removed during pipeline execution.
String actualPipelineId = (String) simulateVerboseIngestDocument.getIngestMetadata().remove("pipeline");
assertThat(actualPipelineId, equalTo(expectedPipelineId));

assertThat(simulateVerboseIngestDocument, not(sameInstance(expectedIngestDocument)));
assertIngestDocument(simulateVerboseIngestDocument, expectedIngestDocument);
assertThat(simulateVerboseIngestDocument.getSourceAndMetadata(), not(sameInstance(expectedIngestDocument.getSourceAndMetadata())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,12 @@ public void testBreakOnFailure() throws Exception {
public void testFailureProcessorIsInvokedOnFailure() {
TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(4));
assertThat(ingestMetadata.entrySet(), hasSize(5));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
});

Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Expand Down

0 comments on commit 02dfd71

Please sign in to comment.