Skip to content

Commit

Permalink
Keep track of average shard write load (#90768)
Browse files Browse the repository at this point in the history
This commit adds a new field, write_load, into the shard stats. This new stat exposes the average number of write threads used while indexing documents.

Closes #90102
  • Loading branch information
fcofdez committed Oct 13, 2022
1 parent 0671e08 commit 1a3032b
Show file tree
Hide file tree
Showing 25 changed files with 412 additions and 39 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/90768.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 90768
summary: Keep track of average shard write load
area: CRUD
type: enhancement
issues:
- 90102
4 changes: 4 additions & 0 deletions docs/reference/cluster/nodes-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,10 @@ Total time spent throttling operations.
(integer)
Total time in milliseconds
spent throttling operations.

`write_load`::
(double)
Average number of write threads used while indexing documents.
=======
`get`::
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
setup:
- skip:
version: " - 8.5.99"
reason: Indices write load stats were introduced in 8.6
---
"Write load average is tracked at shard level":
- do:
indices.create:
index: testindex
body:
settings:
index.number_of_shards: 1
index.number_of_replicas: 0
mappings:
properties:
name:
type: text
description:
type: text
price:
type: double

- do:
indices.stats:
index: "testindex"
level: shards
metric: [ indexing ]

- match: { _all.total.indexing.write_load: 0.0 }
- match: { indices.testindex.total.indexing.write_load: 0.0 }
- match: { indices.testindex.shards.0.0.indexing.write_load: 0.0 }

- do:
index:
index: testindex
body: { "name": "specialty coffee", "description": "arabica coffee beans", "price": 100 }
- do:
index:
index: testindex
body: { "name": "commercial coffee", "description": "robusta coffee beans", "price": 50 }
- do:
index:
index: testindex
body: { "name": "raw coffee", "description": "colombian coffee beans", "price": 25 }
- do:
index:
index: testindex
body: { "name": "book", "description": "some book", "price": 1000 }

- do:
indices.stats:
index: "testindex"
level: shards
metric: [ indexing ]

- gte: { _all.total.indexing.write_load: 0.0 }
- gte: { indices.testindex.total.indexing.write_load: 0.0 }
- gte: { indices.testindex.shards.0.0.indexing.write_load: 0.0 }
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,
System::nanoTime
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getSnapshotCommitSupplier(),
config.getLeafSorter()
config.getLeafSorter(),
config.getRelativeTimeInNanosSupplier()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.indices.stats;

import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
Expand Down Expand Up @@ -1437,6 +1438,38 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
}

public void testWriteLoadIsCaptured() throws Exception {
final String indexName = "test-idx";
createIndex(indexName);
final IndicesStatsResponse statsResponseBeforeIndexing = client().admin().indices().prepareStats(indexName).get();
final IndexStats indexStatsBeforeIndexing = statsResponseBeforeIndexing.getIndices().get(indexName);
assertThat(indexStatsBeforeIndexing, is(notNullValue()));
assertThat(indexStatsBeforeIndexing.getPrimaries().getIndexing().getTotal().getWriteLoad(), is(equalTo(0.0)));

final AtomicInteger idGenerator = new AtomicInteger();
assertBusy(() -> {
final int numDocs = randomIntBetween(15, 25);
final List<ActionFuture<IndexResponse>> indexRequestFutures = new ArrayList<>(numDocs);
for (int i = 0; i < numDocs; i++) {
indexRequestFutures.add(
client().prepareIndex(indexName)
.setId(Integer.toString(idGenerator.incrementAndGet()))
.setSource("{}", XContentType.JSON)
.execute()
);
}

for (ActionFuture<IndexResponse> indexRequestFuture : indexRequestFutures) {
assertThat(indexRequestFuture.get().getResult(), equalTo(DocWriteResponse.Result.CREATED));
}

final IndicesStatsResponse statsResponseAfterIndexing = client().admin().indices().prepareStats(indexName).get();
final IndexStats indexStatsAfterIndexing = statsResponseAfterIndexing.getIndices().get(indexName);
assertThat(indexStatsAfterIndexing, is(notNullValue()));
assertThat(indexStatsAfterIndexing.getPrimaries().getIndexing().getTotal().getWriteLoad(), is(greaterThan(0.0)));
});
}

/**
* Persist the global checkpoint on all shards of the given index into disk.
* This makes sure that the persisted global checkpoint on those shards will equal to the in-memory value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
snapshotCommitSupplier
snapshotCommitSupplier,
System::nanoTime
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final TranslogConfig translogConfig;

private final LongSupplier relativeTimeInNanosSupplier;

/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
Expand All @@ -136,7 +138,8 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
Comparator<LeafReader> leafSorter
Comparator<LeafReader> leafSorter,
LongSupplier relativeTimeInNanosSupplier
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -176,6 +179,7 @@ public EngineConfig(
this.primaryTermSupplier = primaryTermSupplier;
this.snapshotCommitSupplier = snapshotCommitSupplier;
this.leafSorter = leafSorter;
this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
}

/**
Expand Down Expand Up @@ -374,4 +378,8 @@ public IndexStorePlugin.SnapshotCommitSupplier getSnapshotCommitSupplier() {
public Comparator<LeafReader> getLeafSorter() {
return leafSorter;
}

public LongSupplier getRelativeTimeInNanosSupplier() {
return relativeTimeInNanosSupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,16 @@ public class InternalEngine extends Engine {
@Nullable
private volatile String forceMergeUUID;

private final LongSupplier relativeTimeInNanosSupplier;

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
}

InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
this.maxDocs = maxDocs;
this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
store.incRef();
IndexWriter writer = null;
Expand Down Expand Up @@ -1024,7 +1027,7 @@ public IndexResult index(Index index) throws IOException {
assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
indexResult.freeze();
return indexResult;
} finally {
Expand Down
33 changes: 26 additions & 7 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ Runnable getGlobalCheckpointSyncer() {
private final RefreshPendingLocationListener refreshPendingLocationListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final boolean isDataStreamIndex; // if a shard is a part of data stream
private final LongSupplier relativeTimeInNanosSupplier;
private volatile long startedRelativeTimeInNanos;
private volatile long indexingTimeBeforeShardStartedInNanos;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -307,7 +310,8 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
final LongSupplier relativeTimeInNanosSupplier
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -385,6 +389,7 @@ public IndexShard(
this.useRetentionLeasesInPeerRecovery = replicationTracker.hasAllPeerRecoveryRetentionLeases();
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.isDataStreamIndex = mapperService == null ? false : mapperService.mappingLookup().isDataStreamTimestampFieldEnabled();
this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -533,6 +538,8 @@ public void updateShardState(
: "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
startedRelativeTimeInNanos = getRelativeTimeInNanos();
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
} else if (currentRouting.primary()
&& currentRouting.relocating()
&& replicationTracker.isRelocated()
Expand Down Expand Up @@ -956,7 +963,8 @@ private Engine.IndexResult applyIndexOperation(
autoGeneratedTimeStamp,
isRetry,
ifSeqNo,
ifPrimaryTerm
ifPrimaryTerm,
getRelativeTimeInNanos()
);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
Expand Down Expand Up @@ -985,9 +993,9 @@ public static Engine.Index prepareIndex(
long autoGeneratedIdTimestamp,
boolean isRetry,
long ifSeqNo,
long ifPrimaryTerm
long ifPrimaryTerm,
long startTimeInNanos
) {
long startTime = System.nanoTime();
assert source.dynamicTemplates().isEmpty() || origin == Engine.Operation.Origin.PRIMARY
: "dynamic_templates parameter can only be associated with primary operations";
DocumentMapper documentMapper = mapperService.documentMapper();
Expand All @@ -1013,7 +1021,7 @@ public static Engine.Index prepareIndex(
version,
versionType,
origin,
startTime,
startTimeInNanos,
autoGeneratedIdTimestamp,
isRetry,
ifSeqNo,
Expand Down Expand Up @@ -1272,7 +1280,13 @@ public IndexingStats indexingStats() {
throttled = engine.isThrottled();
throttleTimeInMillis = engine.getIndexThrottleTimeInMillis();
}
return internalIndexingStats.stats(throttled, throttleTimeInMillis);

return internalIndexingStats.stats(
throttled,
throttleTimeInMillis,
indexingTimeBeforeShardStartedInNanos,
getRelativeTimeInNanos() - startedRelativeTimeInNanos
);
}

public SearchStats searchStats(String... groups) {
Expand Down Expand Up @@ -3255,7 +3269,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
replicationTracker::getRetentionLeases,
this::getOperationPrimaryTerm,
snapshotCommitSupplier,
isTimeseriesIndex ? TIMESERIES_LEAF_READERS_SORTER : null
isTimeseriesIndex ? TIMESERIES_LEAF_READERS_SORTER : null,
relativeTimeInNanosSupplier
);
}

Expand Down Expand Up @@ -4091,6 +4106,10 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}

public long getRelativeTimeInNanos() {
return relativeTimeInNanosSupplier.getAsLong();
}

@Override
public String toString() {
return "IndexShard(shardRouting=" + shardRouting + ")";
Expand Down

0 comments on commit 1a3032b

Please sign in to comment.