Take percentage watermarks into account for reroute listener
Fixes an issue where only absolute byte values were taken into account when
kicking off an automatic reroute due to disk usage. Also randomizes the tests
to use either an absolute value or a percentage so that both cases are covered.

Also adds logging for each node that is over the high or low watermark every
time new cluster info is gathered (defaults to every 30 seconds).

Related to #8368
Fixes #8367
dakrone committed Nov 13, 2014
1 parent 4e5264c commit 94df281
Showing 6 changed files with 102 additions and 63 deletions.
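
The core of the change is easiest to see in isolation: the reroute trigger in the DiskListener now compares free disk against both the absolute-bytes watermark and the percentage watermark, because a node can sit well above the byte threshold while still being critically full in percentage terms. Below is a minimal, self-contained sketch of that combined check; class, constant, and method names are illustrative, not from the Elasticsearch codebase, and the committed change is the DiskListener diff further down.

// Sketch only: free disk must be checked against BOTH an absolute-bytes
// watermark and a free-percentage watermark before deciding to reroute.
public class WatermarkCheckSketch {

    // Example thresholds: reroute when less than 10gb OR less than 10% free.
    static final long HIGH_WATERMARK_FREE_BYTES = 10L * 1024 * 1024 * 1024;
    static final double HIGH_WATERMARK_FREE_PERCENT = 10.0;

    static double freePercent(long totalBytes, long freeBytes) {
        return 100.0 * freeBytes / totalBytes;
    }

    /** True if either the absolute or the percentage high watermark is breached. */
    static boolean overHighWatermark(long totalBytes, long freeBytes) {
        return freeBytes < HIGH_WATERMARK_FREE_BYTES
                || freePercent(totalBytes, freeBytes) < HIGH_WATERMARK_FREE_PERCENT;
    }

    public static void main(String[] args) {
        // A 1tb disk with 50gb free is above the 10gb absolute threshold, but at
        // only 5% free it is below the 10% threshold. Before this fix the
        // percentage check was skipped, so no automatic reroute was triggered.
        long total = 1000L * 1024 * 1024 * 1024;
        long free = 50L * 1024 * 1024 * 1024;
        System.out.println("over high watermark: " + overHighWatermark(total, free));
    }
}
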
10 changes: 8 additions & 2 deletions src/main/java/org/elasticsearch/cluster/DiskUsage.java
@@ -19,20 +19,25 @@

package org.elasticsearch.cluster;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.ByteSizeValue;

/**
* Encapsulation class used to represent the amount of disk used on a node.
*/
public class DiskUsage {
final String nodeId;
final String nodeName;
final long totalBytes;
final long freeBytes;

public DiskUsage(String nodeId, long totalBytes, long freeBytes) {
public DiskUsage(String nodeId, String nodeName, long totalBytes, long freeBytes) {
if ((totalBytes < freeBytes) || (totalBytes < 0)) {
throw new IllegalStateException("Free bytes [" + freeBytes +
"] cannot be less than 0 or greater than total bytes [" + totalBytes + "]");
}
this.nodeId = nodeId;
this.nodeName = nodeName;
this.totalBytes = totalBytes;
this.freeBytes = freeBytes;
}
@@ -55,6 +60,7 @@ public long getUsedBytes() {
}

public String toString() {
return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "%]";
return "[" + nodeId + "][" + nodeName + "] free: " + new ByteSizeValue(getFreeBytes()) +
"[" + Strings.format1Decimals(getFreeDiskAsPercentage(), "%") + "]";
}
}
@@ -304,10 +304,11 @@ public void onResponse(NodesStatsResponse nodeStatses) {
total += info.getTotal().bytes();
}
String nodeId = nodeStats.getNode().id();
String nodeName = nodeStats.getNode().getName();
if (logger.isTraceEnabled()) {
logger.trace("node: [{}], total disk: {}, available disk: {}", nodeId, total, available);
}
newUsages.put(nodeId, new DiskUsage(nodeId, total, available));
newUsages.put(nodeId, new DiskUsage(nodeId, nodeName, total, available));
}
}
usages = ImmutableMap.copyOf(newUsages);
@@ -28,6 +28,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -119,7 +120,8 @@ public void onRefreshSettings(Settings settings) {

/**
* Listens for a node to go over the high watermark and kicks off an empty
* reroute if it does
* reroute if it does. Also responsible for logging about nodes that have
* passed the disk watermarks
*/
class DiskListener implements ClusterInfoService.Listener {
private final Client client;
@@ -129,27 +131,52 @@ class DiskListener implements ClusterInfoService.Listener {
this.client = client;
}

/**
* Warn about the given disk usage if the low or high watermark has been passed
*/
private void warnAboutDiskIfNeeded(DiskUsage usage) {
// Check absolute disk values
if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes()) {
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
DiskThresholdDecider.this.freeBytesThresholdHigh, usage);
} else if (usage.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdLow.bytes()) {
logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
DiskThresholdDecider.this.freeBytesThresholdLow, usage);
}

// Check percentage disk values
if (usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) {
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
Strings.format1Decimals(DiskThresholdDecider.this.freeDiskThresholdHigh, "%"), usage);
} else if (usage.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdLow) {
logger.info("low disk watermark [{}] exceeded on {}, replicas will not be assigned to this node",
Strings.format1Decimals(DiskThresholdDecider.this.freeDiskThresholdLow, "%"), usage);
}
}

@Override
public void onNewInfo(ClusterInfo info) {
Map<String, DiskUsage> usages = info.getNodeDiskUsages();
if (usages != null) {
boolean reroute = false;
for (DiskUsage entry : usages.values()) {
if (entry.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes()) {
warnAboutDiskIfNeeded(entry);
if (entry.getFreeBytes() < DiskThresholdDecider.this.freeBytesThresholdHigh.bytes() ||
entry.getFreeDiskAsPercentage() < DiskThresholdDecider.this.freeDiskThresholdHigh) {
if ((System.currentTimeMillis() - lastRun) > DiskThresholdDecider.this.rerouteInterval.millis()) {
lastRun = System.currentTimeMillis();
logger.info("high watermark [{}/{}%] exceeded on {}, rerouting shards",
DiskThresholdDecider.this.freeBytesThresholdHigh, DiskThresholdDecider.this.freeDiskThresholdHigh, entry);
// Execute an empty reroute, but don't block on the response
client.admin().cluster().prepareReroute().execute();
// Only one reroute is required, short circuit
return;
reroute = true;
} else {
logger.debug("high watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute",
logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred in the last [{}], skipping reroute",
entry, DiskThresholdDecider.this.rerouteInterval);
return;
}
}
}
if (reroute) {
logger.info("high disk watermark exceeded on one or more nodes, rerouting shards");
// Execute an empty reroute, but don't block on the response
client.admin().cluster().prepareReroute().execute();
}
}
}
}
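
The onNewInfo logic above also throttles how often an automatic reroute may fire: no matter how many nodes are over a watermark in a single ClusterInfo update, at most one empty reroute is issued, and none within the configured reroute interval of the previous one. A distilled, hedged sketch of that pattern follows; names such as RerouteThrottle and tryAcquire are illustrative, not the committed code.

import java.util.concurrent.TimeUnit;

// Sketch of the interval-based throttle used by the listener: allow at most
// one reroute per interval, regardless of how many nodes trip the watermark.
public class RerouteThrottle {
    private final long intervalMillis;
    private long lastRunMillis = 0;

    public RerouteThrottle(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    /** Returns true if a reroute may run now, recording the run time if so. */
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        if (now - lastRunMillis > intervalMillis) {
            lastRunMillis = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RerouteThrottle throttle = new RerouteThrottle(TimeUnit.SECONDS.toMillis(60));
        System.out.println(throttle.tryAcquire()); // true  -> issue one reroute
        System.out.println(throttle.tryAcquire()); // false -> within interval, skip
    }
}
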
@@ -251,7 +278,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

if (includeRelocations) {
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
logger.trace("usage without relocations: {}", usage);
logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
@@ -262,8 +290,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
// First, check that the node is currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
long freeBytes = usage.getFreeBytes();
if (logger.isDebugEnabled()) {
logger.debug("Node [{}] has {}% free disk", node.nodeId(), freeDiskPercentage);
if (logger.isTraceEnabled()) {
logger.trace("Node [{}] has {}% free disk", node.nodeId(), freeDiskPercentage);
}

// a flag for whether the primary shard has been previously allocated
@@ -306,18 +334,20 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
// If the shard is a replica or has a primary that has already been allocated before, check the low threshold
if (!shardRouting.primary() || (shardRouting.primary() && primaryHasBeenAllocated)) {
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node [{}], preventing allocation",
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
logger.debug("Less than the required {} free disk threshold ({} free) on node [{}], preventing allocation",
Strings.format1Decimals(freeDiskThresholdLow, "%"),
Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdLow, freeDiskThresholdLow);
freeDiskThresholdLow, freeDiskPercentage);
} else if (freeDiskPercentage > freeDiskThresholdHigh) {
// Allow the shard to be allocated because it is primary that
// has never been allocated if it's under the high watermark
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {}% free disk threshold ({}% free) on node [{}], " +
logger.debug("Less than the required {} free disk threshold ({} free) on node [{}], " +
"but allowing allocation because primary has never been allocated",
freeDiskThresholdLow, freeDiskPercentage, node.nodeId());
Strings.format1Decimals(freeDiskThresholdLow, "%"),
Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
}
return allocation.decision(Decision.YES, NAME, "primary has never been allocated before");
} else {
@@ -326,10 +356,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
if (logger.isDebugEnabled()) {
logger.debug("Less than the required {} free bytes threshold ({} bytes free) on node {}, " +
"preventing allocation even though primary has never been allocated",
freeDiskThresholdHigh, freeDiskPercentage, node.nodeId());
Strings.format1Decimals(freeDiskThresholdHigh, "%"),
Strings.format1Decimals(freeDiskPercentage, "%"), node.nodeId());
}
return allocation.decision(Decision.NO, NAME, "less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdLow, freeDiskThresholdLow);
freeDiskThresholdLow, freeDiskPercentage);
}
}

@@ -345,8 +376,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
freeBytesThresholdLow, new ByteSizeValue(freeBytesAfterShard));
}
if (freeSpaceAfterShard < freeDiskThresholdHigh) {
logger.warn("After allocating, node [{}] would have less than the required {}% free disk threshold ({}% free), preventing allocation",
node.nodeId(), freeDiskThresholdHigh, freeSpaceAfterShard);
logger.warn("After allocating, node [{}] would have less than the required {} free disk threshold ({} free), preventing allocation",
node.nodeId(), Strings.format1Decimals(freeDiskThresholdHigh, "%"), Strings.format1Decimals(freeSpaceAfterShard, "%"));
return allocation.decision(Decision.NO, NAME, "after allocation less than required [%s%%] free disk on node, free: [%s%%]",
freeDiskThresholdLow, freeSpaceAfterShard);
}
@@ -393,7 +424,8 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
if (includeRelocations) {
Map<String, Long> shardSizes = clusterInfo.getShardSizes();
long relocatingShardsSize = sizeOfRelocatingShards(node, allocation, shardSizes);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
logger.trace("usage without relocations: {}", usage);
logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
@@ -441,7 +473,7 @@ public DiskUsage averageUsage(RoutingNode node, Map<String, DiskUsage> usages) {
totalBytes += du.getTotalBytes();
freeBytes += du.getFreeBytes();
}
return new DiskUsage(node.nodeId(), totalBytes / usages.size(), freeBytes / usages.size());
return new DiskUsage(node.nodeId(), node.node().name(), totalBytes / usages.size(), freeBytes / usages.size());
}

/**
6 changes: 3 additions & 3 deletions src/test/java/org/elasticsearch/cluster/DiskUsageTests.java
@@ -28,7 +28,7 @@ public class DiskUsageTests extends ElasticsearchTestCase {

@Test
public void diskUsageCalcTest() {
DiskUsage du = new DiskUsage("node1", 100, 40);
DiskUsage du = new DiskUsage("node1", "n1", 100, 40);
assertThat(du.getFreeDiskAsPercentage(), equalTo(40.0));
assertThat(du.getFreeBytes(), equalTo(40L));
assertThat(du.getUsedBytes(), equalTo(60L));
@@ -44,12 +44,12 @@ public void randomDiskUsageTest() {
long free = between(Integer.MIN_VALUE, Integer.MAX_VALUE);
if (free > total || total <= 0) {
try {
new DiskUsage("random", total, free);
new DiskUsage("random", "random", total, free);
fail("should never reach this");
} catch (IllegalStateException e) {
}
} else {
DiskUsage du = new DiskUsage("random", total, free);
DiskUsage du = new DiskUsage("random", "random", total, free);
assertThat(du.getFreeBytes(), equalTo(free));
assertThat(du.getTotalBytes(), equalTo(total));
assertThat(du.getUsedBytes(), equalTo(total - free));
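
The commit message also notes that the tests were randomized to use either an absolute value or a percentage for the watermark settings; that test change is not part of the hunks shown above. Purely as a hedged illustration of the idea (not the committed test code), such randomization can look like the sketch below, which uses plain java.util.Random rather than the Elasticsearch test infrastructure; the setting key is the real cluster.routing.allocation.disk.watermark.low setting, while the helper itself is hypothetical.

import java.util.Random;

// Hedged sketch only: pick either a percentage or an absolute byte value for
// a watermark setting so both code paths are exercised by the same test.
public class WatermarkRandomizer {

    static String randomWatermark(Random random) {
        return random.nextBoolean()
                ? (60 + random.nextInt(30)) + "%"     // e.g. "75%"
                : (100 + random.nextInt(900)) + "mb"; // e.g. "512mb"
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println("cluster.routing.allocation.disk.watermark.low: "
                + randomWatermark(random));
    }
}
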
