Skip to content

Commit

Permalink
Fix TaskIT (#64724) (#64848)
Browse files Browse the repository at this point in the history
Closes #64056
  • Loading branch information
tlrx committed Nov 10, 2020
1 parent 21a6a11 commit 09ff421
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@
import org.elasticsearch.client.ml.job.config.MlFilter;
import org.elasticsearch.client.ml.job.process.ModelSnapshot;
import org.elasticsearch.client.ml.job.stats.JobStats;
import org.elasticsearch.client.tasks.GetTaskRequest;
import org.elasticsearch.client.tasks.GetTaskResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -187,6 +189,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.TaskId;
import org.junit.After;

import java.io.IOException;
Expand All @@ -198,6 +201,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -299,7 +303,18 @@ public void testDeleteJob_GivenWaitForCompletionIsFalse() throws Exception {
DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync);

assertNull(response.getAcknowledged());
assertNotNull(response.getTask());

final TaskId taskId = response.getTask();
assertNotNull(taskId);

// When wait_for_completion=false the DeleteJobAction stored the task result in the .tasks index. In tests we need to wait
// for the delete job task to complete, otherwise the .tasks index could be created during the execution of a following test.
final GetTaskRequest taskRequest = new GetTaskRequest(taskId.getNodeId(), taskId.getId());
assertBusy(() -> {
Optional<GetTaskResponse> taskResponse = highLevelClient().tasks().get(taskRequest, RequestOptions.DEFAULT);
assertTrue(taskResponse.isPresent());
assertTrue(taskResponse.get().isCompleted());
}, 30L, TimeUnit.SECONDS);
}

public void testOpenJob() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@
import org.elasticsearch.client.tasks.GetTaskRequest;
import org.elasticsearch.client.tasks.GetTaskResponse;
import org.elasticsearch.client.tasks.TaskId;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -70,11 +71,10 @@ public void testListTasks() throws IOException {
}
assertTrue("List tasks were not found", listTasksFound);
}

public void testGetValidTask() throws Exception {

// Run a Reindex to create a task

final String sourceIndex = "source1";
final String destinationIndex = "dest";
Settings settings = Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0).build();
Expand All @@ -85,34 +85,19 @@ public void testGetValidTask() throws Exception {
.add(new IndexRequest(sourceIndex).id("2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assertEquals(RestStatus.OK, highLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT).status());

// (need to use low level client because currently high level client
// doesn't support async return of task id - needs
// https://github.com/elastic/elasticsearch/pull/35202 )
RestClient lowClient = highLevelClient().getLowLevelClient();
Request request = new Request("POST", "_reindex");
request.addParameter("wait_for_completion", "false");
request.setJsonEntity(
"{"
+ " \"source\": {\n"
+ " \"index\": \"source1\"\n"
+ " },\n"
+ " \"dest\": {\n"
+ " \"index\": \"dest\"\n"
+ " }"
+ "}"
);
Response response = lowClient.performRequest(request);
Map<String, Object> map = entityAsMap(response);
Object taskId = map.get("task");

final ReindexRequest reindexRequest = new ReindexRequest().setSourceIndices(sourceIndex).setDestIndex(destinationIndex);
final TaskSubmissionResponse taskSubmissionResponse = highLevelClient().submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

final String taskId = taskSubmissionResponse.getTask();
assertNotNull(taskId);

TaskId childTaskId = new TaskId(taskId.toString());
TaskId childTaskId = new TaskId(taskId);
GetTaskRequest gtr = new GetTaskRequest(childTaskId.getNodeId(), childTaskId.getId());
gtr.setWaitForCompletion(randomBoolean());
Optional<GetTaskResponse> getTaskResponse = execute(gtr, highLevelClient().tasks()::get, highLevelClient().tasks()::getAsync);
assertTrue(getTaskResponse.isPresent());
GetTaskResponse taskResponse = getTaskResponse.get();
GetTaskResponse taskResponse = getTaskResponse.get();
if (gtr.getWaitForCompletion()) {
assertTrue(taskResponse.isCompleted());
}
Expand All @@ -121,15 +106,15 @@ public void testGetValidTask() throws Exception {
assertEquals("reindex from [source1] to [dest][_doc]", info.getDescription());
assertEquals("indices:data/write/reindex", info.getAction());
if (taskResponse.isCompleted() == false) {
assertBusy(checkTaskCompletionStatus(client(), taskId.toString()));
assertBusy(checkTaskCompletionStatus(client(), taskId));
}
}
}

public void testGetInvalidTask() throws IOException {
// Check 404s are returned as empty Optionals
GetTaskRequest gtr = new GetTaskRequest("doesNotExistNodeName", 123);
GetTaskRequest gtr = new GetTaskRequest("doesNotExistNodeName", 123);
Optional<GetTaskResponse> getTaskResponse = execute(gtr, highLevelClient().tasks()::get, highLevelClient().tasks()::getAsync);
assertFalse(getTaskResponse.isPresent());
assertFalse(getTaskResponse.isPresent());
}

public void testCancelTasks() throws IOException {
Expand Down

0 comments on commit 09ff421

Please sign in to comment.