From 82eb8d0dbef36bde6d34d50729e8eb70a9be1f82 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 4 Sep 2019 10:29:01 -0400 Subject: [PATCH 1/6] Add the ability to require an ingest pipeline This commit adds the ability to require an ingest pipeline on an index. Today we can have a default pipeline, but that could be overridden by a request pipeline parameter. This commit introduces a new index setting index.required_pipeline that acts similarly to index.default_pipeline, except that it can not be overridden by a request pipeline parameter. Additionally, a default pipeline and a request pipeline can not both be set. The required pipeline can be set to _none to ensure that no pipeline ever runs for index requests on that index. --- docs/reference/index-modules.asciidoc | 9 +- .../action/bulk/TransportBulkAction.java | 82 ++++++++--- .../common/settings/IndexScopedSettings.java | 1 + .../elasticsearch/index/IndexSettings.java | 81 ++++++++++- .../index/RequiredPipelineIT.java | 133 ++++++++++++++++++ .../action/TransportResumeFollowAction.java | 1 + 6 files changed, 281 insertions(+), 26 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 93da06d9b9674..9ae1dac826e3f 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -234,13 +234,20 @@ specific index module: The length of time that a <> remains available for <>. Defaults to `60s`. - `index.default_pipeline`:: + `index.default_pipeline`:: The default <> pipeline for this index. Index requests will fail if the default pipeline is set and the pipeline does not exist. The default may be overridden using the `pipeline` parameter. The special pipeline name `_none` indicates no ingest pipeline should be run. + `index.required_pipeline`:: + The required <> pipeline for this index. Index requests + will fail if the required pipeline is set and the pipeline does not exist. + The required pipeline can not be overridden with the `pipeline` parameter. A + default pipeline and a required pipeline can not both be set. The special + pipeline name `_none` indicates no ingest pipeline will run. + [float] === Settings in other index modules diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index a2f105df7e9b7..1ebe60d559524 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -76,6 +76,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -155,11 +156,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener indicesMetaData = metaData.indices(); for (DocWriteRequest actionRequest : bulkRequest.requests) { IndexRequest indexRequest = getIndexWriteRequest(actionRequest); + if (indexRequest != null) { - // get pipeline from request - String pipeline = indexRequest.getPipeline(); - if (pipeline == null) { - // start to look for default pipeline via settings found in the index meta data + final String requestPipeline = indexRequest.getPipeline(); + if (requestPipeline == null) { + indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); + boolean requestCanOverridePipeline = true; + String requiredPipeline = null; + // start to look for default or required pipelines via settings found in the index meta data IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index()); // check the alias for the index request (this is how normal index requests are modeled) if (indexMetaData == null && indexRequest.index() != null) { @@ -178,34 +182,76 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); assert (templates != null); - String defaultPipeline = IngestService.NOOP_PIPELINE_NAME; - // order of templates are highest order first, break if we find a default_pipeline + // order of templates are highest order first, we have to iterate through them all though + String defaultPipeline = null; for (IndexTemplateMetaData template : templates) { final Settings settings = template.settings(); - if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) { + if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) { + requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings); + requestCanOverridePipeline = false; + // we can not break in case a lower-order template has a default pipeline that we need to reject + } else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) { defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); - break; + // we can not break in case a lower-order template has a required pipeline that we need to reject } } - indexRequest.setPipeline(defaultPipeline); - if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { - hasIndexRequestsWithPipelines = true; + if (requiredPipeline != null && defaultPipeline != null) { + // we can not have picked up a required and a default pipeline from applying templates + final String message = String.format( + Locale.ROOT, + "required pipeline [%s] and default pipeline [%s] can not both be set", + requiredPipeline, + defaultPipeline); + throw new IllegalArgumentException(message); + } + final String pipeline; + if (requiredPipeline != null) { + pipeline = requiredPipeline; + } else { + pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME); + } + indexRequest.setPipeline(pipeline); + } + + if (requestPipeline != null) { + if (requestCanOverridePipeline == false) { + final String message = String.format( + Locale.ROOT, + "request pipeline [%s] can not override required pipeline [%s]", + requestPipeline, + requiredPipeline); + throw new IllegalArgumentException(message); + } else { + indexRequest.setPipeline(requestPipeline); } } - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { + + if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) { + hasIndexRequestsWithPipelines = true; + } + + } else if (IngestService.NOOP_PIPELINE_NAME.equals(requestPipeline) == false) { hasIndexRequestsWithPipelines = true; } } + } if (hasIndexRequestsWithPipelines) { diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index ff8404f4cfb76..07b107db678e7 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -163,6 +163,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EngineConfig.INDEX_CODEC_SETTING, IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS, IndexSettings.DEFAULT_PIPELINE, + IndexSettings.REQUIRED_PIPELINE, MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, // validate that built-in similarities don't get redefined diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 495bb71ab2104..8ee2223162c02 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -35,8 +35,10 @@ import org.elasticsearch.node.Node; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -299,12 +301,67 @@ public final class IndexSettings { 1000, 1, Property.Dynamic, Property.IndexScope); public static final Setting DEFAULT_PIPELINE = - new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, s -> { - if (s == null || s.isEmpty()) { - throw new IllegalArgumentException("Value for [index.default_pipeline] must be a non-empty string."); - } - return s; - }, Property.Dynamic, Property.IndexScope); + new Setting<>("index.default_pipeline", + IngestService.NOOP_PIPELINE_NAME, + Function.identity(), + new DefaultPipelineValidator(), + Property.Dynamic, + Property.IndexScope); + + public static final Setting REQUIRED_PIPELINE = + new Setting<>("index.required_pipeline", + IngestService.NOOP_PIPELINE_NAME, + Function.identity(), + new RequiredPipelineValidator(), + Property.Dynamic, + Property.IndexScope); + + static class DefaultPipelineValidator implements Setting.Validator { + + @Override + public void validate(final String value) { + + } + + @Override + public void validate(final String value, final Map, String> settings) { + final String requiredPipeline = settings.get(IndexSettings.REQUIRED_PIPELINE); + if (value.equals(IngestService.NOOP_PIPELINE_NAME) == false + && requiredPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) { + throw new IllegalArgumentException( + "index has a default pipeline [" + value + "] and a required pipeline [" + requiredPipeline + "]"); + } + } + + @Override + public Iterator> settings() { + return List.of(REQUIRED_PIPELINE).iterator(); + } + + } + + static class RequiredPipelineValidator implements Setting.Validator { + + @Override + public void validate(final String value) { + + } + + @Override + public void validate(final String value, final Map, String> settings) { + final String defaultPipeline = settings.get(IndexSettings.DEFAULT_PIPELINE); + if (value.equals(IngestService.NOOP_PIPELINE_NAME) && defaultPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) { + throw new IllegalArgumentException( + "index has a required pipeline [" + value + "] and a default pipeline [" + defaultPipeline + "]"); + } + } + + @Override + public Iterator> settings() { + return List.of(DEFAULT_PIPELINE).iterator(); + } + + } /** * Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently @@ -384,6 +441,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { private volatile int maxAnalyzedOffset; private volatile int maxTermsCount; private volatile String defaultPipeline; + private volatile String requiredPipeline; private volatile boolean searchThrottled; /** @@ -555,6 +613,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); + scopedSettings.addSettingsUpdateConsumer(REQUIRED_PIPELINE, this::setRequiredPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis); @@ -746,7 +805,7 @@ public TimeValue getTranslogSyncInterval() { public void setTranslogSyncInterval(TimeValue translogSyncInterval) { this.syncInterval = translogSyncInterval; } - + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ @@ -985,6 +1044,14 @@ public void setDefaultPipeline(String defaultPipeline) { this.defaultPipeline = defaultPipeline; } + public String getRequiredPipeline() { + return requiredPipeline; + } + + public void setRequiredPipeline(final String requiredPipeline) { + this.requiredPipeline = requiredPipeline; + } + /** * Returns true if soft-delete is enabled. */ diff --git a/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java b/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java new file mode 100644 index 0000000000000..98cf54f35b6a2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java @@ -0,0 +1,133 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasToString; + +public class RequiredPipelineIT extends ESIntegTestCase { + + public void testRequiredPipeline() { + final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); + createIndex("index", settings); + + // this asserts that the required_pipeline was used, without us having to actually create the pipeline etc. + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("index", "_doc", "1").setSource(Map.of("field", "value")).get()); + assertThat(e, hasToString(containsString("pipeline with id [required_pipeline] does not exist"))); + } + + public void testDefaultAndRequiredPipeline() { + final Settings settings = Settings.builder() + .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") + .put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline") + .build(); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> createIndex("index", settings)); + assertThat( + e, + hasToString(containsString("index has a default pipeline [default_pipeline] and a required pipeline [required_pipeline]"))); + } + + public void testDefaultAndRequiredPipelineFromTemplates() { + final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); + final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); + final int requiredPipelineOrder; + final int defaultPipelineOrder; + if (randomBoolean()) { + defaultPipelineOrder = lowOrder; + requiredPipelineOrder = highOrder; + } else { + defaultPipelineOrder = highOrder; + requiredPipelineOrder = lowOrder; + } + final Settings defaultPipelineSettings = + Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + admin().indices() + .preparePutTemplate("default") + .setPatterns(List.of("index*")) + .setOrder(defaultPipelineOrder) + .setSettings(defaultPipelineSettings) + .get(); + final Settings requiredPipelineSettings = + Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); + admin().indices() + .preparePutTemplate("required") + .setPatterns(List.of("index*")) + .setOrder(requiredPipelineOrder) + .setSettings(requiredPipelineSettings) + .get(); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("index", "_doc", "1").setSource(Map.of("field", "value")).get()); + assertThat( + e, + hasToString(containsString( + "required pipeline [required_pipeline] and default pipeline [default_pipeline] can not both be set"))); + } + + public void testHighOrderRequiredPipelinePreferred() throws IOException { + final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); + final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); + final Settings defaultPipelineSettings = + Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "low_order_required_pipeline").build(); + admin().indices() + .preparePutTemplate("default") + .setPatterns(List.of("index*")) + .setOrder(lowOrder) + .setSettings(defaultPipelineSettings) + .get(); + final Settings requiredPipelineSettings = + Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "high_order_required_pipeline").build(); + admin().indices() + .preparePutTemplate("required") + .setPatterns(List.of("index*")) + .setOrder(highOrder) + .setSettings(requiredPipelineSettings) + .get(); + + // this asserts that the high_order_required_pipeline was selected, without us having to actually create the pipeline etc. + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("index", "_doc", "1").setSource(Map.of("field", "value")).get()); + assertThat(e, hasToString(containsString("pipeline with id [high_order_required_pipeline] does not exist"))); + } + + public void testRequiredPipelineAndRequestPipeline() { + final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); + createIndex("index", settings); + final IndexRequestBuilder builder = client().prepareIndex("index", "_doc", "1"); + builder.setSource(Map.of("field", "value")); + builder.setPipeline("request_pipeline"); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::get); + assertThat( + e, + hasToString(containsString("request pipeline [request_pipeline] can not override required pipeline [required_pipeline]"))); + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index c6101c0879d7f..f48db20758073 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -391,6 +391,7 @@ static String[] extractLeaderShardHistoryUUIDs(Map ccrIndexMetaD IndexSettings.MAX_SLICES_PER_SCROLL, IndexSettings.MAX_ADJACENCY_MATRIX_FILTERS_SETTING, IndexSettings.DEFAULT_PIPELINE, + IndexSettings.REQUIRED_PIPELINE, IndexSettings.INDEX_SEARCH_THROTTLED, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, From c636e938cba069cd5f83fd93afc05c49d0908597 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 18 Sep 2019 16:57:55 -0700 Subject: [PATCH 2/6] Add some REST tests --- .../test/ingest/240_required_pipeline.yml | 189 ++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml new file mode 100644 index 0000000000000..d464657ad2598 --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml @@ -0,0 +1,189 @@ +--- +teardown: + - do: + ingest.delete_pipeline: + id: "my_pipeline" + ignore: 404 + +--- +"Test index with default pipeline": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "bytes" : { + "field" : "bytes_source_field", + "target_field" : "bytes_target_field" + } + } + ] + } + - match: { acknowledged: true } + # required pipeline via index + - do: + indices.create: + index: test + body: + settings: + index: + required_pipeline: "my_pipeline" + aliases: + test_alias: {} + + - do: + index: + index: test + id: 1 + body: {bytes_source_field: "1kb"} + + - do: + get: + index: test + id: 1 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via alias + - do: + index: + index: test_alias + id: 2 + body: {bytes_source_field: "1kb"} + + - do: + get: + index: test + id: 2 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via upsert + - do: + update: + index: test + id: 3 + body: + script: + source: "ctx._source.ran_script = true" + lang: "painless" + upsert: { "bytes_source_field":"1kb" } + - do: + get: + index: test + id: 3 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via scripted upsert + - do: + update: + index: test + id: 4 + body: + script: + source: "ctx._source.bytes_source_field = '1kb'" + lang: "painless" + upsert : {} + scripted_upsert: true + - do: + get: + index: test + id: 4 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via doc_as_upsert + - do: + update: + index: test + id: 5 + body: + doc: { "bytes_source_field":"1kb" } + doc_as_upsert: true + - do: + get: + index: test + id: 5 + - match: { _source.bytes_source_field: "1kb" } + - match: { _source.bytes_target_field: 1024 } + # required pipeline via bulk upsert + # note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline + # needs to be in the upsert, not the script + - do: + bulk: + refresh: true + body: | + {"update":{"_id":"6","_index":"test"}} + {"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}} + {"update":{"_id":"7","_index":"test"}} + {"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true} + {"update":{"_id":"8","_index":"test"}} + {"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true} + {"update":{"_id":"6_alias","_index":"test_alias"}} + {"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}} + {"update":{"_id":"7_alias","_index":"test_alias"}} + {"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true} + {"update":{"_id":"8_alias","_index":"test_alias"}} + {"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true} + + - do: + mget: + body: + docs: + - { _index: "test", _id: "6" } + - { _index: "test", _id: "7" } + - { _index: "test", _id: "8" } + - { _index: "test", _id: "6_alias" } + - { _index: "test", _id: "7_alias" } + - { _index: "test", _id: "8_alias" } + - match: { docs.0._index: "test" } + - match: { docs.0._id: "6" } + - match: { docs.0._source.bytes_source_field: "1kb" } + - match: { docs.0._source.bytes_target_field: 1024 } + - is_false: docs.0._source.ran_script + - match: { docs.1._index: "test" } + - match: { docs.1._id: "7" } + - match: { docs.1._source.bytes_source_field: "2kb" } + - match: { docs.1._source.bytes_target_field: 2048 } + - match: { docs.2._index: "test" } + - match: { docs.2._id: "8" } + - match: { docs.2._source.bytes_source_field: "3kb" } + - match: { docs.2._source.bytes_target_field: 3072 } + - match: { docs.2._source.ran_script: true } + - match: { docs.3._index: "test" } + - match: { docs.3._id: "6_alias" } + - match: { docs.3._source.bytes_source_field: "1kb" } + - match: { docs.3._source.bytes_target_field: 1024 } + - is_false: docs.3._source.ran_script + - match: { docs.4._index: "test" } + - match: { docs.4._id: "7_alias" } + - match: { docs.4._source.bytes_source_field: "2kb" } + - match: { docs.4._source.bytes_target_field: 2048 } + - match: { docs.5._index: "test" } + - match: { docs.5._id: "8_alias" } + - match: { docs.5._source.bytes_source_field: "3kb" } + - match: { docs.5._source.bytes_target_field: 3072 } + - match: { docs.5._source.ran_script: true } + + # explicit no required pipeline + - do: + index: + index: test + id: 9 + pipeline: "_none" + body: {bytes_source_field: "1kb"} + + - do: + get: + index: test + id: 9 + - match: { _source.bytes_source_field: "1kb" } + - is_false: _source.bytes_target_field + # bad request + - do: + catch: bad_request + index: + index: test + id: 10 + pipeline: "" + body: {bytes_source_field: "1kb"} From 5ddab06e0ad25ef5c72fd2a73fb154975a3ec0d7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 18 Sep 2019 22:28:25 -0400 Subject: [PATCH 3/6] Fix double processing --- .../test/ingest/240_required_pipeline.yml | 22 +++--------- .../action/bulk/TransportBulkAction.java | 20 ++++++++--- .../action/index/IndexRequest.java | 36 ++++++++++++++++--- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml index d464657ad2598..01553bcf40acc 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml @@ -6,7 +6,7 @@ teardown: ignore: 404 --- -"Test index with default pipeline": +"Test index with required pipeline": - do: ingest.put_pipeline: id: "my_pipeline" @@ -165,25 +165,11 @@ teardown: - match: { docs.5._source.bytes_target_field: 3072 } - match: { docs.5._source.ran_script: true } - # explicit no required pipeline + # bad request, request pipeline can not be specified - do: + catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/ index: index: test id: 9 - pipeline: "_none" - body: {bytes_source_field: "1kb"} - - - do: - get: - index: test - id: 9 - - match: { _source.bytes_source_field: "1kb" } - - is_false: _source.bytes_target_field - # bad request - - do: - catch: bad_request - index: - index: test - id: 10 - pipeline: "" + pipeline: "pipeline" body: {bytes_source_field: "1kb"} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 1ebe60d559524..6c592314e6df8 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -158,8 +158,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener implement private String pipeline; + private boolean isForwardedRequest; + /** * Value for {@link #getAutoGeneratedTimestamp()} if the document has an external * provided ID. @@ -124,6 +126,9 @@ public IndexRequest(StreamInput in) throws IOException { version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); pipeline = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + isForwardedRequest = in.readBoolean(); + } isRetry = in.readBoolean(); autoGeneratedTimestamp = in.readLong(); if (in.readBoolean()) { @@ -249,7 +254,7 @@ public XContentType getContentType() { @Override public String type() { if (type == null) { - return MapperService.SINGLE_MAPPING_NAME; + return MapperService.SINGLE_MAPPING_NAME; } return type; } @@ -278,7 +283,7 @@ public IndexRequest defaultTypeIfNull(String defaultType) { type = defaultType; } return this; - } + } /** * The id of the indexed document. If not set, will be automatically generated. */ @@ -333,6 +338,26 @@ public String getPipeline() { return this.pipeline; } + /** + * Sets if this request has been forwarded by the coordinating node. + * + * @param isForwardedRequest true if the request has been forwarded to an ingest node by the coordinating node + * @return the request + */ + public IndexRequest isForwardedRequest(final boolean isForwardedRequest) { + this.isForwardedRequest = isForwardedRequest; + return this; + } + + /** + * Returns whether or not this request has been forwarded by the coordinating node. + * + * @return true if the request has been forwarded to an ingest node by the coordinating node + */ + public boolean isForwardedRequest() { + return this.isForwardedRequest; + } + /** * The source of the document to index, recopied to a new array if it is unsafe. */ @@ -616,8 +641,8 @@ public void resolveRouting(MetaData metaData) { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - // A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions. - // So we use the type accessor method here to make the type non-null (will default it to "_doc"). + // A 7.x request allows null types but if deserialized in a 6.x node will cause nullpointer exceptions. + // So we use the type accessor method here to make the type non-null (will default it to "_doc"). out.writeOptionalString(type()); out.writeOptionalString(id); out.writeOptionalString(routing); @@ -626,6 +651,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(version); out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(isForwardedRequest); + } out.writeBoolean(isRetry); out.writeLong(autoGeneratedTimestamp); if (contentType != null) { From 3d618a7d54884f1067aefacb00c48c6409fbce60 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 18 Sep 2019 22:34:18 -0400 Subject: [PATCH 4/6] Add assertion --- .../action/bulk/TransportBulkAction.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 6c592314e6df8..3032483b755d5 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SparseFixedBitSet; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceAlreadyExistsException; @@ -246,7 +247,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener Date: Thu, 19 Sep 2019 06:41:04 -0400 Subject: [PATCH 5/6] Fix NPE in tests --- .../TransportBulkActionIndicesThatCannotBeCreatedTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index ee721377afe67..fb71d33aa564f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -106,6 +107,9 @@ private void indicesThatCannotBeCreatedTestCase(Set expected, ClusterState state = mock(ClusterState.class); when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA); when(clusterService.state()).thenReturn(state); + DiscoveryNode localNode = mock(DiscoveryNode.class); + when(clusterService.localNode()).thenReturn(localNode); + when(localNode.isIngestNode()).thenReturn(randomBoolean()); final ThreadPool threadPool = mock(ThreadPool.class); final ExecutorService direct = EsExecutors.newDirectExecutorService(); when(threadPool.executor(anyString())).thenReturn(direct); From 01416c17f73ddb970412a274399362fff5122b21 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 19 Sep 2019 11:05:37 -0400 Subject: [PATCH 6/6] Always mark as resolved --- .../action/bulk/TransportBulkAction.java | 28 +++++++++---------- .../action/index/IndexRequest.java | 22 +++++++-------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 3032483b755d5..7f22964cca6de 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -159,7 +159,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener implement private String pipeline; - private boolean isForwardedRequest; + private boolean isPipelineResolved; /** * Value for {@link #getAutoGeneratedTimestamp()} if the document has an external @@ -127,7 +127,7 @@ public IndexRequest(StreamInput in) throws IOException { versionType = VersionType.fromValue(in.readByte()); pipeline = in.readOptionalString(); if (in.getVersion().onOrAfter(Version.V_8_0_0)) { - isForwardedRequest = in.readBoolean(); + isPipelineResolved = in.readBoolean(); } isRetry = in.readBoolean(); autoGeneratedTimestamp = in.readLong(); @@ -339,23 +339,23 @@ public String getPipeline() { } /** - * Sets if this request has been forwarded by the coordinating node. + * Sets if the pipeline for this request has been resolved by the coordinating node. * - * @param isForwardedRequest true if the request has been forwarded to an ingest node by the coordinating node + * @param isPipelineResolved true if the pipeline has been resolved * @return the request */ - public IndexRequest isForwardedRequest(final boolean isForwardedRequest) { - this.isForwardedRequest = isForwardedRequest; + public IndexRequest isPipelineResolved(final boolean isPipelineResolved) { + this.isPipelineResolved = isPipelineResolved; return this; } /** - * Returns whether or not this request has been forwarded by the coordinating node. + * Returns whether or not the pipeline for this request has been resolved by the coordinating node. * - * @return true if the request has been forwarded to an ingest node by the coordinating node + * @return true if the pipeline has been resolved */ - public boolean isForwardedRequest() { - return this.isForwardedRequest; + public boolean isPipelineResolved() { + return this.isPipelineResolved; } /** @@ -652,7 +652,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); if (out.getVersion().onOrAfter(Version.V_8_0_0)) { - out.writeBoolean(isForwardedRequest); + out.writeBoolean(isPipelineResolved); } out.writeBoolean(isRetry); out.writeLong(autoGeneratedTimestamp);