diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9b047a0d4f1ac..afa3b911d6d03 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -54,7 +54,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -155,115 +154,13 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener indicesMetaData = metaData.indices(); for (DocWriteRequest actionRequest : bulkRequest.requests) { IndexRequest indexRequest = getIndexWriteRequest(actionRequest); - if (indexRequest != null) { - if (indexRequest.isPipelineResolved() == false) { - final String requestPipeline = indexRequest.getPipeline(); - indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); - boolean requestCanOverridePipeline = true; - String requiredPipeline = null; - // start to look for default or required pipelines via settings found in the index meta data - IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); - // check the alias for the index request (this is how normal index requests are modeled) - if (indexMetaData == null && indexRequest.index() != null) { - AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); - if (indexOrAlias != null && indexOrAlias.isAlias()) { - AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; - indexMetaData = alias.getWriteIndex(); - } - } - // check the alias for the action request (this is how upserts are modeled) - if (indexMetaData == null && actionRequest.index() != null) { - AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index()); - if (indexOrAlias != null && indexOrAlias.isAlias()) { - AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; - indexMetaData = alias.getWriteIndex(); - } - } - if (indexMetaData != null) { - final Settings indexSettings = indexMetaData.getSettings(); - if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) { - // find the required pipeline if one is defined from an existing index - requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings); - assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) : - IndexSettings.DEFAULT_PIPELINE.get(indexSettings); - indexRequest.setPipeline(requiredPipeline); - requestCanOverridePipeline = false; - } else { - // find the default pipeline if one is defined from an existing index - String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings); - indexRequest.setPipeline(defaultPipeline); - } - } else if (indexRequest.index() != null) { - // the index does not exist yet (and is valid request), so match index templates to look for a default pipeline - List templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); - assert (templates != null); - // order of templates are highest order first, we have to iterate through them all though - String defaultPipeline = null; - for (IndexTemplateMetaData template : templates) { - final Settings settings = template.settings(); - if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) { - requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings); - requestCanOverridePipeline = false; - // we can not break in case a lower-order template has a default pipeline that we need to reject - } else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) { - defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); - // we can not break in case a lower-order template has a required pipeline that we need to reject - } - } - if (requiredPipeline != null && defaultPipeline != null) { - // we can not have picked up a required and a default pipeline from applying templates - final String message = String.format( - Locale.ROOT, - "required pipeline [%s] and default pipeline [%s] can not both be set", - requiredPipeline, - defaultPipeline); - throw new IllegalArgumentException(message); - } - final String pipeline; - if (requiredPipeline != null) { - pipeline = requiredPipeline; - } else { - pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME); - } - indexRequest.setPipeline(pipeline); - } - - if (requestPipeline != null) { - if (requestCanOverridePipeline == false) { - final String message = String.format( - Locale.ROOT, - "request pipeline [%s] can not override required pipeline [%s]", - requestPipeline, - requiredPipeline); - throw new IllegalArgumentException(message); - } else { - indexRequest.setPipeline(requestPipeline); - } - } - - if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) { - hasIndexRequestsWithPipelines = true; - } - /* - * We have to track whether or not the pipeline for this request has already been resolved. It can happen that the - * pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request - * has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have - * already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we - * can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been - * set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish - * these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request - * pipeline parameter too. - */ - indexRequest.isPipelineResolved(true); - } else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) { - hasIndexRequestsWithPipelines = true; - } + // Each index request needs to be evaluated, because this method also modifies the IndexRequest + boolean indexRequestHasPipeline = resolveRequiredOrDefaultPipeline(actionRequest, indexRequest, metaData); + hasIndexRequestsWithPipelines |= indexRequestHasPipeline; } - } if (hasIndexRequestsWithPipelines) { @@ -359,6 +256,112 @@ public void onFailure(Exception e) { } } + static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest originalRequest, + IndexRequest indexRequest, + MetaData metaData) { + + if (indexRequest.isPipelineResolved() == false) { + final String requestPipeline = indexRequest.getPipeline(); + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); + boolean requestCanOverridePipeline = true; + String requiredPipeline = null; + // start to look for default or required pipelines via settings found in the index meta data + IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index()); + // check the alias for the index request (this is how normal index requests are modeled) + if (indexMetaData == null && indexRequest.index() != null) { + AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index()); + if (indexOrAlias != null && indexOrAlias.isAlias()) { + AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; + indexMetaData = alias.getWriteIndex(); + } + } + // check the alias for the action request (this is how upserts are modeled) + if (indexMetaData == null && originalRequest.index() != null) { + AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(originalRequest.index()); + if (indexOrAlias != null && indexOrAlias.isAlias()) { + AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias; + indexMetaData = alias.getWriteIndex(); + } + } + if (indexMetaData != null) { + final Settings indexSettings = indexMetaData.getSettings(); + if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) { + // find the required pipeline if one is defined from an existing index + requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings); + assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) : + IndexSettings.DEFAULT_PIPELINE.get(indexSettings); + indexRequest.setPipeline(requiredPipeline); + requestCanOverridePipeline = false; + } else { + // find the default pipeline if one is defined from an existing index + String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings); + indexRequest.setPipeline(defaultPipeline); + } + } else if (indexRequest.index() != null) { + // the index does not exist yet (and is valid request), so match index templates to look for a default pipeline + List templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); + assert (templates != null); + // order of templates are highest order first, we have to iterate through them all though + String defaultPipeline = null; + for (IndexTemplateMetaData template : templates) { + final Settings settings = template.settings(); + if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) { + requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings); + requestCanOverridePipeline = false; + // we can not break in case a lower-order template has a default pipeline that we need to reject + } else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) { + defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); + // we can not break in case a lower-order template has a required pipeline that we need to reject + } + } + if (requiredPipeline != null && defaultPipeline != null) { + // we can not have picked up a required and a default pipeline from applying templates + final String message = String.format( + Locale.ROOT, + "required pipeline [%s] and default pipeline [%s] can not both be set", + requiredPipeline, + defaultPipeline); + throw new IllegalArgumentException(message); + } + final String pipeline; + if (requiredPipeline != null) { + pipeline = requiredPipeline; + } else { + pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME); + } + indexRequest.setPipeline(pipeline); + } + + if (requestPipeline != null) { + if (requestCanOverridePipeline == false) { + final String message = String.format( + Locale.ROOT, + "request pipeline [%s] can not override required pipeline [%s]", + requestPipeline, + requiredPipeline); + throw new IllegalArgumentException(message); + } else { + indexRequest.setPipeline(requestPipeline); + } + } + + /* + * We have to track whether or not the pipeline for this request has already been resolved. It can happen that the + * pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request + * has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have + * already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we + * can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been + * set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish + * these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request + * pipeline parameter too. + */ + indexRequest.isPipelineResolved(true); + } + + // Return whether this index request has a pipeline + return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false; + } + boolean needToCheck() { return autoCreateIndex.needToCheck(); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index dfba68d364b01..8ef668c01b727 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver; @@ -28,11 +29,17 @@ import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -42,9 +49,12 @@ import org.junit.Before; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; public class TransportBulkActionTests extends ESTestCase { @@ -154,4 +164,152 @@ public void testGetIndexWriteRequest() throws Exception { UpdateRequest badUpsertRequest = new UpdateRequest("index", "type", "id1"); assertNull(TransportBulkAction.getIndexWriteRequest(badUpsertRequest)); } + + public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { + IndexMetaData.Builder builder = IndexMetaData.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0) + .putAlias(AliasMetaData.builder("alias").writeIndex(true).build()); + MetaData metaData = MetaData.builder().put(builder).build(); + + // index name matches with IDM: + IndexRequest indexRequest = new IndexRequest("idx"); + boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); + + // alias name matches with IDM: + indexRequest = new IndexRequest("alias"); + result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); + + // index name matches with ITMD: + IndexTemplateMetaData.Builder templateBuilder = IndexTemplateMetaData.builder("name1") + .patterns(List.of("id*")) + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")); + metaData = MetaData.builder().put(templateBuilder).build(); + indexRequest = new IndexRequest("idx"); + result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); + } + + public void testResolveRequiredOrDefaultPipelineRequiredPipeline() { + IndexMetaData.Builder builder = IndexMetaData.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0) + .putAlias(AliasMetaData.builder("alias").writeIndex(true).build()); + MetaData metaData = MetaData.builder().put(builder).build(); + + // index name matches with IDM: + IndexRequest indexRequest = new IndexRequest("idx"); + boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("required-pipeline")); + + // alias name matches with IDM: + indexRequest = new IndexRequest("alias"); + result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("required-pipeline")); + + // index name matches with ITMD: + IndexTemplateMetaData.Builder templateBuilder = IndexTemplateMetaData.builder("name1") + .patterns(List.of("id*")) + .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")); + metaData = MetaData.builder().put(templateBuilder).build(); + indexRequest = new IndexRequest("idx"); + result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("required-pipeline")); + } + + public void testResolveRequiredOrDefaultAndRequiredPipeline() { + IndexTemplateMetaData.Builder builder1 = IndexTemplateMetaData.builder("name1") + .patterns(List.of("i*")) + .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")); + IndexTemplateMetaData.Builder builder2 = IndexTemplateMetaData.builder("name2") + .patterns(List.of("id*")) + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")); + MetaData metaData = MetaData.builder().put(builder1).put(builder2).build(); + + IndexRequest indexRequest = new IndexRequest("idx"); + Exception e = expectThrows(IllegalArgumentException.class, + () -> TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData)); + assertThat(e.getMessage(), + equalTo("required pipeline [required-pipeline] and default pipeline [default-pipeline] can not both be set")); + } + + public void testResolveRequiredOrDefaultPipelineRequestPipeline() { + // no pipeline: + { + MetaData metaData = MetaData.builder().build(); + IndexRequest indexRequest = new IndexRequest("idx"); + boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // request pipeline: + { + MetaData metaData = MetaData.builder().build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); + boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("request-pipeline")); + } + + // request pipeline with default pipeline: + { + IndexMetaData.Builder builder = IndexMetaData.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + MetaData metaData = MetaData.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); + boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("request-pipeline")); + } + + // request pipeline with required pipeline: + { + IndexMetaData.Builder builder = IndexMetaData.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + MetaData metaData = MetaData.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); + Exception e = expectThrows(IllegalArgumentException.class, + () -> TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData)); + assertThat(e.getMessage(), + equalTo("request pipeline [request-pipeline] can not override required pipeline [required-pipeline]")); + } + + // request pipeline set to required pipeline: + { + IndexMetaData.Builder builder = IndexMetaData.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + MetaData metaData = MetaData.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("required-pipeline").isPipelineResolved(true); + boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo("required-pipeline")); + } + } }