Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dry run mode #2651

Merged
merged 7 commits into from Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/fluent/agent.rb
Expand Up @@ -62,7 +62,9 @@ def configure(conf)

# initialize <match> and <filter> elements
conf.elements('filter', 'match').each { |e|
next if e.for_another_worker?
if !Fluent::Engine.supervisor_mode && e.for_another_worker?
next
end
pattern = e.arg.empty? ? '**' : e.arg
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/command/fluentd.rb
Expand Up @@ -323,7 +323,7 @@

supervisor = Fluent::Supervisor.new(opts)
supervisor.configure(supervisor: true)
supervisor.run_supervisor
supervisor.run_supervisor(dry_run: opts[:dry_run])
else
if opts[:standalone_worker] && opts[:workers] && opts[:workers] > 1
puts "Error: multi workers is not supported with --no-supervisor"
Expand Down
57 changes: 28 additions & 29 deletions lib/fluent/engine.rb
Expand Up @@ -41,16 +41,16 @@ def initialize
@fluent_log_event_router = nil
@system_config = SystemConfig.new

@dry_run_mode = false
@supervisor_mode = false
end

MAINLOOP_SLEEP_INTERVAL = 0.3

attr_reader :root_agent, :system_config
attr_accessor :dry_run_mode
attr_reader :root_agent, :system_config, :supervisor_mode

def init(system_config)
def init(system_config, supervisor_mode: false)
@system_config = system_config
@supervisor_mode = supervisor_mode

@suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
Expand All @@ -75,34 +75,29 @@ def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
end
end

def run_configure(conf)
def run_configure(conf, dry_run: false)
configure(conf)
conf.check_not_fetched { |key, e|
conf.check_not_fetched do |key, e|
parent_name, plugin_name = e.unused_in
if parent_name
message = if plugin_name
"section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
else
"section <#{e.name}> is not used in <#{parent_name}>"
end
if e.for_every_workers?
$log.warn :worker0, message
elsif e.for_this_worker?
$log.warn message
end
next
message = if parent_name && plugin_name
"section <#{e.name}> is not used in <#{parent_name}> of #{plugin_name} plugin"
elsif parent_name
"section <#{e.name}> is not used in <#{parent_name}>"
elsif e.name != 'system' && !(@without_source && e.name == 'source')
"parameter '#{key}' in #{e.to_s.strip} is not used."
else
nil
end
next if message.nil?

if dry_run && @supervisor_mode
$log.warn :supervisor, message
elsif e.for_every_workers?
$log.warn :worker0, message
elsif e.for_this_worker?
$log.warn message
end
unless e.name == 'system'
unless @without_source && e.name == 'source'
message = "parameter '#{key}' in #{e.to_s.strip} is not used."
if e.for_every_workers?
$log.warn :worker0, message
elsif e.for_this_worker?
$log.warn message
end
end
end
}
end
end

def configure(conf)
Expand Down Expand Up @@ -182,6 +177,10 @@ def push_log_event(tag, time, record)
end

def worker_id
if @supervisor_mode
return -1
end

return @_worker_id if @_worker_id
# if ENV doesn't have SERVERENGINE_WORKER_ID, it is a worker under --no-supervisor or in tests
# so it's (almost) a single worker, worker_id=0
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/base.rb
Expand Up @@ -52,7 +52,7 @@ def fluentd_worker_id
end

def configure(conf)
if conf.respond_to?(:for_this_worker?) && conf.for_this_worker?
if Fluent::Engine.supervisor_mode || (conf.respond_to?(:for_this_worker?) && conf.for_this_worker?)
workers = if conf.target_worker_ids && !conf.target_worker_ids.empty?
conf.target_worker_ids.size
else
Expand Down
16 changes: 7 additions & 9 deletions lib/fluent/root_agent.rb
Expand Up @@ -89,7 +89,7 @@ def configure(conf)
raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end
available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id)
if used_worker_ids.include?(target_worker_id) && !Fluent::Engine.dry_run_mode
if used_worker_ids.include?(target_worker_id)
raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on <worker> directive. Available worker id(s): #{available_worker_ids}"
end
used_worker_ids << target_worker_id
Expand All @@ -100,9 +100,6 @@ def configure(conf)
end
end

# On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_ids = [0] if Fluent::Engine.dry_run_mode

unless target_worker_ids.empty?
e.set_target_worker_ids(target_worker_ids.uniq)
end
Expand All @@ -113,9 +110,6 @@ def configure(conf)
raise Fluent::ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end

## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_id = 0 if Fluent::Engine.dry_run_mode

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise Fluent::ConfigError, "<worker> section cannot have <#{elem.name}> directive"
Expand All @@ -132,7 +126,9 @@ def configure(conf)
# initialize <label> elements before configuring all plugins to avoid 'label not found' in input, filter and output.
label_configs = {}
conf.elements(name: 'label').each { |e|
next if e.for_another_worker?
if !Fluent::Engine.supervisor_mode && e.for_another_worker?
next
end
name = e.arg
raise ConfigError, "Missing symbol argument on <label> directive" if name.empty?

Expand All @@ -154,7 +150,9 @@ def configure(conf)
log.info :worker0, "'--without-source' is applied. Ignore <source> sections"
else
conf.elements(name: 'source').each { |e|
next if e.for_another_worker?
if !Fluent::Engine.supervisor_mode && e.for_another_worker?
next
end
type = e['@type']
raise ConfigError, "Missing '@type' parameter on <source> directive" unless type
add_source(type, e)
Expand Down
57 changes: 23 additions & 34 deletions lib/fluent/supervisor.rb
Expand Up @@ -450,7 +450,6 @@ def initialize(opt)
@use_v1_config = opt[:use_v1_config]
@conf_encoding = opt[:conf_encoding]
@log_path = opt[:log_path]
@dry_run = opt[:dry_run]
@show_plugin_config = opt[:show_plugin_config]
@libs = opt[:libs]
@plugin_dirs = opt[:plugin_dirs]
Expand All @@ -473,7 +472,11 @@ def initialize(opt)
@finished = false
end

def run_supervisor
def run_supervisor(dry_run: false)
if dry_run
$log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION
end

if @system_config.workers < 1
raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}"
end
Expand All @@ -492,8 +495,24 @@ def run_supervisor
end
end
end
dry_run_cmd if @dry_run
supervise

begin
ServerEngine::Privilege.change(@chuser, @chgroup)
MessagePackFactory.init
Fluent::Engine.init(@system_config, supervisor_mode: true)
Fluent::Engine.run_configure(@conf, dry_run: dry_run)
rescue Fluent::ConfigError => e
$log.error 'config error', file: @config_path, error: e
$log.debug_backtrace
exit!(1)
end

if dry_run
$log.info 'finsihed dry run mode'
exit 0
else
supervise
end
end

def options
Expand Down Expand Up @@ -588,43 +607,13 @@ def create_socket_manager
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
end

def dry_run_cmd
$log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION
@system_config.suppress_config_dump = true
dry_run
exit 0
rescue => e
$log.error "dry run failed: #{e}"
exit 1
end

## Set Engine's dry_run_mode true to override all target_id of worker sections
def dry_run
begin
Fluent::Engine.dry_run_mode = true
ServerEngine::Privilege.change(@chuser, @chgroup)
MessagePackFactory.init
Fluent::Engine.init(@system_config)
Fluent::Engine.run_configure(@conf)
rescue Fluent::ConfigError => e
$log.error "config error", file: @config_path, error: e
$log.debug_backtrace
exit!(1)
ensure
Fluent::Engine.dry_run_mode = false
end
end

def show_plugin_config
name, type = @show_plugin_config.split(":") # input:tail
$log.info "show_plugin_config option is deprecated. Use fluent-plugin-config-format --format=txt #{name} #{type}"
exit 0
end

def supervise
# Make dumpable conf, which is set corresponding_proxies for all elements in all worker sections
dry_run

Process.setproctitle("supervisor:#{@system_config.process_name}") if @system_config.process_name
$log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

Expand Down
9 changes: 6 additions & 3 deletions test/command/test_fluentd.rb
Expand Up @@ -625,7 +625,8 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10)
workers 2
</system>
<source>
@type single
@type dummy
tag dummy
@id single
@label @dummydata
</source>
Expand Down Expand Up @@ -669,7 +670,8 @@ def write(chunk)
workers 2
</system>
<source>
@type single
@type dummy
tag dummy
@id single
@label @dummydata
</source>
Expand Down Expand Up @@ -793,7 +795,7 @@ def multi_workers_ready?
@id blackhole
<buffer>
@type file
path #{File.join(@root_path, "buf", "file.*.log")}
path #{File.join(@root_path, "buf")}
</buffer>
</match>
</worker>
Expand Down Expand Up @@ -863,6 +865,7 @@ module Fluent::Plugin
class FakeInput < Input
Fluent::Plugin.register_input('fake', self)
config_param :secret, :string, secret: true
def multi_workers_ready?; true; end
end
end
EOC
Expand Down