Skip to content

Commit

Permalink
Rest high level ReindexIT fix (#60834)
Browse files Browse the repository at this point in the history
ReindexIT would rethrottle any delete or update by query task, fixed to
more precisely match the task started by the test.

Closes #60811
  • Loading branch information
henningandersen committed Aug 11, 2020
1 parent 01c6c9b commit 00fc0d2
Showing 1 changed file with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -283,7 +284,7 @@ public void onFailure(Exception e) {
}
});

TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME);
TaskId taskIdToRethrottle = findTaskToRethrottle(UpdateByQueryAction.NAME, updateByQueryRequest.getDescription());
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::updateByQueryRethrottle, highLevelClient()::updateByQueryRethrottleAsync);
Expand Down Expand Up @@ -414,7 +415,7 @@ public void onFailure(Exception e) {
}
});

TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME);
TaskId taskIdToRethrottle = findTaskToRethrottle(DeleteByQueryAction.NAME, deleteByQueryRequest.getDescription());
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
Expand Down Expand Up @@ -477,21 +478,24 @@ public void testDeleteByQueryTask() throws Exception {
}
}

private static TaskId findTaskToRethrottle(String actionName) throws IOException {
private static TaskId findTaskToRethrottle(String actionName, String description) throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();
request.setActions(actionName);
request.setDetailed(true);
do {
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
list.rethrowFailures("Finding tasks to rethrottle");
List<TaskGroup> taskGroups =
list.getTaskGroups().stream()
.filter(taskGroup -> taskGroup.getTaskInfo().getDescription().equals(description)).collect(Collectors.toList());
assertThat("tasks are left over from the last execution of this test",
list.getTaskGroups(), hasSize(lessThan(2)));
if (0 == list.getTaskGroups().size()) {
taskGroups, hasSize(lessThan(2)));
if (0 == taskGroups.size()) {
// The parent task hasn't started yet
continue;
}
TaskGroup taskGroup = list.getTaskGroups().get(0);
TaskGroup taskGroup = taskGroups.get(0);
assertThat(taskGroup.getChildTasks(), empty());
return taskGroup.getTaskInfo().getTaskId();
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
Expand Down

0 comments on commit 00fc0d2

Please sign in to comment.