diff --git a/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java new file mode 100644 index 0000000000000..ca4f697df74b3 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/AckedClusterStateUpdateTask.java @@ -0,0 +1,55 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.TimeValue; + +/** + * An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when + * all the nodes have acknowledged a cluster state update request + */ +public interface AckedClusterStateUpdateTask extends TimeoutClusterStateUpdateTask { + + /** + * Called to determine which nodes the acknowledgement is expected from + * @param discoveryNode a node + * @return true if the node is expected to send ack back, false otherwise + */ + boolean mustAck(DiscoveryNode discoveryNode); + + /** + * Called once all the nodes have acknowledged the cluster state update request. Must be + * very lightweight execution, since it gets executed on the cluster service thread. + * @param t optional error that might have been thrown + */ + void onAllNodesAcked(@Nullable Throwable t); + + /** + * Called once the acknowledgement timeout defined by + * {@link AckedClusterStateUpdateTask#ackTimeout()} has expired + */ + void onAckTimeout(); + + /** + * Acknowledgement timeout, maximum time interval to wait for acknowledgements + */ + TimeValue ackTimeout(); +} diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index 3e9f769a0deea..5f49deaa3af46 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.operation.OperationRouting; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -49,6 +50,8 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory; @@ -317,6 +320,7 @@ public void run() { } try { + Discovery.AckListener ackListener = new NoOpAckListener(); if (newClusterState.nodes().localNodeMaster()) { // only the master controls the version numbers Builder builder = ClusterState.builder().state(newClusterState).version(newClusterState.version() + 1); @@ -327,6 +331,19 @@ public void run() { builder.metaData(MetaData.builder().metaData(newClusterState.metaData()).version(newClusterState.metaData().version() + 1)); } newClusterState = builder.build(); + + if (updateTask instanceof AckedClusterStateUpdateTask) { + final AckedClusterStateUpdateTask ackedUpdateTask = (AckedClusterStateUpdateTask) updateTask; + try { + ackListener = new AckCountDownListener(ackedUpdateTask, newClusterState.version(), newClusterState.nodes(), threadPool); + } catch(EsRejectedExecutionException ex) { + if (logger.isDebugEnabled()) { + logger.debug("Couldn't schedule timeout thread - node might be shutting down", ex); + } + //timeout straightaway, otherwise we could wait forever as the timeout thread has not started + ackedUpdateTask.onAckTimeout(); + } + } } else { if (previousClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK) && !newClusterState.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK)) { // force an update, its a fresh update from the master as we transition from a start of not having a master to having one @@ -381,7 +398,7 @@ public void run() { // we don't want to notify if (newClusterState.nodes().localNodeMaster()) { logger.debug("publishing cluster state version {}", newClusterState.version()); - discoveryService.publish(newClusterState); + discoveryService.publish(newClusterState, ackListener); } // update the current cluster state @@ -409,18 +426,26 @@ public void run() { }); } + //manual ack only from the master at the end of the publish + if (newClusterState.nodes().localNodeMaster()) { + try { + ackListener.onNodeAck(localNode(), null); + } catch(Throwable t) { + logger.debug("error while processing ack for master node [{}]", t, newClusterState.nodes().localNode()); + } + } if (updateTask instanceof ProcessedClusterStateUpdateTask) { ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(source, previousClusterState, newClusterState); } logger.debug("processing [{}]: done applying updated cluster_state (version: {})", source, newClusterState.version()); - } catch (Exception e) { + } catch (Throwable t) { StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n"); sb.append(newClusterState.nodes().prettyPrint()); sb.append(newClusterState.routingTable().prettyPrint()); sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint()); - logger.warn(sb.toString(), e); + logger.warn(sb.toString(), t); // TODO: do we want to call updateTask.onFailure here? } } @@ -584,4 +609,73 @@ public void run() { listener.offMaster(); } } + + private static class NoOpAckListener implements Discovery.AckListener { + @Override + public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) { + } + + @Override + public void onTimeout() { + } + } + + private class AckCountDownListener implements Discovery.AckListener { + private final AckedClusterStateUpdateTask ackedUpdateTask; + private final long version; + private final AtomicInteger countDown; + private final AtomicBoolean notified = new AtomicBoolean(false); + private final Future ackTimeoutCallback; + private Throwable lastFailure; + + AckCountDownListener(AckedClusterStateUpdateTask ackedUpdateTask, long clusterStateVersion, DiscoveryNodes nodes, ThreadPool threadPool) { + this.ackedUpdateTask = ackedUpdateTask; + this.version = clusterStateVersion; + int countDown = 0; + for (DiscoveryNode node : nodes) { + if (ackedUpdateTask.mustAck(node)) { + countDown++; + } + } + logger.trace("expecting {} acknowledgements for cluster_state update (version: {})", countDown, version); + this.countDown = new AtomicInteger(countDown); + this.ackTimeoutCallback = threadPool.schedule(ackedUpdateTask.ackTimeout(), ThreadPool.Names.GENERIC, new Runnable() { + @Override + public void run() { + onTimeout(); + } + }); + } + + @Override + public void onNodeAck(DiscoveryNode node, @Nullable Throwable t) { + if (!ackedUpdateTask.mustAck(node)) { + return; + } + if (t == null) { + logger.trace("ack received from node [{}], cluster_state update (version: {})", node, version); + } else { + this.lastFailure = t; + logger.debug("ack received from node [{}], cluster_state update (version: {})", t, node, version); + } + + assert countDown.get() > 0; + if (countDown.decrementAndGet() == 0) { + if (notified.compareAndSet(false, true) ) { + logger.trace("all expected nodes acknowledged cluster_state update (version: {})", version); + ackTimeoutCallback.cancel(true); + ackedUpdateTask.onAllNodesAcked(lastFailure); + } + } + } + + @Override + public void onTimeout() { + if (notified.compareAndSet(false, true)) { + logger.trace("timeout waiting for acknowledgement for cluster_state update (version: {})", version); + ackedUpdateTask.onAckTimeout(); + } + } + } + } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/discovery/AckClusterStatePublishResponseHandler.java b/src/main/java/org/elasticsearch/discovery/AckClusterStatePublishResponseHandler.java new file mode 100644 index 0000000000000..ee6f5c4473c51 --- /dev/null +++ b/src/main/java/org/elasticsearch/discovery/AckClusterStatePublishResponseHandler.java @@ -0,0 +1,71 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; + +/** + * Allows to wait for all nodes to reply to the publish of a new cluster state + * and notifies the {@link org.elasticsearch.discovery.Discovery.AckListener} + * so that the cluster state update can be acknowledged + */ +public class AckClusterStatePublishResponseHandler extends BlockingClusterStatePublishResponseHandler { + + private static final ESLogger logger = ESLoggerFactory.getLogger(AckClusterStatePublishResponseHandler.class.getName()); + + private final Discovery.AckListener ackListener; + + /** + * Creates a new AckClusterStatePublishResponseHandler + * @param nonMasterNodes number of nodes that are supposed to reply to a cluster state publish from master + * @param ackListener the {@link org.elasticsearch.discovery.Discovery.AckListener} to notify for each response + * gotten from non master nodes + */ + public AckClusterStatePublishResponseHandler(int nonMasterNodes, Discovery.AckListener ackListener) { + //Don't count the master as acknowledged, because it's not done yet + //otherwise we might end up with all the nodes but the master holding the latest cluster state + super(nonMasterNodes); + this.ackListener = ackListener; + } + + @Override + public void onResponse(DiscoveryNode node) { + super.onResponse(node); + onNodeAck(ackListener, node, null); + } + + @Override + public void onFailure(DiscoveryNode node, Throwable t) { + try { + super.onFailure(node, t); + } finally { + onNodeAck(ackListener, node, t); + } + } + + private void onNodeAck(final Discovery.AckListener ackListener, DiscoveryNode node, Throwable t) { + try { + ackListener.onNodeAck(node, t); + } catch (Throwable t1) { + logger.debug("error while processing ack for node [{}]", t1, node); + } + } +} diff --git a/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java b/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java new file mode 100644 index 0000000000000..8568e2b823e26 --- /dev/null +++ b/src/main/java/org/elasticsearch/discovery/BlockingClusterStatePublishResponseHandler.java @@ -0,0 +1,60 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + + +/** + * Default implementation of {@link ClusterStatePublishResponseHandler}, allows to await a reply + * to a cluster state publish from all non master nodes, up to a timeout + */ +public class BlockingClusterStatePublishResponseHandler implements ClusterStatePublishResponseHandler { + + private final CountDownLatch latch; + + /** + * Creates a new BlockingClusterStatePublishResponseHandler + * @param nonMasterNodes number of nodes that are supposed to reply to a cluster state publish from master + */ + public BlockingClusterStatePublishResponseHandler(int nonMasterNodes) { + //Don't count the master, as it's the one that does the publish + //the master won't call onResponse either + this.latch = new CountDownLatch(nonMasterNodes); + } + + @Override + public void onResponse(DiscoveryNode node) { + latch.countDown(); + } + + @Override + public void onFailure(DiscoveryNode node, Throwable t) { + latch.countDown(); + } + + @Override + public boolean awaitAllNodes(TimeValue timeout) throws InterruptedException { + return latch.await(timeout.millis(), TimeUnit.MILLISECONDS); + } +} diff --git a/src/main/java/org/elasticsearch/discovery/ClusterStatePublishResponseHandler.java b/src/main/java/org/elasticsearch/discovery/ClusterStatePublishResponseHandler.java new file mode 100644 index 0000000000000..9a0344f5b0cb2 --- /dev/null +++ b/src/main/java/org/elasticsearch/discovery/ClusterStatePublishResponseHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.unit.TimeValue; + +/** + * Handles responses obtained when publishing a new cluster state from master to all non master nodes. + * Allows to await a reply from all non master nodes, up to a timeout + */ +public interface ClusterStatePublishResponseHandler { + + /** + * Called for each response obtained from non master nodes + * @param node the node that replied to the publish event + */ + void onResponse(DiscoveryNode node); + + /** + * Called for each failure obtained from non master nodes + * @param node the node that replied to the publish event + */ + void onFailure(DiscoveryNode node, Throwable t); + + /** + * Allows to wait for all non master nodes to reply to the publish event up to a timeout + * @param timeout the timeout + * @return true if the timeout expired or not, false otherwise + * @throws InterruptedException + */ + boolean awaitAllNodes(TimeValue timeout) throws InterruptedException; +} diff --git a/src/main/java/org/elasticsearch/discovery/Discovery.java b/src/main/java/org/elasticsearch/discovery/Discovery.java index c8597349f879e..5dae73de02eab 100644 --- a/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -24,8 +24,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.rest.RestStatus; @@ -60,6 +60,14 @@ public interface Discovery extends LifecycleComponent { /** * Publish all the changes to the cluster from the master (can be called just by the master). The publish * process should not publish this state to the master as well! (the master is sending it...). + * + * The {@link AckListener} allows to keep track of the ack received from nodes, and verify whether + * they updated their own cluster state or not. */ - void publish(ClusterState clusterState); + void publish(ClusterState clusterState, AckListener ackListener); + + public static interface AckListener { + void onNodeAck(DiscoveryNode node, @Nullable Throwable t); + void onTimeout(); + } } diff --git a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java index 206120c1b3be3..69e890435b91a 100644 --- a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java +++ b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java @@ -107,11 +107,13 @@ public String nodeDescription() { /** * Publish all the changes to the cluster from the master (can be called just by the master). The publish * process should not publish this state to the master as well! (the master is sending it...). + * + * The {@link org.elasticsearch.discovery.Discovery.AckListener} allows to acknowledge the publish + * event based on the response gotten from all nodes */ - public void publish(ClusterState clusterState) { - if (!lifecycle.started()) { - return; + public void publish(ClusterState clusterState, Discovery.AckListener ackListener) { + if (lifecycle.started()) { + discovery.publish(clusterState, ackListener); } - discovery.publish(clusterState); } } diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index f3eedc533fbec..049437bdf7862 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -35,8 +35,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.discovery.InitialStateDiscoveryListener; +import org.elasticsearch.discovery.*; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.transport.TransportService; @@ -44,8 +43,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -58,6 +55,8 @@ */ public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery { + private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; + private final TransportService transportService; private final ClusterService clusterService; private final DiscoveryNodeService discoveryNodeService; @@ -277,24 +276,33 @@ public String nodeDescription() { return clusterName.value() + "/" + localNode.id(); } - @Override - public void publish(ClusterState clusterState) { + public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) { if (!master) { throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master"); } + LocalDiscovery[] members = members(); + if (members.length > 0) { + publish(members, clusterState, new AckClusterStatePublishResponseHandler(members.length - 1, ackListener)); + } + } + + private LocalDiscovery[] members() { ClusterGroup clusterGroup = clusterGroups.get(clusterName); if (clusterGroup == null) { - // nothing to publish to - return; + return NO_MEMBERS; } + Queue members = clusterGroup.members(); + return members.toArray(new LocalDiscovery[members.size()]); + } + + private void publish(LocalDiscovery[] members, ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { + try { // we do the marshaling intentionally, to check it works well... final byte[] clusterStateBytes = Builder.toBytes(clusterState); - LocalDiscovery[] members = clusterGroup.members().toArray(new LocalDiscovery[0]); - final CountDownLatch latch = new CountDownLatch(members.length); - for (LocalDiscovery discovery : members) { + + for (final LocalDiscovery discovery : members) { if (discovery.master) { - latch.countDown(); continue; } final ClusterState nodeSpecificClusterState = ClusterState.Builder.fromBytes(clusterStateBytes, discovery.localNode); @@ -318,23 +326,23 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Throwable t) { logger.error("unexpected failure during [{}]", t, source); - latch.countDown(); + publishResponseHandler.onFailure(discovery.localNode, t); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { sendInitialStateEventIfNeeded(); - latch.countDown(); + publishResponseHandler.onResponse(discovery.localNode); } }); } else { - latch.countDown(); + publishResponseHandler.onResponse(discovery.localNode); } } if (publishTimeout.millis() > 0) { try { - boolean awaited = latch.await(publishTimeout.millis(), TimeUnit.MILLISECONDS); + boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); if (!awaited) { logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 170bc841d3510..678b4c673a0f6 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -262,13 +262,13 @@ public NodeService nodeService() { } @Override - public void publish(ClusterState clusterState) { + public void publish(ClusterState clusterState, AckListener ackListener) { if (!master) { throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master"); } latestDiscoNodes = clusterState.nodes(); nodesFD.updateNodes(clusterState.nodes()); - publishClusterState.publish(clusterState); + publishClusterState.publish(clusterState, ackListener); } private void asyncJoinCluster() { diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index f8618ac22e3b9..13536a1aa3f9d 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -30,14 +30,15 @@ import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler; +import org.elasticsearch.discovery.ClusterStatePublishResponseHandler; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; /** * @@ -78,16 +79,18 @@ public void close() { transportService.removeHandler(PublishClusterStateRequestHandler.ACTION); } - public void publish(ClusterState clusterState) { + public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) { + publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size()-1, ackListener)); + } + + private void publish(ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { + DiscoveryNode localNode = nodesProvider.nodes().localNode(); Map serializedStates = Maps.newHashMap(); - final CountDownLatch latch = new CountDownLatch(clusterState.nodes().size()); for (final DiscoveryNode node : clusterState.nodes()) { if (node.equals(localNode)) { - // no need to send to our self - latch.countDown(); continue; } // try and serialize the cluster state once (or per version), so we don't serialize it @@ -104,7 +107,7 @@ public void publish(ClusterState clusterState) { serializedStates.put(node.version(), bytes); } catch (Throwable e) { logger.warn("failed to serialize cluster_state before publishing it to node {}", e, node); - latch.countDown(); + publishResponseHandler.onFailure(node, e); continue; } } @@ -120,24 +123,25 @@ public void publish(ClusterState clusterState) { @Override public void handleResponse(TransportResponse.Empty response) { - latch.countDown(); + publishResponseHandler.onResponse(node); } @Override public void handleException(TransportException exp) { logger.debug("failed to send cluster state to [{}]", exp, node); - latch.countDown(); + publishResponseHandler.onFailure(node, exp); } }); } catch (Throwable t) { - latch.countDown(); + logger.debug("error sending cluster state to [{}]", t, node); + publishResponseHandler.onFailure(node, t); } } if (publishTimeout.millis() > 0) { // only wait if the publish timeout is configured... try { - boolean awaited = latch.await(publishTimeout.millis(), TimeUnit.MILLISECONDS); + boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); if (!awaited) { logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout); }