Skip to content

Commit

Permalink
Bulk API: Properly retry execution on temporal state changes, closes e…
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Sep 16, 2011
1 parent 0977b79 commit 305cf4a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 30 deletions.
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
Expand Down Expand Up @@ -158,6 +159,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version));
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
throw (ElasticSearchException) e;
}
logger.debug("[{}][{}] failed to bulk item (index) {}", e, shardRequest.request.index(), shardRequest.shardId, indexRequest);
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e)));
Expand All @@ -174,6 +179,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
responses[i] = new BulkItemResponse(item.id(), "delete",
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound()));
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
throw (ElasticSearchException) e;
}
logger.debug("[{}][{}] failed to bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
responses[i] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e)));
Expand Down
Expand Up @@ -160,6 +160,42 @@ protected IndexShard indexShard(ShardOperationRequest shardRequest) {
return indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
}

protected boolean retryPrimaryException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof IndexShardMissingException ||
cause instanceof IllegalIndexShardStateException ||
cause instanceof IndexMissingException;
}

/**
* Should an exception be ignored when the operation is performed on the replica.
*/
boolean ignoreReplicaException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IllegalIndexShardStateException) {
return true;
}
if (cause instanceof IndexMissingException) {
return true;
}
if (cause instanceof IndexShardMissingException) {
return true;
}
if (cause instanceof ConnectTransportException) {
return true;
}
// on version conflict or document missing, it means
// that a news change has crept into the replica, and its fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
// same here
if (cause instanceof DocumentAlreadyExistsEngineException) {
return true;
}
return false;
}

class OperationTransportHandler extends BaseTransportRequestHandler<Request> {

@Override public Request newInstance() {
Expand Down Expand Up @@ -429,7 +465,7 @@ void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, final S
performReplicas(response);
} catch (Exception e) {
// shard has not been allocated yet, retry it here
if (e instanceof IndexShardMissingException || e instanceof IllegalIndexShardStateException || e instanceof IndexMissingException) {
if (retryPrimaryException(e)) {
retry(fromDiscoveryListener, shard.shardId());
return;
}
Expand Down Expand Up @@ -572,35 +608,6 @@ private void finishIfPossible() {
}
}
}

/**
* Should an exception be ignored when the operation is performed on the replica.
*/
boolean ignoreReplicaException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IllegalIndexShardStateException) {
return true;
}
if (cause instanceof IndexMissingException) {
return true;
}
if (cause instanceof IndexShardMissingException) {
return true;
}
if (cause instanceof ConnectTransportException) {
return true;
}
// on version conflict or document missing, it means
// that a news change has crept into the replica, and its fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
// same here
if (cause instanceof DocumentAlreadyExistsEngineException) {
return true;
}
return false;
}
}

public static class PrimaryResponse<T> {
Expand Down

0 comments on commit 305cf4a

Please sign in to comment.