Skip to content

Commit

Permalink
Expand start and end time to nanoseconds during coordinator rewrite w…
Browse files Browse the repository at this point in the history
…hen needed (#96035) (#96316)

Expand index.time_series.start_time and end_time to nanoseconds if
timestamp field's resolution is set to nanoseconds. When creating
coordinator rewrite context.

Closes #96030
  • Loading branch information
martijnvg committed May 24, 2023
1 parent 63c9d98 commit c13d5b1
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 92 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/96035.yaml
@@ -0,0 +1,7 @@
pr: 96035
summary: Expand start and end time to nanoseconds during coordinator rewrite when
needed
area: TSDB
type: bug
issues:
- 96030
Expand Up @@ -45,6 +45,9 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase {
{
"_doc":{
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword",
"time_series_dimension": true
Expand Down Expand Up @@ -86,28 +89,18 @@ protected Settings nodeSettings() {
}

public void testTimeRanges() throws Exception {
var mappingTemplate = """
{
"_doc":{
"properties": {
"metricset": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}""";
var templateSettings = Settings.builder().put("index.mode", "time_series");
if (randomBoolean()) {
templateSettings.put("index.routing_path", "metricset");
}
var mapping = new CompressedXContent(randomBoolean() ? MAPPING_TEMPLATE : MAPPING_TEMPLATE.replace("date", "date_nanos"));

if (randomBoolean()) {
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(templateSettings.build(), new CompressedXContent(mappingTemplate), null),
new Template(templateSettings.build(), mapping, null),
null,
null,
null,
Expand All @@ -119,9 +112,7 @@ public void testTimeRanges() throws Exception {
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
} else {
var putComponentTemplateRequest = new PutComponentTemplateAction.Request("1");
putComponentTemplateRequest.componentTemplate(
new ComponentTemplate(new Template(null, new CompressedXContent(mappingTemplate), null), null, null)
);
putComponentTemplateRequest.componentTemplate(new ComponentTemplate(new Template(null, mapping, null), null, null));
client().execute(PutComponentTemplateAction.INSTANCE, putComponentTemplateRequest).actionGet();

var putTemplateRequest = new PutComposableIndexTemplateAction.Request("id");
Expand Down Expand Up @@ -376,13 +367,14 @@ public void testInvalidTsdbTemplatesMissingSettings() throws Exception {

public void testSkippingShards() throws Exception {
Instant time = Instant.now();
var mapping = new CompressedXContent(randomBoolean() ? MAPPING_TEMPLATE : MAPPING_TEMPLATE.replace("date", "date_nanos"));
{
var templateSettings = Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build();
var request = new PutComposableIndexTemplateAction.Request("id1");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("pattern-1"),
new Template(templateSettings, new CompressedXContent(MAPPING_TEMPLATE), null),
new Template(templateSettings, mapping, null),
null,
null,
null,
Expand All @@ -401,7 +393,7 @@ public void testSkippingShards() throws Exception {
request.indexTemplate(
new ComposableIndexTemplate(
List.of("pattern-2"),
new Template(null, new CompressedXContent(MAPPING_TEMPLATE), null),
new Template(null, mapping, null),
null,
null,
null,
Expand Down
Expand Up @@ -19,7 +19,6 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -213,77 +212,19 @@ public void setup() throws IOException {
}

public void testTsdbDataStreams() throws Exception {
var bulkRequest = new Request("POST", "/k8s/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
bulkRequest.addParameter("refresh", "true");
var response = client().performRequest(bulkRequest);
assertOK(response);
var responseBody = entityAsMap(response);
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));

var getDataStreamsRequest = new Request("GET", "/_data_stream");
response = client().performRequest(getDataStreamsRequest);
assertOK(response);
var dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("1"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
assertThat(firstBackingIndex, backingIndexEqualTo("k8s", 1));

var indices = getIndex(firstBackingIndex);
var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeFirstBackingIndex, notNullValue());
String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeFirstBackingIndex, notNullValue());
List<?> routingPaths = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.routing_path");
assertThat(routingPaths, containsInAnyOrder("metricset", "k8s.pod.uid", "pod.labels.*"));

var rolloverRequest = new Request("POST", "/k8s/_rollover");
assertOK(client().performRequest(rolloverRequest));

response = client().performRequest(getDataStreamsRequest);
assertOK(response);
dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(2));
String secondBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.1.index_name");
assertThat(secondBackingIndex, backingIndexEqualTo("k8s", 2));

indices = getIndex(secondBackingIndex);
escapedBackingIndex = secondBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex));
String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeSecondBackingIndex, notNullValue());

var indexRequest = new Request("POST", "/k8s/_doc");
Instant time = parseInstant(startTimeFirstBackingIndex);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));

indexRequest = new Request("POST", "/k8s/_doc");
time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
assertTsdbDataStream();
}

public void testTsdbDataStreamsNanos() throws Exception {
// Create a template
// Overwrite template to use date_nanos field type:
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE.replace("date", "date_nanos"));
assertOK(client().performRequest(putComposableIndexTemplateRequest));

assertTsdbDataStream();
}

private void assertTsdbDataStream() throws IOException {
var bulkRequest = new Request("POST", "/k8s/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstantNanos(Instant.now())));
bulkRequest.addParameter("refresh", "true");
Expand Down Expand Up @@ -333,18 +274,53 @@ public void testTsdbDataStreamsNanos() throws Exception {
assertThat(endTimeSecondBackingIndex, notNullValue());

var indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.addParameter("refresh", "true");
Instant time = parseInstant(startTimeFirstBackingIndex);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));

indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.addParameter("refresh", "true");
time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));

var searchRequest = new Request("GET", "k8s/_search");
searchRequest.setJsonEntity("""
{
"query": {
"range":{
"@timestamp":{
"gte": "now-7d",
"lte": "now+7d"
}
}
},
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
""");
response = client().performRequest(searchRequest);
assertOK(response);
responseBody = entityAsMap(response);
try {
assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), equalTo(10));
assertThat(ObjectPath.evaluate(responseBody, "hits.total.relation"), equalTo("eq"));
assertThat(ObjectPath.evaluate(responseBody, "hits.hits.0._index"), equalTo(secondBackingIndex));
assertThat(ObjectPath.evaluate(responseBody, "hits.hits.1._index"), equalTo(firstBackingIndex));
} catch (Exception | AssertionError e) {
logger.error("search response body causing assertion error [" + responseBody + "]", e);
throw e;
}
}

public void testSimulateTsdbDataStreamTemplate() throws Exception {
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexLongFieldRange;
Expand Down Expand Up @@ -1299,14 +1300,27 @@ public IndexLongFieldRange getTimestampRange() {
}

/**
* @return whether this index has a time series timestamp range
*/
public boolean hasTimeSeriesTimestampRange() {
return indexMode != null && indexMode.getTimestampBound(this) != null;
}

/**
* @param dateFieldType the date field type of '@timestamp' field which is
* used to convert the start and end times recorded in index metadata
* to the right format that is being used by '@timestamp' field.
* For example, the '@timestamp' can be configured with nanosecond precision.
* @return the time range this index represents if this index is in time series mode.
* Otherwise <code>null</code> is returned.
*/
@Nullable
public IndexLongFieldRange getTimeSeriesTimestampRange() {
public IndexLongFieldRange getTimeSeriesTimestampRange(DateFieldMapper.DateFieldType dateFieldType) {
var bounds = indexMode != null ? indexMode.getTimestampBound(this) : null;
if (bounds != null) {
return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(bounds.startTime(), bounds.endTime()));
long start = dateFieldType.resolution().convert(Instant.ofEpochMilli(bounds.startTime()));
long end = dateFieldType.resolution().convert(Instant.ofEpochMilli(bounds.endTime()));
return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(start, end));
} else {
return null;
}
Expand Down
Expand Up @@ -53,20 +53,18 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
if (indexMetadata == null) {
return null;
}
DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);
if (dateFieldType == null) {
return null;
}
IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
if (timestampRange.containsAllShardRanges() == false) {
timestampRange = indexMetadata.getTimeSeriesTimestampRange();
timestampRange = indexMetadata.getTimeSeriesTimestampRange(dateFieldType);
if (timestampRange == null) {
return null;
}
}

DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);

if (dateFieldType == null) {
return null;
}

return new CoordinatorRewriteContext(parserConfig, writeableRegistry, client, nowInMillis, timestampRange, dateFieldType);
}
}
Expand Up @@ -141,7 +141,7 @@ private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) {
return false;
}

if (indexMetadata.getTimeSeriesTimestampRange() != null) {
if (indexMetadata.hasTimeSeriesTimestampRange()) {
// Tsdb indices have @timestamp field and index.time_series.start_time / index.time_series.end_time range
return true;
}
Expand Down

0 comments on commit c13d5b1

Please sign in to comment.