Skip to content

Commit

Permalink
Discovery: make sure NodeJoinController.ElectionCallback is always ca…
Browse files Browse the repository at this point in the history
…lled from the update cluster state thread

This is important for correct handling of the joining thread. This causes assertions to trip in our test runs. See http://build-us-00.elastic.co/job/es_g1gc_master_metal/11653/ as an example

Closes #12372
  • Loading branch information
bleskes authored and rmuir committed Jul 22, 2015
1 parent 3318537 commit b81ccba
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ public TimeValue getMaxTaskWaitTime() {
return updateTasksExecutor.getMaxTaskWaitTime();
}


/** asserts that the current thread is the cluster state update thread */
public boolean assertClusterStateThread() {
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread";
return true;
}

static abstract class SourcePrioritizedRunnable extends PrioritizedRunnable {
protected final String source;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -77,11 +79,11 @@ public NodeJoinController(ClusterService clusterService, RoutingService routingS
* @param callback the result of the election (success or failure) will be communicated by calling methods on this
* object
**/
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final Callback callback) {
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
assert accumulateJoins.get() : "waitToBeElectedAsMaster is called we are not accumulating joins";

final CountDownLatch done = new CountDownLatch(1);
final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins) {
final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins, clusterService) {
@Override
void onClose() {
if (electionContext.compareAndSet(this, null)) {
Expand All @@ -95,7 +97,7 @@ void onClose() {

if (electionContext.compareAndSet(null, newContext) == false) {
// should never happen, but be conservative
callback.onFailure(new IllegalStateException("double waiting for election"));
failContext(newContext, new IllegalStateException("double waiting for election"));
return;
}
try {
Expand All @@ -118,13 +120,35 @@ void onClose() {
logger.trace("timed out waiting to be elected. waited [{}]. pending node joins [{}]", timeValue, pendingNodes);
}
// callback will clear the context, if it's active
newContext.onFailure(new ElasticsearchTimeoutException("timed out waiting to be elected"));
failContext(newContext, new ElasticsearchTimeoutException("timed out waiting to be elected"));
} catch (Throwable t) {
logger.error("unexpected failure while waiting for incoming joins", t);
newContext.onFailure(t);
failContext(newContext, "unexpected failure while waiting for pending joins", t);
}
}

private void failContext(final ElectionContext context, final Throwable throwable) {
failContext(context, throwable.getMessage(), throwable);
}

/** utility method to fail the given election context under the cluster state thread */
private void failContext(final ElectionContext context, final String reason, final Throwable throwable) {
clusterService.submitStateUpdateTask("zen-disco-join(failure [" + reason + "])", Priority.IMMEDIATE, new ClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
context.onFailure(throwable);
return currentState;
}

@Override
public void onFailure(String source, Throwable updateFailure) {
logger.warn("unexpected error while trying to fail election context due to [{}]. original exception [{}]", updateFailure, reason, throwable);
context.onFailure(updateFailure);
}
});

}

/**
* Accumulates any future incoming join request. Pending join requests will be processed in the final steps of becoming a
* master or when {@link #stopAccumulatingJoins()} is called.
Expand Down Expand Up @@ -252,30 +276,42 @@ private void processJoins(String reason) {
}


public interface Callback {
public interface ElectionCallback {
/**
* called when the local node is successfully elected as master
* Guaranteed to be called on the cluster state update thread
**/
void onElectedAsMaster(ClusterState state);

/**
* called when the local node failed to be elected as master
* Guaranteed to be called on the cluster state update thread
**/
void onFailure(Throwable t);
}

static abstract class ElectionContext implements Callback {
private final Callback callback;
static abstract class ElectionContext implements ElectionCallback {
private final ElectionCallback callback;
private final int requiredMasterJoins;
private final ClusterService clusterService;

/** set to true after enough joins have been seen and a cluster update task is submitted to become master */
final AtomicBoolean pendingSetAsMasterTask = new AtomicBoolean();
final AtomicBoolean closed = new AtomicBoolean();

ElectionContext(Callback callback, int requiredMasterJoins) {
ElectionContext(ElectionCallback callback, int requiredMasterJoins, ClusterService clusterService) {
this.callback = callback;
this.requiredMasterJoins = requiredMasterJoins;
this.clusterService = clusterService;
}

abstract void onClose();

@Override
public void onElectedAsMaster(ClusterState state) {
assert pendingSetAsMasterTask.get() : "onElectedAsMaster called but pendingSetAsMasterTask is not set";
assertClusterStateThread();
assert state.nodes().localNodeMaster() : "onElectedAsMaster called but local node is not master";
if (closed.compareAndSet(false, true)) {
try {
onClose();
Expand All @@ -287,6 +323,7 @@ public void onElectedAsMaster(ClusterState state) {

@Override
public void onFailure(Throwable t) {
assertClusterStateThread();
if (closed.compareAndSet(false, true)) {
try {
onClose();
Expand All @@ -295,6 +332,10 @@ public void onFailure(Throwable t) {
}
}
}

private void assertClusterStateThread() {
assert clusterService instanceof InternalClusterService == false || ((InternalClusterService) clusterService).assertClusterStateThread();
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
Expand Down Expand Up @@ -372,7 +371,7 @@ private void innerJoinCluster() {
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.Callback() {
new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
joinThreadControl.markThreadAsDone(currentThread);
Expand Down Expand Up @@ -1307,7 +1306,7 @@ public void start() {
}

private void assertClusterStateThread() {
assert Thread.currentThread().getName().contains(InternalClusterService.UPDATE_THREAD_NAME) : "not called from the cluster state update thread";
assert clusterService instanceof InternalClusterService == false || ((InternalClusterService) clusterService).assertClusterStateThread();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import static org.hamcrest.Matchers.*;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@LuceneTestCase.AwaitsFix(bugUrl = "boaz is looking at failures in this test class. Example failure: http://build-us-00.elastic.co/job/es_g1gc_master_metal/11653/")
public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void onFailure(Throwable t) {

@Override
protected void doRun() throws Exception {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
Expand Down Expand Up @@ -205,7 +205,7 @@ public void onFailure(Throwable t) {

@Override
protected void doRun() throws Exception {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
Expand Down Expand Up @@ -323,7 +323,7 @@ public void testMasterElectionTimeout() throws InterruptedException {

final AtomicReference<Throwable> failure = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.Callback() {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
Expand Down Expand Up @@ -440,7 +440,7 @@ protected void doRun() throws Exception {
logger.info("--> waiting to be elected as master (required joins [{}])", requiredJoins);
final AtomicReference<Throwable> failure = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.Callback() {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().localNodeMaster(), equalTo(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ synchronized public void submitStateUpdateTask(String source, Priority priority,
try {
newState = updateTask.execute(previousClusterState);
} catch (Exception e) {
throw new ElasticsearchException("failed to process cluster state update task [" + source + "]", e);
updateTask.onFailure(source, new ElasticsearchException("failed to process cluster state update task [" + source + "]", e));
return;
}
setStateAndNotifyListeners(newState);
if (updateTask instanceof ProcessedClusterStateUpdateTask) {
Expand Down

0 comments on commit b81ccba

Please sign in to comment.