diff --git a/docs/changelog/103474.yaml b/docs/changelog/103474.yaml new file mode 100644 index 0000000000000..a1da15a6bfbe5 --- /dev/null +++ b/docs/changelog/103474.yaml @@ -0,0 +1,6 @@ +pr: 103474 +summary: Fix now in millis for ESQL search contexts +area: ES|QL +type: bug +issues: + - 103455 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeBasedIndicesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeBasedIndicesIT.java new file mode 100644 index 0000000000000..a1fbee17ef8ec --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeBasedIndicesIT.java @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.RangeQueryBuilder; + +import java.util.List; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.hasSize; + +public class TimeBasedIndicesIT extends AbstractEsqlIntegTestCase { + + public void testFilter() { + long epoch = System.currentTimeMillis(); + assertAcked(client().admin().indices().prepareCreate("test").setMapping("@timestamp", "type=date", "value", "type=long")); + BulkRequestBuilder bulk = client().prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + int oldDocs = between(10, 100); + for (int i = 0; i < oldDocs; i++) { + long timestamp = epoch - TimeValue.timeValueHours(between(1, 2)).millis(); + bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", -i)); + } + int newDocs = between(10, 100); + for (int i = 0; i < newDocs; i++) { + long timestamp = epoch + TimeValue.timeValueHours(between(1, 2)).millis(); + bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", i)); + } + bulk.get(); + { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM test | limit 1000"); + request.filter(new RangeQueryBuilder("@timestamp").from(epoch - TimeValue.timeValueHours(3).millis()).to("now")); + try (var resp = run(request)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(oldDocs)); + } + } + { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM test | limit 1000"); + request.filter(new RangeQueryBuilder("@timestamp").from("now").to(epoch + TimeValue.timeValueHours(3).millis())); + try (var resp = run(request)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(newDocs)); + } + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index b7b31868d65e2..20fcc05e80440 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -328,6 +328,7 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, private void acquireSearchContexts( List shardIds, + EsqlConfiguration configuration, Map aliasFilters, ActionListener> listener ) { @@ -351,11 +352,12 @@ private void acquireSearchContexts( try { for (IndexShard shard : targetShards) { var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY); - ShardSearchRequest shardSearchLocalRequest = new ShardSearchRequest(shard.shardId(), 0, aliasFilter); - SearchContext context = searchService.createSearchContext( - shardSearchLocalRequest, - SearchService.NO_TIMEOUT + var shardRequest = new ShardSearchRequest( + shard.shardId(), + configuration.absoluteStartedTimeInMillis(), + aliasFilter ); + SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT); searchContexts.add(context); } for (SearchContext searchContext : searchContexts) { @@ -501,8 +503,9 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T final var exchangeSink = exchangeService.getSinkHandler(sessionId); parentTask.addListener(() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException("task cancelled"))); final ActionListener listener = new OwningChannelActionListener<>(channel); - acquireSearchContexts(request.shardIds(), request.aliasFilters(), ActionListener.wrap(searchContexts -> { - var computeContext = new ComputeContext(sessionId, searchContexts, request.configuration(), null, exchangeSink); + final EsqlConfiguration configuration = request.configuration(); + acquireSearchContexts(request.shardIds(), configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> { + var computeContext = new ComputeContext(sessionId, searchContexts, configuration, null, exchangeSink); runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(driverProfiles -> { // don't return until all pages are fetched exchangeSink.addCompletionListener( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java index ac13f25c2d2a9..ccec6554cb2cb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java @@ -112,6 +112,15 @@ public String query() { return query; } + /** + * Returns the current time in milliseconds from the time epoch for the execution of this request. + * It ensures consistency by using the same value on all nodes involved in the search request. + * Note: Currently, it returns {@link System#currentTimeMillis()}, but this value will be serialized between nodes. + */ + public long absoluteStartedTimeInMillis() { + return System.currentTimeMillis(); + } + /** * Enable profiling, sacrificing performance to return information about * what operations are taking the most time.