From ffd089ee72054efebfa614c658c2524cd6818ed0 Mon Sep 17 00:00:00 2001 From: Parker Timmins Date: Wed, 12 Feb 2025 08:11:36 -0600 Subject: [PATCH] Fix ReindexDataStreamIndexAction timestamp validation bug in tests (#122274) Fix race condition test bugs related to the reindex-data-stream-pipeline. For tests that add doc without timestamp, then add mapping with timestamp, ensure green between adding doc and adding mapping. This makes sure that doc has been written to all shards and thus that timestamp validation does not occur while doc is being written to a shard. Delete pipeline in Before method, then wait for it to be re-created by the MigrateTemplateRegistry. --- muted-tests.yml | 24 ---------------- ...indexDatastreamIndexTransportActionIT.java | 28 ++++++++++++++++--- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 91a3d8a25a839..980eba3030616 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -329,32 +329,8 @@ tests: - class: org.elasticsearch.search.CrossClusterSearchUnavailableClusterIT method: testSearchSkipUnavailable issue: https://github.com/elastic/elasticsearch/issues/121497 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - method: testDestIndexContainsDocs - issue: https://github.com/elastic/elasticsearch/issues/121915 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - method: testTsdbStartEndSet - issue: https://github.com/elastic/elasticsearch/issues/121916 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - method: testDestIndexNameSet_noDotPrefix - issue: https://github.com/elastic/elasticsearch/issues/121772 - class: org.elasticsearch.ingest.geoip.FullClusterRestartIT issue: https://github.com/elastic/elasticsearch/issues/121935 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - method: testTimestampNotAddedIfExists - issue: https://github.com/elastic/elasticsearch/issues/121842 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - method: testDestIndexNameSet_withDotPrefix - issue: https://github.com/elastic/elasticsearch/issues/121977 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - method: testFailIfMetadataBlockSet - issue: https://github.com/elastic/elasticsearch/issues/121978 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - method: testFailIfReadBlockSet - issue: https://github.com/elastic/elasticsearch/issues/122123 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - method: testTimestamp0AddedIfMissing - issue: https://github.com/elastic/elasticsearch/issues/121745 # Examples: # diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index a3642ddb664d4..e3b73d0aaa5cb 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -49,7 +49,7 @@ import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.migrate.MigratePlugin; import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; -import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.time.Instant; @@ -67,9 +67,11 @@ import static org.hamcrest.Matchers.equalTo; public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase { - @After - private void cleanup() { + + @Before + private void setup() throws Exception { deletePipeline(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME); + assertBusy(() -> { assertTrue(getPipelines(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME).isFound()); }); } private static final String MAPPING = """ @@ -114,6 +116,9 @@ public void testTimestamp0AddedIfMissing() { // add doc without timestamp addDoc(sourceIndex, "{\"foo\":\"baz\"}"); + // wait until doc is written to all shards before adding mapping + ensureHealth(sourceIndex); + // add timestamp to source mapping indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); @@ -129,6 +134,7 @@ public void testTimestamp0AddedIfMissing() { } public void testTimestampNotAddedIfExists() { + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex))); @@ -137,6 +143,9 @@ public void testTimestampNotAddedIfExists() { var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time); addDoc(sourceIndex, doc); + // wait until doc is written to all shards before adding mapping + ensureHealth(sourceIndex); + // add timestamp to source mapping indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); @@ -184,6 +193,9 @@ public void testCustomReindexPipeline() { var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time); addDoc(sourceIndex, doc); + // wait until doc is written to all shards before adding mapping + ensureHealth(sourceIndex); + // add timestamp to source mapping indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); @@ -293,7 +305,7 @@ public void testMissingSourceIndex() { ); } - public void testSettingsAddedBeforeReindex() throws Exception { + public void testSettingsAddedBeforeReindex() { // start with a static setting var numShards = randomIntBetween(1, 10); var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build(); @@ -604,4 +616,12 @@ void addDoc(String index, String doc) { bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON)); safeGet(client().bulk(bulkRequest)); } + + private void ensureHealth(String index) { + if (cluster().numDataNodes() > 1) { + ensureGreen(index); + } else { + ensureYellow(index); + } + } }