Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quieter logging from the DiskThresholdMonitor #48115

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class BatchedRerouteService implements RerouteService {

private final Object mutex = new Object();
@Nullable // null if no reroute is currently pending
private List<ActionListener<Void>> pendingRerouteListeners;
private List<ActionListener<ClusterState>> pendingRerouteListeners;
private Priority pendingTaskPriority = Priority.LANGUID;

/**
Expand All @@ -65,8 +65,8 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterSt
* Initiates a reroute.
*/
@Override
public final void reroute(String reason, Priority priority, ActionListener<Void> listener) {
final List<ActionListener<Void>> currentListeners;
public final void reroute(String reason, Priority priority, ActionListener<ClusterState> listener) {
final List<ActionListener<ClusterState>> currentListeners;
synchronized (mutex) {
if (pendingRerouteListeners != null) {
if (priority.sameOrAfter(pendingTaskPriority)) {
Expand Down Expand Up @@ -148,7 +148,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
ActionListener.onResponse(currentListeners, null);
ActionListener.onResponse(currentListeners, newState);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.Priority;

/**
Expand All @@ -33,5 +34,5 @@ public interface RerouteService {
* this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then
* the priority of the pending batch is raised to the given priority.
*/
void reroute(String reason, Priority priority, ActionListener<Void> listener);
void reroute(String reason, Priority priority, ActionListener<ClusterState> listener);
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -298,6 +299,24 @@ public TimeValue getRerouteInterval() {
return rerouteInterval;
}

String describeLowThreshold() {
return freeBytesThresholdLow.equals(ByteSizeValue.ZERO)
? Strings.format1Decimals(100.0 - freeDiskThresholdLow, "%")
: freeBytesThresholdLow.toString();
}

String describeHighThreshold() {
return freeBytesThresholdHigh.equals(ByteSizeValue.ZERO)
? Strings.format1Decimals(100.0 - freeDiskThresholdHigh, "%")
: freeBytesThresholdHigh.toString();
}

String describeFloodStageThreshold() {
return freeBytesThresholdFloodStage.equals(ByteSizeValue.ZERO)
? Strings.format1Decimals(100.0 - freeDiskThresholdFloodStage, "%")
: freeBytesThresholdFloodStage.toString();
}

/**
* Attempts to parse the watermark into a percentage, returning 100.0% if
* it cannot be parsed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,8 +817,9 @@ private void allocateUnassigned() {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}

final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
minNode.addShard(shard);
if (!shard.primary()) {
Expand All @@ -838,8 +839,9 @@ private void allocateUnassigned() {
if (minNode != null) {
// throttle decision scenario
assert allocationDecision.getAllocationStatus() == AllocationStatus.DECIDERS_THROTTLED;
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
final RoutingNode node = minNode.getRoutingNode();
final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
Expand Down Expand Up @@ -86,10 +88,9 @@ public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings)
*
* If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards
*/
static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocation,
boolean subtractShardsMovingAway, String dataPath) {
ClusterInfo clusterInfo = allocation.clusterInfo();
long totalSize = 0;
public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShardsMovingAway, String dataPath, ClusterInfo clusterInfo,
MetaData metaData, RoutingTable routingTable) {
long totalSize = 0L;

for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
if (routing.relocatingNodeId() == null) {
Expand All @@ -103,7 +104,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
// free space
if (actualPath == null || actualPath.equals(dataPath)) {
totalSize += getExpectedShardSize(routing, allocation, 0);
totalSize += getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable);
}
}

Expand All @@ -115,7 +116,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
}
if (dataPath.equals(actualPath)) {
totalSize -= getExpectedShardSize(routing, allocation, 0);
totalSize -= getExpectedShardSize(routing, 0L, clusterInfo, metaData, routingTable);
}
}
}
Expand Down Expand Up @@ -239,7 +240,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
final long shardSize = getExpectedShardSize(shardRouting, allocation, 0);
final long shardSize = getExpectedShardSize(shardRouting, 0L,
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
long freeBytesAfterShard = freeBytes - shardSize;
if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
Expand Down Expand Up @@ -339,7 +341,8 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeDiskAsPercentage());
}

final long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, subtractLeavingShards, usage.getPath());
final long relocatingShardsSize = sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(),
allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
final DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
logger.trace("getDiskUsage: usage [{}] with [{}] bytes relocating yields [{}]",
Expand Down Expand Up @@ -418,29 +421,28 @@ private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<S
* Returns the expected shard size for the given shard or the default value provided if not enough information are available
* to estimate the shards size.
*/
public static long getExpectedShardSize(ShardRouting shard, RoutingAllocation allocation, long defaultValue) {
final IndexMetaData metaData = allocation.metaData().getIndexSafe(shard.index());
final ClusterInfo info = allocation.clusterInfo();
if (metaData.getResizeSourceIndex() != null && shard.active() == false &&
public static long getExpectedShardSize(ShardRouting shard, long defaultValue, ClusterInfo clusterInfo, MetaData metaData,
RoutingTable routingTable) {
final IndexMetaData indexMetaData = metaData.getIndexSafe(shard.index());
if (indexMetaData.getResizeSourceIndex() != null && shard.active() == false &&
shard.recoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
// in the shrink index case we sum up the source index shards since we basically make a copy of the shard in
// the worst case
long targetShardSize = 0;
final Index mergeSourceIndex = metaData.getResizeSourceIndex();
final IndexMetaData sourceIndexMeta = allocation.metaData().index(mergeSourceIndex);
final Index mergeSourceIndex = indexMetaData.getResizeSourceIndex();
final IndexMetaData sourceIndexMeta = metaData.index(mergeSourceIndex);
if (sourceIndexMeta != null) {
final Set<ShardId> shardIds = IndexMetaData.selectRecoverFromShards(shard.id(),
sourceIndexMeta, metaData.getNumberOfShards());
for (IndexShardRoutingTable shardRoutingTable : allocation.routingTable().index(mergeSourceIndex.getName())) {
sourceIndexMeta, indexMetaData.getNumberOfShards());
for (IndexShardRoutingTable shardRoutingTable : routingTable.index(mergeSourceIndex.getName())) {
if (shardIds.contains(shardRoutingTable.shardId())) {
targetShardSize += info.getShardSize(shardRoutingTable.primaryShard(), 0);
targetShardSize += clusterInfo.getShardSize(shardRoutingTable.primaryShard(), 0);
}
}
}
return targetShardSize == 0 ? defaultValue : targetShardSize;
} else {
return info.getShardSize(shard, defaultValue);
return clusterInfo.getShardSize(shard, defaultValue);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public class ShardStartedClusterStateTaskExecutorTests extends ESAllocationTestC

private ShardStateAction.ShardStartedClusterStateTaskExecutor executor;

private static void neverReroutes(String reason, Priority priority, ActionListener<Void> listener) {
    // RerouteService stub (matches RerouteService#reroute's signature) for test scenarios in
    // which no reroute must ever be scheduled: it fails the test immediately if invoked.
    // The parameters exist only to satisfy the functional-interface shape and are never read,
    // hence @SuppressWarnings("unused").
    @SuppressWarnings("unused")
    private static void neverReroutes(String reason, Priority priority, ActionListener<ClusterState> listener) {
        fail("unexpectedly ran a deferred reroute");
    }

Expand Down
Loading