diff --git a/muted-tests.yml b/muted-tests.yml index edc13f3c47b78..9cc1264bbd5b3 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -232,9 +232,6 @@ tests: - class: org.elasticsearch.xpack.inference.DefaultEndPointsIT method: testInferDeploysDefaultElser issue: https://github.com/elastic/elasticsearch/issues/114913 -- class: org.elasticsearch.xpack.esql.action.EsqlActionTaskIT - method: testCancelRequestWhenFailingFetchingPages - issue: https://github.com/elastic/elasticsearch/issues/117397 - class: org.elasticsearch.xpack.security.operator.OperatorPrivilegesIT method: testEveryActionIsEitherOperatorOnlyOrNonOperator issue: https://github.com/elastic/elasticsearch/issues/102992 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index 460ab0f5b8b38..56453a291ea81 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -392,12 +392,13 @@ protected void doRun() throws Exception { .get(); ensureYellowAndNoInitializingShards("test"); request.query("FROM test | LIMIT 10"); - request.pragmas(randomPragmas()); + QueryPragmas pragmas = randomPragmas(); + request.pragmas(pragmas); PlainActionFuture future = new PlainActionFuture<>(); client.execute(EsqlQueryAction.INSTANCE, request, future); ExchangeService exchangeService = internalCluster().getInstance(ExchangeService.class, dataNode); - boolean waitedForPages; - final String sessionId; + final boolean waitedForPages; + final String exchangeId; try { List foundTasks = new ArrayList<>(); assertBusy(() -> { @@ -411,13 +412,22 @@ protected void doRun() throws Exception { assertThat(tasks, hasSize(1)); foundTasks.addAll(tasks); }); - sessionId = foundTasks.get(0).taskId().toString(); + final String sessionId = foundTasks.get(0).taskId().toString(); assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES)); - String exchangeId = exchangeService.sinkKeys().stream().filter(s -> s.startsWith(sessionId)).findFirst().get(); + List sinkKeys = exchangeService.sinkKeys() + .stream() + .filter( + s -> s.startsWith(sessionId) + // exclude the node-level reduction sink + && s.endsWith("[n]") == false + ) + .toList(); + assertThat(sinkKeys.toString(), sinkKeys.size(), equalTo(1)); + exchangeId = sinkKeys.get(0); ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(exchangeId); waitedForPages = randomBoolean(); if (waitedForPages) { - // do not fail exchange requests until we have some pages + // do not fail exchange requests until we have some pages. assertBusy(() -> assertThat(exchangeSink.bufferSize(), greaterThan(0))); } } finally { @@ -429,7 +439,7 @@ protected void doRun() throws Exception { // As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is // longer than the assertBusy timeout. if (waitedForPages == false) { - exchangeService.finishSinkHandler(sessionId, failure); + exchangeService.finishSinkHandler(exchangeId, failure); } } finally { transportService.clearAllRules();