Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposed shard id related to a failure in delete by query #5125

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,9 +35,9 @@ public class IndexDeleteResponse extends ActionResponse {
private int failedShards;
private ShardDeleteResponse[] deleteResponses;

IndexDeleteResponse(String index, int successfulShards, int failedShards, ShardDeleteResponse[] deleteResponses) {
IndexDeleteResponse(String index, int failedShards, ShardDeleteResponse[] deleteResponses) {
this.index = index;
this.successfulShards = successfulShards;
this.successfulShards = deleteResponses.length;
this.failedShards = failedShards;
this.deleteResponses = deleteResponses;
}
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.delete.index;

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -30,8 +31,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.List;

/**
*
Expand All @@ -50,19 +50,8 @@ protected IndexDeleteRequest newRequestInstance() {
}

@Override
protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, AtomicReferenceArray shardsResponses) {
int successfulShards = 0;
int failedShards = 0;
ArrayList<ShardDeleteResponse> responses = new ArrayList<ShardDeleteResponse>();
for (int i = 0; i < shardsResponses.length(); i++) {
if (shardsResponses.get(i) == null) {
failedShards++;
} else {
responses.add((ShardDeleteResponse) shardsResponses.get(i));
successfulShards++;
}
}
return new IndexDeleteResponse(request.index(), successfulShards, failedShards, responses.toArray(new ShardDeleteResponse[responses.size()]));
protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, List<ShardDeleteResponse> shardDeleteResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) {
return new IndexDeleteResponse(request.index(), failuresCount, shardDeleteResponses.toArray(new ShardDeleteResponse[shardDeleteResponses.size()]));
}

@Override
Expand Down
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.action.deletebyquery;

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.replication.TransportIndexReplicationOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -32,9 +31,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
*
Expand All @@ -53,20 +50,8 @@ protected IndexDeleteByQueryRequest newRequestInstance() {
}

@Override
protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, AtomicReferenceArray shardsResponses) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> failures = new ArrayList<ShardOperationFailedException>(3);
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse instanceof Throwable) {
failedShards++;
failures.add(new DefaultShardOperationFailedException(request.index(), -1, (Throwable) shardResponse));
} else {
successfulShards++;
}
}
return new IndexDeleteByQueryResponse(request.index(), successfulShards, failedShards, failures);
protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryRequest request, List<ShardDeleteByQueryResponse> shardDeleteByQueryResponses, int failuresCount, List<ShardOperationFailedException> shardFailures) {
return new IndexDeleteByQueryResponse(request.index(), shardDeleteByQueryResponses.size(), failuresCount, shardFailures);
}

@Override
Expand Down
Expand Up @@ -19,10 +19,13 @@

package org.elasticsearch.action.support.replication;

import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -36,6 +39,7 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

Expand Down Expand Up @@ -81,8 +85,9 @@ protected void doExecute(final Request request, final ActionListener<Response> l
return;
}
final AtomicInteger indexCounter = new AtomicInteger();
final AtomicInteger failureCounter = new AtomicInteger();
final AtomicInteger completionCounter = new AtomicInteger(groups.size());
final AtomicReferenceArray<Object> shardsResponses = new AtomicReferenceArray<Object>(groups.size());
final AtomicReferenceArray<ShardActionResult> shardsResponses = new AtomicReferenceArray<ShardActionResult>(groups.size());

for (final ShardIterator shardIt : groups) {
ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());
Expand All @@ -96,20 +101,41 @@ protected void doExecute(final Request request, final ActionListener<Response> l
shardAction.execute(shardRequest, new ActionListener<ShardResponse>() {
@Override
public void onResponse(ShardResponse result) {
shardsResponses.set(indexCounter.getAndIncrement(), result);
if (completionCounter.decrementAndGet() == 0) {
listener.onResponse(newResponseInstance(request, shardsResponses));
}
shardsResponses.set(indexCounter.getAndIncrement(), new ShardActionResult(result));
returnIfNeeded();
}

@Override
public void onFailure(Throwable e) {
failureCounter.getAndIncrement();
int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) {
shardsResponses.set(index, e);
shardsResponses.set(index, new ShardActionResult(
new DefaultShardOperationFailedException(request.index, shardIt.shardId().id(), e)));
}
returnIfNeeded();
}

private void returnIfNeeded() {
if (completionCounter.decrementAndGet() == 0) {
listener.onResponse(newResponseInstance(request, shardsResponses));
List<ShardResponse> responses = Lists.newArrayList();
List<ShardOperationFailedException> failures = Lists.newArrayList();
for (int i = 0; i < shardsResponses.length(); i++) {
ShardActionResult shardActionResult = shardsResponses.get(i);
if (shardActionResult == null) {
assert !accumulateExceptions();
continue;
}
if (shardActionResult.isFailure()) {
assert accumulateExceptions() && shardActionResult.shardFailure != null;
failures.add(shardActionResult.shardFailure);
} else {
responses.add(shardActionResult.shardResponse);
}
}

assert failures.size() == 0 || failures.size() == failureCounter.get();
listener.onResponse(newResponseInstance(request, responses, failureCounter.get(), failures));
}
}
});
Expand All @@ -118,7 +144,7 @@ public void onFailure(Throwable e) {

protected abstract Request newRequestInstance();

protected abstract Response newResponseInstance(Request request, AtomicReferenceArray shardsResponses);
protected abstract Response newResponseInstance(Request request, List<ShardResponse> shardResponses, int failuresCount, List<ShardOperationFailedException> shardFailures);

protected abstract String transportAction();

Expand All @@ -132,6 +158,28 @@ public void onFailure(Throwable e) {

protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request);

private class ShardActionResult {

private final ShardResponse shardResponse;
private final ShardOperationFailedException shardFailure;

private ShardActionResult(ShardResponse shardResponse) {
assert shardResponse != null;
this.shardResponse = shardResponse;
this.shardFailure = null;
}

private ShardActionResult(ShardOperationFailedException shardOperationFailedException) {
assert shardOperationFailedException != null;
this.shardFailure = shardOperationFailedException;
this.shardResponse = null;
}

boolean isFailure() {
return shardFailure != null;
}
}

private class TransportHandler extends BaseTransportRequestHandler<Request> {

@Override
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.deleteByQuery;

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.search.SearchResponse;
Expand All @@ -27,12 +28,10 @@
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Assert;
import org.junit.Test;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.*;

public class DeleteByQueryTests extends ElasticsearchIntegrationTest {

Expand Down Expand Up @@ -83,9 +82,10 @@ public void testMissing() {
deleteByQueryRequestBuilder.setQuery(QueryBuilders.matchAllQuery());

try {
DeleteByQueryResponse actionGet = deleteByQueryRequestBuilder.execute().actionGet();
Assert.fail("Exception should have been thrown.");
deleteByQueryRequestBuilder.execute().actionGet();
fail("Exception should have been thrown.");
} catch (IndexMissingException e) {
//everything well
}

deleteByQueryRequestBuilder.setIndicesOptions(IndicesOptions.lenient());
Expand All @@ -110,6 +110,14 @@ public void testFailure() throws Exception {
assertThat(response.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(response.getIndex("twitter").getSuccessfulShards(), equalTo(0));
assertThat(response.getIndex("twitter").getFailedShards(), equalTo(5));
assertThat(response.getIndices().size(), equalTo(1));
assertThat(response.getIndices().get("twitter").getFailedShards(), equalTo(5));
assertThat(response.getIndices().get("twitter").getFailures().length, equalTo(5));
for (ShardOperationFailedException failure : response.getIndices().get("twitter").getFailures()) {
assertThat(failure.reason(), containsString("[twitter] [has_child] No mapping for for type [type]"));
assertThat(failure.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(failure.shardId(), greaterThan(-1));
}
}

@Test
Expand Down