Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explanations for all AllocationDeciders #4934

Merged
merged 1 commit into from Jan 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -109,7 +109,7 @@ public void onFailure(String source, Throwable t) {

@Override
public ClusterState execute(ClusterState currentState) {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands);
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands, true);
ClusterState newState = ClusterState.builder(currentState).routingResult(routingResult).build();
clusterStateToSend = newState;
if (request.dryRun) {
Expand Down
Expand Up @@ -153,4 +153,34 @@ public boolean match(DiscoveryNode node) {
return true;
}
}

/**
 * Renders this set of node filters as a compact, human-readable string.
 * <p>
 * Every filter entry is written as {@code attribute:"v1 OP v2 ..."}, where
 * {@code OP} is the string form of {@code opType}; entries are separated by
 * a single comma with no surrounding whitespace.
 * Example: {@code _id:"id1 OR blah",name:"blah OR name2"}
 *
 * @return the textual form of all filters, or an empty string when there are none
 */
@Override
public String toString() {
    StringBuilder out = new StringBuilder();
    boolean firstEntry = true;
    for (Map.Entry<String, String[]> filter : filters.entrySet()) {
        // separator goes in front of every entry except the first one
        if (!firstEntry) {
            out.append(",");
        }
        firstEntry = false;

        out.append(filter.getKey()).append(":\"");
        String[] values = filter.getValue();
        for (int i = 0; i < values.length; i++) {
            // the operator is only placed between two neighbouring values
            if (i > 0) {
                out.append(" ").append(opType.toString()).append(" ");
            }
            out.append(values[i]);
        }
        out.append("\"");
    }
    return out.toString();
}
}
Expand Up @@ -114,12 +114,17 @@ public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, Lis
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}

public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) throws ElasticsearchException {
/**
 * Applies the given allocation commands to the cluster state with decision
 * debugging switched off. Delegates to
 * {@code reroute(clusterState, commands, false)}.
 */
public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands) {
    final boolean debug = false;
    return reroute(clusterState, commands, debug);
}

public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCommands commands, boolean debug) throws ElasticsearchException {
RoutingNodes routingNodes = clusterState.routingNodes();
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
// we ignore disable allocation, because commands are explicit
allocation.ignoreDisable(true);
commands.execute(allocation);
Expand All @@ -137,10 +142,20 @@ public RoutingAllocation.Result reroute(ClusterState clusterState, AllocationCom
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result reroute(ClusterState clusterState) {
    // the single-argument form always runs with decision debugging off
    final boolean debug = false;
    return reroute(clusterState, debug);
}

/**
* Reroutes the routing table based on the live nodes.
* <p/>
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result reroute(ClusterState clusterState, boolean debug) {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned shards, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
Expand All @@ -153,10 +168,20 @@ public RoutingAllocation.Result reroute(ClusterState clusterState) {
* them.
*/
public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState) {
    // the single-argument form always runs with decision debugging off
    final boolean debug = false;
    return rerouteWithNoReassign(clusterState, debug);
}

/**
* Only handles reroute but *without* any reassignment of unassigned shards or rebalancing. Does
* make sure to handle removed nodes, but only moves the shards to UNASSIGNED, does not reassign
* them.
*/
public RoutingAllocation.Result rerouteWithNoReassign(ClusterState clusterState, boolean debug) {
RoutingNodes routingNodes = clusterState.routingNodes();
// shuffle the unassigned shards, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
allocation.debugDecision(debug);
boolean changed = false;
// first, clear from the shards any node id they used to belong to that is now dead
changed |= deassociateDeadNodes(allocation);
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.index.shard.ShardId;

import java.util.HashMap;
Expand Down Expand Up @@ -99,6 +100,8 @@ public AllocationExplanation explanation() {

private boolean ignoreDisable = false;

private boolean debugDecision = false;

/**
* Creates a new {@link RoutingAllocation}
*
Expand Down Expand Up @@ -173,6 +176,14 @@ public boolean ignoreDisable() {
return this.ignoreDisable;
}

/**
 * Sets the flag that controls whether {@code decision(...)} attaches a
 * reason to the decisions it returns.
 *
 * @param debug {@code true} to turn decision debugging on, {@code false} to turn it off
 */
public void debugDecision(boolean debug) {
    debugDecision = debug;
}

/**
 * Reports whether decision debugging is currently switched on.
 *
 * @return the current value of the debug flag
 */
public boolean debugDecision() {
    return debugDecision;
}

public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<ShardId, String>();
Expand All @@ -183,4 +194,16 @@ public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
/**
 * Checks whether the given shard has been registered as ignored for the given node.
 *
 * @param shardId the shard to look up
 * @param nodeId  the node id to compare against the registered one; must not be {@code null}
 * @return {@code true} only if an ignore entry exists for {@code shardId} and it equals {@code nodeId}
 */
public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
    // nothing has been registered yet, so nothing can be ignored
    if (ignoredShardToNodes == null) {
        return false;
    }
    String ignoredNodeId = ignoredShardToNodes.get(shardId);
    return nodeId.equals(ignoredNodeId);
}

/**
 * Produces the routing decision to hand back to the caller.
 * <p>
 * With decision debugging off, the supplied {@code decision} is returned
 * untouched. With it on, a new decision is built through
 * {@code Decision.single} from the supplied decision's type together with
 * the given reason and parameters.
 *
 * @param decision the decision whose type is reported
 * @param reason   explanation text; presumably a format string for {@code params} (confirm against {@code Decision.single})
 * @param params   arguments that accompany {@code reason}
 * @return the original decision, or an explained one when debugging is enabled
 */
public Decision decision(Decision decision, String reason, Object... params) {
    if (!debugDecision()) {
        return decision;
    }
    return Decision.single(decision.type(), reason, params);
}
}
Expand Up @@ -52,7 +52,11 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
Decision decision = allocationDecider.canRebalance(shardRouting, allocation);
// short circuit if a NO is returned, unless debugging is enabled.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand All @@ -73,7 +77,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
if (logger.isTraceEnabled()) {
logger.trace("Can not allocate [{}] on node [{}] due to [{}]", shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName());
}
return decision;
// short circuit only if debugging is not enabled
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
// the assumption is that a decider that returns the static instance Decision#ALWAYS
// does not really implement canAllocate
Expand All @@ -99,7 +108,11 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
if (logger.isTraceEnabled()) {
logger.trace("Shard [{}] can not remain on node [{}] due to [{}]", shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName());
}
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand All @@ -113,7 +126,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat
Decision decision = allocationDecider.canAllocate(shardRouting, allocation);
// short circuit if a NO is returned, unless debugging is enabled.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand All @@ -127,7 +144,11 @@ public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
Decision decision = allocationDecider.canAllocate(node, allocation);
// short circuit if a NO is returned, unless debugging is enabled.
if (decision == Decision.NO) {
return decision;
if (!allocation.debugDecision()) {
return decision;
} else {
ret.add(decision);
}
} else if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand Down
Expand Up @@ -158,25 +158,25 @@ public String[] awarenessAttributes() {

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true) ? Decision.YES : Decision.NO;
return underCapacity(shardRouting, node, allocation, true);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, false) ? Decision.YES : Decision.NO;
return underCapacity(shardRouting, node, allocation, false);
}

private boolean underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
if (awarenessAttributes.length == 0) {
return true;
return allocation.decision(Decision.YES, "no allocation awareness enabled");
}

IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.index());
int shardCount = indexMetaData.numberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (!node.node().attributes().containsKey(awarenessAttribute)) {
return false;
return allocation.decision(Decision.NO, "node does not contain awareness attribute: [%s]", awarenessAttribute);
}

// build attr_value -> nodes map
Expand Down Expand Up @@ -234,14 +234,14 @@ private boolean underCapacity(ShardRouting shardRouting, RoutingNode node, Routi
int currentNodeCount = shardPerAttribute.get(node.node().attributes().get(awarenessAttribute));
// if we are above with leftover, then we know we are not good, even with mod
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
return false;
return allocation.decision(Decision.NO, "too many shards on nodes for attribute: [%s]", awarenessAttribute);
}
// all is well, we are below or same as average
if (currentNodeCount <= requiredCountPerAttribute) {
continue;
}
}

return true;
return allocation.decision(Decision.YES, "node meets awareness requirements");
}
}
Expand Up @@ -19,15 +19,11 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;

import java.util.List;
import java.util.Locale;

/**
Expand Down Expand Up @@ -91,27 +87,27 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
// check if there are unassigned primaries.
if ( allocation.routingNodes().hasUnassignedPrimaries() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has unassigned primary shards");
}
// check if there are initializing primaries that don't have a relocatingNodeId entry.
if ( allocation.routingNodes().hasInactivePrimaries() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has inactive primary shards");
}

return Decision.YES;
return allocation.decision(Decision.YES, "all primary shards are active");
}
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if ( allocation.routingNodes().hasUnassignedShards() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has unassigned shards");
}
// in case all indices are assigned, are there initializing shards which
// are not relocating?
if ( allocation.routingNodes().hasInactiveShards() ) {
return Decision.NO;
return allocation.decision(Decision.NO, "cluster has inactive shards");
}
}
// type == Type.ALWAYS
return Decision.YES;
return allocation.decision(Decision.YES, "all shards are active");
}
}
Expand Up @@ -65,11 +65,12 @@ public ConcurrentRebalanceAllocationDecider(Settings settings, NodeSettingsServi
@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (clusterConcurrentRebalance == -1) {
return Decision.YES;
return allocation.decision(Decision.YES, "all concurrent rebalances are allowed");
}
if (allocation.routingNodes().getRelocatingShardCount() >= clusterConcurrentRebalance) {
return Decision.NO;
return allocation.decision(Decision.NO, "too many concurrent rebalances [%d], limit: [%d]",
allocation.routingNodes().getRelocatingShardCount(), clusterConcurrentRebalance);
}
return Decision.YES;
return allocation.decision(Decision.YES, "below threshold [%d] for concurrent rebalances", clusterConcurrentRebalance);
}
}
Expand Up @@ -104,20 +104,28 @@ public DisableAllocationDecider(Settings settings, NodeSettingsService nodeSetti
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return Decision.YES;
return allocation.decision(Decision.YES, "allocation disabling is ignored");
}
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
// if it's primary, and it hasn't been allocated post API (meaning it's a "fresh newly created shard"), only disable allocation
// on a special disable allocation flag
return indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation) ? Decision.NO : Decision.YES;
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation)) {
return allocation.decision(Decision.NO, "new primary allocation is disabled");
} else {
return allocation.decision(Decision.YES, "new primary allocation is enabled");
}
}
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_ALLOCATION, disableAllocation)) {
return Decision.NO;
return allocation.decision(Decision.NO, "all allocation is disabled");
}
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION, disableReplicaAllocation)) {
return shardRouting.primary() ? Decision.YES : Decision.NO;
if (shardRouting.primary()) {
return allocation.decision(Decision.YES, "primary allocation is enabled");
} else {
return allocation.decision(Decision.NO, "replica allocation is disabled");
}
}
return Decision.YES;
return allocation.decision(Decision.YES, "all allocation is enabled");
}
}