Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement metrics plugin mechanism #3440

Merged
merged 3 commits into from Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/fluent/plugin.rb
Expand Up @@ -36,8 +36,9 @@ module Plugin
FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_', dir_search_prefix: 'formatter_')
STORAGE_REGISTRY = Registry.new(:storage, 'fluent/plugin/storage_', dir_search_prefix: 'storage_')
SD_REGISTRY = Registry.new(:sd, 'fluent/plugin/sd_', dir_search_prefix: 'sd_')
METRICS_REGISTRY = Registry.new(:metrics, 'fluent/plugin/metrics_', dir_search_prefix: 'metrics_')

REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY, SD_REGISTRY]
REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY, SD_REGISTRY, METRICS_REGISTRY]

def self.register_input(type, klass)
register_impl('input', INPUT_REGISTRY, type, klass)
Expand All @@ -59,6 +60,10 @@ def self.register_sd(type, klass)
register_impl('sd', SD_REGISTRY, type, klass)
end

def self.register_metrics(type, klass)
register_impl('metrics', METRICS_REGISTRY, type, klass)
end

def self.register_parser(type, klass_or_proc)
if klass_or_proc.is_a?(Regexp)
# This usage is not recommended for new API
Expand Down Expand Up @@ -121,6 +126,10 @@ def self.new_sd(type, parent: nil)
new_impl('sd', SD_REGISTRY, type, parent)
end

def self.new_metrics(type, parent: nil)
new_impl('metrics', METRICS_REGISTRY, type, parent)
end

class << self
# This should be defined for fluent-plugin-config-formatter type arguments.
alias_method :new_service_discovery, :new_sd
Expand Down
119 changes: 119 additions & 0 deletions lib/fluent/plugin/metrics.rb
@@ -0,0 +1,119 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'socket'

require 'fluent/plugin/base'

require 'fluent/log'
require 'fluent/unique_id'
require 'fluent/plugin_id'

module Fluent
module Plugin
class Metrics < Base
include PluginId
include PluginLoggerMixin
include UniqueId::Mixin

DEFAULT_TYPE = 'local'

configured_in :metrics

config_param :default_labels, :hash, default: {agent: "Fluentd", hostname: "#{Socket.gethostname}"}
config_param :labels, :hash, default: {}

attr_reader :use_gauge_metric
attr_reader :has_methods_for_gauge, :has_methods_for_counter

def initialize
super

@has_methods_for_counter = false
@has_methods_for_gauge = false
@use_gauge_metric = false
end

def configure(conf)
super

if use_gauge_metric
@has_methods_for_gauge = has_methods_for_gauge?
else
@has_methods_for_counter = has_methods_for_counter?
end
end

# Some metrics should be counted by gauge.
# ref: https://prometheus.io/docs/concepts/metric_types/#gauge
def use_gauge_metric=(use_gauge_metric=false)
@use_gauge_metric = use_gauge_metric
end

def create(namespace:, subsystem:,name:,help_text:,labels: {})
# This API is for cmetrics type.
end

def get(key)
raise NotImplementedError, "Implement this method in child class"
end

def inc(key)
raise NotImplementedError, "Implement this method in child class"
end

def dec(key)
raise NotImplementedError, "Implement this method in child class"
end
kenhys marked this conversation as resolved.
Show resolved Hide resolved

def add(key, value)
raise NotImplementedError, "Implement this method in child class"
end

def sub(key, value)
raise NotImplementedError, "Implement this method in child class"
end

def set(key, value)
raise NotImplementedError, "Implement this method in child class"
end

private

def has_methods_for_counter?
implemented_methods = self.class.instance_methods(false)

if [:get, :inc, :add].all? {|e| implemented_methods.include?(e)} &&
[:set].all?{|e| self.class.method_defined?(e)}
true
else
raise "BUG: metrics plugin on counter mode MUST implement `get`, `inc`, `add` methods. And aliased `set` methods should be aliased from another method"
end
end

def has_methods_for_gauge?
implemented_methods = self.class.instance_methods(false)

if [:get, :inc, :add].all? {|e| implemented_methods.include?(e)} &&
[:set, :dec, :sub].all?{|e| self.class.method_defined?(e)}
true
else
raise "BUG: metrics plugin on gauge mode MUST implement `get`, `inc`, and `add` methods. And `dec`, `sub`, and `set` methods should be aliased from other methods"
end
end
end
end
end
96 changes: 96 additions & 0 deletions lib/fluent/plugin/metrics_local.rb
@@ -0,0 +1,96 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin'
require 'fluent/plugin/metrics'

module Fluent
module Plugin
class LocalMetrics < Metrics
Fluent::Plugin.register_metrics('local', self)

def initialize
super
@store = Hash.new(0)
@monitor = Monitor.new
end

def configure(conf)
super

if use_gauge_metric
class << self
alias_method :dec, :dec_gauge
alias_method :set, :set_gauge
alias_method :sub, :sub_gauge
end
else
class << self
alias_method :set, :set_counter
end
end
end

def multi_workers_ready?
true
end

def get(key)
@monitor.synchronize do
@store[key.to_s]
end
end

def inc(key)
@monitor.synchronize do
@store[key.to_s] += 1
end
end

def dec_gauge(key)
@monitor.synchronize do
@store[key.to_s] -= 1
end
end

def add(key, value)
@monitor.synchronize do
@store[key.to_s] += value
end
end

def sub_gauge(key, value)
@monitor.synchronize do
@store[key.to_s] -= value
end
end

def set_counter(key, value)
return if @store[key.to_s] > value

@monitor.synchronize do
@store[key.to_s] = value
end
end

def set_gauge(key, value)
@monitor.synchronize do
@store[key.to_s] = value
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/fluent/plugin_helper.rb
Expand Up @@ -32,6 +32,7 @@
require 'fluent/plugin_helper/record_accessor'
require 'fluent/plugin_helper/compat_parameters'
require 'fluent/plugin_helper/service_discovery'
require 'fluent/plugin_helper/metrics'

module Fluent
module PluginHelper
Expand Down
125 changes: 125 additions & 0 deletions lib/fluent/plugin_helper/metrics.rb
@@ -0,0 +1,125 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'forwardable'

require 'fluent/plugin'
require 'fluent/plugin/metrics'
require 'fluent/plugin_helper/timer'
require 'fluent/config/element'
require 'fluent/configurable'
require 'fluent/system_config'

module Fluent
module PluginHelper
module Metrics
include Fluent::SystemConfig::Mixin

attr_reader :_metrics # For tests.

def initialize
super
@_metrics_started = false
@_metrics = {} # usage => metrics_state
end

def configure(conf)
super

@plugin_type_or_id = if self.plugin_id_configured?
self.plugin_id
else
"#{conf["@type"] || conf["type"]}.#{self.plugin_id}"
end
end

def metrics_create(namespace: "fluentd", subsystem: "metrics", name:, help_text:, labels: {}, prefer_gauge: false)
metrics = if system_config.metrics
Fluent::Plugin.new_metrics(system_config.metrics[:@type], parent: self)
else
Fluent::Plugin.new_metrics(Fluent::Plugin::Metrics::DEFAULT_TYPE, parent: self)
end
config = if system_config.metrics
system_config.metrics.corresponding_config_element
else
Fluent::Config::Element.new('metrics', '', {'@type' => Fluent::Plugin::Metrics::DEFAULT_TYPE}, [])
end
metrics.use_gauge_metric = prefer_gauge
metrics.configure(config)
# For multi workers environment, cmetrics should be distinguish with static labels.
if Fluent::Engine.system_config.workers > 1
labels.merge!(worker_id: fluentd_worker_id.to_s)
end
labels.merge!(plugin: @plugin_type_or_id)
metrics.create(namespace: namespace, subsystem: subsystem, name: name, help_text: help_text, labels: labels)

@_metrics["#{@plugin_type_or_id}_#{namespace}_#{subsystem}_#{name}"] = metrics

metrics
end

def metrics_operate(method_name, &block)
@_metrics.each_pair do |key, m|
begin
block.call(s) if block_given?
m.__send__(method_name)
rescue => e
log.error "unexpected error while #{method_name}", key: key, metrics: m, error: e
end
end
end

def start
super

metrics_operate(:start)
@_metrics_started = true
end

def stop
super
# timer stops automatically in super
metrics_operate(:stop)
end

def before_shutdown
metrics_operate(:before_shutdown)
super
end

def shutdown
metrics_operate(:shutdown)
super
end

def after_shutdown
metrics_operate(:after_shutdown)
super
end

def close
metrics_operate(:close)
super
end

def terminate
metrics_operate(:terminate)
@_metrics = {}
super
end
end
end
end