Skip to content

Commit

Permalink
Possible failure to start a river after cluster restart, closes elast…
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed May 4, 2011
1 parent 8b7348a commit 55aa13a
Showing 1 changed file with 18 additions and 1 deletion.
Expand Up @@ -22,9 +22,11 @@
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.ImmutableSet;
Expand All @@ -36,6 +38,7 @@
import org.elasticsearch.common.inject.Injectors;
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.PluginsService;
Expand Down Expand Up @@ -225,7 +228,21 @@ private class ApplyRivers implements RiverClusterStateListener {
}

@Override public void onFailure(Throwable e) {
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
// if its this is a failure that need to be retried, then do it
// this might happen if the state of the river index has not been propagated yet to this node, which
// should happen pretty fast since we managed to get the _meta in the RiversRouter
Throwable failure = ExceptionsHelper.unwrapCause(e);
if ((failure instanceof NoShardAvailableActionException) || (failure instanceof ClusterBlockException)) {
logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name());
final ActionListener<GetResponse> listener = this;
threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() {
@Override public void run() {
client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").execute(listener);
}
});
} else {
logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name());
}
}
});
}
Expand Down

0 comments on commit 55aa13a

Please sign in to comment.