Skip to content

Commit

Permalink
Use ClusterService#localNode instead of checking the cluster state.
Browse files Browse the repository at this point in the history
The ClusterState can hold an 'invalid' local 'DiscoveryNode' during
node startup and rare race conditions can cause NPEs if an 'invalid'
'DiscoveryNode' is serialized.

Closes elastic#3515
  • Loading branch information
s1monw committed Aug 15, 2013
1 parent 0472bac commit e89290a
Show file tree
Hide file tree
Showing 14 changed files with 28 additions and 27 deletions.
Expand Up @@ -98,7 +98,7 @@ protected NodeHotThreads nodeOperation(NodeRequest request) throws ElasticSearch
.interval(request.request.interval)
.threadElementsSnapshotCount(request.request.snapshots);
try {
return new NodeHotThreads(clusterService.state().nodes().localNode(), hotThreads.detect());
return new NodeHotThreads(clusterService.localNode(), hotThreads.detect());
} catch (Exception e) {
throw new ElasticSearchException("failed to detect hot threads", e);
}
Expand Down
Expand Up @@ -116,7 +116,7 @@ protected NodesRestartResponse.NodeRestartResponse nodeOperation(NodeRestartRequ
throw new ElasticSearchIllegalStateException("Restart is disabled");
}
if (!restartRequested.compareAndSet(false, true)) {
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
return new NodesRestartResponse.NodeRestartResponse(clusterService.localNode());
}
logger.info("Restarting in [{}]", request.delay);
threadPool.schedule(request.delay, ThreadPool.Names.GENERIC, new Runnable() {
Expand Down Expand Up @@ -146,7 +146,7 @@ public void run() {
}
}
});
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
return new NodesRestartResponse.NodeRestartResponse(clusterService.localNode());
}

@Override
Expand Down
Expand Up @@ -167,7 +167,7 @@ public void postAdded() {
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(nodes.localNode()));
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
Expand Down Expand Up @@ -222,7 +222,7 @@ public void postAdded() {
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(nodes.localNode()));
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
Expand Down
Expand Up @@ -37,6 +37,7 @@ protected NodeOperationResponse() {
}

protected NodeOperationResponse(DiscoveryNode node) {
assert node != null;
this.node = node;
}

Expand Down
Expand Up @@ -483,7 +483,7 @@ public void postAdded() {
@Override
public void onClose() {
clusterService.remove(this);
listener.onFailure(new NodeClosedException(clusterState.nodes().localNode()));
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
Expand Down
Expand Up @@ -259,7 +259,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
// Set up everything, now locally create the index to see that things are ok, and apply

// create the index here (on the master) to validate it can be created, as well as adding the mapping
indicesService.createIndex(request.index, actualIndexSettings, clusterService.state().nodes().localNode().id());
indicesService.createIndex(request.index, actualIndexSettings, clusterService.localNode().id());
indexCreated = true;
// now add the mappings
IndexService indexService = indicesService.indexServiceSafe(request.index);
Expand Down
Expand Up @@ -122,7 +122,7 @@ public ClusterState execute(final ClusterState currentState) {
if (indexService == null) {
// temporarily create the index so we have can parse the filter
try {
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), clusterService.localNode().id());
} catch (Exception e) {
logger.warn("[{}] failed to temporary create in order to apply alias action", e, indexMetaData.index());
continue;
Expand Down
Expand Up @@ -377,7 +377,7 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
continue;
}
final IndexMetaData indexMetaData = currentState.metaData().index(index);
IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), currentState.nodes().localNode().id());
IndexService indexService = indicesService.createIndex(indexMetaData.index(), indexMetaData.settings(), clusterService.localNode().id());
indicesToClose.add(indexMetaData.index());
// only add the current relevant mapping (if exists)
if (indexMetaData.mappings().containsKey(request.mappingType)) {
Expand Down
Expand Up @@ -69,7 +69,7 @@ public void allocateDangled(IndexMetaData[] indices, final Listener listener) {
listener.onFailure(new MasterNotDiscoveredException("no master to send allocate dangled request"));
return;
}
AllocateDangledRequest request = new AllocateDangledRequest(clusterState.nodes().localNode(), indices);
AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), indices);
transportService.sendRequest(masterNode, AllocateDangledRequestHandler.ACTION, request, new TransportResponseHandler<AllocateDangledResponse>() {
@Override
public AllocateDangledResponse newInstance() {
Expand Down
Expand Up @@ -127,18 +127,18 @@ protected NodeStoreFilesMetaData nodeOperation(NodeRequest request) throws Elast
if (request.unallocated) {
IndexService indexService = indicesService.indexService(request.shardId.index().name());
if (indexService == null) {
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
return new NodeStoreFilesMetaData(clusterService.localNode(), null);
}
if (!indexService.hasShard(request.shardId.id())) {
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
return new NodeStoreFilesMetaData(clusterService.localNode(), null);
}
}
IndexMetaData metaData = clusterService.state().metaData().index(request.shardId.index().name());
if (metaData == null) {
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), null);
return new NodeStoreFilesMetaData(clusterService.localNode(), null);
}
try {
return new NodeStoreFilesMetaData(clusterService.state().nodes().localNode(), listStoreMetaData(request.shardId));
return new NodeStoreFilesMetaData(clusterService.localNode(), listStoreMetaData(request.shardId));
} catch (IOException e) {
throw new ElasticSearchException("Failed to list store metadata for shard [" + request.shardId + "]", e);
}
Expand Down
Expand Up @@ -114,7 +114,7 @@ private Map<String, Object> fillContextMap(Map<String, Object> context) {
context = newHashMap();
}
if (clusterService != null) {
context.put("localNode", clusterService.state().nodes().localNode());
context.put("localNode", clusterService.localNode());
}
return context;
}
Expand Down
17 changes: 8 additions & 9 deletions src/main/java/org/elasticsearch/node/service/NodeService.java
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
Expand All @@ -49,8 +48,6 @@ public class NodeService extends AbstractComponent {

private final MonitorService monitorService;

private final ClusterService clusterService;

private final TransportService transportService;

private final IndicesService indicesService;
Expand All @@ -67,16 +64,18 @@ public class NodeService extends AbstractComponent {

private final Version version;

private final Discovery disovery;

@Inject
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
ClusterService clusterService, TransportService transportService, IndicesService indicesService,
TransportService transportService, IndicesService indicesService,
PluginsService pluginService, Version version) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
this.clusterService = clusterService;
this.transportService = transportService;
this.indicesService = indicesService;
this.disovery = discovery;
discovery.setNodeService(this);
InetAddress address = NetworkUtils.getLocalAddress();
if (address != null) {
Expand Down Expand Up @@ -116,7 +115,7 @@ public ImmutableMap<String, String> attributes() {
}

public NodeInfo info() {
return new NodeInfo(hostname, version, clusterService.state().nodes().localNode(), serviceAttributes,
return new NodeInfo(hostname, version, disovery.localNode(), serviceAttributes,
settings,
monitorService.osService().info(),
monitorService.processService().info(),
Expand All @@ -131,7 +130,7 @@ public NodeInfo info() {

public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean network, boolean transport, boolean http, boolean plugin) {
return new NodeInfo(hostname, version, clusterService.state().nodes().localNode(), serviceAttributes,
return new NodeInfo(hostname, version, disovery.localNode(), serviceAttributes,
settings ? this.settings : null,
os ? monitorService.osService().info() : null,
process ? monitorService.processService().info() : null,
Expand All @@ -147,7 +146,7 @@ public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm,
public NodeStats stats() {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(clusterService.state().nodes().localNode(), System.currentTimeMillis(), hostname,
return new NodeStats(disovery.localNode(), System.currentTimeMillis(), hostname,
indicesService.stats(true),
monitorService.osService().stats(),
monitorService.processService().stats(),
Expand All @@ -163,7 +162,7 @@ public NodeStats stats() {
public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, boolean jvm, boolean threadPool, boolean network, boolean fs, boolean transport, boolean http) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
return new NodeStats(clusterService.state().nodes().localNode(), System.currentTimeMillis(), hostname,
return new NodeStats(disovery.localNode(), System.currentTimeMillis(), hostname,
indices.anySet() ? indicesService.stats(true, indices) : null,
os ? monitorService.osService().stats() : null,
process ? monitorService.processService().stats() : null,
Expand Down
Expand Up @@ -61,8 +61,9 @@ public void close() {

public void publish(RiverClusterState clusterState) {
final DiscoveryNodes discoNodes = clusterService.state().nodes();
final DiscoveryNode localNode = discoNodes.localNode();
for (final DiscoveryNode node : discoNodes) {
if (node.equals(discoNodes.localNode())) {
if (node.equals(localNode)) {
// no need to send to our self
continue;
}
Expand Down
Expand Up @@ -340,13 +340,13 @@ public MasterAwareService(Settings settings, ClusterService clusterService) {

@Override
public void onMaster() {
logger.info("on master [" + clusterService.state().nodes().localNode() + "]");
logger.info("on master [" + clusterService.localNode() + "]");
master = true;
}

@Override
public void offMaster() {
logger.info("off master [" + clusterService.state().nodes().localNode() + "]");
logger.info("off master [" + clusterService.localNode() + "]");
master = false;
}

Expand Down

0 comments on commit e89290a

Please sign in to comment.