Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING = def(8_839_0_00);
public static final TransportVersion ML_INFERENCE_IBM_WATSONX_RERANK_ADDED = def(8_840_0_00);
public static final TransportVersion ELASTICSEARCH_9_0 = def(9_000_0_00);
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_002_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -70,18 +69,16 @@ public TransportStats(StreamInput in) throws IOException {
rxSize = in.readVLong();
txCount = in.readVLong();
txSize = in.readVLong();
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0) && in.readBoolean()) {
inboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
inboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
outboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
outboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
} else {
inboundHandlingTimeBucketFrequencies = new long[0];
outboundHandlingTimeBucketFrequencies = new long[0];
if (in.getTransportVersion().before(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED)) {
in.readBoolean();
}
inboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
inboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
outboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
outboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
transportActionStats = Collections.unmodifiableMap(in.readOrderedMap(StreamInput::readString, TransportActionStats::new));
Expand All @@ -99,15 +96,16 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(rxSize);
out.writeVLong(txCount);
out.writeVLong(txSize);
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_1_0)) {
assert (inboundHandlingTimeBucketFrequencies.length > 0) == (outboundHandlingTimeBucketFrequencies.length > 0);
out.writeBoolean(inboundHandlingTimeBucketFrequencies.length > 0);
for (long handlingTimeBucketFrequency : inboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
for (long handlingTimeBucketFrequency : outboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
assert outboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
if (out.getTransportVersion().before(TransportVersions.TRANSPORT_STATS_HANDLING_TIME_REQUIRED)) {
out.writeBoolean(true);
}
for (long handlingTimeBucketFrequency : inboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
for (long handlingTimeBucketFrequency : outboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_8_0)) {
out.writeMap(transportActionStats, StreamOutput::writeWriteable);
Expand Down Expand Up @@ -166,24 +164,13 @@ public Map<String, TransportActionStats> getTransportActionStats() {
return transportActionStats;
}

@UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_COORDINATION)
// Review and simplify the if-else blocks containing this symbol once v9 is released
private static final boolean IMPOSSIBLE_IN_V9 = true;

private boolean assertHistogramsConsistent() {
assert inboundHandlingTimeBucketFrequencies.length == outboundHandlingTimeBucketFrequencies.length;
if (inboundHandlingTimeBucketFrequencies.length == 0) {
// Stats came from before v8.1
assert IMPOSSIBLE_IN_V9;
} else {
assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
}
assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
return true;
}

@Override
@UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_COORDINATION)
// review the "if" blocks checking for non-empty once we have
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject(Fields.TRANSPORT);
Expand All @@ -193,19 +180,10 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, ByteSizeValue.ofBytes(rxSize));
builder.field(Fields.TX_COUNT, txCount);
builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, ByteSizeValue.ofBytes(txSize));
if (inboundHandlingTimeBucketFrequencies.length > 0) {
histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM);
histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM);
} else {
// Stats came from before v8.1
assert IMPOSSIBLE_IN_V9;
}
if (transportActionStats.isEmpty() == false) {
builder.startObject(Fields.ACTIONS);
} else {
// Stats came from before v8.8
assert IMPOSSIBLE_IN_V9;
}
assert inboundHandlingTimeBucketFrequencies.length > 0;
histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM);
histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM);
builder.startObject(Fields.ACTIONS);
return builder;
}),

Expand All @@ -215,12 +193,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
return builder;
}),

Iterators.single((builder, params) -> {
if (transportActionStats.isEmpty() == false) {
builder.endObject();
}
return builder.endObject();
})
Iterators.single((builder, params) -> { return builder.endObject().endObject(); })
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,50 +20,8 @@

public class TransportStatsTests extends ESTestCase {
public void testToXContent() {
assertEquals(
Strings.toString(
new TransportStats(1, 2, 3, ByteSizeUnit.MB.toBytes(4), 5, ByteSizeUnit.MB.toBytes(6), new long[0], new long[0], Map.of()),
false,
true
),
"""
{"transport":{"server_open":1,"total_outbound_connections":2,\
"rx_count":3,"rx_size":"4mb","rx_size_in_bytes":4194304,\
"tx_count":5,"tx_size":"6mb","tx_size_in_bytes":6291456\
}}"""
);

final var histogram = new long[HandlingTimeTracker.BUCKET_COUNT];
assertEquals(
Strings.toString(
new TransportStats(1, 2, 3, ByteSizeUnit.MB.toBytes(4), 5, ByteSizeUnit.MB.toBytes(6), histogram, histogram, Map.of()),
false,
true
),
"""
{"transport":{"server_open":1,"total_outbound_connections":2,\
"rx_count":3,"rx_size":"4mb","rx_size_in_bytes":4194304,\
"tx_count":5,"tx_size":"6mb","tx_size_in_bytes":6291456,\
"inbound_handling_time_histogram":[],\
"outbound_handling_time_histogram":[]\
}}"""
);

histogram[4] = 10;
assertEquals(
Strings.toString(
new TransportStats(1, 2, 3, ByteSizeUnit.MB.toBytes(4), 5, ByteSizeUnit.MB.toBytes(6), histogram, histogram, Map.of()),
false,
true
),
"""
{"transport":{"server_open":1,"total_outbound_connections":2,\
"rx_count":3,"rx_size":"4mb","rx_size_in_bytes":4194304,\
"tx_count":5,"tx_size":"6mb","tx_size_in_bytes":6291456,\
"inbound_handling_time_histogram":[{"ge":"8ms","ge_millis":8,"lt":"16ms","lt_millis":16,"count":10}],\
"outbound_handling_time_histogram":[{"ge":"8ms","ge_millis":8,"lt":"16ms","lt_millis":16,"count":10}]\
}}"""
);

final var requestSizeHistogram = new long[29];
requestSizeHistogram[2] = 9;
Expand All @@ -84,8 +42,8 @@ public void testToXContent() {
ByteSizeUnit.MB.toBytes(4),
5,
ByteSizeUnit.MB.toBytes(6),
new long[0],
new long[0],
histogram,
histogram,
Map.of("internal:test/action", exampleActionStats)
),
false,
Expand All @@ -95,6 +53,8 @@ public void testToXContent() {
{"transport":{"server_open":1,"total_outbound_connections":2,\
"rx_count":3,"rx_size":"4mb","rx_size_in_bytes":4194304,\
"tx_count":5,"tx_size":"6mb","tx_size_in_bytes":6291456,\
"inbound_handling_time_histogram":[{"ge":"8ms","ge_millis":8,"lt":"16ms","lt_millis":16,"count":10}],\
"outbound_handling_time_histogram":[{"ge":"8ms","ge_millis":8,"lt":"16ms","lt_millis":16,"count":10}],\
"actions":{"internal:test/action":%s}}}""", Strings.toString(exampleActionStats, false, true))
);
}
Expand Down