Skip to content

Commit

Permalink
Core: Let the disk threshold decider take into account shards moving …
Browse files Browse the repository at this point in the history
…away from a node in order to determine if a shard can remain.

By taking this into account we can prevent that we move too many shards away than is necessary.

Closes elastic#8538
Closes elastic#8659
  • Loading branch information
martijnvg committed Nov 26, 2014
1 parent 98134ac commit f033c6a
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 10 deletions.
Expand Up @@ -217,20 +217,28 @@ public DiskThresholdDecider(Settings settings, NodeSettingsService nodeSettingsS
/**
* Returns the size of all shards that are currently being relocated to
* the node, but may not be finished transfering yet.
*
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
* of all shards
*/
public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map<String, Long> shardSizes) {
public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map<String, Long> shardSizes, boolean subtractShardsMovingAway) {
List<ShardRouting> relocatingShards = allocation.routingTable().shardsWithState(ShardRoutingState.RELOCATING);
long totalSize = 0;
for (ShardRouting routing : relocatingShards) {
if (routing.relocatingNodeId().equals(node.nodeId())) {
Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
shardSize = shardSize == null ? 0 : shardSize;
totalSize += shardSize;
totalSize += getShardSize(routing, shardSizes);
} else if (subtractShardsMovingAway && routing.currentNodeId().equals(node.nodeId())) {
totalSize -= getShardSize(routing, shardSizes);
}
}
return totalSize;
}

private long getShardSize(ShardRouting routing, Map<String, Long> shardSizes) {
Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
return shardSize == null ? 0 : shardSize;
}

public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {

// Always allow allocation if the decider is disabled
Expand Down Expand Up @@ -277,7 +285,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

if (includeRelocations) {
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes, false);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -423,7 +431,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl

if (includeRelocations) {
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes, true);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
Expand Down
Expand Up @@ -29,19 +29,18 @@
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.GatewayAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand All @@ -54,7 +53,7 @@

import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.*;

public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {

Expand Down Expand Up @@ -327,6 +326,8 @@ public void addListener(Listener listener) {
nodeWithPrimary = "node2";
nodeWithoutPrimary = "node1";
}
logger.info("--> nodeWithPrimary: {}", nodeWithPrimary);
logger.info("--> nodeWithoutPrimary: {}", nodeWithoutPrimary);

// Make node without the primary now habitable to replicas
usages.put(nodeWithoutPrimary, new DiskUsage(nodeWithoutPrimary, "", 100, 35)); // 65% used
Expand Down Expand Up @@ -787,6 +788,116 @@ public void addListener(Listener listener) {

}

@Test
public void testCanRemainWithShardRelocatingAway() {
Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "60%")
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "70%").build();

// We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", "n1", 100, 20)); // 80% used
usages.put("node2", new DiskUsage("node2", "n2", 100, 100)); // 0% used

Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 40L);
shardSizes.put("[test][1][p]", 40L);
final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));

DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(2).numberOfReplicas(0))
.build();

RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();

DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), Version.CURRENT);
DiscoveryNode discoveryNode2 = new DiscoveryNode("node2", new LocalTransportAddress("2"), Version.CURRENT);
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(discoveryNode1).put(discoveryNode2).build();

ClusterState baseClusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(discoveryNodes)
.build();

// Two shards consuming each 80% of disk space while 70% is allowed, so shard 0 isn't allowed here
MutableShardRouting firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1);
MutableShardRouting secondRouting = new MutableShardRouting("test", 1, "node1", true, ShardRoutingState.STARTED, 1);
RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
RoutingTable.Builder builder = RoutingTable.builder().add(
IndexRoutingTable.builder("test")
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false)
.addShard(firstRouting)
.build()
)
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false)
.addShard(secondRouting)
.build()
)
);
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));

// Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1);
secondRouting = new MutableShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING, 1);
firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
builder = RoutingTable.builder().add(
IndexRoutingTable.builder("test")
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false)
.addShard(firstRouting)
.build()
)
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false)
.addShard(secondRouting)
.build()
)
);
clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));

// Creating AllocationService instance and the services it depends on...
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}

@Override
public void addListener(Listener listener) {
// noop
}
};
AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY, new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY), diskThresholdDecider
)));
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
// Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away
// and therefor we will have sufficient disk space on node1.
RoutingAllocation.Result result = strategy.reroute(clusterState);
assertThat(result.changed(), is(false));
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().relocatingNodeId(), nullValue());
assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().state(), equalTo(RELOCATING));
assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), equalTo("node2"));
}

public void logShardStates(ClusterState state) {
RoutingNodes rn = state.routingNodes();
logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",
Expand Down

0 comments on commit f033c6a

Please sign in to comment.