From 55aa13a1fc7364d5606353b25e1ab0a122c1de4d Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 4 May 2011 20:27:07 +0300 Subject: [PATCH] Possible failure to start a river after cluster restart, closes #902. --- .../elasticsearch/river/RiversService.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java index c508d067d9855..0e66a497eae1a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/river/RiversService.java @@ -22,9 +22,11 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.ImmutableSet; @@ -36,6 +38,7 @@ import org.elasticsearch.common.inject.Injectors; import org.elasticsearch.common.inject.ModulesBuilder; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.plugins.PluginsService; @@ -225,7 +228,21 @@ private class ApplyRivers implements RiverClusterStateListener { } @Override public void onFailure(Throwable e) { - logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name()); + // if this is a failure that needs to be retried, then do 
it + // this might happen if the state of the river index has not been propagated yet to this node, which + // should happen pretty fast since we managed to get the _meta in the RiversRouter + Throwable failure = ExceptionsHelper.unwrapCause(e); + if ((failure instanceof NoShardAvailableActionException) || (failure instanceof ClusterBlockException)) { + logger.debug("failed to get _meta from [{}]/[{}], retrying...", e, routing.riverName().type(), routing.riverName().name()); + final ActionListener listener = this; + threadPool.schedule(TimeValue.timeValueSeconds(5), ThreadPool.Names.SAME, new Runnable() { + @Override public void run() { + client.prepareGet(riverIndexName, routing.riverName().name(), "_meta").execute(listener); + } + }); + } else { + logger.warn("failed to get _meta from [{}]/[{}]", e, routing.riverName().type(), routing.riverName().name()); + } } }); }