Add ingest info to Cluster Stats #48485

Merged Oct 29, 2019 · 8 commits · Changes from 6 commits
7 changes: 7 additions & 0 deletions docs/reference/cluster/stats.asciidoc
@@ -227,6 +227,12 @@ The API returns the following response:
},
...
],
"ingest": {
"number_of_pipelines" : 1,
"processor_stats": {
...
}
},
"network_types": {
...
},
@@ -244,6 +250,7 @@ The API returns the following response:
// TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/]
// TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/]
// TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/]
// TESTRESPONSE[s/"processor_stats": \{[^\}]*\}/"processor_stats": $body.$_path/]
// TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/]
// TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/]
// TESTRESPONSE[s/: true|false/: $body.$_path/]
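
Aside (not part of this diff): the documented "ingest" section can be observed by calling the cluster stats API. A minimal sketch using the Elasticsearch low-level REST client follows; the host, port, and client setup are illustrative assumptions, not something this PR adds.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class ClusterStatsIngestExample {
    public static void main(String[] args) throws Exception {
        // Illustrative host/port; adjust to the cluster under test.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Response response = client.performRequest(new Request("GET", "/_cluster/stats"));
            // The response body now includes:
            //   "ingest": { "number_of_pipelines": ..., "processor_stats": { ... } }
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}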
@@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

@@ -64,6 +65,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final NetworkTypes networkTypes;
private final DiscoveryTypes discoveryTypes;
private final PackagingTypes packagingTypes;
private IngestStats ingestStats;

ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
@@ -97,6 +99,7 @@ public class ClusterStatsNodes implements ToXContentFragment {
this.networkTypes = new NetworkTypes(nodeInfos);
this.discoveryTypes = new DiscoveryTypes(nodeInfos);
this.packagingTypes = new PackagingTypes(nodeInfos);
this.ingestStats = new IngestStats(nodeStats);
}

public Counts getCounts() {
@@ -178,6 +181,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
discoveryTypes.toXContent(builder, params);

packagingTypes.toXContent(builder, params);

ingestStats.toXContent(builder, params);

return builder;
}

@@ -690,4 +696,56 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa

}

static class IngestStats implements ToXContentFragment {

final int pipelineCount;
final Map<String, long[]> stats;

IngestStats(final List<NodeStats> nodeStats) {
Set<String> pipelineIds = new HashSet<>();
SortedMap<String, long[]> stats = new TreeMap<>();
for (NodeStats nodeStat : nodeStats) {
if (nodeStat.getIngestStats() != null) {
for (Map.Entry<String,
List<org.elasticsearch.ingest.IngestStats.ProcessorStat>> processorStats : nodeStat.getIngestStats()
.getProcessorStats().entrySet()) {
pipelineIds.add(processorStats.getKey());
for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.getType(), (k, v) -> {
if (v == null) {
return new long[] { stat.getStats().getIngestCount(), stat.getStats().getIngestFailedCount() };
} else {
v[0] += stat.getStats().getIngestCount();
v[1] += stat.getStats().getIngestFailedCount();
return v;
}
});
}
}
}
}
this.pipelineCount = pipelineIds.size();
this.stats = Collections.unmodifiableMap(stats);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject("ingest");
{
builder.field("number_of_pipelines", pipelineCount);
builder.startObject("processor_stats");
for (Map.Entry<String, long[]> stat : stats.entrySet()) {
builder.startObject(stat.getKey());
builder.field("count", stat.getValue()[0]);
builder.field("fail_count", stat.getValue()[1]);
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder;
}

}

}
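
Aside (not part of the diff): the IngestStats inner class above folds each node's per-processor stats into a per-type long[] of { ingest count, failed count } using Map.compute on a sorted map. A minimal standalone sketch of that accumulation pattern, with hypothetical sample data:

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class ProcessorStatAccumulationSketch {
    public static void main(String[] args) {
        // Hypothetical per-processor samples as they might arrive from several nodes:
        // each entry is { processor type, ingest count, failed count }.
        Object[][] samples = {
            { "set", 10L, 1L },
            { "grok", 5L, 0L },
            { "set", 7L, 2L },
        };

        // Per-type totals, keyed and rendered in sorted order like the real class.
        SortedMap<String, long[]> totals = new TreeMap<>();
        for (Object[] sample : samples) {
            final long count = (Long) sample[1];
            final long failed = (Long) sample[2];
            totals.compute((String) sample[0], (k, v) -> {
                if (v == null) {
                    return new long[] { count, failed };
                }
                v[0] += count;
                v[1] += failed;
                return v;
            });
        }

        for (Map.Entry<String, long[]> entry : totals.entrySet()) {
            System.out.println(entry.getKey() + ": count=" + entry.getValue()[0] + ", fail_count=" + entry.getValue()[1]);
        }
    }
}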
@@ -95,7 +95,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, false, false);
true, true, true, false, true, false, false, false, false, false, true, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
@@ -413,7 +413,7 @@ public IngestStats stats() {
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric);
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric);
});
});
return statsBuilder.build();
25 changes: 21 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestStats.java
@@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -67,8 +68,12 @@ public IngestStats(StreamInput in) throws IOException {
List<ProcessorStat> processorStatsPerPipeline = new ArrayList<>(processorsSize);
for (int j = 0; j < processorsSize; j++) {
String processorName = in.readString();
String processorType = null;
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
processorType = in.readString();
}
Stats processorStat = new Stats(in);
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat));
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
}
this.processorStats.put(pipelineId, processorStatsPerPipeline);
}
@@ -88,6 +93,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(processorStatsForPipeline.size());
for (ProcessorStat processorStat : processorStatsForPipeline) {
out.writeString(processorStat.getName());
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeString(processorStat.getType());
}
processorStat.getStats().writeTo(out);
}
}
@@ -110,9 +118,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
builder.field("type", processorStat.getType());
builder.startObject("stats");
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
builder.endObject();
}
}
builder.endArray();
@@ -224,9 +235,9 @@ Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) {
return this;
}

Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) {
Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) {
this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>())
.add(new ProcessorStat(processorName, metric.createStats()));
.add(new ProcessorStat(processorName, processorType, metric.createStats()));
return this;
}

@@ -262,17 +273,23 @@ public Stats getStats() {
*/
public static class ProcessorStat {
private final String name;
private final String type;
private final Stats stats;

public ProcessorStat(String name, Stats stats) {
public ProcessorStat(String name, String type, Stats stats) {
this.name = name;
this.type = type;
this.stats = stats;
}

public String getName() {
return name;
}

public String getType() {
return type;
}

public Stats getStats() {
return stats;
}
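
Aside (not part of the diff): the StreamInput/StreamOutput changes above gate the new processor type field on Version.V_8_0_0, so older streams neither write nor expect it and readers fall back to null. A simplified, self-contained sketch of that version-gated wire pattern using plain java.io streams; the version constant and helper names are hypothetical, not Elasticsearch APIs:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionGatedWireSketch {

    // Hypothetical numeric id for the version that introduced the "type" field.
    static final int TYPE_ADDED_VERSION = 8_00_00_00;

    static void writeProcessorStat(DataOutputStream out, int streamVersion,
                                   String name, String type, long count) throws IOException {
        out.writeUTF(name);
        if (streamVersion >= TYPE_ADDED_VERSION) {
            out.writeUTF(type); // only streams new enough to know about the field carry it
        }
        out.writeLong(count);
    }

    static void readAndPrintProcessorStat(DataInputStream in, int streamVersion) throws IOException {
        String name = in.readUTF();
        // Older senders never wrote the type, so fall back to null instead of misreading the stream.
        String type = streamVersion >= TYPE_ADDED_VERSION ? in.readUTF() : null;
        long count = in.readLong();
        System.out.println(name + " (type=" + type + "): count=" + count);
    }

    public static void main(String[] args) throws IOException {
        for (int version : new int[] { TYPE_ADDED_VERSION, TYPE_ADDED_VERSION - 1 }) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeProcessorStat(new DataOutputStream(bytes), version, "my-processor", "grok", 42L);
            readAndPrintProcessorStat(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), version);
        }
    }
}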
@@ -315,7 +315,7 @@ public void testSerialization() throws IOException {
}
}

private static NodeStats createNodeStats() {
public static NodeStats createNodeStats() {
DiscoveryNode node = new DiscoveryNode("test_node", buildNewFakeTransportAddress(),
emptyMap(), emptySet(), VersionUtils.randomVersion(random()));
OsStats osStats = null;
@@ -456,7 +456,8 @@ private static NodeStats createNodeStats() {
for (int j =0; j < numProcessors;j++) {
IngestStats.Stats processorStats = new IngestStats.Stats
(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats));
processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10),
randomAlphaOfLengthBetween(3, 10), processorStats));
}
ingestProcessorStats.put(pipelineId,processorPerPipeline);
}
@@ -20,18 +20,26 @@
package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests.createNodeStats;
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
import static org.hamcrest.Matchers.equalTo;

public class ClusterStatsNodesTests extends ESTestCase {

@@ -59,6 +67,33 @@ public void testNetworkTypesToXContent() throws Exception {
+ "}", toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString());
}

public void testIngestStats() throws Exception {
NodeStats nodeStats = createNodeStats();

SortedMap<String, long[]> processorStats = new TreeMap<>();
nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorStats.put(s.getType(),
new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount() })));
ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats));
assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size()));
String processorStatsString = "{";
Iterator<Map.Entry<String, long[]>> iter = processorStats.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, long[]> entry = iter.next();
long count = entry.getValue()[0];
long failedCount = entry.getValue()[1];
processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count + ",\"fail_count\":" + failedCount + "}";
if (iter.hasNext()) {
processorStatsString += ",";
}
}
processorStatsString += "}";
assertThat(toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString(), equalTo(
"{\"ingest\":{"
+ "\"number_of_pipelines\":" + stats.pipelineCount + ","
+ "\"processor_stats\":" + processorStatsString
+ "}}"));
}

private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) {
Settings.Builder settings = Settings.builder();
if (transportType != null) {
@@ -52,9 +52,10 @@ private List<IngestStats.PipelineStat> createPipelineStats() {

private Map<String, List<IngestStats.ProcessorStat>> createProcessorStats(List<IngestStats.PipelineStat> pipelineStats){
assert(pipelineStats.size() >= 2);
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297));
IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1));
IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2));
IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", "type",
new IngestStats.Stats(47, 97, 197, 297));
//pipeline1 -> processor1,processor2; pipeline2 -> processor3
return MapBuilder.<String, List<IngestStats.ProcessorStat>>newMapBuilder()
.put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList()))
@@ -536,7 +536,11 @@ public void testToXContent() throws IOException {
+ "\"type\":\"docker\","
+ "\"count\":1"
+ "}"
+ "]"
+ "],"
+ "\"ingest\":{"
+ "\"number_of_pipelines\":0,"
+ "\"processor_stats\":{}"
+ "}"
+ "}"
+ "},"
+ "\"cluster_state\":{"