Skip to content

Commit

Permalink
[Discovery] add a debug log if a node responds to a publish request a…
Browse files Browse the repository at this point in the history
…fter publishing timed out.
  • Loading branch information
bleskes committed Aug 22, 2014
1 parent afe1f91 commit 4393336
Showing 1 changed file with 13 additions and 7 deletions.
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.transport.*;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
Expand Down Expand Up @@ -82,12 +83,15 @@ public void publish(ClusterState clusterState, final Discovery.AckListener ackLi
publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size() - 1, ackListener));
}

private void publish(ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) {
private void publish(final ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) {

DiscoveryNode localNode = nodesProvider.nodes().localNode();

Map<Version, BytesReference> serializedStates = Maps.newHashMap();

final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false);
final TimeValue publishTimeout = discoverySettings.getPublishTimeout();

for (final DiscoveryNode node : clusterState.nodes()) {
if (node.equals(localNode)) {
continue;
Expand Down Expand Up @@ -122,28 +126,30 @@ private void publish(ClusterState clusterState, final ClusterStatePublishRespons

@Override
public void handleResponse(TransportResponse.Empty response) {
if (timedOutWaitingForNodes.get()) {
logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout);
}
publishResponseHandler.onResponse(node);
}

@Override
public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}]", exp, node);
logger.debug("failed to send cluster state to {}", exp, node);
publishResponseHandler.onFailure(node, exp);
}
});
} catch (Throwable t) {
logger.debug("error sending cluster state to [{}]", t, node);
logger.debug("error sending cluster state to {}", t, node);
publishResponseHandler.onFailure(node, t);
}
}

TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) {
// only wait if the publish timeout is configured...
try {
boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout);
if (!awaited) {
logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout);
timedOutWaitingForNodes.set(!publishResponseHandler.awaitAllNodes(publishTimeout));
if (timedOutWaitingForNodes.get()) {
logger.debug("timed out waiting for all nodes to process published state [{}] (timeout [{}])", clusterState.version(), publishTimeout);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Expand Down

0 comments on commit 4393336

Please sign in to comment.