Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DiskThresholdDecider#remain(...) should take shards relocating away into account #8659

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -223,20 +223,28 @@ public DiskThresholdDecider(Settings settings, NodeSettingsService nodeSettingsS
/**
* Returns the size of all shards that are currently being relocated to
* the node, but may not be finished transfering yet.
*
* If subtractShardsMovingAway is set then the size of shards moving away is subtracted from the total size
* of all shards
*/
public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map<String, Long> shardSizes) {
public long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, Map<String, Long> shardSizes, boolean subtractShardsMovingAway) {
List<ShardRouting> relocatingShards = allocation.routingTable().shardsWithState(ShardRoutingState.RELOCATING);
long totalSize = 0;
for (ShardRouting routing : relocatingShards) {
if (routing.relocatingNodeId().equals(node.nodeId())) {
Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
shardSize = shardSize == null ? 0 : shardSize;
totalSize += shardSize;
totalSize += getShardSize(routing, shardSizes);
} else if (subtractShardsMovingAway && routing.currentNodeId().equals(node.nodeId())) {
totalSize -= getShardSize(routing, shardSizes);
}
}
return totalSize;
}

private long getShardSize(ShardRouting routing, Map<String, Long> shardSizes) {
Long shardSize = shardSizes.get(shardIdentifierFromRouting(routing));
return shardSize == null ? 0 : shardSize;
}

public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {

// Always allow allocation if the decider is disabled
Expand Down Expand Up @@ -283,7 +291,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

if (includeRelocations) {
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes, false);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
Expand Down Expand Up @@ -429,7 +437,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl

if (includeRelocations) {
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes, true);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
Expand Down
Expand Up @@ -22,21 +22,25 @@
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand All @@ -50,6 +54,8 @@
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {

Expand Down Expand Up @@ -784,6 +790,116 @@ public void addListener(Listener listener) {

}

@Test
public void testCanRemainWithShardRelocatingAway() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a strategy.reroute call in this test at the end to ensure that the cluster rerouting does the right thing with regard to one of the shards? (marking it as RELOCATING and not relocating the other shard because it's taking the relocation into account)

Settings diskSettings = settingsBuilder()
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, true)
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, "60%")
.put(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK, "70%").build();

// We have an index with 2 primary shards each taking 40 bytes. Each node has 100 bytes available
Map<String, DiskUsage> usages = new HashMap<>();
usages.put("node1", new DiskUsage("node1", "n1", 100, 20)); // 80% used
usages.put("node2", new DiskUsage("node2", "n2", 100, 100)); // 0% used

Map<String, Long> shardSizes = new HashMap<>();
shardSizes.put("[test][0][p]", 40L);
shardSizes.put("[test][1][p]", 40L);
final ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.copyOf(usages), ImmutableMap.copyOf(shardSizes));

DiskThresholdDecider diskThresholdDecider = new DiskThresholdDecider(diskSettings);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(2).numberOfReplicas(0))
.build();

RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();

DiscoveryNode discoveryNode1 = new DiscoveryNode("node1", new LocalTransportAddress("1"), Version.CURRENT);
DiscoveryNode discoveryNode2 = new DiscoveryNode("node2", new LocalTransportAddress("2"), Version.CURRENT);
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(discoveryNode1).put(discoveryNode2).build();

ClusterState baseClusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(discoveryNodes)
.build();

// Two shards consuming each 80% of disk space while 70% is allowed, so shard 0 isn't allowed here
MutableShardRouting firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1);
MutableShardRouting secondRouting = new MutableShardRouting("test", 1, "node1", true, ShardRoutingState.STARTED, 1);
RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
RoutingTable.Builder builder = RoutingTable.builder().add(
IndexRoutingTable.builder("test")
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false)
.addShard(firstRouting)
.build()
)
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false)
.addShard(secondRouting)
.build()
)
);
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));

// Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1);
secondRouting = new MutableShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING, 1);
firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
builder = RoutingTable.builder().add(
IndexRoutingTable.builder("test")
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false)
.addShard(firstRouting)
.build()
)
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false)
.addShard(secondRouting)
.build()
)
);
clusterState = ClusterState.builder(baseClusterState).routingTable(builder).build();
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));

// Creating AllocationService instance and the services it depends on...
ClusterInfoService cis = new ClusterInfoService() {
@Override
public ClusterInfo getClusterInfo() {
logger.info("--> calling fake getClusterInfo");
return clusterInfo;
}

@Override
public void addListener(Listener listener) {
// noop
}
};
AllocationDeciders deciders = new AllocationDeciders(ImmutableSettings.EMPTY, new HashSet<>(Arrays.asList(
new SameShardAllocationDecider(ImmutableSettings.EMPTY), diskThresholdDecider
)));
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build(), deciders, new ShardsAllocators(), cis);
// Ensure that the reroute call doesn't alter the routing table, since the first primary is relocating away
// and therefor we will have sufficient disk space on node1.
RoutingAllocation.Result result = strategy.reroute(clusterState);
assertThat(result.changed(), is(false));
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().state(), equalTo(STARTED));
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(result.routingTable().index("test").getShards().get(0).primaryShard().relocatingNodeId(), nullValue());
assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().state(), equalTo(RELOCATING));
assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(result.routingTable().index("test").getShards().get(1).primaryShard().relocatingNodeId(), equalTo("node2"));
}

public void logShardStates(ClusterState state) {
RoutingNodes rn = state.routingNodes();
logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}",
Expand Down