From 561132656ae17e5a54f01f9e0f12d6fe3479fcb4 Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Tue, 4 Feb 2025 17:59:21 -0600 Subject: [PATCH] Add pipeline to clean docs during data stream reindex (#121617) Add the pipeline "reindex-data-stream-pipeline" to the reindex request within ReindexDataStreamIndexAction. This cleans up documents as needed before inserting into the destination index. Currently, the pipeline only sets a timestamp field with a value of 0, if the document is missing a timestamp field. This is needed because existing indices which are added to a data stream may not contain a timestamp, but reindex validates that a timestamp field exists when creating data stream destination indices. This pipeline is managed by ES, but can be overriden by users if necessary. To do this, the version field of the pipeline should be set to a value higher than the MigrateRegistry version. --- .../core/security/user/InternalUsers.java | 1 + .../reindex-data-stream-pipeline.json | 16 ++ x-pack/plugin/migrate/build.gradle | 1 + ...indexDatastreamIndexTransportActionIT.java | 146 ++++++++++++++++-- .../xpack/migrate/MigratePlugin.java | 13 ++ .../migrate/MigrateTemplateRegistry.java | 55 +++++++ ...ReindexDataStreamIndexTransportAction.java | 2 + .../upgrades/DataStreamsUpgradeIT.java | 2 +- 8 files changed, 223 insertions(+), 13 deletions(-) create mode 100644 x-pack/plugin/core/template-resources/src/main/resources/reindex-data-stream-pipeline.json create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigrateTemplateRegistry.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java index 1229d62dce047..a704b350dba4b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/user/InternalUsers.java @@ -225,6 +225,7 @@ public class InternalUsers { .build() }, null, null, + new String[] {}, MetadataUtils.DEFAULT_RESERVED_METADATA, Map.of() diff --git a/x-pack/plugin/core/template-resources/src/main/resources/reindex-data-stream-pipeline.json b/x-pack/plugin/core/template-resources/src/main/resources/reindex-data-stream-pipeline.json new file mode 100644 index 0000000000000..e8c3352131700 --- /dev/null +++ b/x-pack/plugin/core/template-resources/src/main/resources/reindex-data-stream-pipeline.json @@ -0,0 +1,16 @@ +{ + "description": "This pipeline sanitizes documents that are being reindexed into a data stream using the reindex data stream API. It is an internal pipeline and should not be modified.", + "processors": [ + { + "set": { + "field": "@timestamp", + "value": 0, + "override": false + } + } + ], + "_meta": { + "managed": true + }, + "version": ${xpack.migrate.reindex.pipeline.version} +} diff --git a/x-pack/plugin/migrate/build.gradle b/x-pack/plugin/migrate/build.gradle index 283362a637e78..f179a311e0fea 100644 --- a/x-pack/plugin/migrate/build.gradle +++ b/x-pack/plugin/migrate/build.gradle @@ -19,6 +19,7 @@ dependencies { testImplementation project(xpackModule('ccr')) testImplementation project(':modules:data-streams') testImplementation project(path: ':modules:reindex') + testImplementation project(path: ':modules:ingest-common') } addQaCheckDependencies(project) diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index e22c26bd6b8bb..1ffeaed0bd1d3 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -24,12 +24,17 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.DeletePipelineTransportAction; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineTransportAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.MetadataIndexStateService; import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; @@ -38,12 +43,15 @@ import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.migrate.MigratePlugin; +import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; +import org.junit.After; import java.io.IOException; import java.time.Instant; @@ -56,19 +64,27 @@ import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase { + @After + private void cleanupCluster() throws Exception { + clusterAdmin().execute( + DeletePipelineTransportAction.TYPE, + new DeletePipelineRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME) + ); + super.cleanUpCluster(); + } private static final String MAPPING = """ { "_doc":{ "dynamic":"strict", "properties":{ - "foo1":{ - "type":"text" - } + "foo1": {"type":"text"}, + "@timestamp": {"type":"date"} } } } @@ -76,7 +92,116 @@ public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { - return List.of(MigratePlugin.class, ReindexPlugin.class, MockTransportService.TestPlugin.class, DataStreamsPlugin.class); + return List.of( + MigratePlugin.class, + ReindexPlugin.class, + MockTransportService.TestPlugin.class, + DataStreamsPlugin.class, + IngestCommonPlugin.class + ); + } + + private static String DATA_STREAM_MAPPING = """ + { + "dynamic": true, + "_data_stream_timestamp": { + "enabled": true + }, + "properties": { + "@timestamp": {"type":"date"} + } + } + """; + + public void testTimestamp0AddedIfMissing() { + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet(); + + // add doc without timestamp + addDoc(sourceIndex, "{\"foo\":\"baz\"}"); + + // add timestamp to source mapping + indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + assertResponse(prepareSearch(destIndex), response -> { + Map sourceAsMap = response.getHits().getAt(0).getSourceAsMap(); + assertEquals(Integer.valueOf(0), sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD)); + }); + } + + public void testTimestampNotAddedIfExists() { + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet(); + + // add doc with timestamp + String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time); + addDoc(sourceIndex, doc); + + // add timestamp to source mapping + indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); + + // call reindex + var destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + assertResponse(prepareSearch(destIndex), response -> { + Map sourceAsMap = response.getHits().getAt(0).getSourceAsMap(); + assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD)); + }); + } + + public void testCustomReindexPipeline() { + String customPipeline = """ + { + "processors": [ + { + "set": { + "field": "cheese", + "value": "gorgonzola" + } + } + ], + "version": 1000 + } + """; + + PutPipelineRequest putRequest = new PutPipelineRequest( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME, + new BytesArray(customPipeline), + XContentType.JSON + ); + + clusterAdmin().execute(PutPipelineTransportAction.TYPE, putRequest).actionGet(); + + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); + indicesAdmin().create(new CreateIndexRequest(sourceIndex)).actionGet(); + + // add doc with timestamp + String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time); + addDoc(sourceIndex, doc); + + // add timestamp to source mapping + indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); + + String destIndex = client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)) + .actionGet() + .getDestIndex(); + + assertResponse(prepareSearch(destIndex), response -> { + Map sourceAsMap = response.getHits().getAt(0).getSourceAsMap(); + assertEquals("gorgonzola", sourceAsMap.get("cheese")); + assertEquals(time, sourceAsMap.get(DEFAULT_TIMESTAMP_FIELD)); + }); } public void testDestIndexDeletedIfExists() throws Exception { @@ -200,7 +325,7 @@ public void testSettingsAddedBeforeReindex() throws Exception { assertEquals(refreshInterval, settingsResponse.getSetting(destIndex, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey())); } - public void testMappingsAddedToDestIndex() throws Exception { + public void testMappingsAddedToDestIndex() { var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)).actionGet(); @@ -479,12 +604,9 @@ private static String formatInstant(Instant instant) { return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant); } - private static String getIndexUUID(String index) { - return indicesAdmin().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(index)) - .actionGet() - .getSettings() - .get(index) - .get(IndexMetadata.SETTING_INDEX_UUID); + void addDoc(String index, String doc) { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON)); + client().bulk(bulkRequest).actionGet(); } - } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index f5f8beba26d8f..7811e84ac9f53 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.function.Predicate; import java.util.function.Supplier; @@ -64,6 +65,18 @@ import static org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor.MAX_CONCURRENT_INDICES_REINDEXED_PER_DATA_STREAM_SETTING; public class MigratePlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin { + @Override + public Collection createComponents(PluginServices services) { + var registry = new MigrateTemplateRegistry( + services.environment().settings(), + services.clusterService(), + services.threadPool(), + services.client(), + services.xContentRegistry() + ); + registry.initialize(); + return List.of(registry); + } @Override public List getRestHandlers( diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigrateTemplateRegistry.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigrateTemplateRegistry.java new file mode 100644 index 0000000000000..2a9dc97e16352 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigrateTemplateRegistry.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.migrate; + +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.template.IndexTemplateRegistry; +import org.elasticsearch.xpack.core.template.IngestPipelineConfig; +import org.elasticsearch.xpack.core.template.JsonIngestPipelineConfig; + +import java.util.List; + +public class MigrateTemplateRegistry extends IndexTemplateRegistry { + + // This number must be incremented when we make changes to built-in pipeline. + // If a specific user pipeline is needed instead, its version should be set to a value higher than the REGISTRY_VERSION. + static final int REGISTRY_VERSION = 1; + public static final String REINDEX_DATA_STREAM_PIPELINE_NAME = "reindex-data-stream-pipeline"; + private static final String TEMPLATE_VERSION_VARIABLE = "xpack.migrate.reindex.pipeline.version"; + + public MigrateTemplateRegistry( + Settings nodeSettings, + ClusterService clusterService, + ThreadPool threadPool, + Client client, + NamedXContentRegistry xContentRegistry + ) { + super(nodeSettings, clusterService, threadPool, client, xContentRegistry); + } + + @Override + protected List getIngestPipelines() { + return List.of( + new JsonIngestPipelineConfig( + REINDEX_DATA_STREAM_PIPELINE_NAME, + "/" + REINDEX_DATA_STREAM_PIPELINE_NAME + ".json", + REGISTRY_VERSION, + TEMPLATE_VERSION_VARIABLE + ) + ); + } + + @Override + protected String getOrigin() { + return ClientHelper.STACK_ORIGIN; + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 456c61451f7f2..792ec4ec2b6f9 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -53,6 +53,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; +import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; import java.util.Locale; import java.util.Map; @@ -271,6 +272,7 @@ void reindex(String sourceIndexName, String destIndexName, ActionListener