diff --git a/muted-tests.yml b/muted-tests.yml index 7f5f71881557d..84c564acc464c 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -513,8 +513,6 @@ tests: - class: org.elasticsearch.xpack.transform.checkpoint.TransformCCSCanMatchIT method: testTransformLifecycle_RangeQueryThatMatchesNoShards issue: https://github.com/elastic/elasticsearch/issues/121480 -- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT - issue: https://github.com/elastic/elasticsearch/issues/121737 - class: org.elasticsearch.xpack.security.authc.service.ServiceAccountSingleNodeTests method: testAuthenticateWithServiceFileToken issue: https://github.com/elastic/elasticsearch/issues/120988 diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java index af9269072ef85..06edc94849ab2 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java @@ -26,6 +26,8 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.DeletePipelineTransportAction; +import org.elasticsearch.action.ingest.GetPipelineAction; +import org.elasticsearch.action.ingest.GetPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.action.ingest.PutPipelineTransportAction; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -51,7 +53,7 @@ import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.migrate.MigratePlugin; import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; -import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.time.Instant; @@ -69,14 +71,26 @@ import static org.hamcrest.Matchers.equalTo; public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase { - @After - private void cleanup() { + + @Before + private void setup() throws Exception { safeGet( clusterAdmin().execute( DeletePipelineTransportAction.TYPE, new DeletePipelineRequest(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME) ) ); + + assertBusy(() -> { + assertTrue( + safeGet( + clusterAdmin().execute( + GetPipelineAction.INSTANCE, + new GetPipelineRequest(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME) + ) + ).isFound() + ); + }); } private static final String MAPPING = """ @@ -121,6 +135,9 @@ public void testTimestamp0AddedIfMissing() { // add doc without timestamp addDoc(sourceIndex, "{\"foo\":\"baz\"}"); + // wait until doc is written to all shards before adding mapping + ensureHealth(sourceIndex); + // add timestamp to source mapping indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); @@ -136,6 +153,7 @@ public void testTimestamp0AddedIfMissing() { } public void testTimestampNotAddedIfExists() { + var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT); safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex))); @@ -144,6 +162,9 @@ public void testTimestampNotAddedIfExists() { var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time); addDoc(sourceIndex, doc); + // wait until doc is written to all shards before adding mapping + ensureHealth(sourceIndex); + // add timestamp to source mapping indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); @@ -189,6 +210,9 @@ public void testCustomReindexPipeline() { var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time); addDoc(sourceIndex, doc); + // wait until doc is written to all shards before adding mapping + ensureHealth(sourceIndex); + // add timestamp to source mapping indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get(); @@ -298,7 +322,7 @@ public void testMissingSourceIndex() { ); } - public void testSettingsAddedBeforeReindex() throws Exception { + public void testSettingsAddedBeforeReindex() { // start with a static setting var numShards = randomIntBetween(1, 10); var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build(); @@ -603,4 +627,12 @@ void addDoc(String index, String doc) { bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON)); safeGet(client().bulk(bulkRequest)); } + + private void ensureHealth(String index) { + if (cluster().numDataNodes() > 1) { + ensureGreen(index); + } else { + ensureYellow(index); + } + } }