Skip to content

Commit

Permalink
Add pipeline name to ingest metadata (elastic#50467)
Browse files Browse the repository at this point in the history
This commit adds the name of the current pipeline to ingest metadata.
This pipeline name is accessible under the following key: '_ingest.pipeline'.

Example usage in pipeline:
PUT /_ingest/pipeline/2
{
    "processors": [
        {
            "set": {
                "field": "pipeline_name",
                "value": "{{_ingest.pipeline}}"
            }
        }
    ]
}

Closes elastic#42106
  • Loading branch information
martijnvg authored and SivagurunathanV committed Jan 21, 2020
1 parent e397be1 commit ee02062
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 41 deletions.
12 changes: 8 additions & 4 deletions docs/reference/ingest/apis/simulate-pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ The API returns the following response:
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.674Z"
"timestamp": "2017-05-04T22:46:09.674Z",
"pipeline": "_simulate_pipeline"
}
}
},
Expand All @@ -364,7 +365,8 @@ The API returns the following response:
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.675Z"
"timestamp": "2017-05-04T22:46:09.675Z",
"pipeline": "_simulate_pipeline"
}
}
}
Expand All @@ -381,7 +383,8 @@ The API returns the following response:
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.676Z"
"timestamp": "2017-05-04T22:46:09.676Z",
"pipeline": "_simulate_pipeline"
}
}
},
Expand All @@ -395,7 +398,8 @@ The API returns the following response:
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.677Z"
"timestamp": "2017-05-04T22:46:09.677Z",
"pipeline": "_simulate_pipeline"
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/ingest/processors/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ include::common-options.asciidoc[]
--------------------------------------------------
// NOTCONSOLE

The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key.

An example of using this processor for nesting pipelines would be:

Define an inner pipeline:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,81 @@ teardown:
}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.reason: "Pipeline processor configured for non-existent pipeline [legal-department]" }

---
"Test _ingest.pipeline metadata":
- do:
ingest.put_pipeline:
id: "pipeline1"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
},
{
"pipeline" : {
"name": "another_pipeline"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "another_pipeline"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
},
{
"pipeline" : {
"name": "another_pipeline2"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "another_pipeline2"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "pipeline1"
body: >
{
}
- do:
get:
index: test
id: 1
- length: { _source.pipelines: 3 }
- match: { _source.pipelines.0: "pipeline1" }
- match: { _source.pipelines.1: "another_pipeline" }
- match: { _source.pipelines.2: "another_pipeline2" }
Original file line number Diff line number Diff line change
Expand Up @@ -284,26 +284,30 @@ teardown:
- length: { docs.0.processor_results.0.doc._source: 2 }
- match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.0.doc._source.field2.value: "_value" }
- length: { docs.0.processor_results.0.doc._ingest: 1 }
- length: { docs.0.processor_results.0.doc._ingest: 2 }
- is_true: docs.0.processor_results.0.doc._ingest.timestamp
- is_true: docs.0.processor_results.0.doc._ingest.pipeline
- length: { docs.0.processor_results.1.doc._source: 3 }
- match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.1.doc._source.field2.value: "_value" }
- match: { docs.0.processor_results.1.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.1.doc._ingest: 1 }
- length: { docs.0.processor_results.1.doc._ingest: 2 }
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
- is_true: docs.0.processor_results.1.doc._ingest.pipeline
- length: { docs.0.processor_results.2.doc._source: 3 }
- match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.2.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.2.doc._ingest: 1 }
- length: { docs.0.processor_results.2.doc._ingest: 2 }
- is_true: docs.0.processor_results.2.doc._ingest.timestamp
- is_true: docs.0.processor_results.2.doc._ingest.pipeline
- length: { docs.0.processor_results.3.doc._source: 3 }
- match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" }
- match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.3.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.3.doc._ingest: 1 }
- length: { docs.0.processor_results.3.doc._ingest: 2 }
- is_true: docs.0.processor_results.3.doc._ingest.timestamp
- is_true: docs.0.processor_results.3.doc._ingest.pipeline

---
"Test simulate with exception thrown":
Expand Down Expand Up @@ -393,12 +397,14 @@ teardown:
- match: { docs.1.processor_results.0.doc._index: "index" }
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
- length: { docs.1.processor_results.0.doc._ingest: 1 }
- length: { docs.1.processor_results.0.doc._ingest: 2 }
- is_true: docs.1.processor_results.0.doc._ingest.timestamp
- is_true: docs.1.processor_results.0.doc._ingest.pipeline
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
- length: { docs.1.processor_results.1.doc._ingest: 1 }
- length: { docs.1.processor_results.1.doc._ingest: 2 }
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
- is_true: docs.1.processor_results.1.doc._ingest.pipeline

---
"Test verbose simulate with on_failure":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,14 @@ private static Object deepCopy(Object value) {
*/
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
ingestMetadata.remove("pipeline");
}
handler.accept(result, e);
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor {
private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;

private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
super(tag);
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,17 @@ public void testExecuteVerboseItem() throws Exception {
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
IngestDocument firstProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument();
assertThat(firstProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
assertIngestDocument(firstProcessorIngestDocument, this.ingestDocument);
assertThat(firstProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("test-id"));
IngestDocument secondProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument();
assertThat(secondProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
assertIngestDocument(secondProcessorIngestDocument, this.ingestDocument);
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(),
not(sameInstance(firstProcessorIngestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata())));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}

public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Expand Down Expand Up @@ -147,10 +141,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class));
Expand Down Expand Up @@ -191,14 +182,12 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(),
ingestDocumentWithOnFailureMetadata);

assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(),
ingestDocumentWithOnFailureMetadata);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), ingestDocument);
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(2), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
}

Expand All @@ -221,10 +210,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
}

public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
Expand All @@ -245,10 +231,7 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
}

public void testExecuteItemWithFailure() throws Exception {
Expand Down Expand Up @@ -392,4 +375,19 @@ public String getType() {
}
}

private static void assertVerboseResult(SimulateProcessorResult result,
String expectedPipelineId,
IngestDocument expectedIngestDocument) {
IngestDocument simulateVerboseIngestDocument = result.getIngestDocument();
// Remove and compare pipeline key. It is always in the verbose result,
// since that is a snapshot of how the ingest doc looks during pipeline execution, but not in the final ingestDocument.
// The key gets added and removed during pipeline execution.
String actualPipelineId = (String) simulateVerboseIngestDocument.getIngestMetadata().remove("pipeline");
assertThat(actualPipelineId, equalTo(expectedPipelineId));

assertThat(simulateVerboseIngestDocument, not(sameInstance(expectedIngestDocument)));
assertIngestDocument(simulateVerboseIngestDocument, expectedIngestDocument);
assertThat(simulateVerboseIngestDocument.getSourceAndMetadata(), not(sameInstance(expectedIngestDocument.getSourceAndMetadata())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,12 @@ public void testBreakOnFailure() throws Exception {
public void testFailureProcessorIsInvokedOnFailure() {
TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(4));
assertThat(ingestMetadata.entrySet(), hasSize(5));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
});

Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Expand Down

0 comments on commit ee02062

Please sign in to comment.