Skip to content

Commit

Permalink
[DLM] Introduce default rollover cluster setting & expose it via APIs (
Browse files Browse the repository at this point in the history
…#94240)

For managing data streams with DLM we chose to have one cluster setting that will determine the rollover conditions for all data streams. This PR introduces this cluster setting, it exposes it via the 3 existing APIs under the flag `include_defaults` and adjusts DLM to use it. The feature remains behind a feature flag.
  • Loading branch information
gmarouli committed Mar 7, 2023
1 parent 1bd5ad9 commit fe20d92
Show file tree
Hide file tree
Showing 35 changed files with 855 additions and 148 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/94240.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 94240
summary: "[DLM] Introduce default rollover cluster setting & expose it via APIs"
area: DLM
type: feature
issues: []
7 changes: 5 additions & 2 deletions docs/reference/indices/get-component-template.asciidoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[[getting-component-templates]]
[[getting-component-templates]]
=== Get component template API
++++
<titleabbrev>Get component template</titleabbrev>
Expand All @@ -10,7 +10,7 @@ Retrieves information about one or more component templates.
[source,console]
--------------------------------------------------
PUT /_component_template/template_1
PUT /_component_template/template_1
{
"template": {
"settings": {
Expand Down Expand Up @@ -71,6 +71,9 @@ include::{docdir}/rest-api/common-parms.asciidoc[tag=local]

include::{docdir}/rest-api/common-parms.asciidoc[tag=master-timeout]

`include_defaults`::
(Optional, Boolean) Functionality in experimental:[]. If `true`, return all default settings in the response.
Defaults to `false`.

[[get-component-template-api-example]]
==== {api-examples-title}
Expand Down
23 changes: 23 additions & 0 deletions docs/reference/indices/get-data-stream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=ds-expand-wildcards]
+
Defaults to `open`.

`include_defaults`::
(Optional, Boolean) Functionality in experimental:[]. If `true`, return all default settings in the response.
Defaults to `false`.

[role="child_attributes"]
[[get-data-stream-api-response-body]]
==== {api-response-body-title}
Expand Down Expand Up @@ -216,6 +220,25 @@ If `true`, the data stream this data stream allows custom routing on write reque
(Boolean)
If `true`, the data stream is created and managed by {ccr} and the local
cluster can not write into this data stream or change its mappings.
`lifecycle`::
(object)
Functionality in experimental:[]. Contains the configuration for the data lifecycle management of this data stream.
+
.Properties of `lifecycle`
[%collapsible%open]
=====
`data_retention`::
(string)
If defined, every document added to this data stream will be stored at least for this time frame. Any time after this
duration the document could be deleted. When empty, every document in this data stream will be stored indefinitely.

`rollover`::
(object)
The conditions which will trigger the rollover of a backing index as configured by the cluster setting
`cluster.dlm.default.rollover`. This property is an implementation detail and it will only be retrieved when the query
param `include_defaults` is set to `true`. The contents of this field are subject to change.
=====
====

[[get-data-stream-api-example]]
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/indices/get-index-template.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ include::{docdir}/rest-api/common-parms.asciidoc[tag=local]

include::{docdir}/rest-api/common-parms.asciidoc[tag=master-timeout]

`include_defaults`::
(Optional, Boolean) Functionality in experimental:[]. If `true`, return all default settings in the response.
Defaults to `false`.

[[get-template-api-example]]
==== {api-examples-title}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
Expand All @@ -46,6 +48,7 @@ public class GetDataStreamsTransportAction extends TransportMasterNodeReadAction

private static final Logger LOGGER = LogManager.getLogger(GetDataStreamsTransportAction.class);
private final SystemIndices systemIndices;
private final ClusterSettings clusterSettings;

@Inject
public GetDataStreamsTransportAction(
Expand All @@ -68,6 +71,7 @@ public GetDataStreamsTransportAction(
ThreadPool.Names.SAME
);
this.systemIndices = systemIndices;
clusterSettings = clusterService.getClusterSettings();
}

@Override
Expand All @@ -77,14 +81,15 @@ protected void masterOperation(
ClusterState state,
ActionListener<GetDataStreamAction.Response> listener
) throws Exception {
listener.onResponse(innerOperation(state, request, indexNameExpressionResolver, systemIndices));
listener.onResponse(innerOperation(state, request, indexNameExpressionResolver, systemIndices, clusterSettings));
}

static GetDataStreamAction.Response innerOperation(
ClusterState state,
GetDataStreamAction.Request request,
IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices
SystemIndices systemIndices,
ClusterSettings clusterSettings
) {
List<DataStream> dataStreams = getDataStreams(state, indexNameExpressionResolver, request);
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
Expand Down Expand Up @@ -165,7 +170,12 @@ static GetDataStreamAction.Response innerOperation(
)
);
}
return new GetDataStreamAction.Response(dataStreamInfos);
return new GetDataStreamAction.Response(
dataStreamInfos,
request.includeDefaults() && DataLifecycle.isEnabled()
? clusterSettings.get(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING)
: null
);
}

static List<DataStream> getDataStreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -39,6 +40,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
GetDataStreamAction.Request getDataStreamsRequest = new GetDataStreamAction.Request(
Strings.splitStringByCommaToArray(request.param("name"))
);
if (DataLifecycle.isEnabled()) {
getDataStreamsRequest.includeDefaults(request.paramAsBoolean("include_defaults", false));
}
getDataStreamsRequest.indicesOptions(IndicesOptions.fromRequest(request, getDataStreamsRequest.indicesOptions()));
return channel -> client.execute(GetDataStreamAction.INSTANCE, getDataStreamsRequest, new RestToXContentListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndices;
Expand Down Expand Up @@ -149,7 +150,13 @@ public void testGetTimeSeriesDataStream() {
}

var req = new GetDataStreamAction.Request(new String[] {});
var response = GetDataStreamsTransportAction.innerOperation(state, req, resolver, systemIndices);
var response = GetDataStreamsTransportAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings()
);
assertThat(response.getDataStreams(), hasSize(2));
assertThat(response.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStream1));
assertThat(response.getDataStreams().get(0).getTimeSeries().temporalRanges(), contains(new Tuple<>(sixHoursAgo, twoHoursAhead)));
Expand All @@ -164,7 +171,13 @@ public void testGetTimeSeriesDataStream() {
mBuilder.remove(dataStream.getIndices().get(1).getName());
state = ClusterState.builder(state).metadata(mBuilder).build();
}
response = GetDataStreamsTransportAction.innerOperation(state, req, resolver, systemIndices);
response = GetDataStreamsTransportAction.innerOperation(
state,
req,
resolver,
systemIndices,
ClusterSettings.createBuiltInClusterSettings()
);
assertThat(response.getDataStreams(), hasSize(2));
assertThat(response.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStream1));
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- do:
indices.get_data_stream:
name: "dlm-managed-data-stream"
include_defaults: true
- match: { data_streams.0.name: dlm-managed-data-stream }
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
- match: { data_streams.0.generation: 1 }
Expand All @@ -34,3 +35,4 @@
- match: { data_streams.0.template: 'template-with-lifecycle' }
- match: { data_streams.0.hidden: false }
- match: { data_streams.0.lifecycle.data_retention: '30d' }
- is_true: data_streams.0.lifecycle.rollover
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -62,6 +60,7 @@ protected boolean ignoreExternalCluster() {
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
settings.put(DataLifecycleService.DLM_POLL_INTERVAL, "1s");
settings.put(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING.getKey(), "min_docs=1,max_docs=1");
return settings.build();
}

Expand All @@ -70,15 +69,6 @@ public void testRolloverLifecycle() throws Exception {
DataLifecycle lifecycle = new DataLifecycle();

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
Iterable<DataLifecycleService> dataLifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

for (DataLifecycleService dataLifecycleService : dataLifecycleServices) {
dataLifecycleService.setDefaultRolloverRequestSupplier((target) -> {
RolloverRequest rolloverRequest = new RolloverRequest(target, null);
rolloverRequest.setConditions(RolloverConditions.newBuilder().addMaxIndexDocsCondition(1L).build());
return rolloverRequest;
});
}
String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
Expand All @@ -104,15 +94,7 @@ public void testRolloverAndRetention() throws Exception {
DataLifecycle lifecycle = new DataLifecycle(TimeValue.timeValueMillis(0));

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
Iterable<DataLifecycleService> dataLifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

for (DataLifecycleService dataLifecycleService : dataLifecycleServices) {
dataLifecycleService.setDefaultRolloverRequestSupplier((target) -> {
RolloverRequest rolloverRequest = new RolloverRequest(target, null);
rolloverRequest.setConditions(RolloverConditions.newBuilder().addMaxIndexDocsCondition(1L).build());
return rolloverRequest;
});
}

String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -32,7 +33,6 @@
import org.elasticsearch.common.scheduler.TimeValueSchedule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
Expand All @@ -46,7 +46,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
Expand Down Expand Up @@ -77,11 +76,9 @@ public class DataLifecycleService implements ClusterStateListener, Closeable, Sc
private final Clock clock;
private volatile boolean isMaster = false;
private volatile TimeValue pollInterval;
private volatile RolloverConditions rolloverConditions;
private SchedulerEngine.Job scheduledJob;
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
// we use this rollover supplier to facilitate testing until we'll be able to read the
// rollover configuration from a cluster setting
private Function<String, RolloverRequest> defaultRolloverRequestSupplier;

public DataLifecycleService(
Settings settings,
Expand All @@ -99,7 +96,7 @@ public DataLifecycleService(
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
this.pollInterval = DLM_POLL_INTERVAL_SETTING.get(settings);
this.defaultRolloverRequestSupplier = this::getDefaultRolloverRequest;
this.rolloverConditions = clusterService.getClusterSettings().get(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING);
}

/**
Expand All @@ -108,6 +105,8 @@ public DataLifecycleService(
public void init() {
clusterService.addListener(this);
clusterService.getClusterSettings().addSettingsUpdateConsumer(DLM_POLL_INTERVAL_SETTING, this::updatePollInterval);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(DataLifecycle.CLUSTER_DLM_DEFAULT_ROLLOVER_SETTING, this::updateRolloverConditions);
}

@Override
Expand Down Expand Up @@ -178,7 +177,7 @@ void run(ClusterState state) {

private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
if (dataStream.isIndexManagedByDLM(dataStream.getWriteIndex(), state.metadata()::index)) {
RolloverRequest rolloverRequest = defaultRolloverRequestSupplier.apply(dataStream.getName());
RolloverRequest rolloverRequest = getDefaultRolloverRequest(dataStream.getName());
transportActionsDeduplicator.executeOnce(
rolloverRequest,
ActionListener.noop(),
Expand Down Expand Up @@ -290,16 +289,7 @@ static TimeValue getCreationOrRolloverDate(String rolloverTarget, IndexMetadata

private RolloverRequest getDefaultRolloverRequest(String dataStream) {
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null).masterNodeTimeout(TimeValue.MAX_VALUE);
rolloverRequest.setConditions(
RolloverConditions.newBuilder()
// TODO get rollover from cluster setting once we have it
.addMaxIndexAgeCondition(TimeValue.timeValueDays(7))
.addMaxPrimaryShardSizeCondition(ByteSizeValue.ofGb(50))
.addMaxPrimaryShardDocsCondition(200_000_000L)
// don't rollover an empty index
.addMinIndexDocsCondition(1L)
.build()
);
rolloverRequest.setConditions(rolloverConditions);
return rolloverRequest;
}

Expand All @@ -308,6 +298,10 @@ private void updatePollInterval(TimeValue newInterval) {
maybeScheduleJob();
}

private void updateRolloverConditions(RolloverConditions newRolloverConditions) {
this.rolloverConditions = newRolloverConditions;
}

private void cancelJob() {
if (scheduler.get() != null) {
scheduler.get().remove(DATA_LIFECYCLE_JOB_NAME);
Expand Down Expand Up @@ -340,9 +334,4 @@ private void maybeScheduleJob() {
scheduledJob = new SchedulerEngine.Job(DATA_LIFECYCLE_JOB_NAME, new TimeValueSchedule(pollInterval));
scheduler.get().add(scheduledJob);
}

// package visibility for testing
void setDefaultRolloverRequestSupplier(Function<String, RolloverRequest> defaultRolloverRequestSupplier) {
this.defaultRolloverRequestSupplier = defaultRolloverRequestSupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"local":{
"type":"boolean",
"description":"Return local information, do not retrieve the state from master node (default: false)"
},
"include_defaults":{
"type":"boolean",
"description":"Return all default configurations for the component template (default: false)"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
],
"default":"open",
"description":"Whether wildcard expressions should get expanded to open or closed indices (default: open)"
},
"include_defaults":{
"type":"boolean",
"description":"Return all relevant default configurations for the data stream (default: false)"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
"local":{
"type":"boolean",
"description":"Return local information, do not retrieve the state from master node (default: false)"
},
"include_defaults":{
"type":"boolean",
"description":"Return all relevant default configurations for the index template (default: false)"
}
}
}
Expand Down

0 comments on commit fe20d92

Please sign in to comment.