diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java index e337d351d3483..32bc7c5f97930 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java @@ -527,7 +527,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex, CommonStats stats = new CommonStats(); stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes()); stats.store = new StoreStats(); - stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123)); + stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123, 0.234)); return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); } diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index aad71bbf4f640..fcddbbdae8efa 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -206,6 +206,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0); public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00); public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00); + public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00); /* * STOP! READ THIS FIRST! 
No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java index f2064397ed777..39475f56ddcdf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java @@ -109,6 +109,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata, shardStats.getShardRouting().id(), indexingShardStats.getWriteLoad(), indexingShardStats.getRecentWriteLoad(), + indexingShardStats.getPeakWriteLoad(), indexingShardStats.getTotalActiveTimeInMillis() ); totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes(); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java index b03a1c02891f7..2fbb323e27259 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java @@ -9,7 +9,6 @@ package org.elasticsearch.cluster.metadata; -import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -26,10 +25,14 @@ import java.util.OptionalDouble; import java.util.OptionalLong; +import static org.elasticsearch.TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD; +import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD; + public class IndexWriteLoad implements Writeable, ToXContentFragment { public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads"); public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes"); public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new 
ParseField("recent_loads"); + public static final ParseField SHARDS_PEAK_WRITE_LOAD_FIELD = new ParseField("peak_loads"); private static final Double UNKNOWN_LOAD = -1.0; private static final long UNKNOWN_UPTIME = -1; @@ -37,20 +40,27 @@ public class IndexWriteLoad implements Writeable, ToXContentFragment { private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "index_write_load_parser", false, - (args, unused) -> IndexWriteLoad.create((List) args[0], (List) args[1], (List) args[2]) + (args, unused) -> IndexWriteLoad.create( + (List) args[0], + (List) args[1], + (List) args[2], + (List) args[3] + ) ); static { PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD); PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS); - // The recent write load field is optional so that we can parse XContent built by older versions which did not include it: + // The recent and peak write load fields are optional so that we can parse XContent built by older versions which did not have them: PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD); + PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_PEAK_WRITE_LOAD_FIELD); } private static IndexWriteLoad create( List shardsWriteLoad, List shardsUptimeInMillis, - @Nullable List shardsRecentWriteLoad + @Nullable List shardsRecentWriteLoad, + @Nullable List shardsPeakWriteLoad ) { if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) { assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis"; @@ -72,8 +82,19 @@ private static IndexWriteLoad create( if (shardsRecentWriteLoad != null && shardsRecentWriteLoad.size() != shardsUptimeInMillis.size()) { assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsRecentWriteLoad and shardUptimeInMillis"; throw 
new IllegalArgumentException( - "The same number of shard write loads and shard uptimes should be provided, but " - + shardsWriteLoad + "The same number of shard recent write loads and shard uptimes should be provided, but " + + shardsRecentWriteLoad + + " " + + shardsUptimeInMillis + + " were provided" + ); + } + + if (shardsPeakWriteLoad != null && shardsPeakWriteLoad.size() != shardsUptimeInMillis.size()) { + assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsPeakWriteLoad and shardUptimeInMillis"; + throw new IllegalArgumentException( + "The same number of shard peak write loads and shard uptimes should be provided, but " + + shardsPeakWriteLoad + " " + shardsUptimeInMillis + " were provided" @@ -83,15 +104,22 @@ private static IndexWriteLoad create( return new IndexWriteLoad( shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(), shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(), - shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null + shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null, + shardsPeakWriteLoad != null ? 
shardsPeakWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null ); } private final double[] shardWriteLoad; private final long[] shardUptimeInMillis; private final double[] shardRecentWriteLoad; + private final double[] shardPeakWriteLoad; - private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) { + private IndexWriteLoad( + double[] shardWriteLoad, + long[] shardUptimeInMillis, + @Nullable double[] shardRecentWriteLoad, + @Nullable double[] shardPeakWriteLoad + ) { assert shardWriteLoad.length == shardUptimeInMillis.length : "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis"; this.shardWriteLoad = shardWriteLoad; @@ -104,13 +132,22 @@ private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nul this.shardRecentWriteLoad = new double[shardUptimeInMillis.length]; Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD); } + if (shardPeakWriteLoad != null) { + assert shardPeakWriteLoad.length == shardUptimeInMillis.length + : "IndexWriteLoad constructor was called with non-matched lengths for shardPeakWriteLoad and shardUptimeInMillis"; + this.shardPeakWriteLoad = shardPeakWriteLoad; + } else { + this.shardPeakWriteLoad = new double[shardUptimeInMillis.length]; + Arrays.fill(this.shardPeakWriteLoad, UNKNOWN_LOAD); + } } public IndexWriteLoad(StreamInput in) throws IOException { this( in.readDoubleArray(), in.readLongArray(), - in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null + in.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null, + in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD) ? 
in.readDoubleArray() : null ); } @@ -118,9 +155,12 @@ public IndexWriteLoad(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeDoubleArray(shardWriteLoad); out.writeLongArray(shardUptimeInMillis); - if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) { + if (out.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) { out.writeDoubleArray(shardRecentWriteLoad); } + if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) { + out.writeDoubleArray(shardPeakWriteLoad); + } } @Override @@ -128,6 +168,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad); builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis); builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad); + builder.field(SHARDS_PEAK_WRITE_LOAD_FIELD.getPreferredName(), shardPeakWriteLoad); return builder; } @@ -149,6 +190,13 @@ public OptionalDouble getRecentWriteLoadForShard(int shardId) { return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty(); } + public OptionalDouble getPeakWriteLoadForShard(int shardId) { + assertShardInBounds(shardId); + + double load = shardPeakWriteLoad[shardId]; + return load != UNKNOWN_LOAD ? 
OptionalDouble.of(load) : OptionalDouble.empty(); + } + public OptionalLong getUptimeInMillisForShard(int shardId) { assertShardInBounds(shardId); @@ -172,7 +220,8 @@ public boolean equals(Object o) { IndexWriteLoad that = (IndexWriteLoad) o; return Arrays.equals(shardWriteLoad, that.shardWriteLoad) && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis) - && Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad); + && Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad) + && Arrays.equals(shardPeakWriteLoad, that.shardPeakWriteLoad); } @Override @@ -180,6 +229,7 @@ public int hashCode() { int result = Arrays.hashCode(shardWriteLoad); result = 31 * result + Arrays.hashCode(shardUptimeInMillis); result = 31 * result + Arrays.hashCode(shardRecentWriteLoad); + result = 31 * result + Arrays.hashCode(shardPeakWriteLoad); return result; } @@ -192,17 +242,20 @@ public static class Builder { private final double[] shardWriteLoad; private final long[] uptimeInMillis; private final double[] shardRecentWriteLoad; + private final double[] shardPeakWriteLoad; private Builder(int numShards) { this.shardWriteLoad = new double[numShards]; this.uptimeInMillis = new long[numShards]; this.shardRecentWriteLoad = new double[numShards]; + this.shardPeakWriteLoad = new double[numShards]; Arrays.fill(shardWriteLoad, UNKNOWN_LOAD); Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME); Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD); + Arrays.fill(shardPeakWriteLoad, UNKNOWN_LOAD); } - public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) { + public Builder withShardWriteLoad(int shardId, double load, double recentLoad, double peakLoad, long uptimeInMillis) { if (shardId >= this.shardWriteLoad.length) { throw new IllegalArgumentException(); } @@ -210,12 +263,13 @@ public Builder withShardWriteLoad(int shardId, double load, double recentLoad, l this.shardWriteLoad[shardId] = load; this.uptimeInMillis[shardId] = uptimeInMillis; 
this.shardRecentWriteLoad[shardId] = recentLoad; + this.shardPeakWriteLoad[shardId] = peakLoad; return this; } public IndexWriteLoad build() { - return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad); + return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad, shardPeakWriteLoad); } } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java b/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java index 86e894d91440f..7946e0d8eb821 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.TransportVersions.INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD; +import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD; public class IndexingStats implements Writeable, ToXContentFragment { @@ -46,6 +47,7 @@ public static class Stats implements Writeable, ToXContentFragment { private long totalIndexingTimeSinceShardStartedInNanos; private long totalActiveTimeInNanos; private double recentIndexingLoad; + private double peakIndexingLoad; Stats() {} @@ -76,6 +78,15 @@ public Stats(StreamInput in) throws IOException { ? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos : 0; } + if (in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) { + peakIndexingLoad = in.readDouble(); + } else { + // When getting stats from an older version which doesn't have the peak indexing load, better to fall back to the + // unweighted write load, rather than assuming zero load: + peakIndexingLoad = totalActiveTimeInNanos > 0 + ? 
(double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos + : 0; + } } public Stats( @@ -92,7 +103,8 @@ public Stats( long throttleTimeInMillis, long totalIndexingTimeSinceShardStartedInNanos, long totalActiveTimeInNanos, - double recentIndexingLoad + double recentIndexingLoad, + double peakIndexingLoad ) { this.indexCount = indexCount; this.indexTimeInMillis = indexTimeInMillis; @@ -110,6 +122,7 @@ public Stats( this.totalActiveTimeInNanos = totalActiveTimeInNanos; // We store the weighted write load as a double because the calculation is inherently floating point this.recentIndexingLoad = recentIndexingLoad; + this.peakIndexingLoad = peakIndexingLoad; } public void add(Stats stats) { @@ -131,11 +144,12 @@ public void add(Stats stats) { // N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time: totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos; totalActiveTimeInNanos += stats.totalActiveTimeInNanos; - // We want getRecentWriteLoad() for the aggregated stats to also be the average weighted by active time, so we use the updating - // formula for a weighted mean: + // We want getRecentWriteLoad() and getPeakWriteLoad() for the aggregated stats to also be the average weighted by active time, + // so we use the updating formula for a weighted mean: if (totalActiveTimeInNanos > 0) { recentIndexingLoad += (stats.recentIndexingLoad - recentIndexingLoad) * stats.totalActiveTimeInNanos / totalActiveTimeInNanos; + peakIndexingLoad += (stats.peakIndexingLoad - peakIndexingLoad) * stats.totalActiveTimeInNanos / totalActiveTimeInNanos; } } @@ -239,6 +253,20 @@ public double getRecentWriteLoad() { return recentIndexingLoad; } + /** + * Returns a measurement of the peak write load. + * + *

If this {@link Stats} instance represents a single shard, this is the highest value that {@link #getRecentWriteLoad()} would + * return for any of the instances created for this shard since it started (i.e. the highest value seen by any call to + * {@link InternalIndexingStats#stats}). + * + *

If this {@link Stats} instance represents multiple shards, this is the average of that value for each shard, weighted by + * the elapsed time for each shard. (N.B. This is the average of the peak values, not the peak of the average value.) + */ + public double getPeakWriteLoad() { + return peakIndexingLoad; + } + public long getTotalActiveTimeInMillis() { return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos); } @@ -265,6 +293,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) { out.writeDouble(recentIndexingLoad); } + if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) { + out.writeDouble(peakIndexingLoad); + } } @Override @@ -286,6 +317,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.WRITE_LOAD, getWriteLoad()); builder.field(Fields.RECENT_WRITE_LOAD, getRecentWriteLoad()); + builder.field(Fields.PEAK_WRITE_LOAD, getPeakWriteLoad()); return builder; } @@ -307,7 +339,8 @@ public boolean equals(Object o) { && throttleTimeInMillis == that.throttleTimeInMillis && totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos && totalActiveTimeInNanos == that.totalActiveTimeInNanos - && recentIndexingLoad == that.recentIndexingLoad; + && recentIndexingLoad == that.recentIndexingLoad + && peakIndexingLoad == that.peakIndexingLoad; } @Override @@ -408,6 +441,7 @@ static final class Fields { static final String THROTTLED_TIME = "throttle_time"; static final String WRITE_LOAD = "write_load"; static final String RECENT_WRITE_LOAD = "recent_write_load"; + static final String PEAK_WRITE_LOAD = "peak_write_load"; } @Override diff --git a/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index f7c5f3de2d497..19a2e59ddd863 100644 --- 
a/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java +++ b/server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import static org.elasticsearch.core.TimeValue.timeValueNanos; @@ -155,6 +156,7 @@ void noopUpdate() { static class StatsHolder { private final MeanMetric indexMetric = new MeanMetric(); // Used for the count and total 'took' time (in ns) of index operations private final ExponentiallyWeightedMovingRate recentIndexMetric; // An EWMR of the total 'took' time of index operations (in ns) + private final AtomicReference peakIndexMetric; // The peak value of the EWMR observed in any stats() call private final MeanMetric deleteMetric = new MeanMetric(); private final CounterMetric indexCurrent = new CounterMetric(); private final CounterMetric indexFailed = new CounterMetric(); @@ -170,6 +172,7 @@ static class StatsHolder { lambdaInInverseNanos ); this.recentIndexMetric = new ExponentiallyWeightedMovingRate(lambdaInInverseNanos, startTimeInNanos); + this.peakIndexMetric = new AtomicReference<>(0.0); } IndexingStats.Stats stats( @@ -189,15 +192,17 @@ IndexingStats.Stats stats( currentTimeInNanos - timeSinceShardStartedInNanos, recentIndexingLoadAtShardStarted ); + double peakIndexingLoad = peakIndexMetric.accumulateAndGet(recentIndexingLoadSinceShardStarted, Math::max); logger.debug( () -> Strings.format( "Generating stats for an index shard with indexing time %s and active time %s giving unweighted write load %g, " - + "while the recency-weighted write load is %g using a half-life of %s", + + "while the recency-weighted write load is %g using a half-life of %s and the peak write load is %g", timeValueNanos(totalIndexingTimeSinceShardStartedInNanos), timeValueNanos(timeSinceShardStartedInNanos), 
1.0 * totalIndexingTimeSinceShardStartedInNanos / timeSinceShardStartedInNanos, recentIndexingLoadSinceShardStarted, - timeValueNanos((long) recentIndexMetric.getHalfLife()) + timeValueNanos((long) recentIndexMetric.getHalfLife()), + peakIndexingLoad ) ); return new IndexingStats.Stats( @@ -214,7 +219,8 @@ IndexingStats.Stats stats( TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis), totalIndexingTimeSinceShardStartedInNanos, timeSinceShardStartedInNanos, - recentIndexingLoadSinceShardStarted + recentIndexingLoadSinceShardStarted, + peakIndexingLoad ); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 99c0583a351eb..cf9d3bc811664 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -592,6 +592,7 @@ private static CommonStats createShardLevelCommonStats() { ++iota, ++iota, ++iota, + ++iota, ++iota ); indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats)); diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java index 4cf6120e12bbd..da4c907208299 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java @@ -100,7 +100,7 @@ public void testCalculateValidations() { 1, now, List.of(now - 3000, now - 2000, now - 1000), - getWriteLoad(1, 2.0, 9999.0), + getWriteLoad(1, 2.0, 9999.0, 9999.0), null ); builder.put(dataStream); @@ -141,7 +141,7 @@ public void 
testCalculateIncreaseShardingRecommendations() { 1, now, List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(1, 2.0, 9999.0), + getWriteLoad(1, 2.0, 9999.0, 9999.0), autoShardingEvent ); @@ -170,7 +170,7 @@ public void testCalculateIncreaseShardingRecommendations() { 1, now, List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(1, 2.0, 9999.0), + getWriteLoad(1, 2.0, 9999.0, 9999.0), autoShardingEvent ); @@ -202,7 +202,7 @@ public void testCalculateIncreaseShardingRecommendations() { 1, now, List.of(now - 10_000_000, now - 7_000_000, now - 2_000_000, now - 1_000_000, now - 1000), - getWriteLoad(1, 2.0, 9999.0), + getWriteLoad(1, 2.0, 9999.0, 9999.0), autoShardingEvent ); @@ -237,7 +237,7 @@ public void testCalculateDecreaseShardingRecommendations() { 3, now, List.of(now - 10_000, now - 7000, now - 5000, now - 2000, now - 1000), - getWriteLoad(3, 0.25, 9999.0), + getWriteLoad(3, 0.25, 9999.0, 9999.0), autoShardingEvent ); @@ -271,7 +271,7 @@ public void testCalculateDecreaseShardingRecommendations() { now - TimeValue.timeValueDays(2).getMillis(), now - 1000 ), - getWriteLoad(3, 0.333, 9999.0), + getWriteLoad(3, 0.333, 9999.0, 9999.0), autoShardingEvent ); @@ -306,7 +306,7 @@ public void testCalculateDecreaseShardingRecommendations() { now - TimeValue.timeValueDays(2).getMillis(), now - 1000 ), - getWriteLoad(3, 0.333, 9999.0), + getWriteLoad(3, 0.333, 9999.0, 9999.0), autoShardingEvent ); @@ -346,7 +346,7 @@ public void testCalculateDecreaseShardingRecommendations() { now - TimeValue.timeValueDays(1).getMillis(), now - 1000 ), - getWriteLoad(3, 0.25, 9999.0), + getWriteLoad(3, 0.25, 9999.0, 9999.0), autoShardingEvent ); @@ -386,7 +386,7 @@ public void testCalculateDecreaseShardingRecommendations() { now - TimeValue.timeValueDays(2).getMillis(), now - 1000 ), - getWriteLoad(3, 1.333, 9999.0), + getWriteLoad(3, 1.333, 9999.0, 9999.0), autoShardingEvent ); @@ -479,7 +479,7 @@ public void 
testGetMaxIndexLoadWithinCoolingPeriod() { IndexMetadata indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), creationDate), 1, - getWriteLoad(1, 999.0, 9999.0), + getWriteLoad(1, 999.0, 9999.0, 9999.0), creationDate ); @@ -487,7 +487,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), creationDate), 1, - getWriteLoad(1, 1.0, 9999.0), + getWriteLoad(1, 1.0, 9999.0, 9999.0), creationDate ); } @@ -502,14 +502,14 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), createdAt), 3, - getWriteLoad(3, 5.0, 9999.0), // max write index within cooling period + getWriteLoad(3, 5.0, 9999.0, 9999.0), // max write index within cooling period createdAt ); } else { indexMetadata = createIndexMetadata( DataStream.getDefaultBackingIndexName(dataStreamName, backingIndices.size(), createdAt), 3, - getWriteLoad(3, 3.0, 9999.0), // each backing index has a write load of 3.0 + getWriteLoad(3, 3.0, 9999.0, 9999.0), // each backing index has a write load of 3.0 createdAt ); } @@ -521,7 +521,7 @@ public void testGetMaxIndexLoadWithinCoolingPeriod() { final IndexMetadata writeIndexMetadata = createIndexMetadata( writeIndexName, 3, - getWriteLoad(3, 1.0, 9999.0), + getWriteLoad(3, 1.0, 9999.0, 9999.0), System.currentTimeMillis() ); backingIndices.add(writeIndexMetadata.getIndex()); @@ -563,9 +563,9 @@ public void testIndexLoadWithinCoolingPeriodIsSumOfShardsLoads() { IndexWriteLoad.Builder builder = IndexWriteLoad.builder(3); for (int shardId = 0; shardId < 3; shardId++) { switch (shardId) { - case 0 -> builder.withShardWriteLoad(shardId, 0.5, 9999.0, 40); - case 1 -> builder.withShardWriteLoad(shardId, 3.0, 9999.0, 10); - case 2 -> builder.withShardWriteLoad(shardId, 0.3333, 9999.0, 150); + case 
0 -> builder.withShardWriteLoad(shardId, 0.5, 9999.0, 9999.0, 40); + case 1 -> builder.withShardWriteLoad(shardId, 3.0, 9999.0, 9999.0, 10); + case 2 -> builder.withShardWriteLoad(shardId, 0.3333, 9999.0, 9999.0, 150); } } indexMetadata = createIndexMetadata( @@ -582,7 +582,7 @@ public void testIndexLoadWithinCoolingPeriodIsSumOfShardsLoads() { final IndexMetadata writeIndexMetadata = createIndexMetadata( writeIndexName, 3, - getWriteLoad(3, 0.1, 9999.0), + getWriteLoad(3, 0.1, 9999.0, 9999.0), System.currentTimeMillis() ); backingIndices.add(writeIndexMetadata.getIndex()); @@ -704,10 +704,10 @@ private IndexMetadata createIndexMetadata( .build(); } - private IndexWriteLoad getWriteLoad(int numberOfShards, double shardWriteLoad, double shardRecentWriteLoad) { + private IndexWriteLoad getWriteLoad(int numberOfShards, double shardWriteLoad, double shardRecentWriteLoad, double shardPeakWriteLoad) { IndexWriteLoad.Builder builder = IndexWriteLoad.builder(numberOfShards); for (int shardId = 0; shardId < numberOfShards; shardId++) { - builder.withShardWriteLoad(shardId, shardWriteLoad, shardRecentWriteLoad, 1); + builder.withShardWriteLoad(shardId, shardWriteLoad, shardRecentWriteLoad, shardPeakWriteLoad, 1); } return builder.build(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java index 5a819419cd93d..889f08b4c1587 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateTests.java @@ -1070,7 +1070,8 @@ public void testToXContent() throws IOException { "write_load": { "loads": [-1.0], "uptimes": [-1], - "recent_loads": [-1.0] + "recent_loads": [-1.0], + "peak_loads": [-1.0] }, "avg_size": { "total_size_in_bytes": 120, @@ -1347,6 +1348,9 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti ], "recent_loads" : [ -1.0 + ], + "peak_loads" : [ + 
-1.0 ] }, "avg_size" : { @@ -1630,6 +1634,9 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti ], "recent_loads" : [ -1.0 + ], + "peak_loads" : [ + -1.0 ] }, "avg_size" : { diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 84841d0caea07..c93c9a93b9e50 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -2248,6 +2248,7 @@ private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) { shardId, randomDoubleBetween(0, 64, true), randomDoubleBetween(0, 64, true), + randomDoubleBetween(0, 64, true), randomLongBetween(1, 10) ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsSerializationTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsSerializationTests.java index 148063c8a00b8..e188b99a33c20 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsSerializationTests.java @@ -36,6 +36,7 @@ protected IndexMetadataStats createTestInstance() { i, randomDoubleBetween(1, 10, true), randomDoubleBetween(1, 10, true), + randomLongBetween(1, 1000), randomLongBetween(1, 1000) ); } @@ -61,10 +62,13 @@ protected IndexMetadataStats mutateInstance(IndexMetadataStats originalStats) { double recentShardLoad = existingShard && randomBoolean() ? originalWriteLoad.getRecentWriteLoadForShard(i).getAsDouble() : randomDoubleBetween(0, 128, true); + double peakShardLoad = existingShard && randomBoolean() + ? originalWriteLoad.getPeakWriteLoadForShard(i).getAsDouble() + : randomDoubleBetween(0, 128, true); long uptimeInMillis = existingShard && randomBoolean() ? 
originalWriteLoad.getUptimeInMillisForShard(i).getAsLong() : randomNonNegativeLong(); - indexWriteLoad.withShardWriteLoad(i, shardLoad, recentShardLoad, uptimeInMillis); + indexWriteLoad.withShardWriteLoad(i, shardLoad, recentShardLoad, peakShardLoad, uptimeInMillis); } return new IndexMetadataStats(indexWriteLoad.build(), randomLongBetween(1024, 10240), randomIntBetween(1, 4)); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java index 55b620580be3f..257a09416d749 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataStatsTests.java @@ -127,6 +127,7 @@ private ShardStats createShardStats( 0, totalIndexingTimeSinceShardStartedInNanos, totalActiveTimeInNanos, + 0.0, 0.0 ) ); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java index 190cd61a3a8e4..3dde53956bb2a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexMetadataTests.java @@ -739,6 +739,7 @@ private IndexMetadataStats randomIndexStats(int numberOfShards) { i, randomDoubleBetween(0.0, 128.0, true), randomDoubleBetween(0.0, 128.0, true), + randomDoubleBetween(0.0, 128.0, true), randomNonNegativeLong() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexWriteLoadTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexWriteLoadTests.java index e55da0d65b950..7dc85a0ff2d48 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/IndexWriteLoadTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/IndexWriteLoadTests.java @@ -17,9 +17,9 @@ import 
java.io.IOException; import java.util.OptionalDouble; import java.util.OptionalLong; +import java.util.Set; import static java.util.Collections.emptySet; -import static java.util.Collections.singleton; import static org.hamcrest.Matchers.equalTo; public class IndexWriteLoadTests extends ESTestCase { @@ -31,15 +31,18 @@ public void testGetWriteLoadForShardAndGetUptimeInMillisForShard() { final double[] populatedShardWriteLoads = new double[numberOfPopulatedShards]; final double[] populatedShardRecentWriteLoads = new double[numberOfPopulatedShards]; + final double[] populatedShardPeakWriteLoads = new double[numberOfPopulatedShards]; final long[] populatedShardUptimes = new long[numberOfPopulatedShards]; for (int shardId = 0; shardId < numberOfPopulatedShards; shardId++) { double writeLoad = randomDoubleBetween(1, 128, true); double recentWriteLoad = randomDoubleBetween(1, 128, true); + double peakWriteLoad = randomDoubleBetween(1, 128, true); long uptimeInMillis = randomNonNegativeLong(); populatedShardWriteLoads[shardId] = writeLoad; populatedShardRecentWriteLoads[shardId] = recentWriteLoad; + populatedShardPeakWriteLoads[shardId] = peakWriteLoad; populatedShardUptimes[shardId] = uptimeInMillis; - indexWriteLoadBuilder.withShardWriteLoad(shardId, writeLoad, recentWriteLoad, uptimeInMillis); + indexWriteLoadBuilder.withShardWriteLoad(shardId, writeLoad, recentWriteLoad, peakWriteLoad, uptimeInMillis); } final IndexWriteLoad indexWriteLoad = indexWriteLoadBuilder.build(); @@ -50,6 +53,10 @@ public void testGetWriteLoadForShardAndGetUptimeInMillisForShard() { indexWriteLoad.getRecentWriteLoadForShard(shardId), equalTo(OptionalDouble.of(populatedShardRecentWriteLoads[shardId])) ); + assertThat( + indexWriteLoad.getPeakWriteLoadForShard(shardId), + equalTo(OptionalDouble.of(populatedShardPeakWriteLoads[shardId])) + ); assertThat(indexWriteLoad.getUptimeInMillisForShard(shardId), equalTo(OptionalLong.of(populatedShardUptimes[shardId]))); } else { 
assertThat(indexWriteLoad.getWriteLoadForShard(shardId), equalTo(OptionalDouble.empty())); @@ -66,13 +73,16 @@ public void testXContent_roundTrips() throws IOException { assertThat(roundTripped, equalTo(original)); } - public void testXContent_missingRecentWriteLoad() throws IOException { + public void testXContent_missingRecentWriteAndPeakLoad() throws IOException { // Simulate a JSON serialization from before we added recent write load: IndexWriteLoad original = randomIndexWriteLoad(); XContentBuilder builder = XContentBuilder.builder( XContentType.JSON, emptySet(), - singleton(IndexWriteLoad.SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName()) + Set.of( + IndexWriteLoad.SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), + IndexWriteLoad.SHARDS_PEAK_WRITE_LOAD_FIELD.getPreferredName() + ) ).startObject().value(original).endObject(); // Deserialize, and assert that it matches the original except the recent write loads are all missing: IndexWriteLoad roundTripped = IndexWriteLoad.fromXContent(createParser(builder)); @@ -80,6 +90,7 @@ public void testXContent_missingRecentWriteLoad() throws IOException { assertThat(roundTripped.getUptimeInMillisForShard(i), equalTo(original.getUptimeInMillisForShard(i))); assertThat(roundTripped.getWriteLoadForShard(i), equalTo(original.getWriteLoadForShard(i))); assertThat(roundTripped.getRecentWriteLoadForShard(i), equalTo(OptionalDouble.empty())); + assertThat(roundTripped.getPeakWriteLoadForShard(i), equalTo(OptionalDouble.empty())); } } @@ -92,6 +103,7 @@ private static IndexWriteLoad randomIndexWriteLoad() { shardId, randomDoubleBetween(1, 128, true), randomDoubleBetween(1, 128, true), + randomDoubleBetween(1, 128, true), randomNonNegativeLong() ); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e79227c53a7c5..9e203f179e6fc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ 
b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -185,6 +185,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; @@ -5189,7 +5190,7 @@ public void testShardExposesWriteLoadStats() throws Exception { new IndexingOperationListener() { @Override public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { - fakeClock.advance(); + fakeClock.advance(1); return IndexingOperationListener.super.preIndex(shardId, operation); } } @@ -5197,7 +5198,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { // Now simulate that each operation takes 1 minute to complete. // This applies both for replaying translog ops and new indexing ops - fakeClock.setSimulatedElapsedRelativeTime(TimeValue.timeValueMinutes(1)); + fakeClock.setTickLength(TimeValue.timeValueMinutes(1)); final CyclicBarrier barrier = new CyclicBarrier(2); final CountDownLatch concurrentIndexingFinished = new CountDownLatch(1); @@ -5272,12 +5273,14 @@ public void indexTranslogOperations( }, true, true); recoveryFinishedLatch.await(); - fakeClock.setSimulatedElapsedRelativeTime(TimeValue.ZERO); + fakeClock.setTickLength(TimeValue.ZERO); final IndexingStats indexingStatsBeforeIndexingDocs = replicaShard.indexingStats(); assertThat(indexingStatsBeforeIndexingDocs.getTotal().getWriteLoad(), is(equalTo(0.0))); + assertThat(indexingStatsBeforeIndexingDocs.getTotal().getRecentWriteLoad(), is(equalTo(0.0))); + assertThat(indexingStatsBeforeIndexingDocs.getTotal().getPeakWriteLoad(), is(equalTo(0.0))); // Now simulate that each operation takes 1 second to complete. 
- fakeClock.setSimulatedElapsedRelativeTime(TimeValue.timeValueSeconds(1)); + fakeClock.setTickLength(TimeValue.timeValueSeconds(1)); final int numberOfDocs = randomIntBetween(5, 10); for (int i = 0; i < numberOfDocs; i++) { long seqNo = replicaShard.seqNoStats().getMaxSeqNo() + 1; @@ -5291,28 +5294,118 @@ public void indexTranslogOperations( ); } - fakeClock.setSimulatedElapsedRelativeTime(TimeValue.ZERO); + fakeClock.setTickLength(TimeValue.ZERO); final IndexingStats indexingStatsAfterIndexingDocs = replicaShard.indexingStats(); + // We advanced the clock only during the index operation, so all elapsed time was spent indexing, and the write load is exactly 1.0: assertThat(indexingStatsAfterIndexingDocs.getTotal().getWriteLoad(), is(equalTo(1.0))); + // The EWMR should give approximately the same value, but with only a few increments there will be some difference: + double recentWriteLoad = indexingStatsAfterIndexingDocs.getTotal().getRecentWriteLoad(); + assertThat(recentWriteLoad, is(closeTo(1.0, 0.002))); + // This will also be the peak value: + assertThat(indexingStatsAfterIndexingDocs.getTotal().getPeakWriteLoad(), equalTo(recentWriteLoad)); closeShards(primary, replicaShard); } + public void testShardExposesWriteLoadStats_variableRates() throws IOException { + long recentLoadHalfLifeNanos = IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT.nanos(); + long clockTickNanos = recentLoadHalfLifeNanos / 100; // Take exactly 1000 ticks to make one half life + FakeClock fakeClock = new FakeClock(); + fakeClock.setTickLength(TimeValue.timeValueNanos(clockTickNanos)); + + ShardId shardId = new ShardId("index", "_na_", 0); + NodeEnvironment.DataPath dataPath = new NodeEnvironment.DataPath(createTempDir()); + IndexShard shard = newShard( + shardRoutingBuilder(shardId, randomAlphaOfLength(10), true, ShardRoutingState.INITIALIZING).withRecoverySource( + RecoverySource.EmptyStoreRecoverySource.INSTANCE + ).build(), + new ShardPath(false, dataPath.resolve(shardId), 
dataPath.resolve(shardId), shardId), + IndexMetadata.builder("index") + .settings(indexSettings(1, 0).put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()).build()) + .primaryTerm(0, primaryTerm) + .build(), + null, + null, + new InternalEngineFactory(), + NOOP_GCP_SYNCER, + RetentionLeaseSyncer.EMPTY, + EMPTY_EVENT_LISTENER, + fakeClock, + // Use a listener to advance the fake clock once per indexing operation: + new IndexingOperationListener() { + @Override + public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { + fakeClock.advance(1); + return IndexingOperationListener.super.preIndex(shardId, operation); + } + } + ); + recoverShardFromStore(shard); + + // PHASE ONE: Do 450 index operations. Advance clock one tick during each operation and one tick between each operation. + // Elapsed time in phase: 900 ticks. + // Load during phase: 0.5. + for (int i = 0; i < 450; i++) { + indexDoc(shard, "_doc", "phase-1-" + i); + fakeClock.advance(1); + } + IndexingStats stats1 = shard.indexingStats(); + // We have had a consistent load of 0.5 so far, so all load stats should give this, although the EWMR will only be approximate. + assertThat(stats1.getTotal().getWriteLoad(), equalTo(0.5)); + double recentWriteLoad1 = stats1.getTotal().getRecentWriteLoad(); + assertThat(recentWriteLoad1, closeTo(0.5, 0.002)); + // The peak should be equal to this: + assertThat(stats1.getTotal().getPeakWriteLoad(), equalTo(recentWriteLoad1)); + + // PHASE TWO: Do 25 operations. Advance clock one tick during each operation and three ticks between each operation. + // Elapsed time in phase: 100 ticks. + // Load during phase: 0.25. 
+ for (int i = 0; i < 25; i++) { + indexDoc(shard, "_doc", "phase-2-" + i); + fakeClock.advance(3); + } + IndexingStats stats2 = shard.indexingStats(); + // We had a load of 0.5 for 900 ticks and 0.25 for 100 ticks, so the all-time load is 0.5 * 0.9 + 0.25 * 0.1 = 0.475: + assertThat(stats2.getTotal().getWriteLoad(), equalTo(0.475)); + // That is 0.5 for 9 half-lives (a long time) and 0.25 for one half-life, so the EWMR is halfway between the two, 0.375 + assertThat(stats2.getTotal().getRecentWriteLoad(), closeTo(0.375, 0.002)); + // The new EWMR is lower than the previous one, so the peak should be equal to the previous peak: + assertThat(stats2.getTotal().getPeakWriteLoad(), equalTo(recentWriteLoad1)); + + // PHASE THREE: Do 1000 operations. Advance clock one tick during each operation only. + // Elapsed time in phase: 1000 ticks. + // Load during phase: 1.0. + for (int i = 0; i < 1000; i++) { + indexDoc(shard, "_doc", "phase-3-" + i); + } + IndexingStats stats3 = shard.indexingStats(); + // We had a load of 0.5 for 900 ticks, 0.25 for 100 ticks, and 1.0 for 1000 ticks... 
+ // ...so the all-time load is 0.5 * 0.45 + 0.25 * 0.05 + 1.0 * 0.5 = 0.7375: + assertThat(stats3.getTotal().getWriteLoad(), equalTo(0.7375)); + // That load of 1.0 lasted 10 half-lives (a long time), so the EWMR is approximately 1.0: + double recentWriteLoad3 = stats3.getTotal().getRecentWriteLoad(); + assertThat(recentWriteLoad3, closeTo(1.0, 0.003)); + // This EWMR is the highest we've seen so far, so should be the new peak: + assertThat(stats3.getTotal().getPeakWriteLoad(), equalTo(recentWriteLoad3)); + + closeShards(shard); + } + static class FakeClock implements LongSupplier { private final AtomicLong currentRelativeTime = new AtomicLong(); - private volatile TimeValue simulatedElapsedRelativeTime = TimeValue.ZERO; + private volatile TimeValue tickLength = TimeValue.ZERO; @Override public long getAsLong() { return currentRelativeTime.get(); } - void setSimulatedElapsedRelativeTime(TimeValue simulatedElapsedRelativeTime) { - this.simulatedElapsedRelativeTime = simulatedElapsedRelativeTime; + void setTickLength(TimeValue tickLength) { + this.tickLength = tickLength; } - public void advance() { - currentRelativeTime.addAndGet(simulatedElapsedRelativeTime.nanos()); + public void advance(int ticks) { + currentRelativeTime.addAndGet(ticks * tickLength.nanos()); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsTests.java index aa267ba6a16b9..fd6b6bc162300 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexingStatsTests.java @@ -34,7 +34,8 @@ public void testStatsGetWriteLoad() { 10, 1_800_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1.8sec 3_000_000_000L, // totalActiveTimeInNanos - 3sec - 0.1357 + 0.1357, + 0.2468 ); double expectedWriteLoad = 0.6; // 1.8sec / 3sec assertThat(stats.getWriteLoad(), closeTo(expectedWriteLoad, DOUBLE_TOLERANCE)); @@ 
-55,7 +56,8 @@ public void testStatsAdd_indexCount() { 10, 11, 12, - 0.1357 + 0.1357, + 0.2468 ); IndexingStats.Stats stats2 = new IndexingStats.Stats( 2001L, // indexCount @@ -71,15 +73,16 @@ public void testStatsAdd_indexCount() { 10, 11, 12, - 0.1357 + 0.1357, + 0.2468 ); IndexingStats.Stats statsAgg = sumOfStats(stats1, stats2); assertThat(statsAgg.getIndexCount(), equalTo(1001L + 2001L)); } public void testStatsAdd_throttled() { - IndexingStats.Stats statsFalse = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, false, 10, 11, 12, 0.1357); - IndexingStats.Stats statsTrue = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, true, 10, 11, 12, 0.1357); + IndexingStats.Stats statsFalse = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, false, 10, 11, 12, 0.1357, 0.2468); + IndexingStats.Stats statsTrue = new IndexingStats.Stats(1, 2, 3, 4, 5, 6, 7, 8, 9, true, 10, 11, 12, 0.1357, 0.2468); assertThat(sumOfStats(statsFalse, statsFalse).isThrottled(), is(false)); assertThat(sumOfStats(statsFalse, statsTrue).isThrottled(), is(true)); assertThat(sumOfStats(statsTrue, statsFalse).isThrottled(), is(true)); @@ -101,7 +104,8 @@ public void testStatsAdd_writeLoads() { 10, 1_000_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 1sec 2_000_000_000L, // totalActiveTimeInNanos - 2sec - 0.1357 // recentWriteLoad + 0.1357, // recentWriteLoad + 0.3579 // peakWriteLoad ); IndexingStats.Stats stats2 = new IndexingStats.Stats( 2, @@ -117,16 +121,19 @@ public void testStatsAdd_writeLoads() { 10, 2_100_000_000L, // totalIndexingTimeSinceShardStartedInNanos - 2.1sec 3_000_000_000L, // totalActiveTimeInNanos - 3sec - 0.2468 // recentWriteLoad + 0.2468, // recentWriteLoad + 0.5791 // peakWriteLoad ); IndexingStats.Stats statsAgg = sumOfStats(stats1, stats2); // The unweighted write loads for the two shards are 0.5 (1sec / 2sec) and 0.7 (2.1sec / 3sec) respectively. // The aggregated value should be the average weighted by the times, i.e. 
by 2sec and 3sec, giving weights of 0.4 and 0.6. double expectedWriteLoad = 0.4 * 0.5 + 0.6 * 0.7; - // The aggregated value for the recent write load should be the average with the same weights. + // The aggregated value for the recent and peak write loads should be the average with the same weights. double expectedRecentWriteLoad = 0.4 * 0.1357 + 0.6 * 0.2468; + double expectedPeakWriteLoad = 0.4 * 0.3579 + 0.6 * 0.5791; assertThat(statsAgg.getWriteLoad(), closeTo(expectedWriteLoad, DOUBLE_TOLERANCE)); assertThat(statsAgg.getRecentWriteLoad(), closeTo(expectedRecentWriteLoad, DOUBLE_TOLERANCE)); + assertThat(statsAgg.getPeakWriteLoad(), closeTo(expectedPeakWriteLoad, DOUBLE_TOLERANCE)); } private static IndexingStats.Stats sumOfStats(IndexingStats.Stats stats1, IndexingStats.Stats stats2) { diff --git a/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java index 730197b0450cc..ef1aeabde8c3f 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/cat/RestShardsActionTests.java @@ -153,6 +153,7 @@ private void mockShardStats(boolean includeCommonStats) { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomDoubleBetween(0.0, 1.0, true), randomDoubleBetween(0.0, 1.0, true) ) ) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index 415af89d9835a..185539e6036c7 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ 
b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -403,6 +403,7 @@ private static CommonStats mockCommonStats() { ++iota, no, no, + no, no ); commonStats.getIndexing().add(new IndexingStats(indexingStats)); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index dc75f6dd9d554..3d590bbcf674f 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -183,7 +183,7 @@ private CommonStats mockCommonStats() { commonStats.getDocs().add(new DocsStats(1L, 0L, randomNonNegativeLong() >> 8)); // >> 8 to avoid overflow - we add these things up commonStats.getStore().add(new StoreStats(2L, 0L, 0L)); - final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0, 0, 0.0); + final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L, 0, 0, 0.0, 0.0); commonStats.getIndexing().add(new IndexingStats(indexingStats)); final SearchStats.Stats searchStats = new SearchStats.Stats(6L, 7L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index 982680ec65283..d7071aa8d0017 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ 
b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -349,6 +349,7 @@ private static NodeStats mockNodeStats() { ++iota, no, no, + no, no ); indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats)); diff --git a/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java b/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java index 24c24f0846d44..d3db529f3cda7 100644 --- a/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java +++ b/x-pack/plugin/write-load-forecaster/src/test/java/org/elasticsearch/xpack/writeloadforecaster/LicensedWriteLoadForecasterTests.java @@ -128,11 +128,11 @@ public void testUptimeIsUsedToWeightWriteLoad() { DataStream.getDefaultBackingIndexName(dataStreamName, 0), numberOfShards, IndexWriteLoad.builder(numberOfShards) - .withShardWriteLoad(0, 12, 999, 80) - .withShardWriteLoad(1, 24, 999, 5) - .withShardWriteLoad(2, 24, 999, 5) - .withShardWriteLoad(3, 24, 999, 5) - .withShardWriteLoad(4, 24, 999, 5) + .withShardWriteLoad(0, 12, 999, 999, 80) + .withShardWriteLoad(1, 24, 999, 999, 5) + .withShardWriteLoad(2, 24, 999, 999, 5) + .withShardWriteLoad(3, 24, 999, 999, 5) + .withShardWriteLoad(4, 24, 999, 999, 5) .build(), System.currentTimeMillis() - (maxIndexAge.millis() / 2) ); @@ -234,7 +234,7 @@ public void testWriteLoadForecast() { { OptionalDouble writeLoadForecast = forecastIndexWriteLoad( - List.of(IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 999, 100).build()) + List.of(IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 999, 999, 100).build()) ); assertThat(writeLoadForecast.isPresent(), is(true)); assertThat(writeLoadForecast.getAsDouble(), is(equalTo(12.0))); @@ -244,11 +244,11 @@ public void testWriteLoadForecast() 
{ OptionalDouble writeLoadForecast = forecastIndexWriteLoad( List.of( IndexWriteLoad.builder(5) - .withShardWriteLoad(0, 12, 999, 80) - .withShardWriteLoad(1, 24, 999, 5) - .withShardWriteLoad(2, 24, 999, 5) - .withShardWriteLoad(3, 24, 999, 5) - .withShardWriteLoad(4, 24, 999, 5) + .withShardWriteLoad(0, 12, 999, 999, 80) + .withShardWriteLoad(1, 24, 999, 999, 5) + .withShardWriteLoad(2, 24, 999, 999, 5) + .withShardWriteLoad(3, 24, 999, 999, 5) + .withShardWriteLoad(4, 24, 999, 999, 5) .build() ) ); @@ -260,14 +260,14 @@ public void testWriteLoadForecast() { OptionalDouble writeLoadForecast = forecastIndexWriteLoad( List.of( IndexWriteLoad.builder(5) - .withShardWriteLoad(0, 12, 999, 80) - .withShardWriteLoad(1, 24, 999, 5) - .withShardWriteLoad(2, 24, 999, 5) - .withShardWriteLoad(3, 24, 999, 5) - .withShardWriteLoad(4, 24, 999, 4) + .withShardWriteLoad(0, 12, 999, 999, 80) + .withShardWriteLoad(1, 24, 999, 999, 5) + .withShardWriteLoad(2, 24, 999, 999, 5) + .withShardWriteLoad(3, 24, 999, 999, 5) + .withShardWriteLoad(4, 24, 999, 999, 4) .build(), // Since this shard uptime is really low, it doesn't add much to the avg - IndexWriteLoad.builder(1).withShardWriteLoad(0, 120, 999, 1).build() + IndexWriteLoad.builder(1).withShardWriteLoad(0, 120, 999, 999, 1).build() ) ); assertThat(writeLoadForecast.isPresent(), is(true)); @@ -277,9 +277,9 @@ public void testWriteLoadForecast() { { OptionalDouble writeLoadForecast = forecastIndexWriteLoad( List.of( - IndexWriteLoad.builder(2).withShardWriteLoad(0, 12, 999, 25).withShardWriteLoad(1, 12, 999, 25).build(), + IndexWriteLoad.builder(2).withShardWriteLoad(0, 12, 999, 999, 25).withShardWriteLoad(1, 12, 999, 999, 25).build(), - IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 999, 50).build() + IndexWriteLoad.builder(1).withShardWriteLoad(0, 12, 999, 999, 50).build() ) ); assertThat(writeLoadForecast.isPresent(), is(true)); @@ -291,14 +291,14 @@ public void testWriteLoadForecast() { OptionalDouble writeLoadForecast = 
forecastIndexWriteLoad( List.of( IndexWriteLoad.builder(3) - .withShardWriteLoad(0, 25, 999, 1) - .withShardWriteLoad(1, 18, 999, 1) - .withShardWriteLoad(2, 23, 999, 1) + .withShardWriteLoad(0, 25, 999, 999, 1) + .withShardWriteLoad(1, 18, 999, 999, 1) + .withShardWriteLoad(2, 23, 999, 999, 1) .build(), - IndexWriteLoad.builder(2).withShardWriteLoad(0, 6, 999, 1).withShardWriteLoad(1, 8, 999, 1).build(), + IndexWriteLoad.builder(2).withShardWriteLoad(0, 6, 999, 999, 1).withShardWriteLoad(1, 8, 999, 999, 1).build(), - IndexWriteLoad.builder(1).withShardWriteLoad(0, 15, 999, 1).build() + IndexWriteLoad.builder(1).withShardWriteLoad(0, 15, 999, 999, 1).build() ) ); assertThat(writeLoadForecast.isPresent(), is(true)); @@ -313,6 +313,7 @@ private IndexWriteLoad randomIndexWriteLoad(int numberOfShards) { shardId, randomDoubleBetween(0, 64, true), randomDoubleBetween(0, 64, true), + randomDoubleBetween(0, 64, true), randomLongBetween(1, 10) ); }