Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add <worker n> section to set a configuration for a specific worker #1507

Merged
merged 7 commits into from Apr 13, 2017

Conversation

mururu
Copy link
Member

@mururu mururu commented Mar 17, 2017

Closes #1392.

This PR introduce <worker n> section. Configurations under this section is enabled only in specified worker and act as if they work in single worker environment. More precisely, config elements in worker section get worker_target_id recursively. Prior to configure phase of corresponding plugins, system_config.workers for the plugins is overwritten as 1 if target_worker_id equals the worker's worker_id.
It also means that plugins(including owned plugins) which doesn't support multi worker feature can run only in worker section.
A worker section can have <label>, <source>, <filter> and <match> as a child element.

Example from #1392

<system>
  workers 8
  root_dir /path/fluentd/root
</system>
<source>  # top-level sections works on all workers in parallel
  @id edge_input
  @type forward
  @label @traffic
  port 24224
</source>
<label @traffic>  # this section works on all workers too
  <match **>
    @id backend_output
    @type forward
    <buffer>
      @type file
      flush_interval 8m
    </buffer>
  </match>
</label>
<worker 0> # this section works only on first worker process
  <source>
    @id system_log_reader
    @type tail
    tag monitoring
    path /path/now/watching/...
  </source>
  <match monitoring>
    @id output_for_monitoring
    @type elasticsearch
    # ...
  </match>
</worker>

@mururu mururu changed the title [WIP] Add <worker n> section to set a configuration for a specific worker Add <worker n> section to set a configuration for a specific worker Mar 23, 2017
@mururu
Copy link
Member Author

mururu commented Mar 23, 2017

Updated.
@repeatedly @tagomoris Could you review this?

@tagomoris
Copy link
Member

Please add an example configuration which use this feature under example/.

@mururu
Copy link
Member Author

mururu commented Mar 23, 2017

Done.

@mururu
Copy link
Member Author

mururu commented Mar 23, 2017

Updated the example to try this feature easily.

</inject>
</match>

<worker 0> # this section works only on first worker process
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This configuration doesn't use the <worker> section feature to execute plugins which doesn't support multi process workers.

}
end

def has_target?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#has_target doesn't show the correct meaning of this method. What's the target?
#has_target_worker_id looks better.

private
def worker_id
return @_worker_id if @_worker_id
@_worker_id = (ENV['SERVERENGINE_WORKER_ID'] || 0).to_i
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above deleted comment should be here.


target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker#{target_worker_id} specified by <worker> directive doesn't exist. Specify id between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If specify -1, log message is worker-1. This is unclear, worker - 1 or worker -1.

"worker id #{target_worker_id} specified in directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"

@@ -62,6 +62,7 @@ def configure(conf)

# initialize <match> and <filter> elements
conf.elements('filter', 'match').each { |e|
next if e.has_target_worker_id? && e.target_worker_id != Fluent::Engine.worker_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract to method is better?

@@ -121,7 +122,8 @@ def lifecycle(desc: false)
end

def add_match(type, pattern, conf)
log.info :worker0, "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type
log_type = conf.target_worker_id == Fluent::Engine.worker_id ? :default : :worker0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extract to method is better too?

@@ -614,5 +614,205 @@ def assert_fluentd_fails_to_start(cmdline, *pattern_list, timeout: 10)
"config error file=\"#{conf_path}\" error_class=Fluent::ConfigError error=\"Plugin 'file' does not support multi workers configuration (Fluent::Plugin::FileBuffer)\"",
)
end

test 'failed to start workers when configured plugins as chidren of MultiOutput do not support multi worker configuration' do
script = "require 'fluent/plugin/output'\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here document is better.

end

test 'success to start workers when configured plugins only for specific worker do not support multi worker configuration' do
script = "require 'fluent/plugin/input'\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

end

test 'success to start workers when configured plugins as a chidren of MultiOutput only for specific worker do not support multi worker configuration' do
script = "require 'fluent/plugin/output'\n"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@mururu
Copy link
Member Author

mururu commented Apr 10, 2017

I added #for_every_workers?, #for_this_worker? and #for_another_worker? instead of #has_target_worker_id? and #target_worker_id ==/!= Fluent::Engine.worker_id. How about that?

@repeatedly
Copy link
Member

LGTM :)

@repeatedly repeatedly merged commit 751d130 into fluent:master Apr 13, 2017
@repeatedly
Copy link
Member

Merged. Thanks for hard work!

@mururu
Copy link
Member Author

mururu commented Apr 13, 2017

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants