Skip to content

Commit

Permalink
Forecast average shard size during rollovers (#91561)
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed Nov 15, 2022
1 parent 6e5c3d9 commit d891b1f
Show file tree
Hide file tree
Showing 15 changed files with 573 additions and 192 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/91561.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 91561
summary: Forecast average shard size during rollovers
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
import org.elasticsearch.cluster.metadata.IndexWriteLoad;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand All @@ -78,7 +80,6 @@
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.IndexWriteLoad;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.indices.InvalidIndexNameException;
Expand All @@ -103,6 +104,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -2010,7 +2012,7 @@ public void testSearchWithRouting() throws IOException, ExecutionException, Inte
assertEquals(searchResponse.getTotalShards(), 4);
}

public void testWriteIndexWriteLoadIsStoredAfterRollover() throws Exception {
public void testWriteIndexWriteLoadAndAvgShardSizeIsStoredAfterRollover() throws Exception {
final String dataStreamName = "logs-es";
final int numberOfShards = randomIntBetween(1, 5);
final int numberOfReplicas = randomIntBetween(0, 1);
Expand Down Expand Up @@ -2047,21 +2049,26 @@ public void testWriteIndexWriteLoadIsStoredAfterRollover() throws Exception {

for (Index index : dataStream.getIndices()) {
final IndexMetadata indexMetadata = clusterState.metadata().index(index);
final IndexWriteLoad indexWriteLoad = indexMetadata.getWriteLoad();
final IndexMetadataStats metadataStats = indexMetadata.getStats();

if (index.equals(dataStream.getWriteIndex()) == false) {
assertThat(indexWriteLoad, is(notNullValue()));
assertThat(metadataStats, is(notNullValue()));

final var averageShardSize = metadataStats.averageShardSize();
assertThat(averageShardSize.getAverageSizeInBytes(), is(greaterThan(0L)));

final IndexWriteLoad indexWriteLoad = metadataStats.writeLoad();
for (int shardId = 0; shardId < numberOfShards; shardId++) {
assertThat(indexWriteLoad.getWriteLoadForShard(shardId).getAsDouble(), is(greaterThanOrEqualTo(0.0)));
assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId).getAsLong(), is(greaterThan(0L)));
}
} else {
assertThat(indexWriteLoad, is(nullValue()));
assertThat(metadataStats, is(nullValue()));
}
}
}

public void testWriteLoadIsStoredInABestEffort() throws Exception {
public void testWriteLoadAndAvgShardSizeIsStoredInABestEffort() throws Exception {
// This test simulates the scenario where some nodes fail to respond
// to the IndicesStatsRequest and therefore only a partial view of the
// write-index write-load is stored during rollover.
Expand Down Expand Up @@ -2115,21 +2122,129 @@ public void testWriteLoadIsStoredInABestEffort() throws Exception {

for (Index index : dataStream.getIndices()) {
final IndexMetadata indexMetadata = clusterState.metadata().index(index);
final IndexWriteLoad indexWriteLoad = indexMetadata.getWriteLoad();
final IndexMetadataStats metadataStats = indexMetadata.getStats();

if (index.equals(dataStream.getWriteIndex()) == false) {
assertThat(indexWriteLoad, is(notNullValue()));
assertThat(metadataStats, is(notNullValue()));

final IndexWriteLoad indexWriteLoad = metadataStats.writeLoad();
// All stats request performed against nodes holding the shard 0 failed
assertThat(indexWriteLoad.getWriteLoadForShard(0).isPresent(), is(false));
assertThat(indexWriteLoad.getUptimeInMillisForShard(0).isPresent(), is(false));

// At least one of the shard 1 copies responded with stats
assertThat(indexWriteLoad.getWriteLoadForShard(1).getAsDouble(), is(greaterThanOrEqualTo(0.0)));
assertThat(indexWriteLoad.getUptimeInMillisForShard(1).getAsLong(), is(greaterThan(0L)));

final var averageShardSize = metadataStats.averageShardSize();
assertThat(averageShardSize.numberOfShards(), is(equalTo(1)));

assertThat(averageShardSize.getAverageSizeInBytes(), is(greaterThan(0L)));
} else {
assertThat(indexWriteLoad, is(nullValue()));
assertThat(metadataStats, is(nullValue()));
}
}
}

public void testNoShardSizeIsForecastedWhenAllShardStatRequestsFail() throws Exception {
    final String dataOnlyNode = internalCluster().startDataOnlyNode();
    final String dataStreamName = "logs-es";

    // Pin the single primary (no replicas) to a known node so we can break
    // exactly the stats requests that the rollover relies on.
    final var settings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
        .put("index.routing.allocation.require._name", dataOnlyNode)
        .build();
    DataStreamIT.putComposableIndexTemplate("my-template", null, List.of("logs-*"), settings, null);
    assertAcked(client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).actionGet());

    for (int round = 0; round < 10; round++) {
        indexDocs(dataStreamName, randomIntBetween(100, 200));
    }

    // Find the node currently holding the write index's primary shard.
    final ClusterState stateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
    final DataStream dataStreamBeforeRollover = stateBeforeRollover.getMetadata().dataStreams().get(dataStreamName);
    final String shardNodeId = stateBeforeRollover.routingTable()
        .index(dataStreamBeforeRollover.getWriteIndex())
        .shard(0)
        .primaryShard()
        .currentNodeId();
    final String shardNodeName = stateBeforeRollover.nodes().resolveNode(shardNodeId).getName();

    // Fail every shard-level stats request handled by that node.
    final MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(
        TransportService.class,
        shardNodeName
    );
    mockTransportService.addRequestHandlingBehavior(
        IndicesStatsAction.NAME + "[n]",
        (handler, request, channel, task) -> channel.sendResponse(new RuntimeException("Unable to get stats"))
    );

    assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());

    final ClusterState stateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
    final DataStream rolledOverDataStream = stateAfterRollover.getMetadata().dataStreams().get(dataStreamName);
    final IndexMetadata writeIndexMetadata = stateAfterRollover.metadata().getIndexSafe(rolledOverDataStream.getWriteIndex());

    // With no shard stats available there is nothing to base a forecast on.
    assertThat(writeIndexMetadata.getForecastedShardSizeInBytes().isEmpty(), is(equalTo(true)));
}

/**
 * After several rollovers, the new write index's forecasted shard size must equal
 * the average primary-shard size across all read indices of the data stream
 * (total primary store size divided by the number of primary shards).
 */
public void testShardSizeIsForecastedDuringRollover() throws Exception {
    final String dataStreamName = "logs-es";
    final int numberOfShards = randomIntBetween(1, 5);
    final int numberOfReplicas = randomIntBetween(0, 1);
    final var indexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
        .build();
    DataStreamIT.putComposableIndexTemplate("my-template", null, List.of("logs-*"), indexSettings, null);
    final var request = new CreateDataStreamAction.Request(dataStreamName);
    assertAcked(client().execute(CreateDataStreamAction.INSTANCE, request).actionGet());

    for (int i = 0; i < 4; i++) {
        for (int j = 0; j < 10; j++) {
            indexDocs(dataStreamName, randomIntBetween(100, 200));
        }

        // Force-merge down to one segment so the store size is stable when we
        // compare it against the forecast below.
        assertThat(
            client().admin().indices().prepareForceMerge().setFlush(true).setMaxNumSegments(1).get().getSuccessfulShards(),
            is(greaterThanOrEqualTo(numberOfShards))
        );

        assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
    }

    final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
    final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);

    // All backing indices except the current write index contribute to the forecast.
    final List<String> dataStreamReadIndices = dataStream.getIndices()
        .stream()
        .filter(index -> index.equals(dataStream.getWriteIndex()) == false)
        .map(Index::getName)
        .toList();

    final IndicesStatsResponse indicesStatsResponse = client().admin()
        .indices()
        .prepareStats(dataStreamReadIndices.toArray(String[]::new))
        .setStore(true)
        .get();
    long expectedTotalSizeInBytes = 0;
    int shardCount = 0;
    for (ShardStats shard : indicesStatsResponse.getShards()) {
        // Only primaries count towards the forecast; replicas would double-count.
        if (shard.getShardRouting().primary() == false) {
            continue;
        }
        expectedTotalSizeInBytes += shard.getStats().getDocs().getTotalSizeInBytes();
        shardCount++;
    }
    // Guard the division below: a zero here would otherwise surface as an
    // opaque ArithmeticException instead of a clear test failure.
    assertThat(shardCount, is(greaterThan(0)));

    final IndexMetadata writeIndexMetadata = clusterState.metadata().index(dataStream.getWriteIndex());
    final OptionalLong forecastedShardSizeInBytes = writeIndexMetadata.getForecastedShardSizeInBytes();
    assertThat(forecastedShardSizeInBytes.isPresent(), is(equalTo(true)));
    assertThat(forecastedShardSizeInBytes.getAsLong(), is(equalTo(expectedTotalSizeInBytes / shardCount)));
}

static void putComposableIndexTemplate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
import org.elasticsearch.cluster.metadata.IndexWriteLoad;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.shard.IndexWriteLoad;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void testRolloverClusterStateForDataStream() throws Exception {
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
IndexWriteLoad indexWriteLoad = IndexWriteLoad.builder(1).build();
IndexMetadataStats indexStats = new IndexMetadataStats(IndexWriteLoad.builder(1).build(), 10, 10);

long before = testThreadPool.absoluteTimeInMillis();
MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
Expand All @@ -114,7 +115,7 @@ public void testRolloverClusterStateForDataStream() throws Exception {
now,
randomBoolean(),
false,
indexWriteLoad
indexStats
);
long after = testThreadPool.absoluteTimeInMillis();

Expand Down Expand Up @@ -142,16 +143,16 @@ public void testRolloverClusterStateForDataStream() throws Exception {
IndexMetadata im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(0));
Instant startTime1 = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
Instant endTime1 = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
IndexWriteLoad indexWriteLoad1 = im.getWriteLoad();
IndexMetadataStats indexStats1 = im.getStats();
im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(1));
Instant startTime2 = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
Instant endTime2 = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
IndexWriteLoad indexWriteLoad2 = im.getWriteLoad();
IndexMetadataStats indexStats2 = im.getStats();
assertThat(startTime1.isBefore(endTime1), is(true));
assertThat(endTime1, equalTo(startTime2));
assertThat(endTime2.isAfter(endTime1), is(true));
assertThat(indexWriteLoad1, is(equalTo(indexWriteLoad)));
assertThat(indexWriteLoad2, is(nullValue()));
assertThat(indexStats1, is(equalTo(indexStats)));
assertThat(indexStats2, is(nullValue()));
} finally {
testThreadPool.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -31,7 +32,6 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexWriteLoad;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.snapshots.SnapshotInProgressException;
Expand All @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.ALIAS;
Expand Down Expand Up @@ -105,7 +106,7 @@ public RolloverResult rolloverClusterState(
Instant now,
boolean silent,
boolean onlyValidate,
@Nullable IndexWriteLoad sourceIndexWriteLoad
@Nullable IndexMetadataStats sourceIndexStats
) throws Exception {
validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
Expand All @@ -129,7 +130,7 @@ public RolloverResult rolloverClusterState(
now,
silent,
onlyValidate,
sourceIndexWriteLoad
sourceIndexStats
);
default ->
// the validate method above prevents this case
Expand Down Expand Up @@ -240,7 +241,7 @@ private RolloverResult rolloverDataStream(
Instant now,
boolean silent,
boolean onlyValidate,
@Nullable IndexWriteLoad sourceIndexWriteLoad
@Nullable IndexMetadataStats sourceIndexStats
) throws Exception {

if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
Expand Down Expand Up @@ -302,18 +303,50 @@ private RolloverResult rolloverDataStream(

Metadata.Builder metadataBuilder = Metadata.builder(newState.metadata())
.put(
IndexMetadata.builder(newState.metadata().index(originalWriteIndex))
.indexWriteLoad(sourceIndexWriteLoad)
.putRolloverInfo(rolloverInfo)
IndexMetadata.builder(newState.metadata().index(originalWriteIndex)).stats(sourceIndexStats).putRolloverInfo(rolloverInfo)
);

metadataBuilder = writeLoadForecaster.withWriteLoadForecastForWriteIndex(dataStreamName, metadataBuilder);
metadataBuilder = withShardSizeForecastForWriteIndex(dataStreamName, metadataBuilder);

newState = ClusterState.builder(newState).metadata(metadataBuilder).build();

return new RolloverResult(newWriteIndexName, originalWriteIndex.getName(), newState);
}

/**
 * Stores an average-shard-size forecast on the write index of the given data stream,
 * computed from the recorded stats of all backing indices that have them. The builder
 * is returned unchanged when the data stream does not exist or no stats are available.
 */
public Metadata.Builder withShardSizeForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
    final DataStream dataStream = metadata.dataStream(dataStreamName);

    if (dataStream == null) {
        return metadata;
    }

    // Accumulate total size and shard count over every backing index that has stats.
    long cumulativeSizeInBytes = 0;
    int cumulativeShardCount = 0;
    for (Index index : dataStream.getIndices()) {
        final IndexMetadataStats stats = metadata.getSafe(index).getStats();
        if (stats == null) {
            continue;
        }
        final var averageShardSize = stats.averageShardSize();
        cumulativeSizeInBytes += averageShardSize.totalSizeInBytes();
        cumulativeShardCount += averageShardSize.numberOfShards();
    }

    // Nothing to forecast from: leave the write index untouched.
    if (cumulativeShardCount == 0) {
        return metadata;
    }

    final long shardSizeInBytesForecast = cumulativeSizeInBytes / cumulativeShardCount;
    final IndexMetadata writeIndex = metadata.getSafe(dataStream.getWriteIndex());
    metadata.put(IndexMetadata.builder(writeIndex).shardSizeInBytesForecast(shardSizeInBytesForecast).build(), false);

    return metadata;
}

static String generateRolloverIndexName(String sourceIndexName) {
String resolvedName = IndexNameExpressionResolver.resolveDateMathExpression(sourceIndexName);
final boolean isDateMath = sourceIndexName.equals(resolvedName) == false;
Expand Down

0 comments on commit d891b1f

Please sign in to comment.