Skip to content

Commit

Permalink
Add time series related information to get data stream API (#86395)
Browse files Browse the repository at this point in the history
In case if a data stream is a time series data stream then include time series information.
This includes the continuous temporal ranges a time series data stream encapsulates.
This is computed based on combing the index.time_series.start_time and index.time_series.end_time
ranges of all backing indices of a time series data stream

Closes #83518
  • Loading branch information
martijnvg committed May 9, 2022
1 parent fae5099 commit f14da48
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -72,6 +77,15 @@ protected void masterOperation(
ClusterState state,
ActionListener<GetDataStreamAction.Response> listener
) throws Exception {
listener.onResponse(innerOperation(state, request, indexNameExpressionResolver, systemIndices));
}

static GetDataStreamAction.Response innerOperation(
ClusterState state,
GetDataStreamAction.Request request,
IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices
) {
List<DataStream> dataStreams = getDataStreams(state, indexNameExpressionResolver, request);
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
for (DataStream dataStream : dataStreams) {
Expand Down Expand Up @@ -105,11 +119,53 @@ protected void masterOperation(
state,
dataStream.getIndices().stream().map(Index::getName).toArray(String[]::new)
);

GetDataStreamAction.Response.TimeSeries timeSeries = null;
if (dataStream.getIndexMode() == IndexMode.TIME_SERIES) {
List<Tuple<Instant, Instant>> ranges = new ArrayList<>();
Tuple<Instant, Instant> current = null;
for (Index index : dataStream.getIndices()) {
IndexMetadata metadata = state.getMetadata().index(index);
Instant start = IndexSettings.TIME_SERIES_START_TIME.get(metadata.getSettings());
Instant end = IndexSettings.TIME_SERIES_END_TIME.get(metadata.getSettings());
if (current == null) {
current = new Tuple<>(start, end);
} else if (current.v2().compareTo(start) == 0) {
current = new Tuple<>(current.v1(), end);
} else if (current.v2().compareTo(start) < 0) {
ranges.add(current);
current = new Tuple<>(start, end);
} else {
String message = "previous backing index ["
+ current.v1()
+ "/"
+ current.v2()
+ "] range is colliding with current backing index range ["
+ start
+ "/"
+ end
+ "]";
assert current.v2().compareTo(start) < 0 : message;
LOGGER.warn(message);
}
}
if (current != null) {
ranges.add(current);
}
timeSeries = new GetDataStreamAction.Response.TimeSeries(ranges);
}

dataStreamInfos.add(
new GetDataStreamAction.Response.DataStreamInfo(dataStream, streamHealth.getStatus(), indexTemplate, ilmPolicyName)
new GetDataStreamAction.Response.DataStreamInfo(
dataStream,
streamHealth.getStatus(),
indexTemplate,
ilmPolicyName,
timeSeries
)
);
}
listener.onResponse(new GetDataStreamAction.Response(dataStreamInfos));
return new GetDataStreamAction.Response(dataStreamInfos);
}

static List<DataStream> getDataStreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -28,12 +30,22 @@ protected Response createTestInstance() {
int numDataStreams = randomIntBetween(0, 8);
List<Response.DataStreamInfo> dataStreams = new ArrayList<>();
for (int i = 0; i < numDataStreams; i++) {
List<Tuple<Instant, Instant>> timeSeries = null;
if (randomBoolean()) {
timeSeries = new ArrayList<>();
int numTimeSeries = randomIntBetween(0, 3);
for (int j = 0; j < numTimeSeries; j++) {
timeSeries.add(new Tuple<>(Instant.now(), Instant.now()));
}
}

dataStreams.add(
new Response.DataStreamInfo(
DataStreamTestHelper.randomInstance(),
ClusterHealthStatus.GREEN,
randomAlphaOfLengthBetween(2, 10),
randomAlphaOfLengthBetween(2, 10)
randomAlphaOfLengthBetween(2, 10),
timeSeries != null ? new Response.TimeSeries(timeSeries) : null
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,30 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ESTestCase;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.getClusterStateWithDataStreams;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class GetDataStreamsTransportActionTests extends ESTestCase {

private final IndexNameExpressionResolver resolver = TestIndexNameExpressionResolver.newInstance();
private final SystemIndices systemIndices = new SystemIndices(Map.of());

public void testGetDataStream() {
final String dataStreamName = "my-data-stream";
Expand Down Expand Up @@ -107,4 +116,64 @@ public void testGetNonexistentDataStream() {
assertThat(e.getMessage(), containsString("no such index [" + dataStreamName + "]"));
}

public void testGetTimeSeriesDataStream() {
Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
String dataStream1 = "ds-1";
String dataStream2 = "ds-2";
Instant sixHoursAgo = now.minus(6, ChronoUnit.HOURS);
Instant fourHoursAgo = now.minus(4, ChronoUnit.HOURS);
Instant twoHoursAgo = now.minus(2, ChronoUnit.HOURS);
Instant twoHoursAhead = now.plus(2, ChronoUnit.HOURS);

ClusterState state;
{
var mBuilder = new Metadata.Builder();
DataStreamTestHelper.getClusterStateWithDataStream(
mBuilder,
dataStream1,
List.of(
new Tuple<>(sixHoursAgo, fourHoursAgo),
new Tuple<>(fourHoursAgo, twoHoursAgo),
new Tuple<>(twoHoursAgo, twoHoursAhead)
)
);
DataStreamTestHelper.getClusterStateWithDataStream(
mBuilder,
dataStream2,
List.of(
new Tuple<>(sixHoursAgo, fourHoursAgo),
new Tuple<>(fourHoursAgo, twoHoursAgo),
new Tuple<>(twoHoursAgo, twoHoursAhead)
)
);
state = ClusterState.builder(new ClusterName("_name")).metadata(mBuilder).build();
}

var req = new GetDataStreamAction.Request(new String[] {});
var response = GetDataStreamsTransportAction.innerOperation(state, req, resolver, systemIndices);
assertThat(response.getDataStreams(), hasSize(2));
assertThat(response.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStream1));
assertThat(response.getDataStreams().get(0).getTimeSeries().temporalRanges(), contains(new Tuple<>(sixHoursAgo, twoHoursAhead)));
assertThat(response.getDataStreams().get(1).getDataStream().getName(), equalTo(dataStream2));
assertThat(response.getDataStreams().get(1).getTimeSeries().temporalRanges(), contains(new Tuple<>(sixHoursAgo, twoHoursAhead)));

// Remove the middle backing index first data stream, so that there is time gap in the data stream:
{
Metadata.Builder mBuilder = Metadata.builder(state.getMetadata());
DataStream dataStream = state.getMetadata().dataStreams().get(dataStream1);
mBuilder.put(dataStream.removeBackingIndex(dataStream.getIndices().get(1)));
mBuilder.remove(dataStream.getIndices().get(1).getName());
state = ClusterState.builder(state).metadata(mBuilder).build();
}
response = GetDataStreamsTransportAction.innerOperation(state, req, resolver, systemIndices);
assertThat(response.getDataStreams(), hasSize(2));
assertThat(response.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStream1));
assertThat(
response.getDataStreams().get(0).getTimeSeries().temporalRanges(),
contains(new Tuple<>(sixHoursAgo, fourHoursAgo), new Tuple<>(twoHoursAgo, twoHoursAhead))
);
assertThat(response.getDataStreams().get(1).getDataStream().getName(), equalTo(dataStream2));
assertThat(response.getDataStreams().get(1).getTimeSeries().temporalRanges(), contains(new Tuple<>(sixHoursAgo, twoHoursAhead)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ created the data stream:
- match: { data_streams.0.template: 'my-template1' }
- match: { data_streams.0.hidden: false }
- match: { data_streams.0.system: false }
- match: { data_streams.0.time_series.temporal_ranges.0.start: 2021-04-28T00:00:00.000Z }
- match: { data_streams.0.time_series.temporal_ranges.0.end: 2021-04-29T00:00:00.000Z }
- set: { data_streams.0.indices.0.index_name: backing_index }

- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.action.datastreams;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
Expand All @@ -18,12 +19,16 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -122,28 +127,42 @@ public static class DataStreamInfo implements SimpleDiffable<DataStreamInfo>, To
public static final ParseField SYSTEM_FIELD = new ParseField("system");
public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
public static final ParseField REPLICATED = new ParseField("replicated");
public static final ParseField TIME_SERIES = new ParseField("time_series");
public static final ParseField TEMPORAL_RANGES = new ParseField("temporal_ranges");
public static final ParseField TEMPORAL_RANGE_START = new ParseField("start");
public static final ParseField TEMPORAL_RANGE_END = new ParseField("end");

DataStream dataStream;
ClusterHealthStatus dataStreamStatus;
private final DataStream dataStream;
private final ClusterHealthStatus dataStreamStatus;
@Nullable
String indexTemplate;
private final String indexTemplate;
@Nullable
String ilmPolicyName;
private final String ilmPolicyName;
@Nullable
private final TimeSeries timeSeries;

public DataStreamInfo(
DataStream dataStream,
ClusterHealthStatus dataStreamStatus,
@Nullable String indexTemplate,
@Nullable String ilmPolicyName
@Nullable String ilmPolicyName,
@Nullable TimeSeries timeSeries
) {
this.dataStream = dataStream;
this.dataStreamStatus = dataStreamStatus;
this.indexTemplate = indexTemplate;
this.ilmPolicyName = ilmPolicyName;
this.timeSeries = timeSeries;
}

public DataStreamInfo(StreamInput in) throws IOException {
this(new DataStream(in), ClusterHealthStatus.readFrom(in), in.readOptionalString(), in.readOptionalString());
DataStreamInfo(StreamInput in) throws IOException {
this(
new DataStream(in),
ClusterHealthStatus.readFrom(in),
in.readOptionalString(),
in.readOptionalString(),
in.getVersion().onOrAfter(Version.V_8_3_0) ? in.readOptionalWriteable(TimeSeries::new) : null
);
}

public DataStream getDataStream() {
Expand All @@ -164,12 +183,20 @@ public String getIlmPolicy() {
return ilmPolicyName;
}

@Nullable
public TimeSeries getTimeSeries() {
return timeSeries;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
dataStream.writeTo(out);
dataStreamStatus.writeTo(out);
out.writeOptionalString(indexTemplate);
out.writeOptionalString(ilmPolicyName);
if (out.getVersion().onOrAfter(Version.V_8_3_0)) {
out.writeOptionalWriteable(timeSeries);
}
}

@Override
Expand All @@ -193,6 +220,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(SYSTEM_FIELD.getPreferredName(), dataStream.isSystem());
builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), dataStream.isAllowCustomRouting());
builder.field(REPLICATED.getPreferredName(), dataStream.isReplicated());
if (timeSeries != null) {
builder.startObject(TIME_SERIES.getPreferredName());
builder.startArray(TEMPORAL_RANGES.getPreferredName());
for (var range : timeSeries.temporalRanges()) {
builder.startObject();
Instant start = range.v1();
builder.field(TEMPORAL_RANGE_START.getPreferredName(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(start));
Instant end = range.v2();
builder.field(TEMPORAL_RANGE_END.getPreferredName(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(end));
builder.endObject();
}
builder.endArray();
builder.endObject();
}
builder.endObject();
return builder;
}
Expand All @@ -205,12 +246,41 @@ public boolean equals(Object o) {
return dataStream.equals(that.dataStream)
&& dataStreamStatus == that.dataStreamStatus
&& Objects.equals(indexTemplate, that.indexTemplate)
&& Objects.equals(ilmPolicyName, that.ilmPolicyName);
&& Objects.equals(ilmPolicyName, that.ilmPolicyName)
&& Objects.equals(timeSeries, that.timeSeries);
}

@Override
public int hashCode() {
return Objects.hash(dataStream, dataStreamStatus, indexTemplate, ilmPolicyName, timeSeries);
}
}

public static record TimeSeries(List<Tuple<Instant, Instant>> temporalRanges) implements Writeable {

TimeSeries(StreamInput in) throws IOException {
this(in.readList(in1 -> new Tuple<>(in1.readInstant(), in1.readInstant())));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(temporalRanges, (out1, value) -> {
out1.writeInstant(value.v1());
out1.writeInstant(value.v2());
});
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TimeSeries that = (TimeSeries) o;
return temporalRanges.equals(that.temporalRanges);
}

@Override
public int hashCode() {
return Objects.hash(dataStream, dataStreamStatus, indexTemplate, ilmPolicyName);
return Objects.hash(temporalRanges);
}
}

Expand Down

0 comments on commit f14da48

Please sign in to comment.