better timeout handling waiting for primary to be active for indexing
take into account the correct delta timeout when scheduling it, since now we can retry again after removing a listener
kimchy committed Oct 31, 2013
1 parent fcfc412 commit c68016b
Showing 1 changed file with 57 additions and 45 deletions.
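The fix, in short: when the retry path re-registers a cluster-state listener, it now schedules it with the time remaining from the original deadline (request timeout minus elapsed time since startTime) instead of re-arming the full request timeout, and it fails fast via raiseTimeoutFailure once that budget is exhausted. A minimal standalone sketch of that delta-timeout pattern follows; RetryBudget and its members are hypothetical names for illustration, not Elasticsearch types.

    // Illustrative sketch only; the pattern mirrors the commit: capture a
    // start time once, derive each subsequent wait from the remaining budget.
    import java.util.function.LongConsumer;

    final class RetryBudget {

        private final long startTimeMillis = System.currentTimeMillis(); // captured once, when the operation begins
        private final long requestTimeoutMillis;                         // the caller's overall timeout

        RetryBudget(long requestTimeoutMillis) {
            this.requestTimeoutMillis = requestTimeoutMillis;
        }

        // Milliseconds left before the original deadline; non-positive once it has passed.
        long remainingMillis() {
            return requestTimeoutMillis - (System.currentTimeMillis() - startTimeMillis);
        }

        // On each retry: fail immediately if the deadline has passed, otherwise
        // schedule the next wait with only the remaining budget, never the full timeout.
        void scheduleRetry(LongConsumer scheduleWithTimeoutMillis, Runnable raiseTimeoutFailure) {
            long remaining = remainingMillis();
            if (remaining <= 0) {
                raiseTimeoutFailure.run();
                return;
            }
            scheduleWithTimeoutMillis.accept(remaining);
        }
    }

Without the delta, each retry after removing a listener would wait for up to a full timeout again, so a request could block well past its declared deadline; with it, the total wait stays bounded by the original request timeout.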
@@ -313,6 +313,7 @@ protected class AsyncShardOperationAction {
         private volatile ShardIterator shardIt;
         private final AtomicBoolean primaryOperationStarted = new AtomicBoolean();
         private final ReplicationType replicationType;
+        protected final long startTime = System.currentTimeMillis();
 
         AsyncShardOperationAction(Request request, ActionListener<Response> listener) {
             this.request = request;
@@ -481,62 +482,73 @@ public void handleException(TransportException exp) {
         }
 
         void retry(boolean fromClusterEvent, @Nullable final Throwable failure) {
-            if (!fromClusterEvent) {
-                // make it threaded operation so we fork on the discovery listener thread
-                request.beforeLocalFork();
-                request.operationThreaded(true);
-                clusterService.add(request.timeout(), new TimeoutClusterStateListener() {
-                    @Override
-                    public void postAdded() {
-                        // check if state version changed while we were adding this listener
-                        long sampledVersion = clusterState.version();
-                        long currentVersion = clusterService.state().version();
-                        if (sampledVersion != currentVersion) {
-                            logger.trace("state change while we were trying to add listener, trying to start again, sampled_version [{}], current_version [{}]", sampledVersion, currentVersion);
-                            if (start(true)) {
-                                // if we managed to start and perform the operation on the primary, we can remove this listener
-                                clusterService.remove(this);
-                            }
-                        }
-                    }
-
-                    @Override
-                    public void onClose() {
-                        clusterService.remove(this);
-                        listener.onFailure(new NodeClosedException(clusterService.localNode()));
-                    }
-
-                    @Override
-                    public void clusterChanged(ClusterChangedEvent event) {
-                        logger.trace("cluster changed (version {}), trying to start again", event.state().version());
-                        if (start(true)) {
-                            // if we managed to start and perform the operation on the primary, we can remove this listener
-                            clusterService.remove(this);
-                        }
-                    }
-
-                    @Override
-                    public void onTimeout(TimeValue timeValue) {
-                        // just to be on the safe side, see if we can start it now?
-                        if (start(true)) {
-                            clusterService.remove(this);
-                            return;
-                        }
-                        clusterService.remove(this);
-                        Throwable listenerFailure = failure;
-                        if (listenerFailure == null) {
-                            if (shardIt == null) {
-                                listenerFailure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeValue + "], request: " + request.toString());
-                            } else {
-                                listenerFailure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
-                            }
-                        }
-                        listener.onFailure(listenerFailure);
-                    }
-                });
-            } else {
-                logger.trace("retry scheduling ignored as it as we already have a listener in place");
-            }
+            if (fromClusterEvent) {
+                logger.trace("retry scheduling ignored as it as we already have a listener in place");
+                return;
+            }
+
+            // make it threaded operation so we fork on the discovery listener thread
+            request.beforeLocalFork();
+            request.operationThreaded(true);
+
+            TimeValue timeout = new TimeValue(request.timeout().millis() - (System.currentTimeMillis() - startTime));
+            if (timeout.millis() <= 0) {
+                raiseTimeoutFailure(timeout, failure);
+                return;
+            }
+
+            clusterService.add(timeout, new TimeoutClusterStateListener() {
+                @Override
+                public void postAdded() {
+                    // check if state version changed while we were adding this listener
+                    long sampledVersion = clusterState.version();
+                    long currentVersion = clusterService.state().version();
+                    if (sampledVersion != currentVersion) {
+                        logger.trace("state change while we were trying to add listener, trying to start again, sampled_version [{}], current_version [{}]", sampledVersion, currentVersion);
+                        if (start(true)) {
+                            // if we managed to start and perform the operation on the primary, we can remove this listener
+                            clusterService.remove(this);
+                        }
+                    }
+                }
+
+                @Override
+                public void onClose() {
+                    clusterService.remove(this);
+                    listener.onFailure(new NodeClosedException(clusterService.localNode()));
+                }
+
+                @Override
+                public void clusterChanged(ClusterChangedEvent event) {
+                    logger.trace("cluster changed (version {}), trying to start again", event.state().version());
+                    if (start(true)) {
+                        // if we managed to start and perform the operation on the primary, we can remove this listener
+                        clusterService.remove(this);
+                    }
+                }
+
+                @Override
+                public void onTimeout(TimeValue timeValue) {
+                    // just to be on the safe side, see if we can start it now?
+                    if (start(true)) {
+                        clusterService.remove(this);
+                        return;
+                    }
+                    clusterService.remove(this);
+                    raiseTimeoutFailure(timeValue, failure);
+                }
+            });
         }
 
+        void raiseTimeoutFailure(TimeValue timeout, @Nullable Throwable failure) {
+            if (failure == null) {
+                if (shardIt == null) {
+                    failure = new UnavailableShardsException(null, "no available shards: Timeout waiting for [" + timeout + "], request: " + request.toString());
+                } else {
+                    failure = new UnavailableShardsException(shardIt.shardId(), "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeout + "], request: " + request.toString());
+                }
+            }
+            listener.onFailure(failure);
+        }
+
         void performOnPrimary(int primaryShardId, final ShardRouting shard, ClusterState clusterState) {
