Skip to content

Commit

Permalink
Merge pull request #2651 from ganmacs/fix-dry-run-mode
Browse files Browse the repository at this point in the history
Fix dry run mode
  • Loading branch information
ganmacs committed Nov 5, 2019
2 parents 572aa34 + 4db63ce commit e4a1dd7
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 78 deletions.
4 changes: 3 additions & 1 deletion lib/fluent/agent.rb
Expand Up @@ -62,7 +62,9 @@ def configure(conf)

# initialize <match> and <filter> elements
conf.elements('filter', 'match').each { |e|
next if e.for_another_worker?
if !Fluent::Engine.supervisor_mode && e.for_another_worker?
next
end
pattern = e.arg.empty? ? '**' : e.arg
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/command/fluentd.rb
Expand Up @@ -323,7 +323,7 @@

supervisor = Fluent::Supervisor.new(opts)
supervisor.configure(supervisor: true)
supervisor.run_supervisor
supervisor.run_supervisor(dry_run: opts[:dry_run])
else
if opts[:standalone_worker] && opts[:workers] && opts[:workers] > 1
puts "Error: multi workers is not supported with --no-supervisor"
Expand Down
57 changes: 28 additions & 29 deletions lib/fluent/engine.rb
Expand Up @@ -41,16 +41,16 @@ def initialize
@fluent_log_event_router = nil
@system_config = SystemConfig.new

@dry_run_mode = false
@supervisor_mode = false
end

MAINLOOP_SLEEP_INTERVAL = 0.3

attr_reader :root_agent, :system_config
attr_accessor :dry_run_mode
attr_reader :root_agent, :system_config, :supervisor_mode

def init(system_config)
def init(system_config, supervisor_mode: false)
@system_config = system_config
@supervisor_mode = supervisor_mode

@suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
Expand All @@ -75,34 +75,29 @@ def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
end
end

def run_configure(conf)
def run_configure(conf, dry_run: false)
configure(conf)
conf.check_not_fetched { |key, e|
conf.check_not_fetched do |key, e|
parent_name, plugin_name = e.unused_in
if parent_name
message = if plugin_name
"section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
else
"section <#{e.name}> is not used in <#{parent_name}>"
end
if e.for_every_workers?
$log.warn :worker0, message
elsif e.for_this_worker?
$log.warn message
end
next
message = if parent_name && plugin_name
"section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
elsif parent_name
"section <#{e.name}> is not used in <#{parent_name}>"
elsif e.name != 'system' && !(@without_source && e.name == 'source')
"parameter '#{key}' in #{e.to_s.strip} is not used."
else
nil
end
next if message.nil?

if dry_run && @supervisor_mode
$log.warn :supervisor, message
elsif e.for_every_workers?
$log.warn :worker0, message
elsif e.for_this_worker?
$log.warn message
end
unless e.name == 'system'
unless @without_source && e.name == 'source'
message = "parameter '#{key}' in #{e.to_s.strip} is not used."
if e.for_every_workers?
$log.warn :worker0, message
elsif e.for_this_worker?
$log.warn message
end
end
end
}
end
end

def configure(conf)
Expand Down Expand Up @@ -182,6 +177,10 @@ def push_log_event(tag, time, record)
end

def worker_id
if @supervisor_mode
return -1
end

return @_worker_id if @_worker_id
# if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
# so it's (almost) a single worker, worker_id=0
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/base.rb
Expand Up @@ -52,7 +52,7 @@ def fluentd_worker_id
end

def configure(conf)
if conf.respond_to?(:for_this_worker?) && conf.for_this_worker?
if Fluent::Engine.supervisor_mode || (conf.respond_to?(:for_this_worker?) && conf.for_this_worker?)
workers = if conf.target_worker_ids && !conf.target_worker_ids.empty?
conf.target_worker_ids.size
else
Expand Down
16 changes: 7 additions & 9 deletions lib/fluent/root_agent.rb
Expand Up @@ -89,7 +89,7 @@ def configure(conf)
raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end
available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id)
if used_worker_ids.include?(target_worker_id) && !Fluent::Engine.dry_run_mode
if used_worker_ids.include?(target_worker_id)
raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on <worker> directive. Available worker id(s): #{available_worker_ids}"
end
used_worker_ids << target_worker_id
Expand All @@ -100,9 +100,6 @@ def configure(conf)
end
end

# On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_ids = [0] if Fluent::Engine.dry_run_mode

unless target_worker_ids.empty?
e.set_target_worker_ids(target_worker_ids.uniq)
end
Expand All @@ -113,9 +110,6 @@ def configure(conf)
raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end

## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_id = 0 if Fluent::Engine.dry_run_mode

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
Expand All @@ -132,7 +126,9 @@ def configure(conf)
# initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
label_configs = {}
conf.elements(name: 'label').each { |e|
next if e.for_another_worker?
if !Fluent::Engine.supervisor_mode && e.for_another_worker?
next
end
name = e.arg
raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

Expand All @@ -154,7 +150,9 @@ def configure(conf)
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
else
conf.elements(name: 'source').each { |e|
next if e.for_another_worker?
if !Fluent::Engine.supervisor_mode && e.for_another_worker?
next
end
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <source> directive" unless type
add_source(type, e)
Expand Down
57 changes: 23 additions & 34 deletions lib/fluent/supervisor.rb
Expand Up @@ -450,7 +450,6 @@ def initialize(opt)
@use_v1_config = opt[:use_v1_config]
@conf_encoding = opt[:conf_encoding]
@log_path = opt[:log_path]
@dry_run = opt[:dry_run]
@show_plugin_config = opt[:show_plugin_config]
@libs = opt[:libs]
@plugin_dirs = opt[:plugin_dirs]
Expand All @@ -473,7 +472,11 @@ def initialize(opt)
@finished = false
end

def run_supervisor
def run_supervisor(dry_run: false)
if dry_run
$log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION
end

if @system_config.workers < 1
raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}"
end
Expand All @@ -492,8 +495,24 @@ def run_supervisor
end
end
end
dry_run_cmd if @dry_run
supervise

begin
ServerEngine::Privilege.change(@chuser, @chgroup)
MessagePackFactory.init
Fluent::Engine.init(@system_config, supervisor_mode: true)
Fluent::Engine.run_configure(@conf, dry_run: dry_run)
rescue Fluent::ConfigError => e
$log.error 'config error', file: @config_path, error: e
$log.debug_backtrace
exit!(1)
end

if dry_run
$log.info 'finsihed dry run mode'
exit 0
else
supervise
end
end

def options
Expand Down Expand Up @@ -588,43 +607,13 @@ def create_socket_manager
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
end

def dry_run_cmd
$log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION
@system_config.suppress_config_dump = true
dry_run
exit 0
rescue => e
$log.error "dry run failed: #{e}"
exit 1
end

## Set Engine's dry_run_mode true to override all target_id of worker sections
def dry_run
begin
Fluent::Engine.dry_run_mode = true
ServerEngine::Privilege.change(@chuser, @chgroup)
MessagePackFactory.init
Fluent::Engine.init(@system_config)
Fluent::Engine.run_configure(@conf)
rescue Fluent::ConfigError => e
$log.error "config error", file: @config_path, error: e
$log.debug_backtrace
exit!(1)
ensure
Fluent::Engine.dry_run_mode = false
end
end

def show_plugin_config
name, type = @show_plugin_config.split(":") # input:tail
$log.info "show_plugin_config option is deprecated. Use fluent-plugin-config-format --format=txt #{name} #{type}"
exit 0
end

def supervise
# Make dumpable conf, which is set corresponding_proxies for all elements in all worker sections
dry_run

Process.setproctitle("supervisor:#{@system_config.process_name}") if @system_config.process_name
$log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

Expand Down
9 changes: 6 additions & 3 deletions test/command/test_fluentd.rb
Expand Up @@ -625,7 +625,8 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10)
workers 2
</system>
<source>
@type single
@type dummy
tag dummy
@id single
@label @dummydata
</source>
Expand Down Expand Up @@ -669,7 +670,8 @@ def write(chunk)
workers 2
</system>
<source>
@type single
@type dummy
tag dummy
@id single
@label @dummydata
</source>
Expand Down Expand Up @@ -793,7 +795,7 @@ def multi_workers_ready?
@id blackhole
<buffer>
@type file
path #{File.join(@root_path, "buf", "file.*.log")}
path #{File.join(@root_path, "buf")}
</buffer>
</match>
</worker>
Expand Down Expand Up @@ -863,6 +865,7 @@ module Fluent::Plugin
class FakeInput < Input
Fluent::Plugin.register_input('fake', self)
config_param :secret, :string, secret: true
def multi_workers_ready?; true; end
end
end
EOC
Expand Down

0 comments on commit e4a1dd7

Please sign in to comment.