Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand start and end time to nanoseconds during coordinator rewrite when needed #96035

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/96035.yaml
@@ -0,0 +1,7 @@
pr: 96035
summary: Expand start and end time to nanoseconds during coordinator rewrite when
needed
area: TSDB
type: bug
issues:
- 96030
Expand Up @@ -45,6 +45,9 @@ public class TSDBIndexingIT extends ESSingleNodeTestCase {
{
"_doc":{
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword",
"time_series_dimension": true
Expand Down Expand Up @@ -86,28 +89,18 @@ protected Settings nodeSettings() {
}

public void testTimeRanges() throws Exception {
var mappingTemplate = """
{
"_doc":{
"properties": {
"metricset": {
"type": "keyword",
"time_series_dimension": true
}
}
}
}""";
var templateSettings = Settings.builder().put("index.mode", "time_series");
if (randomBoolean()) {
templateSettings.put("index.routing_path", "metricset");
}
var mapping = new CompressedXContent(randomBoolean() ? MAPPING_TEMPLATE : MAPPING_TEMPLATE.replace("date", "date_nanos"));

if (randomBoolean()) {
var request = new PutComposableIndexTemplateAction.Request("id");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("k8s*"),
new Template(templateSettings.build(), new CompressedXContent(mappingTemplate), null),
new Template(templateSettings.build(), mapping, null),
null,
null,
null,
Expand All @@ -119,9 +112,7 @@ public void testTimeRanges() throws Exception {
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
} else {
var putComponentTemplateRequest = new PutComponentTemplateAction.Request("1");
putComponentTemplateRequest.componentTemplate(
new ComponentTemplate(new Template(null, new CompressedXContent(mappingTemplate), null), null, null)
);
putComponentTemplateRequest.componentTemplate(new ComponentTemplate(new Template(null, mapping, null), null, null));
client().execute(PutComponentTemplateAction.INSTANCE, putComponentTemplateRequest).actionGet();

var putTemplateRequest = new PutComposableIndexTemplateAction.Request("id");
Expand Down Expand Up @@ -376,13 +367,14 @@ public void testInvalidTsdbTemplatesMissingSettings() throws Exception {

public void testSkippingShards() throws Exception {
Instant time = Instant.now();
var mapping = new CompressedXContent(randomBoolean() ? MAPPING_TEMPLATE : MAPPING_TEMPLATE.replace("date", "date_nanos"));
{
var templateSettings = Settings.builder().put("index.mode", "time_series").put("index.routing_path", "metricset").build();
var request = new PutComposableIndexTemplateAction.Request("id1");
request.indexTemplate(
new ComposableIndexTemplate(
List.of("pattern-1"),
new Template(templateSettings, new CompressedXContent(MAPPING_TEMPLATE), null),
new Template(templateSettings, mapping, null),
null,
null,
null,
Expand All @@ -401,7 +393,7 @@ public void testSkippingShards() throws Exception {
request.indexTemplate(
new ComposableIndexTemplate(
List.of("pattern-2"),
new Template(null, new CompressedXContent(MAPPING_TEMPLATE), null),
new Template(null, mapping, null),
null,
null,
null,
Expand Down
Expand Up @@ -19,7 +19,6 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -213,77 +212,19 @@ public void setup() throws IOException {
}

public void testTsdbDataStreams() throws Exception {
var bulkRequest = new Request("POST", "/k8s/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
bulkRequest.addParameter("refresh", "true");
var response = client().performRequest(bulkRequest);
assertOK(response);
var responseBody = entityAsMap(response);
assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));

var getDataStreamsRequest = new Request("GET", "/_data_stream");
response = client().performRequest(getDataStreamsRequest);
assertOK(response);
var dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(1));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.template"), equalTo("1"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.indices"), hasSize(1));
String firstBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.0.index_name");
assertThat(firstBackingIndex, backingIndexEqualTo("k8s", 1));

var indices = getIndex(firstBackingIndex);
var escapedBackingIndex = firstBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), equalTo("time_series"));
String startTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeFirstBackingIndex, notNullValue());
String endTimeFirstBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeFirstBackingIndex, notNullValue());
List<?> routingPaths = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.routing_path");
assertThat(routingPaths, containsInAnyOrder("metricset", "k8s.pod.uid", "pod.labels.*"));

var rolloverRequest = new Request("POST", "/k8s/_rollover");
assertOK(client().performRequest(rolloverRequest));

response = client().performRequest(getDataStreamsRequest);
assertOK(response);
dataStreams = entityAsMap(response);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(2));
String secondBackingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices.1.index_name");
assertThat(secondBackingIndex, backingIndexEqualTo("k8s", 2));

indices = getIndex(secondBackingIndex);
escapedBackingIndex = secondBackingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
String startTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time");
assertThat(startTimeSecondBackingIndex, equalTo(endTimeFirstBackingIndex));
String endTimeSecondBackingIndex = ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time");
assertThat(endTimeSecondBackingIndex, notNullValue());

var indexRequest = new Request("POST", "/k8s/_doc");
Instant time = parseInstant(startTimeFirstBackingIndex);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));

indexRequest = new Request("POST", "/k8s/_doc");
time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));
assertTsdbDataStream();
}

public void testTsdbDataStreamsNanos() throws Exception {
// Create a template
// Overwrite template to use date_nanos field type:
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE.replace("date", "date_nanos"));
assertOK(client().performRequest(putComposableIndexTemplateRequest));

assertTsdbDataStream();
}

private void assertTsdbDataStream() throws IOException {
var bulkRequest = new Request("POST", "/k8s/_bulk");
bulkRequest.setJsonEntity(BULK.replace("$now", formatInstantNanos(Instant.now())));
bulkRequest.addParameter("refresh", "true");
Expand Down Expand Up @@ -333,18 +274,53 @@ public void testTsdbDataStreamsNanos() throws Exception {
assertThat(endTimeSecondBackingIndex, notNullValue());

var indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.addParameter("refresh", "true");
Instant time = parseInstant(startTimeFirstBackingIndex);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(firstBackingIndex));

indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.addParameter("refresh", "true");
time = parseInstant(endTimeSecondBackingIndex).minusMillis(1);
indexRequest.setJsonEntity(DOC.replace("$time", formatInstantNanos(time)));
response = client().performRequest(indexRequest);
assertOK(response);
assertThat(entityAsMap(response).get("_index"), equalTo(secondBackingIndex));

var searchRequest = new Request("GET", "k8s/_search");
searchRequest.setJsonEntity("""
{
"query": {
"range":{
"@timestamp":{
"gte": "now-7d",
"lte": "now+7d"
}
}
},
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
""");
response = client().performRequest(searchRequest);
assertOK(response);
responseBody = entityAsMap(response);
try {
assertThat(ObjectPath.evaluate(responseBody, "hits.total.value"), equalTo(10));
assertThat(ObjectPath.evaluate(responseBody, "hits.total.relation"), equalTo("eq"));
assertThat(ObjectPath.evaluate(responseBody, "hits.hits.0._index"), equalTo(secondBackingIndex));
assertThat(ObjectPath.evaluate(responseBody, "hits.hits.1._index"), equalTo(firstBackingIndex));
} catch (Exception | AssertionError e) {
logger.error("search response body causing assertion error [" + responseBody + "]", e);
throw e;
}
}

public void testSimulateTsdbDataStreamTemplate() throws Exception {
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexLongFieldRange;
Expand Down Expand Up @@ -1297,14 +1298,27 @@ public IndexLongFieldRange getTimestampRange() {
}

/**
* @return whether this index has a time series timestamp range
*/
public boolean hasTimeSeriesTimestampRange() {
return indexMode != null && indexMode.getTimestampBound(this) != null;
}

/**
* @param dateFieldType the date field type of '@timestamp' field which is
* used to convert the start and end times recorded in index metadata
* to the right format that is being used by '@timestamp' field.
* For example, the '@timestamp' can be configured with nanosecond precision.
* @return the time range this index represents if this index is in time series mode.
* Otherwise <code>null</code> is returned.
*/
@Nullable
public IndexLongFieldRange getTimeSeriesTimestampRange() {
public IndexLongFieldRange getTimeSeriesTimestampRange(DateFieldMapper.DateFieldType dateFieldType) {
var bounds = indexMode != null ? indexMode.getTimestampBound(this) : null;
if (bounds != null) {
return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(bounds.startTime(), bounds.endTime()));
long start = dateFieldType.resolution().convert(Instant.ofEpochMilli(bounds.startTime()));
long end = dateFieldType.resolution().convert(Instant.ofEpochMilli(bounds.endTime()));
return IndexLongFieldRange.NO_SHARDS.extendWithShardRange(0, 1, ShardLongFieldRange.of(start, end));
} else {
return null;
}
Expand Down
Expand Up @@ -49,20 +49,18 @@ public CoordinatorRewriteContext getCoordinatorRewriteContext(Index index) {
if (indexMetadata == null) {
return null;
}
DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);
if (dateFieldType == null) {
return null;
}
IndexLongFieldRange timestampRange = indexMetadata.getTimestampRange();
if (timestampRange.containsAllShardRanges() == false) {
timestampRange = indexMetadata.getTimeSeriesTimestampRange();
timestampRange = indexMetadata.getTimeSeriesTimestampRange(dateFieldType);
if (timestampRange == null) {
return null;
}
}

DateFieldMapper.DateFieldType dateFieldType = mappingSupplier.apply(index);

if (dateFieldType == null) {
return null;
}

return new CoordinatorRewriteContext(parserConfig, client, nowInMillis, timestampRange, dateFieldType);
}
}
Expand Up @@ -141,7 +141,7 @@ private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) {
return false;
}

if (indexMetadata.getTimeSeriesTimestampRange() != null) {
if (indexMetadata.hasTimeSeriesTimestampRange()) {
// Tsdb indices have @timestamp field and index.time_series.start_time / index.time_series.end_time range
return true;
}
Expand Down