diff --git a/docs/changelog/96035.yaml b/docs/changelog/96035.yaml new file mode 100644 index 0000000000000..6fa0a6b976192 --- /dev/null +++ b/docs/changelog/96035.yaml @@ -0,0 +1,7 @@ +pr: 96035 +summary: Expand start and end time to nanoseconds during coordinator rewrite when + needed +area: TSDB +type: bug +issues: + - 96030 diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java index 507523362518c..b298e0ce8614d 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java @@ -45,6 +45,9 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase { { "_doc":{ "properties": { + "@timestamp" : { + "type": "date" + }, "metricset": { "type": "keyword", "time_series_dimension": true @@ -86,28 +89,18 @@ protected Settings nodeSettings() { } public void testTimeRanges() throws Exception { - var mappingTemplate = """ - { - "_doc":{ - "properties": { - "metricset": { - "type": "keyword", - "time_series_dimension": true - } - } - } - }"""; var templateSettings = Settings.builder().put("index.mode", "time_series"); if (randomBoolean()) { templateSettings.put("index.routing_path", "metricset"); } + var mapping = new CompressedXContent(randomBoolean() ? MAPPING_TEMPLATE : MAPPING_TEMPLATE.replace("date", "date_nanos")); if (randomBoolean()) { var request = new PutComposableIndexTemplateAction.Request("id"); request.indexTemplate( new ComposableIndexTemplate( List.of("k8s*"), - new Template(templateSettings.build(), new CompressedXContent(mappingTemplate), null), + new Template(templateSettings.build(), mapping, null), null, null, null, @@ -119,9 +112,7 @@ public void testTimeRanges() throws Exception { client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet(); } else { var putComponentTemplateRequest = new PutComponentTemplateAction.Request("1"); - putComponentTemplateRequest.componentTemplate( - new ComponentTemplate(new Template(null, new CompressedXContent(mappingTemplate), null), null, null) - ); + putComponentTemplateRequest.componentTemplate(new ComponentTemplate(new Template(null, mapping, null), null, null)); client().execute(PutComponentTemplateAction.INSTANCE, putComponentTemplateRequest).actionGet(); var putTemplateRequest = new PutComposableIndexTemplateAction.Request("id"); @@ -376,13 +367,14 @@ public void testInvalidTsdbTemplatesMissingSettings() throws Exception { public void testSkippingShards() throws Exception { Instant time = Instant.now(); + var mapping = new CompressedXContent(randomBoolean() ? MAPPING_TEMPLATE : MAPPING_TEMPLATE.replace("date", "date_nanos")); { var templateSettings = Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build(); var request = new PutComposableIndexTemplateAction.Request("id1"); request.indexTemplate( new ComposableIndexTemplate( List.of("pattern-1"), - new Template(templateSettings, new CompressedXContent(MAPPING_TEMPLATE), null), + new Template(templateSettings, mapping, null), null, null, null, @@ -401,7 +393,7 @@ public void testSkippingShards() throws Exception { request.indexTemplate( new ComposableIndexTemplate( List.of("pattern-2"), - new Template(null, new CompressedXContent(MAPPING_TEMPLATE), null), + new Template(null, mapping, null), null, null, null, diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java index 9ae76394011c3..14fe7ad855626 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java @@ -19,7 +19,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -213,77 +212,19 @@ public void setup() throws IOException { } public void testTsdbDataStreams() throws Exception { - var bulkRequest = new Request("POST", "/k8s/_bulk"); - bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now()))); - bulkRequest.addParameter("refresh", "true"); - var response = client().performRequest(bulkRequest); - assertOK(response); - var responseBody = entityAsMap(response); - assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false)); - - var getDataStreamsRequest = new Request("GET", "/_data_stream"); - response = client().performRequest(getDataStreamsRequest); - assertOK(response); - var dataStreams = entityAsMap(response); - assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1)); - assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s")); - assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1)); - assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("1")); - assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1)); - String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name"); - assertThat(firstBackingIndex, backingIndexEqualTo("k8s", 1)); - - var indices = getIndex(firstBackingIndex); - var escapedBackingIndex = firstBackingIndex.replace(".", "\\."); - assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s")); - assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series")); - String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"); - assertThat(startTimeFirstBackingIndex, notNullValue()); - String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); - assertThat(endTimeFirstBackingIndex, notNullValue()); - List routingPaths = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.routing_path"); - assertThat(routingPaths, containsInAnyOrder("metricset", "k8s.pod.uid", "pod.labels.*")); - - var rolloverRequest = new Request("POST", "/k8s/_rollover"); - assertOK(client().performRequest(rolloverRequest)); - - response = client().performRequest(getDataStreamsRequest); - assertOK(response); - dataStreams = entityAsMap(response); - assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s")); - assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(2)); - String secondBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.1.index_name"); - assertThat(secondBackingIndex, backingIndexEqualTo("k8s", 2)); - - indices = getIndex(secondBackingIndex); - escapedBackingIndex = secondBackingIndex.replace(".", "\\."); - assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s")); - String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"); - assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex)); - String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"); - assertThat(endTimeSecondBackingIndex, notNullValue()); - - var indexRequest = new Request("POST", "/k8s/_doc"); - Instant time = parseInstant(startTimeFirstBackingIndex); - indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time))); - response = client().performRequest(indexRequest); - assertOK(response); - assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex)); - - indexRequest = new Request("POST", "/k8s/_doc"); - time = parseInstant(endTimeSecondBackingIndex).minusMillis(1); - indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time))); - response = client().performRequest(indexRequest); - assertOK(response); - assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex)); + assertTsdbDataStream(); } public void testTsdbDataStreamsNanos() throws Exception { - // Create a template + // Overwrite template to use date_nanos field type: var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1"); putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE.replace("date", "date_nanos")); assertOK(client().performRequest(putComposableIndexTemplateRequest)); + assertTsdbDataStream(); + } + + private void assertTsdbDataStream() throws IOException { var bulkRequest = new Request("POST", "/k8s/_bulk"); bulkRequest.setJsonEntity(BULK.replace("$now", formatInstantNanos(Instant.now()))); bulkRequest.addParameter("refresh", "true"); @@ -333,6 +274,7 @@ public void testTsdbDataStreamsNanos() throws Exception { assertThat(endTimeSecondBackingIndex, notNullValue()); var indexRequest = new Request("POST", "/k8s/_doc"); + indexRequest.addParameter("refresh", "true"); Instant time = parseInstant(startTimeFirstBackingIndex); indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time))); response = client().performRequest(indexRequest); @@ -340,11 +282,45 @@ public void testTsdbDataStreamsNanos() throws Exception { assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex)); indexRequest = new Request("POST", "/k8s/_doc"); + indexRequest.addParameter("refresh", "true"); time = parseInstant(endTimeSecondBackingIndex).minusMillis(1); indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time))); response = client().performRequest(indexRequest); assertOK(response); assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex)); + + var searchRequest = new Request("GET", "k8s/_search"); + searchRequest.setJsonEntity(""" + { + "query": { + "range":{ + "@timestamp":{ + "gte": "now-7d", + "lte": "now+7d" + } + } + }, + "sort": [ + { + "@timestamp": { + "order": "desc" + } + } + ] + } + """); + response = client().performRequest(searchRequest); + assertOK(response); + responseBody = entityAsMap(response); + try { + assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), equalTo(10)); + assertThat(ObjectPath.evaluate(responseBody, "hits.total.relation"), equalTo("eq")); + assertThat(ObjectPath.evaluate(responseBody, "hits.hits.0._index"), equalTo(secondBackingIndex)); + assertThat(ObjectPath.evaluate(responseBody, "hits.hits.1._index"), equalTo(firstBackingIndex)); + } catch (Exception | AssertionError e) { + logger.error("search response body causing assertion error [" + responseBody + "]", e); + throw e; + } } public void testSimulateTsdbDataStreamTemplate() throws Exception { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java index 09157c819577c..2e069d690dd1e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadata.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexLongFieldRange; @@ -1297,14 +1298,27 @@ public IndexLongFieldRange getTimestampRange() { } /** + * @return whether this index has a time series timestamp range + */ + public boolean hasTimeSeriesTimestampRange() { + return indexMode != null && indexMode.getTimestampBound(this) != null; + } + + /** + * @param dateFieldType the date field type of '@timestamp' field which is + * used to convert the start and end times recorded in index metadata + * to the right format that is being used by '@timestamp' field. + * For example, the '@timestamp' can be configured with nanosecond precision. * @return the time range this index represents if this index is in time series mode. * Otherwise null is returned. */ @Nullable - public IndexLongFieldRange getTimeSeriesTimestampRange() { + public IndexLongFieldRange getTimeSeriesTimestampRange(DateFieldMapper.DateFieldType dateFieldType) { var bounds = indexMode != null ? indexMode.getTimestampBound(this) : null; if (bounds != null) { - return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(bounds.startTime(), bounds.endTime())); + long start = dateFieldType.resolution().convert(Instant.ofEpochMilli(bounds.startTime())); + long end = dateFieldType.resolution().convert(Instant.ofEpochMilli(bounds.endTime())); + return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(start, end)); } else { return null; } diff --git a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java index 9f1447d436ad5..e44861b4afe8a 100644 --- a/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java +++ b/server/src/main/java/org/elasticsearch/index/query/CoordinatorRewriteContextProvider.java @@ -49,20 +49,18 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) { if (indexMetadata == null) { return null; } + DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index); + if (dateFieldType == null) { + return null; + } IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange(); if (timestampRange.containsAllShardRanges() == false) { - timestampRange = indexMetadata.getTimeSeriesTimestampRange(); + timestampRange = indexMetadata.getTimeSeriesTimestampRange(dateFieldType); if (timestampRange == null) { return null; } } - DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index); - - if (dateFieldType == null) { - return null; - } - return new CoordinatorRewriteContext(parserConfig, client, nowInMillis, timestampRange, dateFieldType); } } diff --git a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java index c49334181bff0..49187bd7a9003 100644 --- a/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java +++ b/server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java @@ -141,7 +141,7 @@ private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) { return false; } - if (indexMetadata.getTimeSeriesTimestampRange() != null) { + if (indexMetadata.hasTimeSeriesTimestampRange()) { // Tsdb indices have @timestamp field and index.time_series.start_time / index.time_series.end_time range return true; }