From 995cd77e6c14a31431b7a65cb82cec2b5b8e1220 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 10 Jul 2014 13:40:05 +0200 Subject: [PATCH] Add local node to cluster state Today, the tribe node needs the local node so it adds it when it starts, but other APIs would benefit from adding the local node, also, adding the local node should be done in a cleaner manner, where it belongs, which is right after the discovery service starts in the cluster service closes #6811 --- .../service/InternalClusterService.java | 19 ++++++ .../common/component/LifecycleListener.java | 26 ++++++-- .../discovery/DiscoveryService.java | 66 +++++++++++-------- .../node/internal/InternalNode.java | 2 + .../org/elasticsearch/tribe/TribeService.java | 30 --------- .../cluster/MinimumMasterNodesTests.java | 2 + 6 files changed, 79 insertions(+), 66 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java index ef7be424a6bef..fad94ba194485 100644 --- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java +++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -130,6 +131,24 @@ protected void doStart() throws ElasticsearchException { this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build(); this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(daemonThreadFactory(settings, "clusterService#updateTask")); this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes()); + discoveryService.addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + submitStateUpdateTask("update local node", Priority.IMMEDIATE, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return ClusterState.builder(currentState) + .nodes(DiscoveryNodes.builder(currentState.nodes()).put(localNode()).localNodeId(localNode().id())) + .build(); + } + + @Override + public void onFailure(String source, Throwable t) { + logger.warn("failed ot update local node", t); + } + }); + } + }); } @Override diff --git a/src/main/java/org/elasticsearch/common/component/LifecycleListener.java b/src/main/java/org/elasticsearch/common/component/LifecycleListener.java index c553957eec804..52f9bea410b32 100644 --- a/src/main/java/org/elasticsearch/common/component/LifecycleListener.java +++ b/src/main/java/org/elasticsearch/common/component/LifecycleListener.java @@ -22,17 +22,29 @@ /** * */ -public interface LifecycleListener { +public abstract class LifecycleListener { - void beforeStart(); + public void beforeStart() { - void afterStart(); + } - void beforeStop(); + public void afterStart() { - void afterStop(); + } - void beforeClose(); + public void beforeStop() { - void afterClose(); + } + + public void afterStop() { + + } + + public void beforeClose() { + + } + + public void afterClose() { + + } } diff --git a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java index f4fff6f7874ae..2174f442f87fa 100644 --- a/src/main/java/org/elasticsearch/discovery/DiscoveryService.java +++ b/src/main/java/org/elasticsearch/discovery/DiscoveryService.java @@ -20,6 +20,7 @@ package org.elasticsearch.discovery; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; @@ -37,11 +38,28 @@ */ public class DiscoveryService extends AbstractLifecycleComponent { - private final TimeValue initialStateTimeout; + private static class InitialStateListener implements InitialStateDiscoveryListener { - private final Discovery discovery; + private final CountDownLatch latch = new CountDownLatch(1); + private volatile boolean initialStateReceived; + + @Override + public void initialStateProcessed() { + initialStateReceived = true; + latch.countDown(); + } + + public boolean waitForInitialState(TimeValue timeValue) throws InterruptedException { + if (timeValue.millis() > 0) { + latch.await(timeValue.millis(), TimeUnit.MILLISECONDS); + } + return initialStateReceived; + } + } - private volatile boolean initialStateReceived; + private final TimeValue initialStateTimeout; + private final Discovery discovery; + private InitialStateListener initialStateListener; @Inject public DiscoveryService(Settings settings, Discovery discovery) { @@ -52,38 +70,28 @@ public DiscoveryService(Settings settings, Discovery discovery) { @Override protected void doStart() throws ElasticsearchException { - final CountDownLatch latch = new CountDownLatch(1); - InitialStateDiscoveryListener listener = new InitialStateDiscoveryListener() { - @Override - public void initialStateProcessed() { - latch.countDown(); - } - }; - discovery.addListener(listener); + initialStateListener = new InitialStateListener(); + discovery.addListener(initialStateListener); + discovery.start(); + logger.info(discovery.nodeDescription()); + } + + public void waitForInitialState() { try { - discovery.start(); - if (initialStateTimeout.millis() > 0) { - try { - logger.trace("waiting for {} for the initial state to be set by the discovery", initialStateTimeout); - if (latch.await(initialStateTimeout.millis(), TimeUnit.MILLISECONDS)) { - logger.trace("initial state set from discovery"); - initialStateReceived = true; - } else { - initialStateReceived = false; - logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout); - } - } catch (InterruptedException e) { - // ignore - } + if (!initialStateListener.waitForInitialState(initialStateTimeout)) { + logger.warn("waited for {} and no initial state was set by the discovery", initialStateTimeout); } - } finally { - discovery.removeListener(listener); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state"); } - logger.info(discovery.nodeDescription()); } @Override protected void doStop() throws ElasticsearchException { + if (initialStateListener != null) { + discovery.removeListener(initialStateListener); + } discovery.stop(); } @@ -101,7 +109,7 @@ public DiscoveryNode localNode() { * on {@link #doStart()}. */ public boolean initialStateReceived() { - return initialStateReceived; + return initialStateListener.initialStateReceived; } public String nodeDescription() { diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 9921d06b67d90..8668383db03c1 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.service.InternalClusterService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.Lifecycle; @@ -245,6 +246,7 @@ public Node start() { injector.getInstance(RestController.class).start(); injector.getInstance(TransportService.class).start(); DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start(); + discoService.waitForInitialState(); // gateway should start after disco, so it can try and recovery from gateway on "start" injector.getInstance(GatewayService.class).start(); diff --git a/src/main/java/org/elasticsearch/tribe/TribeService.java b/src/main/java/org/elasticsearch/tribe/TribeService.java index 42106219123d5..e706e40065898 100644 --- a/src/main/java/org/elasticsearch/tribe/TribeService.java +++ b/src/main/java/org/elasticsearch/tribe/TribeService.java @@ -167,36 +167,6 @@ public TribeService(Settings settings, ClusterService clusterService) { @Override protected void doStart() throws ElasticsearchException { - final CountDownLatch latch = new CountDownLatch(1); - clusterService.submitStateUpdateTask("updating local node id", new ProcessedClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // add our local node to the mix... - return ClusterState.builder(currentState) - .nodes(DiscoveryNodes.builder(currentState.nodes()).put(clusterService.localNode()).localNodeId(clusterService.localNode().id())) - .build(); - } - - @Override - public void onFailure(String source, Throwable t) { - try { - logger.error("{}", t, source); - } finally { - latch.countDown(); - } - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - }); - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ElasticsearchIllegalStateException("Interrupted while starting [" + this.getClass().getSimpleName() + "]", e); - } for (InternalNode node : nodes) { try { node.start(); diff --git a/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java b/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java index d6cc0979b0c1c..5445214ec2049 100644 --- a/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java +++ b/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesTests.java @@ -61,6 +61,7 @@ public void simpleMinimumMasterNodes() throws Exception { logger.info("--> should be blocked, no master..."); ClusterState state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); + assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state logger.info("--> start second node, cluster should be formed"); internalCluster().startNode(settings); @@ -102,6 +103,7 @@ public boolean apply(Object obj) { }); state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlock(Discovery.NO_MASTER_BLOCK), equalTo(true)); + assertThat(state.nodes().size(), equalTo(1)); // verify that we still see the local node in the cluster state logger.info("--> starting the previous master node again..."); internalCluster().startNode(settings);