Skip to content

Commit

Permalink
Add check when trying to reroute a shard to a non-data discovery node (
Browse files Browse the repository at this point in the history
…#28886)

While trying to reroute a shard to or from a non-data node (a node with ``node.data=false``), I encountered a null pointer exception. Though an exception is to be expected, the NPE was occurring because ``allocation.routingNodes()`` would not contain any non-data nodes, so when you attempt to do ``allocation.routingNodes.node(non-data-node)`` it would not find it, and thus error. This occurred regardless of whether I was rerouting to or from a non-data node.

This PR adds a check (as well as a test for these use cases) to return a legible, useful exception if the discovery node you are rerouting to or from is not a data node.
  • Loading branch information
autophagy authored and bleskes committed Mar 12, 2018
1 parent 7dcd48a commit a7b53fd
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,20 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
Decision decision = null;

boolean found = false;
for (ShardRouting shardRouting : allocation.routingNodes().node(fromDiscoNode.getId())) {
RoutingNode fromRoutingNode = allocation.routingNodes().node(fromDiscoNode.getId());
if (fromRoutingNode == null && !fromDiscoNode.isDataNode()) {
throw new IllegalArgumentException("[move_allocation] can't move [" + index + "][" + shardId + "] from "
+ fromDiscoNode + " to " + toDiscoNode + ": source [" + fromDiscoNode.getName()
+ "] is not a data node.");
}
RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.getId());
if (toRoutingNode == null && !toDiscoNode.isDataNode()) {
throw new IllegalArgumentException("[move_allocation] can't move [" + index + "][" + shardId + "] from "
+ fromDiscoNode + " to " + toDiscoNode + ": source [" + toDiscoNode.getName()
+ "] is not a data node.");
}

for (ShardRouting shardRouting : fromRoutingNode) {
if (!shardRouting.shardId().getIndexName().equals(index)) {
continue;
}
Expand All @@ -121,7 +134,6 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
", shard is not started (state = " + shardRouting.state() + "]");
}

RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.getId());
decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation);
if (decision.type() == Decision.Type.NO) {
if (explain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
Expand All @@ -36,6 +39,7 @@
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
Expand All @@ -47,12 +51,16 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;

import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
Expand Down Expand Up @@ -520,4 +528,75 @@ public void testXContent() throws Exception {
protected NamedXContentRegistry xContentRegistry() {
return new NamedXContentRegistry(NetworkModule.getNamedXContents());
}

public void testMoveShardToNonDataNode() {
AllocationService allocation = createAllocationService(Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());

logger.info("creating an index with 1 shard, no replica");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();

logger.info("--> adding two nodes");

DiscoveryNode node1 = new DiscoveryNode("node1", "node1", "node1", "test1", "test1", buildNewFakeTransportAddress(), emptyMap(),
MASTER_DATA_ROLES, Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node2", "node2", "node2", "test2", "test2", buildNewFakeTransportAddress(), emptyMap(),
new HashSet<>(randomSubsetOf(EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.INGEST))), Version.CURRENT);

clusterState = ClusterState.builder(clusterState).nodes(
DiscoveryNodes.builder()
.add(node1)
.add(node2)).build();

logger.info("start primary shard");
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));

Index index = clusterState.getMetaData().index("test").getIndex();
MoveAllocationCommand command = new MoveAllocationCommand(index.getName(), 0, "node1", "node2");
RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.emptyList()),
new RoutingNodes(clusterState, false), clusterState, ClusterInfo.EMPTY, System.nanoTime());
logger.info("--> executing move allocation command to non-data node");
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> command.execute(routingAllocation, false));
assertEquals("[move_allocation] can't move [test][0] from " + node1 + " to " + node2 + ": source [" + node2.getName() + "] is not a data node.", e.getMessage());
}

public void testMoveShardFromNonDataNode() {
AllocationService allocation = createAllocationService(Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());

logger.info("creating an index with 1 shard, no replica");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build();

logger.info("--> adding two nodes");

DiscoveryNode node1 = new DiscoveryNode("node1", "node1", "node1", "test1", "test1", buildNewFakeTransportAddress(), emptyMap(),
MASTER_DATA_ROLES, Version.CURRENT);
DiscoveryNode node2 = new DiscoveryNode("node2", "node2", "node2", "test2", "test2", buildNewFakeTransportAddress(), emptyMap(),
new HashSet<>(randomSubsetOf(EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.INGEST))), Version.CURRENT);

clusterState = ClusterState.builder(clusterState).nodes(
DiscoveryNodes.builder()
.add(node1)
.add(node2)).build();
logger.info("start primary shard");
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));

Index index = clusterState.getMetaData().index("test").getIndex();
MoveAllocationCommand command = new MoveAllocationCommand(index.getName(), 0, "node2", "node1");
RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.emptyList()),
new RoutingNodes(clusterState, false), clusterState, ClusterInfo.EMPTY, System.nanoTime());
logger.info("--> executing move allocation command from non-data node");
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> command.execute(routingAllocation, false));
assertEquals("[move_allocation] can't move [test][0] from " + node2 + " to " + node1 + ": source [" + node2.getName() + "] is not a data node.", e.getMessage());
}
}

0 comments on commit a7b53fd

Please sign in to comment.