Adding origination date to DLM (#95113)
This adds support for an index's index.lifecycle.origination_date setting in DLM. If an index has a
value for index.lifecycle.origination_date, that value is used instead of the creation date or rollover
date (except for the write index, which has not yet been rolled over).
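For illustration only (this sketch is not part of the commit): an index can be backdated by setting
index.lifecycle.origination_date before it is added to a data stream. The pattern mirrors the
integration test further down, including its static import of IndexSettings.LIFECYCLE_ORIGINATION_DATE;
the class and method names here are hypothetical.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;

class OriginationDateSettingsSketch {
    // Build index settings that backdate the index's lifecycle origin by daysAgo days,
    // the same way testOriginationDate below seeds index_old and index_new.
    static Settings backdatedBy(long daysAgo) {
        long originMillis = System.currentTimeMillis() - TimeValue.timeValueDays(daysAgo).millis();
        return Settings.builder().put(LIFECYCLE_ORIGINATION_DATE, originMillis).build();
    }
}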
masseyke committed Apr 19, 2023
1 parent cffe0f0 commit efcdbbc
Showing 9 changed files with 517 additions and 84 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/95113.yaml
@@ -0,0 +1,5 @@
pr: 95113
summary: Adding origination date to DLM
area: DLM
type: enhancement
issues: []
8 changes: 4 additions & 4 deletions docs/reference/dlm/apis/explain-data-lifecycle.asciidoc
@@ -82,10 +82,10 @@ shown.
data stream)
<7> The generation time of the index represents the time since the index started progressing
towards the user configurable / business specific parts of the lifecycle (e.g. retention).
- Every index will have to wait for it to be rolled over before being able to progress to the
- business-specific part of the lifecycle (i.e. the index advances in its lifecycle after it
- stops being the write index of a data stream). If the index has not been rolled over the
- `generation_time` will not be reported.
+ The `generation_time` is calculated from the origination date if it exists, or from the
+ rollover date if it exists, or from the creation date if neither of the other two exist.
+ If the index is the write index the `generation_time` will not be reported because it is not
+ eligible for retention or other parts of the lifecycle.
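A hedged sketch of that precedence rule in plain Java (this is not the implementation the transport
action below calls, DataStream#getGenerationLifecycleDate; the method and parameter names are
illustrative):

class GenerationDateSketch {
    // Returns the epoch-millis date that generation_time is measured from,
    // or null for the write index, which is not yet eligible for retention.
    static Long generationBaseMillis(boolean isWriteIndex,
                                     Long originationDateMillis, // index.lifecycle.origination_date, if set
                                     Long rolloverDateMillis,    // rollover time, if the index has been rolled over
                                     long creationDateMillis) {
        if (isWriteIndex) {
            return null;
        }
        if (originationDateMillis != null) {
            return originationDateMillis; // the origination date takes precedence when present
        }
        return rolloverDateMillis != null ? rolloverDateMillis : creationDateMillis;
    }
}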

The `explain` will also report any errors related to the lifecycle execution for the target
index:
@@ -15,11 +15,13 @@
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataLifecycle;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
@@ -41,10 +43,13 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -130,6 +135,74 @@ public void testRolloverAndRetention() throws Exception {
});
}

public void testOriginationDate() throws Exception {
/*
* In this test, we set up a datastream with 7 day retention. Then we add two indices to it -- one with an origination date 365
* days ago, and one with an origination date 1 day ago. After DLM runs, we expect the one with the old origination date to have
* been deleted, and the one with the newer origination date to remain.
*/
DataLifecycle lifecycle = new DataLifecycle(TimeValue.timeValueDays(7).millis());

putComposableIndexTemplate("id1", null, List.of("metrics-foo*"), null, null, lifecycle);

String dataStreamName = "metrics-foo";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).get();

indexDocs(dataStreamName, 1);

String mapping = """
{
"properties":{
"@timestamp": {
"type": "date"
}
}
}""";
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request("id2");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("index_*"),
new Template(null, CompressedXContent.fromJSON(mapping), null, null),
null,
null,
null,
null,
null,
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();

String indexWithOldOriginationDate = "index_old";
long originTimeMillis = System.currentTimeMillis() - TimeValue.timeValueDays(365).millis();
createIndex(indexWithOldOriginationDate, Settings.builder().put(LIFECYCLE_ORIGINATION_DATE, originTimeMillis).build());
client().execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(List.of(DataStreamAction.addBackingIndex(dataStreamName, indexWithOldOriginationDate)))
).get();

String indexWithNewOriginationDate = "index_new";
originTimeMillis = System.currentTimeMillis() - TimeValue.timeValueDays(1).millis();
createIndex(indexWithNewOriginationDate, Settings.builder().put(LIFECYCLE_ORIGINATION_DATE, originTimeMillis).build());
client().execute(
ModifyDataStreamsAction.INSTANCE,
new ModifyDataStreamsAction.Request(List.of(DataStreamAction.addBackingIndex(dataStreamName, indexWithNewOriginationDate)))
).get();

assertBusy(() -> {
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request(new String[] { dataStreamName });
GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest)
.actionGet();
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
List<Index> backingIndices = getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices();
Set<String> indexNames = backingIndices.stream().map(index -> index.getName()).collect(Collectors.toSet());
assertTrue(indexNames.contains("index_new"));
assertFalse(indexNames.contains("index_old"));
});
}
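For intuition, the deletion decision DLM reaches for the two indices above reduces to a comparison
like this sketch (not the actual DLM service code; the method name is hypothetical):

// An index becomes eligible for deletion once the time elapsed since its generation
// date exceeds the configured retention (7 days in this test).
static boolean pastRetention(long nowMillis, long generationDateMillis, TimeValue retention) {
    return nowMillis - generationDateMillis > retention.millis();
}
// now - 365 days against 7-day retention -> true  : index_old is deleted
// now - 1 day   against 7-day retention -> false : index_new is kept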

public void testUpdatingLifecycleAppliesToAllBackingIndices() throws Exception {
DataLifecycle lifecycle = new DataLifecycle();

@@ -25,6 +25,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.dlm.DataLifecycleErrorStore;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -87,16 +88,18 @@ protected void masterOperation(
}
DataStream parentDataStream = indexAbstraction.getParentDataStream();
if (parentDataStream == null || parentDataStream.isIndexManagedByDLM(idxMetadata.getIndex(), metadata::index) == false) {
- explainIndices.add(new ExplainIndexDataLifecycle(index, false, null, null, null, null));
+ explainIndices.add(new ExplainIndexDataLifecycle(index, false, null, null, null, null, null));
continue;
}

RolloverInfo rolloverInfo = idxMetadata.getRolloverInfos().get(parentDataStream.getName());
TimeValue generationDate = parentDataStream.getGenerationLifecycleDate(idxMetadata);
ExplainIndexDataLifecycle explainIndexDataLifecycle = new ExplainIndexDataLifecycle(
index,
true,
idxMetadata.getCreationDate(),
rolloverInfo == null ? null : rolloverInfo.getTime(),
generationDate,
parentDataStream.getLifecycle(),
errorStore.getError(index)
);
@@ -94,6 +94,8 @@ public void testToXContent() throws IOException {
explainIndexMap.get("time_since_rollover"),
is(explainIndex.getTimeSinceRollover(() -> now).toHumanReadableString(2))
);
}
if (explainIndex.getGenerationTime(() -> now) != null) {
assertThat(
explainIndexMap.get("generation_time"),
is(explainIndex.getGenerationTime(() -> now).toHumanReadableString(2))
@@ -145,6 +147,8 @@ public void testToXContent() throws IOException {
explainIndexMap.get("time_since_rollover"),
is(explainIndex.getTimeSinceRollover(() -> now).toHumanReadableString(2))
);
}
if (explainIndex.getGenerationTime(() -> now) != null) {
assertThat(
explainIndexMap.get("generation_time"),
is(explainIndex.getGenerationTime(() -> now).toHumanReadableString(2))
@@ -161,6 +165,35 @@ public void testToXContent() throws IOException {
assertThat(lifecycleRollover.get("max_primary_shard_docs"), is(9));
}
}
{
// Make sure generation_time is not reported when the generation date is null (which it is for a write index):
ExplainIndexDataLifecycle explainIndexWithNullGenerationDate = new ExplainIndexDataLifecycle(
randomAlphaOfLengthBetween(10, 30),
true,
now,
randomBoolean() ? now + TimeValue.timeValueDays(1).getMillis() : null,
null,
lifecycle,
randomBoolean() ? new NullPointerException("bad times").getMessage() : null
);
Response response = new Response(List.of(explainIndexWithNullGenerationDate), null);

XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
response.toXContentChunked(EMPTY_PARAMS).forEachRemaining(xcontent -> {
try {
xcontent.toXContent(builder, EMPTY_PARAMS);
} catch (IOException e) {
logger.error(e.getMessage(), e);
fail(e.getMessage());
}
});
Map<String, Object> xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2();
Map<String, Object> indices = (Map<String, Object>) xContentMap.get("indices");
assertThat(indices.size(), is(1));
Map<String, Object> explainIndexMap = (Map<String, Object>) indices.get(explainIndexWithNullGenerationDate.getIndex());
assertThat(explainIndexMap.get("managed_by_dlm"), is(true));
assertThat(explainIndexMap.get("generation_time"), is(nullValue()));
}
}

public void testChunkCount() {
@@ -186,6 +219,7 @@ private static ExplainIndexDataLifecycle createRandomIndexDLMExplanation(long no
true,
now,
randomBoolean() ? now + TimeValue.timeValueDays(1).getMillis() : null,
randomBoolean() ? TimeValue.timeValueMillis(now) : null,
lifecycle,
randomBoolean() ? new NullPointerException("bad times").getMessage() : null
);
@@ -46,6 +46,8 @@ public class ExplainIndexDataLifecycle implements Writeable, ToXContentObject {
@Nullable
private final Long rolloverDate;
@Nullable
private final Long generationDateMillis;
@Nullable
private final DataLifecycle lifecycle;
@Nullable
private final String error;
@@ -56,13 +58,15 @@ public ExplainIndexDataLifecycle(
boolean managedByDLM,
@Nullable Long indexCreationDate,
@Nullable Long rolloverDate,
@Nullable TimeValue generationDate,
@Nullable DataLifecycle lifecycle,
@Nullable String error
) {
this.index = index;
this.managedByDLM = managedByDLM;
this.indexCreationDate = indexCreationDate;
this.rolloverDate = rolloverDate;
this.generationDateMillis = generationDate == null ? null : generationDate.millis();
this.lifecycle = lifecycle;
this.error = error;
}
@@ -73,11 +77,13 @@ public ExplainIndexDataLifecycle(StreamInput in) throws IOException {
if (managedByDLM) {
this.indexCreationDate = in.readOptionalLong();
this.rolloverDate = in.readOptionalLong();
this.generationDateMillis = in.readOptionalLong();
this.lifecycle = in.readOptionalWriteable(DataLifecycle::new);
this.error = in.readOptionalString();
} else {
this.indexCreationDate = null;
this.rolloverDate = null;
this.generationDateMillis = null;
this.lifecycle = null;
this.error = null;
}
@@ -108,7 +114,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nulla
if (rolloverDate != null) {
builder.timeField(ROLLOVER_DATE_MILLIS_FIELD.getPreferredName(), ROLLOVER_DATE_FIELD.getPreferredName(), rolloverDate);
builder.field(TIME_SINCE_ROLLOVER_FIELD.getPreferredName(), getTimeSinceRollover(nowSupplier).toHumanReadableString(2));
- // if the index has been rolled over we'll start reporting the generation time
+ }
+ if (generationDateMillis != null) {
builder.field(GENERATION_TIME.getPreferredName(), getGenerationTime(nowSupplier).toHumanReadableString(2));
}
if (this.lifecycle != null) {
@@ -130,24 +137,24 @@ public void writeTo(StreamOutput out) throws IOException {
if (managedByDLM) {
out.writeOptionalLong(indexCreationDate);
out.writeOptionalLong(rolloverDate);
out.writeOptionalLong(generationDateMillis);
out.writeOptionalWriteable(lifecycle);
out.writeOptionalString(error);
}
}

/**
* Calculates the time since this index started progressing towards the remaining of its lifecycle past rollover.
- * Every index will have to wait to be rolled over before progressing towards its retention part of its lifecycle.
- * If the index has not been rolled over this will return null.
- * In the future, this will also consider the origination date of the index (however, it'll again only be displayed
- * after the index is rolled over).
+ * Every index will either have to wait to be rolled over before progressing towards its retention part of its lifecycle,
+ * or be added to the datastream manually.
+ * If the index is the write index this will return null.
*/
@Nullable
public TimeValue getGenerationTime(Supplier<Long> now) {
- if (rolloverDate == null) {
+ if (generationDateMillis == null) {
return null;
}
- return TimeValue.timeValueMillis(Math.max(0L, now.get() - rolloverDate));
+ return TimeValue.timeValueMillis(Math.max(0L, now.get() - generationDateMillis));
}
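A worked example of the arithmetic above, pinning now the way the tests in this commit do with
() -> now (the values are illustrative):

long now = System.currentTimeMillis();
long generationDateMillis = now - TimeValue.timeValueDays(10).millis();
// An index whose generation date was ten days ago reports a ten-day generation time:
TimeValue generationTime = TimeValue.timeValueMillis(Math.max(0L, now - generationDateMillis));
assert generationTime.millis() == TimeValue.timeValueDays(10).millis();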

