Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
CommonStats stats = new CommonStats();
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
stats.store = new StoreStats();
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123));
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123, 0.234));
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0);
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00);
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata,
shardStats.getShardRouting().id(),
indexingShardStats.getWriteLoad(),
indexingShardStats.getRecentWriteLoad(),
indexingShardStats.getPeakWriteLoad(),
indexingShardStats.getTotalActiveTimeInMillis()
);
totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -26,31 +25,42 @@
import java.util.OptionalDouble;
import java.util.OptionalLong;

import static org.elasticsearch.TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD;
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;

public class IndexWriteLoad implements Writeable, ToXContentFragment {
public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads");
public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes");
public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads");
public static final ParseField SHARDS_PEAK_WRITE_LOAD_FIELD = new ParseField("peak_loads");
private static final Double UNKNOWN_LOAD = -1.0;
private static final long UNKNOWN_UPTIME = -1;

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>(
"index_write_load_parser",
false,
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1], (List<Double>) args[2])
(args, unused) -> IndexWriteLoad.create(
(List<Double>) args[0],
(List<Long>) args[1],
(List<Double>) args[2],
(List<Double>) args[3]
)
);

static {
PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD);
PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS);
// The recent write load field is optional so that we can parse XContent built by older versions which did not include it:
// The recent and peak write load fields are optional so that we can parse XContent built by older versions which did not have them:
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD);
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_PEAK_WRITE_LOAD_FIELD);
}

private static IndexWriteLoad create(
List<Double> shardsWriteLoad,
List<Long> shardsUptimeInMillis,
@Nullable List<Double> shardsRecentWriteLoad
@Nullable List<Double> shardsRecentWriteLoad,
@Nullable List<Double> shardsPeakWriteLoad
) {
if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) {
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
Expand All @@ -72,8 +82,19 @@ private static IndexWriteLoad create(
if (shardsRecentWriteLoad != null && shardsRecentWriteLoad.size() != shardsUptimeInMillis.size()) {
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsRecentWriteLoad and shardUptimeInMillis";
throw new IllegalArgumentException(
"The same number of shard write loads and shard uptimes should be provided, but "
+ shardsWriteLoad
"The same number of shard recent write loads and shard uptimes should be provided, but "
+ shardsRecentWriteLoad
+ " "
+ shardsUptimeInMillis
+ " were provided"
);
}

if (shardsPeakWriteLoad != null && shardsPeakWriteLoad.size() != shardsUptimeInMillis.size()) {
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsPeakWriteLoad and shardUptimeInMillis";
throw new IllegalArgumentException(
"The same number of shard peak write loads and shard uptimes should be provided, but "
+ shardsPeakWriteLoad
+ " "
+ shardsUptimeInMillis
+ " were provided"
Expand All @@ -83,15 +104,22 @@ private static IndexWriteLoad create(
return new IndexWriteLoad(
shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(),
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(),
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null,
shardsPeakWriteLoad != null ? shardsPeakWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
);
}

private final double[] shardWriteLoad;
private final long[] shardUptimeInMillis;
private final double[] shardRecentWriteLoad;
private final double[] shardPeakWriteLoad;

private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) {
private IndexWriteLoad(
double[] shardWriteLoad,
long[] shardUptimeInMillis,
@Nullable double[] shardRecentWriteLoad,
@Nullable double[] shardPeakWriteLoad
) {
assert shardWriteLoad.length == shardUptimeInMillis.length
: "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
this.shardWriteLoad = shardWriteLoad;
Expand All @@ -104,30 +132,43 @@ private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nul
this.shardRecentWriteLoad = new double[shardUptimeInMillis.length];
Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD);
}
if (shardPeakWriteLoad != null) {
assert shardPeakWriteLoad.length == shardUptimeInMillis.length
: "IndexWriteLoad constructor was called with non-matched lengths for shardPeakWriteLoad and shardUptimeInMillis";
this.shardPeakWriteLoad = shardPeakWriteLoad;
} else {
this.shardPeakWriteLoad = new double[shardUptimeInMillis.length];
Arrays.fill(this.shardPeakWriteLoad, UNKNOWN_LOAD);
}
}

public IndexWriteLoad(StreamInput in) throws IOException {
this(
in.readDoubleArray(),
in.readLongArray(),
in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null
in.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null,
in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD) ? in.readDoubleArray() : null
);
}

/**
 * Serializes this instance to the given stream. The write-load and uptime arrays are always
 * written; the recent and peak write-load arrays are only written when the receiving node's
 * transport version is new enough to read them, which keeps the wire format compatible with
 * older nodes (the corresponding stream constructor applies the same version gates).
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeDoubleArray(shardWriteLoad);
out.writeLongArray(shardUptimeInMillis);
if (out.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
out.writeDoubleArray(shardRecentWriteLoad);
}
// Peak write load was added later and has its own, newer transport version gate:
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
out.writeDoubleArray(shardPeakWriteLoad);
}
}

/**
 * Renders all four per-shard arrays as fields of the enclosing object. Unknown entries are
 * emitted with their sentinel values; the parser treats the recent and peak arrays as optional
 * so XContent produced by older versions (without those fields) can still be read back.
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad);
builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis);
builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad);
builder.field(SHARDS_PEAK_WRITE_LOAD_FIELD.getPreferredName(), shardPeakWriteLoad);
return builder;
}

Expand All @@ -149,6 +190,13 @@ public OptionalDouble getRecentWriteLoadForShard(int shardId) {
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
}

/**
 * Returns the peak write load recorded for the given shard, or an empty optional when the
 * value is unknown (i.e. still holds the {@code UNKNOWN_LOAD} sentinel).
 */
public OptionalDouble getPeakWriteLoadForShard(int shardId) {
assertShardInBounds(shardId);

final double peakLoad = shardPeakWriteLoad[shardId];
if (peakLoad == UNKNOWN_LOAD) {
return OptionalDouble.empty();
}
return OptionalDouble.of(peakLoad);
}

public OptionalLong getUptimeInMillisForShard(int shardId) {
assertShardInBounds(shardId);

Expand All @@ -172,14 +220,16 @@ public boolean equals(Object o) {
IndexWriteLoad that = (IndexWriteLoad) o;
return Arrays.equals(shardWriteLoad, that.shardWriteLoad)
&& Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis)
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad);
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad)
&& Arrays.equals(shardPeakWriteLoad, that.shardPeakWriteLoad);
}

/**
 * Hash code combining all four per-shard arrays, consistent with {@link #equals(Object)}.
 */
@Override
public int hashCode() {
// Fold each array's hash into the accumulator with the conventional 31 multiplier.
int hash = Arrays.hashCode(shardWriteLoad);
hash = 31 * hash + Arrays.hashCode(shardUptimeInMillis);
hash = 31 * hash + Arrays.hashCode(shardRecentWriteLoad);
hash = 31 * hash + Arrays.hashCode(shardPeakWriteLoad);
return hash;
}

Expand All @@ -192,30 +242,34 @@ public static class Builder {
private final double[] shardWriteLoad;
private final long[] uptimeInMillis;
private final double[] shardRecentWriteLoad;
private final double[] shardPeakWriteLoad;

/**
 * Creates a builder sized for {@code numShards} shards. Every slot starts at the
 * "unknown" sentinel ({@code UNKNOWN_LOAD} / {@code UNKNOWN_UPTIME}) so that shards
 * never populated via withShardWriteLoad read back as empty optionals.
 */
private Builder(int numShards) {
this.shardWriteLoad = new double[numShards];
this.uptimeInMillis = new long[numShards];
this.shardRecentWriteLoad = new double[numShards];
this.shardPeakWriteLoad = new double[numShards];
Arrays.fill(shardWriteLoad, UNKNOWN_LOAD);
Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME);
Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD);
Arrays.fill(shardPeakWriteLoad, UNKNOWN_LOAD);
}

public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) {
/**
 * Records the write load, recent write load, peak write load, and uptime for one shard.
 *
 * @param shardId        index of the shard to record; must be within the builder's shard count
 * @param load           unweighted write load for the shard
 * @param recentLoad     recent (weighted) write load for the shard
 * @param peakLoad       peak write load observed for the shard
 * @param uptimeInMillis shard uptime in milliseconds
 * @return this builder, for chaining
 * @throws IllegalArgumentException if {@code shardId} is not smaller than the shard count
 */
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, double peakLoad, long uptimeInMillis) {
if (shardId >= this.shardWriteLoad.length) {
// Include the offending id and the bound in the message rather than throwing a bare exception:
throw new IllegalArgumentException(
"shardId " + shardId + " is out of bounds: this builder was created for " + this.shardWriteLoad.length + " shards"
);
}

this.shardWriteLoad[shardId] = load;
this.uptimeInMillis[shardId] = uptimeInMillis;
this.shardRecentWriteLoad[shardId] = recentLoad;
this.shardPeakWriteLoad[shardId] = peakLoad;

return this;
}

public IndexWriteLoad build() {
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad);
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad, shardPeakWriteLoad);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.TransportVersions.INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD;
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;

public class IndexingStats implements Writeable, ToXContentFragment {

Expand All @@ -46,6 +47,7 @@ public static class Stats implements Writeable, ToXContentFragment {
private long totalIndexingTimeSinceShardStartedInNanos;
private long totalActiveTimeInNanos;
private double recentIndexingLoad;
private double peakIndexingLoad;

Stats() {}

Expand Down Expand Up @@ -76,6 +78,15 @@ public Stats(StreamInput in) throws IOException {
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
: 0;
}
if (in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
peakIndexingLoad = in.readDouble();
} else {
// When getting stats from an older version which doesn't have the peak indexing load, better to fall back to the
// unweighted write load, rather than assuming zero load:
peakIndexingLoad = totalActiveTimeInNanos > 0
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
: 0;
}
}

public Stats(
Expand All @@ -92,7 +103,8 @@ public Stats(
long throttleTimeInMillis,
long totalIndexingTimeSinceShardStartedInNanos,
long totalActiveTimeInNanos,
double recentIndexingLoad
double recentIndexingLoad,
double peakIndexingLoad
) {
this.indexCount = indexCount;
this.indexTimeInMillis = indexTimeInMillis;
Expand All @@ -110,6 +122,7 @@ public Stats(
this.totalActiveTimeInNanos = totalActiveTimeInNanos;
// We store the weighted write load as a double because the calculation is inherently floating point
this.recentIndexingLoad = recentIndexingLoad;
this.peakIndexingLoad = peakIndexingLoad;
}

public void add(Stats stats) {
Expand All @@ -131,11 +144,12 @@ public void add(Stats stats) {
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
// We want getRecentWriteLoad() for the aggregated stats to also be the average weighted by active time, so we use the updating
// formula for a weighted mean:
// We want getRecentWriteLoad() and getPeakWriteLoad() for the aggregated stats to also be the average weighted by active time,
// so we use the updating formula for a weighted mean:
if (totalActiveTimeInNanos > 0) {
recentIndexingLoad += (stats.recentIndexingLoad - recentIndexingLoad) * stats.totalActiveTimeInNanos
/ totalActiveTimeInNanos;
peakIndexingLoad += (stats.peakIndexingLoad - peakIndexingLoad) * stats.totalActiveTimeInNanos / totalActiveTimeInNanos;
}
}

Expand Down Expand Up @@ -239,6 +253,20 @@ public double getRecentWriteLoad() {
return recentIndexingLoad;
}

/**
 * Returns a measurement of the peak write load.
 *
 * <p>If this {@link Stats} instance represents a single shard, this is the highest value that {@link #getRecentWriteLoad()} would
 * return for any of the instances created for this shard since it started (i.e. the highest value seen by any call to
 * {@link InternalIndexingStats#stats}).
 *
 * <p>If this {@link Stats} instance represents multiple shards, this is the average of that value for each shard, weighted by
 * the elapsed time for each shard. (N.B. This is the average of the peak values, <i>not</i> the peak of the average value.)
 */
public double getPeakWriteLoad() {
// The weighted-mean combination across shards is performed incrementally in add().
return peakIndexingLoad;
}

/**
 * Returns the total time for which indexing has been active, converting the internally
 * tracked nanosecond value to milliseconds.
 */
public long getTotalActiveTimeInMillis() {
return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos);
}
Expand All @@ -265,6 +293,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) {
out.writeDouble(recentIndexingLoad);
}
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
out.writeDouble(peakIndexingLoad);
}
}

@Override
Expand All @@ -286,6 +317,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

builder.field(Fields.WRITE_LOAD, getWriteLoad());
builder.field(Fields.RECENT_WRITE_LOAD, getRecentWriteLoad());
builder.field(Fields.PEAK_WRITE_LOAD, getPeakWriteLoad());
return builder;
}

Expand All @@ -307,7 +339,8 @@ public boolean equals(Object o) {
&& throttleTimeInMillis == that.throttleTimeInMillis
&& totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos
&& totalActiveTimeInNanos == that.totalActiveTimeInNanos
&& recentIndexingLoad == that.recentIndexingLoad;
&& recentIndexingLoad == that.recentIndexingLoad
&& peakIndexingLoad == that.peakIndexingLoad;
}

@Override
Expand Down Expand Up @@ -408,6 +441,7 @@ static final class Fields {
static final String THROTTLED_TIME = "throttle_time";
static final String WRITE_LOAD = "write_load";
static final String RECENT_WRITE_LOAD = "recent_write_load";
static final String PEAK_WRITE_LOAD = "peak_write_load";
}

@Override
Expand Down
Loading