Skip to content

Commit

Permalink
Deeper chunking of node stats response (#95060)
Browse files Browse the repository at this point in the history
Pushes the chunking of `GET _nodes/stats` down to avoid creating
unboundedly large chunks. With this commit we yield one chunk per shard
(if `?level=shards`) or index (if `?level=indices`) and per HTTP client
and per transport action.

Closes #93985
  • Loading branch information
DaveCTurner committed Apr 6, 2023
1 parent 3ff085c commit c282f50
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,5 @@ default boolean isFragment() {
return true;
}

ToXContent EMPTY = (b, p) -> b;
}
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public void waitForTaskCompletion(Task task) {}
// Need to run the task in a separate thread because node client's .execute() is blocked by our task listener
index = new Thread(() -> {
IndexResponse indexResponse = client().prepareIndex("test").setSource("test", "test").get();
assertArrayEquals(ReplicationResponse.EMPTY, indexResponse.getShardInfo().getFailures());
assertArrayEquals(ReplicationResponse.NO_FAILURES, indexResponse.getShardInfo().getFailures());
});
index.start();
assertTrue(taskRegistered.await(10, TimeUnit.SECONDS)); // waiting for at least one task to be registered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.http.HttpStats;
Expand All @@ -29,16 +31,17 @@
import org.elasticsearch.script.ScriptStats;
import org.elasticsearch.threadpool.ThreadPoolStats;
import org.elasticsearch.transport.TransportStats;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/**
* Node statistics (dynamic, changes depending on when created).
*/
public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {

private final long timestamp;

Expand Down Expand Up @@ -275,72 +278,65 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

builder.field("name", getNode().getName());
builder.field("transport_address", getNode().getAddress().toString());
builder.field("host", getNode().getHostName());
builder.field("ip", getNode().getAddress());
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {

return Iterators.concat(

Iterators.single((builder, params) -> {
builder.field("name", getNode().getName());
builder.field("transport_address", getNode().getAddress().toString());
builder.field("host", getNode().getHostName());
builder.field("ip", getNode().getAddress());

builder.startArray("roles");
for (DiscoveryNodeRole role : getNode().getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

if (getNode().getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : getNode().getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}

return builder;
}),

ifPresent(getIndices()).toXContentChunked(outerParams),

Iterators.single((builder, params) -> {
ifPresent(getOs()).toXContent(builder, params);
ifPresent(getProcess()).toXContent(builder, params);
ifPresent(getJvm()).toXContent(builder, params);
ifPresent(getThreadPool()).toXContent(builder, params);
ifPresent(getFs()).toXContent(builder, params);
return builder;
}),

ifPresent(getTransport()).toXContentChunked(outerParams),
ifPresent(getHttp()).toXContentChunked(outerParams),

Iterators.single((builder, params) -> {
ifPresent(getBreaker()).toXContent(builder, params);
ifPresent(getScriptStats()).toXContent(builder, params);
ifPresent(getDiscoveryStats()).toXContent(builder, params);
ifPresent(getIngestStats()).toXContent(builder, params);
ifPresent(getAdaptiveSelectionStats()).toXContent(builder, params);
ifPresent(getScriptCacheStats()).toXContent(builder, params);
ifPresent(getIndexingPressureStats()).toXContent(builder, params);
return builder;
})
);
}

builder.startArray("roles");
for (DiscoveryNodeRole role : getNode().getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

if (getNode().getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : getNode().getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
private static ChunkedToXContent ifPresent(@Nullable ChunkedToXContent chunkedToXContent) {
return Objects.requireNonNullElse(chunkedToXContent, ChunkedToXContent.EMPTY);
}

if (getIndices() != null) {
getIndices().toXContent(builder, params);
}
if (getOs() != null) {
getOs().toXContent(builder, params);
}
if (getProcess() != null) {
getProcess().toXContent(builder, params);
}
if (getJvm() != null) {
getJvm().toXContent(builder, params);
}
if (getThreadPool() != null) {
getThreadPool().toXContent(builder, params);
}
if (getFs() != null) {
getFs().toXContent(builder, params);
}
if (getTransport() != null) {
getTransport().toXContent(builder, params);
}
if (getHttp() != null) {
getHttp().toXContent(builder, params);
}
if (getBreaker() != null) {
getBreaker().toXContent(builder, params);
}
if (getScriptStats() != null) {
getScriptStats().toXContent(builder, params);
}
if (getDiscoveryStats() != null) {
getDiscoveryStats().toXContent(builder, params);
}
if (getIngestStats() != null) {
getIngestStats().toXContent(builder, params);
}
if (getAdaptiveSelectionStats() != null) {
getAdaptiveSelectionStats().toXContent(builder, params);
}
if (getScriptCacheStats() != null) {
getScriptCacheStats().toXContent(builder, params);
}
if (getIndexingPressureStats() != null) {
getIndexingPressureStats().toXContent(builder, params);
}
return builder;
private static ToXContent ifPresent(@Nullable ToXContent toXContent) {
return Objects.requireNonNullElse(toXContent, ToXContent.EMPTY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
Expand Down Expand Up @@ -42,16 +43,15 @@ protected void writeNodesTo(StreamOutput out, List<NodeStats> nodes) throws IOEx
}

@Override
protected Iterator<? extends ToXContent> xContentChunks() {
protected Iterator<? extends ToXContent> xContentChunks(ToXContent.Params outerParams) {
return Iterators.concat(
Iterators.single((b, p) -> b.startObject("nodes")),
getNodes().stream().map(nodeStats -> (ToXContent) (b, p) -> {
b.startObject(nodeStats.getNode().getId());
b.field("timestamp", nodeStats.getTimestamp());
nodeStats.toXContent(b, p);
return b.endObject();
}).iterator(),
Iterators.single((b, p) -> b.endObject())
ChunkedToXContentHelper.startObject("nodes"),
Iterators.flatMap(getNodes().iterator(), nodeStats -> Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject(nodeStats.getNode().getId());
builder.field("timestamp", nodeStats.getTimestamp());
return builder;
}), nodeStats.toXContentChunked(outerParams), ChunkedToXContentHelper.endObject())),
ChunkedToXContentHelper.endObject()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xcontent.ToXContent;

Expand All @@ -38,8 +39,8 @@ public final Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params
b.startObject();
RestActions.buildNodesHeader(b, p, this);
return b.field("cluster_name", getClusterName().value());
}), xContentChunks(), Iterators.single((ToXContent) (b, p) -> b.endObject()));
}), xContentChunks(params), ChunkedToXContentHelper.endObject());
}

protected abstract Iterator<? extends ToXContent> xContentChunks();
protected abstract Iterator<? extends ToXContent> xContentChunks(ToXContent.Params outerParams);
}
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ private void finish() {
if (finished.compareAndSet(false, true)) {
final ReplicationResponse.ShardInfo.Failure[] failuresArray;
if (shardReplicaFailures.isEmpty()) {
failuresArray = ReplicationResponse.EMPTY;
failuresArray = ReplicationResponse.NO_FAILURES;
} else {
failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
shardReplicaFailures.toArray(failuresArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
public class ReplicationResponse extends ActionResponse {

public static final ReplicationResponse.ShardInfo.Failure[] EMPTY = new ReplicationResponse.ShardInfo.Failure[0];
public static final ReplicationResponse.ShardInfo.Failure[] NO_FAILURES = new ReplicationResponse.ShardInfo.Failure[0];

private ShardInfo shardInfo;

Expand Down Expand Up @@ -68,7 +68,7 @@ public static class ShardInfo implements Writeable, ToXContentObject {

private int total;
private int successful;
private Failure[] failures = EMPTY;
private Failure[] failures = ReplicationResponse.NO_FAILURES;

public ShardInfo() {}

Expand Down Expand Up @@ -186,7 +186,7 @@ public static ShardInfo fromXContent(XContentParser parser) throws IOException {
parser.skipChildren(); // skip potential inner arrays for forward compatibility
}
}
Failure[] failures = EMPTY;
Failure[] failures = ReplicationResponse.NO_FAILURES;
if (failuresList != null) {
failures = failuresList.toArray(new Failure[failuresList.size()]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

/**
Expand Down Expand Up @@ -84,4 +85,9 @@ public boolean isFragment() {
default boolean isFragment() {
return true;
}

/**
* A {@link ChunkedToXContent} that yields no chunks
*/
ChunkedToXContent EMPTY = params -> Collections.emptyIterator();
}
28 changes: 16 additions & 12 deletions server/src/main/java/org/elasticsearch/http/HttpStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@

package org.elasticsearch.http;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class HttpStats implements Writeable, ToXContentFragment {
public class HttpStats implements Writeable, ChunkedToXContent {

private final long serverOpen;
private final long totalOpen;
Expand Down Expand Up @@ -78,17 +82,17 @@ static final class Fields {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.HTTP);
builder.field(Fields.CURRENT_OPEN, serverOpen);
builder.field(Fields.TOTAL_OPENED, totalOpen);
builder.startArray(Fields.CLIENTS);
for (ClientStats clientStats : this.clientStats) {
clientStats.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.<ToXContent>concat(
Iterators.single(
(builder, params) -> builder.startObject(Fields.HTTP)
.field(Fields.CURRENT_OPEN, serverOpen)
.field(Fields.TOTAL_OPENED, totalOpen)
.startArray(Fields.CLIENTS)
),
Iterators.flatMap(clientStats.iterator(), Iterators::<ToXContent>single),
Iterators.single((builder, params) -> builder.endArray().endObject())
);
}

public static class ClientStats implements Writeable, ToXContentFragment {
Expand Down

0 comments on commit c282f50

Please sign in to comment.