Initial implementation for DataLifecycleService (#94012)
This adds support for managing the lifecycle of data streams. It
currently supports rollover and data retention.
andreidan committed Feb 27, 2023
1 parent 95daf49 commit 4760f00
Showing 13 changed files with 929 additions and 17 deletions.
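A data stream opts into DLM by carrying a DataLifecycle in the composable index template it is created from. A minimal sketch of the two configurations exercised by the integration test below (the putComposableIndexTemplate helper is defined in that test, and the 30-day retention value is illustrative, not part of this commit):

// Sketch based on DataLifecycleServiceIT below; internal APIs, illustrative values.
DataLifecycle rolloverOnly = new DataLifecycle();                              // default rollover only, no retention
DataLifecycle withRetention = new DataLifecycle(TimeValue.timeValueDays(30)); // rollover plus 30-day retention
putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, withRetention);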
5 changes: 5 additions & 0 deletions docs/changelog/94012.yaml
@@ -0,0 +1,5 @@
pr: 94012
summary: Initial implementation for `DataLifecycleService`
area: DLM
type: feature
issues: []
1 change: 1 addition & 0 deletions modules/data-streams/src/main/java/module-info.java
@@ -14,4 +14,5 @@
requires org.apache.lucene.core;

exports org.elasticsearch.datastreams.action to org.elasticsearch.server;
exports org.elasticsearch.datastreams to org.elasticsearch.dlm;
}
5 changes: 3 additions & 2 deletions modules/dlm/build.gradle
@@ -1,5 +1,3 @@
import org.elasticsearch.gradle.internal.info.BuildParams

apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-cluster-test'

@@ -9,5 +7,8 @@ esplugin {
classname 'org.elasticsearch.dlm.DataLifecyclePlugin'
}
archivesBaseName = 'dlm'
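// the integration tests stand up real data streams, so the data-streams module is needed on the test classpath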
dependencies {
testImplementation project(':modules:data-streams')
}

addQaCheckDependencies(project)
180 changes: 180 additions & 0 deletions modules/dlm/src/internalClusterTest/java/org/elasticsearch/dlm/DataLifecycleServiceIT.java
@@ -0,0 +1,180 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.dlm;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class DataLifecycleServiceIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataLifecyclePlugin.class, DataStreamsPlugin.class, MockTransportService.TestPlugin.class);
}

protected boolean ignoreExternalCluster() {
return true;
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
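// run the DLM pass every second so the assertions below can observe rollover and retention quickly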
settings.put(DataLifecycleService.DLM_POLL_INTERVAL, "1s");
return settings.build();
}

public void testRolloverLifecycle() throws Exception {
// empty lifecycle contains the default rollover
DataLifecycle lifecycle = new DataLifecycle();

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
Iterable<DataLifecycleService> dataLifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

for (DataLifecycleService dataLifecycleService : dataLifecycleServices) {
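// replace the default rollover conditions so that a single indexed document triggers a rollover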
dataLifecycleService.setDefaultRolloverRequestSupplier((target) -> {
RolloverRequest rolloverRequest = new RolloverRequest(target, null);
rolloverRequest.addMaxIndexDocsCondition(1);
return rolloverRequest;
});
}
String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

indexDocs(dataStreamName, 1);

assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(2));
String backingIndex = backingIndices.get(0).getName();
assertThat(backingIndex, backingIndexEqualTo(dataStreamName, 1));
String writeIndex = backingIndices.get(1).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
}

public void testRolloverAndRetention() throws Exception {
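// zero retention: a backing index is eligible for deletion as soon as it is no longer the write index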
DataLifecycle lifecycle = new DataLifecycle(TimeValue.timeValueMillis(0));

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);
Iterable<DataLifecycleService> dataLifecycleServices = internalCluster().getInstances(DataLifecycleService.class);

for (DataLifecycleService dataLifecycleService : dataLifecycleServices) {
dataLifecycleService.setDefaultRolloverRequestSupplier((target) -> {
RolloverRequest rolloverRequest = new RolloverRequest(target, null);
rolloverRequest.addMaxIndexDocsCondition(1);
return rolloverRequest;
});
}
String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

indexDocs(dataStreamName, 1);

assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
assertThat(backingIndices.size(), equalTo(1));
// we expect the data stream to have only one backing index, the write one, with generation 2
// as generation 1 would've been deleted by DLM given the lifecycle configuration
String writeIndex = backingIndices.get(0).getName();
assertThat(writeIndex, backingIndexEqualTo(dataStreamName, 2));
});
}

static void indexDocs(String dataStream, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
String value = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
bulkRequest.add(
new IndexRequest(dataStream).opType(DocWriteRequest.OpType.CREATE)
.source(String.format(Locale.ROOT, "{\"%s\":\"%s\"}", DEFAULT_TIMESTAMP_FIELD, value), XContentType.JSON)
);
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertThat(bulkResponse.getItems().length, equalTo(numDocs));
String backingIndexPrefix = DataStream.BACKING_INDEX_PREFIX + dataStream;
for (BulkItemResponse itemResponse : bulkResponse) {
assertThat(itemResponse.getFailureMessage(), nullValue());
assertThat(itemResponse.status(), equalTo(RestStatus.CREATED));
assertThat(itemResponse.getIndex(), startsWith(backingIndexPrefix));
}
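// refresh so the documents are visible to index stats, which rollover conditions are evaluated against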
client().admin().indices().refresh(new RefreshRequest(dataStream)).actionGet();
}

static void putComposableIndexTemplate(
String id,
@Nullable String mappings,
List<String> patterns,
@Nullable Settings settings,
@Nullable Map<String, Object> metadata,
@Nullable DataLifecycle lifecycle
) throws IOException {
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
new ComposableIndexTemplate(
patterns,
new Template(settings, mappings == null ? null : CompressedXContent.fromJSON(mappings), null, lifecycle),
null,
null,
null,
metadata,
new ComposableIndexTemplate.DataStreamTemplate(),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
}

}
2 changes: 2 additions & 0 deletions modules/dlm/src/main/java/module-info.java
@@ -12,4 +12,6 @@
requires org.elasticsearch.xcontent;
requires org.apache.lucene.core;
requires org.apache.logging.log4j;

exports org.elasticsearch.dlm;
}
modules/dlm/src/main/java/org/elasticsearch/dlm/DataLifecyclePlugin.java
@@ -8,12 +8,18 @@

package org.elasticsearch.dlm;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
@@ -25,17 +31,21 @@
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.io.IOException;
import java.time.Clock;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.cluster.metadata.DataLifecycle.DLM_ORIGIN;

/**
* Plugin encapsulating Data Lifecycle Management Service.
*/
public class DataLifecyclePlugin extends Plugin implements ActionPlugin {

private final Settings settings;
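// SetOnce: the service is created once in createComponents and closed when the plugin closes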
private final SetOnce<DataLifecycleService> dataLifecycleInitialisationService = new SetOnce<>();

public DataLifecyclePlugin(Settings settings) {
this.settings = settings;
@@ -66,20 +76,39 @@ public Collection<Object> createComponents(
Tracer tracer,
AllocationService allocationService
) {
return super.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
xContentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
indexNameExpressionResolver,
repositoriesServiceSupplier,
tracer,
allocationService
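// DLM is gated behind a feature flag; when disabled, the plugin contributes no components or settings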
if (DataLifecycle.isEnabled() == false) {
return List.of();
}

dataLifecycleInitialisationService.set(
new DataLifecycleService(
settings,
new OriginSettingClient(client, DLM_ORIGIN),
clusterService,
getClock(),
threadPool,
threadPool::absoluteTimeInMillis
)
);
dataLifecycleInitialisationService.get().init();
return List.of(dataLifecycleInitialisationService.get());
}

@Override
public List<Setting<?>> getSettings() {
if (DataLifecycle.isEnabled() == false) {
return List.of();
}

return List.of(DataLifecycleService.DLM_POLL_INTERVAL_SETTING);
}

@Override
public void close() throws IOException {
try {
IOUtils.close(dataLifecycleInitialisationService.get());
} catch (IOException e) {
throw new ElasticsearchException("unable to close the data lifecycle service", e);
}
}
}
