diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 5351579278db5..843e718a94e2c 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -48,9 +48,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; +import java.util.Set; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -73,6 +74,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques * the one with the least casts. */ final List requests = new ArrayList<>(); + private final Set indices = new HashSet<>(); List payloads = null; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; @@ -114,6 +116,7 @@ public BulkRequest add(DocWriteRequest request, @Nullable Object payload) { } else { throw new IllegalArgumentException("No support for request [" + request + "]"); } + indices.add(request.index()); return this; } @@ -145,6 +148,7 @@ BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) { addPayload(payload); // lack of source is validated in validate() method sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD; + indices.add(request.index()); return this; } @@ -172,6 +176,7 @@ BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) { if (request.script() != null) { sizeInBytes += request.script().getIdOrCode().length() * 2; } + indices.add(request.index()); return this; } @@ -187,6 +192,7 @@ public BulkRequest add(DeleteRequest request, @Nullable Object payload) { requests.add(request); addPayload(payload); sizeInBytes += REQUEST_OVERHEAD; + indices.add(request.index()); return this; } @@ -548,4 +554,10 @@ public void writeTo(StreamOutput out) throws IOException { refreshPolicy.writeTo(out); timeout.writeTo(out); } + + @Override + public String getDescription() { + return "requests[" + requests.size() + "], indices[" + Strings.collectionToDelimitedString(indices, ", ") + "]"; + } + } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java index 25366d034ca04..d53e9f8997ef7 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java @@ -100,6 +100,11 @@ public String toString() { return b.toString(); } + @Override + public String getDescription() { + return "requests[" + items.length + "], index[" + index + "]"; + } + @Override public void onRetry() { for (BulkItemRequest item : items) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 994cb3b430813..7f490ebab9050 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -303,7 +303,9 @@ public void testTransportBulkTasks() { client().prepareBulk().add(client().prepareIndex("test", "doc", "test_id").setSource("{\"foo\": \"bar\"}")).get(); // the bulk operation should produce one main task - assertEquals(1, numberOfEvents(BulkAction.NAME, Tuple::v1)); + List topTask = findEvents(BulkAction.NAME, Tuple::v1); + assertEquals(1, topTask.size()); + assertEquals("requests[1], indices[test]", topTask.get(0).getDescription()); // we should also get 1 or 2 [s] operation with main operation as a parent // in case the primary is located on the coordinating node we will have 1 operation, otherwise - 2 @@ -317,17 +319,20 @@ public void testTransportBulkTasks() { shardTask = shardTasks.get(0); // and it should have the main task as a parent assertParentTask(shardTask, findEvents(BulkAction.NAME, Tuple::v1).get(0)); + assertEquals("requests[1], index[test]", shardTask.getDescription()); } else { if (shardTasks.get(0).getParentTaskId().equals(shardTasks.get(1).getTaskId())) { // task 1 is the parent of task 0, that means that task 0 will control [s][p] and [s][r] tasks shardTask = shardTasks.get(0); // in turn the parent of the task 1 should be the main task assertParentTask(shardTasks.get(1), findEvents(BulkAction.NAME, Tuple::v1).get(0)); + assertEquals("requests[1], index[test]", shardTask.getDescription()); } else { // otherwise task 1 will control [s][p] and [s][r] tasks shardTask = shardTasks.get(1); // in turn the parent of the task 0 should be the main task assertParentTask(shardTasks.get(0), findEvents(BulkAction.NAME, Tuple::v1).get(0)); + assertEquals("requests[1], index[test]", shardTask.getDescription()); } }