Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add descriptions to bulk tasks #22059

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -48,9 +48,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.Set;

import static org.elasticsearch.action.ValidateActions.addValidationError;

Expand All @@ -73,6 +74,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
* the one with the least casts.
*/
final List<DocWriteRequest> requests = new ArrayList<>();
private final Set<String> indices = new HashSet<>();
List<Object> payloads = null;

protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
Expand Down Expand Up @@ -114,6 +116,7 @@ public BulkRequest add(DocWriteRequest request, @Nullable Object payload) {
} else {
throw new IllegalArgumentException("No support for request [" + request + "]");
}
indices.add(request.index());
return this;
}

Expand Down Expand Up @@ -145,6 +148,7 @@ BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) {
addPayload(payload);
// lack of source is validated in validate() method
sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD;
indices.add(request.index());
return this;
}

Expand Down Expand Up @@ -172,6 +176,7 @@ BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) {
if (request.script() != null) {
sizeInBytes += request.script().getIdOrCode().length() * 2;
}
indices.add(request.index());
return this;
}

Expand All @@ -187,6 +192,7 @@ public BulkRequest add(DeleteRequest request, @Nullable Object payload) {
requests.add(request);
addPayload(payload);
sizeInBytes += REQUEST_OVERHEAD;
indices.add(request.index());
return this;
}

Expand Down Expand Up @@ -548,4 +554,10 @@ public void writeTo(StreamOutput out) throws IOException {
refreshPolicy.writeTo(out);
timeout.writeTo(out);
}

@Override
public String getDescription() {
return "requests[" + requests.size() + "], indices[" + Strings.collectionToDelimitedString(indices, ", ") + "]";
}

}
Expand Up @@ -100,6 +100,11 @@ public String toString() {
return b.toString();
}

@Override
public String getDescription() {
return "requests[" + items.length + "], index[" + index + "]";
}

@Override
public void onRetry() {
for (BulkItemRequest item : items) {
Expand Down
Expand Up @@ -303,7 +303,9 @@ public void testTransportBulkTasks() {
client().prepareBulk().add(client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}")).get();

// the bulk operation should produce one main task
assertEquals(1, numberOfEvents(BulkAction.NAME, Tuple::v1));
List<TaskInfo> topTask = findEvents(BulkAction.NAME, Tuple::v1);
assertEquals(1, topTask.size());
assertEquals("requests[1], indices[test]", topTask.get(0).getDescription());

// we should also get 1 or 2 [s] operation with main operation as a parent
// in case the primary is located on the coordinating node we will have 1 operation, otherwise - 2
Expand All @@ -317,17 +319,20 @@ public void testTransportBulkTasks() {
shardTask = shardTasks.get(0);
// and it should have the main task as a parent
assertParentTask(shardTask, findEvents(BulkAction.NAME, Tuple::v1).get(0));
assertEquals("requests[1], index[test]", shardTask.getDescription());
} else {
if (shardTasks.get(0).getParentTaskId().equals(shardTasks.get(1).getTaskId())) {
// task 1 is the parent of task 0, that means that task 0 will control [s][p] and [s][r] tasks
shardTask = shardTasks.get(0);
// in turn the parent of the task 1 should be the main task
assertParentTask(shardTasks.get(1), findEvents(BulkAction.NAME, Tuple::v1).get(0));
assertEquals("requests[1], index[test]", shardTask.getDescription());
} else {
// otherwise task 1 will control [s][p] and [s][r] tasks
shardTask = shardTasks.get(1);
// in turn the parent of the task 0 should be the main task
assertParentTask(shardTasks.get(0), findEvents(BulkAction.NAME, Tuple::v1).get(0));
assertEquals("requests[1], index[test]", shardTask.getDescription());
}
}

Expand Down