Skip to content

Commit

Permalink
Skip zone/host awareness with auto-expand replicas (#69334)
Browse files Browse the repository at this point in the history
Today if an index is set to `auto_expand_replicas: N-all` then we will
try and create a shard copy on every node that matches the applicable
allocation filters. This conflits with shard allocation awareness and
the same-host allocation decider if there is an uneven distribution of
nodes across zones or hosts, since these deciders prevent shard copies
from being allocated unevenly and may therefore leave some unassigned
shards.

The point of these two deciders is to improve resilience given a limited
number of shard copies but there is no need for this behaviour when the
number of shard copies is not limited, so this commit supresses them in
that case.

Closes #54151
Closes #2869
  • Loading branch information
DaveCTurner committed Feb 22, 2021
1 parent bd4a585 commit 1c1c3bb
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 20 deletions.
23 changes: 13 additions & 10 deletions docs/reference/index-modules.asciidoc
Expand Up @@ -139,16 +139,19 @@ specific index module:
The number of replicas each primary shard has. Defaults to 1.

`index.auto_expand_replicas`::

Auto-expand the number of replicas based on the number of data nodes in the cluster.
Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all`
for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled).
Note that the auto-expanded number of replicas only takes
<<shard-allocation-filtering,allocation filtering>> rules into account, but ignores
any other allocation rules such as <<shard-allocation-awareness,shard allocation awareness>>
and <<allocation-total-shards,total shards per node>>, and this can lead to the
cluster health becoming `YELLOW` if the applicable rules prevent all the replicas
from being allocated.
Auto-expand the number of replicas based on the number of data nodes in the
cluster. Set to a dash delimited lower and upper bound (e.g. `0-5`) or use `all`
for the upper bound (e.g. `0-all`). Defaults to `false` (i.e. disabled). Note
that the auto-expanded number of replicas only takes
<<shard-allocation-filtering,allocation filtering>> rules into account, but
ignores other allocation rules such as <<allocation-total-shards,total shards
per node>>, and this can lead to the cluster health becoming `YELLOW` if the
applicable rules prevent all the replicas from being allocated.
+
If the upper bound is `all` then <<shard-allocation-awareness,shard allocation
awareness>> and
<<cluster-routing-allocation-same-shard-host,`cluster.routing.allocation.same_shard.host`>>
are ignored for this index.

`index.search.idle.after`::
How long a shard can not receive a search or get request until it's considered
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/modules/cluster/shards_allocation.asciidoc
Expand Up @@ -45,7 +45,7 @@ one of the active allocation ids in the cluster state.
These should be fast so more initial primary recoveries can happen in
parallel on the same node. Defaults to `4`.


[[cluster-routing-allocation-same-shard-host]]
`cluster.routing.allocation.same_shard.host`::
(<<dynamic-cluster-setting,Dynamic>>)
Allows to perform a check to prevent allocation of multiple instances of
Expand Down
Expand Up @@ -92,6 +92,10 @@ int getMaxReplicas(int numDataNodes) {
return Math.min(maxReplicas, numDataNodes-1);
}

public boolean expandToAllNodes() {
return maxReplicas == Integer.MAX_VALUE;
}

private OptionalInt getDesiredNumberOfReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
if (enabled) {
int numMatchingDataNodes = 0;
Expand Down
Expand Up @@ -8,11 +8,6 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import com.carrotsearch.hppc.ObjectIntHashMap;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
Expand All @@ -24,7 +19,13 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING;

/**
* This {@link AllocationDecider} controls shard allocation based on
Expand Down Expand Up @@ -118,6 +119,9 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
"allocation awareness is not enabled, set cluster setting ["
+ CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey() + "] to enable it");

private static final Decision YES_AUTO_EXPAND_ALL = Decision.single(Decision.Type.YES, NAME,
"allocation awareness is ignored, this index is set to auto-expand to all nodes");

private static final Decision YES_ALL_MET =
Decision.single(Decision.Type.YES, NAME, "node meets all awareness attribute requirements");

Expand All @@ -128,6 +132,11 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout

final boolean debug = allocation.debugDecision();
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());

if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(indexMetadata.getSettings()).expandToAllNodes()) {
return YES_AUTO_EXPAND_ALL;
}

int shardCount = indexMetadata.getNumberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
Expand Down
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING;

/**
* An allocation decider that prevents multiple instances of the same shard to
* be allocated on the same {@code node}.
Expand Down Expand Up @@ -58,6 +60,9 @@ private void setSameHost(boolean sameHost) {
private static final Decision YES_NONE_HOLD_COPY =
Decision.single(Decision.Type.YES, NAME, "none of the nodes on this host hold a copy of this shard");

private static final Decision YES_AUTO_EXPAND_ALL = Decision.single(Decision.Type.YES, NAME,
"same-host allocation is ignored, this index is set to auto-expand to all nodes");

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
Expand All @@ -66,6 +71,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
// if its already a NO decision looking at the node, or we aren't configured to look at the host, return the decision
return decision;
}
if (INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(
allocation.metadata().getIndexSafe(shardRouting.index()).getSettings()).expandToAllNodes()) {
return YES_AUTO_EXPAND_ALL;
}
if (node.node() != null) {
for (RoutingNode checkNode : allocation.routingNodes()) {
if (checkNode.node() == null) {
Expand Down
Expand Up @@ -50,16 +50,19 @@ public void testParseSettings() {
assertEquals(0, autoExpandReplicas.getMinReplicas());
assertEquals(5, autoExpandReplicas.getMaxReplicas(8));
assertEquals(2, autoExpandReplicas.getMaxReplicas(3));
assertFalse(autoExpandReplicas.expandToAllNodes());

autoExpandReplicas = AutoExpandReplicas.SETTING.get(Settings.builder().put("index.auto_expand_replicas", "0-all").build());
assertEquals(0, autoExpandReplicas.getMinReplicas());
assertEquals(5, autoExpandReplicas.getMaxReplicas(6));
assertEquals(2, autoExpandReplicas.getMaxReplicas(3));
assertTrue(autoExpandReplicas.expandToAllNodes());

autoExpandReplicas = AutoExpandReplicas.SETTING.get(Settings.builder().put("index.auto_expand_replicas", "1-all").build());
assertEquals(1, autoExpandReplicas.getMinReplicas());
assertEquals(5, autoExpandReplicas.getMaxReplicas(6));
assertEquals(2, autoExpandReplicas.getMaxReplicas(3));
assertTrue(autoExpandReplicas.expandToAllNodes());

}

Expand Down
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -34,6 +35,7 @@
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.sameInstance;
Expand Down Expand Up @@ -770,8 +772,13 @@ public void testUnassignedShardsWithUnbalancedZones() {

logger.info("Building initial routing table for 'testUnassignedShardsWithUnbalancedZones'");

final Settings.Builder indexSettings = settings(Version.CURRENT);
if (randomBoolean()) {
indexSettings.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-4");
}

Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(4))
.put(IndexMetadata.builder("test").settings(indexSettings).numberOfShards(1).numberOfReplicas(4))
.build();

RoutingTable initialRoutingTable = RoutingTable.builder()
Expand Down Expand Up @@ -865,4 +872,36 @@ public void testMultipleAwarenessAttributes() {
assertThat(clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
}

public void testDisabledByAutoExpandReplicas() {
final Settings settings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
.build();

final AllocationService strategy = createAllocationService(settings);

final Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 99)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")))
.build();

final ClusterState clusterState = applyStartedShardsUntilNoChange(
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder()
.addAsNew(metadata.index("test"))
.build())
.nodes(DiscoveryNodes.builder()
.add(newNode("A-0", singletonMap("zone", "a")))
.add(newNode("A-1", singletonMap("zone", "a")))
.add(newNode("A-2", singletonMap("zone", "a")))
.add(newNode("A-3", singletonMap("zone", "a")))
.add(newNode("A-4", singletonMap("zone", "a")))
.add(newNode("B-0", singletonMap("zone", "b")))
).build(), strategy);

assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED), empty());
}
}
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -37,8 +36,11 @@
import java.util.Collections;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.cluster.routing.allocation.RoutingNodesUtils.numberOfShardsOfType;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

public class SameShardRoutingTests extends ESAllocationTestCase {
Expand All @@ -48,14 +50,19 @@ public void testSameHost() {
AllocationService strategy = createAllocationService(
Settings.builder().put(SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey(), true).build());

final Settings.Builder indexSettings = settings(Version.CURRENT);
if (randomBoolean()) {
indexSettings.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1");
}

Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
.put(IndexMetadata.builder("test").settings(indexSettings).numberOfShards(2).numberOfReplicas(1))
.build();

RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metadata.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metadata(metadata)
ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metadata(metadata)
.routingTable(routingTable).build();

logger.info("--> adding two nodes with the same host");
Expand Down Expand Up @@ -88,6 +95,54 @@ public void testSameHost() {
}
}

public void testSameHostCheckDisabledByAutoExpandReplicas() {
final AllocationService strategy = createAllocationService(
Settings.builder().put(SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey(), true).build());

final Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 99)
.put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-all")))
.build();

final DiscoveryNode node1 = new DiscoveryNode(
"node1",
"node1",
"node1",
"test1",
"test1",
buildNewFakeTransportAddress(),
emptyMap(),
MASTER_DATA_ROLES,
Version.CURRENT);


final DiscoveryNode node2 = new DiscoveryNode(
"node2",
"node2",
"node2",
"test1",
"test1",
buildNewFakeTransportAddress(),
emptyMap(),
MASTER_DATA_ROLES,
Version.CURRENT);

final ClusterState clusterState = applyStartedShardsUntilNoChange(ClusterState
.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder()
.addAsNew(metadata.index("test"))
.build())
.nodes(
DiscoveryNodes.builder()
.add(node1)
.add(node2)).build(), strategy);

assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED), empty());
}

public void testForceAllocatePrimaryOnSameNodeNotAllowed() {
SameShardAllocationDecider decider = new SameShardAllocationDecider(
Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
Expand Down

0 comments on commit 1c1c3bb

Please sign in to comment.