Skip to content

Commit

Permalink
ReindexIT wait for task to really start (#73018)
Browse files Browse the repository at this point in the history
Reindex and friends have tasks that start but are not ready to
rethrottle before they figured out if they are leader or worker
tasks. Now wait for the task to fully start before rethrottling.

Also added additional assertions to help see if the inability
to rethrottle is caused by some failure.

Closes #60811
  • Loading branch information
henningandersen committed Jul 1, 2021
1 parent e31d5f8 commit decae8a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
Expand Down Expand Up @@ -324,7 +325,10 @@ protected static TaskId findTaskToRethrottle(String actionName, String descripti
}
TaskGroup taskGroup = taskGroups.get(0);
assertThat(taskGroup.getChildTasks(), empty());
return taskGroup.getTaskInfo().getTaskId();
// check that the task initialized enough that it can rethrottle too.
if (((RawTaskStatus) taskGroup.getTaskInfo().getStatus()).toMap().containsKey("batches")) {
return taskGroup.getTaskInfo().getTaskId();
}
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ public void onFailure(Exception e) {
float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::deleteByQueryRethrottle, highLevelClient()::deleteByQueryRethrottleAsync);
assertThat(response.getTaskFailures(), empty());
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTasks(), hasSize(1));
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
Expand Down

0 comments on commit decae8a

Please sign in to comment.