Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lazy rollover for failure stores #108108

Merged
merged 16 commits into from
May 24, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public Set<NodeFeature> getFeatures() {
DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE,
DataStreamGlobalRetention.GLOBAL_RETENTION // Added in 8.14
DataStreamGlobalRetention.GLOBAL_RETENTION, // Added in 8.14
LazyRolloverAction.FAILURE_STORE_LAZY_ROLLOVER // Added in 8.15
nielsbauman marked this conversation as resolved.
Show resolved Hide resolved
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ setup:
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: allowed_warnings
test_runner_features: [allowed_warnings, contains]

- do:
allowed_warnings:
Expand All @@ -27,13 +27,24 @@ setup:
name: data-stream-for-rollover

---
teardown:
- do:
indices.delete_data_stream:
name: data-stream-for-lazy-rollover
ignore: 404

- do:
ingest.delete_pipeline:
id: failing_pipeline
ignore: 404
---
"Roll over a data stream's failure store without conditions":
# rollover data stream to create new backing index
- do:
indices.rollover:
alias: "data-stream-for-rollover"
target_failure_store: true

- match: { acknowledged: true }
- match: { old_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- match: { new_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
- match: { rolled_over: true }
Expand All @@ -54,7 +65,6 @@ setup:

---
"Roll over a data stream's failure store with conditions":
# index first document and wait for refresh
- do:
index:
index: data-stream-for-rollover
Expand All @@ -63,7 +73,6 @@ setup:
'@timestamp': '2020-12-12'
count: 'invalid value'

# rollover data stream to create new backing index
- do:
indices.rollover:
alias: "data-stream-for-rollover"
Expand All @@ -72,6 +81,7 @@ setup:
conditions:
max_docs: 1

- match: { acknowledged: true }
- match: { old_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- match: { new_index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
- match: { rolled_over: true }
Expand All @@ -92,7 +102,6 @@ setup:

---
"Don't roll over a data stream's failure store when conditions aren't met":
# rollover data stream to create new backing index
- do:
indices.rollover:
alias: "data-stream-for-rollover"
Expand All @@ -101,6 +110,182 @@ setup:
conditions:
max_docs: 1

- match: { acknowledged: false }
- match: { rolled_over: false }
- match: { dry_run: false }

- do:
indices.get_data_stream:
name: "*"
- match: { data_streams.0.name: data-stream-for-rollover }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- match: { data_streams.0.generation: 1 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_store.indices: 1 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

---
"Lazily roll over a data stream's failure store after a shard failure":
- requires:
cluster_features: ["data_stream.rollover.lazy.failure_store"]
reason: "data stream failure store lazy rollover only supported in 8.15+"
test_runner_features: allowed_warnings

# Mark the failure store for lazy rollover
- do:
indices.rollover:
alias: "data-stream-for-rollover"
target_failure_store: true
lazy: true

- match: { acknowledged: true }
- match: { rolled_over: false }
- match: { dry_run: false }
nielsbauman marked this conversation as resolved.
Show resolved Hide resolved

- do:
indices.get_data_stream:
name: "*"
- match: { data_streams.0.name: data-stream-for-rollover }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- match: { data_streams.0.generation: 1 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_store.indices: 1 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

- do:
index:
index: data-stream-for-rollover
refresh: true
body:
'@timestamp': '2020-12-12'
count: 'invalid value'

- do:
indices.get_data_stream:
name: "*"
- match: { data_streams.0.name: data-stream-for-rollover }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
# Both backing and failure indices use the same generation field.
- match: { data_streams.0.generation: 2 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_store.indices: 2 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }

- do:
search:
index: .fs-data-stream-for-rollover-*
- length: { hits.hits: 1 }
- match: { hits.hits.0._index: "/\\.fs-data-stream-for-rollover-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000002/" }
- exists: hits.hits.0._source.@timestamp
- not_exists: hits.hits.0._source.count
- match: { hits.hits.0._source.document.index: 'data-stream-for-rollover' }
- match: { hits.hits.0._source.document.source.@timestamp: '2020-12-12' }
- match: { hits.hits.0._source.document.source.count: 'invalid value' }
- match: { hits.hits.0._source.error.type: 'document_parsing_exception' }

---
"Lazily roll over a data stream's failure store after an ingest failure":
- requires:
cluster_features: ["data_stream.rollover.lazy.failure_store"]
reason: "data stream failure store lazy rollover only supported in 8.15+"
test_runner_features: allowed_warnings

- do:
ingest.put_pipeline:
id: "failing_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"fail" : {
"message" : "error_message"
}
}
]
}
- match: { acknowledged: true }

- do:
allowed_warnings:
# The allowed warning must name the template created by this request (my-template, patterns [data-*]);
# otherwise it cannot match the warning emitted when a legacy catch-all template exists.
- "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
indices.put_index_template:
name: my-template
body:
index_patterns: [data-*]
data_stream:
failure_store: true
template:
settings:
index:
default_pipeline: "failing_pipeline"

- do:
indices.create_data_stream:
name: data-stream-for-lazy-rollover

# Mark the failure store for lazy rollover
- do:
indices.rollover:
alias: data-stream-for-lazy-rollover
target_failure_store: true
lazy: true

- match: { acknowledged: true }
- match: { rolled_over: false }
- match: { dry_run: false }
nielsbauman marked this conversation as resolved.
Show resolved Hide resolved

- do:
indices.get_data_stream:
name: "*"
- match: { data_streams.0.name: data-stream-for-lazy-rollover }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- match: { data_streams.0.generation: 1 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_store.indices: 1 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

- do:
index:
index: data-stream-for-lazy-rollover
refresh: true
body:
'@timestamp': '2020-12-12'
count: 1

- do:
indices.get_data_stream:
name: "*"
- match: { data_streams.0.name: data-stream-for-lazy-rollover }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
# Both backing and failure indices use the same generation field.
- match: { data_streams.0.generation: 2 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_store.indices: 2 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- match: { data_streams.0.failure_store.indices.1.index_name: '/\.fs-data-stream-for-lazy-rollover-(\d{4}\.\d{2}\.\d{2}-)?000002/' }

---
"A failure store marked for lazy rollover should only be rolled over when there is a failure":
- requires:
cluster_features: ["data_stream.rollover.lazy.failure_store"]
reason: "data stream failure store lazy rollover only supported in 8.15+"
test_runner_features: allowed_warnings

# Mark the failure store for lazy rollover
- do:
indices.rollover:
alias: "data-stream-for-rollover"
target_failure_store: true
lazy: true

- match: { acknowledged: true }
- match: { rolled_over: false }
- match: { dry_run: false }

Expand All @@ -114,3 +299,23 @@ setup:
- match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_store.indices: 1 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }

- do:
index:
index: data-stream-for-rollover
refresh: true
body:
'@timestamp': '2020-12-12'
count: 3

- do:
indices.get_data_stream:
name: "*"
- match: { data_streams.0.name: data-stream-for-rollover }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
# Both backing and failure indices use the same generation field.
- match: { data_streams.0.generation: 1 }
- length: { data_streams.0.indices: 1 }
- match: { data_streams.0.indices.0.index_name: '/\.ds-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
- length: { data_streams.0.failure_store.indices: 1 }
- match: { data_streams.0.failure_store.indices.0.index_name: '/\.fs-data-stream-for-rollover-(\d{4}\.\d{2}\.\d{2}-)?000001/' }
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand All @@ -35,6 +36,7 @@
public final class LazyRolloverAction extends ActionType<RolloverResponse> {

public static final NodeFeature DATA_STREAM_LAZY_ROLLOVER = new NodeFeature("data_stream.rollover.lazy");
public static final NodeFeature FAILURE_STORE_LAZY_ROLLOVER = new NodeFeature("data_stream.rollover.lazy.failure_store");
nielsbauman marked this conversation as resolved.
Show resolved Hide resolved

public static final LazyRolloverAction INSTANCE = new LazyRolloverAction();
public static final String NAME = "indices:admin/data_stream/lazy_rollover";
Expand All @@ -61,7 +63,8 @@ public TransportLazyRolloverAction(
AllocationService allocationService,
MetadataDataStreamsService metadataDataStreamsService,
DataStreamAutoShardingService dataStreamAutoShardingService,
Client client
Client client,
FeatureService featureService
) {
super(
LazyRolloverAction.INSTANCE,
Expand All @@ -74,7 +77,8 @@ public TransportLazyRolloverAction(
client,
allocationService,
metadataDataStreamsService,
dataStreamAutoShardingService
dataStreamAutoShardingService,
featureService
);
}

Expand All @@ -99,7 +103,7 @@ protected void masterOperation(
rolloverRequest.getRolloverTarget(),
rolloverRequest.getNewIndexName(),
rolloverRequest.getCreateIndexRequest(),
false
rolloverRequest.indicesOptions().failureStoreOptions().includeFailureIndices()
);
final String trialSourceIndexName = trialRolloverNames.sourceName();
final String trialRolloverIndexName = trialRolloverNames.rolloverName();
Expand All @@ -121,13 +125,9 @@ protected void masterOperation(
// Task description for the cluster state update: report the current write index as the source
// and the index that will be created as the target (previously the target name was printed twice).
String source = "lazy_rollover source [" + trialSourceIndexName + "] to target [" + trialRolloverIndexName + "]";
// We create a new rollover request to ensure that it doesn't contain any other parameters apart from the data stream name
// This will provide a more resilient user experience
RolloverTask rolloverTask = new RolloverTask(
new RolloverRequest(rolloverRequest.getRolloverTarget(), null),
null,
trialRolloverResponse,
null,
listener
);
var newRolloverRequest = new RolloverRequest(rolloverRequest.getRolloverTarget(), null);
newRolloverRequest.setIndicesOptions(rolloverRequest.indicesOptions());
RolloverTask rolloverTask = new RolloverTask(newRolloverRequest, null, trialRolloverResponse, null, listener);
submitRolloverTask(rolloverRequest, source, rolloverTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ public ActionRequestValidationException validate() {
);
}

if (failureStoreOptions.includeFailureIndices() && lazy) {
validationException = addValidationError("lazily rolling over a failure store is currently not supported", validationException);
nielsbauman marked this conversation as resolved.
Show resolved Hide resolved
}

return validationException;
}

Expand Down