Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,6 @@ tests:
- class: org.elasticsearch.xpack.transform.checkpoint.TransformCCSCanMatchIT
method: testTransformLifecycle_RangeQueryThatMatchesNoShards
issue: https://github.com/elastic/elasticsearch/issues/121480
- class: org.elasticsearch.xpack.migrate.action.ReindexDatastreamIndexTransportActionIT
issue: https://github.com/elastic/elasticsearch/issues/121737
- class: org.elasticsearch.xpack.security.authc.service.ServiceAccountSingleNodeTests
method: testAuthenticateWithServiceFileToken
issue: https://github.com/elastic/elasticsearch/issues/120988
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.DeletePipelineRequest;
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
import org.elasticsearch.action.ingest.GetPipelineAction;
import org.elasticsearch.action.ingest.GetPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -51,7 +53,7 @@
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.migrate.MigratePlugin;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -69,14 +71,26 @@
import static org.hamcrest.Matchers.equalTo;

public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {
@After
private void cleanup() {

@Before
private void setup() throws Exception {
safeGet(
clusterAdmin().execute(
DeletePipelineTransportAction.TYPE,
new DeletePipelineRequest(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME)
)
);

assertBusy(() -> {
assertTrue(
safeGet(
clusterAdmin().execute(
GetPipelineAction.INSTANCE,
new GetPipelineRequest(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME)
)
).isFound()
);
});
}

private static final String MAPPING = """
Expand Down Expand Up @@ -121,6 +135,9 @@ public void testTimestamp0AddedIfMissing() {
// add doc without timestamp
addDoc(sourceIndex, "{\"foo\":\"baz\"}");

// wait until doc is written to all shards before adding mapping
ensureHealth(sourceIndex);

// add timestamp to source mapping
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();

Expand All @@ -136,6 +153,7 @@ public void testTimestamp0AddedIfMissing() {
}

public void testTimestampNotAddedIfExists() {

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

Expand All @@ -144,6 +162,9 @@ public void testTimestampNotAddedIfExists() {
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
addDoc(sourceIndex, doc);

// wait until doc is written to all shards before adding mapping
ensureHealth(sourceIndex);

// add timestamp to source mapping
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();

Expand Down Expand Up @@ -189,6 +210,9 @@ public void testCustomReindexPipeline() {
var doc = String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, time);
addDoc(sourceIndex, doc);

// wait until doc is written to all shards before adding mapping
ensureHealth(sourceIndex);

// add timestamp to source mapping
indicesAdmin().preparePutMapping(sourceIndex).setSource(DATA_STREAM_MAPPING, XContentType.JSON).get();

Expand Down Expand Up @@ -298,7 +322,7 @@ public void testMissingSourceIndex() {
);
}

public void testSettingsAddedBeforeReindex() throws Exception {
public void testSettingsAddedBeforeReindex() {
// start with a static setting
var numShards = randomIntBetween(1, 10);
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
Expand Down Expand Up @@ -603,4 +627,12 @@ void addDoc(String index, String doc) {
bulkRequest.add(new IndexRequest(index).opType(DocWriteRequest.OpType.CREATE).source(doc, XContentType.JSON));
safeGet(client().bulk(bulkRequest));
}

private void ensureHealth(String index) {
if (cluster().numDataNodes() > 1) {
ensureGreen(index);
} else {
ensureYellow(index);
}
}
}