Skip to content

Commit

Permalink
Implement metrics plugin mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Hiroshi Hatake <hatake@calyptia.com>
  • Loading branch information
cosmo0920 committed Jul 5, 2021
1 parent 91b12e8 commit d3505ea
Show file tree
Hide file tree
Showing 8 changed files with 695 additions and 2 deletions.
11 changes: 10 additions & 1 deletion lib/fluent/plugin.rb
Expand Up @@ -36,8 +36,9 @@ module Plugin
FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_', dir_search_prefix: 'formatter_')
STORAGE_REGISTRY = Registry.new(:storage, 'fluent/plugin/storage_', dir_search_prefix: 'storage_')
SD_REGISTRY = Registry.new(:sd, 'fluent/plugin/sd_', dir_search_prefix: 'sd_')
METRICS_REGISTRY = Registry.new(:metrics, 'fluent/plugin/metrics_', dir_search_prefix: 'metrics_')

REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY, SD_REGISTRY]
REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY, SD_REGISTRY, METRICS_REGISTRY]

def self.register_input(type, klass)
register_impl('input', INPUT_REGISTRY, type, klass)
Expand All @@ -59,6 +60,10 @@ def self.register_sd(type, klass)
register_impl('sd', SD_REGISTRY, type, klass)
end

def self.register_metrics(type, klass)
register_impl('metrics', METRICS_REGISTRY, type, klass)
end

def self.register_parser(type, klass_or_proc)
if klass_or_proc.is_a?(Regexp)
# This usage is not recommended for new API
Expand Down Expand Up @@ -121,6 +126,10 @@ def self.new_sd(type, parent: nil)
new_impl('sd', SD_REGISTRY, type, parent)
end

def self.new_metrics(type, parent: nil)
new_impl('metrics', METRICS_REGISTRY, type, parent)
end

class << self
# This should be defined for fluent-plugin-config-formatter type arguments.
alias_method :new_service_discovery, :new_sd
Expand Down
119 changes: 119 additions & 0 deletions lib/fluent/plugin/metrics.rb
@@ -0,0 +1,119 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'socket'

require 'fluent/plugin/base'

require 'fluent/log'
require 'fluent/unique_id'
require 'fluent/plugin_id'

module Fluent
module Plugin
class Metrics < Base
include PluginId
include PluginLoggerMixin
include UniqueId::Mixin

DEFAULT_TYPE = 'local'

configured_in :metrics

config_param :default_labels, :hash, default: {agent: "Fluentd", hostname: "#{Socket.gethostname}"}
config_param :labels, :hash, default: {}

attr_reader :use_gauge_metric
attr_reader :has_methods_for_gauge, :has_methods_for_counter

def initialize
super

@has_methods_for_counter = false
@has_methods_for_gauge = false
@use_gauge_metric = false
end

def configure(conf)
super

if use_gauge_metric
@has_methods_for_gauge = has_methods_for_gauge?
else
@has_methods_for_counter = has_methods_for_counter?
end
end

# Some metrics should be counted by gauge.
# ref: https://prometheus.io/docs/concepts/metric_types/#gauge
def use_gauge_metric=(use_gauge_metric=false)
@use_gauge_metric = use_gauge_metric
end

def create(namespace:, subsystem:,name:,help_text:,labels: {})
# This API is for cmetrics type.
end

def get(key)
raise NotImplementedError, "Implement this method in child class"
end

def inc(key)
raise NotImplementedError, "Implement this method in child class"
end

def dec(key)
raise NotImplementedError, "Implement this method in child class"
end

def add(key, value)
raise NotImplementedError, "Implement this method in child class"
end

def sub(key, value)
raise NotImplementedError, "Implement this method in child class"
end

def set(key, value)
raise NotImplementedError, "Implement this method in child class"
end

private

def has_methods_for_counter?
implemented_methods = self.class.instance_methods(false)

if [:get, :inc, :add].all? {|e| implemented_methods.include?(e)} &&
[:set].all?{|e| self.class.method_defined?(e)}
true
else
raise "BUG: metrics plugin on counter mode MUST implement `get`, `inc`, `add` methods. And aliased `set` methods should be aliased other methods"
end
end

def has_methods_for_gauge?
implemented_methods = self.class.instance_methods(false)

if [:get, :inc, :add].all? {|e| implemented_methods.include?(e)} &&
[:set, :dec, :sub].all?{|e| self.class.method_defined?(e)}
true
else
raise "BUG: metrics plugin on gauge mode MUST implement `get`, `inc`, and `add` methods. And `dec`, `sub`, and `set` methods should be aliased from other methods"
end
end
end
end
end
96 changes: 96 additions & 0 deletions lib/fluent/plugin/metrics_local.rb
@@ -0,0 +1,96 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin'
require 'fluent/plugin/metrics'

module Fluent
module Plugin
class LocalMetrics < Metrics
Fluent::Plugin.register_metrics('local', self)

def initialize
super
@store = Hash.new(0)
@monitor = Monitor.new
end

def configure(conf)
super

if use_gauge_metric
class << self
alias_method :dec, :dec_gauge
alias_method :set, :set_gauge
alias_method :sub, :sub_gauge
end
else
class << self
alias_method :set, :set_counter
end
end
end

def multi_workers_ready?
true
end

def get(key)
@monitor.synchronize do
@store[key.to_s]
end
end

def inc(key)
@monitor.synchronize do
@store[key.to_s] += 1
end
end

def dec_gauge(key)
@monitor.synchronize do
@store[key.to_s] -= 1
end
end

def add(key, value)
@monitor.synchronize do
@store[key.to_s] += value
end
end

def sub_gauge(key, value)
@monitor.synchronize do
@store[key.to_s] -= value
end
end

def set_counter(key, value)
return if @store[key.to_s] > value

@monitor.synchronize do
@store[key.to_s] = value
end
end

def set_gauge(key, value)
@monitor.synchronize do
@store[key.to_s] = value
end
end
end
end
end
1 change: 1 addition & 0 deletions lib/fluent/plugin_helper.rb
Expand Up @@ -32,6 +32,7 @@
require 'fluent/plugin_helper/record_accessor'
require 'fluent/plugin_helper/compat_parameters'
require 'fluent/plugin_helper/service_discovery'
require 'fluent/plugin_helper/metrics'

module Fluent
module PluginHelper
Expand Down
66 changes: 66 additions & 0 deletions lib/fluent/plugin_helper/metrics.rb
@@ -0,0 +1,66 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'forwardable'

require 'fluent/plugin'
require 'fluent/plugin/metrics'
require 'fluent/plugin_helper/timer'
require 'fluent/config/element'
require 'fluent/configurable'
require 'fluent/system_config'

module Fluent
module PluginHelper
module Metrics
include Fluent::SystemConfig::Mixin

def initialize
super
@_metrics = {} # usage => metrics_state
end

def configure(conf)
super
end

def metrics_create(namespace: "Fluentd", subsystem: "metrics", name:, help_text:, labels: {}, prefer_gauge: false)
metrics = if system_config.metrics
Fluent::Plugin.new_metrics(system_config.metrics[:@type], parent: self)
else
Fluent::Plugin.new_metrics(Fluent::Plugin::Metrics::DEFAULT_TYPE, parent: self)
end
config = if system_config.metrics
system_config.metrics.corresponding_config_element
else
Fluent::Config::Element.new('metrics', '', {'@type' => Fluent::Plugin::Metrics::DEFAULT_TYPE}, [])
end
metrics.use_gauge_metric = prefer_gauge
metrics.configure(config)
metrics.create(namespace: namespace, subsystem: subsystem, name: name, help_text: help_text, labels: labels)

@_metrics["#{self.plugin_id}_#{namespace}_#{subsystem}_#{name}"] = metrics

metrics
end

def terminate
@_metrics = {}
super
end
end
end
end
8 changes: 7 additions & 1 deletion lib/fluent/system_config.rb
Expand Up @@ -27,7 +27,8 @@ class SystemConfig
:log_event_verbose, :ignore_repeated_log_interval, :ignore_same_log_interval,
:without_source, :rpc_endpoint, :enable_get_dump, :process_name,
:file_permission, :dir_permission, :counter_server, :counter_client,
:strict_config_value, :enable_msgpack_time_support, :disable_shared_socket
:strict_config_value, :enable_msgpack_time_support, :disable_shared_socket,
:metrics
]

config_param :workers, :integer, default: 1
Expand Down Expand Up @@ -93,6 +94,11 @@ class SystemConfig
config_param :timeout, :time, default: nil
end

config_section :metrics, multi: false do
config_param :@type, :string, default: "local"
config_param :labels, :hash, default: {}
end

def self.create(conf, strict_config_value=false)
systems = conf.elements(name: 'system')
return SystemConfig.new if systems.empty?
Expand Down

0 comments on commit d3505ea

Please sign in to comment.