Replace required pipeline with final pipeline (#49470)
This commit enhances the required pipeline functionality by changing it
so that default/request pipelines can also be executed, while the required
pipeline is always executed last. This gives users the flexibility to
execute their own indexing pipelines while also ensuring that any required
pipeline is executed. Since such pipelines are executed last, we rename
required pipelines to final pipelines.
jasontedor committed Nov 22, 2019
1 parent 5cf9238 commit 829b888
Showing 13 changed files with 518 additions and 372 deletions.
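For illustration, here is a minimal sketch of the new behavior in the YAML REST-test style used by this commit's test (the pipeline names my_default_pipeline, my_request_pipeline, and my_final_pipeline are hypothetical and assumed to already exist): an index can declare both a default and a final pipeline, a request pipeline replaces only the default, and the final pipeline always runs last.

# sketch: an index that declares both a default and a final pipeline
- do:
    indices.create:
      index: test
      body:
        settings:
          index:
            default_pipeline: "my_default_pipeline"
            final_pipeline: "my_final_pipeline"

# no request pipeline: my_default_pipeline runs first, then my_final_pipeline
- do:
    index:
      index: test
      id: 1
      body: {bytes_source_field: "1kb"}

# request pipeline: my_request_pipeline replaces the default, but my_final_pipeline still runs last
- do:
    index:
      index: test
      id: 2
      pipeline: "my_request_pipeline"
      body: {bytes_source_field: "1kb"}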
12 changes: 6 additions & 6 deletions docs/reference/index-modules.asciidoc
@@ -243,12 +243,12 @@ specific index module:
overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
no ingest pipeline should be run.

`index.required_pipeline`::
The required <<ingest,ingest node>> pipeline for this index. Index requests
will fail if the required pipeline is set and the pipeline does not exist.
The required pipeline can not be overridden with the `pipeline` parameter. A
default pipeline and a required pipeline can not both be set. The special
pipeline name `_none` indicates no ingest pipeline will run.
`index.final_pipeline`::
The final <<ingest,ingest node>> pipeline for this index. Index requests
will fail if the final pipeline is set and the pipeline does not exist.
The final pipeline always runs after the request pipeline (if specified) and
the default pipeline (if it exists). The special pipeline name `_none`
indicates no ingest pipeline will run.
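For illustration, a minimal sketch of declaring this setting at index creation, in the YAML REST-test style used elsewhere in this commit (the pipeline name my_final_pipeline is hypothetical and assumed to already exist):

- do:
    indices.create:
      index: my_index
      body:
        settings:
          index:
            final_pipeline: "my_final_pipeline"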

[float]
=== Settings in other index modules
7 changes: 5 additions & 2 deletions docs/reference/ingest.asciidoc
@@ -3,7 +3,7 @@

[partintro]
--
Use an ingest node to pre-process documents before the actual document indexing happens.
The ingest node intercepts bulk and index requests, it applies transformations, and it then
passes the documents back to the index or bulk APIs.

@@ -23,7 +23,7 @@ another processor that renames a field. The <<cluster-state,cluster state>> then
the configured pipelines.

To use a pipeline, simply specify the `pipeline` parameter on an index or bulk request. This
way, the ingest node knows which pipeline to use.

For example:
Create a pipeline
@@ -79,6 +79,9 @@ Response:
An index may also declare a <<dynamic-index-settings,default pipeline>> that will be used in the
absence of the `pipeline` parameter.

Finally, an index may also declare a <<dynamic-index-settings,final pipeline>>
that will be executed after any request or default pipeline (if any).
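For illustration, a minimal sketch in the YAML REST-test style used by this commit (hypothetical names; assumes my_index declares index.final_pipeline and that my_request_pipeline already exists): the request pipeline passed via the `pipeline` parameter runs first, and the index's final pipeline runs afterwards.

- do:
    index:
      index: my_index
      id: 1
      pipeline: "my_request_pipeline"
      body: {message: "some value"}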

See <<ingest-apis,Ingest APIs>> for more information about creating, adding, and deleting pipelines.

--
@@ -6,7 +6,7 @@ teardown:
ignore: 404

---
"Test index with required pipeline":
"Test index with final pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
@@ -23,14 +23,14 @@ teardown:
]
}
- match: { acknowledged: true }
# required pipeline via index
# final pipeline via index
- do:
indices.create:
index: test
body:
settings:
index:
required_pipeline: "my_pipeline"
final_pipeline: "my_pipeline"
aliases:
test_alias: {}

@@ -46,7 +46,7 @@ teardown:
id: 1
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via alias
# final pipeline via alias
- do:
index:
index: test_alias
@@ -59,7 +59,7 @@ teardown:
id: 2
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via upsert
# final pipeline via upsert
- do:
update:
index: test
@@ -75,7 +75,7 @@ teardown:
id: 3
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via scripted upsert
# final pipeline via scripted upsert
- do:
update:
index: test
@@ -92,7 +92,7 @@ teardown:
id: 4
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via doc_as_upsert
# final pipeline via doc_as_upsert
- do:
update:
index: test
@@ -106,7 +106,7 @@ teardown:
id: 5
- match: { _source.bytes_source_field: "1kb" }
- match: { _source.bytes_target_field: 1024 }
# required pipeline via bulk upsert
# final pipeline via bulk upsert
# note - bulk scripted upserts execute the pipeline before the script, so any data referenced by the pipeline
# needs to be in the upsert, not the script
- do:
@@ -164,12 +164,3 @@ teardown:
- match: { docs.5._source.bytes_source_field: "3kb" }
- match: { docs.5._source.bytes_target_field: 3072 }
- match: { docs.5._source.ran_script: true }

# bad request, request pipeline can not be specified
- do:
catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
index:
index: test
id: 9
pipeline: "pipeline"
body: {bytes_source_field: "1kb"}
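With the old restriction removed, a request pipeline may now be combined with an index's final pipeline; a minimal sketch of what is now allowed (hypothetical pipeline name another_pipeline, assumed to already exist):

# request pipeline plus final pipeline: another_pipeline runs first, my_pipeline (the final pipeline) runs last
- do:
    index:
      index: test
      id: 9
      pipeline: "another_pipeline"
      body: {bytes_source_field: "1kb"}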
@@ -79,7 +79,6 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -167,7 +166,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
boolean indexRequestHasPipeline = resolvePipelines(actionRequest, indexRequest, metaData);
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
}

@@ -273,16 +272,14 @@ public void onFailure(Exception e) {
}
}

static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalRequest,
IndexRequest indexRequest,
MetaData metaData) {

static boolean resolvePipelines(final DocWriteRequest<?> originalRequest, final IndexRequest indexRequest, final MetaData metaData) {
if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
indexRequest.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME);
String defaultPipeline = null;
String finalPipeline = null;
// start to look for default or final pipelines via settings found in the index meta data
IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
@@ -302,64 +299,42 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalReque
}
if (indexMetaData != null) {
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) {
// find the default pipeline if one is defined from an existing index setting
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) {
// find the final pipeline if one is defined from an existing index setting
finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings);
indexRequest.setFinalPipeline(finalPipeline);
}
} else if (indexRequest.index() != null) {
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
// the index does not exist yet (and this is a valid request), so match index templates to look for pipelines
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
// order of templates are highest order first
for (final IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
// we can not break in case a lower-order template has a required pipeline that we need to reject
// we can not break in case a lower-order template has a final pipeline that we need to collect
}
if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) {
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
// we can not break in case a lower-order template has a default pipeline that we need to collect
}
if (defaultPipeline != null && finalPipeline != null) {
// we can break if we have already collected a default and final pipeline
break;
}
}
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
}
indexRequest.setPipeline(pipeline);
indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME);
indexRequest.setFinalPipeline(finalPipeline != null ? finalPipeline : IngestService.NOOP_PIPELINE_NAME);
}

if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
indexRequest.setPipeline(requestPipeline);
}

/*
@@ -375,8 +350,10 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalReque
indexRequest.isPipelineResolved(true);
}

// Return whether this index request has a pipeline
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false;

// return whether this index request has a pipeline
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false
|| IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
}

boolean needToCheck() {
@@ -101,6 +101,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private XContentType contentType;

private String pipeline;
private String finalPipeline;

private boolean isPipelineResolved;

@@ -133,6 +134,9 @@ public IndexRequest(StreamInput in) throws IOException {
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
finalPipeline = in.readOptionalString();
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
isPipelineResolved = in.readBoolean();
}
@@ -246,6 +250,9 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("pipeline cannot be an empty string", validationException);
}

if (finalPipeline != null && finalPipeline.isEmpty()) {
validationException = addValidationError("final pipeline cannot be an empty string", validationException);
}

return validationException;
}
@@ -350,6 +357,26 @@ public String getPipeline() {
return this.pipeline;
}

/**
* Sets the final ingest pipeline to be executed before indexing the document.
*
* @param finalPipeline the name of the final pipeline
* @return this index request
*/
public IndexRequest setFinalPipeline(final String finalPipeline) {
this.finalPipeline = finalPipeline;
return this;
}

/**
* Returns the final ingest pipeline to be executed before indexing the document.
*
* @return the name of the final pipeline
*/
public String getFinalPipeline() {
return this.finalPipeline;
}

/**
* Sets if the pipeline for this request has been resolved by the coordinating node.
*
@@ -686,6 +713,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeOptionalString(finalPipeline);
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeBoolean(isPipelineResolved);
}
@@ -166,7 +166,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
IndexSettings.DEFAULT_PIPELINE,
IndexSettings.REQUIRED_PIPELINE,
IndexSettings.FINAL_PIPELINE,
MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,

// validate that built-in similarities don't get redefined