Enhance GET _node/stats/pipelines API for Metricbeat monitoring (#10576)
* parent 8c5697c
author Guy Boertje <guy@elastic.co> 1556806171 +0100
committer Mike Place <mike.place@elastic.co> 1557234770 +0200

Bump JrJackson to 0.4.8

Fixes #10748

LIR serializer refactor

Remove commented code

Remove more commented code

Remove license and add encoding

Style change to make code more vertical.

eid and hash

Use pipelines_info to construct the stats

Add tests for new fields

Add queue stats

* bad merge resolution

* bad merge resolution

* Don't merge if nil

* Better merge strategy

* add vertex gate

* Guard against nil

* Use extended queue stats in pipeline report

* Add cluster uuids to Elasticsearch outputters in pipeline output

* move uuid

* remove old uuid lookup

* Only populate cluster_uuids when present

* remove print

* cluster_uuids -> cluster_uuid

* Update logstash-core/lib/logstash/api/commands/stats.rb

Co-Authored-By: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Update logstash-core/lib/logstash/api/commands/stats.rb

Co-Authored-By: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Update logstash-core/lib/logstash/api/commands/stats.rb

Co-Authored-By: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Make var singular

* Match singular var name

* Remove unnecessary nil check

* Pass in the matching pipeline for the report

* Remove old way of inserting cluster_uuids

* Update logstash-core/lib/logstash/api/commands/stats.rb

I like this much better and in testing it seems to work correctly.

Co-Authored-By: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Remove unreferenced code that was part of debugging

* Remove events var which was unused

* Don't try to remove before insert

* Update logstash-core/lib/logstash/api/commands/stats.rb

Co-Authored-By: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Make pipeline extended stats generation more efficient

* Implement suggestion to improve readability

* Cleaner merging per review recommendation

* Only generate extended_stats once

* remove unneeded comments

* Add cluster_uuid to node vertex

* remove top-level cluster_uuids

* Update logstash-core/lib/logstash/api/commands/stats.rb

Co-Authored-By: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Implement change to simplify the logic, as suggested in review

* Rely on options gate to insert graph

Resolves concern here:
#10576 (comment)

* Update logstash-core/lib/logstash/api/commands/stats.rb

Co-Authored-By: Ry Biesemeyer <yaauie@users.noreply.github.com>

* Move UUID lookup to API layer

* Move private method to bottom per review recommendation
cachedout committed Jun 22, 2019
1 parent 32d8542 commit 252d5e7
Showing 8 changed files with 90 additions and 22 deletions.
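
For orientation, here is a minimal sketch (not part of this commit) of how a monitoring client such as Metricbeat could exercise the enhanced endpoint. The host and port are the Logstash API defaults, and the Net::HTTP client, the pipeline iteration, and the printed summary are illustrative assumptions; only the `vertices` query parameter and the `pipelines`/`vertices`/`cluster_uuid` fields come from the changes below.

```ruby
# Illustrative only: query the enhanced node stats API and summarize the
# vertex entries reported per pipeline. Host/port are the assumed defaults.
require "net/http"
require "json"
require "uri"

uri   = URI("http://localhost:9600/_node/stats/pipelines?vertices=true")
body  = Net::HTTP.get(uri)
stats = JSON.parse(body)

stats.fetch("pipelines", {}).each do |pipeline_id, pipeline_stats|
  vertices      = pipeline_stats["vertices"] || []
  cluster_uuids = vertices.map { |v| v["cluster_uuid"] }.compact.uniq
  puts "#{pipeline_id}: #{vertices.size} vertices, cluster_uuids=#{cluster_uuids.inspect}"
end
```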
1 change: 0 additions & 1 deletion logstash-core/lib/logstash/api/commands/node.rb
@@ -36,7 +36,6 @@ def pipeline(pipeline_id, options={})
:config_reload_interval,
:dead_letter_queue_enabled,
:dead_letter_queue_path,
:cluster_uuids
).reject{|_, v|v.nil?}
if options.fetch(:graph, false)
metrics.merge!(extract_metrics([:stats, :pipelines, pipeline_id.to_sym, :config], :graph))
69 changes: 61 additions & 8 deletions logstash-core/lib/logstash/api/commands/stats.rb
@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/api/commands/base"
require 'logstash/util/thread_dump'
require 'logstash/config/pipelines_info'
require_relative "hot_threads_reporter"

java_import java.nio.file.Files
@@ -10,6 +11,20 @@ module LogStash
module Api
module Commands
class Stats < Commands::Base
def queue
pipeline_ids = service.get_shallow(:stats, :pipelines).keys
total_queued_events = 0
pipeline_ids.each do |pipeline_id|
p_stats = service.get_shallow(:stats, :pipelines, pipeline_id.to_sym)
type = p_stats[:queue] && p_stats[:queue][:type].value
pipeline = service.agent.get_pipeline(pipeline_id)
next if pipeline.nil? || pipeline.system? || type != 'persisted'
total_queued_events += p_stats[:queue][:events].value
end

{:events_count => total_queued_events}
end

def jvm
{
:threads => extract_metrics(
@@ -45,14 +60,24 @@ def events
)
end

def pipeline(pipeline_id = nil)
def pipeline(pipeline_id = nil, opts={})
extended_stats = LogStash::Config::PipelinesInfo.format_pipelines_info(
service.agent,
service.snapshot.metric_store,
true).each_with_object({}) do |pipeline_stats, memo|
pipeline_id = pipeline_stats["id"].to_s
memo[pipeline_id] = pipeline_stats
end

if pipeline_id.nil?
pipeline_ids = service.get_shallow(:stats, :pipelines).keys
pipeline_ids.each_with_object({}) do |pipeline_id, result|
result[pipeline_id] = plugins_stats_report(pipeline_id)
extended_pipeline = extended_stats[pipeline_id.to_s]
result[pipeline_id] = plugins_stats_report(pipeline_id, extended_pipeline, opts)
end
else
{ pipeline_id => plugins_stats_report(pipeline_id) }
extended_pipeline = extended_stats[pipeline_id.to_s]
{ pipeline_id => plugins_stats_report(pipeline_id, extended_pipeline, opts) }
end
rescue # failed to find pipeline
{}
@@ -92,9 +117,9 @@ def hot_threads(options={})
end

private
def plugins_stats_report(pipeline_id)
def plugins_stats_report(pipeline_id, extended_pipeline, opts={})
stats = service.get_shallow(:stats, :pipelines, pipeline_id.to_sym)
PluginsStats.report(stats)
PluginsStats.report(stats, extended_pipeline, opts)
end

module PluginsStats
@@ -110,8 +135,8 @@ def plugin_stats(stats, plugin_type)
end
end

def report(stats)
{
def report(stats, extended_stats=nil, opts={})
ret = {
:events => stats[:events],
:plugins => {
:inputs => plugin_stats(stats, :inputs),
@@ -121,8 +146,36 @@ def report(stats)
},
:reloads => stats[:reloads],
:queue => stats[:queue]
}.merge(stats[:dlq] ? {:dead_letter_queue => stats[:dlq]} : {})
}
ret[:dead_letter_queue] = stats[:dlq] if stats.include?(:dlq)

# if extended_stats were provided, enrich the return value
if extended_stats
ret[:queue] = extended_stats["queue"] if extended_stats.include?("queue")
if opts[:vertices] && extended_stats.include?("vertices")
ret[:vertices] = extended_stats["vertices"].map { |vertex| decorate_vertex(vertex) }
end
end

ret
end

##
# Returns a vertex, decorated with additional metadata if available.
# Does not mutate the passed `vertex` object.
# @api private
# @param vertex [Hash{String=>Object}]
# @return [Hash{String=>Object}]
def decorate_vertex(vertex)
plugin_id = vertex["id"]&.to_s
return vertex unless plugin_id && LogStash::PluginMetadata.exists?(plugin_id)

plugin_metadata = LogStash::PluginMetadata.for_plugin(plugin_id)
cluster_uuid = plugin_metadata&.get(:cluster_uuid)
vertex = vertex.merge("cluster_uuid" => cluster_uuid) unless cluster_uuid.nil?

vertex
end
end # module PluginsStats
end
end
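
The decorate_vertex helper above copies the monitored cluster's UUID onto a vertex without mutating its input. Below is a self-contained sketch of the same pattern, assuming a plain Hash as a stand-in for LogStash::PluginMetadata (which, per this commit, the Elasticsearch output populates) and invented plugin id and UUID values.

```ruby
# Stand-in metadata registry; in Logstash this role is played by
# LogStash::PluginMetadata, populated by plugins such as the Elasticsearch output.
PLUGIN_METADATA = { "my_es_output" => { :cluster_uuid => "nZF5nGo_Example" } }

def decorate_vertex(vertex)
  plugin_id    = vertex["id"]&.to_s
  metadata     = plugin_id && PLUGIN_METADATA[plugin_id]
  cluster_uuid = metadata && metadata[:cluster_uuid]
  return vertex if cluster_uuid.nil?
  vertex.merge("cluster_uuid" => cluster_uuid) # merge returns a new Hash
end

original  = { "id" => "my_es_output", "events_out" => 100 }
decorated = decorate_vertex(original)
p decorated                      # includes the "cluster_uuid" key
p original.key?("cluster_uuid")  # => false; the input is not mutated
```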
6 changes: 4 additions & 2 deletions logstash-core/lib/logstash/api/modules/node.rb
@@ -29,14 +29,16 @@ def node

get "/pipelines/:id" do
pipeline_id = params["id"]
opts = {:graph => as_boolean(params.fetch("graph", false))}
opts = {:graph => as_boolean(params.fetch("graph", false)),
:vertices => as_boolean(params.fetch("vertices", false))}
payload = node.pipeline(pipeline_id, opts)
halt(404) if payload.empty?
respond_with(:pipelines => { pipeline_id => payload } )
end

get "/pipelines" do
opts = {:graph => as_boolean(params.fetch("graph", false))}
opts = {:graph => as_boolean(params.fetch("graph", false)),
:vertices => as_boolean(params.fetch("vertices", false))}
payload = node.pipelines(opts)
halt(404) if payload.empty?
respond_with(:pipelines => payload )
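
Both /_node/pipelines endpoints now accept a vertices flag next to the existing graph flag. A rough sketch of the parameter-to-options mapping, using a simplified stand-in for the API layer's as_boolean helper and a hypothetical query string:

```ruby
require "cgi"

# Simplified stand-in for the API layer's as_boolean helper.
def as_boolean(value)
  value.to_s.strip.downcase == "true"
end

# e.g. GET /_node/pipelines/main?graph=true&vertices=true (hypothetical request)
query  = "graph=true&vertices=true"
params = CGI.parse(query).transform_values(&:first)

opts = {
  :graph    => as_boolean(params.fetch("graph", false)),
  :vertices => as_boolean(params.fetch("vertices", false))
}
p opts  # => {:graph=>true, :vertices=>true}
```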
11 changes: 9 additions & 2 deletions logstash-core/lib/logstash/api/modules/node_stats.rb
@@ -21,11 +21,17 @@ class NodeStats < ::LogStash::Api::Modules::Base
:events => events_payload,
:pipelines => pipeline_payload,
:reloads => reloads_payload,
:os => os_payload
:os => os_payload,
:queue => queue
}
respond_with(payload, {:filter => params["filter"]})
end

private
def queue
@stats.queue
end

private
def os_payload
@stats.os
@@ -52,7 +58,8 @@ def mem_payload
end

def pipeline_payload(val = nil)
@stats.pipeline(val)
opts = {:vertices => as_boolean(params.fetch("vertices", false))}
@stats.pipeline(val, opts)
end
end
end
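
The new top-level :queue entry in the payload comes from the Stats#queue command added in stats.rb above, which sums queued events across non-system pipelines whose queue type is persisted. A self-contained sketch of that aggregation over invented per-pipeline numbers:

```ruby
# Stand-in per-pipeline stats; in Logstash these come from the metric store
# and the agent's pipeline registry.
PIPELINE_STATS = {
  "main"           => { type: "persisted", events: 1200, system: false },
  "ingest"         => { type: "memory",    events: 0,    system: false },
  ".monitoring-ls" => { type: "persisted", events: 40,   system: true  }
}

def queue_events_count(stats)
  stats.sum do |_pipeline_id, s|
    # Only persisted-queue, non-system pipelines contribute to the total.
    (s[:type] == "persisted" && !s[:system]) ? s[:events] : 0
  end
end

p(events_count: queue_events_count(PIPELINE_STATS))  # => {:events_count=>1200}
```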
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/config/lir_serializer.rb
@@ -124,3 +124,4 @@ def format_swm(source_with_metadata)
end
end
end

x-pack/lib/monitoring/inputs/metrics/stats_event/pipelines_info.rb → logstash-core/lib/logstash/config/pipelines_info.rb
@@ -2,7 +2,7 @@
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
module LogStash; module Inputs; class Metrics; module StatsEvent;
module LogStash; module Config;
class PipelinesInfo
def self.format_pipelines_info(agent, metric_store, extended_performance_collection)
# It is important that we iterate via the agent's pipelines vs. the
@@ -24,11 +24,9 @@ def self.format_pipelines_info(agent, metric_store, extended_performance_collect
"failures" => p_stats[:reloads][:failures].value
}
}

if extended_performance_collection
res["vertices"] = format_pipeline_vertex_stats(p_stats[:plugins], pipeline)
end

res
end.compact
end
@@ -93,11 +91,18 @@ def self.format_pipeline_vertex_section_stats(stats, pipeline)

acc
end

acc << {
segment = {
:id => plugin_id,
:pipeline_ephemeral_id => pipeline.ephemeral_id
}.merge(segmented)
}

if LogStash::PluginMetadata.exists?(plugin_id.to_s)
plugin_metadata = LogStash::PluginMetadata.for_plugin(plugin_id.to_s)
cluster_uuid = plugin_metadata&.get(:cluster_uuid)
segment[:cluster_uuid] = cluster_uuid unless cluster_uuid.nil?
end

acc << segment.merge(segmented)
acc
end
end
@@ -144,4 +149,4 @@ def self.format_queue_stats(pipeline_id, metric_store)
}
end
end
end; end; end; end
end; end;
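
For reference, the vertex segments built above end up shaped roughly like the hash below, with symbol keys on this monitoring path (the API response above uses string keys). All values and the per-plugin metric key names are invented for illustration; only :id, :pipeline_ephemeral_id, and the optional :cluster_uuid are taken from the code.

```ruby
# Invented values; only :id, :pipeline_ephemeral_id and the optional
# :cluster_uuid mirror the code above, the remaining keys are assumptions.
segment = {
  :id                    => "my_es_output",
  :pipeline_ephemeral_id => "4f2c3b1a-0000-0000-0000-example",
  :cluster_uuid          => "nZF5nGo_Example",  # present only if the plugin registered one
  :events_in             => 100,
  :events_out            => 100,
  :duration_in_millis    => 250
}
p segment
```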
@@ -38,3 +38,4 @@ def make
end
end
end; end; end

4 changes: 2 additions & 2 deletions x-pack/lib/monitoring/inputs/metrics/stats_event_factory.rb
@@ -5,7 +5,7 @@
module LogStash; module Inputs; class Metrics;
class StatsEventFactory
include ::LogStash::Util::Loggable
require 'monitoring/inputs/metrics/stats_event/pipelines_info'
require 'logstash/config/pipelines_info'

def initialize(global_stats, snapshot)
@global_stats = global_stats
@@ -19,7 +19,7 @@ def make(agent, extended_performance_collection=true)
"logstash" => fetch_node_stats(agent, @metric_store),
"events" => format_global_event_count(@metric_store),
"process" => format_process_stats(@metric_store),
"pipelines" => StatsEvent::PipelinesInfo.format_pipelines_info(agent, @metric_store, extended_performance_collection),
"pipelines" => LogStash::Config::PipelinesInfo.format_pipelines_info(agent, @metric_store, extended_performance_collection),
"reloads" => format_reloads(@metric_store),
"jvm" => format_jvm_stats(@metric_store),
"os" => format_os_stats(@metric_store),
