Skip to content

Commit

Permalink
Make "cluster.routing.allocation.allow_rebalance" a dynamic setting
Browse files Browse the repository at this point in the history
Also makes it a static constant and changes all tests to use it instead
of a string.

Fixes #7092
  • Loading branch information
dakrone committed Aug 5, 2014
1 parent 859fc03 commit f33aaa6
Show file tree
Hide file tree
Showing 18 changed files with 141 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;

import java.util.Locale;

Expand All @@ -47,6 +51,19 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {

public static final String NAME = "cluster_rebalance";

public static final String CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE = "cluster.routing.allocation.allow_rebalance";
public static final Validator ALLOCATION_ALLOW_REBALANCE_VALIDATOR = new Validator() {
@Override
public String validate(String setting, String value) {
try {
ClusterRebalanceType.parseString(value);
return null;
} catch (ElasticsearchIllegalArgumentException e) {
return "the value of " + setting + " must be one of: [always, indices_primaries_active, indices_all_active]";
}
}
};

/**
* An enum representation for the configured re-balance type.
*/
Expand All @@ -62,26 +79,58 @@ public static enum ClusterRebalanceType {
/**
* Re-balancing is allowed only once all shards on all indices are active.
*/
INDICES_ALL_ACTIVE
INDICES_ALL_ACTIVE;

public static ClusterRebalanceType parseString(String typeString) {
if ("always".equalsIgnoreCase(typeString)) {
return ClusterRebalanceType.ALWAYS;
} else if ("indices_primaries_active".equalsIgnoreCase(typeString) || "indicesPrimariesActive".equalsIgnoreCase(typeString)) {
return ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE;
} else if ("indices_all_active".equalsIgnoreCase(typeString) || "indicesAllActive".equalsIgnoreCase(typeString)) {
return ClusterRebalanceType.INDICES_ALL_ACTIVE;
}
throw new ElasticsearchIllegalArgumentException("Illegal value for " + CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE + ": " + typeString);
}
}

private final ClusterRebalanceType type;
private ClusterRebalanceType type;

@Inject
public ClusterRebalanceAllocationDecider(Settings settings) {
public ClusterRebalanceAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
String allowRebalance = settings.get("cluster.routing.allocation.allow_rebalance", "indices_all_active");
if ("always".equalsIgnoreCase(allowRebalance)) {
type = ClusterRebalanceType.ALWAYS;
} else if ("indices_primaries_active".equalsIgnoreCase(allowRebalance) || "indicesPrimariesActive".equalsIgnoreCase(allowRebalance)) {
type = ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE;
} else if ("indices_all_active".equalsIgnoreCase(allowRebalance) || "indicesAllActive".equalsIgnoreCase(allowRebalance)) {
type = ClusterRebalanceType.INDICES_ALL_ACTIVE;
} else {
logger.warn("[cluster.routing.allocation.allow_rebalance] has a wrong value {}, defaulting to 'indices_all_active'", allowRebalance);
String allowRebalance = settings.get(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "indices_all_active");
try {
type = ClusterRebalanceType.parseString(allowRebalance);
} catch (ElasticsearchIllegalStateException e) {
logger.warn("[{}] has a wrong value {}, defaulting to 'indices_all_active'", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, allowRebalance);
type = ClusterRebalanceType.INDICES_ALL_ACTIVE;
}
logger.debug("using [cluster.routing.allocation.allow_rebalance] with [{}]", type.toString().toLowerCase(Locale.ROOT));
logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type.toString().toLowerCase(Locale.ROOT));

nodeSettingsService.addListener(new ApplySettings());
}

class ApplySettings implements NodeSettingsService.Listener {

@Override
public void onRefreshSettings(Settings settings) {
String newAllowRebalance = settings.get(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, null);
if (newAllowRebalance != null) {
ClusterRebalanceType newType = null;
try {
newType = ClusterRebalanceType.parseString(newAllowRebalance);
} catch (ElasticsearchIllegalArgumentException e) {
// ignore
}

if (newType != null && newType != ClusterRebalanceAllocationDecider.this.type) {
logger.info("updating [{}] from [{}] to [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE,
ClusterRebalanceAllocationDecider.this.type.toString().toLowerCase(Locale.ROOT),
newType.toString().toLowerCase(Locale.ROOT));
ClusterRebalanceAllocationDecider.this.type = newType;
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, Validator.FLOAT);
clusterDynamicSettings.addDynamicSetting(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, Validator.FLOAT);
clusterDynamicSettings.addDynamicSetting(BalancedShardsAllocator.SETTING_THRESHOLD, Validator.NON_NEGATIVE_FLOAT);
clusterDynamicSettings.addDynamicSetting(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE,
ClusterRebalanceAllocationDecider.ALLOCATION_ALLOW_REBALANCE_VALIDATOR);
clusterDynamicSettings.addDynamicSetting(ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE, Validator.INTEGER);
clusterDynamicSettings.addDynamicSetting(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE);
clusterDynamicSettings.addDynamicSetting(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class AddIncrementallyTests extends ElasticsearchAllocationTestCase {
@Test
public void testAddNodesAndIndices() {
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
AllocationService service = createAllocationService(settings.build());

ClusterState clusterState = initCluster(service, 1, 3, 3, 1);
Expand Down Expand Up @@ -95,7 +95,7 @@ public void testAddNodesAndIndices() {
@Test
public void testMinimalRelocations() {
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString())
settings.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString())
.put("cluster.routing.allocation.node_concurrent_recoveries", 2);
AllocationService service = createAllocationService(settings.build());

Expand Down Expand Up @@ -165,7 +165,7 @@ public void testMinimalRelocations() {
@Test
public void testMinimalRelocationsNoLimit() {
ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString())
settings.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString())
.put("cluster.routing.allocation.node_concurrent_recoveries", 100)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 100);
AllocationService service = createAllocationService(settings.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
Expand All @@ -46,7 +47,7 @@ public class AwarenessAllocationTests extends ElasticsearchAllocationTestCase {
public void moveShardOnceNewNodeWithAttributeAdded1() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

Expand Down Expand Up @@ -115,7 +116,7 @@ public void moveShardOnceNewNodeWithAttributeAdded1() {
public void moveShardOnceNewNodeWithAttributeAdded2() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

Expand Down Expand Up @@ -186,7 +187,7 @@ public void moveShardOnceNewNodeWithAttributeAdded3() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.put("cluster.routing.allocation.balance.index", 0.0f)
Expand Down Expand Up @@ -287,7 +288,7 @@ public void moveShardOnceNewNodeWithAttributeAdded4() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
Expand Down Expand Up @@ -382,7 +383,7 @@ public void moveShardOnceNewNodeWithAttributeAdded4() {
public void moveShardOnceNewNodeWithAttributeAdded5() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

Expand Down Expand Up @@ -461,7 +462,7 @@ public void moveShardOnceNewNodeWithAttributeAdded5() {
public void moveShardOnceNewNodeWithAttributeAdded6() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());

Expand Down Expand Up @@ -542,7 +543,7 @@ public void moveShardOnceNewNodeWithAttributeAdded6() {
public void fullAwareness1() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
Expand Down Expand Up @@ -610,7 +611,7 @@ public void fullAwareness1() {
public void fullAwareness2() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build());
Expand Down Expand Up @@ -680,7 +681,7 @@ public void fullAwareness3() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2")
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
Expand Down Expand Up @@ -767,7 +768,7 @@ public void testUnbalancedZones() {
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testIndexBalance() {
final float balanceTreshold = 1.0f;

ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance);
settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance);
settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance);
Expand Down Expand Up @@ -90,7 +90,7 @@ public void testReplicaBalance() {
final float balanceTreshold = 1.0f;

ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance);
settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance);
settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance);
Expand Down Expand Up @@ -118,7 +118,7 @@ public void testPrimaryBalance() {
final float balanceTreshold = 1.0f;

ImmutableSettings.Builder settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(BalancedShardsAllocator.SETTING_INDEX_BALANCE_FACTOR, indexBalance);
settings.put(BalancedShardsAllocator.SETTING_SHARD_BALANCE_FACTOR, replicaBalance);
settings.put(BalancedShardsAllocator.SETTING_PRIMARY_BALANCE_FACTOR, primaryBalance);
Expand Down Expand Up @@ -329,7 +329,7 @@ public void addListener(Listener listener) {
assertThat(allocator.getThreshold(), Matchers.equalTo(2.0f));

settings = settingsBuilder();
settings.put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
settings.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString());
listeners[0].onRefreshSettings(settings.build());
assertThat(allocator.getIndexBalance(), Matchers.equalTo(0.2f));
assertThat(allocator.getShardBalance(), Matchers.equalTo(0.3f));
Expand Down
Loading

0 comments on commit f33aaa6

Please sign in to comment.