Added `_shards` header to all write responses. #7994

Merged
merged 1 commit into from Jan 8, 2015

5 participants

@martijnvg
Member

The header indicates how many shard copies (primary and replica shards) a write was supposed to go to, how many shard copies the write succeeded on, and captures shard failures if writing to a replica shard fails.

For async writes it also includes the number of shards on which the write is still pending.
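
As a rough illustration of the semantics described above, the header can be modelled as a small value object. This is only a sketch, not the `ShardInfo` class added by this PR: the class name `ShardHeaderSketch`, the `unaccounted()` helper and the use of plain strings for failures are all assumptions made for the example. Only the field names `total`, `successful`, `pending` and `failures` come from the description.

```java
import java.util.List;

// Hypothetical model of the `_shards` header; not the class added by this PR.
final class ShardHeaderSketch {
    final int total;             // shard copies (primary + replicas) the write should go to
    final int successful;        // shard copies the write succeeded on
    final int pending;           // copies still outstanding (async replication only)
    final List<String> failures; // replica failure reasons, simplified to strings

    ShardHeaderSketch(int total, int successful, int pending, List<String> failures) {
        this.total = total;
        this.successful = successful;
        this.pending = pending;
        this.failures = List.copyOf(failures);
    }

    // Copies that neither succeeded, failed, nor are pending,
    // for example replicas that were not started yet.
    int unaccounted() {
        return total - successful - pending - failures.size();
    }
}
```

For a write to one primary with one replica that has not started yet, this model would give `total = 2`, `successful = 1`, no failures, and one unaccounted copy.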

@bleskes bleskes and 1 other commented on an outdated diff Oct 8, 2014
docs/reference/docs/index_.asciidoc
@@ -27,6 +31,16 @@ The result of the above index operation is:
}
--------------------------------------------------
+The `_shards` header provides information about the replication process of the index operation.
+* `total` - Indicates to how many shard copies (primary and replica shards) the index operation should be executed on.
+* `successful`- Indicates the number of shard copies to index operation succeeded on.
+* `pending` - Indicates to how many shard copies this index operation still needs to go to at the time index operation
+ succeeded on the primary shard. This field is only returned if `async` replication is used.
+* `failures` - An array that contains replication related errors in the case an index operation failed on a replica shard.
@bleskes
bleskes Oct 8, 2014 Member

I think we need failed: INT , just like in search.

@martijnvg
martijnvg Oct 8, 2014 Member

+1 I'll add that.

@bleskes bleskes commented on an outdated diff Oct 8, 2014
docs/reference/migration/migrate_1_x.asciidoc
+pre `1.4.0.Beta1` behaviour can be enabled, see: <<modules-discovery-zen,no-master-block>>
+
+[[breaking-changes-1.5]]
+=== 1.5
+
+==== Delete by query
+
+The meaning of the `_shards` headers in the delete by query response has changed. Before version `1.5.0` the `total`,
+`successful` and `failed` fields in the header are based on the number of primary shards. The failures on replica
+shards aren't being kept track of. From version `1.5.0` the stats in the `_shards` header are based on all shards
+in an index. The http result code is remains to be based on the failures that have occurred in the primary shards.
@bleskes
bleskes Oct 8, 2014 Member

I think this should be all shards of an index. Also "remains to be based on" should be "The http status code is left unchanged and is only based on failures that occurred while executing on primary shards".

@bleskes bleskes commented on an outdated diff Oct 8, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Base class for write action responses.
+ */
+public abstract class ActionWriteResponse extends ActionResponse {
+
+ private ShardInfo shardInfo;
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ super.readFrom(in);
+ if (in.getVersion().onOrAfter(Version.V_1_5_0)) {
@bleskes
bleskes Oct 8, 2014 Member

How do we deal with earlier versions? Is it OK for shardInfo to be null? Do we want a marker Unknown instance?

@bleskes
bleskes Oct 8, 2014 Member

I see some places where null is not protected against...

@bleskes bleskes commented on an outdated diff Oct 8, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ private int pending;
+ private Failure[] failures = new Failure[0];
+
+ /**
+ * @return the total number of shards the write should go to.
+ */
+ public int getTotal() {
+ return total;
+ }
+
+ public void setTotal(int total) {
+ this.total = total;
+ }
+
+ /**
+ * @return the total number of shards the write actually succeeded on.
@bleskes
bleskes Oct 8, 2014 Member

drop the actually? sounds so "uncertain" :)

@javanna javanna and 3 others commented on an outdated diff Oct 8, 2014
docs/reference/docs/delete-by-query.asciidoc
@@ -32,9 +32,8 @@ commands is:
"_indices" : {
"twitter" : {
"_shards" : {
- "total" : 5,
- "successful" : 5,
- "failed" : 0
+ "total" : 10,
+ "successful" : 10,
@javanna
javanna Oct 8, 2014 Member

is it correct that the failed is gone?

@martijnvg
martijnvg Oct 8, 2014 Member

I'll add it back :-)

(my idea was that failures.length would be enough, but that wouldn't be consistent with the _shards header in search and percolate apis)

@javanna
javanna Oct 10, 2014 Member

I think you have to add back the failed to the docs too then? That means there are no changes to the delete by query json response?

@bleskes
bleskes Dec 1, 2014 Member

failed was supposed to come back no?

@imotov
imotov Jan 6, 2015 Member

+1 on adding failed back to docs.

@javanna javanna and 1 other commented on an outdated diff Oct 8, 2014
...ices/mapping/delete/TransportDeleteMappingAction.java
@@ -147,10 +148,10 @@ public void onResponse(FlushResponse flushResponse) {
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
if (logger.isTraceEnabled()) {
for (IndexDeleteByQueryResponse indexResponse : deleteByQueryResponse) {
- logger.trace("Delete by query[{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getTotalShards(), indexResponse.getSuccessfulShards(), indexResponse.getFailedShards());
- if (indexResponse.getFailedShards() > 0) {
- for (ShardOperationFailedException failure : indexResponse.getFailures()) {
- logger.trace("[{}/{}] Delete by query shard failure reason: {}", failure.index(), failure.shardId(), failure.reason());
+ logger.trace("Delete by query[{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getShardInfo().getTotal(), indexResponse.getShardInfo().getSuccessful(), indexResponse.getShardInfo().getFailures().length);
+ if (indexResponse.getShardInfo().getFailures().length > 0) {
+ for (ActionWriteResponse.ShardInfo.Failure failure : indexResponse.getShardInfo().getFailures()) {
+ logger.trace("[{}/{}/] Delete by query shard failure reason: {}", failure.getIndex(), failure.getShardId(), failure.getNodeId(), failure.getReason());
@javanna
javanna Oct 8, 2014 Member

missing an {} somewhere?

@martijnvg
martijnvg Oct 8, 2014 Member

true! I'll fix that.
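
The mismatch flagged here (three `{}` placeholders for four arguments) can be checked mechanically. The helper below is a hypothetical sketch, not part of the PR or of any logging library; it simply counts `{}` pairs in a format string so the result can be compared with the number of arguments passed.

```java
// Hypothetical helper: counts "{}" placeholders in a logger format string.
final class PlaceholderCheck {
    static int countPlaceholders(String format) {
        int count = 0;
        int from = 0;
        // indexOf returns -1 once no further "{}" pair is found
        while ((from = format.indexOf("{}", from)) >= 0) {
            count++;
            from += 2;
        }
        return count;
    }

    public static void main(String[] args) {
        String flagged = "[{}/{}/] Delete by query shard failure reason: {}";
        // The call in the diff passes four arguments: index, shard id, node id, reason.
        System.out.println(countPlaceholders(flagged) + " placeholders for 4 arguments");
    }
}
```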

@javanna javanna and 1 other commented on an outdated diff Oct 8, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Base class for write action responses.
+ */
+public abstract class ActionWriteResponse extends ActionResponse {
+
+ private ShardInfo shardInfo;
@javanna
javanna Oct 8, 2014 Member

I'd consider adding a constructor with shardInfo and changing the subclasses constructors to accept it there, just to enforce that this info is needed so we don't forget it anywhere. Thoughts?

@martijnvg
martijnvg Oct 8, 2014 Member

+1 makes sense.

@bleskes bleskes commented on an outdated diff Oct 8, 2014
...ication/TransportShardReplicationOperationAction.java
finishIfPossible();
}
@Override
public void handleException(TransportException exp) {
+ response.shardReplicaFailures.put(nodeId, exp);
@bleskes
bleskes Oct 8, 2014 Member

We don't count shards that are not allocated, not started, closed, etc. as shard failures (see the Search logic). They will only show up as a gap between the total shard count and the successful and failed counts. The reason is that there is no way to distinguish this case from the one where our cluster state said the shards were unassigned.
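
A minimal sketch of the counting rule described in this comment, under stated assumptions: `ReplicaFailureSketch` and `ShardUnavailableException` are hypothetical names standing in for the real replication state and for the "not allocated / not started / closed" conditions. The actual PR would classify exceptions with Elasticsearch's own logic, which is not reproduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; the exception type and class names are illustrative only.
final class ReplicaFailureSketch {
    // Stand-in for "shard not allocated / not started / closed" conditions.
    static final class ShardUnavailableException extends RuntimeException {
        ShardUnavailableException(String message) {
            super(message);
        }
    }

    private final Map<String, Throwable> failuresByNode = new LinkedHashMap<>();

    // Records a replica failure unless it only signals that the copy was unavailable.
    void onReplicaFailure(String nodeId, Throwable e) {
        if (e instanceof ShardUnavailableException) {
            return; // visible only as total != successful, not as a failure
        }
        failuresByNode.put(nodeId, e);
    }

    int failed() {
        return failuresByNode.size();
    }
}
```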

@bleskes bleskes commented on an outdated diff Oct 8, 2014
...ication/TransportShardReplicationOperationAction.java
@@ -737,10 +731,12 @@ protected void doRun() {
try {
shardOperationOnReplica(shardRequest);
} catch (Throwable e) {
+ response.shardReplicaFailures.put(nodeId, e);
@bleskes
bleskes Oct 8, 2014 Member

same here. Not all failures are counted.

@bleskes bleskes commented on an outdated diff Oct 8, 2014
...ication/TransportShardReplicationOperationAction.java
@@ -754,21 +750,24 @@ public boolean isForceExecution() {
public void onFailure(Throwable t) {}
});
} catch (Throwable e) {
+ response.shardReplicaFailures.put(nodeId, e);
@bleskes
bleskes Oct 8, 2014 Member

same here. Not all failures are counted.

@bleskes bleskes commented on an outdated diff Oct 8, 2014
...ication/TransportShardReplicationOperationAction.java
failReplicaIfNeeded(shard.index(), shard.id(), e);
}
+ response.successFullShardCopyCount.incrementAndGet();
@bleskes
bleskes Oct 8, 2014 Member

This is not necessarily a success, right?

@martijnvg
Member

@javanna @bleskes I updated the PR and the provided feedback has been applied.

The other most noticeable change is that the replication logic in TransportShardReplicationOperationAction has been changed to keep better track of the state of replication while it is being performed. This helped to simplify the replication logic.

@javanna javanna and 1 other commented on an outdated diff Oct 10, 2014
docs/reference/docs/delete.asciidoc
@@ -16,6 +16,10 @@ The result of the above delete operation is:
[source,js]
--------------------------------------------------
{
+ "_shards" : {
+ "total" : 10,
+ "successful" : 10
@javanna
javanna Oct 10, 2014 Member

missing failed?

@martijnvg
martijnvg Oct 10, 2014 Member

I added it to the code, I'll add it to the examples as well!

@javanna javanna commented on an outdated diff Oct 10, 2014
docs/reference/docs/index_.asciidoc
@@ -19,6 +19,10 @@ The result of the above index operation is:
[source,js]
--------------------------------------------------
{
+ "_shards" : {
+ "total" : 10,
+ "successful" : 10
@javanna
javanna Oct 10, 2014 Member

missing failed?

@javanna javanna and 1 other commented on an outdated diff Oct 10, 2014
.../action/deletebyquery/IndexDeleteByQueryResponse.java
this.index = index;
- this.successfulShards = successfulShards;
- this.failedShards = failedShards;
- if (failures == null || failures.isEmpty()) {
- this.failures = new DefaultShardOperationFailedException[0];
- } else {
- this.failures = failures.toArray(new ShardOperationFailedException[failures.size()]);
+ this.shardInfo = new ActionWriteResponse.ShardInfo();
+ this.shardInfo.append(shardResponses);
+ // just append the primary failures:
+ if (!failures.isEmpty()) {
+ List<ActionWriteResponse.ShardInfo.Failure> k = new ArrayList<>();
@javanna
javanna Oct 10, 2014 Member

can you rename k to something a bit more meaningful? :)

@martijnvg
martijnvg Oct 10, 2014 Member

heh :) will do

@javanna javanna and 1 other commented on an outdated diff Oct 10, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentBuilderString;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Base class for write action responses.
+ */
+public abstract class ActionWriteResponse extends ActionResponse {
+
+ private ShardInfo shardInfo = new ShardInfo();
@javanna
javanna Oct 10, 2014 Member

I think we discussed this before, but it didn't change, thus I'm bringing it up again ;) can we add a constructor that accepts shardInfo as argument and change the subclasses constructors to accept it there, just to enforce that this info is needed so we don't forget it anywhere. Maybe then we could also remove the setter...

@martijnvg
martijnvg Oct 10, 2014 Member

The responses are created on the primary shard while the _shards header is constructed and modified while replication is in progress, which makes it a bit weird if we supply it when the action on the primary has completed.

Also on certain responses (delete by query) we need to aggregate the shard responses' _shards headers and supply the result a level higher up (same from index response to indices response). So that is why the setter is there.

@javanna
javanna Oct 10, 2014 Member

I see, thanks for the explanation!

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ication/TransportShardReplicationOperationAction.java
@@ -585,36 +568,24 @@ void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) {
shardIt.reset();
}
- // initialize the counter
- int replicaCounter = shardIt.assignedReplicasIncludingRelocating();
-
+ ReplicationState status = new ReplicationState(por, shardIt, primaryResponse);
@bleskes
bleskes Oct 13, 2014 Member

Can we either rename ReplicationState to ReplicationStatus or change status to replicationState?

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ication/TransportShardReplicationOperationAction.java
if (newPrimaryShard != null) {
- replicaCounter++;
+ numberOfShardInstance++;
@bleskes
bleskes Oct 13, 2014 Member

Nit picking - can we call this numberOfShardInstances?

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ication/TransportShardReplicationOperationAction.java
}
- if (replicaCounter == 0) {
- postPrimaryOperation(internalRequest, response);
- listener.onResponse(response.response());
+ status.onStartReplication(numberOfShardInstance, listener);
@bleskes
bleskes Oct 13, 2014 Member

I'm a bit worried about the differences between the total count (based on shardIt.size()) and the number of calls we actually make (based on numberOfShardInstance). We may end up in a situation where successful > total, or maybe worse, successful == total but they mean different shards. I think we should not base the counts in ReplicationState on the ShardIt but rather on the total of actual work + unassigned.
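
One way to read this suggestion, sketched with hypothetical names (`ReplicationCountsSketch`, `dispatched`, `unassigned` are not from the PR): if `total` is computed from the requests actually sent plus the copies known to be unassigned, then `successful` can never exceed `total`, and both numbers refer to the same set of shard copies.

```java
// Hypothetical sketch: counts are derived from dispatched work plus unassigned copies.
final class ReplicationCountsSketch {
    private final int dispatched; // copies a request was actually sent to, primary included
    private final int unassigned; // copies in the routing table with no node assigned
    private int successful;

    ReplicationCountsSketch(int dispatched, int unassigned) {
        this.dispatched = dispatched;
        this.unassigned = unassigned;
    }

    // total covers the same shard copies that successful can ever count.
    int total() {
        return dispatched + unassigned;
    }

    int successful() {
        return successful;
    }

    // Not thread-safe; a real implementation would use atomic counters.
    void onSuccess() {
        if (successful == dispatched) {
            throw new IllegalStateException("more successes than dispatched requests");
        }
        successful++;
    }
}
```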

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ication/TransportShardReplicationOperationAction.java
- public PrimaryResponse(ReplicaRequest replicaRequest, Response response, Object payload) {
- this.replicaRequest = replicaRequest;
- this.response = response;
- this.payload = payload;
+ public void onReplicaFailure(String nodeId, Throwable e) {
@bleskes
bleskes Oct 13, 2014 Member

Can we add @Nullable to the e param?

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ication/TransportShardReplicationOperationAction.java
- public PrimaryResponse(ReplicaRequest replicaRequest, Response response, Object payload) {
- this.replicaRequest = replicaRequest;
- this.response = response;
- this.payload = payload;
+ public void onReplicaFailure(String nodeId, Throwable e) {
+ // Only version conflict should be ignored from being put into the _shards header?
@bleskes
bleskes Oct 13, 2014 Member

We should call ignoreReplicaException() imho.

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ication/TransportShardReplicationOperationAction.java
}
- public Object payload() {
- return payload;
+ private void finishIfNeeded() {
+ if (pending.decrementAndGet() == 0) {
+ doFinish();
+ }
+ }
+
+ private void doFinish() {
+ if (finished.compareAndSet(false, true)) {
+ ActionWriteResponse.ShardInfo shardInfo = finalResponse.getShardInfo();
@bleskes
bleskes Oct 13, 2014 Member

I think it will be cleaner to have ShardInfo have a constructor that takes all parameters and set that on the finalResponse. This will make sure we will not forget anything in the future.
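
A minimal sketch of what this could look like, combining the countdown from the diff above with an all-arguments constructor. The names `FinishOnceSketch` and `Counts` are assumptions for illustration, and failures are left out to keep the example short. The point is only that the result object is built once, with every field supplied, when the last copy reports.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "count down, then build an immutable result exactly once".
final class FinishOnceSketch {
    // Immutable result: every field must be supplied at construction time.
    static final class Counts {
        final int total;
        final int successful;
        final int pending;

        Counts(int total, int successful, int pending) {
            this.total = total;
            this.successful = successful;
            this.pending = pending;
        }
    }

    private final int total;
    private final AtomicInteger pending;
    private final AtomicInteger successful = new AtomicInteger();
    private final AtomicBoolean finished = new AtomicBoolean();
    private volatile Counts result; // null until all copies have reported

    FinishOnceSketch(int total) {
        this.total = total;
        this.pending = new AtomicInteger(total);
    }

    void onSuccess() {
        successful.incrementAndGet();
        finishIfNeeded();
    }

    void onFailure() {
        finishIfNeeded();
    }

    private void finishIfNeeded() {
        // compareAndSet guards against building the result twice
        if (pending.decrementAndGet() == 0 && finished.compareAndSet(false, true)) {
            result = new Counts(total, successful.get(), 0);
        }
    }

    Counts result() {
        return result;
    }
}
```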

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ }
+
+ public static class Failure implements Streamable, ToXContent {
+
+ private String index;
+ private int shardId;
+ private String nodeId;
+ private String reason;
+ private RestStatus status;
+
+ public Failure(String index, int shardId, String nodeId, String reason) {
+ this.index = index;
+ this.shardId = shardId;
+ this.nodeId = nodeId;
+ this.reason = reason;
+ this.status = RestStatus.OK; // <-- Replica failures are ok and can happen.
@bleskes
bleskes Oct 13, 2014 Member

We suppress and do not report all errors which are OK. I don't think we need special protection for that here.

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ication/TransportShardReplicationOperationAction.java
+ private void finishIfNeeded() {
+ if (pending.decrementAndGet() == 0) {
+ doFinish();
+ }
+ }
+
+ private void doFinish() {
+ if (finished.compareAndSet(false, true)) {
+ ActionWriteResponse.ShardInfo shardInfo = finalResponse.getShardInfo();
+ shardInfo.setPending(pending.get());
+ shardInfo.setTotal(shardIterator.size());
+ List<ActionWriteResponse.ShardInfo.Failure> failures = new ArrayList<>();
+ for (Map.Entry<String, Throwable> entry : shardReplicaFailures.entrySet()) {
+ String reason = ExceptionsHelper.detailedMessage(entry.getValue());
+ ShardId shardId = shardIterator.shardId();
+ failures.add(new ActionWriteResponse.ShardInfo.Failure(shardId.getIndex(), shardId.getId(), entry.getKey(), reason));
@bleskes
bleskes Oct 13, 2014 Member

We should try to take the rest status from the exception. See ShardSearchFailure

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...ices/mapping/delete/TransportDeleteMappingAction.java
@@ -147,10 +148,10 @@ public void onResponse(FlushResponse flushResponse) {
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
if (logger.isTraceEnabled()) {
for (IndexDeleteByQueryResponse indexResponse : deleteByQueryResponse) {
- logger.trace("Delete by query[{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getTotalShards(), indexResponse.getSuccessfulShards(), indexResponse.getFailedShards());
- if (indexResponse.getFailedShards() > 0) {
- for (ShardOperationFailedException failure : indexResponse.getFailures()) {
- logger.trace("[{}/{}] Delete by query shard failure reason: {}", failure.index(), failure.shardId(), failure.reason());
+ logger.trace("Delete by query for index [{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getShardInfo().getTotal(), indexResponse.getShardInfo().getSuccessful(), indexResponse.getShardInfo().getFailed());
+ if (indexResponse.getShardInfo().getFailures().length > 0) {
@bleskes
bleskes Oct 13, 2014 Member

Nit picky - we can iterate directly on the getFailures array - no need to check for length

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...rg/elasticsearch/action/bulk/TransportBulkAction.java
@@ -305,6 +305,9 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
+ if (!bulkItemResponse.isFailed()) {
@bleskes
bleskes Oct 13, 2014 Member

Why do we need the extra isFailed() check? Don't we want ShardInfos all the time?

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...asticsearch/action/bulk/TransportShardBulkAction.java
@@ -252,6 +246,7 @@ protected ShardIterator shards(ClusterState clusterState, InternalRequest reques
// add the response
IndexResponse indexResponse = result.response();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
+ updateResponse.setShardInfo(indexResponse.getShardInfo());
@bleskes
bleskes Oct 13, 2014 Member

Can we fold shard info into the constructors of responses? The way it is now, it's easy to forget.

@bleskes bleskes commented on the diff Oct 13, 2014
.../elasticsearch/action/delete/IndexDeleteResponse.java
@@ -49,27 +45,6 @@ public String getIndex() {
return this.index;
}
- /**
@bleskes
bleskes Oct 13, 2014 Member

I don't think we can delete those due to BWC - maybe add all of them to the base class and make them get the info from ShardInfo?

@bleskes
bleskes Oct 13, 2014 Member

Cancel that. This is an internal class.

@bleskes bleskes commented on an outdated diff Oct 13, 2014
...csearch/action/delete/TransportIndexDeleteAction.java
@@ -46,7 +47,11 @@ public TransportIndexDeleteAction(Settings settings, ClusterService clusterServi
@Override
protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List<ShardDeleteResponse> shardDeleteResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) {
- return new IndexDeleteResponse(request.index(), failuresCount, shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()]));
+ IndexDeleteResponse indexDeleteResponse = new IndexDeleteResponse(request.index(), shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()]));
+ ActionWriteResponse.ShardInfo shardInfo = new ActionWriteResponse.ShardInfo();
@bleskes
bleskes Oct 13, 2014 Member

Same here - fold it into the constructor?

@bleskes bleskes commented on the diff Oct 13, 2014
.../action/deletebyquery/IndexDeleteByQueryResponse.java
@@ -60,41 +66,21 @@ public String getIndex() {
return this.index;
}
- /**
- * The total number of shards the delete by query was executed on.
- */
- public int getTotalShards() {
@bleskes
bleskes Oct 13, 2014 Member

These are exposed through the clients, so we can't remove them. See comment about potentially moving them up to the base class.

@javanna
javanna Oct 13, 2014 Member

internal class :) I think it's fine

@bleskes
bleskes Oct 13, 2014 Member

@javanna I don't think it is? see org.elasticsearch.action.deletebyquery.DeleteByQueryResponse#iterator

@javanna
javanna Oct 13, 2014 Member

grrr...you are right @bleskes sorry for the confusion. It would be a breaking change for the java API. I don't like it when internal requests get exposed as they are to the outside world :)

@bleskes
bleskes Dec 1, 2014 Member

I wonder where we capture this Java breaking change, so people would know?

@martijnvg
martijnvg Dec 1, 2014 Member

There are docs regarding bwc changes on the java api as far as I know...


@bleskes bleskes commented on an outdated diff Oct 13, 2014
.../action/deletebyquery/IndexDeleteByQueryResponse.java
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
- successfulShards = in.readVInt();
- failedShards = in.readVInt();
- int size = in.readVInt();
- failures = new ShardOperationFailedException[size];
- for (int i = 0; i < size; i++) {
- failures[i] = DefaultShardOperationFailedException.readShardOperationFailed(in);
+ if (in.getVersion().before(Version.V_1_5_0)) {
+ in.readVInt();
@bleskes
bleskes Oct 13, 2014 Member

Can we add a comment about what these are? Potentially try to create a ShardInfo from them?

@bleskes bleskes commented on an outdated diff Oct 13, 2014
.../action/deletebyquery/IndexDeleteByQueryResponse.java
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
- successfulShards = in.readVInt();
- failedShards = in.readVInt();
- int size = in.readVInt();
- failures = new ShardOperationFailedException[size];
- for (int i = 0; i < size; i++) {
- failures[i] = DefaultShardOperationFailedException.readShardOperationFailed(in);
+ if (in.getVersion().before(Version.V_1_5_0)) {
+ in.readVInt();
+ in.readVInt();
+ in.readVInt();
+ }
+ if (in.getVersion().onOrAfter(Version.V_1_5_0)) {
+ shardInfo = in.readOptionalStreamable(new ActionWriteResponse.ShardInfo());
@bleskes
bleskes Oct 13, 2014 Member

Why optional? I think we stopped using null as a possible value?

@bleskes bleskes commented on an outdated diff Oct 13, 2014
.../action/deletebyquery/IndexDeleteByQueryResponse.java
@@ -102,11 +88,13 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
- out.writeVInt(successfulShards);
- out.writeVInt(failedShards);
- out.writeVInt(failures.length);
- for (ShardOperationFailedException failure : failures) {
- failure.writeTo(out);
+ if (out.getVersion().before(Version.V_1_5_0)) {
+ out.writeVInt(0);
@bleskes
bleskes Oct 13, 2014 Member

Try to get these from the shard Info?

@bleskes bleskes commented on an outdated diff Oct 13, 2014
.../action/deletebyquery/IndexDeleteByQueryResponse.java
@@ -102,11 +88,13 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
- out.writeVInt(successfulShards);
- out.writeVInt(failedShards);
- out.writeVInt(failures.length);
- for (ShardOperationFailedException failure : failures) {
- failure.writeTo(out);
+ if (out.getVersion().before(Version.V_1_5_0)) {
+ out.writeVInt(0);
+ out.writeVInt(0);
+ out.writeVInt(0);
+ }
+ if (out.getVersion().onOrAfter(Version.V_1_5_0)) {
+ out.writeOptionalStreamable(shardInfo);
@bleskes
bleskes Oct 13, 2014 Member

same here - do we need optional?

@bleskes bleskes and 1 other commented on an outdated diff Oct 13, 2014
.../action/deletebyquery/ShardDeleteByQueryResponse.java
import java.io.IOException;
/**
* Delete by query response executed on a specific shard.
*/
-public class ShardDeleteByQueryResponse extends ActionResponse {
+public class ShardDeleteByQueryResponse extends ActionWriteResponse {
+
+ private ShardId shardId;
+
+ public ShardDeleteByQueryResponse() {
+ }
+
+ public ShardDeleteByQueryResponse(ShardId shardId) {
+ this.shardId = shardId;
+ }
+
+ public ShardId getShardId() {
@bleskes
bleskes Oct 13, 2014 Member

maybe my intellij is stupid - but where is this used?

@bleskes
bleskes Dec 1, 2014 Member

my intellij still argues this is not used... I think we can remove this change?

@imotov
imotov Jan 6, 2015 Member

This shardId seems redundant to me as well.

@bleskes
Member
bleskes commented Oct 13, 2014

I did a review cycle. I like the simplification in the TransportShardReplicationOperation a lot!

Although we don't use the ReplicaRequest feature of TransportShardReplicationOperation at the moment, I think it's a shame to lose it. I agree that the old PrimaryResponse now has a better name and implementations should not be bothered by it. Maybe make shardOperationOnPrimary return a tuple of Response, ReplicaRequest?

I have a concern regarding BWC of aggregated requests like bulk and delete_by_query. I might be missing something, but I think we will currently use 0 for total and successful shards when aggregating responses from older nodes. If so, I think we should not aggregate if one of the sub requests went to an older node (as we don't know at the moment). I'm thinking of suppressing the _shards section in that case on the top level of the request and also having a ShardInfo instance which returns -1 on all numbers. Make sense?
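
The aggregation rule proposed here can be sketched as follows. This is an illustration under assumptions, not code from the PR: `AggregateShardsSketch` is a hypothetical name, each sub-response is reduced to a `{total, successful}` pair, and `-1` is the "unknown" marker mentioned in the comment. If any sub-response is unknown, the aggregate is reported as unknown instead of silently summing zeros.

```java
import java.util.List;

// Hypothetical sketch of aggregating per-shard counts with an "unknown" marker.
final class AggregateShardsSketch {
    static final int UNKNOWN = -1;

    // Each int[] is {total, successful} from one sub-response;
    // UNKNOWN marks a response that came from an older node.
    static int[] aggregate(List<int[]> subResponses) {
        int total = 0;
        int successful = 0;
        for (int[] r : subResponses) {
            if (r[0] == UNKNOWN || r[1] == UNKNOWN) {
                // one unknown sub-response makes the whole aggregate unknown
                return new int[] {UNKNOWN, UNKNOWN};
            }
            total += r[0];
            successful += r[1];
        }
        return new int[] {total, successful};
    }
}
```

A caller could then omit the top-level `_shards` section whenever the aggregate comes back as unknown.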

It would also be great to have some rest layer tests.

@martijnvg
Member

@bleskes @javanna I applied your feedback and updated the PR.

@bleskes
Member
bleskes commented Nov 6, 2014

@martijnvg as discussed this can go to master only. can you ping when the bwc code is removed?

@martijnvg martijnvg removed the v1.5.0 label Nov 11, 2014
@martijnvg
Member

@bleskes I removed the bwc logic.

@bleskes bleskes commented on an outdated diff Dec 1, 2014
docs/reference/docs/index_.asciidoc
@@ -27,6 +32,16 @@ The result of the above index operation is:
}
--------------------------------------------------
+The `_shards` header provides information about the replication process of the index operation.
+* `total` - Indicates to how many shard copies (primary and replica shards) the index operation should be executed on.
+* `successful`- Indicates the number of shard copies to index operation succeeded on.
@bleskes
bleskes Dec 1, 2014 Member

typo "to index" -> "the index"

@bleskes bleskes commented on an outdated diff Dec 1, 2014
docs/reference/docs/index_.asciidoc
@@ -27,6 +32,16 @@ The result of the above index operation is:
}
--------------------------------------------------
+The `_shards` header provides information about the replication process of the index operation.
+* `total` - Indicates to how many shard copies (primary and replica shards) the index operation should be executed on.
+* `successful`- Indicates the number of shard copies to index operation succeeded on.
+* `pending` - Indicates to how many shard copies this index operation still needs to go to at the time index operation
@bleskes
bleskes Dec 1, 2014 Member

to how -> how

@bleskes bleskes commented on an outdated diff Dec 1, 2014
docs/reference/docs/index_.asciidoc
@@ -27,6 +32,16 @@ The result of the above index operation is:
}
--------------------------------------------------
+The `_shards` header provides information about the replication process of the index operation.
+* `total` - Indicates to how many shard copies (primary and replica shards) the index operation should be executed on.
+* `successful`- Indicates the number of shard copies to index operation succeeded on.
+* `pending` - Indicates to how many shard copies this index operation still needs to go to at the time index operation
+ succeeded on the primary shard. This field is only returned if `async` replication is used.
+* `failures` - An array that contains replication related errors in the case an index operation failed on a replica shard.
+ In the case replica shards aren't started yet then `total` will just be unequal to `successful` and there will be no failure.
@bleskes
bleskes Dec 1, 2014 Member

Maybe make this a note? I.e., pro tip: replica shards may not all be started when an indexing operation successfully returns (by default, we require a quorum). In that case, total will be equal to the total shards based on the index replica settings. successful will be equal to the number of shards started (primary plus replicas). As there were no failures, the failures count will be 0.

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
docs/reference/migration/migrate_1_x.asciidoc
+pre `1.4.0.Beta1` behaviour can be enabled, see: <<modules-discovery-zen,no-master-block>>
+
+[[breaking-changes-1.5]]
@bleskes
bleskes Dec 1, 2014 Member

I think this became 2.0?

@martijnvg
martijnvg Dec 1, 2014 Member

After rebasing with master this file has disappeared :) I've moved it to migrate 2.0 file.

@bleskes bleskes and 2 others commented on an outdated diff Dec 1, 2014
rest-api-spec/test/delete/11_shard_header.yaml
@@ -0,0 +1,36 @@
+---
+"Delete check shard header":
+
+ - do:
+ indices.create:
+ index: foobar
+ body:
+ settings:
+ number_of_shards: "1"
+ number_of_replicas: "1"
+
+ - do:
+ cluster.health:
+ wait_for_status: green
@bleskes
bleskes Dec 1, 2014 Member

I wonder if we always run on two node cluster or more? worth a double check?

@javanna
javanna Dec 1, 2014 Member

good point, we (core) run on > 2 nodes all the time, but clients tests run on a single node cluster most of the time, thus the rest tests need to run against a single node. the one replica and wait for green would indeed make clients build fail.

@martijnvg
martijnvg Dec 1, 2014 Member

hmmm... ok, then I'll change it to wait for yellow and change the assertions.

@martijnvg
martijnvg Dec 1, 2014 Member

running with no replicas makes the test easier.

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
rest-api-spec/test/index/11_shard_header.yaml
+
+ - do:
+ index:
+ index: foobar
+ type: baz
+ id: 1
+ body: { foo: bar }
+
+ - match: { _index: foobar }
+ - match: { _type: baz }
+ - match: { _id: "1"}
+ - match: { _version: 1}
+ - match: { _shards.total: 2}
+ - match: { _shards.successful: 2}
+ - match: { _shards.failed: 0}
+ - is_false: _shards.pending
@bleskes
bleskes Dec 1, 2014 Member

maybe add a check for async?

@martijnvg
martijnvg Dec 1, 2014 Member

you mean a test with async?

@bleskes
bleskes Dec 1, 2014 Member

yes - with pending set to a number.

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ private int total;
+ private int successful;
+ private int pending;
+ private Failure[] failures = new Failure[0];
+
+ private ShardInfo() {
+ }
+
+ public ShardInfo(int total, int successful, int pending, Failure... failures) {
+ this.total = total;
+ this.successful = successful;
+ this.pending = pending;
+ this.failures = failures;
+ }
+
+ public <T extends ActionWriteResponse> ShardInfo(List<T> responses, List<ShardOperationFailedException> primaryFailures) {
@bleskes
bleskes Dec 1, 2014 Member

maybe add a javadoc saying that this is to be used for requests that touch multiple shards, where responses are those of the per-shard (group) requests etc.?

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ List<Failure> failures = new ArrayList<>();
+ for (ShardOperationFailedException failure : primaryFailures) {
+ // Set the status here, since it is a failure on primary shard
+ // The failure doesn't include the node id, maybe add it to ShardOperationFailedException...
+ failures.add(new ActionWriteResponse.ShardInfo.Failure(failure.index(), failure.shardId(), null, failure.reason(), failure.status(), true));
+ }
+ for (ActionWriteResponse response : responses) {
+ total += response.getShardInfo().getTotal();
+ successful += response.getShardInfo().getSuccessful();
+ pending += response.getShardInfo().getPending();
+ if (response.getShardInfo().failures.length > 0) {
+ failures.addAll(Arrays.asList(response.getShardInfo().failures));
+ }
+ }
+ if (!failures.isEmpty()) {
+ failures.addAll(Arrays.asList(this.failures));
@bleskes
bleskes Dec 1, 2014 Member

do we need this?

@martijnvg
martijnvg Dec 1, 2014 Member

Yes, we do need to transform the failure list to an array if there are any failures. The if statement saves the calls to asList, addAll and toArray if there are no failures, which is the case most of the time.

@bleskes
bleskes Dec 5, 2014 Member

yeah, I meant the `failures.addAll(Arrays.asList(this.failures));` call - this is a constructor, so we know this.failures is empty.
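
A simplified sketch of the aggregation discussed in this thread, without the redundant `addAll` and with a shared empty array. `ShardInfoSketch` is a hypothetical stand-in for `ActionWriteResponse.ShardInfo`; failures are plain strings here, and the primary-failure conversion is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical stand-in for ActionWriteResponse.ShardInfo.
final class ShardInfoSketch {
    // Shared constant so the common no-failure case allocates nothing.
    private static final String[] EMPTY = new String[0];

    final int total;
    final int successful;
    final int pending;
    final String[] failures;

    ShardInfoSketch(int total, int successful, int pending, String... failures) {
        this.total = total;
        this.successful = successful;
        this.pending = pending;
        this.failures = failures;
    }

    /** Sums the per-shard-group counters and concatenates their failures. */
    static ShardInfoSketch merge(List<ShardInfoSketch> perShardGroup) {
        int total = 0;
        int successful = 0;
        int pending = 0;
        List<String> failures = new ArrayList<>();
        for (ShardInfoSketch info : perShardGroup) {
            total += info.total;
            successful += info.successful;
            pending += info.pending;
            failures.addAll(Arrays.asList(info.failures));
        }
        // Only build a new array when there is something to copy.
        String[] merged = failures.isEmpty() ? EMPTY : failures.toArray(new String[0]);
        return new ShardInfoSketch(total, successful, pending, merged);
    }

    public static void main(String[] args) {
        ShardInfoSketch merged = merge(Arrays.asList(
                new ShardInfoSketch(2, 2, 0),
                new ShardInfoSketch(2, 1, 0, "replica failed")));
        System.out.println(merged.total + " " + merged.successful + " " + merged.failures.length);
    }
}
```

Because the merged result starts from a fresh list, there is nothing in `this.failures` to add back, which is the point of the comment above.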

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ }
+
+ public ShardInfo getShardInfo() {
+ return shardInfo;
+ }
+
+ public void setShardInfo(ShardInfo shardInfo) {
+ this.shardInfo = shardInfo;
+ }
+
+ public static class ShardInfo implements Streamable, ToXContent {
+
+ private int total;
+ private int successful;
+ private int pending;
+ private Failure[] failures = new Failure[0];
@bleskes
bleskes Dec 1, 2014 Member

Elasticsearch tradition is to make this an EMPTY constant :)

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ private ShardInfo() {
+ }
+
+ public ShardInfo(int total, int successful, int pending, Failure... failures) {
+ this.total = total;
+ this.successful = successful;
+ this.pending = pending;
+ this.failures = failures;
+ }
+
+ public <T extends ActionWriteResponse> ShardInfo(List<T> responses, List<ShardOperationFailedException> primaryFailures) {
+ List<Failure> failures = new ArrayList<>();
+ for (ShardOperationFailedException failure : primaryFailures) {
+ // Set the status here, since it is a failure on primary shard
+ // The failure doesn't include the node id, maybe add it to ShardOperationFailedException...
+ failures.add(new ActionWriteResponse.ShardInfo.Failure(failure.index(), failure.shardId(), null, failure.reason(), failure.status(), true));
@bleskes
bleskes Dec 1, 2014 Member

do we need to do this given the fact that in status() we scan for primaries any way? maybe we can just remove the primary failure list?

@martijnvg
martijnvg Dec 1, 2014 Member

primaryFailures has items of a different type than Failure, so we do the conversion here as well.

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ return failures;
+ }
+
+ public RestStatus status() {
+ RestStatus status = RestStatus.OK;
+ for (Failure failure : failures) {
+ if (failure.primary() && failure.status().getStatus() > status.getStatus()) {
+ status = failure.status();
+ }
+ }
+ return status;
+ }
+
+ @Override
+ public void readFrom(StreamInput in) throws IOException {
+ total = in.readInt();
@bleskes
bleskes Dec 1, 2014 Member

do we want Vints here? (plus assertion about non-negative values in constructors)

@martijnvg
martijnvg Dec 1, 2014 Member

Can be -1 as well in case of updates with operation type none.
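
For context on why -1 and vints do not mix well, here is a self-contained sketch of a 7-bit variable-length int encoding. It assumes the common scheme (seven payload bits per byte, high bit as continuation flag); `VIntSketch` is a hypothetical class and has not been checked against the actual `StreamOutput` implementation.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of a 7-bit variable-length int encoding.
final class VIntSketch {

    /** Encodes the value seven bits at a time, lowest bits first. */
    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // more bytes follow
            value >>>= 7;
        }
        out.write(value); // last byte, continuation bit clear
        return out.toByteArray();
    }

    /** Reverses encode(); expects exactly the bytes of one encoded value. */
    static int decode(byte[] bytes) {
        int result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (b & 0x7F) << shift;
            shift += 7;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("1 takes " + encode(1).length + " byte(s)");
        System.out.println("-1 takes " + encode(-1).length + " byte(s)");
    }
}
```

Under this scheme small non-negative counters fit in one byte, while any negative value needs five, one more than a fixed-width int. That is one reason to prefer 0 over -1 as the noop marker if the fields are to be written as vints.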

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeInt(total);
+ out.writeInt(successful);
+ out.writeInt(pending);
+ out.writeInt(failures.length);
+ for (Failure failure : failures) {
+ failure.writeTo(out);
+ }
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ if (total != -1 && successful != -1 && pending != -1) {
@bleskes
bleskes Dec 1, 2014 Member

when are these -1 values used?

@martijnvg
martijnvg Dec 1, 2014 Member

If an update request results in no action then -1 will be used to indicate that nothing has happened.

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ public static ShardInfo readShardInfo(StreamInput in) throws IOException {
+ ShardInfo shardInfo = new ShardInfo();
+ shardInfo.readFrom(in);
+ return shardInfo;
+ }
+
+ public static class Failure implements ShardOperationFailedException, ToXContent {
+
+ private String index;
+ private int shardId;
+ private String nodeId;
+ private String reason;
+ private RestStatus status;
+ private boolean primary;
+
+ public Failure(String index, int shardId, String nodeId, String reason, RestStatus status, boolean primary) {
@bleskes
bleskes Dec 1, 2014 Member

can we mark nodeId as nullable? (or find a way to always have a value)

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ */
+ public String index() {
+ return index;
+ }
+
+ /**
+ * @return On what shard id the failure occurred.
+ */
+ public int shardId() {
+ return shardId;
+ }
+
+ /**
+ * @return On what node the failure occurred.
+ */
+ public String nodeId() {
@bleskes
bleskes Dec 1, 2014 Member

same here regarding nullable ..

@bleskes bleskes commented on the diff Dec 1, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ primary = in.readBoolean();
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(index);
+ out.writeVInt(shardId);
+ out.writeOptionalString(nodeId);
+ out.writeString(reason);
+ RestStatus.writeTo(out, status);
+ out.writeBoolean(primary);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ builder.startObject();
@bleskes
bleskes Dec 1, 2014 Member

maybe I'm missing something, but why not expose the primary flag too?

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...ices/mapping/delete/TransportDeleteMappingAction.java
@@ -147,11 +148,9 @@ public void onResponse(FlushResponse flushResponse) {
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
if (logger.isTraceEnabled()) {
for (IndexDeleteByQueryResponse indexResponse : deleteByQueryResponse) {
- logger.trace("Delete by query[{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getTotalShards(), indexResponse.getSuccessfulShards(), indexResponse.getFailedShards());
- if (indexResponse.getFailedShards() > 0) {
- for (ShardOperationFailedException failure : indexResponse.getFailures()) {
- logger.trace("[{}/{}] Delete by query shard failure reason: {}", failure.index(), failure.shardId(), failure.reason());
- }
+ logger.trace("Delete by query for index [{}] completed with total[{}], successful[{}] and failed[{}]", indexResponse.getIndex(), indexResponse.getShardInfo().getTotal(), indexResponse.getShardInfo().getSuccessful(), indexResponse.getShardInfo().getFailed());
+ for (ActionWriteResponse.ShardInfo.Failure failure : indexResponse.getShardInfo().getFailures()) {
+ logger.trace("[{}/{}/{}] Delete by query shard failure reason: {}", failure.index(), failure.shardId(), failure.nodeId(), failure.reason());
@bleskes
bleskes Dec 1, 2014 Member

do we want the primary flag here too?

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...ication/TransportShardReplicationOperationAction.java
@@ -807,28 +769,92 @@ private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
}
}
- public static class PrimaryResponse<Response, ReplicaRequest> {
+ private final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0];
@bleskes
bleskes Dec 1, 2014 Member

can we reuse suggested empty array in Failure class?

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...ication/TransportShardReplicationOperationAction.java
private final ReplicaRequest replicaRequest;
- private final Response response;
- private final Object payload;
+ private final Response finalResponse;
+ private final ShardId shardId;
+ private final ActionListener<Response> listener;
+ private final AtomicBoolean finished = new AtomicBoolean(false);
+ private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard
+ private final ConcurrentMap<String, Throwable> shardReplicaFailures = ConcurrentCollections.newConcurrentMap();
+
+ private final AtomicInteger pending;
+ private final int numberOfShardInstances;
+
+ public ReplicationState(PrimaryOperationRequest por, ShardIterator shardsIter, Tuple<Response, ReplicaRequest> primaryResponse, ActionListener<Response> listener, int numberOfPendingShardInstances, int numberOfUnassignedReplicas) {
@bleskes
bleskes Dec 1, 2014 Member

can we untangle the tuple on the caller side and have this signature clean?

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...ication/TransportShardReplicationOperationAction.java
@@ -105,17 +113,10 @@ protected void doExecute(Request request, ActionListener<Response> listener) {
protected abstract String executor();
- protected abstract PrimaryResponse<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
+ protected abstract Tuple<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
@bleskes
bleskes Dec 1, 2014 Member

while we're at it - can we java doc that response and replica request return value?

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
.../action/deletebyquery/ShardDeleteByQueryResponse.java
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
+ if (in.getVersion().onOrAfter(Version.V_1_5_0)) {
@bleskes
bleskes Dec 1, 2014 Member

lose the bwc?

@martijnvg
martijnvg Dec 1, 2014 Member

oops, forgot these ones.

@bleskes bleskes commented on an outdated diff Dec 1, 2014
.../action/deletebyquery/ShardDeleteByQueryResponse.java
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
+ if (out.getVersion().onOrAfter(Version.V_1_5_0)) {
@bleskes
bleskes Dec 1, 2014 Member

same here..

@bleskes bleskes commented on the diff Dec 1, 2014
...ication/TransportIndexReplicationOperationAction.java
@@ -45,15 +46,15 @@
* It relies on a shard sub-action that gets sent over the transport and executed on each of the shard.
* The index provided with the request is expected to be a concrete index, properly resolved by the callers (parent actions).
*/
-public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
+public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionWriteResponse>
@bleskes
bleskes Dec 1, 2014 Member

Why lose the ShardReplication request? why not keep it consistent with TransportShardReplicationOperation?

@martijnvg
martijnvg Dec 1, 2014 Member

because this extra generic type isn't used on the index level, I like to keep it out, it just adds noise imo.

@bleskes bleskes commented on the diff Dec 1, 2014
...ation/TransportIndicesReplicationOperationAction.java
@@ -41,15 +42,15 @@
/**
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
- ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
@bleskes
bleskes Dec 1, 2014 Member

same here regarding the ShardReplicaRequest...

@martijnvg
martijnvg Dec 1, 2014 Member

same opinion

@bleskes
bleskes Dec 5, 2014 Member

same OK

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...a/org/elasticsearch/action/update/UpdateResponse.java
public UpdateResponse(String index, String type, String id, long version, boolean created) {
+ this(new ShardInfo(-1, -1, -1), index, type, id, version, created);
@bleskes
bleskes Dec 1, 2014 Member

why not keep these 0? there were no shard touched, right?

@martijnvg
martijnvg Dec 1, 2014 Member

I used -1 to distinguish between noop operations and operations that actually should write on some shards, but maybe a noop should just use 0 as total... let me change that.

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...rg/elasticsearch/rest/action/bulk/RestBulkAction.java
@@ -97,6 +97,7 @@ public RestResponse buildResponse(BulkResponse response, XContentBuilder builder
builder.startArray(Fields.ITEMS);
for (BulkItemResponse itemResponse : response) {
builder.startObject();
+ itemResponse.getResponse().getShardInfo().toXContent(builder, request);
@bleskes
bleskes Dec 1, 2014 Member

can you give an example of the output here? I wonder if we have duplicate information between the shard info and the failures below.. also, I'm not sure it should be a top level item here, but rather in line with each op.

@martijnvg
martijnvg Dec 1, 2014 Member

There are duplicate shard info headers from responses that originate from the same shard group, but that is expected.

I can move it down to the else clause where ops are serialized.

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...lasticsearch/rest/action/delete/RestDeleteAction.java
@@ -74,8 +74,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
client.delete(deleteRequest, new RestBuilderListener<DeleteResponse>(channel) {
@Override
public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception {
- builder.startObject()
- .field(Fields.FOUND, result.isFound())
+ builder.startObject();
+ result.getShardInfo().toXContent(builder, request);
@bleskes
bleskes Dec 1, 2014 Member

I think we should place it after the other _meta fields - it's less important and shouldn't be first.

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...lasticsearch/rest/action/delete/RestDeleteAction.java
@@ -74,8 +74,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
client.delete(deleteRequest, new RestBuilderListener<DeleteResponse>(channel) {
@Override
public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception {
- builder.startObject()
- .field(Fields.FOUND, result.isFound())
+ builder.startObject();
+ result.getShardInfo().toXContent(builder, request);
+ builder.field(Fields.FOUND, result.isFound())
@bleskes
bleskes Dec 1, 2014 Member

I can't comment on the right place because it's not part of the change, but in the following code:

  RestStatus status = OK;
                 if (!result.isFound()) {
                     status = NOT_FOUND;
                 }

I think we should start with status = result.getShardInfo().getStatus(), no?
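
A rough sketch of what that suggestion could look like, using plain HTTP status codes instead of `RestStatus`. `DeleteStatusSketch` and its methods are hypothetical; `shardStatus` mirrors the "highest primary failure status wins" loop from `ShardInfo.status()` shown earlier in this PR.

```java
// Hypothetical sketch; plain ints stand in for RestStatus values.
final class DeleteStatusSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;

    /** Highest status code among primary failures, or 200 when there are none. */
    static int shardStatus(int... primaryFailureStatuses) {
        int status = OK;
        for (int candidate : primaryFailureStatuses) {
            if (candidate > status) {
                status = candidate;
            }
        }
        return status;
    }

    /** Starts from the shard-level status, then applies the existing not-found check. */
    static int restStatus(boolean found, int... primaryFailureStatuses) {
        int status = shardStatus(primaryFailureStatuses);
        if (!found) {
            status = NOT_FOUND;
        }
        return status;
    }

    public static void main(String[] args) {
        System.out.println(restStatus(true));       // no failures, document found
        System.out.println(restStatus(true, 503));  // primary failure surfaces
        System.out.println(restStatus(false, 503)); // not-found check still overrides
    }
}
```

Note that in this ordering a missing document still overrides a primary failure status; whether that is the desired precedence is not settled in this thread.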

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...lasticsearch/rest/action/update/RestUpdateAction.java
@@ -127,8 +127,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
client.update(updateRequest, new RestBuilderListener<UpdateResponse>(channel) {
@Override
public RestResponse buildResponse(UpdateResponse response, XContentBuilder builder) throws Exception {
- builder.startObject()
- .field(Fields._INDEX, response.getIndex())
+ builder.startObject();
+ response.getShardInfo().toXContent(builder, request);
@bleskes
bleskes Dec 1, 2014 Member

same comments here regarding the location of the _shards key + initial status code value

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...g/elasticsearch/deleteByQuery/DeleteByQueryTests.java
@@ -194,4 +189,14 @@ public void testDeleteByTermQuery() throws ExecutionException, InterruptedExcept
private static String indexOrAlias() {
return randomBoolean() ? "test" : "alias";
}
+
+ // Sometimes if we don't wait for green not all shard copies are fully started and then a replica failure occurs and this is ok.
@bleskes
bleskes Dec 1, 2014 Member

Aren't those failures ignored? what failures are these?

@martijnvg
martijnvg Dec 1, 2014 Member

I removed this and asserted that failed must be 0, not sure why I added this comment.

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...g/elasticsearch/deleteByQuery/DeleteByQueryTests.java
@@ -194,4 +189,14 @@ public void testDeleteByTermQuery() throws ExecutionException, InterruptedExcept
private static String indexOrAlias() {
return randomBoolean() ? "test" : "alias";
}
+
+ // Sometimes if we don't wait for green not all shard copies are fully started and then a replica failure occurs and this is ok.
+ private void assertSyncShardInfo(ActionWriteResponse.ShardInfo shardInfo) {
+ assertThat(shardInfo.getSuccessful(), greaterThanOrEqualTo(1));
+ assertThat(shardInfo.getPending(), equalTo(0));
+ assertThat(shardInfo.getFailures().length, equalTo(shardInfo.getTotal() - shardInfo.getSuccessful()));
+ for (ActionWriteResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
@bleskes
bleskes Dec 1, 2014 Member

can we add tests for total as well?

@bleskes bleskes commented on an outdated diff Dec 1, 2014
.../org/elasticsearch/document/DocumentActionsTests.java
@@ -187,8 +187,8 @@ public void testIndexActions() throws Exception {
logger.info("Delete by query");
DeleteByQueryResponse queryResponse = client().prepareDeleteByQuery().setIndices("test").setQuery(termQuery("name", "test2")).execute().actionGet();
- assertThat(queryResponse.getIndex(getConcreteIndexName()).getSuccessfulShards(), equalTo(numShards.numPrimaries));
- assertThat(queryResponse.getIndex(getConcreteIndexName()).getFailedShards(), equalTo(0));
+ assertThat(queryResponse.getIndex(getConcreteIndexName()).getShardInfo().getSuccessful(), equalTo(numShards.totalNumShards));
+ assertThat(queryResponse.getIndex(getConcreteIndexName()).getShardInfo().getFailures().length, equalTo(0));
@bleskes
bleskes Dec 1, 2014 Member

can we say what total should be here?

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...t/java/org/elasticsearch/document/ShardInfoTests.java
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.Test;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.*;
+
+/**
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
+public class ShardInfoTests extends ElasticsearchIntegrationTest {
+
+ @Test
+ public void testIndexAndDelete() throws Exception {
+ int numReplicas = scaledRandomIntBetween(0, 10);
@bleskes
bleskes Dec 1, 2014 Member

do we need so many? (up to 10)

@martijnvg
martijnvg Dec 1, 2014 Member

Right, I'll reduce it to 5.

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...t/java/org/elasticsearch/document/ShardInfoTests.java
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.*;
+
+/**
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
+public class ShardInfoTests extends ElasticsearchIntegrationTest {
+
+ @Test
+ public void testIndexAndDelete() throws Exception {
+ int numReplicas = scaledRandomIntBetween(0, 10);
+ int numCopies = numReplicas + 1;
+ logger.info("Number of shard copies {}", numCopies);
+
+ int minNumberOfNodes = numCopies == 1 ? 1 : (numCopies / 2) + 1;
@bleskes
bleskes Dec 1, 2014 Member

I think 1/2 + 1 == 1 :)
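
Spelled out: Java integer division truncates, so `1 / 2` is `0` and the special case for a single copy is redundant. A small sketch, with `QuorumExample` as a hypothetical name:

```java
// Hypothetical helper showing that the ternary in the test is unnecessary.
final class QuorumExample {

    /** Majority of shard copies, relying on truncating integer division. */
    static int quorum(int numCopies) {
        return numCopies / 2 + 1;
    }

    public static void main(String[] args) {
        for (int copies = 1; copies <= 5; copies++) {
            System.out.println(copies + " copies -> quorum " + quorum(copies));
        }
    }
}
```

So `int minNumberOfNodes = (numCopies / 2) + 1;` gives the same result as the ternary for every `numCopies >= 1`.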

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...t/java/org/elasticsearch/document/ShardInfoTests.java
+import static org.hamcrest.Matchers.*;
+
+/**
+ */
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
+public class ShardInfoTests extends ElasticsearchIntegrationTest {
+
+ @Test
+ public void testIndexAndDelete() throws Exception {
+ int numReplicas = scaledRandomIntBetween(0, 10);
+ int numCopies = numReplicas + 1;
+ logger.info("Number of shard copies {}", numCopies);
+
+ int minNumberOfNodes = numCopies == 1 ? 1 : (numCopies / 2) + 1;
+ logger.info("Initially starting {} nodes", minNumberOfNodes);
+ internalCluster().startNodesAsync(minNumberOfNodes).get();
@bleskes
bleskes Dec 1, 2014 Member

I wonder if we can lift off the random number of nodes and avoid starting a per test cluster?

@martijnvg
martijnvg Dec 1, 2014 Member

Would be nice, but not sure what we can reuse here, each test method may potentially need to stop many nodes, so there isn't much reuse.

@bleskes bleskes and 1 other commented on an outdated diff Dec 1, 2014
...t/java/org/elasticsearch/document/ShardInfoTests.java
+ assertThat(indexResponse.getShardInfo().getTotal(), equalTo(numCopies));
+ assertThat(indexResponse.getShardInfo().getSuccessful(), equalTo(minNumberOfNodes));
+ DeleteResponse deleteResponse = client().prepareDelete("idx", "type", indexResponse.getId()).get();
+ assertThat(deleteResponse.getShardInfo().getTotal(), equalTo(numCopies));
+ assertThat(deleteResponse.getShardInfo().getSuccessful(), equalTo(minNumberOfNodes));
+ assertThat(deleteResponse.getShardInfo().getPending(), equalTo(0));
+
+ for (int i = minNumberOfNodes + 1; i < numCopies; i++) {
+ logger.info("Checking replication information with {} active copies", i);
+ internalCluster().startNode();
+ ensureActiveShardCopies(0, i);
+
+ indexResponse = client().prepareIndex("idx", "type").setSource("{}").get();
+ assertThat(indexResponse.getShardInfo().getTotal(), equalTo(numCopies));
+ assertThat(indexResponse.getShardInfo().getSuccessful(), equalTo(i));
+ assertThat(deleteResponse.getShardInfo().getPending(), equalTo(0));
@bleskes
bleskes Dec 1, 2014 Member

can we add a variant that checks async?

@martijnvg
martijnvg Dec 1, 2014 Member

This is already tested in testIndexWithAsyncReplication(). I think that this is sufficient?

@bleskes bleskes commented on an outdated diff Dec 1, 2014
...t/java/org/elasticsearch/document/ShardInfoTests.java
+
+ indexResponse = client().prepareIndex("idx", "type").setSource("{}").get();
+ assertThat(indexResponse.getShardInfo().getTotal(), equalTo(numCopies));
+ assertThat(indexResponse.getShardInfo().getSuccessful(), equalTo(i));
+ assertThat(deleteResponse.getShardInfo().getPending(), equalTo(0));
+
+ deleteResponse = client().prepareDelete("idx", "type", indexResponse.getId()).get();
+ assertThat(deleteResponse.getShardInfo().getTotal(), equalTo(numCopies));
+ assertThat(deleteResponse.getShardInfo().getSuccessful(), equalTo(i));
+ assertThat(deleteResponse.getShardInfo().getPending(), equalTo(0));
+ }
+ }
+
+ @Test
+ public void testUpdate() throws Exception {
+ int numReplicas = scaledRandomIntBetween(0, 10);
@bleskes
bleskes Dec 1, 2014 Member

can we maybe share code between the tests and profit from the ActionWriteResponse base class?

@bleskes
Member
bleskes commented Dec 1, 2014

@martijnvg I went through it and left some mostly cosmetic comments. I really like the simplifications in the TSROA (guess what that is :)) and the base class ActionWriteResponse shard code.

@martijnvg
Member

⌘n + TSROA = TransportShardReplicationOperationAction
:)

@martijnvg
Member

@bleskes Thanks for the feedback! I updated the PR.

@bleskes bleskes commented on the diff Dec 5, 2014
rest-api-spec/test/index/11_shard_header.yaml
+
+ - do:
+ index:
+ index: foobar
+ type: baz
+ id: 1
+ body: { foo: bar }
+
+ - match: { _index: foobar }
+ - match: { _type: baz }
+ - match: { _id: "1"}
+ - match: { _version: 1}
+ - match: { _shards.total: 1}
+ - match: { _shards.successful: 1}
+ - match: { _shards.failed: 0}
+ - is_false: _shards.pending
@bleskes
bleskes Dec 5, 2014 Member

I think the comment about adding an async test with pending was missed?

@bleskes bleskes commented on an outdated diff Dec 5, 2014
...ava/org/elasticsearch/action/ActionWriteResponse.java
+ public void setShardInfo(ShardInfo shardInfo) {
+ this.shardInfo = shardInfo;
+ }
+
+ public static class ShardInfo implements Streamable, ToXContent {
+
+ private int total;
+ private int successful;
+ private int pending;
+ private Failure[] failures = EMPTY;
+
+ private ShardInfo() {
+ }
+
+ public ShardInfo(int total, int successful, int pending, Failure... failures) {
+ this.total = total;
@bleskes
bleskes Dec 5, 2014 Member

can we assert that these are non-negative numbers?

@bleskes
Member
bleskes commented Dec 5, 2014

Changes look good to me. I still have a concern regarding the ShardInfo(responses, primaryFailures) constructor and the numbers we will return when, for example, an index replication operation fails on all primary shards (total=0, failure=#primaries). I think we should fix it. Also add a unit test to make sure it does the right thing.

I also made one suggestion to change the tests to use the global cluster. Otherwise, nice code reuse now.

@martijnvg
Member

@bleskes I rebased with master and updated the PR based on your feedback points (last four commits)

@bleskes bleskes commented on an outdated diff Dec 7, 2014
...t/java/org/elasticsearch/document/ShardInfoTests.java
+ for (BulkItemResponse item : bulkResponse) {
+ assertThat(item.isFailed(), equalTo(false));
+ assertShardInfo(item.getResponse());
+ }
+ }
+
+ @Test
+ public void testDeleteWithRoutingRequiredButNotSpecified() throws Exception {
+ prepareIndex(2, true);
+ DeleteResponse deleteResponse = client().prepareDelete("idx", "type", "1").get();
+ assertShardInfo(deleteResponse, numCopies * 2, numNodes * 2, 0);
+ }
+
+ @Test
+ public void testDeleteByQuery() throws Exception {
+ prepareIndex(2);
@bleskes
bleskes Dec 7, 2014 Member

can we randomize between 1 & 2?

@bleskes bleskes commented on an outdated diff Dec 7, 2014
...t/java/org/elasticsearch/document/ShardInfoTests.java
+ prepareIndex(1);
+ BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
+ for (int i = 0; i < 10; i++) {
+ bulkRequestBuilder.add(client().prepareUpdate("idx", "type", Integer.toString(i)).setDoc("{}").setDocAsUpsert(true));
+ }
+
+ BulkResponse bulkResponse = bulkRequestBuilder.get();
+ for (BulkItemResponse item : bulkResponse) {
+ assertThat(item.isFailed(), equalTo(false));
+ assertShardInfo(item.getResponse());
+ }
+ }
+
+ @Test
+ public void testDeleteWithRoutingRequiredButNotSpecified() throws Exception {
+ prepareIndex(2, true);
@bleskes
bleskes Dec 7, 2014 Member

can we randomize between 1 & 2?

@bleskes bleskes commented on an outdated diff Dec 7, 2014
.../elasticsearch/rest/action/index/RestIndexAction.java
.field(Fields._TYPE, response.getType())
.field(Fields._ID, response.getId())
.field(Fields._VERSION, response.getVersion())
.field(Fields.CREATED, response.isCreated());
+ ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
@bleskes
bleskes Dec 7, 2014 Member

Minor - can we move the _shards above CREATED? will look better. now looks like this:

{
   "_index": "index",
   "_type": "type",
   "_id": "1",
   "_version": 1,
   "created": true,
   "_shards": {
      "total": 2,
      "successful": 1,
      "failed": 0
   }
}
@bleskes bleskes commented on an outdated diff Dec 7, 2014
...rg/elasticsearch/rest/action/bulk/RestBulkAction.java
}
}
+ shardInfo.toXContent(builder, request);
@bleskes
bleskes Dec 7, 2014 Member

same here - can we move the _shards to be the last of the _ fields? Now it's mixed:

 {
         "index": {
            "_index": "index",
            "_type": "type",
            "_id": "3",
            "_version": 1,
            "status": 201,
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            }
         }
      },
@bleskes
Member
bleskes commented Dec 7, 2014

Thx @martijnvg!. Latest changes look good to me. I left some minor cosmetic comments about the XContent output.

I still have some concerns about the failure count in the case of TransportIndexReplicationOperation. As it is now, one replica shard failing counts the same as one primary failure (and thus all its replicas), i.e., both add up to one failure. I think we should have the failed shard count be the primary + replicas in the second case. The failure list will then have one exception for them all. Example, say you have 2 primary shards each with 1 replica and one of the primary failed, we will have { _total: 4, _successful: 2, _failed: 2, _failures: "...." } (as opposed to _failed: 1 the current code generates).
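
The arithmetic behind that example, as a minimal sketch. `PrimaryFailureCount` is a hypothetical helper, and it assumes every copy in a shard group whose primary did not fail succeeds.

```java
// Hypothetical sketch of the proposed counting; not the PR's implementation.
final class PrimaryFailureCount {

    /** Returns {total, successful, failed} when whole shard groups fail on their primary. */
    static int[] counts(int primaries, int replicasPerPrimary, int failedPrimaries) {
        int copiesPerGroup = 1 + replicasPerPrimary;
        int total = primaries * copiesPerGroup;
        // A failed primary takes its replicas with it: the write never reaches them.
        int failed = failedPrimaries * copiesPerGroup;
        // Assumption: all copies in the remaining groups succeed.
        int successful = total - failed;
        return new int[] {total, successful, failed};
    }

    public static void main(String[] args) {
        int[] c = counts(2, 1, 1);
        System.out.println("total=" + c[0] + " successful=" + c[1] + " failed=" + c[2]);
    }
}
```

For 2 primaries with 1 replica each and one failed primary this yields total 4, successful 2, failed 2, matching the numbers in the comment.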

@martijnvg
Member

@bleskes I addressed the minor feedback.

Also I tried to address your concern by including replica failures too in case an operation failed on the primary shard. Not sure if we should repeat the primary exception or just tell we never executed on replica shards.

@bleskes bleskes commented on the diff Dec 8, 2014
...lasticsearch/rest/action/delete/RestDeleteAction.java
.field(Fields._INDEX, result.getIndex())
.field(Fields._TYPE, result.getType())
.field(Fields._ID, result.getId())
.field(Fields._VERSION, result.getVersion())
+ .value(shardInfo)
@bleskes
bleskes Dec 8, 2014 Member

I think we want shardInfo.toXContent(builder, request); here? like this we lose the request as params

@bleskes bleskes and 1 other commented on an outdated diff Dec 8, 2014
.../elasticsearch/rest/action/index/RestIndexAction.java
.field(Fields._TYPE, response.getType())
.field(Fields._ID, response.getId())
.field(Fields._VERSION, response.getVersion())
+ .value(shardInfo)
@bleskes
bleskes Dec 8, 2014 Member

I think we want shardInfo.toXContent(builder, request); here? like this we lose the request as params

@martijnvg
martijnvg Dec 9, 2014 Member

true! Let me change this.

@bleskes bleskes commented on an outdated diff Dec 8, 2014
...lasticsearch/rest/action/update/RestUpdateAction.java
.field(Fields._TYPE, response.getType())
.field(Fields._ID, response.getId())
- .field(Fields._VERSION, response.getVersion());
+ .field(Fields._VERSION, response.getVersion())
+ .value(shardInfo);
@bleskes
bleskes Dec 8, 2014 Member

I think we want shardInfo.toXContent(builder, request); here? Like this we lose the request as params.

@bleskes bleskes and 1 other commented on an outdated diff Dec 8, 2014
...ication/TransportIndexReplicationOperationAction.java
@@ -122,21 +131,61 @@ private void returnIfNeeded() {
}
if (shardActionResult.isFailure()) {
assert accumulateExceptions() && shardActionResult.shardFailure != null;
- failures.add(shardActionResult.shardFailure);
+ // Set the status here, since it is a failure on primary shard
+ // The failure doesn't include the node id, maybe add it to ShardOperationFailedException...
+ ShardOperationFailedException sf = shardActionResult.shardFailure;
+
+ ShardIterator thisShardIterator = null;
@bleskes
bleskes Dec 8, 2014 Member

why not capture the shard info during the failure handling? Then we don't need to search for the iterator..

@martijnvg
martijnvg Dec 9, 2014 Member

good call, let me change that.

@bleskes
Member
bleskes commented Dec 8, 2014

Thx @martijnvg .

Not sure if we should repeat the primary exception or just tell we never executed on replica shards.

Yeah, I see what you mean. I was thinking of wrapping the primary failure with a new exception which has the right info in the message (à la "Failed to execute on shard [](primary + [] replicas): [MESSAGE]").

left some other minor comments.

@martijnvg
Member

Thanks for the feedback @bleskes. I updated the PR. Since we don't have exceptions for the replica shards (the Failure class has no failure wrapping), I instead just wrap the original primary shard failure in the message of the replica shard failure. I think this is sufficient?
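A minimal sketch of that idea, for illustration only: when the primary fails, one entry is recorded for the primary and one per replica, with each replica entry embedding the primary's failure message in its own reason text. The class `ReplicaFailureSketch`, its methods and the exact message wording are all assumptions and do not reflect the actual code in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: ReplicaFailureSketch, its methods and the message
// wording are hypothetical and are not taken from the Elasticsearch code base.
final class ReplicaFailureSketch {

    /** Builds the reason text for a replica that was never written to because its primary failed. */
    static String replicaReason(int shardId, String primaryMessage) {
        return "Failed to execute on replica of shard [" + shardId
                + "] because the primary shard failed: " + primaryMessage;
    }

    /** Returns one reason for the primary followed by one reason per replica. */
    static List<String> reasonsForPrimaryFailure(int shardId, int replicas, String primaryMessage) {
        List<String> reasons = new ArrayList<>();
        reasons.add(primaryMessage);
        for (int i = 0; i < replicas; i++) {
            reasons.add(replicaReason(shardId, primaryMessage));
        }
        return reasons;
    }

    public static void main(String[] args) {
        for (String reason : reasonsForPrimaryFailure(0, 2, "disk full")) {
            System.out.println(reason);
        }
    }
}
```

The number of entries then lines up with a failed count of primary plus replicas, while the original cause stays visible in every entry.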

@martijnvg
Member

@bleskes @javanna I applied the feedback and Boaz's commits (bleskes@a9b659d and bleskes@6180417), rebased to master and squashed everything to a single commit for clarity purposes.

I think this is getting close to being merged now. It would be great if someone else could take a look at the PR before it gets merged.

@imotov imotov and 1 other commented on an outdated diff Jan 6, 2015
...g/elasticsearch/deleteByQuery/DeleteByQueryTests.java
@@ -194,4 +189,14 @@ public void testDeleteByTermQuery() throws ExecutionException, InterruptedExcept
private static String indexOrAlias() {
return randomBoolean() ? "test" : "alias";
}
+
+ private void assertSyncShardInfo(ActionWriteResponse.ShardInfo shardInfo, NumShards numShards) {
+ assertThat(shardInfo.getTotal(), equalTo(numShards.totalNumShards));
+ assertThat(shardInfo.getSuccessful(), greaterThanOrEqualTo(1));
@imotov
imotov Jan 6, 2015 Member

Why not greaterThanOrEqualTo(numShards.numPrimaries) or even equalTo(numShards.totalNumShards)?

@martijnvg
martijnvg Jan 7, 2015 Member

I don't know why this wasn't using numShards.numPrimaries, let me change that!

@imotov imotov commented on an outdated diff Jan 6, 2015
...asticsearch/action/bulk/TransportShardBulkAction.java
this.response = response;
this.mappingTypeToUpdate = mappingTypeToUpdate;
this.op = op;
}
@SuppressWarnings("unchecked")
- <T> T response() {
+ <T extends ActionWriteResponse> T response() {
+ // set all to 0, we will embed this into the replica request and not use it
+ response.setShardInfo(new ActionWriteResponse.ShardInfo(0, 0, 0));
@imotov
imotov Jan 6, 2015 Member

That makes me think that ShardInfo needs a public default constructor.

@imotov
Member
imotov commented Jan 6, 2015

I left a couple of small comments. In general looks good to me.

@martijnvg
Member

@imotov I updated the PR and applied your comments.

@imotov
Member
imotov commented Jan 7, 2015

LGTM

@martijnvg martijnvg changed the title from Added `_shards` header to all write responses. to Core: added `_shards` header to all write responses. Jan 8, 2015
@martijnvg martijnvg added the :Core label Jan 8, 2015
@martijnvg martijnvg Core: Added `_shards` header to all write responses.
The header indicates to how many shard copies (primary and replicas shards) a write was supposed to go to, to how many
shard copies to write succeeded and potentially captures shard failures if writing into a replica shard fails.

For async writes it also includes the number of shards a write is still pending.

Closes #7994
ca4f27f
@martijnvg martijnvg merged commit ca4f27f into elastic:master Jan 8, 2015

1 check passed

CLA Commit author is a member of Elasticsearch
@martijnvg martijnvg added the resiliency label Mar 2, 2015
@martijnvg martijnvg deleted the martijnvg:feature/shard_info_write_responses branch May 18, 2015
@clintongormley clintongormley added :REST and removed :Core labels Jun 6, 2015
@clintongormley clintongormley changed the title from Core: added `_shards` header to all write responses. to Added `_shards` header to all write responses. Jun 6, 2015