Skip to content

Commit

Permalink
Add local node to cluster state
Browse files Browse the repository at this point in the history
Today, the tribe node needs the local node so it adds it when it starts, but other APIs would benefit from adding the local node, also, adding the local node should be done in a cleaner manner, where it belongs, which is right after the discovery service starts in the cluster service
closes #6811
  • Loading branch information
kimchy committed Jul 10, 2014
1 parent 280102a commit 995cd77
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 66 deletions.
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
Expand Down Expand Up @@ -130,6 +131,24 @@ protected void doStart() throws ElasticsearchException {
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, "clusterService#updateTask"));
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
discoveryService.addLifecycleListener(new LifecycleListener() {
@Override
public void afterStart() {
submitStateUpdateTask("update local node", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return ClusterState.builder(currentState)
.nodes(DiscoveryNodes.builder(currentState.nodes()).put(localNode()).localNodeId(localNode().id()))
.build();
}

@Override
public void onFailure(String source, Throwable t) {
logger.warn("failed ot update local node", t);
}
});
}
});
}

@Override
Expand Down
Expand Up @@ -22,17 +22,29 @@
/**
*
*/
public interface LifecycleListener {
public abstract class LifecycleListener {

void beforeStart();
public void beforeStart() {

void afterStart();
}

void beforeStop();
public void afterStart() {

void afterStop();
}

void beforeClose();
public void beforeStop() {

void afterClose();
}

public void afterStop() {

}

public void beforeClose() {

}

public void afterClose() {

}
}
66 changes: 37 additions & 29 deletions src/main/java/org/elasticsearch/discovery/DiscoveryService.java
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.discovery;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
Expand All @@ -37,11 +38,28 @@
*/
public class DiscoveryService extends AbstractLifecycleComponent<DiscoveryService> {

private final TimeValue initialStateTimeout;
private static class InitialStateListener implements InitialStateDiscoveryListener {

private final Discovery discovery;
private final CountDownLatch latch = new CountDownLatch(1);
private volatile boolean initialStateReceived;

@Override
public void initialStateProcessed() {
initialStateReceived = true;
latch.countDown();
}

public boolean waitForInitialState(TimeValue timeValue) throws InterruptedException {
if (timeValue.millis() > 0) {
latch.await(timeValue.millis(), TimeUnit.MILLISECONDS);
}
return initialStateReceived;
}
}

private volatile boolean initialStateReceived;
private final TimeValue initialStateTimeout;
private final Discovery discovery;
private InitialStateListener initialStateListener;

@Inject
public DiscoveryService(Settings settings, Discovery discovery) {
Expand All @@ -52,38 +70,28 @@ public DiscoveryService(Settings settings, Discovery discovery) {

@Override
protected void doStart() throws ElasticsearchException {
final CountDownLatch latch = new CountDownLatch(1);
InitialStateDiscoveryListener listener = new InitialStateDiscoveryListener() {
@Override
public void initialStateProcessed() {
latch.countDown();
}
};
discovery.addListener(listener);
initialStateListener = new InitialStateListener();
discovery.addListener(initialStateListener);
discovery.start();
logger.info(discovery.nodeDescription());
}

public void waitForInitialState() {
try {
discovery.start();
if (initialStateTimeout.millis() > 0) {
try {
logger.trace("waiting for {} for the initial state to be set by the discovery", initialStateTimeout);
if (latch.await(initialStateTimeout.millis(), TimeUnit.MILLISECONDS)) {
logger.trace("initial state set from discovery");
initialStateReceived = true;
} else {
initialStateReceived = false;
logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout);
}
} catch (InterruptedException e) {
// ignore
}
if (!initialStateListener.waitForInitialState(initialStateTimeout)) {
logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout);
}
} finally {
discovery.removeListener(listener);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
}
logger.info(discovery.nodeDescription());
}

@Override
protected void doStop() throws ElasticsearchException {
if (initialStateListener != null) {
discovery.removeListener(initialStateListener);
}
discovery.stop();
}

Expand All @@ -101,7 +109,7 @@ public DiscoveryNode localNode() {
* on {@link #doStart()}.
*/
public boolean initialStateReceived() {
return initialStateReceived;
return initialStateListener.initialStateReceived;
}

public String nodeDescription() {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
Expand Down Expand Up @@ -245,6 +246,7 @@ public Node start() {
injector.getInstance(RestController.class).start();
injector.getInstance(TransportService.class).start();
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
discoService.waitForInitialState();

// gateway should start after disco, so it can try and recovery from gateway on "start"
injector.getInstance(GatewayService.class).start();
Expand Down
30 changes: 0 additions & 30 deletions src/main/java/org/elasticsearch/tribe/TribeService.java
Expand Up @@ -167,36 +167,6 @@ public TribeService(Settings settings, ClusterService clusterService) {

@Override
protected void doStart() throws ElasticsearchException {
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("updating local node id", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// add our local node to the mix...
return ClusterState.builder(currentState)
.nodes(DiscoveryNodes.builder(currentState.nodes()).put(clusterService.localNode()).localNodeId(clusterService.localNode().id()))
.build();
}

@Override
public void onFailure(String source, Throwable t) {
try {
logger.error("{}", t, source);
} finally {
latch.countDown();
}
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchIllegalStateException("Interrupted while starting [" + this.getClass().getSimpleName() + "]", e);
}
for (InternalNode node : nodes) {
try {
node.start();
Expand Down
Expand Up @@ -61,6 +61,7 @@ public void simpleMinimumMasterNodes() throws Exception {
logger.info("--> should be blocked, no master...");
ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state

logger.info("--> start second node, cluster should be formed");
internalCluster().startNode(settings);
Expand Down Expand Up @@ -102,6 +103,7 @@ public boolean apply(Object obj) {
});
state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true));
assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state

logger.info("--> starting the previous master node again...");
internalCluster().startNode(settings);
Expand Down

0 comments on commit 995cd77

Please sign in to comment.