Avoid scaling empty tier unnecessarily (#74086) (#74413) (#74436)
Autoscaling supports data tiers using attributes. This commit refines
the check to avoid bootstrapping a tier when a non-filter decider says
NO.

Co-authored-by: David Turner <david.turner@elastic.co>
henningandersen and DaveCTurner committed Jun 22, 2021
1 parent aea0b6a commit b387256
Showing 5 changed files with 332 additions and 6 deletions.
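The commit message describes the refinement at a high level: an empty tier should only be bootstrapped when the sole blocking NO comes from the (tier-like) filter decider. A minimal stand-alone sketch of that idea, using plain Java and hypothetical names rather than the Elasticsearch decider API:

import java.util.List;
import java.util.Optional;

// Illustrative sketch only; names and labels are assumptions, not Elasticsearch APIs.
public class EmptyTierScalingSketch {

    // Mirrors the new singleNoDecision() helper below: expose the decider label only when exactly one NO remains.
    static Optional<String> singleNoDecision(List<String> noDeciderLabels) {
        return noDeciderLabels.size() == 1 ? Optional.of(noDeciderLabels.get(0)) : Optional.empty();
    }

    // Mirrors isDiskOnlyNoDecision() / isFilterTierOnlyDecision(): only the named decider may be blocking.
    static boolean onlyBlockedBy(List<String> noDeciderLabels, String decider) {
        return singleNoDecision(noDeciderLabels).filter(decider::equals).isPresent();
    }

    public static void main(String[] args) {
        // Blocked only by the allocation filter -> the empty tier should be scaled up.
        System.out.println(onlyBlockedBy(List.of("filter"), "filter"));                    // true
        // A non-filter decider (e.g. disk_threshold) also says NO -> do not bootstrap the tier.
        System.out.println(onlyBlockedBy(List.of("filter", "disk_threshold"), "filter"));  // false
    }
}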
@@ -18,11 +18,15 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class DiscoveryNodeFilters {

static final Set<String> NON_ATTRIBUTE_NAMES =
org.elasticsearch.common.collect.Set.of("_ip", "_host_ip", "_publish_ip", "host", "_id", "_name", "name");

public enum OpType {
AND,
OR
@@ -226,6 +230,14 @@ public boolean match(DiscoveryNode node) {
}
}

/**
*
* @return true if this filter only contains attribute values, i.e., no node-specific info.
*/
public boolean isOnlyAttributeValueFilter() {
return filters.keySet().stream().anyMatch(NON_ATTRIBUTE_NAMES::contains) == false;
}

/**
* Generates a human-readable string for the DiscoveryNodeFilters.
* Example: {@code _id:"id1 OR blah",name:"blah OR name2"}
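For context on the new isOnlyAttributeValueFilter() predicate above, a minimal sketch using plain Java collections (an assumed map-based representation, not the DiscoveryNodeFilters API) of what it distinguishes:

import java.util.Map;
import java.util.Set;

// Illustrative only: same shape as isOnlyAttributeValueFilter() in DiscoveryNodeFilters.
public class AttributeOnlyFilterSketch {

    static final Set<String> NON_ATTRIBUTE_NAMES = Set.of("_ip", "_host_ip", "_publish_ip", "host", "_id", "_name", "name");

    // true only when no filter key names a specific node.
    static boolean isOnlyAttributeValueFilter(Map<String, String> filters) {
        return filters.keySet().stream().anyMatch(NON_ATTRIBUTE_NAMES::contains) == false;
    }

    public static void main(String[] args) {
        // Attribute filter, e.g. index.routing.allocation.require.data: warm
        System.out.println(isOnlyAttributeValueFilter(Map.of("data", "warm")));  // true
        // Node-identity filter, e.g. index.routing.allocation.include._id: <node id>
        System.out.println(isOnlyAttributeValueFilter(Map.of("_id", "nodeA")));  // false
    }
}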
@@ -30,6 +30,7 @@
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class DiscoveryNodeFiltersTests extends ESTestCase {

@@ -277,6 +278,23 @@ public void testCommaSeparatedValuesTrimmed() {
assertTrue(filters.match(node));
}

public void testOnlyAttributeValueFilter() {
List<String> keys = randomSubsetOf(DiscoveryNodeFilters.NON_ATTRIBUTE_NAMES);
if (keys.isEmpty() || randomBoolean()) {
keys.add("tag");
}
Settings.Builder builder = Settings.builder();
keys.forEach(key -> builder.put("xxx." + key, "1.2.3.4"));
DiscoveryNodeFilters discoveryNodeFilters = buildFromSettings(
DiscoveryNodeFilters.OpType.AND, "xxx.", builder.build()
);
DiscoveryNode node = new DiscoveryNode(
"", "", "", "", "192.1.1.54", localAddress, singletonMap("tag", "1.2.3.4"), emptySet(), null
);

assertThat(discoveryNodeFilters.isOnlyAttributeValueFilter(), is(discoveryNodeFilters.match(node)));
}

private Settings shuffleSettings(Settings source) {
Settings.Builder settings = Settings.builder();
List<String> keys = new ArrayList<>(source.keySet());
@@ -174,18 +174,32 @@ public void testScaleFromEmptyLegacy() {
putAutoscalingPolicy("warm", DataTier.DATA_WARM);
putAutoscalingPolicy("cold", DataTier.DATA_COLD);

// add an index using `_id` allocation to check that it does not trigger spinning up the tier.
assertAcked(
prepareCreate(randomAlphaOfLength(10).toLowerCase(Locale.ROOT)).setSettings(
Settings.builder()
// more than 0 replicas provokes the same-shard decider to say NO.
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_id", randomAlphaOfLength(5))
.build()
).setWaitForActiveShards(ActiveShardCount.NONE)
);

final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
assertAcked(
prepareCreate(indexName).setSettings(
Settings.builder()
- .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+ // more than 0 replicas provokes the same-shard decider to say NO.
+ .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 6)
.put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms")
.put(IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "data_tier", "hot")
.build()
)
);
- refresh();
+ refresh(indexName);
assertThat(capacity().results().get("warm").requiredCapacity().total().storage().getBytes(), Matchers.equalTo(0L));
assertThat(capacity().results().get("cold").requiredCapacity().total().storage().getBytes(), Matchers.equalTo(0L));

@@ -17,6 +17,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -29,6 +30,8 @@
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -130,12 +133,42 @@ static String message(long unassignedBytes, long assignedBytes) {
}

static boolean isDiskOnlyNoDecision(Decision decision) {
// we treat throttling as a yes here, since throttling should be temporary.
return singleNoDecision(decision, single -> true).map(DiskThresholdDecider.NAME::equals).orElse(false);
}

static boolean isFilterTierOnlyDecision(Decision decision, IndexMetadata indexMetadata) {
// only primary shards are handled here, allowing us to disregard the same-shard allocation decider.
return singleNoDecision(decision, single -> SameShardAllocationDecider.NAME.equals(single.label()) == false).filter(
FilterAllocationDecider.NAME::equals
).map(d -> filterLooksLikeTier(indexMetadata)).orElse(false);
}

static boolean filterLooksLikeTier(IndexMetadata indexMetadata) {
return isOnlyAttributeValueFilter(indexMetadata.requireFilters())
&& isOnlyAttributeValueFilter(indexMetadata.includeFilters())
&& isOnlyAttributeValueFilter(indexMetadata.excludeFilters());
}

private static boolean isOnlyAttributeValueFilter(DiscoveryNodeFilters filters) {
if (filters == null) {
return true;
} else {
return DiscoveryNodeFilters.trimTier(filters).isOnlyAttributeValueFilter();
}
}

static Optional<String> singleNoDecision(Decision decision, Predicate<Decision> predicate) {
List<Decision> nos = decision.getDecisions()
.stream()
.filter(single -> single.type() == Decision.Type.NO)
.filter(predicate)
.collect(Collectors.toList());
- return nos.size() == 1 && DiskThresholdDecider.NAME.equals(nos.get(0).label());

if (nos.size() == 1) {
return Optional.ofNullable(nos.get(0).label());
} else {
return Optional.empty();
}
}

// todo: move this to top level class.
@@ -280,7 +313,7 @@ private boolean allocatedToTier(ShardRouting s, RoutingAllocation allocation) {
* @return true if and only if a node exists in the tier where only disk decider prevents allocation
*/
private boolean cannotAllocateDueToStorage(ShardRouting shard, RoutingAllocation allocation) {
- if (nodeIds.isEmpty() && isAssignedToTier(shard, allocation)) {
+ if (nodeIds.isEmpty() && needsThisTier(shard, allocation)) {
return true;
}
assert allocation.debugDecision() == false;
@@ -317,6 +350,43 @@ private boolean canAllocate(ShardRouting shard, RoutingAllocation allocation) {
);
}

boolean needsThisTier(ShardRouting shard, RoutingAllocation allocation) {
if (isAssignedToTier(shard, allocation) == false) {
return false;
}
IndexMetadata indexMetadata = indexMetadata(shard, allocation);
Set<Decision.Type> decisionTypes = StreamSupport.stream(allocation.routingNodes().spliterator(), false)
.map(
node -> dataTierAllocationDecider.shouldFilter(
indexMetadata,
node.node().getRoles(),
this::highestPreferenceTier,
allocation
)
)
.map(Decision::type)
.collect(Collectors.toSet());
if (decisionTypes.contains(Decision.Type.NO)) {
// we know we have some filter and can respond. Only need this tier if ALL responses were NO.
return decisionTypes.size() == 1;
}

// check for allocation filters used to emulate data tiers. For simplicity, only scale up a new tier based on primary shards.
if (shard.primary() == false) {
return false;
}
assert allocation.debugDecision() == false;
// enable debug decisions to see all decisions and preserve the allocation decision label
allocation.debugDecision(true);
try {
// check that it does not belong on any existing node, i.e., there must be only a tier-like reason it cannot be allocated
return StreamSupport.stream(allocation.routingNodes().spliterator(), false)
.anyMatch(node -> isFilterTierOnlyDecision(allocationDeciders.canAllocate(shard, node, allocation), indexMetadata));
} finally {
allocation.debugDecision(false);
}
}

private boolean isAssignedToTier(ShardRouting shard, RoutingAllocation allocation) {
IndexMetadata indexMetadata = indexMetadata(shard, allocation);
return dataTierAllocationDecider.shouldFilter(indexMetadata, roles, this::highestPreferenceTier, allocation) != Decision.NO;
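As a footnote on the first branch of needsThisTier() above, a tiny stand-alone sketch (an assumed enum rather than the Decision API) of the "all tier decisions were NO" reduction; when no decision is NO, the real method instead falls through to the primary-only allocation-filter check:

import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Illustrative only: mirrors the decisionTypes reduction in needsThisTier().
public class TierDecisionReductionSketch {

    enum Type { YES, NO, THROTTLE }

    // When at least one per-node tier decision is NO, the tier is only needed if ALL of them were NO.
    static boolean allDecisionsAreNo(List<Type> perNodeDecisions) {
        Set<Type> types = EnumSet.copyOf(perNodeDecisions); // requires a non-empty list
        return types.contains(Type.NO) && types.size() == 1;
    }

    public static void main(String[] args) {
        System.out.println(allDecisionsAreNo(List.of(Type.NO, Type.NO)));   // true: no node in the tier accepts the shard
        System.out.println(allDecisionsAreNo(List.of(Type.NO, Type.YES)));  // false: the tier already has a matching node
    }
}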
