From c68016bb83e2aad815b4bf2fe2b374a06b7fd9e2 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 31 Oct 2013 15:54:18 +0100 Subject: [PATCH] better timeout handling waiting for primary to be active for indexing take into account the correct delta timeout when scheduling it, since now we can retry again after removing a listener --- ...nsportShardReplicationOperationAction.java | 102 ++++++++++-------- 1 file changed, 57 insertions(+), 45 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 0e24282814784..16f51288f793f 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -313,6 +313,7 @@ protected class AsyncShardOperationAction { private volatile ShardIterator shardIt; private final AtomicBoolean primaryOperationStarted = new AtomicBoolean(); private final ReplicationType replicationType; + protected final long startTime = System.currentTimeMillis(); AsyncShardOperationAction(Request request, ActionListener listener) { this.request = request; @@ -481,62 +482,73 @@ public void handleException(TransportException exp) { } void retry(boolean fromClusterEvent, @Nullable final Throwable failure) { - if (!fromClusterEvent) { - // make it threaded operation so we fork on the discovery listener thread - request.beforeLocalFork(); - request.operationThreaded(true); - clusterService.add(request.timeout(), new TimeoutClusterStateListener() { - @Override - public void postAdded() { - // check if state version changed while we were adding this listener - long sampledVersion = clusterState.version(); - long currentVersion = clusterService.state().version(); - if (sampledVersion != currentVersion) { - logger.trace("state change while we were trying to add listener, trying to start again, sampled_version [{}], current_version [{}]", sampledVersion, currentVersion); - if (start(true)) { - // if we managed to start and perform the operation on the primary, we can remove this listener - clusterService.remove(this); - } - } - } + if (fromClusterEvent) { + logger.trace("retry scheduling ignored as it as we already have a listener in place"); + return; + } - @Override - public void onClose() { - clusterService.remove(this); - listener.onFailure(new NodeClosedException(clusterService.localNode())); - } + // make it threaded operation so we fork on the discovery listener thread + request.beforeLocalFork(); + request.operationThreaded(true); - @Override - public void clusterChanged(ClusterChangedEvent event) { - logger.trace("cluster changed (version {}), trying to start again", event.state().version()); + TimeValue timeout = new TimeValue(request.timeout().millis() - (System.currentTimeMillis() - startTime)); + if (timeout.millis() <= 0) { + raiseTimeoutFailure(timeout, failure); + return; + } + + clusterService.add(timeout, new TimeoutClusterStateListener() { + @Override + public void postAdded() { + // check if state version changed while we were adding this listener + long sampledVersion = clusterState.version(); + long currentVersion = clusterService.state().version(); + if (sampledVersion != currentVersion) { + logger.trace("state change while we were trying to add listener, trying to start again, sampled_version [{}], current_version [{}]", sampledVersion, currentVersion); if (start(true)) { // if we managed to start and perform the operation on the primary, we can remove this listener clusterService.remove(this); } } + } - @Override - public void onTimeout(TimeValue timeValue) { - // just to be on the safe side, see if we can start it now? - if (start(true)) { - clusterService.remove(this); - return; - } + @Override + public void onClose() { + clusterService.remove(this); + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + logger.trace("cluster changed (version {}), trying to start again", event.state().version()); + if (start(true)) { + // if we managed to start and perform the operation on the primary, we can remove this listener clusterService.remove(this); - Throwable listenerFailure = failure; - if (listenerFailure == null) { - if (shardIt == null) { - listenerFailure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeValue + "], request: " + request.toString()); - } else { - listenerFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString()); - } - } - listener.onFailure(listenerFailure); } - }); - } else { - logger.trace("retry scheduling ignored as it as we already have a listener in place"); + } + + @Override + public void onTimeout(TimeValue timeValue) { + // just to be on the safe side, see if we can start it now? + if (start(true)) { + clusterService.remove(this); + return; + } + clusterService.remove(this); + raiseTimeoutFailure(timeValue, failure); + } + }); + } + + void raiseTimeoutFailure(TimeValue timeout, @Nullable Throwable failure) { + if (failure == null) { + if (shardIt == null) { + failure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeout + "], request: " + request.toString()); + } else { + failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + request.toString()); + } } + listener.onFailure(failure); } void performOnPrimary(int primaryShardId, final ShardRouting shard, ClusterState clusterState) {