Skip to content

Commit

Permalink
use ClusterState.nodes().localNodeId() instead of ClusterService.loca…
Browse files Browse the repository at this point in the history
…lNode().id()

as there are cases where the node is gone but from the cluster state but the localNodeId is still there
  • Loading branch information
msbt committed Apr 15, 2015
1 parent 86496f6 commit 76e477f
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 9 deletions.
2 changes: 1 addition & 1 deletion blob/src/main/java/io/crate/blob/BlobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public String getRedirectAddress(String index, String digest) throws MissingHTTP
if (!shard.active()) {
continue;
}
if (shard.currentNodeId().equals(clusterService.localNode().getId())) {
if (shard.currentNodeId().equals(clusterService.state().nodes().localNodeId())) {
return null;
}
nodeIds.add(shard.currentNodeId());
Expand Down
6 changes: 4 additions & 2 deletions blob/src/main/java/io/crate/blob/BlobTransferTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.crate.blob.v2.BlobShard;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -142,8 +143,9 @@ private BlobTransferStatus restoreTransferStatus(PutChunkReplicaRequest request,
logger.trace("Restoring transferContext for PutChunkReplicaRequest with transferId {}",
request.transferId);

DiscoveryNode recipientNodeId = clusterService.state().getNodes().get(request.sourceNodeId);
String senderNodeId = clusterService.localNode().getId();
DiscoveryNodes nodes = clusterService.state().getNodes();
DiscoveryNode recipientNodeId = nodes.get(request.sourceNodeId);
String senderNodeId = nodes.localNodeId();

BlobTransferInfoResponse transferInfoResponse =
(BlobTransferInfoResponse)transportService.submitRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private SearchContext createContext(QueryShardRequest request, @Nullable Engine.
IndexShard indexShard = indexService.shardSafe(request.shardId());

SearchShardTarget searchShardTarget = new SearchShardTarget(
clusterService.localNode().id(),
clusterService.state().nodes().localNodeId(),
request.index(),
request.shardId()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public ListenableFuture<Bucket> collect(CollectNode collectNode,
RamAccountingContext ramAccountingContext) {
assert collectNode.isRouted(); // not routed collect is not handled here
assert collectNode.jobId().isPresent() : "no jobId present for collect operation";
String localNodeId = clusterService.localNode().id();
String localNodeId = clusterService.state().nodes().localNodeId();
if (collectNode.executionNodes().contains(localNodeId)) {
if (!collectNode.routing().containsShards(localNodeId)) {
// node collect
Expand Down Expand Up @@ -249,7 +249,7 @@ private CrateCollector getCollector(CollectNode collectNode,
ImmutableMap.<String, FileInputFactory>of(),
fileUriCollectNode.sharedStorage(),
readers.length,
Arrays.binarySearch(readers, clusterService.localNode().id())
Arrays.binarySearch(readers, clusterService.state().nodes().localNodeId())
);
} else {
CollectService service = collectServiceResolver.getService(collectNode.routing());
Expand All @@ -274,7 +274,7 @@ private CrateCollector getCollector(CollectNode collectNode,
*/
protected ListenableFuture<Bucket> handleShardCollect(CollectNode collectNode, RamAccountingContext ramAccountingContext) {

String localNodeId = clusterService.localNode().id();
String localNodeId = clusterService.state().nodes().localNodeId();
final int numShards = collectNode.routing().numShards(localNodeId);

collectNode = collectNode.normalize(nodeNormalizer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ private CrateCollector getLuceneIndexCollector(final CollectNode collectNode,
final int jobSearchContextId) throws Exception {
final CollectInputSymbolVisitor.Context docCtx = docInputSymbolVisitor.process(collectNode);
final SearchShardTarget searchShardTarget = new SearchShardTarget(
clusterService.localNode().id(), shardId.getIndex(), shardId.id());
clusterService.state().nodes().localNodeId(),
shardId.getIndex(),
shardId.id());
final IndexShard indexShard = indexService.shardSafe(shardId.id());

return jobCollectContext.createCollectorAndContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected void configure() {
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(nodes.get(TEST_NODE_ID)).thenReturn(testNode);
when(nodes.get(OTHER_NODE_ID)).thenReturn(otherNode);
when(nodes.localNodeId()).thenReturn(TEST_NODE_ID);
when(nodes.iterator()).thenReturn(nodeMap.valuesIt());
when(state.nodes()).thenReturn(nodes);
when(clusterService.state()).thenReturn(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
Expand Down Expand Up @@ -218,6 +220,12 @@ protected void configure() {
when(discoveryService.localNode()).thenReturn(discoveryNode);

ClusterService clusterService = mock(ClusterService.class);
ClusterState state = mock(ClusterState.class);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(discoveryNodes.localNodeId()).thenReturn(TEST_NODE_ID);
when(state.nodes()).thenReturn(discoveryNodes);
when(clusterService.state()).thenReturn(state);

when(clusterService.localNode()).thenReturn(discoveryNode);
bind(ClusterService.class).toInstance(clusterService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import io.crate.planner.symbol.Symbol;
import io.crate.types.DataTypes;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.indices.IndicesService;
Expand Down Expand Up @@ -63,7 +65,11 @@ public void testFileUriCollect() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
DiscoveryNode discoveryNode = mock(DiscoveryNode.class);
when(discoveryNode.id()).thenReturn("dummyNodeId");
when(clusterService.localNode()).thenReturn(discoveryNode);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(discoveryNodes.localNodeId()).thenReturn("dummyNodeId");
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.nodes()).thenReturn(discoveryNodes);
when(clusterService.state()).thenReturn(clusterState);
DiscoveryService discoveryService = mock(DiscoveryService.class);
when(discoveryService.localNode()).thenReturn(discoveryNode);
IndicesService indicesService = mock(IndicesService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.ImmutableSettings;
Expand Down Expand Up @@ -61,6 +62,12 @@ protected void configure() {
bind(OsService.class).toInstance(osService);
Discovery discovery = mock(Discovery.class);
bind(Discovery.class).toInstance(discovery);

DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(discoveryNodes.localNodeId()).thenReturn("node-id-1");
when(state.nodes()).thenReturn(discoveryNodes);
when(clusterService.state()).thenReturn(state);

DiscoveryNode node = mock(DiscoveryNode.class);
when(discovery.localNode()).thenReturn(node);
when(node.getId()).thenReturn("node-id-1");
Expand Down

0 comments on commit 76e477f

Please sign in to comment.