From 4be54402de5d0d272966b921ef51fcbbcd968489 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 31 Oct 2019 14:36:54 -0700 Subject: [PATCH] [7.x] Add ingest info to Cluster Stats (#48485) (#48661) * Add ingest info to Cluster Stats (#48485) This commit enhances the ClusterStatsNodes response to include global processor usage stats on a per-processor basis. example output: ``` ... "processor_stats": { "gsub": { "count": 0, "failed": 0 "current": 0 "time_in_millis": 0 }, "script": { "count": 0, "failed": 0 "current": 0, "time_in_millis": 0 } } ... ``` The purpose for this enhancement is to make it easier to collect stats on how specific processors are being used across the cluster beyond the current per-node usage statistics that currently exist in node stats. Closes #46146. * fix BWC of ingest stats The introduction of processor types into IngestStats had a bug. It was set to `null` and set as the key to the map. This would throw a NPE. This commit resolves this by setting all the processor types from previous versions that are not serializing it out to `_NOT_AVAILABLE`. --- docs/reference/cluster/stats.asciidoc | 7 ++ .../cluster/stats/ClusterStatsNodes.java | 71 +++++++++++++++++++ .../stats/TransportClusterStatsAction.java | 2 +- .../elasticsearch/ingest/IngestService.java | 2 +- .../org/elasticsearch/ingest/IngestStats.java | 24 +++++-- .../cluster/node/stats/NodeStatsTests.java | 5 +- .../cluster/stats/ClusterStatsNodesTests.java | 43 +++++++++++ .../ingest/IngestStatsTests.java | 36 ++++++++-- .../ClusterStatsMonitoringDocTests.java | 6 +- 9 files changed, 181 insertions(+), 15 deletions(-) diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index dbb4ea25d365f..60cee385eabbc 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -227,6 +227,12 @@ The API returns the following response: }, ... ], + "ingest": { + "number_of_pipelines" : 1, + "processor_stats": { + ... + } + }, "network_types": { ... }, @@ -244,6 +250,7 @@ The API returns the following response: // TESTRESPONSE[s/"plugins": \[[^\]]*\]/"plugins": $body.$_path/] // TESTRESPONSE[s/"network_types": \{[^\}]*\}/"network_types": $body.$_path/] // TESTRESPONSE[s/"discovery_types": \{[^\}]*\}/"discovery_types": $body.$_path/] +// TESTRESPONSE[s/"processor_stats": \{[^\}]*\}/"processor_stats": $body.$_path/] // TESTRESPONSE[s/"count": \{[^\}]*\}/"count": $body.$_path/] // TESTRESPONSE[s/"packaging_types": \[[^\]]*\]/"packaging_types": $body.$_path/] // TESTRESPONSE[s/: true|false/: $body.$_path/] diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index 16614a964f9d7..62dbacd9397e8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -49,7 +49,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ClusterStatsNodes implements ToXContentFragment { @@ -64,6 +66,7 @@ public class ClusterStatsNodes implements ToXContentFragment { private final NetworkTypes networkTypes; private final DiscoveryTypes discoveryTypes; private final PackagingTypes packagingTypes; + private final IngestStats ingestStats; ClusterStatsNodes(List nodeResponses) { this.versions = new HashSet<>(); @@ -97,6 +100,7 @@ public class ClusterStatsNodes implements ToXContentFragment { this.networkTypes = new NetworkTypes(nodeInfos); this.discoveryTypes = new DiscoveryTypes(nodeInfos); this.packagingTypes = new PackagingTypes(nodeInfos); + this.ingestStats = new IngestStats(nodeStats); } public Counts getCounts() { @@ -178,6 +182,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws discoveryTypes.toXContent(builder, params); packagingTypes.toXContent(builder, params); + + ingestStats.toXContent(builder, params); + return builder; } @@ -690,4 +697,68 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa } + static class IngestStats implements ToXContentFragment { + + final int pipelineCount; + final SortedMap stats; + + IngestStats(final List nodeStats) { + Set pipelineIds = new HashSet<>(); + SortedMap stats = new TreeMap<>(); + for (NodeStats nodeStat : nodeStats) { + if (nodeStat.getIngestStats() != null) { + for (Map.Entry> processorStats : nodeStat.getIngestStats() + .getProcessorStats().entrySet()) { + pipelineIds.add(processorStats.getKey()); + for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) { + stats.compute(stat.getType(), (k, v) -> { + org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats(); + if (v == null) { + return new long[] { + nodeIngestStats.getIngestCount(), + nodeIngestStats.getIngestFailedCount(), + nodeIngestStats.getIngestCurrent(), + nodeIngestStats.getIngestTimeInMillis() + }; + } else { + v[0] += nodeIngestStats.getIngestCount(); + v[1] += nodeIngestStats.getIngestFailedCount(); + v[2] += nodeIngestStats.getIngestCurrent(); + v[3] += nodeIngestStats.getIngestTimeInMillis(); + return v; + } + }); + } + } + } + } + this.pipelineCount = pipelineIds.size(); + this.stats = Collections.unmodifiableSortedMap(stats); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject("ingest"); + { + builder.field("number_of_pipelines", pipelineCount); + builder.startObject("processor_stats"); + for (Map.Entry stat : stats.entrySet()) { + long[] statValues = stat.getValue(); + builder.startObject(stat.getKey()); + builder.field("count", statValues[0]); + builder.field("failed", statValues[1]); + builder.field("current", statValues[2]); + builder.humanReadableField("time_in_millis", "time", + new TimeValue(statValues[3], TimeUnit.MILLISECONDS)); + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + return builder; + } + + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 8aa4da2504e37..1fd399f1d5b39 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -94,7 +94,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) { NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false); NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, - true, true, true, false, true, false, false, false, false, false, false, false); + true, true, true, false, true, false, false, false, false, false, true, false); List shardsStats = new ArrayList<>(); for (IndexService indexService : indicesService) { for (IndexShard indexShard : indexService) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 2ba5214f80f62..e3efd3f7d3671 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -413,7 +413,7 @@ public IngestStats stats() { processorMetrics.forEach(t -> { Processor processor = t.v1(); IngestMetric processorMetric = t.v2(); - statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processorMetric); + statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric); }); }); return statsBuilder.build(); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index 7a24cf3ee895e..5471f60d8bc3a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -69,8 +69,12 @@ public IngestStats(StreamInput in) throws IOException { List processorStatsPerPipeline = new ArrayList<>(processorsSize); for (int j = 0; j < processorsSize; j++) { String processorName = in.readString(); + String processorType = "_NOT_AVAILABLE"; + if (in.getVersion().onOrAfter(Version.V_7_6_0)) { + processorType = in.readString(); + } Stats processorStat = new Stats(in); - processorStatsPerPipeline.add(new ProcessorStat(processorName, processorStat)); + processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat)); } this.processorStats.put(pipelineId, processorStatsPerPipeline); } @@ -92,6 +96,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(processorStatsForPipeline.size()); for (ProcessorStat processorStat : processorStatsForPipeline) { out.writeString(processorStat.getName()); + if (out.getVersion().onOrAfter(Version.V_7_6_0)) { + out.writeString(processorStat.getType()); + } processorStat.getStats().writeTo(out); } } @@ -115,9 +122,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws for (ProcessorStat processorStat : processorStatsForPipeline) { builder.startObject(); builder.startObject(processorStat.getName()); + builder.field("type", processorStat.getType()); + builder.startObject("stats"); processorStat.getStats().toXContent(builder, params); builder.endObject(); builder.endObject(); + builder.endObject(); } } builder.endArray(); @@ -229,9 +239,9 @@ Builder addPipelineMetrics(String pipelineId, IngestMetric pipelineMetric) { return this; } - Builder addProcessorMetrics(String pipelineId, String processorName, IngestMetric metric) { + Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) { this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) - .add(new ProcessorStat(processorName, metric.createStats())); + .add(new ProcessorStat(processorName, processorType, metric.createStats())); return this; } @@ -267,10 +277,12 @@ public Stats getStats() { */ public static class ProcessorStat { private final String name; + private final String type; private final Stats stats; - public ProcessorStat(String name, Stats stats) { + public ProcessorStat(String name, String type, Stats stats) { this.name = name; + this.type = type; this.stats = stats; } @@ -278,6 +290,10 @@ public String getName() { return name; } + public String getType() { + return type; + } + public Stats getStats() { return stats; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 987bff812c45a..f1734d3228c18 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -315,7 +315,7 @@ public void testSerialization() throws IOException { } } - private static NodeStats createNodeStats() { + public static NodeStats createNodeStats() { DiscoveryNode node = new DiscoveryNode("test_node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), VersionUtils.randomVersion(random())); OsStats osStats = null; @@ -456,7 +456,8 @@ private static NodeStats createNodeStats() { for (int j =0; j < numProcessors;j++) { IngestStats.Stats processorStats = new IngestStats.Stats (randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()); - processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), processorStats)); + processorPerPipeline.add(new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), + randomAlphaOfLengthBetween(3, 10), processorStats)); } ingestProcessorStats.put(pipelineId,processorPerPipeline); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 04edc775a2d53..1f5ebb72bca82 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.action.admin.cluster.stats; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStatsTests; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; @@ -27,11 +29,17 @@ import org.elasticsearch.test.ESTestCase; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.elasticsearch.common.xcontent.XContentHelper.toXContent; +import static org.hamcrest.Matchers.equalTo; public class ClusterStatsNodesTests extends ESTestCase { @@ -59,6 +67,41 @@ public void testNetworkTypesToXContent() throws Exception { + "}", toXContent(stats, XContentType.JSON, randomBoolean()).utf8ToString()); } + public void testIngestStats() throws Exception { + NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats); + + SortedMap processorStats = new TreeMap<>(); + nodeStats.getIngestStats().getProcessorStats().values().forEach(l -> l.forEach(s -> processorStats.put(s.getType(), + new long[] { s.getStats().getIngestCount(), s.getStats().getIngestFailedCount(), + s.getStats().getIngestCurrent(), s.getStats().getIngestTimeInMillis()}))); + ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(Collections.singletonList(nodeStats)); + assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().getProcessorStats().size())); + String processorStatsString = "{"; + Iterator> iter = processorStats.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + long[] statValues = entry.getValue(); + long count = statValues[0]; + long failedCount = statValues[1]; + long current = statValues[2]; + long timeInMillis = statValues[3]; + processorStatsString += "\"" + entry.getKey() + "\":{\"count\":" + count + + ",\"failed\":" + failedCount + + ",\"current\":" + current + + ",\"time_in_millis\":" + timeInMillis + + "}"; + if (iter.hasNext()) { + processorStatsString += ","; + } + } + processorStatsString += "}"; + assertThat(toXContent(stats, XContentType.JSON, false).utf8ToString(), equalTo( + "{\"ingest\":{" + + "\"number_of_pipelines\":" + stats.pipelineCount + "," + + "\"processor_stats\":" + processorStatsString + + "}}")); + } + private static NodeInfo createNodeInfo(String nodeId, String transportType, String httpType) { Settings.Builder settings = Settings.builder(); if (transportType != null) { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index 04bfcbb92b8e9..2051120ec55e2 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -42,7 +42,7 @@ public void testSerialization() throws IOException { Map> processorStats = createProcessorStats(pipelineStats); IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats); IngestStats serializedStats = serialize(ingestStats); - assertIngestStats(ingestStats, serializedStats, true); + assertIngestStats(ingestStats, serializedStats, true, true); } public void testReadLegacyStream() throws IOException { @@ -63,7 +63,24 @@ public void testReadLegacyStream() throws IOException { in.setVersion(VersionUtils.getPreviousVersion(Version.V_6_5_0)); IngestStats serializedStats = new IngestStats(in); IngestStats expectedStats = new IngestStats(totalStats, pipelineStats, Collections.emptyMap()); - assertIngestStats(expectedStats, serializedStats, false); + assertIngestStats(expectedStats, serializedStats, false, true); + } + + public void testBWCIngestProcessorTypeStats() throws IOException { + IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); + List pipelineStats = createPipelineStats(); + Map> processorStats = createProcessorStats(pipelineStats); + IngestStats expectedIngestStats = new IngestStats(totalStats, pipelineStats, processorStats); + + //legacy output logic + BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(VersionUtils.getPreviousVersion(Version.V_7_6_0)); + expectedIngestStats.writeTo(out); + + StreamInput in = out.bytes().streamInput(); + in.setVersion(VersionUtils.getPreviousVersion(Version.V_7_6_0)); + IngestStats serializedStats = new IngestStats(in); + assertIngestStats(expectedIngestStats, serializedStats, true, false); } private List createPipelineStats() { @@ -75,9 +92,10 @@ private List createPipelineStats() { private Map> createProcessorStats(List pipelineStats){ assert(pipelineStats.size() >= 2); - IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", new IngestStats.Stats(1, 1, 1, 1)); - IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", new IngestStats.Stats(2, 2, 2, 2)); - IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", new IngestStats.Stats(47, 97, 197, 297)); + IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1)); + IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2)); + IngestStats.ProcessorStat processor3Stat = new IngestStats.ProcessorStat("processor3", "type", + new IngestStats.Stats(47, 97, 197, 297)); //pipeline1 -> processor1,processor2; pipeline2 -> processor3 return MapBuilder.>newMapBuilder() .put(pipelineStats.get(0).getPipelineId(), Stream.of(processor1Stat, processor2Stat).collect(Collectors.toList())) @@ -92,7 +110,8 @@ private IngestStats serialize(IngestStats stats) throws IOException { return new IngestStats(in); } - private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors){ + private void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats, boolean expectProcessors, + boolean expectProcessorTypes){ assertNotSame(ingestStats, serializedStats); assertNotSame(ingestStats.getTotalStats(), serializedStats.getTotalStats()); assertNotSame(ingestStats.getPipelineStats(), serializedStats.getPipelineStats()); @@ -114,6 +133,11 @@ private void assertIngestStats(IngestStats ingestStats, IngestStats serializedSt for (IngestStats.ProcessorStat serializedProcessorStat : serializedProcessorStats) { IngestStats.ProcessorStat ps = it.next(); assertEquals(ps.getName(), serializedProcessorStat.getName()); + if (expectProcessorTypes) { + assertEquals(ps.getType(), serializedProcessorStat.getType()); + } else { + assertEquals("_NOT_AVAILABLE", serializedProcessorStat.getType()); + } assertStats(ps.getStats(), serializedProcessorStat.getStats()); } assertFalse(it.hasNext()); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java index e57d17b324f15..1c0e2b2cb0e39 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java @@ -535,7 +535,11 @@ public void testToXContent() throws IOException { + "\"type\":\"docker\"," + "\"count\":1" + "}" - + "]" + + "]," + + "\"ingest\":{" + + "\"number_of_pipelines\":0," + + "\"processor_stats\":{}" + + "}" + "}" + "}," + "\"cluster_state\":{"