Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -796,7 +797,7 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
rolloverConfiguration,
dataStream.getName(),
dataStream.getLifecycle().getEffectiveDataRetention()
dataStream.getLifecycle().getEffectiveDataRetention(DataStreamGlobalRetention.getFromClusterState(state))
);
transportActionsDeduplicator.executeOnce(
rolloverRequest,
Expand All @@ -823,14 +824,15 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
*/
private Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
Metadata metadata = state.metadata();
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier);
DataStreamGlobalRetention globalRetention = DataStreamGlobalRetention.getFromClusterState(state);
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier, globalRetention);
if (backingIndicesOlderThanRetention.isEmpty()) {
return Set.of();
}
Set<Index> indicesToBeRemoved = new HashSet<>();
// We know that there is lifecycle and retention because there are indices to be deleted
assert dataStream.getLifecycle() != null;
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention();
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention);
for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.index(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,13 +759,17 @@ public DataStream snapshot(Collection<String> indicesInSnapshot) {
* NOTE that this specifically does not return the write index of the data stream as usually retention
* is treated differently for the write index (i.e. they first need to be rolled over)
*/
public List<Index> getIndicesPastRetention(Function<String, IndexMetadata> indexMetadataSupplier, LongSupplier nowSupplier) {
if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention() == null) {
public List<Index> getIndicesPastRetention(
Function<String, IndexMetadata> indexMetadataSupplier,
LongSupplier nowSupplier,
DataStreamGlobalRetention globalRetention
) {
if (lifecycle == null || lifecycle.isEnabled() == false || lifecycle.getEffectiveDataRetention(globalRetention) == null) {
return List.of();
}

List<Index> indicesPastRetention = getNonWriteIndicesOlderThan(
lifecycle.getEffectiveDataRetention(),
lifecycle.getEffectiveDataRetention(globalRetention),
indexMetadataSupplier,
this::isIndexManagedByDataStreamLifecycle,
nowSupplier
Expand Down Expand Up @@ -1098,14 +1102,18 @@ public static DataStream fromXContent(XContentParser parser) throws IOException

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return toXContent(builder, params, null);
return toXContent(builder, params, null, null);
}

/**
* Converts the data stream to XContent and passes the RolloverConditions, when provided, to the lifecycle.
*/
public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration)
throws IOException {
public XContentBuilder toXContent(
XContentBuilder builder,
Params params,
@Nullable RolloverConfiguration rolloverConfiguration,
@Nullable DataStreamGlobalRetention globalRetention
) throws IOException {
builder.startObject();
builder.field(NAME_FIELD.getPreferredName(), name);
builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName())
Expand All @@ -1132,7 +1140,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nulla
}
if (lifecycle != null) {
builder.field(LIFECYCLE.getPreferredName());
lifecycle.toXContent(builder, params, rolloverConfiguration);
lifecycle.toXContent(builder, params, rolloverConfiguration, globalRetention);
}
builder.field(ROLLOVER_ON_WRITE_FIELD.getPreferredName(), rolloverOnWrite);
if (autoShardingEvent != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xcontent.AbstractObjectParser;
import org.elasticsearch.xcontent.ConstructingObjectParser;
Expand All @@ -34,6 +35,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
Expand Down Expand Up @@ -131,15 +133,52 @@ public boolean isEnabled() {
/**
* The least amount of time data should be kept by elasticsearch.
* @return the time period or null, null represents that data should never be deleted.
* @deprecated use {@link #getEffectiveDataRetention(DataStreamGlobalRetention)}
*/
@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not just delete this method? It's not part of any public API, and it looks like it's only used in 2 tests.

Copy link
Contributor Author

@gmarouli gmarouli Mar 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be easier to track down the places that it is being used to properly update them in the follow up PR, because I will be able to use the Find usages.

It's a convenience thing and they will be removed in the next PR.

Of course, if you feel strongly about it I can just inline it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future PR sounds good.

@Nullable
public TimeValue getEffectiveDataRetention() {
return getDataStreamRetention();
return getEffectiveDataRetention(null);
}

/**
* The least amount of time data should be kept by elasticsearch.
* @return the time period or null, null represents that data should never be deleted.
*/
@Nullable
public TimeValue getEffectiveDataRetention(@Nullable DataStreamGlobalRetention globalRetention) {
return getEffectiveDataRetentionWithSource(globalRetention).v1();
}

/**
* The least amount of time data should be kept by elasticsearch.
* @return the time period or null, null represents that data should never be deleted.
*/
@Nullable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like this can actually ever return null, can it? Also, maybe best to make this package private? The only reason it needs to be exposed at all is for a unit test (and do we even need the source there since it doesn't seem to be used in any production code, or could it just use the method above (I haven't read the test yet so maybe it will become clear then)?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at the test, I think we could test the same things without using the RetentionSource at all. Then we could get rid of this method and the RetentionSource enum, and just test getEffectiveDataRetention directly, right? Or is this something that's going to be used in a future PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's something coming in a future PR. When we return the lifecycle in a few GET APIs, the lifecycle section will the following structure:

{
  "enabled": true,
  "data_retention": "365d",
  "effective_retention": "90d",
  "retention_determined_by": "max_global_retention"
}

I thought it makes sense to calculate them together always to avoid duplicating this logic.

public Tuple<TimeValue, RetentionSource> getEffectiveDataRetentionWithSource(@Nullable DataStreamGlobalRetention globalRetention) {
// If lifecycle is disabled there is no effective retention
if (enabled == false) {
return Tuple.tuple(null, RetentionSource.DATA_STREAM_CONFIGURATION);
}
var dataStreamRetention = getDataStreamRetention();
if (globalRetention == null) {
return Tuple.tuple(dataStreamRetention, RetentionSource.DATA_STREAM_CONFIGURATION);
}
if (dataStreamRetention == null) {
return globalRetention.getDefaultRetention() != null
? Tuple.tuple(globalRetention.getDefaultRetention(), RetentionSource.DEFAULT_GLOBAL_RETENTION)
: Tuple.tuple(globalRetention.getMaxRetention(), RetentionSource.MAX_GLOBAL_RETENTION);
}
if (globalRetention.getMaxRetention() != null && globalRetention.getMaxRetention().getMillis() < dataStreamRetention.getMillis()) {
return Tuple.tuple(globalRetention.getMaxRetention(), RetentionSource.MAX_GLOBAL_RETENTION);
} else {
return Tuple.tuple(dataStreamRetention, RetentionSource.DATA_STREAM_CONFIGURATION);
}
}

/**
* The least amount of time data the data stream is requesting es to keep the data.
* NOTE: this can be overriden by the {@link DataStreamLifecycle#getEffectiveDataRetention()}.
* NOTE: this can be overridden by the {@link DataStreamLifecycle#getEffectiveDataRetention(DataStreamGlobalRetention)}.
* @return the time period or null, null represents that data should never be deleted.
*/
@Nullable
Expand Down Expand Up @@ -232,14 +271,28 @@ public String toString() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return toXContent(builder, params, null);
return toXContent(builder, params, null, null);
}

/**
* Converts the data stream lifecycle to XContent and injects the RolloverConditions if they exist.
* @deprecated use {@link #toXContent(XContentBuilder, Params, RolloverConfiguration, DataStreamGlobalRetention)}
*/
@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used in 4 places -- might as well just remove this method rather than adding deprecation, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be easier to track down the places that it is being used to properly update them in the follow up PR, because I will be able to use the Find usages.

It's a convenience thing and they will be removed in the next PR.

Of course, if you feel strongly about it I can just inline it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm good with removing them in a follow-up PR!

public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration)
throws IOException {
return toXContent(builder, params, rolloverConfiguration, null);
}

/**
* Converts the data stream lifecycle to XContent and injects the RolloverConditions and the global retention if they exist.
*/
public XContentBuilder toXContent(
XContentBuilder builder,
Params params,
@Nullable RolloverConfiguration rolloverConfiguration,
@Nullable DataStreamGlobalRetention globalRetention
) throws IOException {
builder.startObject();
builder.field(ENABLED_FIELD.getPreferredName(), enabled);
if (dataRetention != null) {
Expand All @@ -255,7 +308,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nulla
}
if (rolloverConfiguration != null) {
builder.field(ROLLOVER_FIELD.getPreferredName());
rolloverConfiguration.evaluateAndConvertToXContent(builder, params, getEffectiveDataRetention());
rolloverConfiguration.evaluateAndConvertToXContent(builder, params, getEffectiveDataRetention(globalRetention));
}
builder.endObject();
return builder;
Expand Down Expand Up @@ -466,4 +519,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}
}

/**
* This enum represents all configuration sources that can influence the retention of a data stream.
*/
public enum RetentionSource {
DATA_STREAM_CONFIGURATION,
DEFAULT_GLOBAL_RETENTION,
MAX_GLOBAL_RETENTION;

public String displayName() {
return this.toString().toLowerCase(Locale.ROOT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -33,6 +34,9 @@
import java.util.Set;
import java.util.stream.Stream;

import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DATA_STREAM_CONFIGURATION;
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DEFAULT_GLOBAL_RETENTION;
import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.MAX_GLOBAL_RETENTION;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -107,10 +111,11 @@ public void testXContentSerializationWithRollover() throws IOException {
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
builder.humanReadable(true);
RolloverConfiguration rolloverConfiguration = RolloverConfigurationTests.randomRolloverConditions();
lifecycle.toXContent(builder, ToXContent.EMPTY_PARAMS, rolloverConfiguration);
DataStreamGlobalRetention globalRetention = DataStreamGlobalRetentionSerializationTests.randomGlobalRetention();
lifecycle.toXContent(builder, ToXContent.EMPTY_PARAMS, rolloverConfiguration, globalRetention);
String serialized = Strings.toString(builder);
assertThat(serialized, containsString("rollover"));
for (String label : rolloverConfiguration.resolveRolloverConditions(lifecycle.getEffectiveDataRetention())
for (String label : rolloverConfiguration.resolveRolloverConditions(lifecycle.getEffectiveDataRetention(globalRetention))
.getConditions()
.keySet()) {
assertThat(serialized, containsString(label));
Expand Down Expand Up @@ -253,6 +258,72 @@ public void testInvalidDownsamplingConfiguration() {
}
}

public void testEffectiveRetention() {
// No retention in the data stream lifecycle
{
DataStreamLifecycle noRetentionLifecycle = DataStreamLifecycle.newBuilder().downsampling(randomDownsampling()).build();
TimeValue maxRetention = TimeValue.timeValueDays(randomIntBetween(50, 100));
TimeValue defaultRetention = TimeValue.timeValueDays(randomIntBetween(1, 50));
Tuple<TimeValue, DataStreamLifecycle.RetentionSource> effectiveDataRetentionWithSource = noRetentionLifecycle
.getEffectiveDataRetentionWithSource(null);
assertThat(effectiveDataRetentionWithSource.v1(), nullValue());
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION));

effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource(
new DataStreamGlobalRetention(null, maxRetention)
);
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(maxRetention));
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION));

effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource(
new DataStreamGlobalRetention(defaultRetention, null)
);
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(defaultRetention));
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION));

effectiveDataRetentionWithSource = noRetentionLifecycle.getEffectiveDataRetentionWithSource(
new DataStreamGlobalRetention(defaultRetention, maxRetention)
);
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(defaultRetention));
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DEFAULT_GLOBAL_RETENTION));
}

// With retention in the data stream lifecycle
{
TimeValue dataStreamRetention = TimeValue.timeValueDays(randomIntBetween(5, 100));
DataStreamLifecycle lifecycleRetention = DataStreamLifecycle.newBuilder()
.dataRetention(dataStreamRetention)
.downsampling(randomDownsampling())
.build();
TimeValue defaultRetention = TimeValue.timeValueDays(randomIntBetween(1, (int) dataStreamRetention.getDays() - 1));

Tuple<TimeValue, DataStreamLifecycle.RetentionSource> effectiveDataRetentionWithSource = lifecycleRetention
.getEffectiveDataRetentionWithSource(null);
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention));
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION));

effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource(
new DataStreamGlobalRetention(defaultRetention, null)
);
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention));
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION));

TimeValue maxGlobalRetention = randomBoolean() ? dataStreamRetention : TimeValue.timeValueDays(dataStreamRetention.days() + 1);
effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource(
new DataStreamGlobalRetention(defaultRetention, maxGlobalRetention)
);
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(dataStreamRetention));
assertThat(effectiveDataRetentionWithSource.v2(), equalTo(DATA_STREAM_CONFIGURATION));

TimeValue maxRetentionLessThanDataStream = TimeValue.timeValueDays(dataStreamRetention.days() - 1);
effectiveDataRetentionWithSource = lifecycleRetention.getEffectiveDataRetentionWithSource(
new DataStreamGlobalRetention(randomBoolean() ? null : TimeValue.timeValueDays(10), maxRetentionLessThanDataStream)
);
assertThat(effectiveDataRetentionWithSource.v1(), equalTo(maxRetentionLessThanDataStream));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor thing, but I think one case that isn't covered is when max retention is greater than data stream retention (you cover it being equal to above, and less than here). Seems worth having in a test just because it's likely the most common situation.

assertThat(effectiveDataRetentionWithSource.v2(), equalTo(MAX_GLOBAL_RETENTION));
}
}

@Nullable
public static DataStreamLifecycle randomLifecycle() {
return DataStreamLifecycle.newBuilder()
Expand Down
Loading