diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java index 75b268f6ffd2f..a92d5ac98f148 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/ExplainDataStreamLifecycleIT.java @@ -11,28 +11,32 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.rollover.Condition; +import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverConditions; import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.datastreams.CreateDataStreamAction; -import org.elasticsearch.action.datastreams.GetDataStreamAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamFailureStore; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.DataStreamOptions; +import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.DataStreamsPlugin; -import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; @@ -47,6 +51,7 @@ import java.util.Map; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.dataStreamIndexEqualTo; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD; import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_NAME; import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.SYSTEM_DATA_STREAM_RETENTION_DAYS; @@ -66,7 +71,8 @@ protected Collection> nodePlugins() { return List.of( DataStreamsPlugin.class, MockTransportService.TestPlugin.class, - DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.class + DataStreamLifecycleServiceIT.TestSystemDataStreamPlugin.class, + MapperExtrasPlugin.class ); } @@ -99,29 +105,16 @@ public void testExplainLifecycle() throws Exception { indexDocs(dataStreamName, 1); - assertBusy(() -> { - GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request( - TEST_REQUEST_TIMEOUT, - new String[] { dataStreamName } - ); - GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest) - .actionGet(); - assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1)); - assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName)); - List backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices(); - assertThat(backingIndices.size(), equalTo(2)); - String backingIndex = backingIndices.get(0).getName(); - assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1)); - String writeIndex = backingIndices.get(1).getName(); - assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); - }); + List backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2); + String firstGenerationIndex = backingIndices.get(0); + assertThat(firstGenerationIndex, backingIndexEqualTo(dataStreamName, 1)); + String secondGenerationIndex = backingIndices.get(1); + assertThat(secondGenerationIndex, backingIndexEqualTo(dataStreamName, 2)); { ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( TEST_REQUEST_TIMEOUT, - new String[] { - DataStream.getDefaultBackingIndexName(dataStreamName, 1), - DataStream.getDefaultBackingIndexName(dataStreamName, 2) } + new String[] { firstGenerationIndex, secondGenerationIndex } ); ExplainDataStreamLifecycleAction.Response response = client().execute( ExplainDataStreamLifecycleAction.INSTANCE, @@ -140,18 +133,16 @@ public void testExplainLifecycle() throws Exception { assertThat(explainIndex.getError(), nullValue()); } - if (explainIndex.getIndex().equals(DataStream.getDefaultBackingIndexName(dataStreamName, 1))) { + if (explainIndex.getIndex().equals(firstGenerationIndex)) { // first generation index was rolled over - assertThat(explainIndex.getIndex(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); assertThat(explainIndex.getRolloverDate(), notNullValue()); assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), notNullValue()); assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), notNullValue()); } else { // the write index has not been rolled over yet - assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); - assertThat(explainIndex.getIndex(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); assertThat(explainIndex.getRolloverDate(), nullValue()); assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue()); + assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); } } } @@ -160,9 +151,7 @@ public void testExplainLifecycle() throws Exception { // let's also explain with include_defaults=true ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( TEST_REQUEST_TIMEOUT, - new String[] { - DataStream.getDefaultBackingIndexName(dataStreamName, 1), - DataStream.getDefaultBackingIndexName(dataStreamName, 2) }, + new String[] { firstGenerationIndex, secondGenerationIndex }, true ); ExplainDataStreamLifecycleAction.Response response = client().execute( @@ -195,18 +184,16 @@ public void testExplainLifecycle() throws Exception { assertThat(explainIndex.getLifecycle(), notNullValue()); assertThat(explainIndex.getLifecycle().dataRetention(), nullValue()); - if (explainIndex.getIndex().equals(DataStream.getDefaultBackingIndexName(dataStreamName, 1))) { + if (explainIndex.getIndex().equals(firstGenerationIndex)) { // first generation index was rolled over - assertThat(explainIndex.getIndex(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); assertThat(explainIndex.getRolloverDate(), notNullValue()); assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), notNullValue()); assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), notNullValue()); } else { // the write index has not been rolled over yet - assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); - assertThat(explainIndex.getIndex(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); assertThat(explainIndex.getRolloverDate(), nullValue()); assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue()); + assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); } } } @@ -228,28 +215,15 @@ public void testSystemExplainLifecycle() throws Exception { indexDocs(dataStreamName, 1); - assertBusy(() -> { - GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request( - TEST_REQUEST_TIMEOUT, - new String[] { dataStreamName } - ); - GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest) - .actionGet(); - assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1)); - assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName)); - List backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices(); - assertThat(backingIndices.size(), equalTo(2)); - String backingIndex = backingIndices.get(0).getName(); - assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1)); - String writeIndex = backingIndices.get(1).getName(); - assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); - }); + List backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2); + String firstGenerationIndex = backingIndices.get(0); + assertThat(firstGenerationIndex, backingIndexEqualTo(dataStreamName, 1)); + String secondGenerationIndex = backingIndices.get(1); + assertThat(secondGenerationIndex, backingIndexEqualTo(dataStreamName, 2)); ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( TEST_REQUEST_TIMEOUT, - new String[] { - DataStream.getDefaultBackingIndexName(dataStreamName, 1), - DataStream.getDefaultBackingIndexName(dataStreamName, 2) } + new String[] { firstGenerationIndex, secondGenerationIndex } ); ExplainDataStreamLifecycleAction.Response response = client().execute( ExplainDataStreamLifecycleAction.INSTANCE, @@ -272,72 +246,221 @@ public void testSystemExplainLifecycle() throws Exception { } } + public void testExplainFailuresLifecycle() throws Exception { + // Failure indices are always managed unless explicitly disabled. + putComposableIndexTemplate( + "id1", + """ + { + "properties": { + "@timestamp" : { + "type": "date" + }, + "count": { + "type": "long" + } + } + }""", + List.of("metrics-foo*"), + null, + null, + DataStreamLifecycle.Template.DEFAULT, + new DataStreamOptions.Template(new DataStreamFailureStore.Template(true)) + ); + String dataStreamName = "metrics-foo"; + CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request( + TEST_REQUEST_TIMEOUT, + TEST_REQUEST_TIMEOUT, + dataStreamName + ); + client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + + indexFailedDocs(dataStreamName, 1); + + List failureIndices = waitForDataStreamIndices(dataStreamName, 1, true); + String firstGenerationIndex = failureIndices.get(0); + assertThat(firstGenerationIndex, DataStreamTestHelper.dataStreamIndexEqualTo(dataStreamName, 2, true)); + + indexFailedDocs(dataStreamName, 1); + failureIndices = waitForDataStreamIndices(dataStreamName, 2, true); + String secondGenerationIndex = failureIndices.get(1); + assertThat(secondGenerationIndex, DataStreamTestHelper.dataStreamIndexEqualTo(dataStreamName, 3, true)); + + { + ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( + TEST_REQUEST_TIMEOUT, + new String[] { firstGenerationIndex, secondGenerationIndex } + ); + ExplainDataStreamLifecycleAction.Response response = client().execute( + ExplainDataStreamLifecycleAction.INSTANCE, + explainIndicesRequest + ).actionGet(); + assertThat(response.getIndices().size(), is(2)); + // we requested the explain for indices with the default include_details=false + assertThat(response.getRolloverConfiguration(), nullValue()); + for (ExplainIndexDataStreamLifecycle explainIndex : response.getIndices()) { + assertThat(explainIndex.isManagedByLifecycle(), is(true)); + assertThat(explainIndex.getIndexCreationDate(), notNullValue()); + assertThat(explainIndex.getLifecycle(), notNullValue()); + assertThat(explainIndex.getLifecycle().dataRetention(), nullValue()); + if (internalCluster().numDataNodes() > 1) { + // If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run + assertThat(explainIndex.getError(), nullValue()); + } + + if (explainIndex.getIndex().equals(firstGenerationIndex)) { + // first generation index was rolled over + assertThat(explainIndex.getRolloverDate(), notNullValue()); + assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), notNullValue()); + assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), notNullValue()); + } else { + // the write index has not been rolled over yet + assertThat(explainIndex.getRolloverDate(), nullValue()); + assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue()); + assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); + } + } + } + + { + // let's also explain with include_defaults=true + ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( + TEST_REQUEST_TIMEOUT, + new String[] { firstGenerationIndex }, + true + ); + ExplainDataStreamLifecycleAction.Response response = client().execute( + ExplainDataStreamLifecycleAction.INSTANCE, + explainIndicesRequest + ).actionGet(); + assertThat(response.getIndices().size(), is(1)); + RolloverConfiguration rolloverConfiguration = response.getRolloverConfiguration(); + assertThat(rolloverConfiguration, notNullValue()); + Map> conditions = rolloverConfiguration.resolveRolloverConditions(null).getConditions(); + assertThat(conditions.size(), is(2)); + assertThat(conditions.get(RolloverConditions.MAX_DOCS_FIELD.getPreferredName()).value(), is(1L)); + assertThat(conditions.get(RolloverConditions.MIN_DOCS_FIELD.getPreferredName()).value(), is(1L)); + } + + { + // Let's also explain using the data stream name + ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( + TEST_REQUEST_TIMEOUT, + new String[] { dataStreamName } + ); + ExplainDataStreamLifecycleAction.Response response = client().execute( + ExplainDataStreamLifecycleAction.INSTANCE, + explainIndicesRequest + ).actionGet(); + assertThat(response.getIndices().size(), is(1)); + for (ExplainIndexDataStreamLifecycle explainIndex : response.getIndices()) { + assertThat(explainIndex.isManagedByLifecycle(), is(true)); + assertThat(explainIndex.getIndexCreationDate(), notNullValue()); + assertThat(explainIndex.getLifecycle(), notNullValue()); + assertThat(explainIndex.getLifecycle().dataRetention(), nullValue()); + + if (internalCluster().numDataNodes() > 1) { + // If the number of nodes is 1 then the cluster will be yellow so forcemerge will report an error if it has run + assertThat(explainIndex.getError(), nullValue()); + } + + if (explainIndex.getIndex().equals(firstGenerationIndex)) { + // first generation index was rolled over + assertThat(explainIndex.getRolloverDate(), notNullValue()); + assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), notNullValue()); + assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), notNullValue()); + } else { + // the write index has not been rolled over yet + assertThat(explainIndex.getRolloverDate(), nullValue()); + assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue()); + assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); + } + } + } + } + public void testExplainLifecycleForIndicesWithErrors() throws Exception { // empty lifecycle contains the default rollover DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.DEFAULT; - putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle); - + putComposableIndexTemplate( + "id1", + """ + { + "properties": { + "@timestamp" : { + "type": "date" + }, + "count": { + "type": "long" + } + } + }""", + List.of("metrics-foo*"), + null, + null, + lifecycle, + new DataStreamOptions.Template(new DataStreamFailureStore.Template(true)) + ); String dataStreamName = "metrics-foo"; CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request( TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, dataStreamName ); - client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get(); + safeGet(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)); + safeGet(client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName + "::failures", null))); indexDocs(dataStreamName, 1); // let's allow one rollover to go through - assertBusy(() -> { - GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request( - TEST_REQUEST_TIMEOUT, - new String[] { dataStreamName } - ); - GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest) - .actionGet(); - assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1)); - assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName)); - List backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices(); - assertThat(backingIndices.size(), equalTo(2)); - String backingIndex = backingIndices.get(0).getName(); - assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1)); - String writeIndex = backingIndices.get(1).getName(); - assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2)); - }); + List backingIndices = waitForDataStreamBackingIndices(dataStreamName, 2); + String firstGenerationIndex = backingIndices.get(0); + assertThat(firstGenerationIndex, backingIndexEqualTo(dataStreamName, 1)); + String secondGenerationIndex = backingIndices.get(1); + assertThat(secondGenerationIndex, backingIndexEqualTo(dataStreamName, 3)); + // let's ensure that the failure store is initialised + List failureIndices = waitForDataStreamIndices(dataStreamName, 1, true); + String firstGenerationFailureIndex = failureIndices.get(0); + assertThat(firstGenerationFailureIndex, dataStreamIndexEqualTo(dataStreamName, 2, true)); // prevent new indices from being created (ie. future rollovers) updateClusterSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 1)); indexDocs(dataStreamName, 1); + indexFailedDocs(dataStreamName, 1); - String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 2); assertBusy(() -> { ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( TEST_REQUEST_TIMEOUT, - new String[] { writeIndexName } + new String[] { secondGenerationIndex, firstGenerationFailureIndex } ); ExplainDataStreamLifecycleAction.Response response = client().execute( ExplainDataStreamLifecycleAction.INSTANCE, explainIndicesRequest ).actionGet(); - assertThat(response.getIndices().size(), is(1)); + assertThat(response.getIndices().size(), is(2)); // we requested the explain for indices with the default include_details=false assertThat(response.getRolloverConfiguration(), nullValue()); - for (ExplainIndexDataStreamLifecycle explainIndex : response.getIndices()) { - assertThat(explainIndex.getIndex(), is(writeIndexName)); - assertThat(explainIndex.isManagedByLifecycle(), is(true)); - assertThat(explainIndex.getIndexCreationDate(), notNullValue()); - assertThat(explainIndex.getLifecycle(), notNullValue()); - assertThat(explainIndex.getLifecycle().dataRetention(), nullValue()); - assertThat(explainIndex.getRolloverDate(), nullValue()); - assertThat(explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue()); + for (int i = 0; i < 2; i++) { + ExplainIndexDataStreamLifecycle explainIndex = response.getIndices().get(i); + if (i == 0) { + assertThat(explainIndex.getIndex(), is(secondGenerationIndex)); + } else { + assertThat(explainIndex.getIndex(), is(firstGenerationFailureIndex)); + } + assertThat(explainIndex.getIndex(), explainIndex.isManagedByLifecycle(), is(true)); + assertThat(explainIndex.getIndex(), explainIndex.getIndexCreationDate(), notNullValue()); + assertThat(explainIndex.getIndex(), explainIndex.getLifecycle(), notNullValue()); + assertThat(explainIndex.getIndex(), explainIndex.getLifecycle().dataRetention(), nullValue()); + assertThat(explainIndex.getIndex(), explainIndex.getRolloverDate(), nullValue()); + assertThat(explainIndex.getIndex(), explainIndex.getTimeSinceRollover(System::currentTimeMillis), nullValue()); // index has not been rolled over yet - assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); + assertThat(explainIndex.getIndex(), explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); - assertThat(explainIndex.getError(), notNullValue()); - assertThat(explainIndex.getError().error(), containsString("maximum normal shards open")); - assertThat(explainIndex.getError().retryCount(), greaterThanOrEqualTo(1)); + assertThat(explainIndex.getIndex(), explainIndex.getError(), notNullValue()); + assertThat(explainIndex.getIndex(), explainIndex.getError().error(), containsString("maximum normal shards open")); + assertThat(explainIndex.getIndex(), explainIndex.getError().retryCount(), greaterThanOrEqualTo(1)); } }); @@ -347,21 +470,25 @@ public void testExplainLifecycleForIndicesWithErrors() throws Exception { assertBusy(() -> { ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( TEST_REQUEST_TIMEOUT, - new String[] { writeIndexName } + new String[] { secondGenerationIndex, firstGenerationFailureIndex } ); ExplainDataStreamLifecycleAction.Response response = client().execute( ExplainDataStreamLifecycleAction.INSTANCE, explainIndicesRequest ).actionGet(); - assertThat(response.getIndices().size(), is(1)); + assertThat(response.getIndices().size(), is(2)); if (internalCluster().numDataNodes() > 1) { assertThat(response.getIndices().get(0).getError(), is(nullValue())); + assertThat(response.getIndices().get(1).getError(), is(nullValue())); } else { /* * If there is only one node in the cluster then the replica shard will never be allocated. So forcemerge will never * succeed, and there will always be an error in the error store. This behavior is subject to change in the future. */ assertThat(response.getIndices().get(0).getError(), is(notNullValue())); + assertThat(response.getIndices().get(0).getError().error(), containsString("Force merge request ")); + assertThat(response.getIndices().get(1).getError(), is(notNullValue())); + assertThat(response.getIndices().get(1).getError().error(), containsString("Force merge request ")); } }); } @@ -385,11 +512,14 @@ public void testExplainDataStreamLifecycleForUnmanagedIndices() throws Exception indexDocs(dataStreamName, 4); - String writeIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + List backingIndices = waitForDataStreamBackingIndices(dataStreamName, 1); + String firstGenerationIndex = backingIndices.get(0); + assertThat(firstGenerationIndex, backingIndexEqualTo(dataStreamName, 1)); + assertBusy(() -> { ExplainDataStreamLifecycleAction.Request explainIndicesRequest = new ExplainDataStreamLifecycleAction.Request( TEST_REQUEST_TIMEOUT, - new String[] { writeIndexName } + new String[] { firstGenerationIndex } ); ExplainDataStreamLifecycleAction.Response response = client().execute( ExplainDataStreamLifecycleAction.INSTANCE, @@ -399,7 +529,7 @@ public void testExplainDataStreamLifecycleForUnmanagedIndices() throws Exception assertThat(response.getRolloverConfiguration(), nullValue()); for (ExplainIndexDataStreamLifecycle explainIndex : response.getIndices()) { assertThat(explainIndex.isManagedByLifecycle(), is(false)); - assertThat(explainIndex.getIndex(), is(writeIndexName)); + assertThat(explainIndex.getIndex(), is(firstGenerationIndex)); assertThat(explainIndex.getIndexCreationDate(), nullValue()); assertThat(explainIndex.getLifecycle(), nullValue()); assertThat(explainIndex.getGenerationTime(System::currentTimeMillis), nullValue()); @@ -433,6 +563,29 @@ static void indexDocs(String dataStream, int numDocs) { indicesAdmin().refresh(new RefreshRequest(dataStream)).actionGet(); } + private void indexFailedDocs(String dataStream, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); + bulkRequest.add( + new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE) + .source( + String.format(Locale.ROOT, "{\"%s\":\"%s\", \"count\":\"not-a-number\"}", DEFAULT_TIMESTAMP_FIELD, value), + XContentType.JSON + ) + ); + } + BulkResponse bulkResponse = safeGet(client().bulk(bulkRequest)); + assertThat(bulkResponse.getItems().length, equalTo(numDocs)); + String failureIndexPrefix = DataStream.FAILURE_STORE_PREFIX + dataStream; + for (BulkItemResponse itemResponse : bulkResponse) { + assertThat(itemResponse.getFailureMessage(), nullValue()); + assertThat(itemResponse.status(), equalTo(RestStatus.CREATED)); + assertThat(itemResponse.getIndex(), startsWith(failureIndexPrefix)); + } + safeGet(indicesAdmin().refresh(new RefreshRequest(dataStream))); + } + static void putComposableIndexTemplate( String id, @Nullable String mappings, @@ -440,6 +593,18 @@ static void putComposableIndexTemplate( @Nullable Settings settings, @Nullable Map metadata, @Nullable DataStreamLifecycle.Template lifecycle + ) throws IOException { + putComposableIndexTemplate(id, mappings, patterns, settings, metadata, lifecycle, null); + } + + static void putComposableIndexTemplate( + String id, + @Nullable String mappings, + List patterns, + @Nullable Settings settings, + @Nullable Map metadata, + @Nullable DataStreamLifecycle.Template lifecycle, + @Nullable DataStreamOptions.Template options ) throws IOException { TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id); request.indexTemplate( @@ -450,6 +615,7 @@ static void putComposableIndexTemplate( .settings(settings) .mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings)) .lifecycle(lifecycle) + .dataStreamOptions(options) ) .metadata(metadata) .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index ee83cb13af294..9164de5b27fc6 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -1039,7 +1039,8 @@ public void onResponse(RolloverResponse rolloverResponse) { @Override public void onFailure(Exception e) { DataStream dataStream = clusterService.state().metadata().dataStreams().get(resolvedRolloverTarget.resource()); - if (dataStream == null || dataStream.getWriteIndex().getName().equals(writeIndexName) == false) { + boolean targetsFailureStore = IndexComponentSelector.FAILURES == resolvedRolloverTarget.selector(); + if (dataStream == null || Objects.equals(getWriteIndexName(dataStream, targetsFailureStore), writeIndexName) == false) { // the data stream has another write index so no point in recording an error for the previous write index we were // attempting to roll over // if there are persistent issues with rolling over this data stream, the next data stream lifecycle run will attempt to @@ -1054,6 +1055,17 @@ public void onFailure(Exception e) { }); } + @Nullable + private String getWriteIndexName(DataStream dataStream, boolean failureStore) { + if (dataStream == null) { + return null; + } + if (failureStore) { + return dataStream.getWriteFailureIndex() == null ? null : dataStream.getWriteFailureIndex().getName(); + } + return dataStream.getWriteIndex().getName(); + } + private void updateIndexSetting(UpdateSettingsRequest updateSettingsRequest, ActionListener listener) { assert updateSettingsRequest.indices() != null && updateSettingsRequest.indices().length == 1 : "Data stream lifecycle service updates the settings for one index at a time"; diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 6675dd48b1820..79293eaf6b4be 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -70,6 +70,7 @@ import static org.elasticsearch.cluster.metadata.DataStream.BACKING_INDEX_PREFIX; import static org.elasticsearch.cluster.metadata.DataStream.DATE_FORMATTER; +import static org.elasticsearch.cluster.metadata.DataStream.FAILURE_STORE_PREFIX; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureStoreName; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID; @@ -611,27 +612,50 @@ public static String backingIndexPattern(String dataStreamName, long generation) return String.format(Locale.ROOT, "\\.ds-%s-(\\d{4}\\.\\d{2}\\.\\d{2}-)?%06d", dataStreamName, generation); } + /** + * Checks if the index name provided starts with the prefix ".ds-", continues with the data stream name + * till the next `-`, and after the last `-` it ends with a number that matches the generation. + * @param dataStreamName + * @param generation + * @return the matcher + */ public static Matcher backingIndexEqualTo(String dataStreamName, int generation) { + return dataStreamIndexEqualTo(dataStreamName, generation, false); + } + + /** + * Checks if the index name provided starts with the prefix ".ds-" when failure store is false and ".fs-" when true, continues with + * the data stream name till the next `-`, and after the last `-` it ends with a number that matches the generation. + * @param dataStreamName + * @param generation + * @param failureStore, determines the prefix, ".ds-" when failure store is false and ".fs-" when true + * @return the matcher + */ + public static Matcher dataStreamIndexEqualTo(String dataStreamName, int generation, boolean failureStore) { return new TypeSafeMatcher<>() { + private final String prefix = failureStore ? FAILURE_STORE_PREFIX : BACKING_INDEX_PREFIX; @Override protected boolean matchesSafely(String backingIndexName) { if (backingIndexName == null) { return false; } - + String actualPrefix = backingIndexName.substring(0, prefix.length()); int indexOfLastDash = backingIndexName.lastIndexOf('-'); - String actualDataStreamName = parseDataStreamName(backingIndexName, indexOfLastDash); + String actualDataStreamName = parseDataStreamName(backingIndexName, prefix, indexOfLastDash); int actualGeneration = parseGeneration(backingIndexName, indexOfLastDash); - return actualDataStreamName.equals(dataStreamName) && actualGeneration == generation; + return actualPrefix.equals(prefix) && actualDataStreamName.equals(dataStreamName) && actualGeneration == generation; } @Override protected void describeMismatchSafely(String backingIndexName, Description mismatchDescription) { + String actualPrefix = backingIndexName.substring(0, prefix.length()); int indexOfLastDash = backingIndexName.lastIndexOf('-'); - String dataStreamName = parseDataStreamName(backingIndexName, indexOfLastDash); + String dataStreamName = parseDataStreamName(backingIndexName, prefix, indexOfLastDash); int generation = parseGeneration(backingIndexName, indexOfLastDash); - mismatchDescription.appendText(" was data stream name ") + mismatchDescription.appendText(" was prefix ") + .appendValue(actualPrefix) + .appendText(", data stream name ") .appendValue(dataStreamName) .appendText(" and generation ") .appendValue(generation); @@ -639,14 +663,16 @@ protected void describeMismatchSafely(String backingIndexName, Description misma @Override public void describeTo(Description description) { - description.appendText("expected data stream name ") + description.appendText("expected prefix ") + .appendValue(prefix) + .appendText(", expected data stream name ") .appendValue(dataStreamName) .appendText(" and expected generation ") .appendValue(generation); } - private static String parseDataStreamName(String backingIndexName, int indexOfLastDash) { - return backingIndexName.substring(4, backingIndexName.lastIndexOf('-', indexOfLastDash - 1)); + private static String parseDataStreamName(String backingIndexName, String prefix, int indexOfLastDash) { + return backingIndexName.substring(prefix.length(), backingIndexName.lastIndexOf('-', indexOfLastDash - 1)); } private static int parseGeneration(String backingIndexName, int indexOfLastDash) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 58045daaf83c7..9a185116fc1fd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -840,10 +840,52 @@ private static Settings.Builder getExcludeSettings(int num, Settings.Builder bui return builder; } + /** + * Waits for the specified data stream to have the expected number of backing indices. + */ + public static List waitForDataStreamBackingIndices(String dataStreamName, int expectedSize) { + return waitForDataStreamIndices(dataStreamName, expectedSize, false); + } + + /** + * Waits for the specified data stream to have the expected number of backing or failure indices. + */ + public static List waitForDataStreamIndices(String dataStreamName, int expectedSize, boolean failureStore) { + // We listen to the cluster state on the master node to ensure all other nodes have already acked the new cluster state. + // This avoids inconsistencies in subsequent API calls which might hit a non-master node. + final var listener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + final var dataStream = clusterState.metadata().dataStreams().get(dataStreamName); + if (dataStream == null) { + return false; + } + return dataStream.getDataStreamIndices(failureStore).getIndices().size() == expectedSize; + }); + safeAwait(listener); + final var backingIndexNames = getDataStreamBackingIndexNames(dataStreamName, failureStore); + assertEquals( + Strings.format( + "Retrieved number of data stream indices doesn't match expectation for data stream [%s]. Expected %d but got %s", + dataStreamName, + expectedSize, + backingIndexNames + ), + expectedSize, + backingIndexNames.size() + ); + return backingIndexNames; + } + /** * Returns a list of the data stream's backing index names. */ - public List getDataStreamBackingIndexNames(String dataStreamName) { + public static List getDataStreamBackingIndexNames(String dataStreamName) { + return getDataStreamBackingIndexNames(dataStreamName, false); + } + + /** + * Returns a list of the data stream's backing or failure index names. + */ + public static List getDataStreamBackingIndexNames(String dataStreamName, boolean failureStore) { GetDataStreamAction.Response response = safeGet( client().execute( GetDataStreamAction.INSTANCE, @@ -853,7 +895,7 @@ public List getDataStreamBackingIndexNames(String dataStreamName) { assertThat(response.getDataStreams().size(), equalTo(1)); DataStream dataStream = response.getDataStreams().get(0).getDataStream(); assertThat(dataStream.getName(), equalTo(dataStreamName)); - return dataStream.getIndices().stream().map(Index::getName).toList(); + return dataStream.getDataStreamIndices(failureStore).getIndices().stream().map(Index::getName).toList(); } /**