From 8074d04b06f3caa115ee5b1806a172080d1b9c32 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 12 Mar 2024 22:11:04 +0200 Subject: [PATCH 1/7] Add effective retention calculation to DataStreamLifecycle --- .../cluster/metadata/DataStreamLifecycle.java | 79 +++++++++++++++++-- .../metadata/DataStreamLifecycleTests.java | 74 ++++++++++++++++- 2 files changed, 143 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java index b4a3a1eb3502a..904bf89b273ef 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.xcontent.AbstractObjectParser; import org.elasticsearch.xcontent.ConstructingObjectParser; @@ -34,6 +35,7 @@ import java.io.IOException; import java.util.List; +import java.util.Locale; import java.util.Objects; import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg; @@ -77,12 +79,14 @@ public static boolean isDataStreamsLifecycleOnlyMode(final Settings settings) { public static final ParseField ENABLED_FIELD = new ParseField("enabled"); public static final ParseField DATA_RETENTION_FIELD = new ParseField("data_retention"); + public static final ParseField EFFECTIVE_RETENTION_FIELD = new ParseField("effective_retention"); + public static final ParseField RETENTION_SOURCE_FIELD = new ParseField("retention_determined_by"); public static final ParseField DOWNSAMPLING_FIELD = new ParseField("downsampling"); private static final ParseField ROLLOVER_FIELD = new ParseField("rollover"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "lifecycle", - false, + true, (args, unused) -> new DataStreamLifecycle((Retention) args[0], (Downsampling) args[1], (Boolean) args[2]) ); @@ -133,13 +137,39 @@ public boolean isEnabled() { * @return the time period or null, null represents that data should never be deleted. */ @Nullable - public TimeValue getEffectiveDataRetention() { - return getDataStreamRetention(); + public TimeValue getEffectiveDataRetention(@Nullable DataStreamGlobalRetention globalRetention) { + return getEffectiveDataRetentionWithSource(globalRetention).v1(); + } + + /** + * The least amount of time data should be kept by elasticsearch. + * @return the time period or null, null represents that data should never be deleted. + */ + @Nullable + public Tuple getEffectiveDataRetentionWithSource(@Nullable DataStreamGlobalRetention globalRetention) { + // If lifecycle is disabled there is no effective retention + if (enabled == false) { + return Tuple.tuple(null, RetentionSource.DATA_STREAM_CONFIGURATION); + } + var dataStreamRetention = getDataStreamRetention(); + if (globalRetention == null) { + return Tuple.tuple(dataStreamRetention, RetentionSource.DATA_STREAM_CONFIGURATION); + } + if (dataStreamRetention == null) { + return globalRetention.getDefaultRetention() != null + ? Tuple.tuple(globalRetention.getDefaultRetention(), RetentionSource.DEFAULT_GLOBAL_RETENTION) + : Tuple.tuple(globalRetention.getMaxRetention(), RetentionSource.MAX_GLOBAL_RETENTION); + } + if (globalRetention.getMaxRetention() != null && globalRetention.getMaxRetention().getMillis() < dataStreamRetention.getMillis()) { + return Tuple.tuple(globalRetention.getMaxRetention(), RetentionSource.MAX_GLOBAL_RETENTION); + } else { + return Tuple.tuple(dataStreamRetention, RetentionSource.DATA_STREAM_CONFIGURATION); + } } /** * The least amount of time data the data stream is requesting es to keep the data. - * NOTE: this can be overriden by the {@link DataStreamLifecycle#getEffectiveDataRetention()}. + * NOTE: this can be overridden by the {@link DataStreamLifecycle#getEffectiveDataRetention(DataStreamGlobalRetention)}. * @return the time period or null, null represents that data should never be deleted. */ @Nullable @@ -232,30 +262,50 @@ public String toString() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return toXContent(builder, params, null); + return toXContent(builder, params, null, null); } /** * Converts the data stream lifecycle to XContent and injects the RolloverConditions if they exist. + * @deprecated use {@link #toXContent(XContentBuilder, Params, RolloverConfiguration, DataStreamGlobalRetention)} */ + @Deprecated public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration) throws IOException { + return toXContent(builder, params, rolloverConfiguration, null); + } + + /** + * Converts the data stream lifecycle to XContent and injects the RolloverConditions and the global retention if they exist. + */ + public XContentBuilder toXContent( + XContentBuilder builder, + Params params, + @Nullable RolloverConfiguration rolloverConfiguration, + @Nullable DataStreamGlobalRetention globalRetention + ) throws IOException { builder.startObject(); builder.field(ENABLED_FIELD.getPreferredName(), enabled); if (dataRetention != null) { - if (dataRetention.value() == null) { + if (dataRetention.value == null) { builder.nullField(DATA_RETENTION_FIELD.getPreferredName()); } else { - builder.field(DATA_RETENTION_FIELD.getPreferredName(), dataRetention.value().getStringRep()); + builder.field(DATA_RETENTION_FIELD.getPreferredName(), dataRetention.value.getStringRep()); } } + Tuple effectiveRetention = getEffectiveDataRetentionWithSource(globalRetention); + if (effectiveRetention.v1() != null) { + builder.field(EFFECTIVE_RETENTION_FIELD.getPreferredName(), effectiveRetention.v1().getStringRep()); + builder.field(RETENTION_SOURCE_FIELD.getPreferredName(), effectiveRetention.v2().displayName()); + } + if (downsampling != null) { builder.field(DOWNSAMPLING_FIELD.getPreferredName()); downsampling.toXContent(builder, params); } if (rolloverConfiguration != null) { builder.field(ROLLOVER_FIELD.getPreferredName()); - rolloverConfiguration.evaluateAndConvertToXContent(builder, params, getEffectiveDataRetention()); + rolloverConfiguration.evaluateAndConvertToXContent(builder, params, getEffectiveDataRetention(globalRetention)); } builder.endObject(); return builder; @@ -466,4 +516,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } } + + /** + * This enum represents all configuration sources that can influence the retention of a data stream. + */ + public enum RetentionSource { + DATA_STREAM_CONFIGURATION, + DEFAULT_GLOBAL_RETENTION, + MAX_GLOBAL_RETENTION; + + public String displayName() { + return this.toString().toLowerCase(Locale.ROOT); + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java index 441e8491b4b92..9bd2673f53446 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.AbstractXContentSerializingTestCase; import org.elasticsearch.test.ESTestCase; @@ -33,6 +34,9 @@ import java.util.Set; import java.util.stream.Stream; +import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DATA_STREAM_CONFIGURATION; +import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_GLOBAL_RETENTION; +import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.MAX_GLOBAL_RETENTION; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -107,10 +111,11 @@ public void testXContentSerializationWithRollover() throws IOException { try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { builder.humanReadable(true); RolloverConfiguration rolloverConfiguration = RolloverConfigurationTests.randomRolloverConditions(); - lifecycle.toXContent(builder, ToXContent.EMPTY_PARAMS, rolloverConfiguration); + DataStreamGlobalRetention globalRetention = DataStreamGlobalRetentionSerializationTests.randomGlobalRetention(); + lifecycle.toXContent(builder, ToXContent.EMPTY_PARAMS, rolloverConfiguration, globalRetention); String serialized = Strings.toString(builder); assertThat(serialized, containsString("rollover")); - for (String label : rolloverConfiguration.resolveRolloverConditions(lifecycle.getEffectiveDataRetention()) + for (String label : rolloverConfiguration.resolveRolloverConditions(lifecycle.getEffectiveDataRetention(globalRetention)) .getConditions() .keySet()) { assertThat(serialized, containsString(label)); @@ -253,6 +258,71 @@ public void testInvalidDownsamplingConfiguration() { } } + public void testEffectiveRetention() { + // No retention in the data stream lifecycle + { + DataStreamLifecycle noRetentionLifecycle = DataStreamLifecycle.newBuilder().downsampling(randomDownsampling()).build(); + TimeValue maxRetention = TimeValue.timeValueDays(randomIntBetween(50, 100)); + TimeValue defaultRetention = TimeValue.timeValueDays(randomIntBetween(1, 50)); + Tuple effectiveDataRetentionWithSource = noRetentionLifecycle + .getEffectiveDataRetentionWithSource(null); + assertThat(effectiveDataRetentionWithSource.v1(), nullValue()); + assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION)); + + effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource( + new DataStreamGlobalRetention(null, maxRetention) + ); + assertThat(effectiveDataRetentionWithSource.v1(), equalTo(maxRetention)); + assertThat(effectiveDataRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION)); + + effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource( + new DataStreamGlobalRetention(defaultRetention, null) + ); + assertThat(effectiveDataRetentionWithSource.v1(), equalTo(defaultRetention)); + assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION)); + + effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource( + new DataStreamGlobalRetention(defaultRetention, maxRetention) + ); + assertThat(effectiveDataRetentionWithSource.v1(), equalTo(defaultRetention)); + assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION)); + } + + // With retention in the data stream lifecycle + { + TimeValue dataStreamRetention = TimeValue.timeValueDays(randomIntBetween(5, 100)); + DataStreamLifecycle lifecycleRetention = DataStreamLifecycle.newBuilder() + .dataRetention(dataStreamRetention) + .downsampling(randomDownsampling()) + .build(); + TimeValue defaultRetention = TimeValue.timeValueDays(randomIntBetween(1, (int) dataStreamRetention.getDays() - 1)); + + Tuple effectiveDataRetentionWithSource = lifecycleRetention + .getEffectiveDataRetentionWithSource(null); + assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention)); + assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION)); + + effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource( + new DataStreamGlobalRetention(defaultRetention, null) + ); + assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention)); + assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION)); + + effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource( + new DataStreamGlobalRetention(defaultRetention, dataStreamRetention) + ); + assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention)); + assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION)); + + TimeValue maxRetentionLessThanDataStream = TimeValue.timeValueDays(dataStreamRetention.days() - 1); + effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource( + new DataStreamGlobalRetention(randomBoolean() ? null : TimeValue.timeValueDays(10), maxRetentionLessThanDataStream) + ); + assertThat(effectiveDataRetentionWithSource.v1(), equalTo(maxRetentionLessThanDataStream)); + assertThat(effectiveDataRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION)); + } + } + @Nullable public static DataStreamLifecycle randomLifecycle() { return DataStreamLifecycle.newBuilder() From d5f786e60965e8e9dc9268ebd5c8ad95dbb90f39 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 12 Mar 2024 22:11:32 +0200 Subject: [PATCH 2/7] Use effective retention in DataStream & DSL --- .../lifecycle/DataStreamLifecycleService.java | 8 +-- .../cluster/metadata/DataStream.java | 22 +++++--- .../cluster/metadata/DataStreamTests.java | 54 ++++++++++++++----- 3 files changed, 61 insertions(+), 23 deletions(-) diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index 1b875c28f7f43..d1dd008e27977 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.SimpleBatchedExecutor; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -796,7 +797,7 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) { RolloverRequest rolloverRequest = getDefaultRolloverRequest( rolloverConfiguration, dataStream.getName(), - dataStream.getLifecycle().getEffectiveDataRetention() + dataStream.getLifecycle().getEffectiveDataRetention(DataStreamGlobalRetention.getFromClusterState(state)) ); transportActionsDeduplicator.executeOnce( rolloverRequest, @@ -823,14 +824,15 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) { */ private Set maybeExecuteRetention(ClusterState state, DataStream dataStream, Set indicesToExcludeForRemainingRun) { Metadata metadata = state.metadata(); - List backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier); + DataStreamGlobalRetention globalRetention = DataStreamGlobalRetention.getFromClusterState(state); + List backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier, globalRetention); if (backingIndicesOlderThanRetention.isEmpty()) { return Set.of(); } Set indicesToBeRemoved = new HashSet<>(); // We know that there is lifecycle and retention because there are indices to be deleted assert dataStream.getLifecycle() != null; - TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(); + TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention); for (Index index : backingIndicesOlderThanRetention) { if (indicesToExcludeForRemainingRun.contains(index) == false) { IndexMetadata backingIndex = metadata.index(index); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 073ba460a4698..6c58dd4c57e29 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -753,13 +753,17 @@ public DataStream snapshot(Collection indicesInSnapshot) { * NOTE that this specifically does not return the write index of the data stream as usually retention * is treated differently for the write index (i.e. they first need to be rolled over) */ - public List getIndicesPastRetention(Function indexMetadataSupplier, LongSupplier nowSupplier) { - if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention() == null) { + public List getIndicesPastRetention( + Function indexMetadataSupplier, + LongSupplier nowSupplier, + DataStreamGlobalRetention globalRetention + ) { + if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention(globalRetention) == null) { return List.of(); } List indicesPastRetention = getNonWriteIndicesOlderThan( - lifecycle.getEffectiveDataRetention(), + lifecycle.getEffectiveDataRetention(globalRetention), indexMetadataSupplier, this::isIndexManagedByDataStreamLifecycle, nowSupplier @@ -1092,14 +1096,18 @@ public static DataStream fromXContent(XContentParser parser) throws IOException @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return toXContent(builder, params, null); + return toXContent(builder, params, null, null); } /** * Converts the data stream to XContent and passes the RolloverConditions, when provided, to the lifecycle. */ - public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration) - throws IOException { + public XContentBuilder toXContent( + XContentBuilder builder, + Params params, + @Nullable RolloverConfiguration rolloverConfiguration, + @Nullable DataStreamGlobalRetention globalRetention + ) throws IOException { builder.startObject(); builder.field(NAME_FIELD.getPreferredName(), name); builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName()) @@ -1126,7 +1134,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nulla } if (lifecycle != null) { builder.field(LIFECYCLE.getPreferredName()); - lifecycle.toXContent(builder, params, rolloverConfiguration); + lifecycle.toXContent(builder, params, rolloverConfiguration, globalRetention); } builder.field(ROLLOVER_ON_WRITE_FIELD.getPreferredName(), rolloverOnWrite); if (autoShardingEvent != null) { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index a07cd8e60411a..45f4ddc468623 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -50,6 +50,7 @@ import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; +import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomGlobalRetention; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomIndexInstances; import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.randomNonEmptyIndexInstances; import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE; @@ -1146,11 +1147,34 @@ public void testGetIndicesPastRetention() { ); Metadata metadata = builder.build(); - assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now).isEmpty(), is(true)); + assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()).isEmpty(), is(true)); } { - // no retention configured so we expect an empty list + // no retention configured but we have default retention + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention( + TimeValue.timeValueMillis(2500), + randomBoolean() ? TimeValue.timeValueMillis(randomIntBetween(2500, 5000)) : null + ); + Metadata.Builder builder = Metadata.builder(); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + creationAndRolloverTimes, + settings(IndexVersion.current()), + new DataStreamLifecycle() + ); + Metadata metadata = builder.build(); + + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, globalRetention); + assertThat(backingIndices.size(), is(2)); + assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); + assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); + } + + { + // no retention configured but we have max retention + DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(null, TimeValue.timeValueMillis(2500)); Metadata.Builder builder = Metadata.builder(); DataStream dataStream = createDataStream( builder, @@ -1161,7 +1185,10 @@ public void testGetIndicesPastRetention() { ); Metadata metadata = builder.build(); - assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now).isEmpty(), is(true)); + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, globalRetention); + assertThat(backingIndices.size(), is(2)); + assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); + assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); } { @@ -1175,7 +1202,7 @@ public void testGetIndicesPastRetention() { ); Metadata metadata = builder.build(); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now); + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); assertThat(backingIndices.size(), is(2)); assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); @@ -1193,7 +1220,7 @@ public void testGetIndicesPastRetention() { ); Metadata metadata = builder.build(); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now); + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); assertThat(backingIndices.size(), is(4)); assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); @@ -1214,7 +1241,7 @@ public void testGetIndicesPastRetention() { ); Metadata metadata = builder.build(); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now); + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); assertThat(backingIndices.isEmpty(), is(true)); } @@ -1232,7 +1259,7 @@ public void testGetIndicesPastRetention() { ); Metadata metadata = builder.build(); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now); + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); assertThat(backingIndices.isEmpty(), is(true)); } } @@ -1267,13 +1294,13 @@ public TimeValue getDataStreamRetention() { { // no retention configured so we expect an empty list testRetentionReference.set(null); - assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now).isEmpty(), is(true)); + assertThat(dataStream.getIndicesPastRetention(metadata::index, () -> now, null).isEmpty(), is(true)); } { // retention period where oldIndex is too old, but newIndex should be retained testRetentionReference.set(TimeValue.timeValueMillis(2500)); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now); + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null); assertThat(backingIndices.size(), is(3)); assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); @@ -1283,7 +1310,7 @@ public TimeValue getDataStreamRetention() { { // even though all indices match the write index should not be returned testRetentionReference.set(TimeValue.timeValueMillis(0)); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now); + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null); assertThat(backingIndices.size(), is(6)); assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); @@ -1297,7 +1324,7 @@ public TimeValue getDataStreamRetention() { { // no index matches the retention age testRetentionReference.set(TimeValue.timeValueMillis(9000)); - List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now); + List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null); assertThat(backingIndices.isEmpty(), is(true)); } } @@ -1670,10 +1697,11 @@ public void testXContentSerializationWithRollover() throws IOException { try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { builder.humanReadable(true); RolloverConfiguration rolloverConfiguration = RolloverConfigurationTests.randomRolloverConditions(); - dataStream.toXContent(builder, ToXContent.EMPTY_PARAMS, rolloverConfiguration); + DataStreamGlobalRetention globalRetention = DataStreamGlobalRetentionSerializationTests.randomGlobalRetention(); + dataStream.toXContent(builder, ToXContent.EMPTY_PARAMS, rolloverConfiguration, globalRetention); String serialized = Strings.toString(builder); assertThat(serialized, containsString("rollover")); - for (String label : rolloverConfiguration.resolveRolloverConditions(lifecycle.getEffectiveDataRetention()) + for (String label : rolloverConfiguration.resolveRolloverConditions(lifecycle.getEffectiveDataRetention(globalRetention)) .getConditions() .keySet()) { assertThat(serialized, containsString(label)); From 9663576db3ab6c241dfd184b748c8edc847909aa Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 12 Mar 2024 22:11:45 +0200 Subject: [PATCH 3/7] Small touches --- .../cluster/metadata/DataStreamTestHelper.java | 13 +++++++++++++ .../DataStreamLifecycleUsageTransportAction.java | 4 ++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index 8402b5756e915..37339da092e94 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; @@ -346,6 +347,18 @@ public static DataStreamAlias randomAliasInstance() { ); } + @Nullable + public static DataStreamGlobalRetention randomGlobalRetention() { + if (randomBoolean()) { + return null; + } + boolean withDefault = randomBoolean(); + return new DataStreamGlobalRetention( + withDefault ? TimeValue.timeValueDays(randomIntBetween(1, 30)) : null, + withDefault == false || randomBoolean() ? TimeValue.timeValueDays(randomIntBetween(31, 100)) : null + ); + } + /** * Constructs {@code ClusterState} with the specified data streams and indices. * diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java index fb49ba6c7e7a7..947adf9f8462f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportAction.java @@ -85,8 +85,8 @@ public static Tuple calculateStats(Collection Date: Tue, 12 Mar 2024 22:14:08 +0200 Subject: [PATCH 4/7] Add another deprecated method to help with the clean up in follow up PRs --- .../cluster/metadata/DataStreamLifecycle.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java index 904bf89b273ef..613b0564b1d72 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java @@ -132,6 +132,17 @@ public boolean isEnabled() { return enabled; } + /** + * The least amount of time data should be kept by elasticsearch. + * @return the time period or null, null represents that data should never be deleted. + * @deprecated use {@link #getEffectiveDataRetention(DataStreamGlobalRetention)} + */ + @Deprecated + @Nullable + public TimeValue getEffectiveDataRetention() { + return getEffectiveDataRetention(null); + } + /** * The least amount of time data should be kept by elasticsearch. * @return the time period or null, null represents that data should never be deleted. From daa25f1e3abca8b223ebbe0126553399117b25ad Mon Sep 17 00:00:00 2001 From: gmarouli Date: Tue, 12 Mar 2024 22:18:16 +0200 Subject: [PATCH 5/7] Remove effective retention from XContent (for now) --- .../cluster/metadata/DataStreamLifecycle.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java index 613b0564b1d72..a8b094bafde2e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java @@ -79,14 +79,12 @@ public static boolean isDataStreamsLifecycleOnlyMode(final Settings settings) { public static final ParseField ENABLED_FIELD = new ParseField("enabled"); public static final ParseField DATA_RETENTION_FIELD = new ParseField("data_retention"); - public static final ParseField EFFECTIVE_RETENTION_FIELD = new ParseField("effective_retention"); - public static final ParseField RETENTION_SOURCE_FIELD = new ParseField("retention_determined_by"); public static final ParseField DOWNSAMPLING_FIELD = new ParseField("downsampling"); private static final ParseField ROLLOVER_FIELD = new ParseField("rollover"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "lifecycle", - true, + false, (args, unused) -> new DataStreamLifecycle((Retention) args[0], (Downsampling) args[1], (Boolean) args[2]) ); @@ -298,18 +296,12 @@ public XContentBuilder toXContent( builder.startObject(); builder.field(ENABLED_FIELD.getPreferredName(), enabled); if (dataRetention != null) { - if (dataRetention.value == null) { + if (dataRetention.value() == null) { builder.nullField(DATA_RETENTION_FIELD.getPreferredName()); } else { - builder.field(DATA_RETENTION_FIELD.getPreferredName(), dataRetention.value.getStringRep()); + builder.field(DATA_RETENTION_FIELD.getPreferredName(), dataRetention.value().getStringRep()); } } - Tuple effectiveRetention = getEffectiveDataRetentionWithSource(globalRetention); - if (effectiveRetention.v1() != null) { - builder.field(EFFECTIVE_RETENTION_FIELD.getPreferredName(), effectiveRetention.v1().getStringRep()); - builder.field(RETENTION_SOURCE_FIELD.getPreferredName(), effectiveRetention.v2().displayName()); - } - if (downsampling != null) { builder.field(DOWNSAMPLING_FIELD.getPreferredName()); downsampling.toXContent(builder, params); From becb4f8ba056a6b933c20c6727737759c06fb8d4 Mon Sep 17 00:00:00 2001 From: gmarouli Date: Thu, 14 Mar 2024 09:08:10 +0200 Subject: [PATCH 6/7] Add test case when max retention is larger than the data stream one --- .../cluster/metadata/DataStreamLifecycleTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java index 9bd2673f53446..e3bf5260a7445 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java @@ -308,8 +308,9 @@ public void testEffectiveRetention() { assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention)); assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION)); + TimeValue maxGlobalRetention = randomBoolean() ? dataStreamRetention : TimeValue.timeValueDays(dataStreamRetention.days() + 1); effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource( - new DataStreamGlobalRetention(defaultRetention, dataStreamRetention) + new DataStreamGlobalRetention(defaultRetention, maxGlobalRetention) ); assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention)); assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION)); From 552e5085047df4fa869f919290cc16be95469cfd Mon Sep 17 00:00:00 2001 From: gmarouli Date: Thu, 14 Mar 2024 09:18:18 +0200 Subject: [PATCH 7/7] Use a more robust way to retrieve the backing index names in retention tests --- .../cluster/metadata/DataStreamTests.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 45f4ddc468623..3a0ebc37c1729 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -1168,8 +1168,8 @@ public void testGetIndicesPastRetention() { List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, globalRetention); assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); + assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); + assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); } { @@ -1187,8 +1187,8 @@ public void testGetIndicesPastRetention() { List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, globalRetention); assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); + assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); + assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); } { @@ -1204,8 +1204,8 @@ public void testGetIndicesPastRetention() { List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); assertThat(backingIndices.size(), is(2)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); + assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); + assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); } { @@ -1223,10 +1223,10 @@ public void testGetIndicesPastRetention() { List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, randomGlobalRetention()); assertThat(backingIndices.size(), is(4)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); - assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 3))); - assertThat(backingIndices.get(3).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 4))); + assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); + assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(2).getName())); + assertThat(backingIndices.get(3).getName(), is(dataStream.getIndices().get(3).getName())); } { @@ -1265,7 +1265,7 @@ public void testGetIndicesPastRetention() { } public void testGetIndicesPastRetentionWithOriginationDate() { - // First, build an ordinary datastream: + // First, build an ordinary data stream: String dataStreamName = "metrics-foo"; long now = System.currentTimeMillis(); List creationAndRolloverTimes = List.of( @@ -1302,9 +1302,9 @@ public TimeValue getDataStreamRetention() { testRetentionReference.set(TimeValue.timeValueMillis(2500)); List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null); assertThat(backingIndices.size(), is(3)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); - assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 6))); + assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); + assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(5).getName())); } { @@ -1313,12 +1313,12 @@ public TimeValue getDataStreamRetention() { List backingIndices = dataStream.getIndicesPastRetention(metadata::index, () -> now, null); assertThat(backingIndices.size(), is(6)); - assertThat(backingIndices.get(0).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))); - assertThat(backingIndices.get(1).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))); - assertThat(backingIndices.get(2).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 3))); - assertThat(backingIndices.get(3).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 4))); - assertThat(backingIndices.get(4).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 5))); - assertThat(backingIndices.get(5).getName(), is(DataStream.getDefaultBackingIndexName(dataStreamName, 6))); + assertThat(backingIndices.get(0).getName(), is(dataStream.getIndices().get(0).getName())); + assertThat(backingIndices.get(1).getName(), is(dataStream.getIndices().get(1).getName())); + assertThat(backingIndices.get(2).getName(), is(dataStream.getIndices().get(2).getName())); + assertThat(backingIndices.get(3).getName(), is(dataStream.getIndices().get(3).getName())); + assertThat(backingIndices.get(4).getName(), is(dataStream.getIndices().get(4).getName())); + assertThat(backingIndices.get(5).getName(), is(dataStream.getIndices().get(5).getName())); } {