Skip to content

Commit

Permalink
simplify sharinfo logic on TransportIndexReplicationOperationAction
Browse files Browse the repository at this point in the history
  • Loading branch information
bleskes committed Dec 21, 2014
1 parent c99bb0f commit a9b659d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ protected IndexDeleteResponse newResponseInstance(IndexDeleteRequest request, Li
return indexDeleteResponse;
}

@Override
protected boolean accumulateExceptions() {
return false;
}

@Override
protected GroupShardsIterator shards(IndexDeleteRequest request) {
return clusterService.operationRouting().broadcastDeleteShards(clusterService.state(), request.index());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ protected IndexDeleteByQueryResponse newResponseInstance(IndexDeleteByQueryReque
return new IndexDeleteByQueryResponse(request.index(), shardInfo);
}

@Override
protected boolean accumulateExceptions() {
return true;
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, IndexDeleteByQueryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,21 @@
package org.elasticsearch.action.support.replication;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ActionWriteResponse.ShardInfo.Failure;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
Expand Down Expand Up @@ -88,7 +87,7 @@ protected void doExecute(final Request request, final ActionListener<Response> l
final AtomicReferenceArray<ShardActionResult> shardsResponses = new AtomicReferenceArray<>(groups.size());

for (final ShardIterator shardIt : groups) {
ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());
final ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());

// TODO for now, we fork operations on shardIt of the index
shardRequest.beforeLocalFork(); // optimize for local fork
Expand All @@ -107,10 +106,16 @@ public void onResponse(ShardResponse result) {
public void onFailure(Throwable e) {
failureCounter.getAndIncrement();
int index = indexCounter.getAndIncrement();
if (accumulateExceptions()) {
shardsResponses.set(index, new ShardActionResult(
new DefaultShardOperationFailedException(request.index(), shardIt.shardId().id(), e), shardIt));
// this is a failure for an entire shard group, constructs shard info accordingly
final RestStatus status;
if (e != null && e instanceof ElasticsearchException) {
status = ((ElasticsearchException) e).status();
} else {
status = RestStatus.INTERNAL_SERVER_ERROR;
}
Failure failure = new Failure(request.index(), shardIt.shardId().id(), null,
"Failed to execute on all shard copies [" + ExceptionsHelper.detailedMessage(e) + "]", status, true);
shardsResponses.set(index, new ShardActionResult(new ActionWriteResponse.ShardInfo(shardIt.size(), 0, 0, failure)));
returnIfNeeded();
}

Expand All @@ -119,35 +124,23 @@ private void returnIfNeeded() {
List<ShardResponse> responses = new ArrayList<>();
List<Failure> failureList = new ArrayList<>();

int total = groups.totalSize();
int total = 0;
int pending = 0;
int successful = 0;
for (int i = 0; i < shardsResponses.length(); i++) {
ShardActionResult shardActionResult = shardsResponses.get(i);
if (shardActionResult == null) {
assert !accumulateExceptions();
continue;
}
final ActionWriteResponse.ShardInfo sf;
if (shardActionResult.isFailure()) {
assert accumulateExceptions() && shardActionResult.shardFailure != null;
// Set the status here, since it is a failure on primary shard
// The failure doesn't include the node id, maybe add it to ShardOperationFailedException...
ShardOperationFailedException sf = shardActionResult.shardFailure;

ShardIterator shardIterator = shardActionResult.shardIterator;
for (ShardRouting shardRouting = shardIterator.nextOrNull(); shardRouting != null; shardRouting = shardIterator.nextOrNull()) {
if (shardRouting.primary()) {
failureList.add(new Failure(sf.index(), sf.shardId(), shardRouting.currentNodeId(), sf.reason(), sf.status(), true));
} else {
failureList.add(new Failure(sf.index(), sf.shardId(), shardRouting.currentNodeId(), "Failed to execute on replica shard: " + sf.reason(), sf.status(), false));
}
}
assert shardActionResult.shardInfoOnFailure != null;
sf = shardActionResult.shardInfoOnFailure;
} else {
pending += shardActionResult.shardResponse.getShardInfo().getPending();
successful += shardActionResult.shardResponse.getShardInfo().getSuccessful();
failureList.addAll(Arrays.asList(shardActionResult.shardResponse.getShardInfo().getFailures()));
responses.add(shardActionResult.shardResponse);
sf = shardActionResult.shardResponse.getShardInfo();
}
total += sf.getTotal();
pending += sf.getPending();
successful += sf.getSuccessful();
failureList.addAll(Arrays.asList(sf.getFailures()));
}
assert failureList.size() == 0 || numShardGroupFailures(failureList) == failureCounter.get();

Expand Down Expand Up @@ -181,8 +174,6 @@ private int numShardGroupFailures(List<Failure> failures) {

protected abstract ShardRequest newShardRequestInstance(Request request, int shardId);

protected abstract boolean accumulateExceptions();

protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
}
Expand All @@ -194,25 +185,22 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, Request re
private class ShardActionResult {

private final ShardResponse shardResponse;
private final ShardOperationFailedException shardFailure;
private final ShardIterator shardIterator;
private final ActionWriteResponse.ShardInfo shardInfoOnFailure;

private ShardActionResult(ShardResponse shardResponse) {
assert shardResponse != null;
this.shardResponse = shardResponse;
this.shardFailure = null;
this.shardIterator = null;
this.shardInfoOnFailure = null;
}

private ShardActionResult(ShardOperationFailedException shardOperationFailedException, ShardIterator shardIterator) {
assert shardOperationFailedException != null;
this.shardFailure = shardOperationFailedException;
this.shardIterator = shardIterator;
private ShardActionResult(ActionWriteResponse.ShardInfo shardInfoOnFailure) {
assert shardInfoOnFailure != null;
this.shardInfoOnFailure = shardInfoOnFailure;
this.shardResponse = null;
}

boolean isFailure() {
return shardFailure != null;
return shardInfoOnFailure != null;
}
}
}

0 comments on commit a9b659d

Please sign in to comment.