5 changes: 5 additions & 0 deletions docs/changelog/127573.yaml
@@ -0,0 +1,5 @@
pr: 127573
summary: "[Failure store] Introduce default retention for failure indices"
area: Data streams
type: enhancement
issues: []
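
The changes below wire the new failures-specific default retention through the GET data streams response, the data stream lifecycle service, and the REST API capabilities. As a rough illustration of the operator-facing knob (the setting keys are taken from the test changes further down; the class name and the duration values are invented for the example), the new failures default sits alongside the existing data-stream default:

import org.elasticsearch.common.settings.Settings;

// Illustrative sketch only, not part of this change set: building cluster settings that
// pair the existing data-stream default retention with the new failures default.
public class FailuresDefaultRetentionSketch {
    public static void main(String[] args) {
        Settings retentionSettings = Settings.builder()
            .put("data_streams.lifecycle.retention.default", "90d")          // existing default for backing indices
            .put("data_streams.lifecycle.retention.failures_default", "7d")  // new default for failure indices
            .build();
        System.out.println(retentionSettings);
    }
}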
@@ -279,7 +279,8 @@ public void testSystemDataStreamRetention() throws Exception {
builder,
withEffectiveRetention,
getDataStreamResponse.getRolloverConfiguration(),
getDataStreamResponse.getGlobalRetention()
getDataStreamResponse.getDataGlobalRetention(),
getDataStreamResponse.getFailuresGlobalRetention()
);
String serialized = Strings.toString(builder);
Map<String, Object> resultMap = XContentHelper.convertToMap(
@@ -147,7 +147,12 @@ public void testDataStreamRetention() throws Exception {
@SuppressWarnings("unchecked")
public void testDefaultRetention() throws Exception {
// Set default global retention
updateClusterSettings(Settings.builder().put("data_streams.lifecycle.retention.default", "10s").build());
updateClusterSettings(
Settings.builder()
.put("data_streams.lifecycle.retention.default", "10s")
.put("data_streams.lifecycle.retention.failures_default", "10s")
.build()
);

// Verify that the effective retention matches the default retention
{
@@ -163,7 +168,7 @@ public void testDefaultRetention() throws Exception {
assertThat(lifecycle.get("data_retention"), nullValue());
Map<String, Object> failuresLifecycle = ((Map<String, Map<String, Object>>) dataStream.get("failure_store")).get("lifecycle");
assertThat(failuresLifecycle.get("effective_retention"), is("10s"));
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_global_retention"));
assertThat(failuresLifecycle.get("retention_determined_by"), is("default_failures_retention"));
assertThat(failuresLifecycle.get("data_retention"), nullValue());
}

@@ -88,7 +88,7 @@ public TransportGetDataStreamsAction(
threadPool,
actionFilters,
GetDataStreamAction.Request::new,
GetDataStreamAction.Response::new,
GetDataStreamAction.Response::read,
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
);
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -287,7 +287,8 @@ public int compareTo(IndexInfo o) {
return new GetDataStreamAction.Response(
dataStreamInfos,
request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
globalRetentionSettings.get()
globalRetentionSettings.get(false),
globalRetentionSettings.get(true)
);
}

@@ -44,7 +44,6 @@
import org.elasticsearch.cluster.SimpleBatchedExecutor;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -354,13 +353,18 @@ void run(ClusterState state) {
continue;
}

// Retrieve the effective retention to ensure the same retention is used for this data stream
// through all operations.
var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true);

// the following indices should not be considered for the remainder of this service run, for various reasons.
Set<Index> indicesToExcludeForRemainingRun = new HashSet<>();

// These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
// depending on rollover criteria, for this reason we exclude them for the remaining run.
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, dataRetention, false));
Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, failuresRetention, true);
if (failureStoreWriteIndex != null) {
indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
}
@@ -376,7 +380,9 @@ void run(ClusterState state) {
);

try {
indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(state, dataStream, indicesToExcludeForRemainingRun));
indicesToExcludeForRemainingRun.addAll(
maybeExecuteRetention(state, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun)
);
} catch (Exception e) {
// individual index errors would be reported via the API action listener for every delete call
// we could potentially record errors at a data stream level and expose it via the _data_stream API?
@@ -807,7 +813,12 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
}

@Nullable
private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
private Index maybeExecuteRollover(
ClusterState state,
DataStream dataStream,
TimeValue effectiveRetention,
boolean rolloverFailureStore
) {
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
if (currentRunWriteIndex == null) {
return null;
@@ -818,7 +829,7 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
rolloverConfiguration,
dataStream.getName(),
lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(), dataStream.isInternal()),
effectiveRetention,
rolloverFailureStore
);
transportActionsDeduplicator.executeOnce(
@@ -868,14 +879,17 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo
* @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted
* @return The set of indices that delete requests have been sent for
*/
Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
Metadata metadata = state.metadata();
DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
var dataRetention = getRetention(dataStream, globalRetention, false);
var failureRetention = getRetention(dataStream, globalRetention, true);
Set<Index> maybeExecuteRetention(
ClusterState state,
DataStream dataStream,
TimeValue dataRetention,
TimeValue failureRetention,
Set<Index> indicesToExcludeForRemainingRun
) {
if (dataRetention == null && failureRetention == null) {
return Set.of();
}
Metadata metadata = state.metadata();
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(
metadata::index,
nowSupplier,
@@ -1320,11 +1334,15 @@ private static boolean isForceMergeComplete(IndexMetadata backingIndex) {
}

@Nullable
private static TimeValue getRetention(DataStream dataStream, DataStreamGlobalRetention globalRetention, boolean failureStore) {
private static TimeValue getEffectiveRetention(
DataStream dataStream,
DataStreamGlobalRetentionSettings globalRetentionSettings,
boolean failureStore
) {
DataStreamLifecycle lifecycle = failureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
return lifecycle == null || lifecycle.enabled() == false
? null
: lifecycle.getEffectiveDataRetention(globalRetention, dataStream.isInternal());
: lifecycle.getEffectiveDataRetention(globalRetentionSettings.get(failureStore), dataStream.isInternal());
}

/**
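
Taken together, the lifecycle-service hunks above resolve the effective retention once per data stream, separately for data and failures, and pass the resolved values to both rollover and retention, so the two operations always agree within a single run. A condensed sketch of that flow, assuming the field and method names shown in this diff (the wrapper method name is invented):

// Condensed, illustrative sketch of the run(...) flow above; not a complete excerpt of the service.
private void applyLifecycle(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
    // Resolve once so rollover and retention use the same values even if settings change mid-run.
    TimeValue dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false);
    TimeValue failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true);

    // The resolved values drive rollover of the data and failure-store write indices...
    indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, dataRetention, false));
    Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, failuresRetention, true);
    if (failureStoreWriteIndex != null) {
        indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
    }

    // ...and the deletion of indices that are past retention, for backing and failure indices alike.
    indicesToExcludeForRemainingRun.addAll(
        maybeExecuteRetention(state, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun)
    );
}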
@@ -120,7 +120,8 @@ protected void masterOperation(
new ExplainDataStreamLifecycleAction.Response(
explainIndices,
request.includeDefaults() ? clusterSettings.get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING) : null,
globalRetentionSettings.get()
globalRetentionSettings.get(false),
globalRetentionSettings.get(true)
)
);
}
@@ -47,7 +47,8 @@ public class RestGetDataStreamsAction extends BaseRestHandler {
public static final String FAILURES_LIFECYCLE_API_CAPABILITY = "failure_store.lifecycle";
private static final Set<String> CAPABILITIES = Set.of(
DataStreamLifecycle.EFFECTIVE_RETENTION_REST_API_CAPABILITY,
FAILURES_LIFECYCLE_API_CAPABILITY
FAILURES_LIFECYCLE_API_CAPABILITY,
"failure_store.lifecycle.default_retention"
);

@Override
@@ -45,7 +45,7 @@ public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase

@Override
protected Writeable.Reader<Response> instanceReader() {
return Response::new;
return Response::read;
}

@Override
@@ -351,8 +351,8 @@ public void testPassingGlobalRetention() {
emptyDataStreamFailureStoreSettings,
null
);
assertThat(response.getGlobalRetention(), nullValue());
DataStreamGlobalRetention globalRetention = new DataStreamGlobalRetention(
assertThat(response.getDataGlobalRetention(), nullValue());
DataStreamGlobalRetention dataGlobalRetention = new DataStreamGlobalRetention(
TimeValue.timeValueDays(randomIntBetween(1, 5)),
TimeValue.timeValueDays(randomIntBetween(5, 10))
);
@@ -361,9 +361,9 @@
Settings.builder()
.put(
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING.getKey(),
globalRetention.defaultRetention()
dataGlobalRetention.defaultRetention()
)
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), globalRetention.maxRetention())
.put(DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING.getKey(), dataGlobalRetention.maxRetention())
.build()
)
);
@@ -377,7 +377,9 @@
emptyDataStreamFailureStoreSettings,
null
);
assertThat(response.getGlobalRetention(), equalTo(globalRetention));
assertThat(response.getDataGlobalRetention(), equalTo(dataGlobalRetention));
// We used the default failures retention here, which is greater than the max retention
assertThat(response.getFailuresGlobalRetention(), equalTo(new DataStreamGlobalRetention(null, dataGlobalRetention.maxRetention())));
}

public void testDataStreamIsFailureStoreEffectivelyEnabled_disabled() {
@@ -1566,20 +1566,33 @@ public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() {
ClusterState state = downsampleSetup(dataStreamName, SUCCESS);
DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();

// Executing the method to be tested:
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
clusterService.state(),
dataStream,
dataRetention,
null,
Set.of()
);
assertThat(indicesToBeRemoved, contains(state.getMetadata().index(firstGenIndexName).getIndex()));
}

public void testMaybeExecuteRetentionDownsampledIndexInProgress() {
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
ClusterState state = downsampleSetup(dataStreamName, STARTED);
DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();

// Executing the method to be tested:
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
clusterService.state(),
dataStream,
dataRetention,
null,
Set.of()
);
assertThat(indicesToBeRemoved, empty());
}

@@ -1588,9 +1601,16 @@ public void testMaybeExecuteRetentionDownsampledUnknown() {
ClusterState state = downsampleSetup(dataStreamName, UNKNOWN);
DataStream dataStream = state.metadata().dataStreams().get(dataStreamName);
String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention();

// Executing the method to be tested:
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(
clusterService.state(),
dataStream,
dataRetention,
null,
Set.of()
);
assertThat(indicesToBeRemoved, contains(state.getMetadata().index(firstGenIndexName).getIndex()));
}

@@ -189,13 +189,23 @@ teardown:
- match: { data_streams.0.template: 'my-template1' }
- match: { data_streams.0.failure_store.enabled: true }
- match: { data_streams.0.failure_store.lifecycle.enabled: false }
- is_false: data_streams.0.failure_store.lifecycle.data_retention
- is_false: data_streams.0.failure_store.lifecycle.effective_retention
- is_false: data_streams.0.failure_store.lifecycle.retention_determined_by
- length: { data_streams.0.failure_store.indices: 1 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
- is_false: data_streams.0.failure_store.indices.0.prefer_ilm
- match: { data_streams.0.failure_store.indices.0.managed_by: 'Unmanaged' }

---
"Get failure store info from cluster setting enabled failure store":
- requires:
test_runner_features: [ capabilities ]
reason: "Default retention for failures was added in 8.19+"
capabilities:
- method: GET
path: /_data_stream/{target}
capabilities: [ 'failure_store.lifecycle.default_retention' ]
- do:
indices.create_data_stream:
name: fs-default-data-stream
@@ -212,6 +222,9 @@ teardown:
- match: { data_streams.0.template: 'my-template2' }
- match: { data_streams.0.failure_store.enabled: true }
- match: { data_streams.0.failure_store.lifecycle.enabled: true }
- is_false: data_streams.0.failure_store.lifecycle.data_retention
- match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' }
- match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' }
- match: { data_streams.0.failure_store.indices: [] }

# Initialize failure store
@@ -234,6 +247,9 @@ teardown:
- match: { data_streams.0.template: 'my-template2' }
- match: { data_streams.0.failure_store.enabled: true }
- match: { data_streams.0.failure_store.lifecycle.enabled: true }
- is_false: data_streams.0.failure_store.lifecycle.data_retention
- match: { data_streams.0.failure_store.lifecycle.effective_retention: '30d' }
- match: { data_streams.0.failure_store.lifecycle.retention_determined_by: 'default_failures_retention' }
- length: { data_streams.0.failure_store.indices: 1 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-fs-default-data-stream-(\d{4}\.\d{2}\.\d{2}-)?000002/' }
- is_false: data_streams.0.failure_store.indices.0.prefer_ilm
@@ -212,6 +212,7 @@ static TransportVersion def(int id) {
public static final TransportVersion PINNED_RETRIEVER_8_19 = def(8_841_0_23);
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_BLOCK_8_19 = def(8_841_0_24);
public static final TransportVersion INTRODUCE_FAILURES_LIFECYCLE_BACKPORT_8_19 = def(8_841_0_25);
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION_BACKPORT_8_19 = def(8_841_0_26);

/*
* STOP! READ THIS FIRST! No, really,