From 5a6e243fe92855247fc3920e2da8c5610de19d9b Mon Sep 17 00:00:00 2001 From: Mary Gouseti Date: Sat, 3 May 2025 15:50:22 +0300 Subject: [PATCH 1/2] [Failure store] Introduce default retention for failure indices (#127573) We introduce a new global retention setting `data_streams.lifecycle.retention.failures_default` which is used by the data stream lifecycle management as the default retention when the failure store lifecycle of the data stream does not specify one. Elasticsearch comes with the default value of 30 days. The value can be changed via the settings API to any time value higher than 10 seconds or -1 to indicate no default retention should apply. The failures default retention can be set to values higher than the max retention, but then the max retention will be effective. The reason for this choice it to ensure that no deployments will be broken, if the user has already set up max retention less than 30 days. (cherry picked from commit fe36c42eee7b83f56aa9da222fc1d318603659be) --- docs/changelog/127573.yaml | 5 + .../DataStreamLifecycleServiceIT.java | 3 +- .../DataStreamGlobalRetentionIT.java | 9 +- .../action/TransportGetDataStreamsAction.java | 5 +- .../lifecycle/DataStreamLifecycleService.java | 44 +++++--- ...sportExplainDataStreamLifecycleAction.java | 3 +- .../action/GetDataStreamsResponseTests.java | 2 +- .../TransportGetDataStreamsActionTests.java | 12 ++- .../DataStreamLifecycleServiceTests.java | 28 ++++- .../data_stream/240_failure_store_info.yml | 9 ++ .../org/elasticsearch/TransportVersions.java | 1 + .../datastreams/GetDataStreamAction.java | 73 +++++++++---- .../ExplainDataStreamLifecycleAction.java | 56 +++++++--- .../metadata/DataStreamGlobalRetention.java | 11 ++ .../DataStreamGlobalRetentionSettings.java | 102 ++++++++++++++++-- .../cluster/metadata/DataStreamLifecycle.java | 8 +- .../metadata/MetadataDataStreamsService.java | 4 +- .../MetadataIndexTemplateService.java | 33 ++++-- .../common/settings/ClusterSettings.java | 1 + .../datastreams/GetDataStreamActionTests.java | 2 +- ...plainDataStreamLifecycleResponseTests.java | 30 ++++-- ...ataStreamGlobalRetentionSettingsTests.java | 82 +++++++++++++- .../metadata/DataStreamLifecycleTests.java | 5 +- .../DataStreamUsageTransportAction.java | 2 +- 24 files changed, 430 insertions(+), 100 deletions(-) create mode 100644 docs/changelog/127573.yaml diff --git a/docs/changelog/127573.yaml b/docs/changelog/127573.yaml new file mode 100644 index 0000000000000..b0bc1548c09f7 --- /dev/null +++ b/docs/changelog/127573.yaml @@ -0,0 +1,5 @@ +pr: 127573 +summary: "[Failure store] Introduce default retention for failure indices" +area: Data streams +type: enhancement +issues: [] diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java index 91ed7a4c0c50f..a139c3191c6a1 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java @@ -279,7 +279,8 @@ public void testSystemDataStreamRetention() throws Exception { builder, withEffectiveRetention, getDataStreamResponse.getRolloverConfiguration(), - getDataStreamResponse.getGlobalRetention() + getDataStreamResponse.getDataGlobalRetention(), + getDataStreamResponse.getFailuresGlobalRetention() ); String serialized = Strings.toString(builder); Map resultMap = XContentHelper.convertToMap( diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java index c7e69a812f0a6..5f48e959ee4b9 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamGlobalRetentionIT.java @@ -147,7 +147,12 @@ public void testDataStreamRetention() throws Exception { @SuppressWarnings("unchecked") public void testDefaultRetention() throws Exception { // Set default global retention - updateClusterSettings(Settings.builder().put("data_streams.lifecycle.retention.default", "10s").build()); + updateClusterSettings( + Settings.builder() + .put("data_streams.lifecycle.retention.default", "10s") + .put("data_streams.lifecycle.retention.failures_default", "10s") + .build() + ); // Verify that the effective retention matches the default retention { @@ -163,7 +168,7 @@ public void testDefaultRetention() throws Exception { assertThat(lifecycle.get("data_retention"), nullValue()); Map failuresLifecycle = ((Map>) dataStream.get("failure_store")).get("lifecycle"); assertThat(failuresLifecycle.get("effective_retention"), is("10s")); - assertThat(failuresLifecycle.get("retention_determined_by"), is("default_global_retention")); + assertThat(failuresLifecycle.get("retention_determined_by"), is("default_failures_retention")); assertThat(failuresLifecycle.get("data_retention"), nullValue()); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java index 2204c624d4fba..2681bf1615665 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsAction.java @@ -88,7 +88,7 @@ public TransportGetDataStreamsAction( threadPool, actionFilters, GetDataStreamAction.Request::new, - GetDataStreamAction.Response::new, + GetDataStreamAction.Response::read, transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT) ); this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -287,7 +287,8 @@ public int compareTo(IndexInfo o) { return new GetDataStreamAction.Response( dataStreamInfos, request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null, - globalRetentionSettings.get() + globalRetentionSettings.get(false), + globalRetentionSettings.get(true) ); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index e8edad3556fc4..22d13bab57e8a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -44,7 +44,6 @@ import org.elasticsearch.cluster.SimpleBatchedExecutor; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexAbstraction; @@ -354,13 +353,18 @@ void run(ClusterState state) { continue; } + // Retrieve the effective retention to ensure the same retention is used for this data stream + // through all operations. + var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false); + var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true); + // the following indices should not be considered for the remainder of this service run, for various reasons. Set indicesToExcludeForRemainingRun = new HashSet<>(); // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed, // depending on rollover criteria, for this reason we exclude them for the remaining run. - indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false)); - Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true); + indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, dataRetention, false)); + Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, failuresRetention, true); if (failureStoreWriteIndex != null) { indicesToExcludeForRemainingRun.add(failureStoreWriteIndex); } @@ -376,7 +380,9 @@ void run(ClusterState state) { ); try { - indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(state, dataStream, indicesToExcludeForRemainingRun)); + indicesToExcludeForRemainingRun.addAll( + maybeExecuteRetention(state, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun) + ); } catch (Exception e) { // individual index errors would be reported via the API action listener for every delete call // we could potentially record errors at a data stream level and expose it via the _data_stream API? @@ -807,7 +813,12 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) { } @Nullable - private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) { + private Index maybeExecuteRollover( + ClusterState state, + DataStream dataStream, + TimeValue effectiveRetention, + boolean rolloverFailureStore + ) { Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex(); if (currentRunWriteIndex == null) { return null; @@ -818,7 +829,7 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo RolloverRequest rolloverRequest = getDefaultRolloverRequest( rolloverConfiguration, dataStream.getName(), - lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()), + effectiveRetention, rolloverFailureStore ); transportActionsDeduplicator.executeOnce( @@ -868,14 +879,17 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo * @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted * @return The set of indices that delete requests have been sent for */ - Set maybeExecuteRetention(ClusterState state, DataStream dataStream, Set indicesToExcludeForRemainingRun) { - Metadata metadata = state.metadata(); - DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get(); - var dataRetention = getRetention(dataStream, globalRetention, false); - var failureRetention = getRetention(dataStream, globalRetention, true); + Set maybeExecuteRetention( + ClusterState state, + DataStream dataStream, + TimeValue dataRetention, + TimeValue failureRetention, + Set indicesToExcludeForRemainingRun + ) { if (dataRetention == null && failureRetention == null) { return Set.of(); } + Metadata metadata = state.metadata(); List backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention( metadata::index, nowSupplier, @@ -1320,11 +1334,15 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) { } @Nullable - private static TimeValue getRetention(DataStream dataStream, DataStreamGlobalRetention globalRetention, boolean failureStore) { + private static TimeValue getEffectiveRetention( + DataStream dataStream, + DataStreamGlobalRetentionSettings globalRetentionSettings, + boolean failureStore + ) { DataStreamLifecycle lifecycle = failureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle(); return lifecycle == null || lifecycle.enabled() == false ? null - : lifecycle.getEffectiveDataRetention(globalRetention, dataStream.isInternal()); + : lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(failureStore), dataStream.isInternal()); } /** diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java index 67152a0d324f3..5d58a1d1cb947 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java @@ -120,7 +120,8 @@ protected void masterOperation( new ExplainDataStreamLifecycleAction.Response( explainIndices, request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null, - globalRetentionSettings.get() + globalRetentionSettings.get(false), + globalRetentionSettings.get(true) ) ); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java index 6a9bf1e602a14..e483158c484c1 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java @@ -45,7 +45,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase @Override protected Writeable.Reader instanceReader() { - return Response::new; + return Response::read; } @Override diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java index e041fd1b45430..0189ff6d59745 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/TransportGetDataStreamsActionTests.java @@ -351,8 +351,8 @@ public void testPassingGlobalRetention() { emptyDataStreamFailureStoreSettings, null ); - assertThat(response.getGlobalRetention(), nullValue()); - DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( + assertThat(response.getDataGlobalRetention(), nullValue()); + DataStreamGlobalRetention dataGlobalRetention = new DataStreamGlobalRetention( TimeValue.timeValueDays(randomIntBetween(1, 5)), TimeValue.timeValueDays(randomIntBetween(5, 10)) ); @@ -361,9 +361,9 @@ public void testPassingGlobalRetention() { Settings.builder() .put( DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey(), - globalRetention.defaultRetention() + dataGlobalRetention.defaultRetention() ) - .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), globalRetention.maxRetention()) + .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), dataGlobalRetention.maxRetention()) .build() ) ); @@ -377,7 +377,9 @@ public void testPassingGlobalRetention() { emptyDataStreamFailureStoreSettings, null ); - assertThat(response.getGlobalRetention(), equalTo(globalRetention)); + assertThat(response.getDataGlobalRetention(), equalTo(dataGlobalRetention)); + // We used the default failures retention here which is greater than the max + assertThat(response.getFailuresGlobalRetention(), equalTo(new DataStreamGlobalRetention(null, dataGlobalRetention.maxRetention()))); } public void testDataStreamIsFailureStoreEffectivelyEnabled_disabled() { diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 8736cdc80630b..b084470149e90 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -1566,9 +1566,16 @@ public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() { ClusterState state = downsampleSetup(dataStreamName, SUCCESS); DataStream dataStream = state.metadata().dataStreams().get(dataStreamName); String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention(); // Executing the method to be tested: - Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of()); + Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention( + clusterService.state(), + dataStream, + dataRetention, + null, + Set.of() + ); assertThat(indicesToBeRemoved, contains(state.getMetadata().index(firstGenIndexName).getIndex())); } @@ -1576,10 +1583,16 @@ public void testMaybeExecuteRetentionDownsampledIndexInProgress() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); ClusterState state = downsampleSetup(dataStreamName, STARTED); DataStream dataStream = state.metadata().dataStreams().get(dataStreamName); - String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention(); // Executing the method to be tested: - Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of()); + Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention( + clusterService.state(), + dataStream, + dataRetention, + null, + Set.of() + ); assertThat(indicesToBeRemoved, empty()); } @@ -1588,9 +1601,16 @@ public void testMaybeExecuteRetentionDownsampledUnknown() { ClusterState state = downsampleSetup(dataStreamName, UNKNOWN); DataStream dataStream = state.metadata().dataStreams().get(dataStreamName); String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention(); // Executing the method to be tested: - Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of()); + Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention( + clusterService.state(), + dataStream, + dataRetention, + null, + Set.of() + ); assertThat(indicesToBeRemoved, contains(state.getMetadata().index(firstGenIndexName).getIndex())); } diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml index 9739b290f4510..98d9d3a6ddb60 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml @@ -189,6 +189,9 @@ teardown: - match: { data_streams.0.template: 'my-template1' } - match: { data_streams.0.failure_store.enabled: true } - match: { data_streams.0.failure_store.lifecycle.enabled: false } + - is_false: data_streams.0.failure_store.lifecycle.data_retention + - is_false: data_streams.0.failure_store.lifecycle.effective_retention + - is_false: data_streams.0.failure_store.lifecycle.retention_determined_by - length: { data_streams.0.failure_store.indices: 1 } - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' } - is_false: data_streams.0.failure_store.indices.0.prefer_ilm @@ -212,6 +215,9 @@ teardown: - match: { data_streams.0.template: 'my-template2' } - match: { data_streams.0.failure_store.enabled: true } - match: { data_streams.0.failure_store.lifecycle.enabled: true } + - is_false: data_streams.0.failure_store.lifecycle.data_retention + - match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' } + - match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' } - match: { data_streams.0.failure_store.indices: [] } # Initialize failure store @@ -234,6 +240,9 @@ teardown: - match: { data_streams.0.template: 'my-template2' } - match: { data_streams.0.failure_store.enabled: true } - match: { data_streams.0.failure_store.lifecycle.enabled: true } + - is_false: data_streams.0.failure_store.lifecycle.data_retention + - match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' } + - match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' } - length: { data_streams.0.failure_store.indices: 1 } - match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-default-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' } - is_false: data_streams.0.failure_store.indices.0.prefer_ilm diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 9f6ba588b330f..322e988b4f42a 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -212,6 +212,7 @@ static TransportVersion def(int id) { public static final TransportVersion PINNED_RETRIEVER_8_19 = def(8_841_0_23); public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_BLOCK_8_19 = def(8_841_0_24); public static final TransportVersion INTRODUCE_FAILURES_LIFECYCLE_BACKPORT_8_19 = def(8_841_0_25); + public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19 = def(8_841_0_26); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index bf2ea910659f6..68a2a1716e48a 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -352,7 +352,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return toXContent(builder, params, null, null); + return toXContent(builder, params, null, null, null); } /** @@ -363,7 +363,8 @@ public XContentBuilder toXContent( XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration, - @Nullable DataStreamGlobalRetention globalRetention + @Nullable DataStreamGlobalRetention dataGlobalRetention, + @Nullable DataStreamGlobalRetention failureGlobalRetention ) throws IOException { builder.startObject(); builder.field(DataStream.NAME_FIELD.getPreferredName(), dataStream.getName()); @@ -384,7 +385,7 @@ public XContentBuilder toXContent( if (dataStream.getDataLifecycle() != null) { builder.field(LIFECYCLE_FIELD.getPreferredName()); dataStream.getDataLifecycle() - .toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal()); + .toXContent(builder, params, rolloverConfiguration, dataGlobalRetention, dataStream.isInternal()); } if (ilmPolicyName != null) { builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName); @@ -423,7 +424,7 @@ public XContentBuilder toXContent( DataStreamLifecycle failuresLifecycle = dataStream.getFailuresLifecycle(failureStoreEffectivelyEnabled); if (failuresLifecycle != null) { builder.field(LIFECYCLE_FIELD.getPreferredName()); - failuresLifecycle.toXContent(builder, params, rolloverConfiguration, globalRetention, dataStream.isInternal()); + failuresLifecycle.toXContent(builder, params, rolloverConfiguration, failureGlobalRetention, dataStream.isInternal()); } builder.endObject(); builder.endObject(); @@ -582,30 +583,44 @@ public void writeTo(StreamOutput out) throws IOException { @Nullable private final RolloverConfiguration rolloverConfiguration; @Nullable - private final DataStreamGlobalRetention globalRetention; + private final DataStreamGlobalRetention dataGlobalRetention; + @Nullable + private final DataStreamGlobalRetention failuresGlobalRetention; public Response(List dataStreams) { - this(dataStreams, null, null); + this(dataStreams, null, null, null); } public Response( List dataStreams, @Nullable RolloverConfiguration rolloverConfiguration, - @Nullable DataStreamGlobalRetention globalRetention + @Nullable DataStreamGlobalRetention dataGlobalRetention, + @Nullable DataStreamGlobalRetention failuresGlobalRetention ) { this.dataStreams = dataStreams; this.rolloverConfiguration = rolloverConfiguration; - this.globalRetention = globalRetention; + this.dataGlobalRetention = dataGlobalRetention; + this.failuresGlobalRetention = failuresGlobalRetention; } - public Response(StreamInput in) throws IOException { - this( - in.readCollectionAsList(DataStreamInfo::new), - in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X) ? in.readOptionalWriteable(RolloverConfiguration::new) : null, - in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) - ? in.readOptionalWriteable(DataStreamGlobalRetention::read) - : null - ); + public static Response read(StreamInput in) throws IOException { + var dataStreamInfo = in.readCollectionAsList(DataStreamInfo::new); + var rolloverConfiguration = in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X) + ? in.readOptionalWriteable(RolloverConfiguration::new) + : null; + DataStreamGlobalRetention dataGlobalRetention = null; + DataStreamGlobalRetention failuresGlobalRetention = null; + if (in.getTransportVersion().onOrAfter(TransportVersions.INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19)) { + var defaultRetention = in.readOptionalTimeValue(); + var maxRetention = in.readOptionalTimeValue(); + var failuresDefaultRetention = in.readOptionalTimeValue(); + dataGlobalRetention = DataStreamGlobalRetention.create(defaultRetention, maxRetention); + failuresGlobalRetention = DataStreamGlobalRetention.create(failuresDefaultRetention, maxRetention); + } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) { + dataGlobalRetention = in.readOptionalWriteable(DataStreamGlobalRetention::read); + failuresGlobalRetention = dataGlobalRetention; + } + return new Response(dataStreamInfo, rolloverConfiguration, dataGlobalRetention, failuresGlobalRetention); } public List getDataStreams() { @@ -618,8 +633,13 @@ public RolloverConfiguration getRolloverConfiguration() { } @Nullable - public DataStreamGlobalRetention getGlobalRetention() { - return globalRetention; + public DataStreamGlobalRetention getDataGlobalRetention() { + return dataGlobalRetention; + } + + @Nullable + public DataStreamGlobalRetention getFailuresGlobalRetention() { + return failuresGlobalRetention; } @Override @@ -628,8 +648,13 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) { out.writeOptionalWriteable(rolloverConfiguration); } - if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) { - out.writeOptionalWriteable(globalRetention); + // A version 9.x cluster will never read this, so we only need to include the patch version here. + if (out.getTransportVersion().isPatchFrom(TransportVersions.INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19)) { + out.writeOptionalTimeValue(dataGlobalRetention == null ? null : dataGlobalRetention.defaultRetention()); + out.writeOptionalTimeValue(dataGlobalRetention == null ? null : dataGlobalRetention.maxRetention()); + out.writeOptionalTimeValue(failuresGlobalRetention == null ? null : failuresGlobalRetention.defaultRetention()); + } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) { + out.writeOptionalWriteable(dataGlobalRetention); } } @@ -642,7 +667,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder, DataStreamLifecycle.addEffectiveRetentionParams(params), rolloverConfiguration, - globalRetention + dataGlobalRetention, + failuresGlobalRetention ); } builder.endArray(); @@ -657,12 +683,13 @@ public boolean equals(Object o) { Response response = (Response) o; return dataStreams.equals(response.dataStreams) && Objects.equals(rolloverConfiguration, response.rolloverConfiguration) - && Objects.equals(globalRetention, response.globalRetention); + && Objects.equals(dataGlobalRetention, response.dataGlobalRetention) + && Objects.equals(failuresGlobalRetention, response.failuresGlobalRetention); } @Override public int hashCode() { - return Objects.hash(dataStreams, rolloverConfiguration, globalRetention); + return Objects.hash(dataStreams, rolloverConfiguration, dataGlobalRetention, failuresGlobalRetention); } } diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleAction.java index b522225d94246..601a30356c113 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleAction.java @@ -147,25 +147,39 @@ public static class Response extends ActionResponse implements ChunkedToXContent @Nullable private final RolloverConfiguration rolloverConfiguration; @Nullable - private final DataStreamGlobalRetention globalRetention; + private final DataStreamGlobalRetention dataGlobalRetention; + @Nullable + private final DataStreamGlobalRetention failureGlobalRetention; public Response( List indices, @Nullable RolloverConfiguration rolloverConfiguration, - @Nullable DataStreamGlobalRetention globalRetention + @Nullable DataStreamGlobalRetention dataGlobalRetention, + @Nullable DataStreamGlobalRetention failureGlobalRetention ) { this.indices = indices; this.rolloverConfiguration = rolloverConfiguration; - this.globalRetention = globalRetention; + this.dataGlobalRetention = dataGlobalRetention; + this.failureGlobalRetention = failureGlobalRetention; } public Response(StreamInput in) throws IOException { super(in); this.indices = in.readCollectionAsList(ExplainIndexDataStreamLifecycle::new); this.rolloverConfiguration = in.readOptionalWriteable(RolloverConfiguration::new); - this.globalRetention = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) - ? in.readOptionalWriteable(DataStreamGlobalRetention::read) - : null; + if (in.getTransportVersion().onOrAfter(TransportVersions.INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19)) { + var defaultRetention = in.readOptionalTimeValue(); + var maxRetention = in.readOptionalTimeValue(); + var defaultFailuresRetention = in.readOptionalTimeValue(); + dataGlobalRetention = DataStreamGlobalRetention.create(defaultRetention, maxRetention); + failureGlobalRetention = DataStreamGlobalRetention.create(defaultFailuresRetention, maxRetention); + } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) { + dataGlobalRetention = in.readOptionalWriteable(DataStreamGlobalRetention::read); + failureGlobalRetention = dataGlobalRetention; + } else { + dataGlobalRetention = null; + failureGlobalRetention = null; + } } public List getIndices() { @@ -176,16 +190,31 @@ public RolloverConfiguration getRolloverConfiguration() { return rolloverConfiguration; } - public DataStreamGlobalRetention getGlobalRetention() { - return globalRetention; + public DataStreamGlobalRetention getDataGlobalRetention() { + return dataGlobalRetention; + } + + public DataStreamGlobalRetention getFailuresGlobalRetention() { + return failureGlobalRetention; + } + + private DataStreamGlobalRetention getGlobalRetentionForLifecycle(DataStreamLifecycle lifecycle) { + if (lifecycle == null) { + return null; + } + return lifecycle.targetsFailureStore() ? failureGlobalRetention : dataGlobalRetention; } @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(indices); out.writeOptionalWriteable(rolloverConfiguration); - if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) { - out.writeOptionalWriteable(globalRetention); + if (out.getTransportVersion().onOrAfter(TransportVersions.INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19)) { + out.writeOptionalTimeValue(dataGlobalRetention == null ? null : dataGlobalRetention.defaultRetention()); + out.writeOptionalTimeValue(dataGlobalRetention == null ? null : dataGlobalRetention.maxRetention()); + out.writeOptionalTimeValue(failureGlobalRetention == null ? null : failureGlobalRetention.defaultRetention()); + } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) { + out.writeOptionalWriteable(getDataGlobalRetention()); } } @@ -200,12 +229,13 @@ public boolean equals(Object o) { Response response = (Response) o; return Objects.equals(indices, response.indices) && Objects.equals(rolloverConfiguration, response.rolloverConfiguration) - && Objects.equals(globalRetention, response.globalRetention); + && Objects.equals(dataGlobalRetention, response.dataGlobalRetention) + && Objects.equals(failureGlobalRetention, response.failureGlobalRetention); } @Override public int hashCode() { - return Objects.hash(indices, rolloverConfiguration, globalRetention); + return Objects.hash(indices, rolloverConfiguration, dataGlobalRetention, failureGlobalRetention); } @Override @@ -220,7 +250,7 @@ public Iterator toXContentChunked(ToXContent.Params outerP builder, DataStreamLifecycle.addEffectiveRetentionParams(outerParams), rolloverConfiguration, - globalRetention + getGlobalRetentionForLifecycle(explainIndexDataLifecycle.getLifecycle()) ); return builder; }), Iterators.single((builder, params) -> { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java index 673960c713391..568594da0c877 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetention.java @@ -48,6 +48,17 @@ public DataStreamGlobalRetention(TimeValue defaultRetention, TimeValue maxRetent this.maxRetention = maxRetention; } + /** + * Helper method that creates a global retention object or returns null in case both retentions are null + */ + @Nullable + public static DataStreamGlobalRetention create(@Nullable TimeValue defaultRetention, @Nullable TimeValue maxRetention) { + if (defaultRetention == null && maxRetention == null) { + return null; + } + return new DataStreamGlobalRetention(defaultRetention, maxRetention); + } + private boolean validateRetentionValue(@Nullable TimeValue retention) { return retention == null || retention.getMillis() >= MIN_RETENTION_VALUE.getMillis(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java index 9e7256d6818bb..1940b54b7ca03 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettings.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; @@ -24,8 +25,10 @@ * This class holds the data stream global retention settings. It defines, validates and monitors the settings. *

* The global retention settings apply to non-system data streams that are managed by the data stream lifecycle. They consist of: - * - The default retention which applies to data streams that do not have a retention defined. - * - The max retention which applies to all data streams that do not have retention or their retention has exceeded this value. + * - The default retention which applies to the backing indices of data streams that do not have a retention defined. + * - The max retention which applies to backing and failure indices of data streams that do not have retention or their + * retention has exceeded this value. + * - The failures default retention which applied to the failure indices of data streams that do not have retention defined. */ public class DataStreamGlobalRetentionSettings { @@ -82,27 +85,66 @@ public Iterator> settings() { Setting.Property.Dynamic ); + static final TimeValue FAILURES_DEFAULT_RETENTION = TimeValue.timeValueDays(30); + public static final Setting FAILURE_STORE_DEFAULT_RETENTION_SETTING = Setting.timeSetting( + "data_streams.lifecycle.retention.failures_default", + FAILURES_DEFAULT_RETENTION, + new Setting.Validator<>() { + @Override + public void validate(TimeValue value) {} + + @Override + public void validate(final TimeValue settingValue, final Map, Object> settings) { + TimeValue defaultRetention = getSettingValueOrNull(settingValue); + // Currently, we do not validate the default for the failure store against the max because + // we start with a default value that might conflict the max retention. + validateIsolatedRetentionValue(defaultRetention, FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey()); + } + }, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + @Nullable private volatile TimeValue defaultRetention; @Nullable private volatile TimeValue maxRetention; + @Nullable + private volatile TimeValue failuresDefaultRetention; + /** We cache the global retention objects, volatile is sufficient we only "write" this values in the settings appliers which + * are executed by {@link org.elasticsearch.common.settings.AbstractScopedSettings#applySettings(Settings)} which is synchronised. + */ + @Nullable + private volatile DataStreamGlobalRetention dataGlobalRetention; + @Nullable + private volatile DataStreamGlobalRetention failuresGlobalRetention; private DataStreamGlobalRetentionSettings() { } + /** + * @return the max retention that applies to all data stream data + */ @Nullable public TimeValue getMaxRetention() { return maxRetention; } + /** + * @return the default retention that applies either to the data component + */ @Nullable public TimeValue getDefaultRetention() { return defaultRetention; } - public boolean areDefined() { - return getDefaultRetention() != null || getMaxRetention() != null; + /** + * @return the default retention that applies either to the data or the failures component + */ + @Nullable + public TimeValue getDefaultRetention(boolean failureStore) { + return failureStore ? failuresDefaultRetention : defaultRetention; } /** @@ -113,17 +155,33 @@ public static DataStreamGlobalRetentionSettings create(ClusterSettings clusterSe DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = new DataStreamGlobalRetentionSettings(); clusterSettings.initializeAndWatch(DATA_STREAMS_DEFAULT_RETENTION_SETTING, dataStreamGlobalRetentionSettings::setDefaultRetention); clusterSettings.initializeAndWatch(DATA_STREAMS_MAX_RETENTION_SETTING, dataStreamGlobalRetentionSettings::setMaxRetention); + clusterSettings.initializeAndWatch( + FAILURE_STORE_DEFAULT_RETENTION_SETTING, + dataStreamGlobalRetentionSettings::setFailuresDefaultRetention + ); return dataStreamGlobalRetentionSettings; } private void setMaxRetention(TimeValue maxRetention) { this.maxRetention = getSettingValueOrNull(maxRetention); - logger.info("Updated max factory retention to [{}]", this.maxRetention == null ? null : maxRetention.getStringRep()); + this.dataGlobalRetention = createDataStreamGlobalRetention(false); + this.failuresGlobalRetention = createDataStreamGlobalRetention(true); + logger.info("Updated global max retention to [{}]", this.maxRetention == null ? null : maxRetention.getStringRep()); } private void setDefaultRetention(TimeValue defaultRetention) { this.defaultRetention = getSettingValueOrNull(defaultRetention); - logger.info("Updated default factory retention to [{}]", this.defaultRetention == null ? null : defaultRetention.getStringRep()); + this.dataGlobalRetention = createDataStreamGlobalRetention(false); + logger.info("Updated global default retention to [{}]", this.defaultRetention == null ? null : defaultRetention.getStringRep()); + } + + private void setFailuresDefaultRetention(TimeValue failuresDefaultRetention) { + this.failuresDefaultRetention = getSettingValueOrNull(failuresDefaultRetention); + this.failuresGlobalRetention = createDataStreamGlobalRetention(true); + logger.info( + "Updated failures default retention to [{}]", + this.failuresDefaultRetention == null ? null : failuresDefaultRetention.getStringRep() + ); } private static void validateIsolatedRetentionValue(@Nullable TimeValue retention, String settingName) { @@ -150,12 +208,36 @@ private static void validateGlobalRetentionConfiguration(@Nullable TimeValue def } } + /** + * @return the global retention of backing indices + */ @Nullable public DataStreamGlobalRetention get() { - if (areDefined() == false) { + return get(false); + } + + /** + * Returns the global retention that applies to the data or failures of a data stream + * @param failureStore, true if we are retrieving the global retention that applies to failure store, false otherwise. + */ + @Nullable + public DataStreamGlobalRetention get(boolean failureStore) { + return failureStore ? failuresGlobalRetention : dataGlobalRetention; + } + + @Nullable + private DataStreamGlobalRetention createDataStreamGlobalRetention(boolean failureStore) { + if (areDefined(failureStore) == false) { return null; } - return new DataStreamGlobalRetention(getDefaultRetention(), getMaxRetention()); + TimeValue defaultRetention = getDefaultRetention(failureStore); + TimeValue maxRetention = getMaxRetention(); + // We ensure that we create valid DataStreamGlobalRetention where default is less or equal to max. + // If it's not we set it to null. + if (defaultRetention != null && maxRetention != null && defaultRetention.getMillis() > maxRetention.getMillis()) { + return new DataStreamGlobalRetention(null, getMaxRetention()); + } + return new DataStreamGlobalRetention(defaultRetention, maxRetention); } /** @@ -169,4 +251,8 @@ public DataStreamGlobalRetention get() { private static TimeValue getSettingValueOrNull(TimeValue value) { return value == null || value.equals(TimeValue.MINUS_ONE) ? null : value; } + + private boolean areDefined(boolean failureStore) { + return getDefaultRetention(failureStore) != null || getMaxRetention() != null; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java index 20b32d2f3591f..6be586f2d6944 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java @@ -233,7 +233,10 @@ public Tuple getEffectiveDataRetentionWithSource( } if (dataRetention() == null) { return globalRetention.defaultRetention() != null - ? Tuple.tuple(globalRetention.defaultRetention(), RetentionSource.DEFAULT_GLOBAL_RETENTION) + ? Tuple.tuple( + globalRetention.defaultRetention(), + targetsFailureStore() ? RetentionSource.DEFAULT_FAILURES_RETENTION : RetentionSource.DEFAULT_GLOBAL_RETENTION + ) : Tuple.tuple(globalRetention.maxRetention(), RetentionSource.MAX_GLOBAL_RETENTION); } if (globalRetention.maxRetention() != null && globalRetention.maxRetention().getMillis() < dataRetention().getMillis()) { @@ -506,7 +509,8 @@ public static ToXContent.Params addEffectiveRetentionParams(ToXContent.Params pa public enum RetentionSource { DATA_STREAM_CONFIGURATION, DEFAULT_GLOBAL_RETENTION, - MAX_GLOBAL_RETENTION; + MAX_GLOBAL_RETENTION, + DEFAULT_FAILURES_RETENTION; public String displayName() { return this.toString().toLowerCase(Locale.ROOT); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index d64dfc627c5f6..7d103b667f2bb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -279,7 +279,7 @@ ClusterState updateDataLifecycle(ClusterState currentState, List dataStr } if (lifecycle != null) { // We don't issue any warnings if all data streams are internal data streams - lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), onlyInternalDataStreams); + lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(false), onlyInternalDataStreams); } return ClusterState.builder(currentState).metadata(builder.build()).build(); } @@ -305,7 +305,7 @@ ClusterState updateDataStreamOptions( // We don't issue any warnings if all data streams are internal data streams dataStreamOptions.failureStore() .lifecycle() - .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), onlyInternalDataStreams); + .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(true), onlyInternalDataStreams); } return ClusterState.builder(currentState).metadata(builder.build()).build(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java index c03d977e98525..926e20e1752be 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataIndexTemplateService.java @@ -341,9 +341,14 @@ public ClusterState addComponentTemplate( tempStateWithComponentTemplateAdded.metadata(), composableTemplateName, composableTemplate, - globalRetentionSettings.get() + globalRetentionSettings.get(false) + ); + validateDataStreamOptions( + tempStateWithComponentTemplateAdded.metadata(), + composableTemplateName, + composableTemplate, + globalRetentionSettings.get(true) ); - validateDataStreamOptions(tempStateWithComponentTemplateAdded.metadata(), composableTemplateName, composableTemplate); validateIndexTemplateV2(composableTemplateName, composableTemplate, tempStateWithComponentTemplateAdded); } catch (Exception e) { if (validationFailure == null) { @@ -370,7 +375,7 @@ public ClusterState addComponentTemplate( finalComponentTemplate.template() .lifecycle() .toDataStreamLifecycle() - .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(), false); + .addWarningHeaderIfDataRetentionNotEffective(globalRetentionSettings.get(false), false); } logger.info("{} component template [{}]", existing == null ? "adding" : "updating", name); @@ -725,8 +730,8 @@ void validateIndexTemplateV2(String name, ComposableIndexTemplate indexTemplate, validate(name, templateToValidate); validateDataStreamsStillReferenced(currentState, name, templateToValidate); - validateLifecycle(currentState.metadata(), name, templateToValidate, globalRetentionSettings.get()); - validateDataStreamOptions(currentState.metadata(), name, templateToValidate); + validateLifecycle(currentState.metadata(), name, templateToValidate, globalRetentionSettings.get(false)); + validateDataStreamOptions(currentState.metadata(), name, templateToValidate, globalRetentionSettings.get(true)); if (templateToValidate.isDeprecated() == false) { validateUseOfDeprecatedComponentTemplates(name, templateToValidate, currentState.metadata().componentTemplates()); @@ -821,7 +826,12 @@ static void validateLifecycle( } // Visible for testing - static void validateDataStreamOptions(Metadata metadata, String indexTemplateName, ComposableIndexTemplate template) { + static void validateDataStreamOptions( + Metadata metadata, + String indexTemplateName, + ComposableIndexTemplate template, + DataStreamGlobalRetention globalRetention + ) { DataStreamOptions.Builder dataStreamOptionsBuilder = resolveDataStreamOptions(template, metadata.componentTemplates()); if (dataStreamOptionsBuilder != null) { if (template.getDataStreamTemplate() == null) { @@ -831,6 +841,17 @@ static void validateDataStreamOptions(Metadata metadata, String indexTemplateNam + "] specifies data stream options that can only be used in combination with a data stream" ); } + if (globalRetention != null) { + // We cannot know for sure if the template will apply to internal data streams, so we use a simpler heuristic: + // If all the index patterns start with a dot, we consider that all the connected data streams are internal. + boolean isInternalDataStream = template.indexPatterns().stream().allMatch(indexPattern -> indexPattern.charAt(0) == '.'); + DataStreamOptions dataStreamOptions = dataStreamOptionsBuilder.build(); + if (dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().lifecycle() != null) { + dataStreamOptions.failureStore() + .lifecycle() + .addWarningHeaderIfDataRetentionNotEffective(globalRetention, isInternalDataStream); + } + } } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index ae79370811ee2..b9aa4563464f5 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -619,6 +619,7 @@ public void apply(Settings value, Settings current, Settings previous) { TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE, DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING, DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING, + DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING, ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME, DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING, TransportGetAllocationStatsAction.CACHE_TTL_SETTING diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java index 834a9a50b257f..64d6c6ef002e9 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java @@ -75,7 +75,7 @@ private Map getXContentMap( ToXContent.Params params = new ToXContent.MapParams(DataStreamLifecycle.INCLUDE_EFFECTIVE_RETENTION_PARAMS); RolloverConfiguration rolloverConfiguration = null; DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(globalDefaultRetention, globalMaxRetention); - dataStreamInfo.toXContent(builder, params, rolloverConfiguration, globalRetention); + dataStreamInfo.toXContent(builder, params, rolloverConfiguration, globalRetention, globalRetention); String serialized = Strings.toString(builder); return XContentHelper.convertToMap(XContentType.JSON.xContent(), serialized, randomBoolean()); } diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleResponseTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleResponseTests.java index 762aff6e3c4e0..cc1cab040bfef 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/lifecycle/ExplainDataStreamLifecycleResponseTests.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction.Response; import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; @@ -67,7 +68,7 @@ public void testToXContent() throws IOException { ExplainIndexDataStreamLifecycle explainIndex = createRandomIndexDataStreamLifecycleExplanation(now, lifecycle); explainIndex.setNowSupplier(() -> now); { - Response response = new Response(List.of(explainIndex), null, null); + Response response = new Response(List.of(explainIndex), null, null, null); XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); response.toXContentChunked(EMPTY_PARAMS).forEachRemaining(xcontent -> { @@ -133,10 +134,16 @@ public void testToXContent() throws IOException { new MinPrimaryShardDocsCondition(4L) ) ); + DataStreamGlobalRetention dataGlobalRetention = DataStreamTestHelper.randomGlobalRetention(); + DataStreamGlobalRetention failuresGlobalRetention = new DataStreamGlobalRetention( + randomTimeValue(1, 30, TimeUnit.DAYS), + dataGlobalRetention == null ? null : dataGlobalRetention.maxRetention() + ); Response response = new Response( List.of(explainIndex), new RolloverConfiguration(rolloverConditions), - DataStreamTestHelper.randomGlobalRetention() + dataGlobalRetention, + failuresGlobalRetention ); XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); @@ -219,7 +226,7 @@ public void testToXContent() throws IOException { ) : null ); - Response response = new Response(List.of(explainIndexWithNullGenerationDate), null, null); + Response response = new Response(List.of(explainIndexWithNullGenerationDate), null, null, null); XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); response.toXContentChunked(EMPTY_PARAMS).forEachRemaining(xcontent -> { @@ -249,6 +256,7 @@ public void testChunkCount() { createRandomIndexDataStreamLifecycleExplanation(now, lifecycle) ), null, + null, null ); @@ -297,6 +305,14 @@ protected Response mutateInstance(Response instance) { } private Response randomResponse() { + var dataGlobalRetention = DataStreamGlobalRetention.create( + randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(1, 10)) : null, + randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(10, 20)) : null + ); + var failuresGlobalRetention = DataStreamGlobalRetention.create( + randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(1, 10)) : null, + dataGlobalRetention == null ? null : dataGlobalRetention.maxRetention() + ); return new Response( List.of( createRandomIndexDataStreamLifecycleExplanation( @@ -311,12 +327,8 @@ private Response randomResponse() { ) ) : null, - randomBoolean() - ? new DataStreamGlobalRetention( - TimeValue.timeValueDays(randomIntBetween(1, 10)), - TimeValue.timeValueDays(randomIntBetween(10, 20)) - ) - : null + dataGlobalRetention, + failuresGlobalRetention ); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettingsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettingsTests.java index 17fa520ad1c4a..453dcca1c8945 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamGlobalRetentionSettingsTests.java @@ -27,6 +27,8 @@ public void testDefaults() { assertThat(globalRetentionSettings.getDefaultRetention(), nullValue()); assertThat(globalRetentionSettings.getMaxRetention(), nullValue()); + assertThat(globalRetentionSettings.get(false), nullValue()); + assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(TimeValue.timeValueDays(30), null))); } public void testMonitorsDefaultRetention() { @@ -43,7 +45,8 @@ public void testMonitorsDefaultRetention() { .build(); clusterSettings.applySettings(newSettings); - assertThat(newDefaultRetention, equalTo(globalRetentionSettings.getDefaultRetention())); + assertThat(globalRetentionSettings.getDefaultRetention(), equalTo(newDefaultRetention)); + assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null))); // Test invalid update Settings newInvalidSettings = Settings.builder() @@ -57,6 +60,7 @@ public void testMonitorsDefaultRetention() { exception.getCause().getMessage(), containsString("Setting 'data_streams.lifecycle.retention.default' should be greater than") ); + assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null))); } public void testMonitorsMaxRetention() { @@ -64,13 +68,25 @@ public void testMonitorsMaxRetention() { DataStreamGlobalRetentionSettings globalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings); // Test valid update - TimeValue newMaxRetention = TimeValue.timeValueDays(randomIntBetween(10, 30)); + TimeValue newMaxRetention = TimeValue.timeValueDays(randomIntBetween(10, 29)); Settings newSettings = Settings.builder() .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), newMaxRetention.toHumanReadableString(0)) .build(); clusterSettings.applySettings(newSettings); - assertThat(newMaxRetention, equalTo(globalRetentionSettings.getMaxRetention())); + assertThat(globalRetentionSettings.getMaxRetention(), equalTo(newMaxRetention)); + assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention))); + assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention))); + + newMaxRetention = TimeValue.timeValueDays(100); + newSettings = Settings.builder() + .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), newMaxRetention.toHumanReadableString(0)) + .build(); + clusterSettings.applySettings(newSettings); + assertThat( + globalRetentionSettings.get(true), + equalTo(DataStreamGlobalRetention.create(TimeValue.timeValueDays(30), newMaxRetention)) + ); // Test invalid update Settings newInvalidSettings = Settings.builder() @@ -84,11 +100,57 @@ public void testMonitorsMaxRetention() { exception.getCause().getMessage(), containsString("Setting 'data_streams.lifecycle.retention.max' should be greater than") ); + assertThat(globalRetentionSettings.get(false), equalTo(DataStreamGlobalRetention.create(null, newMaxRetention))); + } + + public void testMonitorsDefaultFailuresRetention() { + ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(); + DataStreamGlobalRetentionSettings globalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings); + + // Test valid update + TimeValue newDefaultRetention = TimeValue.timeValueDays(randomIntBetween(1, 10)); + Settings newSettings = Settings.builder() + .put( + DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(), + newDefaultRetention.toHumanReadableString(0) + ) + .build(); + clusterSettings.applySettings(newSettings); + + assertThat(globalRetentionSettings.getDefaultRetention(true), equalTo(newDefaultRetention)); + assertThat(globalRetentionSettings.get(true), equalTo(DataStreamGlobalRetention.create(newDefaultRetention, null))); + + // Test update default failures retention to infinite retention + newDefaultRetention = TimeValue.MINUS_ONE; + newSettings = Settings.builder() + .put( + DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(), + newDefaultRetention.toHumanReadableString(0) + ) + .build(); + clusterSettings.applySettings(newSettings); + + assertThat(globalRetentionSettings.getDefaultRetention(true), nullValue()); + assertThat(globalRetentionSettings.get(true), nullValue()); + + // Test invalid update + Settings newInvalidSettings = Settings.builder() + .put(DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(), TimeValue.ZERO) + .build(); + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings(newInvalidSettings) + ); + assertThat( + exception.getCause().getMessage(), + containsString("Setting 'data_streams.lifecycle.retention.failures_default' should be greater than") + ); + assertThat(globalRetentionSettings.get(true), nullValue()); } public void testCombinationValidation() { ClusterSettings clusterSettings = ClusterSettings.createBuiltInClusterSettings(); - DataStreamGlobalRetentionSettings.create(clusterSettings); + DataStreamGlobalRetentionSettings dataStreamGlobalRetentionSettings = DataStreamGlobalRetentionSettings.create(clusterSettings); // Test invalid update Settings newInvalidSettings = Settings.builder() @@ -105,5 +167,17 @@ public void testCombinationValidation() { "Setting [data_streams.lifecycle.retention.default=90d] cannot be greater than [data_streams.lifecycle.retention.max=30d]" ) ); + + // Test valid update even if the failures default is greater than max. + Settings newValidSettings = Settings.builder() + .put(DataStreamGlobalRetentionSettings.FAILURE_STORE_DEFAULT_RETENTION_SETTING.getKey(), TimeValue.timeValueDays(90)) + .put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), TimeValue.timeValueDays(30)) + .build(); + clusterSettings.applySettings(newValidSettings); + assertThat(dataStreamGlobalRetentionSettings.getDefaultRetention(true), equalTo(TimeValue.timeValueDays(90))); + assertThat( + dataStreamGlobalRetentionSettings.get(true), + equalTo(DataStreamGlobalRetention.create(null, TimeValue.timeValueDays(30))) + ); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java index 926b9ac764e5a..e430c81f4fad9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java @@ -37,6 +37,7 @@ import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DATA_STREAM_CONFIGURATION; +import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_FAILURES_RETENTION; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_GLOBAL_RETENTION; import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.MAX_GLOBAL_RETENTION; import static org.hamcrest.Matchers.containsString; @@ -372,7 +373,7 @@ public void testEffectiveRetention() { false ); assertThat(effectiveFailuresRetentionWithSource.v1(), equalTo(defaultRetention)); - assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION)); + assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_FAILURES_RETENTION)); } // With retention in the data stream lifecycle @@ -477,7 +478,7 @@ public void testEffectiveRetention() { assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION)); } else { assertThat(effectiveFailuresRetentionWithSource.v1(), equalTo(globalRetention.defaultRetention())); - assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION)); + assertThat(effectiveFailuresRetentionWithSource.v2(), equalTo(DEFAULT_FAILURES_RETENTION)); } // Now verify that internal data streams do not use global retention diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java index 656af355b1d03..215ae51ecaf95 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamUsageTransportAction.java @@ -107,7 +107,7 @@ protected void masterOperation( if (effectiveDataRetentionWithSource.v2().equals(DataStreamLifecycle.RetentionSource.MAX_GLOBAL_RETENTION)) { affectedByMaxRetentionCounter++; } - if (effectiveDataRetentionWithSource.v2().equals(DataStreamLifecycle.RetentionSource.DEFAULT_GLOBAL_RETENTION)) { + if (effectiveDataRetentionWithSource.v2().equals(DataStreamLifecycle.RetentionSource.DEFAULT_FAILURES_RETENTION)) { affectedByDefaultRetentionCounter++; } } From 77ead29411c2628193e09eeaa1f142ba215f3887 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Sat, 3 May 2025 16:40:03 +0300 Subject: [PATCH 2/2] Add rest API capability for failures default retention --- .../datastreams/rest/RestGetDataStreamsAction.java | 3 ++- .../test/data_stream/240_failure_store_info.yml | 7 +++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java index b99c23e688ab1..7f1011bddde39 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/rest/RestGetDataStreamsAction.java @@ -47,7 +47,8 @@ public class RestGetDataStreamsAction extends BaseRestHandler { public static final String FAILURES_LIFECYCLE_API_CAPABILITY = "failure_store.lifecycle"; private static final Set CAPABILITIES = Set.of( DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY, - FAILURES_LIFECYCLE_API_CAPABILITY + FAILURES_LIFECYCLE_API_CAPABILITY, + "failure_store.lifecycle.default_retention" ); @Override diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml index 98d9d3a6ddb60..d66ccecebff6b 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/240_failure_store_info.yml @@ -199,6 +199,13 @@ teardown: --- "Get failure store info from cluster setting enabled failure store": + - requires: + test_runner_features: [ capabilities ] + reason: "Default retention for failures was added in 8.19+" + capabilities: + - method: GET + path: /_data_stream/{target} + capabilities: [ 'failure_store.lifecycle.default_retention' ] - do: indices.create_data_stream: name: fs-default-data-stream