From 16a9b5a32f0dce84855069c9a4b9c6c5ce32e81f Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 17 Sep 2024 13:19:12 -0500 Subject: [PATCH] Making changes to BulkRequest to enable component template substitutions in the simulate ingest API (#112957) --- .../org/elasticsearch/TransportVersions.java | 1 + .../action/bulk/BulkRequest.java | 25 ++++ .../action/bulk/BulkRequestModifier.java | 5 +- .../action/bulk/SimulateBulkRequest.java | 82 +++++++++++- .../ingest/RestSimulateIngestAction.java | 3 +- .../action/bulk/BulkRequestTests.java | 20 +++ .../action/bulk/SimulateBulkRequestTests.java | 122 +++++++++++++++++- .../TransportSimulateBulkActionTests.java | 4 +- .../ingest/SimulateIngestServiceTests.java | 6 +- 9 files changed, 250 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index ddb805f6c1130..65bd8bc647e72 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -214,6 +214,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_AGGREGATE_EXEC_TRACKS_INTERMEDIATE_ATTRS = def(8_738_00_0); public static final TransportVersion CCS_TELEMETRY_STATS = def(8_739_00_0); public static final TransportVersion GLOBAL_RETENTION_TELEMETRY = def(8_740_00_0); + public static final TransportVersion SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS = def(8_743_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 6e6af4c016cf6..1425dde28ea3b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -38,6 +39,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -475,4 +477,27 @@ public Set getIndices() { public boolean isSimulated() { return false; // Always false, but may be overridden by a subclass } + + /* + * Returns any component template substitutions that are to be used as part of this bulk request. We would likely only have + * substitutions in the event of a simulated request. + */ + public Map getComponentTemplateSubstitutions() throws IOException { + return Map.of(); + } + + /* + * This copies this bulk request, but without all of its inner requests or the set of indices found in those requests + */ + public BulkRequest shallowClone() { + BulkRequest bulkRequest = new BulkRequest(globalIndex); + bulkRequest.setRefreshPolicy(getRefreshPolicy()); + bulkRequest.waitForActiveShards(waitForActiveShards()); + bulkRequest.timeout(timeout()); + bulkRequest.pipeline(pipeline()); + bulkRequest.routing(routing()); + bulkRequest.requireAlias(requireAlias()); + bulkRequest.requireDataStream(requireDataStream()); + return bulkRequest; + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java index b3048dc18008b..3e47c78a76354 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java @@ -87,10 +87,7 @@ BulkRequest getBulkRequest() { if (itemResponses.isEmpty()) { return bulkRequest; } else { - BulkRequest modifiedBulkRequest = new BulkRequest(); - modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy()); - modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); - modifiedBulkRequest.timeout(bulkRequest.timeout()); + BulkRequest modifiedBulkRequest = bulkRequest.shallowClone(); int slot = 0; List> requests = bulkRequest.requests(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java index 86e5d3b0985d0..3cc7fa12733bf 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/SimulateBulkRequest.java @@ -9,16 +9,21 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; +import org.elasticsearch.xcontent.XContentParserConfiguration; import java.io.IOException; +import java.util.HashMap; import java.util.Map; /** - * This extends BulkRequest with support for providing substitute pipeline definitions. In a user request, the pipeline substitutions - * will look something like this: + * This extends BulkRequest with support for providing substitute pipeline definitions and component template definitions. In a user + * request, the substitutions will look something like this: * * "pipeline_substitutions": { * "my-pipeline-1": { @@ -45,6 +50,29 @@ * } * ] * } + * }, + * "component_template_substitutions": { + * "my-template-1": { + * "template": { + * "settings": { + * "number_of_shards": 1 + * }, + * "mappings": { + * "_source": { + * "enabled": false + * }, + * "properties": { + * "host_name": { + * "type": "keyword" + * }, + * "created_at": { + * "type": "date", + * "format": "EEE MMM dd HH:mm:ss Z yyyy" + * } + * } + * } + * } + * } * } * * The pipelineSubstitutions Map held by this class is intended to be the result of XContentHelper.convertToMap(). The top-level keys @@ -53,27 +81,42 @@ */ public class SimulateBulkRequest extends BulkRequest { private final Map> pipelineSubstitutions; + private final Map> componentTemplateSubstitutions; /** * @param pipelineSubstitutions The pipeline definitions that are to be used in place of any pre-existing pipeline definitions with * the same pipelineId. The key of the map is the pipelineId, and the value the pipeline definition as * parsed by XContentHelper.convertToMap(). + * @param componentTemplateSubstitutions The component template definitions that are to be used in place of any pre-existing + * component template definitions with the same name. */ - public SimulateBulkRequest(@Nullable Map> pipelineSubstitutions) { + public SimulateBulkRequest( + @Nullable Map> pipelineSubstitutions, + @Nullable Map> componentTemplateSubstitutions + ) { super(); this.pipelineSubstitutions = pipelineSubstitutions; + this.componentTemplateSubstitutions = componentTemplateSubstitutions; } @SuppressWarnings("unchecked") public SimulateBulkRequest(StreamInput in) throws IOException { super(in); this.pipelineSubstitutions = (Map>) in.readGenericValue(); + if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS)) { + this.componentTemplateSubstitutions = (Map>) in.readGenericValue(); + } else { + componentTemplateSubstitutions = Map.of(); + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeGenericValue(pipelineSubstitutions); + if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS)) { + out.writeGenericValue(componentTemplateSubstitutions); + } } public Map> getPipelineSubstitutions() { @@ -84,4 +127,37 @@ public Map> getPipelineSubstitutions() { public boolean isSimulated() { return true; } + + @Override + public Map getComponentTemplateSubstitutions() throws IOException { + if (componentTemplateSubstitutions == null) { + return Map.of(); + } + Map result = new HashMap<>(componentTemplateSubstitutions.size()); + for (Map.Entry> rawEntry : componentTemplateSubstitutions.entrySet()) { + result.put(rawEntry.getKey(), convertRawTemplateToComponentTemplate(rawEntry.getValue())); + } + return result; + } + + private static ComponentTemplate convertRawTemplateToComponentTemplate(Map rawTemplate) throws IOException { + ComponentTemplate componentTemplate; + try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawTemplate)) { + componentTemplate = ComponentTemplate.parse(parser); + } + return componentTemplate; + } + + @Override + public BulkRequest shallowClone() { + BulkRequest bulkRequest = new SimulateBulkRequest(pipelineSubstitutions, componentTemplateSubstitutions); + bulkRequest.setRefreshPolicy(getRefreshPolicy()); + bulkRequest.waitForActiveShards(waitForActiveShards()); + bulkRequest.timeout(timeout()); + bulkRequest.pipeline(pipeline()); + bulkRequest.routing(routing()); + bulkRequest.requireAlias(requireAlias()); + bulkRequest.requireDataStream(requireDataStream()); + return bulkRequest; + } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java index ef9252072c526..6de15b0046f1b 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java @@ -75,7 +75,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC Tuple sourceTuple = request.contentOrSourceParam(); Map sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2(); SimulateBulkRequest bulkRequest = new SimulateBulkRequest( - (Map>) sourceMap.remove("pipeline_substitutions") + (Map>) sourceMap.remove("pipeline_substitutions"), + (Map>) sourceMap.remove("component_template_substitutions") ); BytesReference transformedData = convertToBulkRequestXContentBytes(sourceMap); bulkRequest.add( diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 643e2d90bf615..c601401a1c49d 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -475,4 +475,24 @@ public void testUnsupportedAction() { allOf(containsString("Malformed action/metadata line [1]"), containsString("found [get")) ); } + + public void testShallowClone() { + BulkRequest bulkRequest = new BulkRequest(randomBoolean() ? null : randomAlphaOfLength(10)); + bulkRequest.setRefreshPolicy(randomFrom(RefreshPolicy.values())); + bulkRequest.waitForActiveShards(randomIntBetween(1, 10)); + bulkRequest.timeout(randomTimeValue()); + bulkRequest.pipeline(randomBoolean() ? null : randomAlphaOfLength(10)); + bulkRequest.routing(randomBoolean() ? null : randomAlphaOfLength(10)); + bulkRequest.requireAlias(randomBoolean()); + bulkRequest.requireDataStream(randomBoolean()); + BulkRequest shallowCopy = bulkRequest.shallowClone(); + assertThat(shallowCopy.requests, equalTo(List.of())); + assertThat(shallowCopy.getRefreshPolicy(), equalTo(bulkRequest.getRefreshPolicy())); + assertThat(shallowCopy.waitForActiveShards(), equalTo(bulkRequest.waitForActiveShards())); + assertThat(shallowCopy.timeout(), equalTo(bulkRequest.timeout())); + assertThat(shallowCopy.pipeline(), equalTo(bulkRequest.pipeline())); + assertThat(shallowCopy.routing(), equalTo(bulkRequest.routing())); + assertThat(shallowCopy.requireAlias(), equalTo(bulkRequest.requireAlias())); + assertThat(shallowCopy.requireDataStream(), equalTo(bulkRequest.requireDataStream())); + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java index 2e347e052125a..b6b1770e2ed5c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/SimulateBulkRequestTests.java @@ -9,24 +9,36 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.metadata.ComponentTemplate; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentType; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class SimulateBulkRequestTests extends ESTestCase { public void testSerialization() throws Exception { - testSerialization(getTestPipelineSubstitutions()); - testSerialization(null); - testSerialization(Map.of()); + testSerialization(getTestPipelineSubstitutions(), getTestTemplateSubstitutions()); + testSerialization(getTestPipelineSubstitutions(), null); + testSerialization(null, getTestTemplateSubstitutions()); + testSerialization(null, null); + testSerialization(Map.of(), Map.of()); } - private void testSerialization(Map> pipelineSubstitutions) throws IOException { - SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions); + private void testSerialization( + Map> pipelineSubstitutions, + Map> templateSubstitutions + ) throws IOException { + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, templateSubstitutions); /* * Note: SimulateBulkRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a * Writable @@ -35,6 +47,94 @@ private void testSerialization(Map> pipelineSubstitu assertThat(copy.getPipelineSubstitutions(), equalTo(simulateBulkRequest.getPipelineSubstitutions())); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void testGetComponentTemplateSubstitutions() throws IOException { + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of()); + assertThat(simulateBulkRequest.getComponentTemplateSubstitutions(), equalTo(Map.of())); + String substituteComponentTemplatesString = """ + { + "mappings_template": { + "template": { + "mappings": { + "dynamic": "true", + "properties": { + "foo": { + "type": "keyword" + } + } + } + } + }, + "settings_template": { + "template": { + "settings": { + "index": { + "default_pipeline": "bar-pipeline" + } + } + } + } + } + """; + + Map tempMap = XContentHelper.convertToMap( + new BytesArray(substituteComponentTemplatesString.getBytes(StandardCharsets.UTF_8)), + randomBoolean(), + XContentType.JSON + ).v2(); + Map> substituteComponentTemplates = (Map>) tempMap; + simulateBulkRequest = new SimulateBulkRequest(Map.of(), substituteComponentTemplates); + Map componentTemplateSubstitutions = simulateBulkRequest.getComponentTemplateSubstitutions(); + assertThat(componentTemplateSubstitutions.size(), equalTo(2)); + assertThat( + XContentHelper.convertToMap( + XContentHelper.toXContent( + componentTemplateSubstitutions.get("mappings_template").template(), + XContentType.JSON, + randomBoolean() + ), + randomBoolean(), + XContentType.JSON + ).v2(), + equalTo(substituteComponentTemplates.get("mappings_template").get("template")) + ); + assertNull(componentTemplateSubstitutions.get("mappings_template").template().settings()); + assertNull(componentTemplateSubstitutions.get("settings_template").template().mappings()); + assertThat(componentTemplateSubstitutions.get("settings_template").template().settings().size(), equalTo(1)); + assertThat( + componentTemplateSubstitutions.get("settings_template").template().settings().get("index.default_pipeline"), + equalTo("bar-pipeline") + ); + } + + public void testShallowClone() throws IOException { + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(getTestPipelineSubstitutions(), getTestTemplateSubstitutions()); + simulateBulkRequest.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values())); + simulateBulkRequest.waitForActiveShards(randomIntBetween(1, 10)); + simulateBulkRequest.timeout(randomTimeValue()); + simulateBulkRequest.pipeline(randomBoolean() ? null : randomAlphaOfLength(10)); + simulateBulkRequest.routing(randomBoolean() ? null : randomAlphaOfLength(10)); + simulateBulkRequest.requireAlias(randomBoolean()); + simulateBulkRequest.requireDataStream(randomBoolean()); + BulkRequest shallowCopy = simulateBulkRequest.shallowClone(); + assertThat(shallowCopy, instanceOf(SimulateBulkRequest.class)); + SimulateBulkRequest simulateBulkRequestCopy = (SimulateBulkRequest) shallowCopy; + assertThat(simulateBulkRequestCopy.requests, equalTo(List.of())); + assertThat( + simulateBulkRequestCopy.getComponentTemplateSubstitutions(), + equalTo(simulateBulkRequest.getComponentTemplateSubstitutions()) + ); + assertThat(simulateBulkRequestCopy.getPipelineSubstitutions(), equalTo(simulateBulkRequest.getPipelineSubstitutions())); + assertThat(simulateBulkRequestCopy.getRefreshPolicy(), equalTo(simulateBulkRequest.getRefreshPolicy())); + assertThat(simulateBulkRequestCopy.waitForActiveShards(), equalTo(simulateBulkRequest.waitForActiveShards())); + assertThat(simulateBulkRequestCopy.timeout(), equalTo(simulateBulkRequest.timeout())); + assertThat(shallowCopy.pipeline(), equalTo(simulateBulkRequest.pipeline())); + assertThat(shallowCopy.routing(), equalTo(simulateBulkRequest.routing())); + assertThat(shallowCopy.requireAlias(), equalTo(simulateBulkRequest.requireAlias())); + assertThat(shallowCopy.requireDataStream(), equalTo(simulateBulkRequest.requireDataStream())); + + } + private static Map> getTestPipelineSubstitutions() { return Map.of( "pipeline1", @@ -43,4 +143,16 @@ private static Map> getTestPipelineSubstitutions() { Map.of("processors", List.of(Map.of("processor3", Map.of()))) ); } + + private static Map> getTestTemplateSubstitutions() { + return Map.of( + "template1", + Map.of( + "template", + Map.of("mappings", Map.of("_source", Map.of("enabled", false), "properties", Map.of()), "settings", Map.of()) + ), + "template2", + Map.of("template", Map.of("mappings", Map.of(), "settings", Map.of())) + ); + } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java index 77a463e5ca3e5..e3c863ee69985 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java @@ -135,7 +135,7 @@ public void tearDown() throws Exception { public void testIndexData() { Task task = mock(Task.class); // unused - BulkRequest bulkRequest = new SimulateBulkRequest((Map>) null); + BulkRequest bulkRequest = new SimulateBulkRequest(null, null); int bulkItemCount = randomIntBetween(0, 200); for (int i = 0; i < bulkItemCount; i++) { Map source = Map.of(randomAlphaOfLength(10), randomAlphaOfLength(5)); @@ -218,7 +218,7 @@ public void testIndexDataWithValidation() throws IOException { * (7) An indexing request to a nonexistent index that matches no templates */ Task task = mock(Task.class); // unused - BulkRequest bulkRequest = new SimulateBulkRequest((Map>) null); + BulkRequest bulkRequest = new SimulateBulkRequest(null, null); int bulkItemCount = randomIntBetween(0, 200); Map indicesMap = new HashMap<>(); Map v1Templates = new HashMap<>(); diff --git a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java index 554bc34cce7cc..332a04e40e43d 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java @@ -65,7 +65,7 @@ public void testGetPipeline() { ingestService.innerUpdatePipelines(ingestMetadata); { // First we make sure that if there are no substitutions that we get our original pipeline back: - SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest((Map>) null); + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(null, null); SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest); Pipeline pipeline = simulateIngestService.getPipeline("pipeline1"); assertThat(pipeline.getProcessors(), contains(transformedMatch(Processor::getType, equalTo("processor1")))); @@ -83,7 +83,7 @@ public void testGetPipeline() { ); pipelineSubstitutions.put("pipeline2", newHashMap("processors", List.of(newHashMap("processor3", Collections.emptyMap())))); - SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions); + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, null); SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest); Pipeline pipeline1 = simulateIngestService.getPipeline("pipeline1"); assertThat( @@ -103,7 +103,7 @@ public void testGetPipeline() { */ Map> pipelineSubstitutions = new HashMap<>(); pipelineSubstitutions.put("pipeline2", newHashMap("processors", List.of(newHashMap("processor3", Collections.emptyMap())))); - SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions); + SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, null); SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest); Pipeline pipeline1 = simulateIngestService.getPipeline("pipeline1"); assertThat(pipeline1.getProcessors(), contains(transformedMatch(Processor::getType, equalTo("processor1"))));