[STATE] Observe cluster state on health request
Today we use busy waiting and sampling when we execute HealthRequests
on the master. This is tricky since we might sample a not yet fully applied
cluster state and make decisions based on that partial cluster state. This can
lead to ugly problems since requests might be routed to nodes where shards are
already marked as relocated while in the actual cluster state they are still started.
Even though this window is usually very small, it can lead to ugly test failures.

This commit moves the health request over to a listener pattern that is
handed the actually applied cluster state.

Closes #8350
s1monw committed Nov 7, 2014
1 parent 9192219 commit cc8e8e6
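
To make the change easier to follow before reading the diff, below is a minimal sketch of the listener pattern the commit moves to, using only the ClusterStateObserver API that appears in the diff (observedState(), waitForNextChange(), Listener, ValidationPredicate, ClusterStateStatus.APPLIED). The ObserverSketch class, the waitForAppliedState/isSatisfied/onSatisfied names, and the ESLogger parameter are illustrative assumptions rather than code from the commit.

import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;

public class ObserverSketch {

    private final ClusterService clusterService;
    private final ESLogger logger;

    public ObserverSketch(ClusterService clusterService, ESLogger logger) {
        this.clusterService = clusterService;
        this.logger = logger;
    }

    // Instead of polling clusterService.state() in a loop, register a listener that
    // only fires once a new cluster state has been fully APPLIED and satisfies the check.
    void waitForAppliedState(final TimeValue masterTimeout, final Runnable onSatisfied) {
        final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger);
        ClusterState current = observer.observedState();
        if (current.status() == ClusterState.ClusterStateStatus.APPLIED && isSatisfied(current)) {
            onSatisfied.run(); // already satisfied, respond on the calling thread
            return;
        }
        observer.waitForNextChange(new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState state) {
                onSatisfied.run(); // the state passed the predicate below and was applied
            }

            @Override
            public void onClusterServiceClose() {
                // the node is shutting down; a real caller would fail the request here
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                // no qualifying state arrived in time; respond with timedOut == true
            }
        }, new ClusterStateObserver.ValidationPredicate() {
            @Override
            protected boolean validate(ClusterState newState) {
                return newState.status() == ClusterState.ClusterStateStatus.APPLIED && isSatisfied(newState);
            }
        }, masterTimeout);
    }

    private boolean isSatisfied(ClusterState state) {
        return true; // stand-in for the wait_for_* checks the health action performs
    }
}

The actual change in TransportClusterHealthAction below follows the same shape, with validateRequest() playing the role of isSatisfied() and the response built from the observed state.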
Showing 5 changed files with 166 additions and 121 deletions.
src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java
@@ -20,18 +20,17 @@
 package org.elasticsearch.action.admin.cluster.health;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchIllegalStateException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
+import org.elasticsearch.cluster.*;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -56,8 +55,8 @@ public TransportClusterHealthAction(Settings settings, TransportService transpor
 
     @Override
     protected String executor() {
-        // we block here...
-        return ThreadPool.Names.GENERIC;
+        // this should be executing quickly no need to fork off
+        return ThreadPool.Names.SAME;
     }
 
     @Override
@@ -77,11 +76,8 @@ protected ClusterHealthResponse newResponse() {
 
     @Override
     protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) throws ElasticsearchException {
-        long endTime = System.currentTimeMillis() + request.timeout().millis();
-
         if (request.waitForEvents() != null) {
-            final CountDownLatch latch = new CountDownLatch(1);
-            final AtomicReference<ElasticsearchException> failure = new AtomicReference<>();
+            final long endTime = System.currentTimeMillis() + request.timeout().millis();
             clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ProcessedClusterStateUpdateTask() {
                 @Override
                 public ClusterState execute(ClusterState currentState) {
@@ -90,33 +86,30 @@ public ClusterState execute(ClusterState currentState) {
 
                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    latch.countDown();
+                    final long timeoutInMillis = Math.max(0, endTime - System.currentTimeMillis());
+                    final TimeValue newTimeout = TimeValue.timeValueMillis(timeoutInMillis);
+                    request.timeout(newTimeout);
+                    executeHealth(request, listener);
                 }
 
                 @Override
                 public void onFailure(String source, Throwable t) {
                     logger.error("unexpected failure during [{}]", t, source);
-                    failure.set(new ElasticsearchException("Error while waiting for events", t));
-                    latch.countDown();
+                    listener.onFailure(t);
                 }
 
                 @Override
                 public boolean runOnlyOnMaster() {
                     return !request.local();
                 }
             });
-
-            try {
-                latch.await(request.timeout().millis(), TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                // ignore
-            }
-            if (failure.get() != null) {
-                throw failure.get();
-            }
+        } else {
+            executeHealth(request, listener);
         }
 
+    }
+
+    private void executeHealth(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
         int waitFor = 5;
         if (request.waitForStatus() == null) {
             waitFor--;
@@ -133,99 +126,134 @@ public boolean runOnlyOnMaster() {
         if (request.indices().length == 0) { // check that they actually exists in the meta data
             waitFor--;
         }
-        if (waitFor == 0) {
-            // no need to wait for anything
-            ClusterState clusterState = clusterService.state();
-            listener.onResponse(clusterHealth(request, clusterState));
-            return;
-        }
-        while (true) {
-            int waitForCounter = 0;
-            ClusterState clusterState = clusterService.state();
-            ClusterHealthResponse response = clusterHealth(request, clusterState);
-            if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
-                waitForCounter++;
-            }
-            if (request.waitForRelocatingShards() != -1 && response.getRelocatingShards() <= request.waitForRelocatingShards()) {
-                waitForCounter++;
-            }
-            if (request.waitForActiveShards() != -1 && response.getActiveShards() >= request.waitForActiveShards()) {
-                waitForCounter++;
-            }
-            if (request.indices().length > 0) {
-                try {
-                    clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), request.indices());
-                    waitForCounter++;
-                } catch (IndexMissingException e) {
-                    response.status = ClusterHealthStatus.RED; // no indices, make sure its RED
-                    // missing indices, wait a bit more...
-                }
-            }
-            if (!request.waitForNodes().isEmpty()) {
-                if (request.waitForNodes().startsWith(">=")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(2));
-                    if (response.getNumberOfNodes() >= expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("ge(")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
-                    if (response.getNumberOfNodes() >= expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("<=")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(2));
-                    if (response.getNumberOfNodes() <= expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("le(")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
-                    if (response.getNumberOfNodes() <= expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith(">")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(1));
-                    if (response.getNumberOfNodes() > expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("gt(")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
-                    if (response.getNumberOfNodes() > expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("<")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(1));
-                    if (response.getNumberOfNodes() < expected) {
-                        waitForCounter++;
-                    }
-                } else if (request.waitForNodes().startsWith("lt(")) {
-                    int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
-                    if (response.getNumberOfNodes() < expected) {
-                        waitForCounter++;
-                    }
-                } else {
-                    int expected = Integer.parseInt(request.waitForNodes());
-                    if (response.getNumberOfNodes() == expected) {
-                        waitForCounter++;
-                    }
-                }
-            }
-            if (waitForCounter == waitFor) {
-                listener.onResponse(response);
-                return;
-            }
-            if (System.currentTimeMillis() > endTime) {
-                response.timedOut = true;
-                listener.onResponse(response);
-                return;
-            }
-            try {
-                Thread.sleep(200);
-            } catch (InterruptedException e) {
-                response.timedOut = true;
-                listener.onResponse(response);
-                return;
-            }
-        }
+
+        assert waitFor >= 0;
+        final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger);
+        final ClusterState state = observer.observedState();
+        if (waitFor == 0 || request.timeout().millis() == 0) {
+            // we check for a timeout here since this method might be called from the wait_for_events
+            // response handler which might have timed out already.
+            ClusterHealthResponse response = clusterHealth(request, state);
+            response.timedOut = request.timeout().millis() == 0;
+            // no need to wait for anything
+            listener.onResponse(response);
+            return;
+        }
+        final int concreteWaitFor = waitFor;
+        final ClusterStateObserver.ChangePredicate validationPredicate = new ClusterStateObserver.ValidationPredicate() {
+            @Override
+            protected boolean validate(ClusterState newState) {
+                return newState.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, newState, concreteWaitFor);
+            }
+        };
+
+        final ClusterStateObserver.Listener stateListener = new ClusterStateObserver.Listener() {
+            @Override
+            public void onNewClusterState(ClusterState clusterState) {
+                listener.onResponse(getResponse(request, clusterState, concreteWaitFor, false));
+            }
+
+            @Override
+            public void onClusterServiceClose() {
+                listener.onFailure(new ElasticsearchIllegalStateException("ClusterService was close during health call"));
+            }
+
+            @Override
+            public void onTimeout(TimeValue timeout) {
+                final ClusterState clusterState = clusterService.state();
+                final ClusterHealthResponse response = getResponse(request, clusterState, concreteWaitFor, true);
+                listener.onResponse(response);
+            }
+        };
+        if (state.status() == ClusterState.ClusterStateStatus.APPLIED && validateRequest(request, state, concreteWaitFor)) {
+            stateListener.onNewClusterState(state);
+        } else {
+            observer.waitForNextChange(stateListener, validationPredicate, request.timeout());
+        }
+    }
+
+    private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor) {
+        ClusterHealthResponse response = clusterHealth(request, clusterState);
+        return prepareResponse(request, response, clusterState, waitFor);
+    }
+
+    private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState, final int waitFor, boolean timedOut) {
+        ClusterHealthResponse response = clusterHealth(request, clusterState);
+        boolean valid = prepareResponse(request, response, clusterState, waitFor);
+        assert valid || timedOut;
+        response.timedOut = timedOut;
+        return response;
+    }
+
+    private boolean prepareResponse(final ClusterHealthRequest request, final ClusterHealthResponse response, ClusterState clusterState, final int waitFor) {
+        int waitForCounter = 0;
+        if (request.waitForStatus() != null && response.getStatus().value() <= request.waitForStatus().value()) {
+            waitForCounter++;
+        }
+        if (request.waitForRelocatingShards() != -1 && response.getRelocatingShards() <= request.waitForRelocatingShards()) {
+            waitForCounter++;
+        }
+        if (request.waitForActiveShards() != -1 && response.getActiveShards() >= request.waitForActiveShards()) {
+            waitForCounter++;
+        }
+        if (request.indices().length > 0) {
+            try {
+                clusterState.metaData().concreteIndices(IndicesOptions.strictExpand(), request.indices());
+                waitForCounter++;
+            } catch (IndexMissingException e) {
+                response.status = ClusterHealthStatus.RED; // no indices, make sure its RED
+                // missing indices, wait a bit more...
+            }
+        }
+        if (!request.waitForNodes().isEmpty()) {
+            if (request.waitForNodes().startsWith(">=")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(2));
+                if (response.getNumberOfNodes() >= expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("ge(")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
+                if (response.getNumberOfNodes() >= expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("<=")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(2));
+                if (response.getNumberOfNodes() <= expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("le(")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
+                if (response.getNumberOfNodes() <= expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith(">")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(1));
+                if (response.getNumberOfNodes() > expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("gt(")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
+                if (response.getNumberOfNodes() > expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("<")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(1));
+                if (response.getNumberOfNodes() < expected) {
+                    waitForCounter++;
+                }
+            } else if (request.waitForNodes().startsWith("lt(")) {
+                int expected = Integer.parseInt(request.waitForNodes().substring(3, request.waitForNodes().length() - 1));
+                if (response.getNumberOfNodes() < expected) {
+                    waitForCounter++;
+                }
+            } else {
+                int expected = Integer.parseInt(request.waitForNodes());
+                if (response.getNumberOfNodes() == expected) {
+                    waitForCounter++;
+                }
+            }
+        }
+        return waitForCounter == waitFor;
     }


4 changes: 3 additions & 1 deletion src/main/java/org/elasticsearch/cluster/ClusterService.java
@@ -92,7 +92,9 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
     void remove(LocalNodeMasterListener listener);
 
     /**
-     * Adds a cluster state listener that will timeout after the provided timeout.
+     * Adds a cluster state listener that will timeout after the provided timeout,
+     * and is executed after the clusterstate has been successfully applied ie. is
+     * in state {@link org.elasticsearch.cluster.ClusterState.ClusterStateStatus#APPLIED}
      */
     void add(TimeValue timeout, TimeoutClusterStateListener listener);
 
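As a usage note on the Javadoc change above: a TimeoutClusterStateListener registered via ClusterService.add(TimeValue, TimeoutClusterStateListener) is now only notified with cluster states that have already reached ClusterStateStatus.APPLIED. A minimal sketch follows; the postAdded/onClose/onTimeout callback names are assumed from the TimeoutClusterStateListener interface in this codebase and should be checked against the source, and the 30 second timeout is arbitrary.

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
import org.elasticsearch.common.unit.TimeValue;

public class AppliedStateListenerSketch {

    public static void register(ClusterService clusterService) {
        clusterService.add(TimeValue.timeValueSeconds(30), new TimeoutClusterStateListener() {
            @Override
            public void postAdded() {
                // invoked once the listener has been registered
            }

            @Override
            public void clusterChanged(ClusterChangedEvent event) {
                // per the updated Javadoc, the delivered state has already been applied
                assert event.state().status() == ClusterState.ClusterStateStatus.APPLIED;
            }

            @Override
            public void onClose() {
                // the cluster service is shutting down
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                // no cluster state change arrived within the timeout
            }
        });
    }
}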
src/main/java/org/elasticsearch/cluster/ClusterStateObserver.java
@@ -42,7 +42,6 @@ public boolean apply(ClusterChangedEvent changedEvent) {
             return changedEvent.previousState().version() != changedEvent.state().version();
         }
     };
-
     private ClusterService clusterService;
     volatile TimeValue timeOutValue;
 
@@ -241,7 +240,7 @@ public void onTimeout(TimeValue timeout) {
         }
     }
 
-    public interface Listener {
+    public static interface Listener {
 
         /** called when a new state is observed */
         void onNewClusterState(ClusterState state);
