docs/changelog/103474.yaml (new file, 6 additions, 0 deletions)
@@ -0,0 +1,6 @@
pr: 103474
summary: Fix now in millis for ESQL search contexts
area: ES|QL
type: bug
issues:
- 103455
TimeBasedIndicesIT.java (new file, 58 additions)
@@ -0,0 +1,58 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.RangeQueryBuilder;

import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.hasSize;

public class TimeBasedIndicesIT extends AbstractEsqlIntegTestCase {

    public void testFilter() {
        long epoch = System.currentTimeMillis();
        assertAcked(client().admin().indices().prepareCreate("test").setMapping("@timestamp", "type=date", "value", "type=long"));
        BulkRequestBuilder bulk = client().prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        int oldDocs = between(10, 100);
        for (int i = 0; i < oldDocs; i++) {
            long timestamp = epoch - TimeValue.timeValueHours(between(1, 2)).millis();
            bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", -i));
        }
        int newDocs = between(10, 100);
        for (int i = 0; i < newDocs; i++) {
            long timestamp = epoch + TimeValue.timeValueHours(between(1, 2)).millis();
            bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", i));
        }
        bulk.get();
        {
            EsqlQueryRequest request = new EsqlQueryRequest();
            request.query("FROM test | limit 1000");
            request.filter(new RangeQueryBuilder("@timestamp").from(epoch - TimeValue.timeValueHours(3).millis()).to("now"));
            try (var resp = run(request)) {
                List<List<Object>> values = getValuesList(resp);
                assertThat(values, hasSize(oldDocs));
            }
        }
        {
            EsqlQueryRequest request = new EsqlQueryRequest();
            request.query("FROM test | limit 1000");
            request.filter(new RangeQueryBuilder("@timestamp").from("now").to(epoch + TimeValue.timeValueHours(3).millis()));
            try (var resp = run(request)) {
                List<List<Object>> values = getValuesList(resp);
                assertThat(values, hasSize(newDocs));
            }
        }
    }
}
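The test above drives the fix end to end: each filter combines an absolute epoch-based bound with a "now" bound, so it only passes if "now" is resolved against the real request time on the data nodes. Below is a minimal standalone sketch, not part of the PR and with every name made up, of why a nowInMillis of 0 breaks those ranges:

public class NowResolutionSketch {
    static boolean inRange(long value, long from, long to) {
        return value >= from && value <= to;
    }

    public static void main(String[] args) {
        long epoch = System.currentTimeMillis();
        long hour = 60 * 60 * 1000L;
        long oldDoc = epoch - hour;   // like an "old" document indexed 1-2 hours in the past
        long newDoc = epoch + hour;   // like a "new" document 1-2 hours in the future

        long resolvedNow = epoch;     // "now" resolved against the request's real start time
        long brokenNow = 0L;          // "now" resolved against nowInMillis == 0 (the bug)

        // Filter 1 in the test: [epoch - 3h, now] should match only the old docs.
        System.out.println(inRange(oldDoc, epoch - 3 * hour, resolvedNow)); // true
        System.out.println(inRange(oldDoc, epoch - 3 * hour, brokenNow));   // false: upper bound collapses to 1970

        // Filter 2 in the test: [now, epoch + 3h] should match only the new docs.
        System.out.println(inRange(newDoc, resolvedNow, epoch + 3 * hour)); // true
        System.out.println(inRange(oldDoc, brokenNow, epoch + 3 * hour));   // true: old docs leak into the result
    }
}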
@@ -328,6 +328,7 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
 
     private void acquireSearchContexts(
         List<ShardId> shardIds,
+        EsqlConfiguration configuration,
         Map<Index, AliasFilter> aliasFilters,
         ActionListener<List<SearchContext>> listener
     ) {
@@ -351,11 +352,12 @@ private void acquireSearchContexts(
             try {
                 for (IndexShard shard : targetShards) {
                     var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY);
-                    ShardSearchRequest shardSearchLocalRequest = new ShardSearchRequest(shard.shardId(), 0, aliasFilter);
-                    SearchContext context = searchService.createSearchContext(
-                        shardSearchLocalRequest,
-                        SearchService.NO_TIMEOUT
+                    var shardRequest = new ShardSearchRequest(
+                        shard.shardId(),
+                        configuration.absoluteStartedTimeInMillis(),
+                        aliasFilter
                     );
+                    SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT);
                     searchContexts.add(context);
                 }
                 for (SearchContext searchContext : searchContexts) {
@@ -501,8 +503,9 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
             final var exchangeSink = exchangeService.getSinkHandler(sessionId);
             parentTask.addListener(() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException("task cancelled")));
             final ActionListener<DataNodeResponse> listener = new OwningChannelActionListener<>(channel);
-            acquireSearchContexts(request.shardIds(), request.aliasFilters(), ActionListener.wrap(searchContexts -> {
-                var computeContext = new ComputeContext(sessionId, searchContexts, request.configuration(), null, exchangeSink);
+            final EsqlConfiguration configuration = request.configuration();
+            acquireSearchContexts(request.shardIds(), configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
+                var computeContext = new ComputeContext(sessionId, searchContexts, configuration, null, exchangeSink);
                 runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(driverProfiles -> {
                     // don't return until all pages are fetched
                     exchangeSink.addCompletionListener(
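For orientation, and not part of the diff: the second argument to that ShardSearchRequest constructor is the request's nowInMillis, which the shard uses to resolve date-math expressions such as "now" in the pushed-down filter. The old code passed 0, pinning "now" to the epoch; the fix passes configuration.absoluteStartedTimeInMillis() instead. A rough, hypothetical sketch of that resolution step, deliberately much simpler than Elasticsearch's actual date-math parser:

import java.time.Duration;
import java.time.Instant;

public class DateMathSketch {
    // Resolve expressions like "now" or "now-3h" against a supplied request timestamp.
    static long resolve(String expr, long nowInMillis) {
        Instant now = Instant.ofEpochMilli(nowInMillis);
        if (expr.equals("now")) {
            return now.toEpochMilli();
        }
        if (expr.startsWith("now-") && expr.endsWith("h")) {
            long hours = Long.parseLong(expr.substring(4, expr.length() - 1));
            return now.minus(Duration.ofHours(hours)).toEpochMilli();
        }
        throw new IllegalArgumentException("unsupported expression: " + expr);
    }

    public static void main(String[] args) {
        long started = System.currentTimeMillis();
        // With the real started time, "now"-relative bounds track the coordinator's clock...
        System.out.println(resolve("now", started) == started);   // true
        System.out.println(resolve("now-3h", started) < started); // true
        // ...with nowInMillis == 0, every "now"-relative bound is pinned to 1970.
        System.out.println(resolve("now", 0L));                    // 0
    }
}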
EsqlConfiguration.java
@@ -112,6 +112,15 @@ public String query() {
         return query;
     }
 
+    /**
+     * Returns the current time in milliseconds from the time epoch for the execution of this request.
+     * It ensures consistency by using the same value on all nodes involved in the search request.
+     * Note: Currently, it returns {@link System#currentTimeMillis()}, but this value will be serialized between nodes.
+     */
+    public long absoluteStartedTimeInMillis() {
+        return System.currentTimeMillis();
+    }
+
Comment on lines +120 to +122

Contributor:
Can we not use the inherited now() instead?

Member (Author):
It seems that now will be adjusted with the timezone?

Contributor:
It is, but I guess we could (have) convert(ed) that to Z? We'd get the serialisation "for free". It feels a bit odd to have two time methods on EsqlConfiguration.java, but I guess this works for now (so to "say") too :)


     /**
      * Enable profiling, sacrificing performance to return information about
      * what operations are taking the most time.
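A note on the review thread above, not part of the PR: the inherited now() the reviewer mentions appears to be a zone-adjusted timestamp, and the "convert that to Z" suggestion works because converting a zoned time back to an Instant yields the same epoch millis regardless of the zone. A small java.time-only sketch of that point (nothing here comes from EsqlConfiguration):

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class ZonedNowSketch {
    public static void main(String[] args) {
        Instant instant = Instant.now();
        ZonedDateTime utcNow = instant.atZone(ZoneId.of("Z"));
        ZonedDateTime tokyoNow = instant.atZone(ZoneId.of("Asia/Tokyo"));

        // The wall-clock rendering depends on the zone...
        System.out.println(utcNow);
        System.out.println(tokyoNow);
        // ...but the epoch millis are zone-independent, so a zone-adjusted "now"
        // could back an absolute started-time without a second clock read.
        System.out.println(utcNow.toInstant().toEpochMilli() == tokyoNow.toInstant().toEpochMilli()); // true
    }
}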