Fix refresh behavior in MockDiskUsagesIT (#57926)
Ensures that InternalClusterInfoService's internally cached stats are refreshed whenever the
shard size or disk usage function (used to mock out disk usage) is overridden.

Closes #57888
ywelsch committed Jun 11, 2020
1 parent 0ed7b5f commit a955fae
Showing 2 changed files with 34 additions and 26 deletions.
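In practice, the change swaps the tests from assigning the mock function fields directly to calling setters that also refresh the cached stats. A minimal before/after sketch, assuming the MockInternalClusterInfoService API shown in the diff below:

    // Before (sketch): overriding the field alone left the cached ClusterInfo stale
    // until some later refresh happened to run.
    clusterInfoService.diskUsageFunction =
        (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));

    // After (sketch): the setter overrides the function and refreshes immediately,
    // so subsequent allocation decisions already see the mocked disk usage.
    clusterInfoService.setDiskUsageFunctionAndRefresh(
        (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));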
MockDiskUsagesIT.java
@@ -92,10 +92,10 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
clusterInfoService.onMaster();

// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
- clusterInfoService.shardSizeFunction = shardRouting -> 0L;
+ clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);

// start with all nodes below the watermark
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));

final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
@@ -115,8 +115,8 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
});

// move node2 above high watermark
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
- discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 9) : between(10, 100));
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
+ discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 9) : between(10, 100)));

logger.info("--> waiting for shards to relocate off node [{}]", nodeIds.get(2));

@@ -128,7 +128,7 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
});

// move all nodes below watermark again
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));

logger.info("--> waiting for shards to rebalance back onto node [{}]", nodeIds.get(2));

@@ -154,10 +154,10 @@ public void testAutomaticReleaseOfIndexBlock() throws Exception {
clusterInfoService.onMaster();

// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
- clusterInfoService.shardSizeFunction = shardRouting -> 0L;
+ clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);

// start with all nodes below the low watermark
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(15, 100));
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(15, 100)));

final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
@@ -184,8 +184,8 @@ public void testAutomaticReleaseOfIndexBlock() throws Exception {

// Move all nodes above the low watermark so no shard movement can occur, and at least one node above the flood stage watermark so
// the index is blocked
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
- discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 4) : between(0, 9));
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100,
+ discoveryNode.getId().equals(nodeIds.get(2)) ? between(0, 4) : between(0, 9)));

assertBusy(() -> assertBlocked(
client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar"),
@@ -201,7 +201,7 @@ public void testAutomaticReleaseOfIndexBlock() throws Exception {
logger.info("--> index is confirmed read-only, releasing disk space");

// Move all nodes below the high watermark so that the index is unblocked
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));

// Attempt to create a new document until DiskUsageMonitor unblocks the index
assertBusy(() -> {
@@ -230,10 +230,10 @@ public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception
});

// shards are 1 byte large
- clusterInfoService.shardSizeFunction = shardRouting -> 1L;
+ clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 1L);

// start with all nodes below the watermark
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, 1000L);
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L, 1000L));

assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
@@ -260,10 +260,10 @@ public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));

// node2 suddenly has 99 bytes free, less than 10%, but moving one shard is enough to bring it up to 100 bytes free:
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
discoveryNode.getId().equals(nodeIds.get(2))
? 101L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards()
- : 1000L);
+ : 1000L));

clusterInfoService.refresh();

@@ -302,13 +302,13 @@ public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception {
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%")));

// shards are 1 byte large
- clusterInfoService.shardSizeFunction = shardRouting -> 1L;
+ clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 1L);

// node 2 only has space for one shard
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 1000L,
discoveryNode.getId().equals(nodeIds.get(2))
? 150L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards()
- : 1000L);
+ : 1000L));

assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put("number_of_shards", 6)
@@ -352,10 +352,10 @@ public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();

// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
- clusterInfoService.shardSizeFunction = shardRouting -> 0L;
+ clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);

// start with all paths below the watermark
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100));
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));

assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
@@ -381,15 +381,13 @@ public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception
&& shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count();
logger.info("--> shards on good path: [{}]", shardsOnGoodPath);

- // one of the paths on node0 suddenly exceeds the high watermark
- clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L,
- fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100));
-
// disable rebalancing, or else we might move shards back onto the over-full path since we're not faking that
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));

- clusterInfoService.refresh();
+ // one of the paths on node0 suddenly exceeds the high watermark
+ clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L,
+ fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100)));

logger.info("--> waiting for shards to relocate off path [{}]", pathOverWatermark);

MockInternalClusterInfoService.java
@@ -42,15 +42,25 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
public static class TestPlugin extends Plugin {}

@Nullable // if no fakery should take place
- public volatile Function<ShardRouting, Long> shardSizeFunction;
+ private volatile Function<ShardRouting, Long> shardSizeFunction;

@Nullable // if no fakery should take place
- public volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction;
+ private volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction;

public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
super(settings, clusterService, threadPool, client);
}

+ public void setDiskUsageFunctionAndRefresh(BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction) {
+ this.diskUsageFunction = diskUsageFunction;
+ refresh();
+ }
+
+ public void setShardSizeFunctionAndRefresh(Function<ShardRouting, Long> shardSizeFunction) {
+ this.shardSizeFunction = shardSizeFunction;
+ refresh();
+ }
+
@Override
public ClusterInfo getClusterInfo() {
final ClusterInfo clusterInfo = super.getClusterInfo();
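As a usage note, a hypothetical test snippet (the method names are taken from this diff; the assertion step is illustrative only) showing why the combined setter is convenient:

    // Hypothetical usage sketch: because the setter already refreshed the cached stats,
    // the test no longer needs a separate clusterInfoService.refresh() after overriding.
    clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);
    final ClusterInfo clusterInfo = clusterInfoService.getClusterInfo();
    // ... assertions against clusterInfo (e.g. per-node disk usage) would go here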
