Skip to content

Commit

Permalink
Store write load in IndexMetadata during data streams rollovers (#91019)
Browse files Browse the repository at this point in the history
This commit stores the index write load of the current data stream write-index
during rollover into its IndexMetadata.

Closes #91046
  • Loading branch information
fcofdez committed Nov 4, 2022
1 parent d6cff11 commit 72f3578
Show file tree
Hide file tree
Showing 19 changed files with 749 additions and 38 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/91019.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 91019
summary: Store write load in the `IndexMetadata` during data streams rollovers
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
Expand Down Expand Up @@ -61,6 +65,8 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
Expand All @@ -72,6 +78,8 @@
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.IndexWriteLoad;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -80,6 +88,8 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ObjectPath;
import org.elasticsearch.xcontent.XContentType;

Expand Down Expand Up @@ -115,6 +125,8 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItemInArray;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -127,7 +139,7 @@ public class DataStreamIT extends ESIntegTestCase {

// Returns the plugin classes used by this test class (presumably the plugins installed on
// the test cluster nodes -- confirm against ESIntegTestCase).
// NOTE(review): the two consecutive return statements below are the removed and the added
// side of the diff hunk shown without +/- markers; only the second one, which additionally
// registers MockTransportService.TestPlugin, belongs to the post-commit file.
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(DataStreamsPlugin.class);
return List.of(DataStreamsPlugin.class, MockTransportService.TestPlugin.class);
}

public void testBasicScenario() throws Exception {
Expand Down Expand Up @@ -1998,6 +2010,128 @@ public void testSearchWithRouting() throws IOException, ExecutionException, Inte
assertEquals(searchResponse.getTotalShards(), 4);
}

/**
 * Creates a data stream with a random number of shards and replicas, indexes documents until
 * every shard copy of the write index reports a non-zero active indexing time and write load,
 * rolls the data stream over and then verifies that the rolled-over backing index carries an
 * {@code IndexWriteLoad} entry for each shard while the new write index has none.
 */
public void testWriteIndexWriteLoadIsStoredAfterRollover() throws Exception {
    final String dataStreamName = "logs-es";
    final int numberOfShards = randomIntBetween(1, 5);
    final int numberOfReplicas = randomIntBetween(0, 1);
    final var templateSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
        .build();
    DataStreamIT.putComposableIndexTemplate("my-template", null, List.of("logs-*"), templateSettings, null);
    assertAcked(client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).actionGet());

    assertBusy(() -> {
        // Keep indexing on every retry so that the shards accumulate measurable indexing time.
        for (int batch = 0; batch < 10; batch++) {
            indexDocs(dataStreamName, randomIntBetween(100, 200));
        }

        final ClusterState stateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
        final String writeIndexName = stateBeforeRollover.getMetadata().dataStreams().get(dataStreamName).getWriteIndex().getName();
        final IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(writeIndexName).get();
        for (IndexShardStats shardGroup : statsResponse.getIndex(writeIndexName).getIndexShards().values()) {
            for (ShardStats shardCopy : shardGroup.getShards()) {
                final IndexingStats.Stats indexingTotals = shardCopy.getStats().getIndexing().getTotal();
                // Ensure that we have enough clock granularity before rolling over to ensure that we capture _some_ write load
                assertThat(indexingTotals.getTotalActiveTimeInMillis(), is(greaterThan(0L)));
                assertThat(indexingTotals.getWriteLoad(), is(greaterThan(0.0)));
            }
        }
    });

    assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
    final ClusterState stateAfterRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
    final DataStream rolledOverDataStream = stateAfterRollover.getMetadata().dataStreams().get(dataStreamName);

    for (Index backingIndex : rolledOverDataStream.getIndices()) {
        final IndexWriteLoad storedWriteLoad = stateAfterRollover.metadata().index(backingIndex).getWriteLoad();

        // The freshly created write index has not been rolled over, so nothing is stored for it.
        if (backingIndex.equals(rolledOverDataStream.getWriteIndex())) {
            assertThat(storedWriteLoad, is(nullValue()));
            continue;
        }

        assertThat(storedWriteLoad, is(notNullValue()));
        for (int shardId = 0; shardId < numberOfShards; shardId++) {
            assertThat(storedWriteLoad.getWriteLoadForShard(shardId).getAsDouble(), is(greaterThanOrEqualTo(0.0)));
            assertThat(storedWriteLoad.getUptimeInMillisForShard(shardId).getAsLong(), is(greaterThan(0L)));
        }
    }
}

/**
 * Verifies that the write load of the rolled-over write index is stored on a best-effort basis
 * when some nodes fail to answer the node-level indices stats request issued during rollover.
 */
public void testWriteLoadIsStoredInABestEffort() throws Exception {
    // This test simulates the scenario where some nodes fail to respond
    // to the IndicesStatsRequest and therefore only a partial view of the
    // write-index write-load is stored during rollover.
    // In this test we simulate the following scenario:
    // - The DataStream template is configured to have 2 shards and 1 replica
    // - There's an allocation rule to allocate the data stream shards to 4 particular nodes
    // - We want to simulate two possible cases here:
    // - All the assigned nodes for shard 0 will fail to respond to the IndicesStatsRequest
    // - Only the shard 1 replica will respond successfully to the IndicesStatsRequest ensuring that we fall back in that case

    final List<String> dataOnlyNodes = internalCluster().startDataOnlyNodes(4);
    final String dataStreamName = "logs-es";

    final var indexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
        .put("index.routing.allocation.include._name", String.join(",", dataOnlyNodes))
        .build();
    DataStreamIT.putComposableIndexTemplate("my-template", null, List.of("logs-*"), indexSettings, null);
    final var createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
    assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest).actionGet());

    for (int i = 0; i < 10; i++) {
        indexDocs(dataStreamName, randomIntBetween(100, 200));
    }

    final ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
    final DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName);
    final IndexRoutingTable currentDataStreamWriteIndexRoutingTable = clusterStateBeforeRollover.routingTable()
        .index(dataStreamBeforeRollover.getWriteIndex());

    // Nodes that will reject the node-level stats request: every copy of shard 0 plus the shard 1 primary.
    final List<String> failingIndicesStatsNodeIds = new ArrayList<>();
    for (ShardRouting shardRouting : currentDataStreamWriteIndexRoutingTable.shard(0).assignedShards()) {
        failingIndicesStatsNodeIds.add(shardRouting.currentNodeId());
    }
    failingIndicesStatsNodeIds.add(currentDataStreamWriteIndexRoutingTable.shard(1).primaryShard().currentNodeId());

    // Validate the scenario before touching any transport service. Counting distinct ids matters here:
    // the list may hold the same node twice when two of those shard copies are co-located, in which
    // case a plain size() check would pass without three different nodes actually failing.
    assertThat(failingIndicesStatsNodeIds.stream().distinct().count(), is(equalTo(3L)));

    // The assertions after the rollover rely on at least one copy of shard 1 living on a node that still
    // answers the stats request; fail here with a clear message instead of later on an empty optional.
    boolean shardOneHasRespondingCopy = false;
    for (ShardRouting shardRouting : currentDataStreamWriteIndexRoutingTable.shard(1).assignedShards()) {
        if (failingIndicesStatsNodeIds.contains(shardRouting.currentNodeId()) == false) {
            shardOneHasRespondingCopy = true;
        }
    }
    assertThat(shardOneHasRespondingCopy, is(true));

    for (String nodeId : failingIndicesStatsNodeIds) {
        String nodeName = clusterStateBeforeRollover.nodes().resolveNode(nodeId).getName();
        MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeName);
        // Answer the per-node stats action with an exception instead of running the real handler.
        transportService.addRequestHandlingBehavior(
            IndicesStatsAction.NAME + "[n]",
            (handler, request, channel, task) -> channel.sendResponse(new RuntimeException("Unable to get stats"))
        );
    }

    assertAcked(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet());
    final ClusterState clusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
    final DataStream dataStream = clusterState.getMetadata().dataStreams().get(dataStreamName);

    for (Index index : dataStream.getIndices()) {
        final IndexMetadata indexMetadata = clusterState.metadata().index(index);
        final IndexWriteLoad indexWriteLoad = indexMetadata.getWriteLoad();

        if (index.equals(dataStream.getWriteIndex()) == false) {
            assertThat(indexWriteLoad, is(notNullValue()));
            // All stats request performed against nodes holding the shard 0 failed
            assertThat(indexWriteLoad.getWriteLoadForShard(0).isPresent(), is(false));
            assertThat(indexWriteLoad.getUptimeInMillisForShard(0).isPresent(), is(false));

            // At least one of the shard 1 copies responded with stats
            assertThat(indexWriteLoad.getWriteLoadForShard(1).getAsDouble(), is(greaterThanOrEqualTo(0.0)));
            assertThat(indexWriteLoad.getUptimeInMillisForShard(1).getAsLong(), is(greaterThan(0L)));
        } else {
            // The new write index has not been rolled over yet, so no write load is stored for it
            assertThat(indexWriteLoad, is(nullValue()));
        }
    }
}

static void putComposableIndexTemplate(
String id,
@Nullable String mappings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state,
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false);
return rolloverService.rolloverClusterState(state, name, null, createIndexRequest, metConditions, time, false, false, null);
}

private Index getWriteIndex(ClusterState state, String name, String timestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.shard.IndexWriteLoad;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -47,6 +48,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class MetadataDataStreamRolloverServiceTests extends ESTestCase {

Expand Down Expand Up @@ -100,6 +102,7 @@ public void testRolloverClusterStateForDataStream() throws Exception {
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
IndexWriteLoad indexWriteLoad = IndexWriteLoad.builder(1).build();

long before = testThreadPool.absoluteTimeInMillis();
MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
Expand All @@ -110,7 +113,8 @@ public void testRolloverClusterStateForDataStream() throws Exception {
metConditions,
now,
randomBoolean(),
false
false,
indexWriteLoad
);
long after = testThreadPool.absoluteTimeInMillis();

Expand Down Expand Up @@ -138,12 +142,16 @@ public void testRolloverClusterStateForDataStream() throws Exception {
IndexMetadata im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(0));
Instant startTime1 = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
Instant endTime1 = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
IndexWriteLoad indexWriteLoad1 = im.getWriteLoad();
im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(1));
Instant startTime2 = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
Instant endTime2 = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
IndexWriteLoad indexWriteLoad2 = im.getWriteLoad();
assertThat(startTime1.isBefore(endTime1), is(true));
assertThat(endTime1, equalTo(startTime2));
assertThat(endTime2.isAfter(endTime1), is(true));
assertThat(indexWriteLoad1, is(equalTo(indexWriteLoad)));
assertThat(indexWriteLoad2, is(nullValue()));
} finally {
testThreadPool.shutdown();
}
Expand Down Expand Up @@ -204,7 +212,8 @@ public void testRolloverAndMigrateDataStream() throws Exception {
metConditions,
now,
randomBoolean(),
false
false,
null
);

String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());
Expand Down Expand Up @@ -295,7 +304,8 @@ public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExisting
metConditions,
now,
randomBoolean(),
false
false,
null
);

String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.IndexWriteLoad;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.snapshots.SnapshotInProgressException;
Expand Down Expand Up @@ -98,7 +99,8 @@ public RolloverResult rolloverClusterState(
List<Condition<?>> metConditions,
Instant now,
boolean silent,
boolean onlyValidate
boolean onlyValidate,
@Nullable IndexWriteLoad sourceIndexWriteLoad
) throws Exception {
validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
Expand All @@ -121,7 +123,8 @@ public RolloverResult rolloverClusterState(
metConditions,
now,
silent,
onlyValidate
onlyValidate,
sourceIndexWriteLoad
);
default ->
// the validate method above prevents this case
Expand Down Expand Up @@ -228,7 +231,8 @@ private RolloverResult rolloverDataStream(
List<Condition<?>> metConditions,
Instant now,
boolean silent,
boolean onlyValidate
boolean onlyValidate,
@Nullable IndexWriteLoad sourceIndexWriteLoad
) throws Exception {

if (SnapshotsService.snapshottingDataStreams(currentState, Collections.singleton(dataStream.getName())).isEmpty() == false) {
Expand Down Expand Up @@ -284,10 +288,15 @@ private RolloverResult rolloverDataStream(
);

RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());

newState = ClusterState.builder(newState)
.metadata(
Metadata.builder(newState.metadata())
.put(IndexMetadata.builder(newState.metadata().index(originalWriteIndex)).putRolloverInfo(rolloverInfo))
.put(
IndexMetadata.builder(newState.metadata().index(originalWriteIndex))
.indexWriteLoad(sourceIndexWriteLoad)
.putRolloverInfo(rolloverInfo)
)
)
.build();

Expand Down

0 comments on commit 72f3578

Please sign in to comment.