diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java index 1c33a7e183e5a..8c764bd253e8c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ReindexIT.java @@ -50,6 +50,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -283,7 +284,7 @@ public void onFailure(Exception e) { } }); - TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME); + TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription()); float requestsPerSecond = 1000f; ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync); @@ -414,7 +415,7 @@ public void onFailure(Exception e) { } }); - TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME); + TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME, deleteByQueryRequest.getDescription()); float requestsPerSecond = 1000f; ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond), highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync); @@ -477,7 +478,7 @@ public void testDeleteByQueryTask() throws Exception { } } - private static TaskId findTaskToRethrottle(String actionName) throws IOException { + private static TaskId findTaskToRethrottle(String actionName, String description) throws IOException { long start = System.nanoTime(); ListTasksRequest request = new ListTasksRequest(); request.setActions(actionName); @@ -485,13 +486,16 @@ private static TaskId findTaskToRethrottle(String actionName) throws IOException do { ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT); list.rethrowFailures("Finding tasks to rethrottle"); + List taskGroups = + list.getTaskGroups().stream() + .filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList()); assertThat("tasks are left over from the last execution of this test", - list.getTaskGroups(), hasSize(lessThan(2))); - if (0 == list.getTaskGroups().size()) { + taskGroups, hasSize(lessThan(2))); + if (0 == taskGroups.size()) { // The parent task hasn't started yet continue; } - TaskGroup taskGroup = list.getTaskGroups().get(0); + TaskGroup taskGroup = taskGroups.get(0); assertThat(taskGroup.getChildTasks(), empty()); return taskGroup.getTaskInfo().getTaskId(); } while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));