Skip to content

Commit

Permalink
[ALLOCATION] Weight deltas must be absolute deltas
Browse files Browse the repository at this point in the history
In some situations the shard balancing weight delta becomes negative. Yet,
a negative delta is always treated as `well balanced`, which is wrong. I wasn't
able to reproduce the issue in any way other than using the real-world data
from issue #9023. This commit adds a fix for absolute deltas as well as a base
test class that allows building tests or simulations from the cat API output.

Closes #9023
  • Loading branch information
s1monw committed Jan 6, 2015
1 parent 90ce507 commit 81293ba
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 53 deletions.
6 changes: 3 additions & 3 deletions docs/reference/cluster/update-settings.asciidoc
Expand Up @@ -78,14 +78,14 @@ due to forced awareness or allocation filtering.

`cluster.routing.allocation.balance.index`::
Defines a factor to the number of shards per index allocated
on a specific node (float). Defaults to `0.5f`. Raising this raises the
on a specific node (float). Defaults to `0.55f`. Raising this raises the
tendency to equalize the number of shards per index across all nodes in
the cluster.

`cluster.routing.allocation.balance.primary`::
Defines a weight factor for the number of primaries of a specific index
allocated on a node (float). `0.05f`. Raising this raises the tendency
to equalize the number of primary shards across all nodes in the cluster.
allocated on a node (float). Defaults to `0.00f`. Raising this raises the tendency
to equalize the number of primary shards across all nodes in the cluster. deprecated[1.3.8]

`cluster.routing.allocation.balance.threshold`::
Minimal optimization value of operations that should be performed (non
Expand Down
Expand Up @@ -259,13 +259,13 @@ public int primaryShardsUnassigned() {
/**
* Returns a {@link List} of shards that match one of the states listed in {@link ShardRoutingState states}
*
* @param states a set of {@link ShardRoutingState states}
* @param state {@link ShardRoutingState} to retrieve
* @return a {@link List} of shards that match one of the given {@link ShardRoutingState states}
*/
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
public List<ShardRouting> shardsWithState(ShardRoutingState state) {
List<ShardRouting> shards = newArrayList();
for (IndexShardRoutingTable shardRoutingTable : this) {
shards.addAll(shardRoutingTable.shardsWithState(states));
shards.addAll(shardRoutingTable.shardsWithState(state));
}
return shards;
}
Expand Down
Expand Up @@ -504,13 +504,14 @@ public List<ShardRouting> replicaShardsWithState(ShardRoutingState... states) {
return shards;
}

public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
public List<ShardRouting> shardsWithState(ShardRoutingState state) {
if (state == ShardRoutingState.INITIALIZING) {
return allInitializingShards;
}
List<ShardRouting> shards = newArrayList();
for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) {
if (shardEntry.state() == state) {
shards.add(shardEntry);
}
if (shardEntry.state() == state) {
shards.add(shardEntry);
}
}
return shards;
Expand Down
Expand Up @@ -108,10 +108,10 @@ public RoutingTableValidation validate(MetaData metaData) {
return validation;
}

public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
public List<ShardRouting> shardsWithState(ShardRoutingState state) {
List<ShardRouting> shards = newArrayList();
for (IndexRoutingTable indexRoutingTable : this) {
shards.addAll(indexRoutingTable.shardsWithState(states));
shards.addAll(indexRoutingTable.shardsWithState(state));
}
return shards;
}
Expand Down
Expand Up @@ -71,9 +71,18 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
public static final String SETTING_SHARD_BALANCE_FACTOR = "cluster.routing.allocation.balance.shard";
public static final String SETTING_PRIMARY_BALANCE_FACTOR = "cluster.routing.allocation.balance.primary";

private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.5f;
private static final float DEFAULT_INDEX_BALANCE_FACTOR = 0.55f;
private static final float DEFAULT_SHARD_BALANCE_FACTOR = 0.45f;
private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.05f;
/**
* The primary balance factor was introduced as a tie-breaker to make the initial allocation
* more deterministic. Yet other mechanisms have been added to ensure that the algorithm is more deterministic such that this
* setting is not needed anymore. Additionally, this setting was abused to balance shards based on their primary flag which can lead
* to unexpected behavior when allocating or balancing the shards.
*
* @deprecated the threshold primary balance factor is deprecated and should not be used.
*/
@Deprecated
private static final float DEFAULT_PRIMARY_BALANCE_FACTOR = 0.0f;

class ApplySettings implements NodeSettingsService.Listener {
@Override
Expand Down Expand Up @@ -191,44 +200,23 @@ public static class WeightFunction {
private final float indexBalance;
private final float shardBalance;
private final float primaryBalance;
private final EnumMap<Operation, float[]> thetaMap = new EnumMap<>(Operation.class);
private final float[] theta;

public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) {
float sum = indexBalance + shardBalance + primaryBalance;
if (sum <= 0.0f) {
throw new ElasticsearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
final float[] defaultTheta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum};
for (Operation operation : Operation.values()) {
switch (operation) {
case THRESHOLD_CHECK:
sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
thetaMap.put(operation, defaultTheta);
} else {
thetaMap.put(operation, new float[]{shardBalance / sum, indexBalance / sum, 0});
}
break;
case BALANCE:
case ALLOCATE:
case MOVE:
thetaMap.put(operation, defaultTheta);
break;
default:
assert false;
}
}
theta = new float[]{shardBalance / sum, indexBalance / sum, primaryBalance / sum};
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
this.primaryBalance = primaryBalance;
}

public float weight(Operation operation, Balancer balancer, ModelNode node, String index) {
final float weightShard = (node.numShards() - balancer.avgShardsPerNode());
final float weightIndex = (node.numShards(index) - balancer.avgShardsPerNode(index));
final float weightPrimary = (node.numPrimaries() - balancer.avgPrimariesPerNode());
final float[] theta = thetaMap.get(operation);
assert theta != null;
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
final float weightPrimary = node.numPrimaries() - balancer.avgPrimariesPerNode();
return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary;
}

Expand All @@ -250,13 +238,7 @@ public static enum Operation {
/**
* Provided during move operation.
*/
MOVE,
/**
* Provided when the weight delta is checked against the configured threshold.
* This can be used to ignore tie-breaking weight factors that should not
* solely trigger a relocation unless the delta is above the threshold.
*/
THRESHOLD_CHECK
MOVE
}

/**
Expand Down Expand Up @@ -348,11 +330,16 @@ private boolean initialize(RoutingNodes routing, RoutingNodes.UnassignedShards u
return allocateUnassigned(unassigned, routing.ignoredUnassigned());
}

/**
 * Computes the absolute difference between two weights.
 *
 * @param lower  the weight expected to be the smaller of the two
 * @param higher the weight expected to be the larger of the two
 * @return the absolute value of {@code higher - lower}
 */
private static float absDelta(float lower, float higher) {
    // Only enforced when assertions are enabled; callers are expected to pass the values in order.
    assert higher >= lower : higher + " lt " + lower +" but was expected to be gte";
    final float difference = higher - lower;
    // Math.abs keeps the result non-negative even when assertions are disabled and the order is violated.
    return Math.abs(difference);
}

private static boolean lessThan(float delta, float threshold) {
/* deltas close to the threshold are "rounded" to the threshold manually
to prevent floating point problems if the delta is very close to the
threshold ie. 1.000000002 which can trigger unnecessary balance actions*/
return delta <= threshold + 0.001f;
return delta <= (threshold + 0.001f);
}

/**
Expand Down Expand Up @@ -393,11 +380,10 @@ public boolean balance() {
final ModelNode maxNode = modelNodes[highIdx];
advance_range:
if (maxNode.numShards(index) > 0) {
float delta = weights[highIdx] - weights[lowIdx];
delta = lessThan(delta, threshold) ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode);
float delta = absDelta(weights[lowIdx], weights[highIdx]);
if (lessThan(delta, threshold)) {
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
&& (weights[highIdx-1] - weights[0] > threshold) // check if we need to break at all
&& (absDelta(weights[0], weights[highIdx-1]) > threshold) // check if we need to break at all
) {
/* This is a special case if allocations from the "heaviest" to the "lighter" nodes is not possible
* due to some allocation decider restrictions like zone awareness. if one zone has for instance
Expand Down Expand Up @@ -747,7 +733,7 @@ public int compare(MutableShardRouting o1,
final RoutingNode node = routingNodes.node(minNode.getNodeId());
if (deciders.canAllocate(node, allocation).type() != Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Can not allocate on node [{}] remove from round decisin [{}]", node, decision.type());
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
}
throttledNodes.add(minNode);
}
Expand Down
@@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;

import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;

/**
* see issue #9023
*/
public class BalanceUnbalancedClusterTest extends CatAllocationTestBase {

@Override
protected Path getCatPath() throws IOException {
Path tmp = newTempDir().toPath();
TestUtil.unzip(getResource("/org/elasticsearch/cluster/routing/issue_9023.zip"), tmp.toFile());
return tmp.resolve("issue_9023");
}

@Override
protected ClusterState allocateNew(ClusterState state) {
String index = "tweets-2014-12-29:00";
AllocationService strategy = createAllocationService(settingsBuilder()
.build());
MetaData metaData = MetaData.builder(state.metaData())
.put(IndexMetaData.builder(index).numberOfShards(5).numberOfReplicas(1))
.build();

RoutingTable routingTable = RoutingTable.builder(state.routingTable())
.addAsNew(metaData.index(index))
.build();

ClusterState clusterState = ClusterState.builder(state).metaData(metaData).routingTable(routingTable).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
while (true) {
if (routingTable.shardsWithState(INITIALIZING).isEmpty()) {
break;
}
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
}
Map<String, Integer> counts = new HashMap<>();
for (IndexShardRoutingTable table : routingTable.index(index)) {
for (ShardRouting r : table) {
String s = r.currentNodeId();
Integer count = counts.get(s);
if (count == null) {
count = 0;
}
count++;
counts.put(s, count);
}
}
for (Map.Entry<String, Integer> count : counts.entrySet()) {
// we have 10 shards and 4 nodes so 2 nodes have 3 shards and 2 nodes have 2 shards
assertTrue("Node: " + count.getKey() + " has shard mismatch: " + count.getValue(), count.getValue() >= 2);
assertTrue("Node: " + count.getKey() + " has shard mismatch: " + count.getValue(), count.getValue() <= 3);

}
return clusterState;
}

}

0 comments on commit 81293ba

Please sign in to comment.