From 305cf4a5671cf4390ed0e9d5db15b5e068370aa8 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sat, 17 Sep 2011 02:21:20 +0300 Subject: [PATCH] Bulk API: Properly retry execution on temporal state changes, closes #1343. --- .../action/bulk/TransportShardBulkAction.java | 9 +++ ...nsportShardReplicationOperationAction.java | 67 ++++++++++--------- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 116a288f8d7a7..7c76335b01f31 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; @@ -158,6 +159,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(), new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version)); } catch (Exception e) { + // rethrow the failure if we are going to retry on primary and let parent failure to handle it + if (retryPrimaryException(e)) { + throw (ElasticSearchException) e; + } logger.debug("[{}][{}] failed to bulk item (index) {}", e, shardRequest.request.index(), shardRequest.shardId, indexRequest); responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().toString().toLowerCase(), new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e))); @@ -174,6 +179,10 @@ public class TransportShardBulkAction 
extends TransportShardReplicationOperation responses[i] = new BulkItemResponse(item.id(), "delete", new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound())); } catch (Exception e) { + // rethrow the failure if we are going to retry on primary and let parent failure to handle it + if (retryPrimaryException(e)) { + throw (ElasticSearchException) e; + } logger.debug("[{}][{}] failed to bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest); responses[i] = new BulkItemResponse(item.id(), "delete", new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e))); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 5a5935d4ce83d..3a5491eb959eb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -160,6 +160,42 @@ protected IndexShard indexShard(ShardOperationRequest shardRequest) { return indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); } + protected boolean retryPrimaryException(Throwable e) { + Throwable cause = ExceptionsHelper.unwrapCause(e); + return cause instanceof IndexShardMissingException || + cause instanceof IllegalIndexShardStateException || + cause instanceof IndexMissingException; + } + + /** + * Should an exception be ignored when the operation is performed on the replica. 
+ */ + boolean ignoreReplicaException(Throwable e) { + Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof IllegalIndexShardStateException) { + return true; + } + if (cause instanceof IndexMissingException) { + return true; + } + if (cause instanceof IndexShardMissingException) { + return true; + } + if (cause instanceof ConnectTransportException) { + return true; + } + // on version conflict or document missing, it means + // that a new change has crept into the replica, and it's fine + if (cause instanceof VersionConflictEngineException) { + return true; + } + // same here + if (cause instanceof DocumentAlreadyExistsEngineException) { + return true; + } + return false; + } + class OperationTransportHandler extends BaseTransportRequestHandler { @Override public Request newInstance() { @@ -429,7 +465,7 @@ void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, final S performReplicas(response); } catch (Exception e) { // shard has not been allocated yet, retry it here - if (e instanceof IndexShardMissingException || e instanceof IllegalIndexShardStateException || e instanceof IndexMissingException) { + if (retryPrimaryException(e)) { retry(fromDiscoveryListener, shard.shardId()); return; } @@ -572,35 +608,6 @@ private void finishIfPossible() { } } } - - /** - * Should an exception be ignored when the operation is performed on the replica. 
- */ - boolean ignoreReplicaException(Throwable e) { - Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof IllegalIndexShardStateException) { - return true; - } - if (cause instanceof IndexMissingException) { - return true; - } - if (cause instanceof IndexShardMissingException) { - return true; - } - if (cause instanceof ConnectTransportException) { - return true; - } - // on version conflict or document missing, it means - // that a news change has crept into the replica, and its fine - if (cause instanceof VersionConflictEngineException) { - return true; - } - // same here - if (cause instanceof DocumentAlreadyExistsEngineException) { - return true; - } - return false; - } } public static class PrimaryResponse {