Skip to content

Commit

Permalink
Add component that updates the index.time_series.end_time (#82798)
Browse files Browse the repository at this point in the history
Added a component that updates the index.time_series.end_time
index setting of the most recent backing index of a time series data stream.

This allows a time series data stream to keep accepting data when
the data stream hasn't rolled over yet, but its end_time
is getting close to the current time.

This component computes a new end_time based on current time,
the configured look_ahead_time and the poll interval. If
this new end time is larger than the set end_time then this
new value overwrites the old value.

The component runs periodically in the background and bulk
updates the end_time for the most recent backing index for
all time series data streams.

Relates to #74660
  • Loading branch information
martijnvg committed Feb 8, 2022
1 parent 7b952c2 commit a953813
Show file tree
Hide file tree
Showing 13 changed files with 707 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class TSDBIndexingIT extends ESSingleNodeTestCase {

private static final String DOC = """
{
"@timestamp": "$time",
"metricset": "pod",
"k8s": {
"pod": {
"name": "dog",
"uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9",
"ip": "10.10.55.3",
"network": {
"tx": 1434595272,
"rx": 530605511
}
}
}
}
""";

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(DataStreamsPlugin.class);
}

@Override
protected Settings nodeSettings() {
Settings.Builder newSettings = Settings.builder();
newSettings.put(super.nodeSettings());
// This essentially disables the automatic updates to end_time settings of a data stream's latest backing index.
newSettings.put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m");
return newSettings.build();
}

@After
public void cleanup() {
DeleteDataStreamAction.Request deleteDataStreamsRequest = new DeleteDataStreamAction.Request("*");
assertAcked(client().execute(DeleteDataStreamAction.INSTANCE, deleteDataStreamsRequest).actionGet());
}

public void testTimeRanges() throws Exception {
var mappingTemplate = """
{
"_doc":{
"properties": {
"metricset": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}""";
Settings templateSettings = Settings.builder().put("index.routing_path", "metricset").build();
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(templateSettings, new CompressedXContent(mappingTemplate), null),
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES),
null
)
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();

// index doc
Instant time = Instant.now();
String backingIndexName;
{
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
var indexResponse = client().index(indexRequest).actionGet();
backingIndexName = indexResponse.getIndex();
}

// fetch start and end time
var getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(backingIndexName)).actionGet();
Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(getIndexResponse.getSettings().get(backingIndexName));
Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(getIndexResponse.getSettings().get(backingIndexName));

// index another doc and verify index
{
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(endTime.minusSeconds(1))), XContentType.JSON);
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getIndex(), equalTo(backingIndexName));
}

// index doc beyond range and check failure
{
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
time = randomBoolean() ? endTime : endTime.plusSeconds(randomIntBetween(1, 99));
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
expectThrows(IllegalArgumentException.class, () -> client().index(indexRequest).actionGet());
}

// Fetch UpdateTimeSeriesRangeService and increment time range of latest backing index:
UpdateTimeSeriesRangeService updateTimeSeriesRangeService = getInstanceFromNode(UpdateTimeSeriesRangeService.class);
CountDownLatch latch = new CountDownLatch(1);
updateTimeSeriesRangeService.perform(latch::countDown);
latch.await();

// index again and check for success
{
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getIndex(), equalTo(backingIndexName));
}

// rollover
var rolloverRequest = new RolloverRequest("k8s", null);
var rolloverResponse = client().admin().indices().rolloverIndex(rolloverRequest).actionGet();
var newBackingIndexName = rolloverResponse.getNewIndex();

// index and check target index is new
getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(newBackingIndexName)).actionGet();
Instant newStartTime = IndexSettings.TIME_SERIES_START_TIME.get(getIndexResponse.getSettings().get(newBackingIndexName));
Instant newEndTime = IndexSettings.TIME_SERIES_END_TIME.get(getIndexResponse.getSettings().get(newBackingIndexName));

// Check whether the document lands in the newest backing index:
time = Instant.ofEpochMilli(randomLongBetween(newStartTime.toEpochMilli(), newEndTime.toEpochMilli() - 1));
{
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getIndex(), equalTo(newBackingIndexName));
}

// Double check indexing against previous backing index:
time = newStartTime.minusMillis(1);
{
var indexRequest = new IndexRequest("k8s").opType(DocWriteRequest.OpType.CREATE);
indexRequest.source(DOC.replace("$time", formatInstant(time)), XContentType.JSON);
var indexResponse = client().index(indexRequest).actionGet();
assertThat(indexResponse.getIndex(), equalTo(backingIndexName));
}
}

static String formatInstant(Instant instant) {
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public Settings getAdditionalIndexSettings(
String dataStreamName,
IndexMode templateIndexMode,
Metadata metadata,
long resolvedAt,
Instant resolvedAt,
Settings allSettings
) {
if (dataStreamName != null) {
Expand All @@ -48,9 +48,11 @@ public Settings getAdditionalIndexSettings(

if (indexMode == IndexMode.TIME_SERIES) {
TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings);
Instant start;
final Instant start;
final Instant end;
if (dataStream == null) {
start = Instant.ofEpochMilli(resolvedAt).minusMillis(lookAheadTime.getMillis());
start = resolvedAt.minusMillis(lookAheadTime.getMillis());
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
} else {
IndexMetadata currentLatestBackingIndex = metadata.index(dataStream.getWriteIndex());
if (currentLatestBackingIndex.getSettings().hasValue(IndexSettings.TIME_SERIES_END_TIME.getKey()) == false) {
Expand All @@ -64,9 +66,14 @@ public Settings getAdditionalIndexSettings(
);
}
start = IndexSettings.TIME_SERIES_END_TIME.get(currentLatestBackingIndex.getSettings());
if (start.isAfter(resolvedAt)) {
end = start.plusMillis(lookAheadTime.getMillis());
} else {
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
}
}
assert start.isBefore(end) : "data stream backing index's start time is not before end time";
builder.put(IndexSettings.TIME_SERIES_START_TIME.getKey(), FORMATTER.format(start));
Instant end = Instant.ofEpochMilli(resolvedAt).plusMillis(lookAheadTime.getMillis());
builder.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), FORMATTER.format(end));
}
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
import org.elasticsearch.action.datastreams.MigrateToDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.PromoteDataStreamAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.action.CreateDataStreamTransportAction;
import org.elasticsearch.datastreams.action.DataStreamsStatsTransportAction;
import org.elasticsearch.datastreams.action.DeleteDataStreamTransportAction;
Expand All @@ -36,18 +41,66 @@
import org.elasticsearch.datastreams.rest.RestMigrateToDataStreamAction;
import org.elasticsearch.datastreams.rest.RestModifyDataStreamsAction;
import org.elasticsearch.datastreams.rest.RestPromoteDataStreamAction;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettingProvider;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

public class DataStreamsPlugin extends Plugin implements ActionPlugin {

/**
 * Node-scoped, dynamically updatable time setting registered under the key {@code time_series.poll_interval}.
 * <p>
 * Assuming the {@code (key, default, min, max, properties...)} overload of {@code Setting.timeSetting} is the one
 * used here, the default is 5 minutes and accepted values range from 1 to 10 minutes.
 * NOTE(review): presumably this is the interval at which {@code UpdateTimeSeriesRangeService} runs; the consumer
 * of the setting is not visible in this part of the file, so confirm there.
 */
public static final Setting<TimeValue> TIME_SERIES_POLL_INTERVAL = Setting.timeSetting(
"time_series.poll_interval",
TimeValue.timeValueMinutes(5),
TimeValue.timeValueMinutes(1),
TimeValue.timeValueMinutes(10),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
 * Lists the settings this plugin registers.
 *
 * @return a list containing only {@link #TIME_SERIES_POLL_INTERVAL} when time series mode is enabled,
 *         otherwise an empty list
 */
@Override
public List<Setting<?>> getSettings() {
    // The poll interval is only registered while the time series feature is switched on.
    return IndexSettings.isTimeSeriesModeEnabled() ? List.of(TIME_SERIES_POLL_INTERVAL) : List.of();
}

/**
 * Creates the components contributed by this plugin.
 * <p>
 * Only {@code environment}, {@code threadPool} and {@code clusterService} are used; the remaining parameters
 * are accepted to satisfy the overridden signature.
 *
 * @return a single-element list holding a new {@link UpdateTimeSeriesRangeService} when time series mode is
 *         enabled, otherwise an empty list
 */
@Override
public Collection<Object> createComponents(
    Client client,
    ClusterService clusterService,
    ThreadPool threadPool,
    ResourceWatcherService resourceWatcherService,
    ScriptService scriptService,
    NamedXContentRegistry xContentRegistry,
    Environment environment,
    NodeEnvironment nodeEnvironment,
    NamedWriteableRegistry namedWriteableRegistry,
    IndexNameExpressionResolver indexNameExpressionResolver,
    Supplier<RepositoriesService> repositoriesServiceSupplier
) {
    // The range-update service is only created while the time series feature is switched on.
    if (IndexSettings.isTimeSeriesModeEnabled()) {
        return List.of(new UpdateTimeSeriesRangeService(environment.settings(), threadPool, clusterService));
    }
    return List.of();
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
var createDsAction = new ActionHandler<>(CreateDataStreamAction.INSTANCE, CreateDataStreamTransportAction.class);
Expand Down

0 comments on commit a953813

Please sign in to comment.