diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java
index 0e387db5f45ef..84ee572770afb 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/BatchedRerouteService.java
@@ -50,7 +50,7 @@ public class BatchedRerouteService implements RerouteService {
     private final Object mutex = new Object();
     @Nullable // null if no reroute is currently pending
-    private List<ActionListener<Void>> pendingRerouteListeners;
+    private List<ActionListener<ClusterState>> pendingRerouteListeners;
     private Priority pendingTaskPriority = Priority.LANGUID;
 
     /**
@@ -65,8 +65,8 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
     @Override
-    public final void reroute(String reason, Priority priority, ActionListener<Void> listener) {
-        final List<ActionListener<Void>> currentListeners;
+    public final void reroute(String reason, Priority priority, ActionListener<ClusterState> listener) {
+        final List<ActionListener<ClusterState>> currentListeners;
         synchronized (mutex) {
             if (pendingRerouteListeners != null) {
                 if (priority.sameOrAfter(pendingTaskPriority)) {
@@ -148,7 +148,7 @@ public void onFailure(String source, Exception e) {
 
                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    ActionListener.onResponse(currentListeners, null);
+                    ActionListener.onResponse(currentListeners, newState);
                 }
             });
         } catch (Exception e) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java b/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java
index 58f9e41fe88a7..38d27117b0b10 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/RerouteService.java
@@ -19,6 +19,7 @@
 package org.elasticsearch.cluster.routing;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.common.Priority;
 
 /**
@@ -33,5 +34,5 @@ public interface RerouteService {
      * this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then
      * the priority of the pending batch is raised to the given priority.
     */
-    void reroute(String reason, Priority priority, ActionListener<Void> listener);
+    void reroute(String reason, Priority priority, ActionListener<ClusterState> listener);
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
index 9071d23ce526d..fb82f1d67806a 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
@@ -37,6 +37,7 @@
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -45,7 +46,9 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.set.Sets;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -65,7 +68,6 @@ public class DiskThresholdMonitor {
     private final DiskThresholdSettings diskThresholdSettings;
     private final Client client;
-    private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
     private final Supplier<ClusterState> clusterStateSupplier;
     private final LongSupplier currentTimeMillisSupplier;
     private final RerouteService rerouteService;
@@ -73,6 +75,24 @@ public class DiskThresholdMonitor {
     private final AtomicBoolean checkInProgress = new AtomicBoolean();
     private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
 
+    /**
+     * The IDs of the nodes that were over the low threshold in the last check (and maybe over another threshold too). Tracked so that we
+     * can log when such nodes are no longer over the low threshold.
+     */
+    private final Set<String> nodesOverLowThreshold = Sets.newConcurrentHashSet();
+
+    /**
+     * The IDs of the nodes that were over the high threshold in the last check (and maybe over another threshold too). Tracked so that we
+     * can log when such nodes are no longer over the high threshold.
+     */
+    private final Set<String> nodesOverHighThreshold = Sets.newConcurrentHashSet();
+
+    /**
+     * The IDs of the nodes that were over the high threshold in the last check, but which are relocating shards that will bring them
+     * under the high threshold again. Tracked so that we can log when such nodes are no longer in this state.
+     */
+    private final Set<String> nodesOverHighThresholdAndRelocating = Sets.newConcurrentHashSet();
+
     public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
                                 Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) {
         this.clusterStateSupplier = clusterStateSupplier;
@@ -86,35 +106,6 @@ public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterSta
         }
     }
 
-    /**
-     * Warn about the given disk usage if the low or high watermark has been passed
-     */
-    private void warnAboutDiskIfNeeded(DiskUsage usage) {
-        // Check absolute disk values
-        if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) {
-            logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
-                diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage);
-        } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
-            logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
-                diskThresholdSettings.getFreeBytesThresholdHigh(), usage);
-        } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) {
-            logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
-                diskThresholdSettings.getFreeBytesThresholdLow(), usage);
-        }
-
-        // Check percentage disk values
-        if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
-            logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
-                Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage);
-        } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
-            logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
-                Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage);
-        } else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
-            logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
-                Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdLow(), "%"), usage);
-        }
-    }
-
     private void checkFinished() {
         final boolean checkFinished = checkInProgress.compareAndSet(true, false);
         assert checkFinished;
@@ -137,38 +128,50 @@ public void onNewInfo(ClusterInfo info) {
         String explanation = "";
         final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
 
-        // Garbage collect nodes that have been removed from the cluster
-        // from the map that tracks watermark crossing
+        // Clean up nodes that have been removed from the cluster
         final ObjectLookupContainer<String> nodes = usages.keys();
-        for (String node : nodeHasPassedWatermark) {
-            if (nodes.contains(node) == false) {
-                nodeHasPassedWatermark.remove(node);
-            }
-        }
+        cleanUpRemovedNodes(nodes, nodesOverLowThreshold);
+        cleanUpRemovedNodes(nodes, nodesOverHighThreshold);
+        cleanUpRemovedNodes(nodes, nodesOverHighThresholdAndRelocating);
+
         final ClusterState state = clusterStateSupplier.get();
         final Set<String> indicesToMarkReadOnly = new HashSet<>();
         RoutingNodes routingNodes = state.getRoutingNodes();
         Set<String> indicesNotToAutoRelease = new HashSet<>();
         markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease);
 
+        final List<DiskUsage> usagesOverHighThreshold = new ArrayList<>();
+
         for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
             final String node = entry.key;
             final DiskUsage usage = entry.value;
-            warnAboutDiskIfNeeded(usage);
-            RoutingNode routingNode = routingNodes.node(node);
-            // Only unblock index if all nodes that contain shards of it are below the high disk watermark
+            final RoutingNode routingNode = routingNodes.node(node);
+
             if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
                 usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
-                if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
+
+                nodesOverLowThreshold.add(node);
+                nodesOverHighThreshold.add(node);
+                nodesOverHighThresholdAndRelocating.remove(node);
+
+                if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
                     for (ShardRouting routing : routingNode) {
                         String indexName = routing.index().getName();
                         indicesToMarkReadOnly.add(indexName);
                         indicesNotToAutoRelease.add(indexName);
                     }
                 }
+
+                logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only",
+                    diskThresholdSettings.describeFloodStageThreshold(), usage);
+
             } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
                 usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
-                if (routingNode != null) {
+
+                nodesOverLowThreshold.add(node);
+                nodesOverHighThreshold.add(node);
+
+                if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
                     for (ShardRouting routing : routingNode) {
                         String indexName = routing.index().getName();
                         indicesNotToAutoRelease.add(indexName);
@@ -177,41 +180,98 @@ public void onNewInfo(ClusterInfo info) {
                 if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
                     reroute = true;
                     explanation = "high disk watermark exceeded on one or more nodes";
+                    usagesOverHighThreshold.add(usage);
+                    // will log about this node when the reroute completes
                 } else {
                     logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred "
                         + "in the last [{}], skipping reroute", node, diskThresholdSettings.getRerouteInterval());
                 }
-                nodeHasPassedWatermark.add(node);
+
             } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() ||
                 usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
-                nodeHasPassedWatermark.add(node);
+
+                nodesOverHighThresholdAndRelocating.remove(node);
+
+                final boolean wasUnderLowThreshold = nodesOverLowThreshold.add(node);
+                final boolean wasOverHighThreshold = nodesOverHighThreshold.remove(node);
+                assert (wasUnderLowThreshold && wasOverHighThreshold) == false;
+
+                if (wasUnderLowThreshold) {
+                    logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
+                        diskThresholdSettings.describeLowThreshold(), usage);
+                } else if (wasOverHighThreshold) {
+                    logger.info("high disk watermark [{}] no longer exceeded on {}, but low disk watermark [{}] is still exceeded",
+                        diskThresholdSettings.describeHighThreshold(), usage, diskThresholdSettings.describeLowThreshold());
+                }
+
             } else {
-                if (nodeHasPassedWatermark.contains(node)) {
-                    // The node has previously been over the high or
-                    // low watermark, but is no longer, so we should
-                    // reroute so any unassigned shards can be allocated
-                    // if they are able to be
+
+                nodesOverHighThresholdAndRelocating.remove(node);
+
+                if (nodesOverLowThreshold.contains(node)) {
+                    // The node has previously been over the low watermark, but is no longer, so it may be possible to allocate more shards
+                    // if we reroute now.
                     if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
                         reroute = true;
                         explanation = "one or more nodes has gone under the high or low watermark";
-                        nodeHasPassedWatermark.remove(node);
+                        nodesOverLowThreshold.remove(node);
+                        nodesOverHighThreshold.remove(node);
+
+                        logger.info("low disk watermark [{}] no longer exceeded on {}",
+                            diskThresholdSettings.describeLowThreshold(), usage);
+
                     } else {
                         logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred "
                             + "in the last [{}], skipping reroute", node, diskThresholdSettings.getRerouteInterval());
                     }
                 }
+
             }
         }
 
         final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3);
 
         if (reroute) {
-            logger.info("rerouting shards: [{}]", explanation);
-            rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(r -> {
+            logger.debug("rerouting shards: [{}]", explanation);
+            rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(reroutedClusterState -> {
+
+                for (DiskUsage diskUsage : usagesOverHighThreshold) {
+                    final RoutingNode routingNode = reroutedClusterState.getRoutingNodes().node(diskUsage.getNodeId());
+                    final DiskUsage usageIncludingRelocations;
+                    final long relocatingShardsSize;
+                    if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step
+                        relocatingShardsSize = sizeOfRelocatingShards(routingNode, diskUsage, info, reroutedClusterState);
+                        usageIncludingRelocations = new DiskUsage(diskUsage.getNodeId(), diskUsage.getNodeName(),
+                            diskUsage.getPath(), diskUsage.getTotalBytes(), diskUsage.getFreeBytes() - relocatingShardsSize);
+                    } else {
+                        usageIncludingRelocations = diskUsage;
+                        relocatingShardsSize = 0L;
+                    }
+
+                    if (usageIncludingRelocations.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()
+                        || usageIncludingRelocations.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+
+                        nodesOverHighThresholdAndRelocating.remove(diskUsage.getNodeId());
+                        logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " +
+                            "currently relocating away shards totalling [{}] bytes; the node is expected to continue to exceed " +
+                            "the high disk watermark when these relocations are complete",
+                            diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize);
+                    } else if (nodesOverHighThresholdAndRelocating.add(diskUsage.getNodeId())) {
+                        logger.info("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " +
+                            "currently relocating away shards totalling [{}] bytes; the node is expected to be below the high " +
+                            "disk watermark when these relocations are complete",
+                            diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize);
+                    } else {
+                        logger.debug("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node; " +
+                            "currently relocating away shards totalling [{}] bytes",
+                            diskThresholdSettings.describeHighThreshold(), diskUsage, -relocatingShardsSize);
+                    }
+                }
+
                 setLastRunTimeMillis();
-                listener.onResponse(r);
+                listener.onResponse(null);
             }, e -> {
                 logger.debug("reroute failed", e);
                setLastRunTimeMillis();
@@ -220,7 +280,7 @@ public void onNewInfo(ClusterInfo info) {
         } else {
             listener.onResponse(null);
         }
-        Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting()
+        final Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting()
             .spliterator(), false)
             .map(c -> c.key)
             .filter(index -> indicesNotToAutoRelease.contains(index) == false)
@@ -250,6 +310,12 @@ public void onNewInfo(ClusterInfo info) {
         }
     }
 
+    // exposed for tests to override
+    long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) {
+        return DiskThresholdDecider.sizeOfRelocatingShards(routingNode, true,
+            diskUsage.getPath(), info, reroutedClusterState.metaData(), reroutedClusterState.routingTable());
+    }
+
     private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages,
                                                            Set<String> indicesToMarkIneligibleForAutoRelease) {
         for (RoutingNode routingNode : routingNodes) {
@@ -262,7 +328,6 @@ private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes
                 }
             }
         }
-
     }
 
     private void setLastRunTimeMillis() {
@@ -286,4 +351,12 @@ protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener
             .setSettings(readOnlySettings)
             .execute(ActionListener.map(wrappedListener, r -> null));
     }
+
+    private static void cleanUpRemovedNodes(ObjectLookupContainer<String> nodesToKeep, Set<String> nodesToCleanUp) {
+        for (String node : nodesToCleanUp) {
+            if (nodesToKeep.contains(node) == false) {
+                nodesToCleanUp.remove(node);
+            }
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java
index 54b68c90881ae..7d97584f8ca12 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java
@@ -20,6 +20,7 @@
 package org.elasticsearch.cluster.routing.allocation;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
@@ -321,6 +322,24 @@ public TimeValue getRerouteInterval() {
         return rerouteInterval;
     }
 
+    String describeLowThreshold() {
+        return freeBytesThresholdLow.equals(ByteSizeValue.ZERO)
+            ? Strings.format1Decimals(100.0 - freeDiskThresholdLow, "%")
+            : freeBytesThresholdLow.toString();
+    }
+
+    String describeHighThreshold() {
+        return freeBytesThresholdHigh.equals(ByteSizeValue.ZERO)
+            ? Strings.format1Decimals(100.0 - freeDiskThresholdHigh, "%")
+            : freeBytesThresholdHigh.toString();
+    }
+
+    String describeFloodStageThreshold() {
+        return freeBytesThresholdFloodStage.equals(ByteSizeValue.ZERO)
+            ? Strings.format1Decimals(100.0 - freeDiskThresholdFloodStage, "%")
+            : freeBytesThresholdFloodStage.toString();
+    }
+
     /**
      * Attempts to parse the watermark into a percentage, returning 100.0% if
      * it cannot be parsed.
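
As an aside, a minimal standalone sketch (not part of the patch; the class and method names here are hypothetical) of the selection logic behind the new describe*Threshold() helpers above: a watermark configured as a percentage leaves the corresponding free-bytes threshold at zero, so the percentage form is reported, otherwise the absolute byte value is reported.

import java.util.Locale;

public class ThresholdDescriptionSketch {

    // freeBytesThreshold == 0 means the watermark was configured as a percentage of used disk
    static String describeThreshold(long freeBytesThreshold, double freeDiskThresholdPercent) {
        if (freeBytesThreshold == 0L) {
            // report the used-disk percentage; the real helper uses Strings.format1Decimals, which
            // produces "85%" (not "85.0%") for the default low watermark, so trim a trailing ".0" here too
            String percent = String.format(Locale.ROOT, "%.1f", 100.0 - freeDiskThresholdPercent);
            return (percent.endsWith(".0") ? percent.substring(0, percent.length() - 2) : percent) + "%";
        }
        // the real helper prints a human-readable ByteSizeValue such as "1gb"; raw bytes are used here for simplicity
        return freeBytesThreshold + "b";
    }

    public static void main(String[] args) {
        System.out.println(describeThreshold(0L, 15.0));        // default low watermark -> 85%
        System.out.println(describeThreshold(1L << 30, 100.0)); // absolute watermark -> 1073741824b
    }
}
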
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a1549c5e217a4..ba92b7a20455c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -817,8 +817,9 @@ private void allocateUnassigned() { logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId()); } - final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation, - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); minNode.addShard(shard); if (!shard.primary()) { @@ -838,8 +839,9 @@ private void allocateUnassigned() { if (minNode != null) { // throttle decision scenario assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED; - final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation, - ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize)); final RoutingNode node = minNode.getRoutingNode(); final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type(); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 6c99cfa8ee056..bc6adf0367486 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -27,9 +27,11 @@ import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; @@ -86,10 +88,9 @@ public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) * * If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards */ - static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation, - boolean subtractShardsMovingAway, String dataPath) { - ClusterInfo clusterInfo = allocation.clusterInfo(); - long totalSize = 0; + public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShardsMovingAway, String dataPath, ClusterInfo clusterInfo, + 
MetaData metaData, RoutingTable routingTable) { + long totalSize = 0L; for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) { if (routing.relocatingNodeId() == null) { @@ -103,7 +104,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio // if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least // free space if (actualPath == null || actualPath.equals(dataPath)) { - totalSize += getExpectedShardSize(routing, allocation, 0); + totalSize += getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable); } } @@ -115,7 +116,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio actualPath = clusterInfo.getDataPath(routing.cancelRelocation()); } if (dataPath.equals(actualPath)) { - totalSize -= getExpectedShardSize(routing, allocation, 0); + totalSize -= getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable); } } } @@ -239,7 +240,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing } // Secondly, check that allocating the shard to this node doesn't put it above the high watermark - final long shardSize = getExpectedShardSize(shardRouting, allocation, 0); + final long shardSize = getExpectedShardSize(shardRouting, 0L, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize); long freeBytesAfterShard = freeBytes - shardSize; if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) { @@ -342,7 +344,8 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, } if (diskThresholdSettings.includeRelocations()) { - long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath()); + final long relocatingShardsSize = sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(), + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize); if (logger.isTraceEnabled()) { @@ -425,29 +428,28 @@ private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap shardIds = IndexMetaData.selectRecoverFromShards(shard.id(), - sourceIndexMeta, metaData.getNumberOfShards()); - for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) { + sourceIndexMeta, indexMetaData.getNumberOfShards()); + for (IndexShardRoutingTable shardRoutingTable : routingTable.index(mergeSourceIndex.getName())) { if (shardIds.contains(shardRoutingTable.shardId())) { - targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0); + targetShardSize += clusterInfo.getShardSize(shardRoutingTable.primaryShard(), 0); } } } return targetShardSize == 0 ? 
defaultValue : targetShardSize; } else { - return info.getShardSize(shard, defaultValue); + return clusterInfo.getShardSize(shard, defaultValue); } - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java index 8b68516f61a96..3744ae6b3568c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/action/shard/ShardStartedClusterStateTaskExecutorTests.java @@ -55,7 +55,8 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC private ShardStateAction.ShardStartedClusterStateTaskExecutor executor; - private static void neverReroutes(String reason, Priority priority, ActionListener listener) { + @SuppressWarnings("unused") + private static void neverReroutes(String reason, Priority priority, ActionListener listener) { fail("unexpectedly ran a deferred reroute"); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 64406888bc239..61c152917958b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -18,6 +18,9 @@ */ package org.elasticsearch.cluster.routing.allocation; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterInfo; @@ -30,12 +33,16 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import java.util.Arrays; import java.util.Collections; @@ -44,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -146,7 +154,7 @@ public void testDoesNotSubmitRerouteTaskTooFrequently() { final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); AtomicLong currentTime = new AtomicLong(); - AtomicReference> listenerReference = new AtomicReference<>(); + AtomicReference> listenerReference = new AtomicReference<>(); DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, new ClusterSettings(Settings.EMPTY, 
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, priority, listener) -> { @@ -180,7 +188,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi currentTime.addAndGet(randomLongBetween(0, 120000)); monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); assertNotNull(listenerReference.get()); - listenerReference.getAndSet(null).onResponse(null); + listenerReference.getAndSet(null).onResponse(clusterState); if (randomBoolean()) { // should not re-route again within the reroute interval @@ -195,7 +203,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); assertNotNull(listenerReference.get()); - final ActionListener rerouteListener1 = listenerReference.getAndSet(null); + final ActionListener rerouteListener1 = listenerReference.getAndSet(null); // should not re-route again before reroute has completed currentTime.addAndGet(randomLongBetween(0, 120000)); @@ -203,7 +211,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi assertNull(listenerReference.get()); // complete reroute - rerouteListener1.onResponse(null); + rerouteListener1.onResponse(clusterState); if (randomBoolean()) { // should not re-route again within the reroute interval @@ -250,7 +258,7 @@ public void testAutoReleaseIndices() { (reason, priority, listener) -> { assertNotNull(listener); assertThat(priority, equalTo(Priority.HIGH)); - listener.onResponse(null); + listener.onResponse(clusterState); }) { @Override protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { @@ -287,7 +295,7 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener (reason, priority, listener) -> { assertNotNull(listener); assertThat(priority, equalTo(Priority.HIGH)); - listener.onResponse(null); + listener.onResponse(clusterStateWithBlocks); }) { @Override protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener listener, boolean readOnly) { @@ -365,4 +373,201 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener assertThat(indicesToMarkReadOnly.get(), contains("test_1")); assertNull(indicesToRelease.get()); } + + @TestLogging(value="org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor:INFO", reason="testing INFO/WARN logging") + public void testDiskMonitorLogging() throws IllegalAccessException { + final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(newNode("node1"))).build(); + final AtomicReference clusterStateRef = new AtomicReference<>(clusterState); + final AtomicBoolean advanceTime = new AtomicBoolean(randomBoolean()); + + final LongSupplier timeSupplier = new LongSupplier() { + long time; + + @Override + public long getAsLong() { + if (advanceTime.get()) { + time += DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).getMillis() + 1; + } + logger.info("time: [{}]", time); + return time; + } + }; + + final AtomicLong relocatingShardSizeRef = new AtomicLong(); + + DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, clusterStateRef::get, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, timeSupplier, + (reason, priority, 
listener) -> listener.onResponse(clusterStateRef.get())) { + @Override + protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener, boolean readOnly) { + listener.onResponse(null); + } + + @Override + long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, ClusterInfo info, ClusterState reroutedClusterState) { + return relocatingShardSizeRef.get(); + } + }; + + final ImmutableOpenMap.Builder allDisksOkBuilder; + allDisksOkBuilder = ImmutableOpenMap.builder(); + allDisksOkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(15, 100))); + final ImmutableOpenMap allDisksOk = allDisksOkBuilder.build(); + + final ImmutableOpenMap.Builder aboveLowWatermarkBuilder = ImmutableOpenMap.builder(); + aboveLowWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 14))); + final ImmutableOpenMap aboveLowWatermark = aboveLowWatermarkBuilder.build(); + + final ImmutableOpenMap.Builder aboveHighWatermarkBuilder = ImmutableOpenMap.builder(); + aboveHighWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9))); + final ImmutableOpenMap aboveHighWatermark = aboveHighWatermarkBuilder.build(); + + final ImmutableOpenMap.Builder aboveFloodStageWatermarkBuilder = ImmutableOpenMap.builder(); + aboveFloodStageWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4))); + final ImmutableOpenMap aboveFloodStageWatermark = aboveFloodStageWatermarkBuilder.build(); + + assertNoLogging(monitor, allDisksOk); + + assertSingleInfoMessage(monitor, aboveLowWatermark, + "low disk watermark [85%] exceeded on * replicas will not be assigned to this node"); + + advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed + assertSingleWarningMessage(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + advanceTime.set(true); + assertRepeatedWarningMessages(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + advanceTime.set(randomBoolean()); + assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark, + "flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only"); + + relocatingShardSizeRef.set(-5L); + advanceTime.set(true); + assertSingleInfoMessage(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to be below the high disk watermark when these relocations are complete"); + + relocatingShardSizeRef.set(0L); + timeSupplier.getAsLong(); // advance time long enough to do another reroute + advanceTime.set(false); // will do one reroute and emit warnings, but subsequent reroutes and associated messages are delayed + assertSingleWarningMessage(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + advanceTime.set(true); + assertRepeatedWarningMessages(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * 
shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + advanceTime.set(randomBoolean()); + assertSingleInfoMessage(monitor, aboveLowWatermark, + "high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded"); + + advanceTime.set(true); // only log about dropping below the low disk watermark on a reroute + assertSingleInfoMessage(monitor, allDisksOk, + "low disk watermark [85%] no longer exceeded on *"); + + advanceTime.set(randomBoolean()); + assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark, + "flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only"); + + assertSingleInfoMessage(monitor, allDisksOk, + "low disk watermark [85%] no longer exceeded on *"); + + advanceTime.set(true); + assertRepeatedWarningMessages(monitor, aboveHighWatermark, + "high disk watermark [90%] exceeded on * shards will be relocated away from this node* " + + "the node is expected to continue to exceed the high disk watermark when these relocations are complete"); + + assertSingleInfoMessage(monitor, allDisksOk, + "low disk watermark [85%] no longer exceeded on *"); + + assertRepeatedWarningMessages(monitor, aboveFloodStageWatermark, + "flood stage disk watermark [95%] exceeded on * all indices on this node will be marked read-only"); + + assertSingleInfoMessage(monitor, aboveLowWatermark, + "high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded"); + + } + + private void assertNoLogging(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages) throws IllegalAccessException { + MockLogAppender mockAppender = new MockLogAppender(); + mockAppender.start(); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation( + "any INFO message", + DiskThresholdMonitor.class.getCanonicalName(), + Level.INFO, + "*")); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation( + "any WARN message", + DiskThresholdMonitor.class.getCanonicalName(), + Level.WARN, + "*")); + + Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class); + Loggers.addAppender(diskThresholdMonitorLogger, mockAppender); + + for (int i = between(1, 3); i >= 0; i--) { + monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null)); + } + + mockAppender.assertAllExpectationsMatched(); + Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender); + mockAppender.stop(); + } + + private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages, + String message) throws IllegalAccessException { + for (int i = between(1, 3); i >= 0; i--) { + assertLogging(monitor, diskUsages, Level.WARN, message); + } + } + + private void assertSingleWarningMessage(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages, + String message) throws IllegalAccessException { + assertLogging(monitor, diskUsages, Level.WARN, message); + assertNoLogging(monitor, diskUsages); + } + + private void assertSingleInfoMessage(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages, + String message) throws IllegalAccessException { + assertLogging(monitor, diskUsages, Level.INFO, message); + assertNoLogging(monitor, diskUsages); + } + + private void assertLogging(DiskThresholdMonitor monitor, + ImmutableOpenMap diskUsages, + Level level, + String message) throws IllegalAccessException { + MockLogAppender mockAppender = new MockLogAppender(); + 
mockAppender.start(); + mockAppender.addExpectation(new MockLogAppender.SeenEventExpectation( + "expected message", + DiskThresholdMonitor.class.getCanonicalName(), + level, + message)); + mockAppender.addExpectation(new MockLogAppender.UnseenEventExpectation( + "any message of another level", + DiskThresholdMonitor.class.getCanonicalName(), + level == Level.INFO ? Level.WARN : Level.INFO, + "*")); + + Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class); + Loggers.addAppender(diskThresholdMonitorLogger, mockAppender); + + monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null)); + + mockAppender.assertAllExpectationsMatched(); + Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender); + mockAppender.stop(); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java index ac2aaefc4b86f..ec82925c823bc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java @@ -256,4 +256,33 @@ public void testSequenceOfUpdates() { } } + public void testThresholdDescriptions() { + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + DiskThresholdSettings diskThresholdSettings = new DiskThresholdSettings(Settings.EMPTY, clusterSettings); + assertThat(diskThresholdSettings.describeLowThreshold(), equalTo("85%")); + assertThat(diskThresholdSettings.describeHighThreshold(), equalTo("90%")); + assertThat(diskThresholdSettings.describeFloodStageThreshold(), equalTo("95%")); + + diskThresholdSettings = new DiskThresholdSettings(Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "91.2%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "91.3%") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "91.4%") + .build(), clusterSettings); + + assertThat(diskThresholdSettings.describeLowThreshold(), equalTo("91.2%")); + assertThat(diskThresholdSettings.describeHighThreshold(), equalTo("91.3%")); + assertThat(diskThresholdSettings.describeFloodStageThreshold(), equalTo("91.4%")); + + diskThresholdSettings = new DiskThresholdSettings(Settings.builder() + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "1GB") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "10MB") + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "1B") + .build(), clusterSettings); + + assertThat(diskThresholdSettings.describeLowThreshold(), equalTo("1gb")); + assertThat(diskThresholdSettings.describeHighThreshold(), equalTo("10mb")); + assertThat(diskThresholdSettings.describeFloodStageThreshold(), equalTo("1b")); + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 3f1975f35369a..55f4154680a05 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ 
b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -315,22 +315,22 @@ public void testShardSizeAndRelocatingSize() { test_2 = ShardRoutingHelper.initialize(test_2, "node1"); test_2 = ShardRoutingHelper.moveToStarted(test_2); - assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0)); - assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0)); - assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0)); + assertEquals(1000L, getExpectedShardSize(test_2, 0L, allocation)); + assertEquals(100L, getExpectedShardSize(test_1, 0L, allocation)); + assertEquals(10L, getExpectedShardSize(test_0, 0L, allocation)); RoutingNode node = new RoutingNode("node1", new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2); - assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); - assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); - assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev")); - assertEquals(0L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/some/other/dev")); + assertEquals(100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null")); + assertEquals(90L, sizeOfRelocatingShards(allocation, node, true, "/dev/null")); + assertEquals(0L, sizeOfRelocatingShards(allocation, node, true, "/dev/some/other/dev")); + assertEquals(0L, sizeOfRelocatingShards(allocation, node, true, "/dev/some/other/dev")); ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), false, PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); test_3 = ShardRoutingHelper.initialize(test_3, "node1"); test_3 = ShardRoutingHelper.moveToStarted(test_3); - assertEquals(0L, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0)); + assertEquals(0L, getExpectedShardSize(test_3, 0L, allocation)); ShardRouting other_0 = ShardRouting.newUnassigned(new ShardId("other", "5678", 0), randomBoolean(), @@ -342,14 +342,19 @@ public void testShardSizeAndRelocatingSize() { node = new RoutingNode("node1", new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT), test_0, test_1.getTargetRelocatingShard(), test_2, other_0.getTargetRelocatingShard()); if (other_0.primary()) { - assertEquals(10100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); - assertEquals(10090L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); + assertEquals(10100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null")); + assertEquals(10090L, sizeOfRelocatingShards(allocation, node, true, "/dev/null")); } else { - assertEquals(100L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, false, "/dev/null")); - assertEquals(90L, DiskThresholdDecider.sizeOfRelocatingShards(node, allocation, true, "/dev/null")); + assertEquals(100L, sizeOfRelocatingShards(allocation, node, false, "/dev/null")); + assertEquals(90L, sizeOfRelocatingShards(allocation, node, true, "/dev/null")); } } + public long sizeOfRelocatingShards(RoutingAllocation allocation, RoutingNode node, boolean subtractShardsMovingAway, String dataPath) { + return DiskThresholdDecider.sizeOfRelocatingShards(node, 
subtractShardsMovingAway, dataPath, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); + } + public void testSizeShrinkIndex() { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); shardSizes.put("[test][0][p]", 10L); @@ -404,22 +409,22 @@ public void testSizeShrinkIndex() { ShardRouting test_3 = ShardRouting.newUnassigned(new ShardId(index, 3), true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); test_3 = ShardRoutingHelper.initialize(test_3, "node1"); - assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_3, allocation, 0)); - assertEquals(500L, DiskThresholdDecider.getExpectedShardSize(test_2, allocation, 0)); - assertEquals(100L, DiskThresholdDecider.getExpectedShardSize(test_1, allocation, 0)); - assertEquals(10L, DiskThresholdDecider.getExpectedShardSize(test_0, allocation, 0)); + assertEquals(500L, getExpectedShardSize(test_3, 0L, allocation)); + assertEquals(500L, getExpectedShardSize(test_2, 0L, allocation)); + assertEquals(100L, getExpectedShardSize(test_1, 0L, allocation)); + assertEquals(10L, getExpectedShardSize(test_0, 0L, allocation)); ShardRouting target = ShardRouting.newUnassigned(new ShardId(new Index("target", "5678"), 0), true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); - assertEquals(1110L, DiskThresholdDecider.getExpectedShardSize(target, allocation, 0)); + assertEquals(1110L, getExpectedShardSize(target, 0L, allocation)); ShardRouting target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 0), true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); - assertEquals(110L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0)); + assertEquals(110L, getExpectedShardSize(target2, 0L, allocation)); target2 = ShardRouting.newUnassigned(new ShardId(new Index("target2", "9101112"), 1), true, LocalShardsRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); - assertEquals(1000L, DiskThresholdDecider.getExpectedShardSize(target2, allocation, 0)); + assertEquals(1000L, getExpectedShardSize(target2, 0L, allocation)); // check that the DiskThresholdDecider still works even if the source index has been deleted ClusterState clusterStateWithMissingSourceIndex = ClusterState.builder(clusterState) @@ -430,7 +435,13 @@ public void testSizeShrinkIndex() { allocationService.reroute(clusterState, "foo"); RoutingAllocation allocationWithMissingSourceIndex = new RoutingAllocation(null, clusterStateWithMissingSourceIndex.getRoutingNodes(), clusterStateWithMissingSourceIndex, info, 0); - assertEquals(42L, DiskThresholdDecider.getExpectedShardSize(target, allocationWithMissingSourceIndex, 42L)); - assertEquals(42L, DiskThresholdDecider.getExpectedShardSize(target2, allocationWithMissingSourceIndex, 42L)); + assertEquals(42L, getExpectedShardSize(target, 42L, allocationWithMissingSourceIndex)); + assertEquals(42L, getExpectedShardSize(target2, 42L, allocationWithMissingSourceIndex)); } + + private static long getExpectedShardSize(ShardRouting shardRouting, long defaultSize, RoutingAllocation allocation) { + return DiskThresholdDecider.getExpectedShardSize(shardRouting, defaultSize, + allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()); + } + }
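
Finally, a hypothetical caller (illustrative only, not part of the patch) showing what the reroute listener's new ClusterState argument enables: DiskThresholdMonitor above uses the post-reroute state to work out which shards are already relocating away from a node that breached the high watermark, and any other RerouteService consumer can inspect that state in the same way.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;

public class RerouteCallbackSketch {

    static void rerouteAndInspect(RerouteService rerouteService) {
        rerouteService.reroute("example reroute", Priority.HIGH, ActionListener.wrap(
            reroutedState -> {
                // the listener now receives the cluster state produced by the reroute, so the
                // relocations it triggered are visible immediately
                int relocating = reroutedState.getRoutingNodes().getRelocatingShardCount();
                System.out.println("shards relocating after reroute: " + relocating);
            },
            e -> System.out.println("reroute failed: " + e)));
    }
}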