Skip to content

Commit

Permalink
Add support for _meta field to ingest pipelines (#75905) (#76377)
Browse files Browse the repository at this point in the history
We are adding a _meta field to many of our REST APIs so that users can attach whatever metadata they
want. The data in this field will not be used by Elasticsearch. This commit add the _meta field to ingest
pipelines.
Relates to #75799
  • Loading branch information
masseyke committed Aug 11, 2021
1 parent 4941d0e commit aaa712d
Show file tree
Hide file tree
Showing 14 changed files with 179 additions and 45 deletions.
46 changes: 46 additions & 0 deletions docs/reference/ingest/apis/put-pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,53 @@ Processors run sequentially in the order specified.
`version`::
(Optional, integer)
Version number used by external systems to track ingest pipelines.

+
This parameter is intended for external systems only. {es} does not use or
validate pipeline version numbers.

`_meta`::
(Optional, object)
Optional metadata about the ingest pipeline. May have any contents. This
map is not automatically generated by {es}.
// end::pipeline-object[]

[[put-pipeline-api-example]]
==== {api-examples-title}

[[pipeline-metadata]]
===== Pipeline metadata

You can use the `_meta` parameter to add arbitrary metadata to a pipeline.
This user-defined object is stored in the cluster state,
so keeping it short is preferable.

The `_meta` parameter is optional and not automatically generated or used by {es}.

To unset `_meta`, replace the pipeline without specifying one.

[source,console]
--------------------------------------------------
PUT /_ingest/pipeline/my-pipeline-id
{
"description" : "My optional pipeline description",
"processors" : [
{
"set" : {
"description" : "My optional processor description",
"field": "my-keyword-field",
"value": "foo"
}
}
],
"_meta": {
"reason": "set my-keyword-field to foo",
"serialization": {
"class": "MyPipeline",
"id": 10
}
}
}
--------------------------------------------------

To check the `_meta`, use the <<get-pipeline-api,get pipeline>> API.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
Expand All @@ -22,8 +23,10 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -54,6 +57,12 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp
@Override
protected void masterOperation(PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
throws Exception {
if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) {
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
if (pipelineConfig.containsKey(Pipeline.META_KEY)) {
throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0);
}
}
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear()
.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
pipeline.getMetadata(), verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ public String getType() {
}
};
String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded";
return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
return new Pipeline(id, description, null, null, new CompoundProcessor(failureProcessor));
}

static class PipelineHolder {
Expand Down
21 changes: 16 additions & 5 deletions server/src/main/java/org/elasticsearch/ingest/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,30 @@ public final class Pipeline {
public static final String PROCESSORS_KEY = "processors";
public static final String VERSION_KEY = "version";
public static final String ON_FAILURE_KEY = "on_failure";
public static final String META_KEY = "_meta";

private final String id;
@Nullable
private final String description;
@Nullable
private final Integer version;
@Nullable
private final Map<String, Object> metadata;
private final CompoundProcessor compoundProcessor;
private final IngestMetric metrics;
private final LongSupplier relativeTimeProvider;

public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
this(id, description, version, compoundProcessor, System::nanoTime);
public Pipeline(String id, @Nullable String description, @Nullable Integer version,
@Nullable Map<String, Object> metadata, CompoundProcessor compoundProcessor) {
this(id, description, version, metadata, compoundProcessor, System::nanoTime);
}

//package private for testing
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor,
LongSupplier relativeTimeProvider) {
Pipeline(String id, @Nullable String description, @Nullable Integer version, @Nullable Map<String, Object> metadata,
CompoundProcessor compoundProcessor, LongSupplier relativeTimeProvider) {
this.id = id;
this.description = description;
this.metadata = metadata;
this.compoundProcessor = compoundProcessor;
this.version = version;
this.metrics = new IngestMetric();
Expand All @@ -58,6 +63,7 @@ public static Pipeline create(String id, Map<String, Object> config,
Map<String, Processor.Factory> processorFactories, ScriptService scriptService) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
Map<String, Object> metadata = ConfigurationUtils.readOptionalMap(null, null, config, META_KEY);
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, scriptService, processorFactories);
List<Map<String, Object>> onFailureProcessorConfigs =
Expand All @@ -73,7 +79,7 @@ public static Pipeline create(String id, Map<String, Object> config,
}
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors),
Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, version, compoundProcessor);
return new Pipeline(id, description, version, metadata, compoundProcessor);
}

/**
Expand Down Expand Up @@ -120,6 +126,11 @@ public Integer getVersion() {
return version;
}

@Nullable
public Map<String, Object> getMetadata() {
return metadata;
}

/**
* Get the underlying {@link CompoundProcessor} containing the Pipeline's processors
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), conditionalWithResult));
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
pipeline.getMetadata(), verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, handler);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void destroy() {

public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("test-id", "mock", null, ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
Expand All @@ -93,7 +93,7 @@ public void testExecuteVerboseItem() throws Exception {
}
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", null, ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
Expand All @@ -113,7 +113,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", null, ingestDocument -> {});
TestProcessor processor2 = new TestProcessor("processor_1", "mock", null, new RuntimeException("processor failed"));
TestProcessor processor3 = new TestProcessor("processor_2", "mock", null, ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2, processor3));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
Expand Down Expand Up @@ -142,8 +142,8 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", null, new RuntimeException("processor failed"));
TestProcessor processor2 = new TestProcessor("processor_1", "mock", null, ingestDocument -> {});
TestProcessor processor3 = new TestProcessor("processor_2", "mock", null, ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version,
new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Pipeline pipeline = new Pipeline("_id", "_description", version, null,
new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2)), processor3));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
Expand Down Expand Up @@ -184,7 +184,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", null, exception);
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
Expand All @@ -205,7 +205,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception
public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", null, ingestDocument -> { });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
Expand All @@ -225,7 +225,7 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws

public void testExecuteItemWithFailure() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
Expand All @@ -247,7 +247,7 @@ public void testExecuteItemWithFailure() throws Exception {
public void testDropDocument() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, null, Collections.emptyMap());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
Expand All @@ -267,7 +267,7 @@ public void testDropDocument() throws Exception {
public void testDropDocumentVerbose() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, null, Collections.emptyMap());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
Expand All @@ -291,7 +291,7 @@ public void testDropDocumentVerboseExtraProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value"));
Processor processor2 = new DropProcessor.Factory().create(Collections.emptyMap(), null, null, Collections.emptyMap());
TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value"));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2, processor3));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
Expand Down Expand Up @@ -338,7 +338,7 @@ public String getType() {
return "none-of-your-business";
}
};
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1));
SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false);

AtomicReference<SimulatePipelineResponse> responseHolder = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
public void init() throws IOException {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> registry =
Collections.singletonMap("mock_processor", (factories, tag, description, config) -> processor);
ingestService = mock(IngestService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,10 @@ public void testFailureProcessorIsInvokedOnFailure() {
assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
});

Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false, singletonList(new AbstractProcessor(null, null) {
Pipeline pipeline2 = new Pipeline("2", null, null, null,
new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Pipeline pipeline1 = new Pipeline("1", null, null, null, new CompoundProcessor(false, singletonList(new AbstractProcessor(null,
null) {
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
ingestDocument.executePipeline(pipeline2, handler);
Expand Down Expand Up @@ -326,9 +328,9 @@ public void testNewCompoundProcessorException() {
}

public void testNewCompoundProcessorExceptionPipelineOrigin() {
Pipeline pipeline2 = new Pipeline("2", null, null,
Pipeline pipeline2 = new Pipeline("2", null, null, null,
new CompoundProcessor(new TestProcessor("my_tag", "my_type", null, new RuntimeException())));
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(new AbstractProcessor(null, null) {
Pipeline pipeline1 = new Pipeline("1", null, null, null, new CompoundProcessor(new AbstractProcessor(null, null) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit aaa712d

Please sign in to comment.