diff --git a/docs/changelog/135209.yaml b/docs/changelog/135209.yaml new file mode 100644 index 0000000000000..ef1e2d96658de --- /dev/null +++ b/docs/changelog/135209.yaml @@ -0,0 +1,6 @@ +pr: 135209 +summary: Fix expiration time in ES|QL async +area: ES|QL +type: bug +issues: + - 135169 diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java index 91fdb9c39b6e3..05b3afed7c5a7 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java @@ -175,7 +175,6 @@ public AsyncTaskManagementService( public void asyncExecute( Request request, TimeValue waitForCompletionTimeout, - TimeValue keepAlive, boolean keepOnCompletion, ActionListener listener ) { @@ -188,7 +187,7 @@ public void asyncExecute( operation.execute( request, searchTask, - wrapStoringListener(searchTask, waitForCompletionTimeout, keepAlive, keepOnCompletion, listener) + wrapStoringListener(searchTask, waitForCompletionTimeout, keepOnCompletion, listener) ); operationStarted = true; } finally { @@ -203,7 +202,6 @@ public void asyncExecute( private ActionListener wrapStoringListener( T searchTask, TimeValue waitForCompletionTimeout, - TimeValue keepAlive, boolean keepOnCompletion, ActionListener listener ) { @@ -225,7 +223,7 @@ private ActionListener wrapStoringListener( if (keepOnCompletion) { storeResults( searchTask, - new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()), + new StoredAsyncResponse<>(response, searchTask.getExpirationTimeMillis()), ActionListener.running(() -> acquiredListener.onResponse(response)) ); } else { @@ -237,7 +235,7 @@ private ActionListener wrapStoringListener( // We finished after timeout - saving results storeResults( searchTask, - new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()), + new StoredAsyncResponse<>(response, searchTask.getExpirationTimeMillis()), ActionListener.running(response::decRef) ); } @@ -249,7 +247,7 @@ private ActionListener wrapStoringListener( if (keepOnCompletion) { storeResults( searchTask, - new StoredAsyncResponse<>(e, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()), + new StoredAsyncResponse<>(e, searchTask.getExpirationTimeMillis()), ActionListener.running(() -> acquiredListener.onFailure(e)) ); } else { @@ -259,7 +257,7 @@ private ActionListener wrapStoringListener( } } else { // We finished after timeout - saving exception - storeResults(searchTask, new StoredAsyncResponse<>(e, threadPool.absoluteTimeInMillis() + keepAlive.getMillis())); + storeResults(searchTask, new StoredAsyncResponse<>(e, searchTask.getExpirationTimeMillis())); } }); } diff --git a/x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementServiceTests.java b/x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementServiceTests.java index 2a7b58aecb1df..b0a8451b376ac 100644 --- a/x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementServiceTests.java +++ b/x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementServiceTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.LegacyActionRequest; import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -40,6 +41,7 @@ import static org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService.addCompletionListener; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -52,9 +54,11 @@ public class AsyncTaskManagementServiceTests extends ESSingleNodeTestCase { public static class TestRequest extends LegacyActionRequest { private final String string; + private final TimeValue keepAlive; - public TestRequest(String string) { + public TestRequest(String string, TimeValue keepAlive) { this.string = string; + this.keepAlive = keepAlive; } @Override @@ -129,7 +133,7 @@ public TestTask createTask( headers, originHeaders, asyncExecutionId, - TimeValue.timeValueDays(5) + request.keepAlive ); } @@ -172,7 +176,7 @@ public void setup() { ); results = new AsyncResultsService<>( store, - true, + false, TestTask.class, (task, listener, timeout) -> addCompletionListener(transportService.getThreadPool(), task, listener, timeout), transportService.getTaskManager(), @@ -212,23 +216,17 @@ public void testReturnBeforeTimeout() throws Exception { boolean success = randomBoolean(); boolean keepOnCompletion = randomBoolean(); CountDownLatch latch = new CountDownLatch(1); - TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die"); - service.asyncExecute( - request, - TimeValue.timeValueMinutes(1), - TimeValue.timeValueMinutes(10), - keepOnCompletion, - ActionListener.wrap(r -> { - assertThat(success, equalTo(true)); - assertThat(r.string, equalTo("response for [" + request.string + "]")); - assertThat(r.id, notNullValue()); - latch.countDown(); - }, e -> { - assertThat(success, equalTo(false)); - assertThat(e.getMessage(), equalTo("test exception")); - latch.countDown(); - }) - ); + TestRequest request = new TestRequest(success ? randomAlphaOfLength(10) : "die", TimeValue.timeValueDays(1)); + service.asyncExecute(request, TimeValue.timeValueMinutes(1), keepOnCompletion, ActionListener.wrap(r -> { + assertThat(success, equalTo(true)); + assertThat(r.string, equalTo("response for [" + request.string + "]")); + assertThat(r.id, notNullValue()); + latch.countDown(); + }, e -> { + assertThat(success, equalTo(false)); + assertThat(e.getMessage(), equalTo("test exception")); + latch.countDown(); + })); assertThat(latch.await(10, TimeUnit.SECONDS), equalTo(true)); } @@ -252,20 +250,14 @@ public void execute(TestRequest request, TestTask task, ActionListener responseHolder = new AtomicReference<>(); - service.asyncExecute( - request, - TimeValue.timeValueMillis(1), - TimeValue.timeValueMinutes(10), - keepOnCompletion, - ActionTestUtils.assertNoFailureListener(r -> { - assertThat(r.string, nullValue()); - assertThat(r.id, notNullValue()); - assertThat(responseHolder.getAndSet(r), nullValue()); - latch.countDown(); - }) - ); + service.asyncExecute(request, TimeValue.timeValueMillis(1), keepOnCompletion, ActionTestUtils.assertNoFailureListener(r -> { + assertThat(r.string, nullValue()); + assertThat(r.id, notNullValue()); + assertThat(responseHolder.getAndSet(r), nullValue()); + latch.countDown(); + })); assertThat(latch.await(20, TimeUnit.SECONDS), equalTo(true)); if (timeoutOnFirstAttempt) { @@ -281,17 +273,11 @@ public void execute(TestRequest request, TestTask task, ActionListener> responseRef = new AtomicReference<>(); - CountDownLatch getResponseCountDown = getResponse( - responseHolder.get().id, - TimeValue.timeValueSeconds(5), - ActionTestUtils.assertNoFailureListener(responseRef::set) - ); + var getFuture = getResponse(responseHolder.get().id, TimeValue.timeValueSeconds(5), TimeValue.MINUS_ONE); executionLatch.countDown(); - assertThat(getResponseCountDown.await(10, TimeUnit.SECONDS), equalTo(true)); + var response = safeGet(getFuture); - StoredAsyncResponse response = responseRef.get(); if (success) { assertThat(response.getException(), nullValue()); assertThat(response.getResponse(), notNullValue()); @@ -326,26 +312,46 @@ public void execute(TestRequest request, TestTask task, ActionListener service = createManagementService(new TestOperation() { + @Override + public void execute(TestRequest request, TestTask task, ActionListener listener) { + executorService.submit(() -> { + try { + assertThat(executionLatch.await(10, TimeUnit.SECONDS), equalTo(true)); + } catch (InterruptedException ex) { + throw new AssertionError(ex); + } + super.execute(request, task, listener); + }); + } + }); + TestRequest request = new TestRequest(randomAlphaOfLength(10), TimeValue.timeValueHours(1)); + PlainActionFuture submitResp = new PlainActionFuture<>(); + try { + service.asyncExecute(request, TimeValue.timeValueMillis(1), true, submitResp); + String id = submitResp.get().id; + assertThat(id, notNullValue()); + TimeValue keepAlive = TimeValue.timeValueDays(between(1, 10)); + var resp1 = safeGet(getResponse(id, TimeValue.ZERO, keepAlive)); + assertThat(resp1.getExpirationTime(), greaterThanOrEqualTo(now + keepAlive.millis())); + } finally { + executionLatch.countDown(); + } + } + private StoredAsyncResponse getResponse(String id, TimeValue timeout) throws InterruptedException { - AtomicReference> response = new AtomicReference<>(); - assertThat( - getResponse(id, timeout, ActionTestUtils.assertNoFailureListener(response::set)).await(10, TimeUnit.SECONDS), - equalTo(true) - ); - return response.get(); + return safeGet(getResponse(id, timeout, TimeValue.MINUS_ONE)); } - private CountDownLatch getResponse(String id, TimeValue timeout, ActionListener> listener) { - CountDownLatch responseLatch = new CountDownLatch(1); + private PlainActionFuture> getResponse(String id, TimeValue timeout, TimeValue keepAlive) { + PlainActionFuture> future = new PlainActionFuture<>(); GetAsyncResultRequest getResultsRequest = new GetAsyncResultRequest(id).setWaitForCompletionTimeout(timeout); - results.retrieveResult(getResultsRequest, ActionListener.wrap(r -> { - listener.onResponse(r); - responseLatch.countDown(); - }, e -> { - listener.onFailure(e); - responseLatch.countDown(); - })); - return responseLatch; + getResultsRequest.setKeepAlive(keepAlive); + results.retrieveResult(getResultsRequest, future); + return future; } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java index 273c84453f00b..60835596e0468 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java @@ -307,6 +307,7 @@ public void testUpdateKeepAlive() throws Exception { assertThat(resp.isRunning(), is(false)); } }); + assertThat(getExpirationFromDoc(asyncId), greaterThanOrEqualTo(nowInMillis + keepAlive.getMillis())); // update the keepAlive after the query has completed int iters = between(1, 5); for (int i = 0; i < iters; i++) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index d19b9b5787885..10f7c24ed4f07 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -187,13 +187,7 @@ protected void doExecute(Task task, EsqlQueryRequest request, ActionListener listener) { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH); if (requestIsAsync(request)) { - asyncTaskManagementService.asyncExecute( - request, - request.waitForCompletionTimeout(), - request.keepAlive(), - request.keepOnCompletion(), - listener - ); + asyncTaskManagementService.asyncExecute(request, request.waitForCompletionTimeout(), request.keepOnCompletion(), listener); } else { innerExecute(task, request, listener); }