Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,19 +343,19 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ResizeAllocationDecider());
addAllocationDecider(deciders, new ReplicaAfterPrimaryActiveAllocationDecider());
addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider());
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));

clusterPlugins.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;

/**
* The {@link BalancedShardsAllocator} re-balances the nodes allocations
Expand Down Expand Up @@ -128,7 +129,7 @@ public BalancedShardsAllocator() {
}

public BalancedShardsAllocator(Settings settings) {
this(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), WriteLoadForecaster.DEFAULT);
this(createBuiltInClusterSettings(settings), WriteLoadForecaster.DEFAULT);
}

public BalancedShardsAllocator(ClusterSettings clusterSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import java.util.Locale;

Expand Down Expand Up @@ -89,10 +88,9 @@ public String toString() {

private volatile ClusterRebalanceType type;

public ClusterRebalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
type = CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.get(settings);
public ClusterRebalanceAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType);
logger.debug("using [{}] with [{}]", CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE, type);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, this::setType);
}

private void setType(ClusterRebalanceType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

/**
* Similar to the {@link ClusterRebalanceAllocationDecider} this
Expand Down Expand Up @@ -44,13 +43,12 @@ public class ConcurrentRebalanceAllocationDecider extends AllocationDecider {
);
private volatile int clusterConcurrentRebalance;

public ConcurrentRebalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.clusterConcurrentRebalance = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.get(settings);
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance);
clusterSettings.addSettingsUpdateConsumer(
public ConcurrentRebalanceAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(
CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
this::setClusterConcurrentRebalance
);
logger.debug("using [cluster_concurrent_rebalance] with [{}]", clusterConcurrentRebalance);
}

private void setClusterConcurrentRebalance(int concurrentRebalance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ public class EnableAllocationDecider extends AllocationDecider {
private volatile Rebalance enableRebalance;
private volatile Allocation enableAllocation;

public EnableAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.enableAllocation = CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.get(settings);
this.enableRebalance = CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, this::setEnableAllocation);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, this::setEnableRebalance);
public EnableAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, this::setEnableAllocation);
clusterSettings.initializeAndWatch(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, this::setEnableRebalance);
}

private void setEnableRebalance(Rebalance enableRebalance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

/**
* An allocation decider that prevents multiple instances of the same shard to
Expand Down Expand Up @@ -46,9 +45,8 @@ public class SameShardAllocationDecider extends AllocationDecider {

private volatile boolean sameHost;

public SameShardAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.sameHost = CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING, this::setSameHost);
public SameShardAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING, this::setSameHost);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import java.util.function.BiPredicate;

Expand Down Expand Up @@ -70,9 +69,8 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
Property.NodeScope
);

public ShardsLimitAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.clusterShardLimit = CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING, this::setClusterShardLimit);
public ShardsLimitAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING, this::setClusterShardLimit);
}

private void setClusterShardLimit(int clusterShardLimit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import static org.elasticsearch.cluster.routing.allocation.decider.Decision.THROTTLE;
import static org.elasticsearch.cluster.routing.allocation.decider.Decision.YES;
Expand Down Expand Up @@ -81,24 +80,19 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
private volatile int concurrentIncomingRecoveries;
private volatile int concurrentOutgoingRecoveries;

public ThrottlingAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.primariesInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.get(settings);
concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings);
concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings);

clusterSettings.addSettingsUpdateConsumer(
public ThrottlingAllocationDecider(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(
CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
this::setPrimariesInitialRecoveries
);
clusterSettings.addSettingsUpdateConsumer(
clusterSettings.initializeAndWatch(
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
this::setConcurrentIncomingRecoverries
);
clusterSettings.addSettingsUpdateConsumer(
clusterSettings.initializeAndWatch(
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
this::setConcurrentOutgoingRecoverries
);

logger.debug(
"using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], "
+ "node_initial_primaries_recoveries [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -904,10 +905,10 @@ private ClusterState newClusterState(Metadata metadata, DiscoveryNodes discovery
.routingTable(routingTable.build())
.build();
final Settings settings = Settings.EMPTY;
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ClusterSettings clusterSettings = createBuiltInClusterSettings(settings);
final ArrayList<AllocationDecider> deciders = new ArrayList<>();
deciders.add(new EnableAllocationDecider(settings, clusterSettings));
deciders.add(new SameShardAllocationDecider(settings, clusterSettings));
deciders.add(new EnableAllocationDecider(clusterSettings));
deciders.add(new SameShardAllocationDecider(clusterSettings));
deciders.add(new ReplicaAfterPrimaryActiveAllocationDecider());
Collections.shuffle(deciders, random());
final MockAllocationService allocationService = new MockAllocationService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void testRegisterAllocationDeciderDuplicate() {
() -> new ClusterModule(Settings.EMPTY, clusterService, Collections.<ClusterPlugin>singletonList(new ClusterPlugin() {
@Override
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
return Collections.singletonList(new EnableAllocationDecider(settings, clusterSettings));
return Collections.singletonList(new EnableAllocationDecider(clusterSettings));
}
}), clusterInfoService, null, threadPool, EmptySystemIndices.INSTANCE, WriteLoadForecaster.DEFAULT)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -109,13 +110,10 @@ public void testAssignsPrimariesInPriorityOrderThenReplicas() {
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1)
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE)
.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ClusterSettings clusterSettings = createBuiltInClusterSettings(settings);
final AllocationService allocationService = new AllocationService(
new AllocationDeciders(
Arrays.asList(
new SameShardAllocationDecider(settings, clusterSettings),
new ThrottlingAllocationDecider(settings, clusterSettings)
)
Arrays.asList(new SameShardAllocationDecider(clusterSettings), new ThrottlingAllocationDecider(clusterSettings))
),
new ShardsAllocator() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
import org.hamcrest.Matchers;

import java.util.stream.Collectors;
Expand Down Expand Up @@ -61,7 +60,7 @@ public void testIndexBalance() {
settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance);
settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold);

AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator());
AllocationService strategy = createAllocationService(settings.build());

ClusterState clusterState = initCluster(strategy);
assertIndexBalance(
Expand Down Expand Up @@ -112,7 +111,7 @@ public void testReplicaBalance() {
settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), replicaBalance);
settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold);

AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator());
AllocationService strategy = createAllocationService(settings.build());

ClusterState clusterState = initCluster(strategy);
assertReplicaBalance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
import org.elasticsearch.test.gateway.TestGatewayAllocator;
Expand All @@ -41,6 +40,7 @@

import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.hamcrest.Matchers.equalTo;

public class RandomAllocationDeciderTests extends ESAllocationTestCase {
Expand All @@ -55,10 +55,7 @@ public void testRandomDecisions() {
new AllocationDeciders(
new HashSet<>(
Arrays.asList(
new SameShardAllocationDecider(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
),
new SameShardAllocationDecider(createBuiltInClusterSettings()),
new ReplicaAfterPrimaryActiveAllocationDecider(),
randomAllocationDecider
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.index.Index;
Expand All @@ -43,6 +42,7 @@
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.cluster.routing.allocation.RoutingNodesUtils.numberOfShardsOfType;
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -206,10 +206,7 @@ public void testSameHostCheckWithExplain() {
} else {
final ShardRouting unassignedShard = unassignedShards.get(0);

final SameShardAllocationDecider decider = new SameShardAllocationDecider(
sameHostSetting,
new ClusterSettings(sameHostSetting, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
final SameShardAllocationDecider decider = new SameShardAllocationDecider(createBuiltInClusterSettings(sameHostSetting));

final RoutingNode emptyNode = clusterState.getRoutingNodes()
.stream()
Expand Down Expand Up @@ -308,10 +305,7 @@ public void testSameHostCheckDisabledByAutoExpandReplicas() {
}

public void testForceAllocatePrimaryOnSameNodeNotAllowed() {
SameShardAllocationDecider decider = new SameShardAllocationDecider(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
SameShardAllocationDecider decider = new SameShardAllocationDecider(createBuiltInClusterSettings());
ClusterState clusterState = ClusterStateCreationUtils.state("idx", randomIntBetween(2, 4), 1);
Index index = clusterState.getMetadata().index("idx").getIndex();
ShardRouting primaryShard = clusterState.routingTable().index(index).shard(0).primaryShard();
Expand Down
Loading