diff --git a/README.md b/README.md
index 69ca202..185a298 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ A monitoring plugin for Icinga (2), Nagios, Shinken, Naemon, etc. to check the L
     -H HOST                          Logstash host
     -p, --hostname PORT              Logstash API port
+    -P, --pipeline PIPELINE          Pipeline to monitor, uses all pipelines when not set
         --file-descriptor-threshold-warn WARN
                                      The percentage relative to the process file descriptor limit on which to be a warning result.
         --file-descriptor-threshold-crit CRIT
@@ -24,8 +25,17 @@ A monitoring plugin for Icinga (2), Nagios, Shinken, Naemon, etc. to check the L
                                      The percentage of CPU usage on which to be a warning result.
         --cpu-usage-threshold-crit CRIT
                                      The percentage of CPU usage on which to be a critical result.
+        --temp-filedir NAME          Directory to use for the temporary state file. Only used when one of the events-per-minute metrics is used. Defaults to /tmp
         --inflight-events-warn WARN
                                      Threshold for inflight events to be a warning result. Use min:max for a range.
         --inflight-events-crit CRIT
                                      Threshold for inflight events to be a critical result. Use min:max for a range.
+        --events-in-per-minute-warn WARN
+                                     Threshold for the number of ingoing events per minute to be a warning. Use min:max for a range.
+        --events-in-per-minute-crit CRIT
+                                     Threshold for the number of ingoing events per minute to be critical. Use min:max for a range.
+        --events-out-per-minute-warn WARN
+                                     Threshold for the number of outgoing events per minute to be a warning. Use min:max for a range.
+        --events-out-per-minute-crit CRIT
+                                     Threshold for the number of outgoing events per minute to be critical. Use min:max for a range.
     -h, --help                       Show this message
@@ -41,6 +51,20 @@ or
     ./check_logstash.rb -H 127.0.0.1 --inflight-events-warn 5 --inflight-events-crit 1:10
 
+### Checking only a single pipeline ###
+
+Starting with Logstash 6.0, it is possible to use multiple pipelines. If you want to monitor just a single pipeline (e.g. you want to use independent checks for each pipeline), you can use the -P or --pipeline parameter:
+
+    ./check_logstash.rb -H [logstashhost] -P [pipeline_name]
+
+### Checking events in/out per minute ###
+
+It is also possible to check the number of incoming and outgoing events per minute. The plugin uses a temporary state file for this, which is saved in the /tmp folder by default. The location can be changed with the --temp-filedir option; make sure the check can write to the chosen folder. The file name follows this pattern:
+
+    check_logstash_#{host}_#{port}_#{pipeline}_events_state.tmp
+
+If no specific pipeline is selected, "all" is used as the pipeline name. Note that the file is **not** created or read when none of the events in/out options are set. The plugin stores the current events in/out counters together with a timestamp in this file; on each invocation the saved values are read, the events in/out per minute metrics are calculated from the difference, and the new state is written back to the file.
+
+The first invocation with events in/out monitoring enabled only initializes the temporary file, so the corresponding metrics are shown starting with the second invocation.
+
 ## Sample Output ##
 
 ### With default values ###
@@ -57,6 +81,27 @@ or
     OK: Heap usage at 16.00% (352959904 out of 2077753344 bytes in use)
     OK: Open file descriptors at 1.12%. (46 out of 4096 file descriptors are open)
 
+### With events in/out per minute set ###
+
+    OK - Logstash seems to be doing fine. | process.cpu.percent=0%;;;0;100 jvm.mem.heap_used_percent=46%;70;80;0;100 jvm.threads.count=38;;;0; process.open_file_descriptors=128;891225;996075;0;1048576 events_in_per_minute_main=2070;1:;1: events_out_per_minute_main=2069;1:;1: pipelines.main.events.in=236178654c;;;0; pipelines.main.events.out=236178650c;;;0; inflight_events_main=4;;
+    OK: Events out per minute: main: 2069;
+    OK: Events in per minute: main: 2070;
+    OK: CPU usage in percent: 0
+    OK: Config reload syntax check: main: OK;
+    OK: Inflight events: main: 4;
+    OK: Heap usage at 46.00% (486260792 out of 1038876672 bytes in use)
+    OK: Open file descriptors at 0.01%. (128 out of 1048576 file descriptors are open)
+
+### With events in/out per minute set and with two pipelines ###
+
+    CRITICAL - Logstash is unhealthy - CRITICAL: Events in per minute: PipelineOne: 2497; PipelineTwo: 0; | process.cpu.percent=11%;;;0;100 jvm.mem.heap_used_percent=70%;70;80;0;100 jvm.threads.count=592;;;0; process.open_file_descriptors=526;3400;3800;0;4096 events_in_per_minute_PipelineOne=2497;1:;1: events_out_per_minute_PipelineOne=2479;1:;1: pipelines.PipelineOne.events.in=23289504c;;;0; pipelines.PipelineOne.events.out=23289493c;;;0; inflight_events_PipelineOne=11;; events_in_per_minute_PipelineTwo=0;1:;1: events_out_per_minute_PipelineTwo=0;1:;1: pipelines.PipelineTwo.events.in=6606c;;;0; pipelines.PipelineTwo.events.out=6606c;;;0; inflight_events_PipelineTwo=0;;
+    CRITICAL: Events out per minute: PipelineOne: 2479; PipelineTwo: 0;
+    CRITICAL: Events in per minute: PipelineOne: 2497; PipelineTwo: 0;
+    OK: CPU usage in percent: 11
+    OK: Config reload syntax check: PipelineOne: OK; PipelineTwo:
+    OK: Heap usage at 70.00% (736537928 out of 1037959168 bytes in use)
+    OK: Open file descriptors at 12.84%. (526 out of 4096 file descriptors are open)
+
 ## Finding viable thresholds ##
 
 To set your thresholds for inflight events to a sensible value use baselining. Don't set thresholds from the beginning but let Graphite or other graphers create graphs for inflight events. Just add some percent to what Logstash usually processes and set this as threshold. Or use the `generator` plugin to put as many events through your Elastic stack as possible. Use some percent (e.g. 90%) from this maximum as a threshold. Keep in mind that changing your configuration might change the maximum inflight events.
@@ -80,10 +125,15 @@ There are some default values defined in the plugin.
Some values are merely put ### Optionally checked ### +* `--pipeline` * `--cpu-usage-threshold-warn` * `--cpu-usage-threshold-crit` * `--inflight-events-warn` * `--inflight-events-crit` +* `--events-in-per-minute-warn` +* `--events-in-per-minute-crit` +* `--events-out-per-minute-warn` +* `--events-out-per-minute-crit` ## Building ## diff --git a/contrib/icinga2-commands.conf b/contrib/icinga2-commands.conf index 2d1e2d3..cf8d3d0 100644 --- a/contrib/icinga2-commands.conf +++ b/contrib/icinga2-commands.conf @@ -10,6 +10,10 @@ object CheckCommand "logstash" { value = "$logstash_port$" description = "Port where Logstash is listening for API requests" } + "-P" = { + value = "$logstash_pipeline$" + description = "If set, check the given pipeline only" + } "--file-descriptor-threshold-warn" = { value = "$logstash_filedesc_warn$" description = "Warning threshold of file descriptor usage in percent" @@ -42,6 +46,26 @@ object CheckCommand "logstash" { value = "$logstash_cpu_crit$" description = "Critical threshold for cpu usage in percent" } + "--temp-filedir" = { + value = "$logstash_temp_filedir$" + description = "Directory to use for the temporary state file. Only used when one of the events-per-minute metrics is used. Defaults to /tmp" + } + "--events-in-per-minute-warn" = { + value = "$logstash_events_in_per_minute_warn$" + description = "Threshold for the number of ingoing events per minute to be a warning. Use min:max for a range." + } + "--events-in-per-minute-crit" = { + value = "$logstash_events_in_per_minute_crit$" + description = "Threshold for the number of ingoing events per minute to be critical. Use min:max for a range." + } + "--events-out-per-minute-warn" = { + value = "$logstash_events_out_per_minute_warn$" + description = "Threshold for the number of outgoing events per minute to be a warning. Use min:max for a range." + } + "--events-out-per-minute-crit" = { + value = "$logstash_events_out_per_minute_crit$" + description = "Threshold for the number of outgoing events per minute to be critical. Use min:max for a range." + } } vars.logstash_hostname = "$check_address$" diff --git a/lib/check_logstash.rb b/lib/check_logstash.rb index ad82867..ecdbce4 100755 --- a/lib/check_logstash.rb +++ b/lib/check_logstash.rb @@ -3,14 +3,16 @@ # File : check_logstash # Author : Thomas Widhalm, Netways # E-Mail: thomas.widhalm@netways.de -# Date : 11/04/2019 +# Date : 18/07/2022 # -# Version: 0.7.3-0 +# Version: 0.8.0-0 # # This program is free software; you can redistribute it or modify # it under the terms of the GNU General Public License version 3.0 # # Changelog: +# - 0.8.0 Add option to check incoming/outgoing events per minute +# - 0.7.4 Add option for checking only one pipeline + better handling for threshold ranges # - 0.7.3 fix inflight event calculation # - 0.7.2 fix handling of xpack-monitoring pipeline # - 0.7.1 fix multipipeline checks, improve errorhandling @@ -86,9 +88,11 @@ def run(args) check = CheckLogstash.new parse(check, args) - # fetch the result + # fetch the result and load saved state result = check.fetch - health = check.health(result) + state = check.load_state + health = check.health(result, state) + check.save_events_state(result) # Get the maximum status code (Critical > Warning > OK) code = health.collect(&:to_i).max @@ -107,7 +111,7 @@ def run(args) 'OK - Logstash seems to be doing fine.' 
end - puts "#{status} | #{check.performance_data(result)}\n" + puts "#{status} | #{check.performance_data(result, state)}\n" puts health.sort_by(&:to_i).reverse.join("\n") code @@ -127,48 +131,32 @@ def parse(check, args) opts.on('-H', '--hostname HOST', 'Logstash host') { |v| check.host = v } opts.on('-p', '--hostname PORT', 'Logstash API port') { |v| check.port = v.to_i } + opts.on('-P', '--pipeline PIPELINE', 'Pipeline to monitor, uses all pipelines when not set') { |v| check.pipeline = v } opts.on('--file-descriptor-threshold-warn WARN', 'The percentage relative to the process file descriptor limit on which to be a warning result.') { |v| check.warning_file_descriptor_percent = v.to_i } opts.on('--file-descriptor-threshold-crit CRIT', 'The percentage relative to the process file descriptor limit on which to be a critical result.') { |v| check.critical_file_descriptor_percent = v.to_i } opts.on('--heap-usage-threshold-warn WARN', 'The percentage relative to the heap size limit on which to be a warning result.') { |v| check.warning_heap_percent = v.to_i } opts.on('--heap-usage-threshold-crit CRIT', 'The percentage relative to the heap size limit on which to be a critical result.') { |v| check.critical_heap_percent = v.to_i } opts.on('--cpu-usage-threshold-warn WARN', 'The percentage of CPU usage on which to be a warning result.') { |v| check.warning_cpu_percent = v.to_i } opts.on('--cpu-usage-threshold-crit CRIT', 'The percentage of CPU usage on which to be a critical result.') { |v| check.critical_cpu_percent = v.to_i } - # the following 2 blocks split : seperated ranges into 2 values. If only one value is given it's used as maximum + opts.on('--temp-filedir NAME', 'Directory to use for the temporary state file. Only used when one of the events-per-minute metrics is used. Defaults to /tmp') { |v| check.temp_file_dir = v } + # the following blocks split : seperated ranges into 2 values. If only one value is given it's used as maximum opts.on('--inflight-events-warn WARN', 'Threshold for inflight events to be a warning result. Use min:max for a range.') do |v| - options_error.call('--inflight-events-warn requires an argument') if v.nil? - - values = v.split(':') - options_error.call("--inflight-events-warn has invalid argument #{v}") if values.count.zero? || values.count > 2 - - begin - if values.count == 1 - check.warning_inflight_events_min = -1 - check.warning_inflight_events_max = values[0].to_i - else - check.warning_inflight_events_min = values[0].to_i - check.warning_inflight_events_max = values[1].to_i - end - rescue ArgumentError => e - options_error.call("--inflight-events-warn has invalid argument. #{e.message}") - end + check.warning_inflight_events_min, check.warning_inflight_events_max = parse_min_max_option('inflight-events-warn', v, options_error) end opts.on('--inflight-events-crit CRIT', 'Threshold for inflight events to be a critical result. Use min:max for a range.') do |v| - options_error.call('--inflight-events-critical requires an argument') if v.nil? - - values = v.split(':') - options_error.call("--inflight-events-critical has invalid argument #{v}") if values.count.zero? || values.count > 2 - - begin - if values.count == 1 - check.critical_inflight_events_min = -1 - check.critical_inflight_events_max = values[0].to_i - else - check.critical_inflight_events_min = values[0].to_i - check.critical_inflight_events_max = values[1].to_i - end - rescue ArgumentError => e - options_error.call("--inflight-events-crit has invalid argument. 
#{e.message}") - end + check.critical_inflight_events_min, check.critical_inflight_events_max = parse_min_max_option('inflight-events-crit', v, options_error) + end + opts.on('--events-in-per-minute-warn WARN', 'Threshold for the number of ingoing events per minute to be a warning. Use min:max for a range.') do |v| + check.warning_events_in_per_minute_min, check.warning_events_in_per_minute_max = parse_min_max_option('events-in-per-minute-warn', v, options_error) + end + opts.on('--events-in-per-minute-crit CRIT', 'Threshold for the number of ingoing events per minute to be critical. Use min:max for a range.') do |v| + check.critical_events_in_per_minute_min, check.critical_events_in_per_minute_max = parse_min_max_option('events-in-per-minute-crit', v, options_error) + end + opts.on('--events-out-per-minute-warn WARN', 'Threshold for the number of outgoing events per minute to be a warning. Use min:max for a range.') do |v| + check.warning_events_out_per_minute_min, check.warning_events_out_per_minute_max = parse_min_max_option('events-out-per-minute-warn', v, options_error) + end + opts.on('--events-out-per-minute-crit CRIT', 'Threshold for the number of outgoing events per minute to be critical. Use min:max for a range.') do |v| + check.critical_events_out_per_minute_min, check.critical_events_out_per_minute_max = parse_min_max_option('events-out-per-minute-crit', v, options_error) end opts.on_tail('-h', '--help', 'Show this message') do @@ -177,6 +165,31 @@ def parse(check, args) end end.parse(args) end + + def parse_min_max_option(parameter_name, v, options_error) + options_error.call("--#{parameter_name} requires an argument") if v.nil? + + output_min = nil + output_max = nil + + values = v.split(':') + no_max = v[-1] == ':' + options_error.call("--#{parameter_name} has invalid argument #{v}") if v[0] == ':' || values.count.zero? || values.count > 2 + + begin + if values.count == 1 + output_min = values[0].to_i if no_max + output_max = values[0].to_i unless no_max + else + output_min = values[0].to_i + output_max = values[1].to_i + end + rescue ArgumentError => e + options_error.call("--#{parameter_name} has invalid argument. #{e.message}") + end + + [output_min, output_max] + end end # module CLI class Result @@ -184,12 +197,21 @@ class InvalidField < StandardError; end def initialize(data) @data = data + @timestamp = Time.now.to_f end def self.from_hash(data) new(data) end + def get_timestamp + @timestamp + end + + def has_key?(key) + @data.has_key?(key) + end + # Provide dot-notation for querying a given field in a hash def get(field) self.class.get(field, @data) @@ -212,7 +234,12 @@ module Fetcher module_function - def fetch(host, port) + def critical(message) + puts message + exit(3) + end + + def fetch(host, port, pipeline) uri = URI.parse("http://#{host}:#{port}/_node/stats") http = Net::HTTP.new(uri.host, uri.port) request = Net::HTTP::Get.new(uri.request_uri) @@ -221,14 +248,40 @@ def fetch(host, port) critical("Got HTTP response #{response.code}") if response.code != '200' result = begin - JSON.parse(response.body) + data = JSON.parse(response.body) + data['pipelines'].select! {|p| p == pipeline} if pipeline + data rescue => e critical("Failed parsing JSON response. #{e.class.name}") end + critical("Pipeline not found: #{pipeline}") if pipeline && result['pipelines'].empty? Result.from_hash(result) end end + module FileHandler + # handles the data state using a temporary file. 
+ + module_function + + def read(temp_file_dir, host, port, pipeline) + temp_file_name = File.join(temp_file_dir, "check_logstash_#{host}_#{port}_#{pipeline || "all"}_events_state.tmp") + return {} unless File.file?(temp_file_name) + JSON.parse(File.read(temp_file_name)) + rescue Exception => e + puts "Can not load state from temp file, reason: #{e}" + exit(3) + end + + def save(temp_file_dir, host, port, pipeline, data) + temp_file_name = File.join(temp_file_dir, "check_logstash_#{host}_#{port}_#{pipeline || "all"}_events_state.tmp") + File.write(temp_file_name, data.to_json) + rescue Exception => e + puts "Can not save state to temp file, reason: #{e}" + exit(3) + end + end + module PerfData # return perfdata formatted string # use for values taken directly from API @@ -259,14 +312,16 @@ module PerfData_derived module_function - def report(label, value, warning = nil, critical = nil, minimum = nil, maximum = nil) - format('%s=%s;%s;%s;%s;%s', label, value, warning, critical, minimum, maximum) + def report(label, value, warning_min = nil, warning_max = nil, critical_min = nil, critical_max = nil) + format('%s=%s;%s%s%s;%s%s%s', label, value, warning_min, warning_min ? ':' : '', warning_max, critical_min, critical_min ? ':' : '', critical_max) end end - Version = '0.7.2-0' + Version = '0.8.0-0' DEFAULT_PORT = 9600 DEFAULT_HOST = '127.0.0.1' + DEFAULT_PIPELINE = nil + DEFAULT_TEMP_FILEDIR = "/tmp/" DEFAULT_FILE_DESCRIPTOR_WARNING = 85 DEFAULT_FILE_DESCRIPTOR_CRITICAL = 95 @@ -278,8 +333,16 @@ def report(label, value, warning = nil, critical = nil, minimum = nil, maximum = DEFAULT_INFLIGHT_EVENTS_WARNING_MAX = nil DEFAULT_INFLIGHT_EVENTS_CRITICAL_MIN = nil DEFAULT_INFLIGHT_EVENTS_CRITICAL_MAX = nil - - attr_accessor :host, :port + DEFAULT_EVENTS_IN_PER_MINUTE_WARNING_MIN = nil + DEFAULT_EVENTS_IN_PER_MINUTE_WARNING_MAX = nil + DEFAULT_EVENTS_IN_PER_MINUTE_CRITICAL_MIN = nil + DEFAULT_EVENTS_IN_PER_MINUTE_CRITICAL_MAX = nil + DEFAULT_EVENTS_OUT_PER_MINUTE_WARNING_MIN = nil + DEFAULT_EVENTS_OUT_PER_MINUTE_WARNING_MAX = nil + DEFAULT_EVENTS_OUT_PER_MINUTE_CRITICAL_MIN = nil + DEFAULT_EVENTS_OUT_PER_MINUTE_CRITICAL_MAX = nil + + attr_accessor :host, :port, :pipeline, :temp_file_dir attr_accessor :warning_file_descriptor_percent attr_accessor :critical_file_descriptor_percent attr_accessor :warning_heap_percent @@ -290,11 +353,20 @@ def report(label, value, warning = nil, critical = nil, minimum = nil, maximum = attr_accessor :warning_inflight_events_max attr_accessor :critical_inflight_events_min attr_accessor :critical_inflight_events_max + attr_accessor :warning_events_out_per_minute_min + attr_accessor :warning_events_out_per_minute_max + attr_accessor :critical_events_out_per_minute_min + attr_accessor :critical_events_out_per_minute_max + attr_accessor :warning_events_in_per_minute_min + attr_accessor :warning_events_in_per_minute_max + attr_accessor :critical_events_in_per_minute_min + attr_accessor :critical_events_in_per_minute_max def initialize @host = DEFAULT_HOST @port = DEFAULT_PORT + self.pipeline = DEFAULT_PIPELINE self.warning_file_descriptor_percent = DEFAULT_FILE_DESCRIPTOR_WARNING self.critical_file_descriptor_percent = DEFAULT_FILE_DESCRIPTOR_CRITICAL self.warning_heap_percent = DEFAULT_HEAP_WARNING @@ -305,6 +377,25 @@ def initialize self.warning_inflight_events_max = DEFAULT_INFLIGHT_EVENTS_WARNING_MAX self.critical_inflight_events_min = DEFAULT_INFLIGHT_EVENTS_CRITICAL_MIN self.critical_inflight_events_max = DEFAULT_INFLIGHT_EVENTS_CRITICAL_MAX + 
self.warning_events_in_per_minute_min = DEFAULT_EVENTS_IN_PER_MINUTE_WARNING_MIN + self.warning_events_in_per_minute_max = DEFAULT_EVENTS_IN_PER_MINUTE_WARNING_MAX + self.critical_events_in_per_minute_min = DEFAULT_EVENTS_IN_PER_MINUTE_CRITICAL_MIN + self.critical_events_in_per_minute_max = DEFAULT_EVENTS_IN_PER_MINUTE_CRITICAL_MAX + self.warning_events_out_per_minute_min = DEFAULT_EVENTS_OUT_PER_MINUTE_WARNING_MIN + self.warning_events_out_per_minute_max = DEFAULT_EVENTS_OUT_PER_MINUTE_WARNING_MAX + self.critical_events_out_per_minute_min = DEFAULT_EVENTS_OUT_PER_MINUTE_CRITICAL_MIN + self.critical_events_out_per_minute_max = DEFAULT_EVENTS_OUT_PER_MINUTE_CRITICAL_MAX + self.temp_file_dir = DEFAULT_TEMP_FILEDIR + end + + def checks_events_in_per_minute? + # need the saved state when events in per minute is somehow monitored + warning_events_in_per_minute_min || warning_events_in_per_minute_max || critical_events_in_per_minute_min || critical_events_in_per_minute_max + end + + def checks_events_out_per_minute? + # need the saved state when events out per minute is somehow monitored + warning_events_out_per_minute_min || warning_events_out_per_minute_max || critical_events_out_per_minute_min || critical_events_out_per_minute_max end def warning_file_descriptor_percent=(value) @@ -329,14 +420,29 @@ def critical_heap_percent=(value) def fetch begin - Fetcher.fetch(host, port) + Fetcher.fetch(host, port, pipeline) + rescue SystemExit + exit(3) rescue Exception puts "Can not connect to Logstash" exit(3) end end - def performance_data(result) + def load_state + return nil unless checks_events_in_per_minute? || checks_events_out_per_minute? + + Result.from_hash(FileHandler.read(temp_file_dir, host, port, pipeline)) + end + + def calculate_events_per_minute(state, pipeline, current_events, timestamp, direction) + saved_events = state.get("#{pipeline}.events_#{direction}") + saved_timestamp = state.get("#{pipeline}.timestamp") + events_per_minute = (current_events - saved_events) / (timestamp - saved_timestamp) * 60 + events_per_minute.to_i + end + + def performance_data(result, state) max_file_descriptors = result.get('process.max_file_descriptors') open_file_descriptors = result.get('process.open_file_descriptors') percent_file_descriptors = (open_file_descriptors.to_f / max_file_descriptors) * 100 @@ -360,15 +466,40 @@ def performance_data(result) events_in = result.get('pipelines.' + named_pipeline[0] + '.events.in').to_i events_out = result.get('pipelines.' + named_pipeline[0] + '.events.out').to_i + if checks_events_in_per_minute? && state.has_key?(named_pipeline[0]) + events_per_minute = calculate_events_per_minute(state, named_pipeline[0], events_in, result.get_timestamp, "in") + inflight_arr.push(PerfData_derived.report('events_in_per_minute_' + named_pipeline[0], events_per_minute, warning_events_in_per_minute_min, warning_events_in_per_minute_max, critical_events_in_per_minute_min, critical_events_in_per_minute_max)) + end + + if checks_events_out_per_minute? && state.has_key?(named_pipeline[0]) + events_per_minute = calculate_events_per_minute(state, named_pipeline[0], events_out, result.get_timestamp, "out") + inflight_arr.push(PerfData_derived.report('events_out_per_minute_' + named_pipeline[0], events_per_minute, warning_events_out_per_minute_min, warning_events_out_per_minute_max, critical_events_out_per_minute_min, critical_events_out_per_minute_max)) + end + inflight_events = events_in - events_out + inflight_arr.push(PerfData.report_counter(result, 'pipelines.' 
+ named_pipeline[0] + '.events.in', nil, nil, 0, nil)) inflight_arr.push(PerfData.report_counter(result, 'pipelines.' + named_pipeline[0] + '.events.out', nil, nil, 0, nil)) - inflight_arr.push(PerfData_derived.report('inflight_events_' + named_pipeline[0], inflight_events, warning_inflight_events_max, critical_inflight_events_max, 0, nil)) + inflight_arr.push(PerfData_derived.report('inflight_events_' + named_pipeline[0], inflight_events, warning_inflight_events_min, warning_inflight_events_max, critical_inflight_events_min, critical_inflight_events_max)) end end else - inflight_events = (result.get('pipeline.events.in') - result.get('pipeline.events.out')).to_i + events_in = result.get('pipeline.events.in').to_i + events_out = result.get('pipeline.events.out').to_i + + if checks_events_in_per_minute? && state.has_key?('main') + events_per_minute = calculate_events_per_minute(state, 'main', events_in, result.get_timestamp, "in") + inflight_arr.push(PerfData_derived.report('events_in_per_minute', events_per_minute, warning_events_in_per_minute_min, warning_events_in_per_minute_max, critical_events_in_per_minute_min, critical_events_in_per_minute_max)) + end + + if checks_events_out_per_minute? && state.has_key?('main') + events_per_minute = calculate_events_per_minute(state, 'main', events_out, result.get_timestamp, "out") + inflight_arr.push(PerfData_derived.report('events_out_per_minute', events_per_minute, warning_events_out_per_minute_min, warning_events_out_per_minute_max, critical_events_out_per_minute_min, critical_events_out_per_minute_max)) + end + + inflight_events = events_in - events_out + inflight_arr.push(PerfData.report_counter(result, 'pipeline.events.in', nil, nil, 0, nil)) inflight_arr.push(PerfData.report_counter(result, 'pipeline.events.out', nil, nil, 0, nil)) - inflight_arr.push(PerfData_derived.report('inflight_events', inflight_events, warning_inflight_events_max, critical_inflight_events_max, 0, nil)) + inflight_arr.push(PerfData_derived.report('inflight_events', inflight_events, warning_inflight_events_min, warning_inflight_events_max, critical_inflight_events_min, critical_inflight_events_max)) end perfdata = common + inflight_arr @@ -377,16 +508,52 @@ def performance_data(result) # the reports are defined below, call them here - def health(result) + def health(result, state) [ file_descriptor_health(result), heap_health(result), inflight_events_health(result), config_reload_health(result), - cpu_usage_health(result) + cpu_usage_health(result), + events_per_minute_health(checks_events_in_per_minute?, "in", result, state, + warning_events_in_per_minute_min, + warning_events_in_per_minute_max, + critical_events_in_per_minute_min, + critical_events_in_per_minute_max), + events_per_minute_health(checks_events_out_per_minute?, "out", result, state, + warning_events_out_per_minute_min, + warning_events_out_per_minute_max, + critical_events_out_per_minute_min, + critical_events_out_per_minute_max) ] end + def save_events_state(result) + return unless checks_events_in_per_minute? || checks_events_out_per_minute? + + stats_to_save = {} + + # Since version 6.0.0 it's possible to define multiple pipelines and give them a name. + # This goes over all pipelines and compiles all events into one string. 
+ if Gem::Version.new(result.get('version')) >= Gem::Version.new('6.0.0') + result.get('pipelines').each do |named_pipeline| + name = named_pipeline[0] + next if name == ".monitoring-logstash" + + events_in = result.get("pipelines.#{name}.events.in").to_i + events_out = result.get("pipelines.#{name}.events.out").to_i + stats_to_save[name] = {events_in: events_in, events_out: events_out, timestamp: result.get_timestamp} + end + # For versions older 6.0.0 we use the old method (unchanged) + else + events_in = result.get('pipeline.events.in') + events_out = result.get('pipeline.events.out') + stats_to_save['main'] = {events_in: events_in, events_out: events_out, timestamp: result.get_timestamp} + end + + FileHandler.save(temp_file_dir, host, port, pipeline, stats_to_save) + end + # reports for various performance data including threshold checks FILE_DESCRIPTOR_REPORT = 'Open file descriptors at %.2f%%. (%d out of %d file descriptors are open)' @@ -423,6 +590,70 @@ def heap_health(result) end end + def events_per_minute_health(checks_event, direction, result, state, warn_min, warn_max, crit_min, crit_max) + return unless checks_event + + # Since version 6.0.0 it's possible to define multiple pipelines and give them a name. + # This goes over all pipelines and compiles all events into one string. + if Gem::Version.new(result.get('version')) >= Gem::Version.new('6.0.0') + events_report = "Events #{direction} per minute:" + + warn_counter = 0 + critical_counter = 0 + + result.get('pipelines').each do |named_pipeline| + name = named_pipeline[0] + next if name == ".monitoring-logstash" + next(events_report += " #{named_pipeline[0]}: Initialized;") unless state.has_key?(name) + + events = result.get("pipelines.#{name}.events.#{direction}").to_i + events_per_minute = calculate_events_per_minute(state, name, events, result.get_timestamp, direction) + events_report += " #{name}: #{events_per_minute};" + + if crit_max && crit_max < events_per_minute + critical_counter += 1 + elsif crit_min && crit_min > events_per_minute + critical_counter += 1 + elsif warn_max && warn_max < events_per_minute + warn_counter += 1 + elsif warn_min && warn_min > events_per_minute + warn_counter += 1 + end + end + + # If any of the pipelines is above the configured values we throw the highest common alert. + # E.g. if pipeline1 is OK, but pipeline2 is CRIT the result will be CRIT. 
+      if critical_counter > 0
+        Critical.new(events_report)
+      elsif warn_counter > 0
+        Warning.new(events_report)
+      else
+        OK.new(events_report)
+      end
+    # For versions older than 6.0.0 we use the old method (unchanged)
+    else
+      return OK.new("Events #{direction} per minute: Initialized") unless state.has_key?('main')
+
+      # calculate the events per minute for the single implicit pipeline and
+      # compare them against the configured thresholds
+      events = result.get("pipeline.events.#{direction}").to_i
+      events_per_minute = calculate_events_per_minute(state, 'main', events, result.get_timestamp, direction)
+      events_report = "Events #{direction} per minute: #{events_per_minute}"
+
+      if crit_max && crit_max < events_per_minute
+        Critical.new(events_report)
+      elsif crit_min && crit_min > events_per_minute
+        Critical.new(events_report)
+      elsif warn_max && warn_max < events_per_minute
+        Warning.new(events_report)
+      elsif warn_min && warn_min > events_per_minute
+        Warning.new(events_report)
+      else
+        OK.new(events_report)
+      end
+    end
+  end
+
   INFLIGHT_EVENTS_REPORT = 'Inflight events: %d'
   def inflight_events_health(result)
     # Since version 6.0.0 it's possible to define multiple pipelines and give them a name.
@@ -457,7 +688,7 @@ def inflight_events_health(result)
       if critical_counter > 0
         Critical.new(inflight_events_report)
       elsif warn_counter > 0
-        Warning.new(infligh_events_report)
+        Warning.new(inflight_events_report)
       else
         OK.new(inflight_events_report)
       end
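
Reviewer note, not part of the patch itself: a minimal, self-contained sketch of how the temporary state file and the per-minute math in `calculate_events_per_minute` fit together. The file layout mirrors what `FileHandler.save` writes above; the counter values, timestamps and the `previous_state` name are made up for illustration.

    # Sketch only: reproduce the per-minute calculation used by the plugin
    # against a hand-written state file with one entry per pipeline.
    require 'json'
    require 'tempfile'

    # Shape of the data FileHandler.save serializes: per pipeline the last
    # seen counters plus the UNIX timestamp of that check run.
    previous_state = {
      'main' => { 'events_in' => 236_176_584, 'events_out' => 236_176_581, 'timestamp' => 1_658_140_000.0 }
    }

    file = Tempfile.new('check_logstash_example_state')
    file.write(previous_state.to_json)
    file.rewind

    state     = JSON.parse(file.read)
    now       = 1_658_140_060.0 # pretend the next check runs 60 seconds later
    events_in = 236_178_654     # counter reported by the Logstash API now

    saved = state['main']
    # same formula as calculate_events_per_minute: delta of the counter divided
    # by the elapsed seconds, scaled to one minute
    events_in_per_minute = ((events_in - saved['events_in']) / (now - saved['timestamp']) * 60).to_i
    puts "events_in_per_minute=#{events_in_per_minute}" # => 2070

    file.close!

Because the very first run has no saved counters to diff against, the plugin only writes the state file and reports the per-minute values from the second run on, as described in the README section above.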
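Reviewer note, not part of the patch: the min:max convention shared by the new --events-*-per-minute-* options and the existing --inflight-events-* options, mirroring what `parse_min_max_option` does. `split_min_max` is a hypothetical standalone helper used only for illustration; the real parser additionally rejects values with a leading colon.

    # Sketch of the threshold syntax: "10" sets only a maximum, "1:" only a
    # minimum, "1:10" sets both bounds.
    def split_min_max(value)
      values = value.split(':')
      min_only = value.end_with?(':')
      if values.count == 1
        min_only ? [values[0].to_i, nil] : [nil, values[0].to_i]
      else
        [values[0].to_i, values[1].to_i]
      end
    end

    p split_min_max('10')   # => [nil, 10]  alert when the value is above 10
    p split_min_max('1:')   # => [1, nil]   alert when the value is below 1
    p split_min_max('1:10') # => [1, 10]    alert when the value is outside 1..10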