Skip to content

Commit

Permalink
A new ClusterStateStatus to indicate cluster state life cycles
Browse files Browse the repository at this point in the history
When the ClusterService applies a new cluster state, it is first assigned as the new active one and then all listeners are called. Some of ES's features sample the current state and try to take action on it (for example, index a document). If that fails, they will wait for a change in the cluster state and try again (for example, wait for a shard to start and try indexing again).

If you're unlucky, you sample the state after it has been assigned as the "active" state but before all listeners have done their work. In this case the action taken (e.g., indexing a doc) will still fail (as the shard is not yet started), but waiting for a new state may take a long time or fail.

This commit adds a new ClusterStateStatus that makes it possible to better track the stages a cluster state goes through (currently `RECEIVED`, `BEING_APPLIED` & `APPLIED`). This allows detecting that a cluster state is not yet fully applied and retrying without waiting for a new state to arrive.

This commit also adds a utility class, ClusterStateObserver, to make this pattern slightly simpler and avoid common pitfalls.

Closes #5741
  • Loading branch information
bleskes committed Apr 22, 2014
1 parent 7824506 commit cdbab3c
Show file tree
Hide file tree
Showing 10 changed files with 547 additions and 227 deletions.
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -91,11 +91,11 @@ public void execute(Request request, ActionListener<Response> listener) {

@Override
protected void doExecute(final Request request, final ActionListener<Response> listener) {
innerExecute(request, listener, false);
innerExecute(request, listener, new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger), false);
}

private void innerExecute(final Request request, final ActionListener<Response> listener, final boolean retrying) {
final ClusterState clusterState = clusterService.state();
private void innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying) {
final ClusterState clusterState = observer.observedState();
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally
Expand All @@ -105,37 +105,32 @@ private void innerExecute(final Request request, final ActionListener<Response>
listener.onFailure(blockException);
return;
}
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
ClusterBlockException blockException = checkBlock(request, clusterService.state());
if (blockException == null || !blockException.retryable()) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}

@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(blockException);
}
logger.trace("can't execute due to a cluster block: [{}], retrying", blockException);
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, false);
}

@Override
public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(blockException);
}
@Override
public void onClusterServiceClose() {
listener.onFailure(blockException);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
ClusterBlockException blockException = checkBlock(request, event.state());
if (blockException == null || !blockException.retryable()) {
clusterService.remove(this);
innerExecute(request, listener, false);
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(blockException);
}
}, new ClusterStateObserver.ValidationPredicate() {
@Override
protected boolean validate(ClusterState newState) {
ClusterBlockException blockException = checkBlock(request, newState);
return (blockException == null || !blockException.retryable());
}
}
}
});
);

} else {
try {
threadPool.executor(executor).execute(new Runnable() {
Expand All @@ -158,38 +153,35 @@ public void run() {
listener.onFailure(new MasterNotDiscoveredException());
} else {
logger.debug("no known master node, scheduling a retry");
observer.waitForNextChange(
new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, true);
}

clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
ClusterState clusterStateV2 = clusterService.state();
if (clusterStateV2.nodes().masterNodeId() != null) {
// now we have a master, try and execute it...
clusterService.remove(this);
innerExecute(request, listener, true);
}
}

@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
}
@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException("waited for [" + timeout + "]"));
}
}, new ClusterStateObserver.ChangePredicate() {
@Override
public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus,
ClusterState newState, ClusterState.ClusterStateStatus newStatus) {
return newState.nodes().masterNodeId() != null;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesDelta().masterNodeChanged()) {
clusterService.remove(this);
innerExecute(request, listener, true);
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
}
});
);
}
return;
}
Expand All @@ -216,38 +208,28 @@ public void handleException(final TransportException exp) {
// we want to retry here a bit to see if a new master is elected
logger.debug("connection exception while trying to forward request to master node [{}], scheduling a retry. Error: [{}]",
nodes.masterNode(), exp.getDetailedMessage());
clusterService.add(request.masterNodeTimeout(), new TimeoutClusterStateListener() {
@Override
public void postAdded() {
ClusterState clusterStateV2 = clusterService.state();
// checking for changes that happened while adding the listener. We can't check using cluster
// state versions as master election doesn't increase version numbers
if (clusterState != clusterStateV2) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}

@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
clusterService.remove(this);
listener.onFailure(new MasterNotDiscoveredException());
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesDelta().masterNodeChanged()) {
clusterService.remove(this);
innerExecute(request, listener, false);
}
}
});
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
innerExecute(request, listener, observer, false);
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(new MasterNotDiscoveredException());
}
}, new ClusterStateObserver.EventPredicate() {
@Override
public boolean apply(ClusterChangedEvent event) {
return event.nodesDelta().masterNodeChanged();
}
}
);
} else {
listener.onFailure(exp);
}
Expand Down

0 comments on commit cdbab3c

Please sign in to comment.