From 0be736277319abb3c073592f5a0db8303f2465e2 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Mon, 18 Aug 2025 11:13:23 -0400 Subject: [PATCH] Update TasksIT for batched execution (#132762) --- .../action/admin/cluster/node/tasks/TasksIT.java | 12 ++++++++---- .../search/SearchQueryThenFetchAsyncAction.java | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index b2ba1d34e3280..f9651c71ecf13 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -41,7 +41,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; @@ -82,6 +81,8 @@ import static java.util.Collections.singleton; import static org.elasticsearch.action.admin.cluster.node.tasks.TestTaskPlugin.TEST_TASK_ACTION; import static org.elasticsearch.action.admin.cluster.node.tasks.TestTaskPlugin.UNBLOCK_TASK_ACTION; +import static org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction.NODE_SEARCH_ACTION_NAME; +import static org.elasticsearch.action.search.SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME; import static org.elasticsearch.core.TimeValue.timeValueMillis; import static org.elasticsearch.core.TimeValue.timeValueSeconds; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; @@ -353,8 +354,6 @@ public void testTransportBulkTasks() { } public void testSearchTaskDescriptions() { - // TODO: enhance this test to also check the tasks created by batched query execution - updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false)); registerTaskManagerListeners(TransportSearchAction.TYPE.name()); // main task registerTaskManagerListeners(TransportSearchAction.TYPE.name() + "[*]"); // shard task createIndex("test"); @@ -380,6 +379,11 @@ public void testSearchTaskDescriptions() { // check that if we have any shard-level requests they all have non-zero length description List shardTasks = findEvents(TransportSearchAction.TYPE.name() + "[*]", Tuple::v1); for (TaskInfo taskInfo : shardTasks) { + // During batched query execution, if a partial reduction was done on the data node, a task will be created to free the reader. + // These tasks don't have descriptions or parent tasks, so they're ignored for this test. + if (taskInfo.action().equals(FREE_CONTEXT_SCROLL_ACTION_NAME)) { + continue; + } assertThat(taskInfo.parentTaskId(), notNullValue()); assertEquals(mainTask.get(0).taskId(), taskInfo.parentTaskId()); assertTaskHeaders(taskInfo); @@ -396,12 +400,12 @@ public void testSearchTaskDescriptions() { taskInfo.description(), Regex.simpleMatch("id[*], size[1], lastEmittedDoc[null]", taskInfo.description()) ); + case NODE_SEARCH_ACTION_NAME -> assertEquals("NodeQueryRequest", taskInfo.description()); default -> fail("Unexpected action [" + taskInfo.action() + "] with description [" + taskInfo.description() + "]"); } // assert that all task descriptions have non-zero length assertThat(taskInfo.description().length(), greaterThan(0)); } - updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey())); } public void testSearchTaskHeaderLimit() { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 8d763698c63c0..b2d5693762dca 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -552,7 +552,7 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP } } - private static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]"; + public static final String NODE_SEARCH_ACTION_NAME = "indices:data/read/search[query][n]"; static void registerNodeSearchAction( SearchTransportService searchTransportService,