Skip to content

Commit

Permalink
Add explanations for all AllocationDeciders
Browse files Browse the repository at this point in the history
Relates to #4380
Relates to #2483
  • Loading branch information
dakrone committed Jan 31, 2014
1 parent 68b4db9 commit 767c2e9
Show file tree
Hide file tree
Showing 19 changed files with 213 additions and 96 deletions.
Expand Up @@ -109,7 +109,7 @@ public void onFailure(String source, Throwable t) {

@Override
public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands);
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands, true);
ClusterState newState = ClusterState.builder(currentState).routingResult(routingResult).build();
clusterStateToSend = newState;
if (request.dryRun) {
Expand Down
Expand Up @@ -153,4 +153,34 @@ public boolean match(DiscoveryNode node) {
return true;
}
}

/**
* Generates a human-readable string for the DiscoverNodeFilters.
* Example: {@code _id:"id1 OR blah",name:"blah OR name2"}
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
int entryCount = filters.size();
for (Map.Entry<String, String[]> entry : filters.entrySet()) {
String attr = entry.getKey();
String[] values = entry.getValue();
sb.append(attr);
sb.append(":\"");
int valueCount = values.length;
for (String value : values) {
sb.append(value);
if (valueCount > 1) {
sb.append(" " + opType.toString() + " ");
}
valueCount--;
}
sb.append("\"");
if (entryCount > 1) {
sb.append(",");
}
entryCount--;
}
return sb.toString();
}
}
Expand Up @@ -114,12 +114,17 @@ public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, Lis
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}

public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) throws ElasticsearchException {
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) {
return reroute(clusterState, commands, false);
}

public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean debug) throws ElasticsearchException {
RoutingNodes routingNodes = clusterState.routingNodes();
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
// we ignore disable allocation, because commands are explicit
allocation.ignoreDisable(true);
commands.execute(allocation);
Expand All @@ -137,10 +142,20 @@ public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCom
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result reroute(ClusterState clusterState) {
return reroute(clusterState, false);
}

/**
* Reroutes the routing table based on the live nodes.
* <p/>
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result reroute(ClusterState clusterState, boolean debug) {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
Expand All @@ -153,10 +168,20 @@ public RoutingAllocation.Result reroute(ClusterState clusterState) {
* them.
*/
public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState) {
return rerouteWithNoReassign(clusterState, false);
}

/**
* Only handles reroute but *without* any reassignment of unassigned shards or rebalancing. Does
* make sure to handle removed nodes, but only moved the shards to UNASSIGNED, does not reassign
* them.
*/
public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState, boolean debug) {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(allocation);
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.index.shard.ShardId;

import java.util.HashMap;
Expand Down Expand Up @@ -99,6 +100,8 @@ public AllocationExplanation explanation() {

private boolean ignoreDisable = false;

private boolean debugDecision = false;

/**
* Creates a new {@link RoutingAllocation}
*
Expand Down Expand Up @@ -173,6 +176,14 @@ public boolean ignoreDisable() {
return this.ignoreDisable;
}

public void debugDecision(boolean debug) {
this.debugDecision = debug;
}

public boolean debugDecision() {
return this.debugDecision;
}

public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<ShardId, String>();
Expand All @@ -183,4 +194,16 @@ public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
return ignoredShardToNodes != null && nodeId.equals(ignoredShardToNodes.get(shardId));
}

/**
* Create a routing decision, including the reason if the debug flag is
* turned on
*/
public Decision decision(Decision decision, String reason, Object... params) {
if (debugDecision()) {
return Decision.single(decision.type(), reason, params);
} else {
return decision;
}
}
}
Expand Up @@ -52,7 +52,11 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
Decision decision = allocationDecider.canRebalance(shardRouting, allocation);
// short track if a NO is returned.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand All @@ -73,7 +77,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
if (logger.isTraceEnabled()) {
logger.trace("Can not allocate [{}] on node [{}] due to [{}]", shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName());
}
return decision;
// short circuit only if debugging is not enabled
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
// the assumption is that a decider that returns the static instance Decision#ALWAYS
// does not really implements canAllocate
Expand All @@ -99,7 +108,11 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
if (logger.isTraceEnabled()) {
logger.trace("Shard [{}] can not remain on node [{}] due to [{}]", shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName());
}
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand All @@ -113,7 +126,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
Decision decision = allocationDecider.canAllocate(shardRouting, allocation);
// short track if a NO is returned.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand All @@ -127,7 +144,11 @@ public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
Decision decision = allocationDecider.canAllocate(node, allocation);
// short track if a NO is returned.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand Down
Expand Up @@ -158,25 +158,25 @@ public String[] awarenessAttributes() {

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true) ? Decision.YES : Decision.NO;
return underCapacity(shardRouting, node, allocation, true);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, false) ? Decision.YES : Decision.NO;
return underCapacity(shardRouting, node, allocation, false);
}

private boolean underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
return true;
return allocation.decision(Decision.YES, "no allocation awareness enabled");
}

IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.index());
int shardCount = indexMetaData.numberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (!node.node().attributes().containsKey(awarenessAttribute)) {
return false;
return allocation.decision(Decision.NO, "node does not contain awareness attribute: [%s]", awarenessAttribute);
}

// build attr_value -> nodes map
Expand Down Expand Up @@ -234,14 +234,14 @@ private boolean underCapacity(ShardRouting shardRouting, RoutingNode node, Routi
int currentNodeCount = shardPerAttribute.get(node.node().attributes().get(awarenessAttribute));
// if we are above with leftover, then we know we are not good, even with mod
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
return false;
return allocation.decision(Decision.NO, "too many shards on nodes for attribute: [%s]", awarenessAttribute);
}
// all is well, we are below or same as average
if (currentNodeCount <= requiredCountPerAttribute) {
continue;
}
}

return true;
return allocation.decision(Decision.YES, "node meets awareness requirements");
}
}
Expand Up @@ -19,15 +19,11 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;

import java.util.List;
import java.util.Locale;

/**
Expand Down Expand Up @@ -91,27 +87,27 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has unassigned primary shards");
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has inactive primary shards");
}

return Decision.YES;
return allocation.decision(Decision.YES, "all primary shards are active");
}
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if ( allocation.routingNodes().hasUnassignedShards() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has unassigned shards");
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has inactive shards");
}
}
// type == Type.ALWAYS
return Decision.YES;
return allocation.decision(Decision.YES, "all shards are active");
}
}
Expand Up @@ -65,11 +65,12 @@ public ConcurrentRebalanceAllocationDecider(Settings settings, NodeSettingsServi
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (clusterConcurrentRebalance == -1) {
return Decision.YES;
return allocation.decision(Decision.YES, "all concurrent rebalances are allowed");
}
if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) {
return Decision.NO;
return allocation.decision(Decision.NO, "too man concurrent rebalances [%d], limit: [%d]",
allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance);
}
return Decision.YES;
return allocation.decision(Decision.YES, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance);
}
}
Expand Up @@ -104,20 +104,28 @@ public DisableAllocationDecider(Settings settings, NodeSettingsService nodeSetti
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return Decision.YES;
return allocation.decision(Decision.YES, "allocation disabling is ignored");
}
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
// if its primary, and it hasn't been allocated post API (meaning its a "fresh newly created shard"), only disable allocation
// on a special disable allocation flag
return indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation) ? Decision.NO : Decision.YES;
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation)) {
return allocation.decision(Decision.NO, "new primary allocation is disabled");
} else {
return allocation.decision(Decision.YES, "new primary allocation is enabled");
}
}
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION, disableAllocation)) {
return Decision.NO;
return allocation.decision(Decision.NO, "all allocation is disabled");
}
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION, disableReplicaAllocation)) {
return shardRouting.primary() ? Decision.YES : Decision.NO;
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, "primary allocation is enabled");
} else {
return allocation.decision(Decision.NO, "replica allocation is disabled");
}
}
return Decision.YES;
return allocation.decision(Decision.YES, "all allocation is enabled");
}
}

0 comments on commit 767c2e9

Please sign in to comment.