Autoshard data streams on rollover (#106076)

This enhances our rollover logic to use the data stream autosharding
service to retrieve an autosharding recommendation. If the
recommendation is an INCREASE_SHARDS or a
COOLDOWN_PREVENTED_INCREASE_SHARDS we create a rollover condition that
captures the recommendation, so that rollover is triggered in order to
increase the number of shards even if the other "regular" conditions
are not met (or, in the case where the cooldown period prevented the
increase, so that the rollover response displays why).

All other recommendations are passed to the `MetadataRolloverService`,
which ensures the new write index of the data stream receives the
correct number of shards.

Note that a DECREASE_SHARDS recommendation will reduce the number of
shards for a data stream only when one of the other "regular" rollover
conditions matches; it will not trigger a rollover by itself. Only the
INCREASE_SHARDS recommendation will.
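
For illustration, a minimal sketch of how a recommendation is folded into
the rollover conditions. The `addOptimalShardCountCondition` builder method
is added by this commit; `RolloverConditions.newBuilder()`,
`addMaxIndexDocsCondition`, and the `autoShardingResult` variable (a
recommendation previously retrieved from the
`DataStreamAutoShardingService`) are assumed context:

```java
// Sketch: only INCREASE_SHARDS / DECREASE_SHARDS recommendations add the
// optimal_shard_count marker condition; other recommendation types are
// ignored by the builder method.
RolloverConditions.Builder builder = RolloverConditions.newBuilder()
    .addMaxIndexDocsCondition(20_000_000L); // a "regular" condition
if (autoShardingResult != null) {
    builder.addOptimalShardCountCondition(autoShardingResult);
}
RolloverConditions conditions = builder.build();
```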

Some notes on the `NOT_APPLICABLE` recommendation: a `NOT_APPLICABLE`
result switches a data stream back to the sharding configured in the
index template. A data stream can be using auto sharding and later be
excluded from the functionality via the
`data_streams.auto_sharding.excludes` setting. After a data stream is
excluded it needs to start using the number of shards configured in the
backing index template.
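
A hypothetical illustration of excluding a data stream from
auto-sharding; the setting name comes from this change, while the value
format (a list of data stream name patterns) and the `logs-legacy-*`
pattern are assumptions:

```java
// Assumed: the excludes setting accepts a list of data stream name patterns
// and is applied as a cluster setting; matching data streams then receive
// NOT_APPLICABLE recommendations and fall back to the template's shard count.
Settings autoShardingExcludes = Settings.builder()
    .putList("data_streams.auto_sharding.excludes", "logs-legacy-*")
    .build();
```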

The new auto-sharding condition looks like this in the rollover
response:

```
  "acknowledged": true,
  "shards_acknowledged": true,
  "old_index": ".ds-logs-nginx-2024.03.13-000003",
  "new_index": ".ds-logs-nginx-2024.03.13-000004",
  "rolled_over": true,
  "dry_run": false,
  "lazy": false,
  "conditions": {
    "[optimal_shard_count : 3]": true
  }
```

and like this in the `met_conditions` field, part of the rollover info in
the cluster state:

```
"rollover_info" : {
  "logs-nginx" : {
     "met_conditions" : {
        "max_docs" : 20000000,
        "optimal_shard_count" : 3
      },
      "time" : 1710421491138
    }
 },
```
andreidan committed Mar 15, 2024
1 parent 002ed8d commit 387eb38
Showing 28 changed files with 1,706 additions and 48 deletions.

```diff
@@ -9,6 +9,7 @@
 package org.elasticsearch.datastreams;
 
 import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
 import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
 import org.elasticsearch.features.FeatureSpecification;
 import org.elasticsearch.features.NodeFeature;
@@ -24,7 +25,8 @@ public class DataStreamFeatures implements FeatureSpecification {
     public Set<NodeFeature> getFeatures() {
         return Set.of(
             DataStreamLifecycleHealthInfoPublisher.DSL_HEALTH_INFO_FEATURE, // Added in 8.12
-            LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER // Added in 8.13
+            LazyRolloverAction.DATA_STREAM_LAZY_ROLLOVER, // Added in 8.13
+            DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE
         );
     }
 }
```
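
Registering the node feature lets callers gate the new behaviour on
cluster support. A sketch of such a check, assuming the
`FeatureService#clusterHasFeature` API and in-scope `featureService` and
`clusterState` variables:

```java
// Only consult the auto-sharding service once every node in the cluster
// understands the DATA_STREAM_AUTO_SHARDING_FEATURE node feature.
boolean autoShardingSupported = featureService.clusterHasFeature(
    clusterState,
    DataStreamAutoShardingService.DATA_STREAM_AUTO_SHARDING_FEATURE
);
```
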
```diff
@@ -318,7 +318,7 @@ private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state,
         MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
         List<Condition<?>> metConditions = Collections.singletonList(condition);
         CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
-        return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null);
+        return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null, null);
     }
 
     private Index getWriteIndex(ClusterState state, String name, String timestamp) {
```
```diff
@@ -119,7 +119,8 @@ public void testRolloverClusterStateForDataStream() throws Exception {
             now,
             randomBoolean(),
             false,
-            indexStats
+            indexStats,
+            null
         );
         long after = testThreadPool.absoluteTimeInMillis();
 
@@ -218,6 +219,7 @@ public void testRolloverAndMigrateDataStream() throws Exception {
             now,
             randomBoolean(),
             false,
+            null,
             null
         );
 
@@ -310,6 +312,7 @@ public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExisting
             now,
             randomBoolean(),
             false,
+            null,
             null
         );
 
@@ -375,7 +378,8 @@ public void testRolloverClusterStateWithBrokenOlderTsdbDataStream() throws Exception {
             now,
             randomBoolean(),
             false,
-            indexStats
+            indexStats,
+            null
         );
         long after = testThreadPool.absoluteTimeInMillis();
 
@@ -455,7 +459,8 @@ public void testRolloverClusterStateWithBrokenTsdbDataStream() throws Exception
                 now,
                 randomBoolean(),
                 false,
-                indexStats
+                indexStats,
+                null
             )
         );
         assertThat(e.getMessage(), containsString("is overlapping with backing index"));
```
```diff
@@ -148,6 +148,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion ESQL_SERIALIZE_BIG_VECTOR = def(8_608_00_0);
     public static final TransportVersion AGGS_EXCLUDED_DELETED_DOCS = def(8_609_00_0);
     public static final TransportVersion ESQL_SERIALIZE_BIG_ARRAY = def(8_610_00_0);
+    public static final TransportVersion AUTO_SHARDING_ROLLOVER_CONDITION = def(8_611_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,
```
```diff
@@ -20,12 +20,14 @@
  */
 public abstract class Condition<T> implements NamedWriteable, ToXContentFragment {
 
-    /**
-     * Describes the type of condition - a min_* condition (MIN) or max_* condition (MAX).
+    /*
+     * Describes the type of condition - a min_* condition (MIN), max_* condition (MAX), or an automatic condition (automatic conditions
+     * are something that the platform configures and manages)
      */
     public enum Type {
         MIN,
-        MAX
+        MAX,
+        AUTOMATIC
     }
 
     protected T value;
```
```diff
@@ -9,6 +9,7 @@
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -59,6 +60,7 @@ public TransportLazyRolloverAction(
         MetadataRolloverService rolloverService,
         AllocationService allocationService,
         MetadataDataStreamsService metadataDataStreamsService,
+        DataStreamAutoShardingService dataStreamAutoShardingService,
         Client client
     ) {
         super(
@@ -71,7 +73,8 @@ public TransportLazyRolloverAction(
             rolloverService,
             client,
             allocationService,
-            metadataDataStreamsService
+            metadataDataStreamsService,
+            dataStreamAutoShardingService
         );
     }
 
@@ -121,6 +124,7 @@ protected void masterOperation(
             new RolloverRequest(rolloverRequest.getRolloverTarget(), null),
             null,
             trialRolloverResponse,
+            null,
             listener
         );
         submitRolloverTask(rolloverRequest, source, rolloverTask);
```
```diff
@@ -8,14 +8,18 @@
 
 package org.elasticsearch.action.admin.indices.rollover;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.AliasAction;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.DataStreamAutoShardingEvent;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadataStats;
@@ -61,6 +65,7 @@
  * Service responsible for handling rollover requests for write aliases and data streams
  */
 public class MetadataRolloverService {
+    private static final Logger logger = LogManager.getLogger(MetadataRolloverService.class);
     private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$");
     private static final List<IndexAbstraction.Type> VALID_ROLLOVER_TARGETS = List.of(ALIAS, DATA_STREAM);
 
@@ -110,7 +115,8 @@ public RolloverResult rolloverClusterState(
         Instant now,
         boolean silent,
         boolean onlyValidate,
-        @Nullable IndexMetadataStats sourceIndexStats
+        @Nullable IndexMetadataStats sourceIndexStats,
+        @Nullable AutoShardingResult autoShardingResult
     ) throws Exception {
         validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
         final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
@@ -134,7 +140,8 @@
                 now,
                 silent,
                 onlyValidate,
-                sourceIndexStats
+                sourceIndexStats,
+                autoShardingResult
             );
             default ->
                 // the validate method above prevents this case
@@ -244,7 +251,8 @@ private RolloverResult rolloverDataStream(
         Instant now,
         boolean silent,
         boolean onlyValidate,
-        @Nullable IndexMetadataStats sourceIndexStats
+        @Nullable IndexMetadataStats sourceIndexStats,
+        @Nullable AutoShardingResult autoShardingResult
     ) throws Exception {
 
         if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
@@ -281,6 +289,54 @@ private RolloverResult rolloverDataStream(
             return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), currentState);
         }
 
+        DataStreamAutoShardingEvent dataStreamAutoShardingEvent = autoShardingResult == null
+            ? dataStream.getAutoShardingEvent()
+            : switch (autoShardingResult.type()) {
+                case NO_CHANGE_REQUIRED -> {
+                    logger.info(
+                        "Rolling over data stream [{}] using existing auto-sharding recommendation [{}]",
+                        dataStreamName,
+                        dataStream.getAutoShardingEvent()
+                    );
+                    yield dataStream.getAutoShardingEvent();
+                }
+                case INCREASE_SHARDS, DECREASE_SHARDS -> {
+                    logger.info("Auto sharding data stream [{}] to [{}]", dataStreamName, autoShardingResult);
+                    yield new DataStreamAutoShardingEvent(
+                        dataStream.getWriteIndex().getName(),
+                        autoShardingResult.targetNumberOfShards(),
+                        now.toEpochMilli()
+                    );
+                }
+                case COOLDOWN_PREVENTED_INCREASE, COOLDOWN_PREVENTED_DECREASE -> {
+                    // we're in the cooldown period for this particular recommendation so perhaps use a previous autosharding
+                    // recommendation (or the value configured in the backing index template otherwise)
+                    if (dataStream.getAutoShardingEvent() != null) {
+                        logger.info(
+                            "Rolling over data stream [{}] using existing auto-sharding recommendation [{}]",
+                            dataStreamName,
+                            dataStream.getAutoShardingEvent()
+                        );
+                    }
+                    yield dataStream.getAutoShardingEvent();
+                }
+                // data sharding might not be available due to the feature not being available/enabled or due to cluster level excludes
+                // being configured. the index template will dictate the number of shards as usual
+                case NOT_APPLICABLE -> {
+                    logger.debug("auto sharding is not applicable for data stream [{}]", dataStreamName);
+                    yield null;
+                }
+            };
+
+        // configure the number of shards using an auto sharding event (new, or existing) if we have one
+        if (dataStreamAutoShardingEvent != null) {
+            Settings settingsWithAutoSharding = Settings.builder()
+                .put(createIndexRequest.settings())
+                .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), dataStreamAutoShardingEvent.targetNumberOfShards())
+                .build();
+            createIndexRequest.settings(settingsWithAutoSharding);
+        }
+
         var createIndexClusterStateRequest = prepareDataStreamCreateIndexRequest(
             dataStreamName,
             newWriteIndexName,
@@ -298,7 +354,14 @@ private RolloverResult rolloverDataStream(
             silent,
             (builder, indexMetadata) -> {
                 downgradeBrokenTsdbBackingIndices(dataStream, builder);
-                builder.put(dataStream.rollover(indexMetadata.getIndex(), newGeneration, metadata.isTimeSeriesTemplate(templateV2)));
+                builder.put(
+                    dataStream.rollover(
+                        indexMetadata.getIndex(),
+                        newGeneration,
+                        metadata.isTimeSeriesTemplate(templateV2),
+                        dataStreamAutoShardingEvent
+                    )
+                );
             },
             rerouteCompletionIsNotRequired()
         );
```
@@ -0,0 +1,70 @@
```java
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

/**
 * Condition for automatically increasing the number of shards for a data stream. This indicates the optimum number of shards that was
 * configured for the index abstraction as part of rollover.
 * It's more of a marker condition, when present the condition is met, more than a condition we evaluate against stats.
 */
public class OptimalShardCountCondition extends Condition<Integer> {
    public static final String NAME = "optimal_shard_count";

    public OptimalShardCountCondition(int optimalShards) {
        super(NAME, Type.AUTOMATIC);
        this.value = optimalShards;
    }

    public OptimalShardCountCondition(StreamInput in) throws IOException {
        super(NAME, Type.AUTOMATIC);
        this.value = in.readVInt();
    }

    @Override
    public Result evaluate(final Stats stats) {
        return new Result(this, true);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVInt(value);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        return builder.field(NAME, value);
    }

    public static OptimalShardCountCondition fromXContent(XContentParser parser) throws IOException {
        if (parser.nextToken() == XContentParser.Token.VALUE_NUMBER) {
            return new OptimalShardCountCondition(parser.intValue());
        } else {
            throw new IllegalArgumentException("invalid token when parsing " + NAME + " condition: " + parser.currentToken());
        }
    }

    @Override
    boolean includedInVersion(TransportVersion version) {
        return version.onOrAfter(TransportVersions.AUTO_SHARDING_ROLLOVER_CONDITION);
    }
}
```
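
A short usage sketch (not part of the commit) of the marker semantics:
the condition ignores stats and always evaluates as met, and it renders
as a single numeric field:

```java
// The stats argument is irrelevant: the condition's presence means it is met.
OptimalShardCountCondition condition = new OptimalShardCountCondition(3);
Condition.Result result = condition.evaluate(null); // matched, regardless of stats

XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
condition.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject(); // builder now holds {"optimal_shard_count":3}
```
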
```diff
@@ -7,6 +7,7 @@
  */
 package org.elasticsearch.action.admin.indices.rollover;
 
+import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -27,6 +28,9 @@
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.DECREASE_SHARDS;
+import static org.elasticsearch.action.datastreams.autosharding.AutoShardingType.INCREASE_SHARDS;
+
 /**
  * Contains the conditions that determine if an index can be rolled over or not. It is used by the {@link RolloverRequest},
  * the Index Lifecycle Management and the Data Stream Lifecycle.
@@ -243,7 +247,12 @@ public boolean areConditionsMet(Map<String, Boolean> conditionResults) {
             .filter(c -> Condition.Type.MAX == c.type())
             .anyMatch(c -> conditionResults.getOrDefault(c.toString(), false));
 
-        return conditionResults.size() == 0 || (allMinConditionsMet && anyMaxConditionsMet);
+        boolean anyAutomaticConditionsMet = conditions.values()
+            .stream()
+            .filter(c -> Condition.Type.AUTOMATIC == c.type())
+            .anyMatch(c -> conditionResults.getOrDefault(c.toString(), false));
+
+        return conditionResults.size() == 0 || (allMinConditionsMet && anyMaxConditionsMet) || anyAutomaticConditionsMet;
     }
 
     public static RolloverConditions fromXContent(XContentParser parser) throws IOException {
@@ -408,6 +417,19 @@ public Builder addMinPrimaryShardDocsCondition(Long numDocs) {
             return this;
         }
 
+        /**
+         * Adds an optimal shard count condition if the autosharding result is of type INCREASE or DECREASE_SHARDS, ignores it otherwise.
+         */
+        public Builder addOptimalShardCountCondition(AutoShardingResult autoShardingResult) {
+            if (autoShardingResult.type().equals(INCREASE_SHARDS) || autoShardingResult.type().equals(DECREASE_SHARDS)) {
+                OptimalShardCountCondition optimalShardCountCondition = new OptimalShardCountCondition(
+                    autoShardingResult.targetNumberOfShards()
+                );
+                this.conditions.put(optimalShardCountCondition.name, optimalShardCountCondition);
+            }
+            return this;
+        }
+
         public RolloverConditions build() {
             return new RolloverConditions(conditions);
         }
```
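
To make the new `areConditionsMet` semantics concrete, a minimal sketch:
`addOptimalShardCountCondition` comes from this diff, while `newBuilder`,
`addMaxIndexDocsCondition`, and the exact condition-key strings
(mirroring the rollover response shown earlier) are assumptions:

```java
import java.util.Map;

import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult;

class AutoShardingRolloverSketch {
    // Rollover proceeds even though max_docs was not met, because a met
    // AUTOMATIC condition (optimal_shard_count) is sufficient on its own.
    static boolean wouldRollOver(AutoShardingResult increaseShardsRecommendation) {
        RolloverConditions conditions = RolloverConditions.newBuilder()
            .addMaxIndexDocsCondition(20_000_000L)
            .addOptimalShardCountCondition(increaseShardsRecommendation) // e.g. INCREASE_SHARDS to 3
            .build();
        return conditions.areConditionsMet(
            Map.of("[max_docs : 20000000]", false, "[optimal_shard_count : 3]", true)
        ); // true
    }
}
```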
