Skip to content

Commit

Permalink
[7.x] Add ingest info to Cluster Stats (#48485) (#48661)
Browse files Browse the repository at this point in the history
* Add ingest info to Cluster Stats (#48485)

This commit enhances the ClusterStatsNodes response to include global
processor usage stats on a per-processor basis.

example output:

```
...
    "processor_stats": {
      "gsub": {
        "count": 0,
        "failed": 0
        "current": 0
        "time_in_millis": 0
      },
      "script": {
        "count": 0,
        "failed": 0
        "current": 0,
        "time_in_millis": 0
      }
    }
...
```

The purpose for this enhancement is to make it easier to collect stats on how specific processors are being used across the cluster beyond the current per-node usage statistics that currently exist in node stats.

Closes #46146.

* fix BWC of ingest stats

The introduction of processor types into IngestStats had a bug.
It was set to `null` and set as the key to the map. This would
throw a NPE. This commit resolves this by setting all the processor
types from previous versions that are not serializing it out to
`_NOT_AVAILABLE`.
  • Loading branch information
talevy committed Oct 31, 2019
1 parent d0ead68 commit 4be5440
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 15 deletions.
7 changes: 7 additions & 0 deletions docs/reference/cluster/stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ The API returns the following response:
},
...
],
"ingest": {
"number_of_pipelines" : 1,
"processor_stats": {
...
}
},
"network_types": {
...
},
Expand All @@ -244,6 +250,7 @@ The API returns the following response:
// TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/]
// TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/]
// TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/]
// TESTRESPONSE[s/"processor_stats": \{[^\}]*\}/"processor_stats": $body.$_path/]
// TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/]
// TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/]
// TESTRESPONSE[s/: true|false/: $body.$_path/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ClusterStatsNodes implements ToXContentFragment {
Expand All @@ -64,6 +66,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final NetworkTypes networkTypes;
private final DiscoveryTypes discoveryTypes;
private final PackagingTypes packagingTypes;
private final IngestStats ingestStats;

ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
Expand Down Expand Up @@ -97,6 +100,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
this.networkTypes = new NetworkTypes(nodeInfos);
this.discoveryTypes = new DiscoveryTypes(nodeInfos);
this.packagingTypes = new PackagingTypes(nodeInfos);
this.ingestStats = new IngestStats(nodeStats);
}

public Counts getCounts() {
Expand Down Expand Up @@ -178,6 +182,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
discoveryTypes.toXContent(builder, params);

packagingTypes.toXContent(builder, params);

ingestStats.toXContent(builder, params);

return builder;
}

Expand Down Expand Up @@ -690,4 +697,68 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa

}

static class IngestStats implements ToXContentFragment {

final int pipelineCount;
final SortedMap<String, long[]> stats;

IngestStats(final List<NodeStats> nodeStats) {
Set<String> pipelineIds = new HashSet<>();
SortedMap<String, long[]> stats = new TreeMap<>();
for (NodeStats nodeStat : nodeStats) {
if (nodeStat.getIngestStats() != null) {
for (Map.Entry<String,
List<org.elasticsearch.ingest.IngestStats.ProcessorStat>> processorStats : nodeStat.getIngestStats()
.getProcessorStats().entrySet()) {
pipelineIds.add(processorStats.getKey());
for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.getType(), (k, v) -> {
org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats();
if (v == null) {
return new long[] {
nodeIngestStats.getIngestCount(),
nodeIngestStats.getIngestFailedCount(),
nodeIngestStats.getIngestCurrent(),
nodeIngestStats.getIngestTimeInMillis()
};
} else {
v[0] += nodeIngestStats.getIngestCount();
v[1] += nodeIngestStats.getIngestFailedCount();
v[2] += nodeIngestStats.getIngestCurrent();
v[3] += nodeIngestStats.getIngestTimeInMillis();
return v;
}
});
}
}
}
}
this.pipelineCount = pipelineIds.size();
this.stats = Collections.unmodifiableSortedMap(stats);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject("ingest");
{
builder.field("number_of_pipelines", pipelineCount);
builder.startObject("processor_stats");
for (Map.Entry<String, long[]> stat : stats.entrySet()) {
long[] statValues = stat.getValue();
builder.startObject(stat.getKey());
builder.field("count", statValues[0]);
builder.field("failed", statValues[1]);
builder.field("current", statValues[2]);
builder.humanReadableField("time_in_millis", "time",
new TimeValue(statValues[3], TimeUnit.MILLISECONDS));
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, false, false);
true, true, true, false, true, false, false, false, false, false, true, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ public IngestStats stats() {
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric);
});
});
return statsBuilder.build();
Expand Down
24 changes: 20 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ public IngestStats(StreamInput in) throws IOException {
List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
for (int j = 0; j < processorsSize; j++) {
String processorName = in.readString();
String processorType = "_NOT_AVAILABLE";
if (in.getVersion().onOrAfter(Version.V_7_6_0)) {
processorType = in.readString();
}
Stats processorStat = new Stats(in);
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
}
this.processorStats.put(pipelineId, processorStatsPerPipeline);
}
Expand All @@ -92,6 +96,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(processorStatsForPipeline.size());
for (ProcessorStat processorStat : processorStatsForPipeline) {
out.writeString(processorStat.getName());
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeString(processorStat.getType());
}
processorStat.getStats().writeTo(out);
}
}
Expand All @@ -115,9 +122,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
builder.field("type", processorStat.getType());
builder.startObject("stats");
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
builder.endObject();
}
}
builder.endArray();
Expand Down Expand Up @@ -229,9 +239,9 @@ Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
return this;
}

Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) {
this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
.add(new ProcessorStat(processorName, metric.createStats()));
.add(new ProcessorStat(processorName, processorType, metric.createStats()));
return this;
}

Expand Down Expand Up @@ -267,17 +277,23 @@ public Stats getStats() {
*/
public static class ProcessorStat {
private final String name;
private final String type;
private final Stats stats;

public ProcessorStat(String name, Stats stats) {
public ProcessorStat(String name, String type, Stats stats) {
this.name = name;
this.type = type;
this.stats = stats;
}

public String getName() {
return name;
}

public String getType() {
return type;
}

public Stats getStats() {
return stats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public void testSerialization() throws IOException {
}
}

private static NodeStats createNodeStats() {
public static NodeStats createNodeStats() {
DiscoveryNode node = new DiscoveryNode("test_node", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), VersionUtils.randomVersion(random()));
OsStats osStats = null;
Expand Down Expand Up @@ -456,7 +456,8 @@ private static NodeStats createNodeStats() {
for (int j =0; j < numProcessors;j++) {
IngestStats.Stats processorStats = new IngestStats.Stats
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats));
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10),
randomAlphaOfLengthBetween(3, 10), processorStats));
}
ingestProcessorStats.put(pipelineId,processorPerPipeline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,26 @@
package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.hamcrest.Matchers.equalTo;

public class ClusterStatsNodesTests extends ESTestCase {

Expand Down Expand Up @@ -59,6 +67,41 @@ public void testNetworkTypesToXContent() throws Exception {
+ "}", toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString());
}

public void testIngestStats() throws Exception {
NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats);

SortedMap<String, long[]> processorStats = new TreeMap<>();
nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorStats.put(s.getType(),
new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount(),
s.getStats().getIngestCurrent(), s.getStats().getIngestTimeInMillis()})));
ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats));
assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size()));
String processorStatsString = "{";
Iterator<Map.Entry<String, long[]>> iter = processorStats.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, long[]> entry = iter.next();
long[] statValues = entry.getValue();
long count = statValues[0];
long failedCount = statValues[1];
long current = statValues[2];
long timeInMillis = statValues[3];
processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count
+ ",\"failed\":" + failedCount
+ ",\"current\":" + current
+ ",\"time_in_millis\":" + timeInMillis
+ "}";
if (iter.hasNext()) {
processorStatsString += ",";
}
}
processorStatsString += "}";
assertThat(toXContent(stats, XContentType.JSON, false).utf8ToString(), equalTo(
"{\"ingest\":{"
+ "\"number_of_pipelines\":" + stats.pipelineCount + ","
+ "\"processor_stats\":" + processorStatsString
+ "}}"));
}

private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) {
Settings.Builder settings = Settings.builder();
if (transportType != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testSerialization() throws IOException {
Map<String, List<IngestStats.ProcessorStat>> processorStats = createProcessorStats(pipelineStats);
IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats);
IngestStats serializedStats = serialize(ingestStats);
assertIngestStats(ingestStats, serializedStats, true);
assertIngestStats(ingestStats, serializedStats, true, true);
}

public void testReadLegacyStream() throws IOException {
Expand All @@ -63,7 +63,24 @@ public void testReadLegacyStream() throws IOException {
in.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0));
IngestStats serializedStats = new IngestStats(in);
IngestStats expectedStats = new IngestStats(totalStats, pipelineStats, Collections.emptyMap());
assertIngestStats(expectedStats, serializedStats, false);
assertIngestStats(expectedStats, serializedStats, false, true);
}

public void testBWCIngestProcessorTypeStats() throws IOException {
IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300);
List<IngestStats.PipelineStat> pipelineStats = createPipelineStats();
Map<String, List<IngestStats.ProcessorStat>> processorStats = createProcessorStats(pipelineStats);
IngestStats expectedIngestStats = new IngestStats(totalStats, pipelineStats, processorStats);

//legacy output logic
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(VersionUtils.getPreviousVersion(Version.V_7_6_0));
expectedIngestStats.writeTo(out);

StreamInput in = out.bytes().streamInput();
in.setVersion(VersionUtils.getPreviousVersion(Version.V_7_6_0));
IngestStats serializedStats = new IngestStats(in);
assertIngestStats(expectedIngestStats, serializedStats, true, false);
}

private List<IngestStats.PipelineStat> createPipelineStats() {
Expand All @@ -75,9 +92,10 @@ private List<IngestStats.PipelineStat> createPipelineStats() {

private Map<String, List<IngestStats.ProcessorStat>> createProcessorStats(List<IngestStats.PipelineStat> pipelineStats){
assert(pipelineStats.size() >= 2);
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297));
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", "type",
new IngestStats.Stats(47, 97, 197, 297));
//pipeline1 -> processor1,processor2; pipeline2 -> processor3
return MapBuilder.<String, List<IngestStats.ProcessorStat>>newMapBuilder()
.put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList()))
Expand All @@ -92,7 +110,8 @@ private IngestStats serialize(IngestStats stats) throws IOException {
return new IngestStats(in);
}

private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors){
private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors,
boolean expectProcessorTypes){
assertNotSame(ingestStats, serializedStats);
assertNotSame(ingestStats.getTotalStats(), serializedStats.getTotalStats());
assertNotSame(ingestStats.getPipelineStats(), serializedStats.getPipelineStats());
Expand All @@ -114,6 +133,11 @@ private void assertIngestStats(IngestStats ingestStats, IngestStats serializedSt
for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) {
IngestStats.ProcessorStat ps = it.next();
assertEquals(ps.getName(), serializedProcessorStat.getName());
if (expectProcessorTypes) {
assertEquals(ps.getType(), serializedProcessorStat.getType());
} else {
assertEquals("_NOT_AVAILABLE", serializedProcessorStat.getType());
}
assertStats(ps.getStats(), serializedProcessorStat.getStats());
}
assertFalse(it.hasNext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,11 @@ public void testToXContent() throws IOException {
+ "\"type\":\"docker\","
+ "\"count\":1"
+ "}"
+ "]"
+ "],"
+ "\"ingest\":{"
+ "\"number_of_pipelines\":0,"
+ "\"processor_stats\":{}"
+ "}"
+ "}"
+ "},"
+ "\"cluster_state\":{"
Expand Down

0 comments on commit 4be5440

Please sign in to comment.