Skip to content

Commit

Permalink
Data stream lifecycle manages failure store indices (#107478)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmarouli committed May 22, 2024
1 parent 2a8a723 commit 6cbaa7d
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 110 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.action.downsample.DownsampleAction;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
Expand Down Expand Up @@ -359,7 +360,7 @@ void run(ClusterState state) {
indicesToExcludeForRemainingRun.addAll(
timeSeriesIndicesStillWithinTimeBounds(
state.metadata(),
getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata()::index),
getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata()::index, false),
nowSupplier
)
);
Expand All @@ -381,7 +382,10 @@ void run(ClusterState state) {

try {
indicesToExcludeForRemainingRun.addAll(
maybeExecuteForceMerge(state, getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata()::index))
maybeExecuteForceMerge(
state,
getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata()::index, true)
)
);
} catch (Exception e) {
logger.error(
Expand All @@ -399,7 +403,7 @@ void run(ClusterState state) {
maybeExecuteDownsampling(
state,
dataStream,
getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata()::index)
getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata()::index, false)
)
);
} catch (Exception e) {
Expand Down Expand Up @@ -735,18 +739,31 @@ private void addIndexBlockOnce(String indexName) {
/**
* Returns the data stream lifecycle managed indices that are not part of the set of indices to exclude.
*/
private static List<Index> getTargetIndices(
// For testing
static List<Index> getTargetIndices(
DataStream dataStream,
Set<Index> indicesToExcludeForRemainingRun,
Function<String, IndexMetadata> indexMetadataSupplier
Function<String, IndexMetadata> indexMetadataSupplier,
boolean withFailureStore
) {
return dataStream.getIndices()
.stream()
.filter(
index -> dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier)
&& indicesToExcludeForRemainingRun.contains(index) == false
)
.toList();
List<Index> targetIndices = new ArrayList<>();
for (Index index : dataStream.getIndices()) {
if (dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier)
&& indicesToExcludeForRemainingRun.contains(index) == false) {
targetIndices.add(index);
}
}
if (withFailureStore
&& DataStream.isFailureStoreFeatureFlagEnabled()
&& dataStream.getFailureIndices().getIndices().isEmpty() == false) {
for (Index index : dataStream.getFailureIndices().getIndices()) {
if (dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier)
&& indicesToExcludeForRemainingRun.contains(index) == false) {
targetIndices.add(index);
}
}
}
return targetIndices;
}

/**
Expand Down Expand Up @@ -776,19 +793,36 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
}

/**
 * This method will attempt to roll over the write index of a data stream. The rollover will occur only if the conditions
 * apply. In any case, we return the write backing index back to the caller, so it can be excluded from the next steps.
 *
 * @param state the current cluster state
 * @param dataStream the data stream whose write index (and, when the feature flag is enabled, failure store
 *                   write index) may be rolled over
 * @return the write indices of this data stream before rollover was requested.
 */
private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStream) {
    Set<Index> currentRunWriteIndices = new HashSet<>();
    currentRunWriteIndices.add(maybeExecuteRollover(state, dataStream, false));
    if (DataStream.isFailureStoreFeatureFlagEnabled()) {
        // The failure store may have no write index yet, in which case the overload returns null.
        Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
        if (failureStoreWriteIndex != null) {
            currentRunWriteIndices.add(failureStoreWriteIndex);
        }
    }
    return currentRunWriteIndices;
}

@Nullable
private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getFailureStoreWriteIndex() : dataStream.getWriteIndex();
if (currentRunWriteIndex == null) {
return null;
}
try {
if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata()::index)) {
RolloverRequest rolloverRequest = getDefaultRolloverRequest(
rolloverConfiguration,
dataStream.getName(),
dataStream.getLifecycle()
.getEffectiveDataRetention(dataStream.isSystem() ? null : globalRetentionResolver.resolve(state))
.getEffectiveDataRetention(dataStream.isSystem() ? null : globalRetentionResolver.resolve(state)),
rolloverFailureStore
);
transportActionsDeduplicator.executeOnce(
rolloverRequest,
Expand All @@ -797,7 +831,8 @@ private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStrea
currentRunWriteIndex.getName(),
errorStore,
Strings.format(
"Data stream lifecycle encountered an error trying to rollover data steam [%s]",
"Data stream lifecycle encountered an error trying to roll over%s data stream [%s]",
rolloverFailureStore ? " the failure store of " : "",
dataStream.getName()
),
signallingErrorRetryInterval
Expand All @@ -807,7 +842,12 @@ private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStrea
}
} catch (Exception e) {
logger.error(
() -> String.format(Locale.ROOT, "Data stream lifecycle failed to rollover data stream [%s]", dataStream.getName()),
() -> String.format(
Locale.ROOT,
"Data stream lifecycle encountered an error trying to roll over%s data stream [%s]",
rolloverFailureStore ? " the failure store of " : "",
dataStream.getName()
),
e
);
DataStream latestDataStream = clusterService.state().metadata().dataStreams().get(dataStream.getName());
Expand All @@ -819,7 +859,7 @@ private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStrea
}
}
}
return Set.of(currentRunWriteIndex);
return currentRunWriteIndex;
}

/**
Expand All @@ -833,16 +873,15 @@ private Set<Index> maybeExecuteRollover(ClusterState state, DataStream dataStrea
*/
private Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
Metadata metadata = state.metadata();
DataStreamGlobalRetention globalRetention = globalRetentionResolver.resolve(state);
DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionResolver.resolve(state);
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier, globalRetention);
if (backingIndicesOlderThanRetention.isEmpty()) {
return Set.of();
}
Set<Index> indicesToBeRemoved = new HashSet<>();
// We know that there is lifecycle and retention because there are indices to be deleted
assert dataStream.getLifecycle() != null;
TimeValue effectiveDataRetention = dataStream.getLifecycle()
.getEffectiveDataRetention(dataStream.isSystem() ? null : globalRetention);
TimeValue effectiveDataRetention = dataStream.getLifecycle().getEffectiveDataRetention(globalRetention);
for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
IndexMetadata backingIndex = metadata.index(index);
Expand Down Expand Up @@ -898,6 +937,11 @@ private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indice
if ((configuredFloorSegmentMerge == null || configuredFloorSegmentMerge.equals(targetMergePolicyFloorSegment) == false)
|| (configuredMergeFactor == null || configuredMergeFactor.equals(targetMergePolicyFactor) == false)) {
UpdateSettingsRequest updateMergePolicySettingsRequest = new UpdateSettingsRequest();
updateMergePolicySettingsRequest.indicesOptions(
IndicesOptions.builder(updateMergePolicySettingsRequest.indicesOptions())
.failureStoreOptions(new IndicesOptions.FailureStoreOptions(true, true))
.build()
);
updateMergePolicySettingsRequest.indices(indexName);
updateMergePolicySettingsRequest.settings(
Settings.builder()
Expand Down Expand Up @@ -976,7 +1020,7 @@ public void onFailure(Exception e) {
DataStream dataStream = clusterService.state().metadata().dataStreams().get(rolloverTarget);
if (dataStream == null || dataStream.getWriteIndex().getName().equals(writeIndexName) == false) {
// the data stream has another write index so no point in recording an error for the previous write index we were
// attempting to rollover
// attempting to roll over
// if there are persistent issues with rolling over this data stream, the next data stream lifecycle run will attempt to
// rollover the _current_ write index and the error problem should surface then
listener.onResponse(null);
Expand Down Expand Up @@ -1351,9 +1395,17 @@ static void recordAndLogError(
static RolloverRequest getDefaultRolloverRequest(
RolloverConfiguration rolloverConfiguration,
String dataStream,
TimeValue dataRetention
TimeValue dataRetention,
boolean rolloverFailureStore
) {
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null).masterNodeTimeout(TimeValue.MAX_VALUE);
if (rolloverFailureStore) {
rolloverRequest.setIndicesOptions(
IndicesOptions.builder(rolloverRequest.indicesOptions())
.failureStoreOptions(opts -> opts.includeFailureIndices(true).includeRegularIndices(false))
.build()
);
}
rolloverRequest.setConditions(rolloverConfiguration.resolveRolloverConditions(dataRetention));
return rolloverRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,21 @@ public static DataStream createDataStream(
Settings.Builder backingIndicesSettings,
@Nullable DataStreamLifecycle lifecycle,
Long now
) {
return createDataStream(builder, dataStreamName, backingIndicesCount, 0, backingIndicesSettings, lifecycle, now);
}

public static DataStream createDataStream(
Metadata.Builder builder,
String dataStreamName,
int backingIndicesCount,
int failureIndicesCount,
Settings.Builder backingIndicesSettings,
@Nullable DataStreamLifecycle lifecycle,
Long now
) {
final List<Index> backingIndices = new ArrayList<>();
final List<Index> failureIndices = new ArrayList<>();
for (int k = 1; k <= backingIndicesCount; k++) {
IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, k))
.settings(backingIndicesSettings)
Expand All @@ -70,7 +83,22 @@ public static DataStream createDataStream(
builder.put(indexMetadata, false);
backingIndices.add(indexMetadata.getIndex());
}
return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle);
for (int k = 1; k <= failureIndicesCount; k++) {
IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultFailureStoreName(dataStreamName, k, now))
.settings(backingIndicesSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.creationDate(now - 3000L);
if (k < failureIndicesCount) {
// add rollover info only for non-write indices
MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
}
IndexMetadata indexMetadata = indexMetaBuilder.build();
builder.put(indexMetadata, false);
failureIndices.add(indexMetadata.getIndex());
}
return newInstance(dataStreamName, backingIndices, backingIndicesCount, null, false, lifecycle, failureIndices);
}

static void putComposableIndexTemplate(
Expand Down
Loading

0 comments on commit 6cbaa7d

Please sign in to comment.