Skip to content

Commit

Permalink
Make ingest pipeline resolution logic unit testable (#47026)
Browse files Browse the repository at this point in the history
Extracted ingest pipeline resolution logic into a static method
and added unit tests for pipeline resolution logic.

Followup from #46847
  • Loading branch information
martijnvg committed Sep 25, 2019
1 parent 65d4124 commit 737376d
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -155,115 +154,13 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk

boolean hasIndexRequestsWithPipelines = false;
final MetaData metaData = clusterService.state().getMetaData();
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);

if (indexRequest != null) {
if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
// check the alias for the action request (this is how upserts are modeled)
if (indexMetaData == null && actionRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData != null) {
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
} else if (indexRequest.index() != null) {
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
}
indexRequest.setPipeline(pipeline);
}

if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}

if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
/*
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
* pipeline parameter too.
*/
indexRequest.isPipelineResolved(true);
} else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
hasIndexRequestsWithPipelines = true;
}
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData);
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
}

}

if (hasIndexRequestsWithPipelines) {
Expand Down Expand Up @@ -359,6 +256,112 @@ public void onFailure(Exception e) {
}
}

static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest<?> originalRequest,
IndexRequest indexRequest,
MetaData metaData) {

if (indexRequest.isPipelineResolved() == false) {
final String requestPipeline = indexRequest.getPipeline();
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
boolean requestCanOverridePipeline = true;
String requiredPipeline = null;
// start to look for default or required pipelines via settings found in the index meta data
IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index());
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetaData == null && indexRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
// check the alias for the action request (this is how upserts are modeled)
if (indexMetaData == null && originalRequest.index() != null) {
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(originalRequest.index());
if (indexOrAlias != null && indexOrAlias.isAlias()) {
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
indexMetaData = alias.getWriteIndex();
}
}
if (indexMetaData != null) {
final Settings indexSettings = indexMetaData.getSettings();
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
// find the required pipeline if one is defined from an existing index
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(requiredPipeline);
requestCanOverridePipeline = false;
} else {
// find the default pipeline if one is defined from an existing index
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
}
} else if (indexRequest.index() != null) {
// the index does not exist yet (and is valid request), so match index templates to look for a default pipeline
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
assert (templates != null);
// order of templates are highest order first, we have to iterate through them all though
String defaultPipeline = null;
for (IndexTemplateMetaData template : templates) {
final Settings settings = template.settings();
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
requestCanOverridePipeline = false;
// we can not break in case a lower-order template has a default pipeline that we need to reject
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
// we can not break in case a lower-order template has a required pipeline that we need to reject
}
}
if (requiredPipeline != null && defaultPipeline != null) {
// we can not have picked up a required and a default pipeline from applying templates
final String message = String.format(
Locale.ROOT,
"required pipeline [%s] and default pipeline [%s] can not both be set",
requiredPipeline,
defaultPipeline);
throw new IllegalArgumentException(message);
}
final String pipeline;
if (requiredPipeline != null) {
pipeline = requiredPipeline;
} else {
pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME);
}
indexRequest.setPipeline(pipeline);
}

if (requestPipeline != null) {
if (requestCanOverridePipeline == false) {
final String message = String.format(
Locale.ROOT,
"request pipeline [%s] can not override required pipeline [%s]",
requestPipeline,
requiredPipeline);
throw new IllegalArgumentException(message);
} else {
indexRequest.setPipeline(requestPipeline);
}
}

/*
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
* pipeline parameter too.
*/
indexRequest.isPipelineResolved(true);
}

// Return whether this index request has a pipeline
return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false;
}

boolean needToCheck() {
return autoCreateIndex.needToCheck();
}
Expand Down
Loading

0 comments on commit 737376d

Please sign in to comment.