Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand ClusterInfo to provide min / max disk usage for allocation decider #13163

Merged
merged 1 commit into from Aug 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 33 additions & 8 deletions core/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Expand Up @@ -32,28 +32,53 @@
*/
public class ClusterInfo {

private final Map<String, DiskUsage> usages;
private final Map<String, DiskUsage> leastAvailableSpaceUsage;
private final Map<String, DiskUsage> mostAvailabeSpaceUsage;
final Map<String, Long> shardSizes;
public static final ClusterInfo EMPTY = new ClusterInfo();

private ClusterInfo() {
this.usages = Collections.emptyMap();
this.shardSizes = Collections.emptyMap();
protected ClusterInfo() {
this(Collections.EMPTY_MAP, Collections.EMPTY_MAP, Collections.EMPTY_MAP);
}

public ClusterInfo(Map<String, DiskUsage> usages, Map<String, Long> shardSizes) {
this.usages = usages;
/**
* Creates a new ClusterInfo instance.
*
* @param leastAvailableSpaceUsage a node id to disk usage mapping for the path that has the least available space on the node.
* @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node.
* @param shardSizes a shardkey to size in bytes mapping per shard.
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(final Map<String, DiskUsage> leastAvailableSpaceUsage, final Map<String, DiskUsage> mostAvailableSpaceUsage, final Map<String, Long> shardSizes) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailabeSpaceUsage = mostAvailableSpaceUsage;
}

/**
* Returns a node id to disk usage mapping for the path that has the least available space on the node.
*/
public Map<String, DiskUsage> getNodeLeastAvailableDiskUsages() {
return this.leastAvailableSpaceUsage;
}

public Map<String, DiskUsage> getNodeDiskUsages() {
return this.usages;
/**
* Returns a node id to disk usage mapping for the path that has the most available space on the node.
*/
public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
return this.mostAvailabeSpaceUsage;
}

/**
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
*/
public Long getShardSize(ShardRouting shardRouting) {
return shardSizes.get(shardIdentifierFromRouting(shardRouting));
}

/**
* Returns the shard size for the given shard routing or <code>defaultValue</code> it that metric is not available.
*/
public long getShardSize(ShardRouting shardRouting, long defaultValue) {
Long shardSize = getShardSize(shardRouting);
return shardSize == null ? defaultValue : shardSize;
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/org/elasticsearch/cluster/DiskUsage.java
Expand Up @@ -28,18 +28,20 @@
public class DiskUsage {
final String nodeId;
final String nodeName;
final String path;
final long totalBytes;
final long freeBytes;

/**
* Create a new DiskUsage, if {@code totalBytes} is 0, {@get getFreeDiskAsPercentage}
* will always return 100.0% free
*/
public DiskUsage(String nodeId, String nodeName, long totalBytes, long freeBytes) {
public DiskUsage(String nodeId, String nodeName, String path, long totalBytes, long freeBytes) {
this.nodeId = nodeId;
this.nodeName = nodeName;
this.freeBytes = freeBytes;
this.totalBytes = totalBytes;
this.path = path;
}

public String getNodeId() {
Expand All @@ -50,6 +52,10 @@ public String getNodeName() {
return nodeName;
}

public String getPath() {
return path;
}

public double getFreeDiskAsPercentage() {
// We return 100.0% in order to fail "open", in that if we have invalid
// numbers for the total bytes, it's as if we don't know disk usage.
Expand Down Expand Up @@ -77,7 +83,7 @@ public long getUsedBytes() {

@Override
public String toString() {
return "[" + nodeId + "][" + nodeName + "] free: " + new ByteSizeValue(getFreeBytes()) +
return "[" + nodeId + "][" + nodeName + "][" + path + "] free: " + new ByteSizeValue(getFreeBytes()) +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice.

"[" + Strings.format1Decimals(getFreeDiskAsPercentage(), "%") + "]";
}
}
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.cluster;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
Expand All @@ -35,9 +34,9 @@
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.node.settings.NodeSettingsService;
Expand All @@ -47,7 +46,6 @@
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
Expand All @@ -67,7 +65,8 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu

private volatile TimeValue updateFrequency;

private volatile Map<String, DiskUsage> usages;
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, Long> shardSizes;
private volatile boolean isMaster = false;
private volatile boolean enabled;
Expand All @@ -84,7 +83,8 @@ public InternalClusterInfoService(Settings settings, NodeSettingsService nodeSet
TransportIndicesStatsAction transportIndicesStatsAction, ClusterService clusterService,
ThreadPool threadPool) {
super(settings);
this.usages = Collections.emptyMap();
this.leastAvailableSpaceUsages = Collections.emptyMap();
this.mostAvailableSpaceUsages = Collections.emptyMap();
this.shardSizes = Collections.emptyMap();
this.transportNodesStatsAction = transportNodesStatsAction;
this.transportIndicesStatsAction = transportIndicesStatsAction;
Expand Down Expand Up @@ -200,17 +200,24 @@ public void clusterChanged(ClusterChangedEvent event) {
if (logger.isTraceEnabled()) {
logger.trace("Removing node from cluster info: {}", removedNode.getId());
}
Map<String, DiskUsage> newUsages = new HashMap<>(usages);
newUsages.remove(removedNode.getId());
usages = Collections.unmodifiableMap(newUsages);
if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) {
Map<String, DiskUsage> newMaxUsages = new HashMap<>(leastAvailableSpaceUsages);
newMaxUsages.remove(removedNode.getId());
leastAvailableSpaceUsages = Collections.unmodifiableMap(newMaxUsages);
}
if (mostAvailableSpaceUsages.containsKey(removedNode.getId())) {
Map<String, DiskUsage> newMinUsages = new HashMap<>(mostAvailableSpaceUsages);
newMinUsages.remove(removedNode.getId());
mostAvailableSpaceUsages = Collections.unmodifiableMap(newMinUsages);
}
}
}
}
}

@Override
public ClusterInfo getClusterInfo() {
return new ClusterInfo(usages, shardSizes);
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes);
}

@Override
Expand Down Expand Up @@ -313,27 +320,11 @@ public void run() {
CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
@Override
public void onResponse(NodesStatsResponse nodeStatses) {
Map<String, DiskUsage> newUsages = new HashMap<>();
for (NodeStats nodeStats : nodeStatses.getNodes()) {
if (nodeStats.getFs() == null) {
logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().name());
} else {
long available = 0;
long total = 0;

for (FsInfo.Path info : nodeStats.getFs()) {
available += info.getAvailable().bytes();
total += info.getTotal().bytes();
}
String nodeId = nodeStats.getNode().id();
String nodeName = nodeStats.getNode().getName();
if (logger.isTraceEnabled()) {
logger.trace("node: [{}], total disk: {}, available disk: {}", nodeId, total, available);
}
newUsages.put(nodeId, new DiskUsage(nodeId, nodeName, total, available));
}
}
usages = Collections.unmodifiableMap(newUsages);
Map<String, DiskUsage> newLeastAvaiableUsages = new HashMap<>();
Map<String, DiskUsage> newMostAvaiableUsages = new HashMap<>();
fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages);
leastAvailableSpaceUsages = Collections.unmodifiableMap(newLeastAvaiableUsages);
mostAvailableSpaceUsages = Collections.unmodifiableMap(newMostAvaiableUsages);
}

@Override
Expand All @@ -349,7 +340,8 @@ public void onFailure(Throwable e) {
logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
}
// we empty the usages list, to be safe - we don't know what's going on.
usages = Collections.emptyMap();
leastAvailableSpaceUsages = Collections.emptyMap();
mostAvailableSpaceUsages = Collections.emptyMap();
}
}
});
Expand Down Expand Up @@ -412,5 +404,34 @@ public void onFailure(Throwable e) {
}
}

static void fillDiskUsagePerNode(ESLogger logger, NodeStats[] nodeStatsArray, Map<String, DiskUsage> newLeastAvaiableUsages, Map<String, DiskUsage> newMostAvaiableUsages) {
for (NodeStats nodeStats : nodeStatsArray) {
if (nodeStats.getFs() == null) {
logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().name());
} else {
FsInfo.Path leastAvailablePath = null;
FsInfo.Path mostAvailablePath = null;
for (FsInfo.Path info : nodeStats.getFs()) {
if (leastAvailablePath == null) {
assert mostAvailablePath == null;
mostAvailablePath = leastAvailablePath = info;
} else if (leastAvailablePath.getAvailable().bytes() > info.getAvailable().bytes()){
leastAvailablePath = info;
} else if (mostAvailablePath.getAvailable().bytes() < info.getAvailable().bytes()) {
mostAvailablePath = info;
}
}
String nodeId = nodeStats.getNode().id();
String nodeName = nodeStats.getNode().getName();
if (logger.isTraceEnabled()) {
logger.trace("node: [{}], most available: total disk: {}, available disk: {} / least available: total disk: {}, available disk: {}", nodeId, mostAvailablePath.getTotal(), leastAvailablePath.getAvailable(), leastAvailablePath.getTotal(), leastAvailablePath.getAvailable());
}
newLeastAvaiableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, leastAvailablePath.getPath(), leastAvailablePath.getTotal().bytes(), leastAvailablePath.getAvailable().bytes()));
newMostAvaiableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, mostAvailablePath.getPath(), mostAvailablePath.getTotal().bytes(), mostAvailablePath.getAvailable().bytes()));

}
}
}


}
Expand Up @@ -164,7 +164,7 @@ private void warnAboutDiskIfNeeded(DiskUsage usage) {

@Override
public void onNewInfo(ClusterInfo info) {
Map<String, DiskUsage> usages = info.getNodeDiskUsages();
Map<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
if (usages != null) {
boolean reroute = false;
String explanation = "";
Expand Down Expand Up @@ -339,7 +339,9 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

final double usedDiskThresholdLow = 100.0 - DiskThresholdDecider.this.freeDiskThresholdLow;
final double usedDiskThresholdHigh = 100.0 - DiskThresholdDecider.this.freeDiskThresholdHigh;
DiskUsage usage = getDiskUsage(node, allocation);
ClusterInfo clusterInfo = allocation.clusterInfo();
Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
DiskUsage usage = getDiskUsage(node, allocation, usages);
// First, check that the node currently over the low watermark
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
// Cache the used disk percentage for displaying disk percentages consistent with documentation
Expand Down Expand Up @@ -441,11 +443,16 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.currentNodeId().equals(node.nodeId()) == false) {
throw new IllegalArgumentException("Shard [" + shardRouting + "] is not allocated on node: [" + node.nodeId() + "]");
}
final Decision decision = earlyTerminate(allocation);
if (decision != null) {
return decision;
}
DiskUsage usage = getDiskUsage(node, allocation);
ClusterInfo clusterInfo = allocation.clusterInfo();
Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
DiskUsage usage = getDiskUsage(node, allocation, usages);
// If this node is already above the high threshold, the shard cannot remain (get it off!)
double freeDiskPercentage = usage.getFreeDiskAsPercentage();
long freeBytes = usage.getFreeBytes();
Expand All @@ -472,9 +479,8 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
return allocation.decision(Decision.YES, NAME, "enough disk for shard to remain on node, free: [%s]", new ByteSizeValue(freeBytes));
}

private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation) {
private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation, Map<String, DiskUsage> usages) {
ClusterInfo clusterInfo = allocation.clusterInfo();
Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
DiskUsage usage = usages.get(node.nodeId());
if (usage == null) {
// If there is no usage, and we have other nodes in the cluster,
Expand All @@ -488,7 +494,7 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation) {

if (includeRelocations) {
long relocatingShardsSize = sizeOfRelocatingShards(node, clusterInfo, true);
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(),
DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().name(), "_na_",
usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
if (logger.isTraceEnabled()) {
logger.trace("usage without relocations: {}", usage);
Expand All @@ -508,15 +514,15 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation) {
*/
public DiskUsage averageUsage(RoutingNode node, Map<String, DiskUsage> usages) {
if (usages.size() == 0) {
return new DiskUsage(node.nodeId(), node.node().name(), 0, 0);
return new DiskUsage(node.nodeId(), node.node().name(), "_na_", 0, 0);
}
long totalBytes = 0;
long freeBytes = 0;
for (DiskUsage du : usages.values()) {
totalBytes += du.getTotalBytes();
freeBytes += du.getFreeBytes();
}
return new DiskUsage(node.nodeId(), node.node().name(), totalBytes / usages.size(), freeBytes / usages.size());
return new DiskUsage(node.nodeId(), node.node().name(), "_na_", totalBytes / usages.size(), freeBytes / usages.size());
}

/**
Expand All @@ -528,8 +534,8 @@ public DiskUsage averageUsage(RoutingNode node, Map<String, DiskUsage> usages) {
*/
public double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) {
shardSize = (shardSize == null) ? 0 : shardSize;
DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(),
usage.getTotalBytes(), usage.getFreeBytes() - shardSize);
DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(),
usage.getTotalBytes(), usage.getFreeBytes() - shardSize);
return newUsage.getFreeDiskAsPercentage();
}

Expand Down Expand Up @@ -600,7 +606,7 @@ private Decision earlyTerminate(RoutingAllocation allocation) {
return allocation.decision(Decision.YES, NAME, "cluster info unavailable");
}

final Map<String, DiskUsage> usages = clusterInfo.getNodeDiskUsages();
final Map<String, DiskUsage> usages = clusterInfo.getNodeLeastAvailableDiskUsages();
// Fail open if there are no disk usages available
if (usages.isEmpty()) {
if (logger.isTraceEnabled()) {
Expand Down