Reloading pipelines call plugin initialize, which may cause resource leaks #8902

Open
jakelandis opened this issue Dec 29, 2017 · 7 comments

Comments

@jakelandis
Contributor

Tested against master (7.0.0-alpha1 pre-release). Related: logstash-plugins/logstash-filter-grok#132

Reloading a pipeline causes the plugin's initialize method to be called twice: once to validate the configuration and once to actually start the pipeline. This can result in unexpected behavior if the plugin's initialize method needs to be paired with a cleanup method.

Specifically, this causes a thread leak in the grok plugin, which creates a new thread in initialize and expects it to be joined in close. Because initialize is called twice and close is never called on the first (validation) instance, that thread is created and never destroyed.
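
The leak pattern can be reproduced outside Logstash. The following is a minimal sketch, not the grok plugin's code: LeakyPlugin, worker_alive?, and the Queue-based stop signal are hypothetical names chosen for illustration. It only assumes what is described above, namely that a thread is started in initialize and joined in close.

```ruby
# Hypothetical stand-in for a plugin that acquires a resource in its constructor.
class LeakyPlugin
  def initialize
    @stop = Queue.new
    # Background thread started at construction time; it blocks until close is called.
    @worker = Thread.new { @stop.pop }
  end

  # Cleanup that is expected to be paired with initialize.
  def close
    @stop << :stop
    @worker.join
  end

  def worker_alive?
    @worker.alive?
  end
end

validator = LeakyPlugin.new   # built only to validate the config, never closed
running   = LeakyPlugin.new   # built to actually run the pipeline

running.close                 # normal shutdown path

puts "running worker alive:   #{running.worker_alive?}"
puts "validator worker alive: #{validator.worker_alive?}"
```

In this sketch the closed instance reports no live worker, while the validation-only instance still holds one, which is the shape of the leak described here.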

Below is a log from a run with the grok plugin, with a trace of who is calling initialize (commit: 590e47f), produced by adding puts caller:

  def initialize(params)
    puts caller  # debug: print the call stack on every instantiation

The log below shows that a single reload calls initialize twice from reload.rb: once directly (reload.rb:34) and once through create.rb (reload.rb:48).

Sending Logstash's logs to /Users/jake/workspace/logstash/logs which is now configured via log4j2.properties
[2017-12-29T15:57:17,379][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/Users/jake/workspace/logstash/modules/fb_apache/configuration"}
[2017-12-29T15:57:17,386][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/Users/jake/workspace/logstash/modules/netflow/configuration"}
[2017-12-29T15:57:17,417][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2017-12-29T15:57:17,511][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.0.0-alpha1"}
[2017-12-29T15:57:17,576][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
/Users/jake/workspace/logstash/logstash-core/lib/logstash/filter_delegator.rb:23:in `initialize'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/plugins/plugin_factory.rb:88:in `plugin'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:112:in `plugin'
(eval):12:in `<eval>'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:84:in `eval'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:84:in `initialize'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:169:in `initialize'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:315:in `block in converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:312:in `block in converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:299:in `each'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:299:in `converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/runner.rb:343:in `block in execute'
/Users/jake/workspace/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'
[2017-12-29T15:57:17,943][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2017-12-29T15:57:18,041][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x67f529f@/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:246 sleep>"}
[2017-12-29T15:57:18,053][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
/Users/jake/workspace/logstash/logstash-core/lib/logstash/filter_delegator.rb:23:in `initialize'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/plugins/plugin_factory.rb:88:in `plugin'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:112:in `plugin'
(eval):12:in `<eval>'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:84:in `eval'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:84:in `initialize'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline_action/reload.rb:34:in `execute'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:315:in `block in converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:312:in `block in converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:299:in `each'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:299:in `converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:105:in `block in execute'
/Users/jake/workspace/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/interval.rb:18:in `interval'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:94:in `execute'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/runner.rb:343:in `block in execute'
/Users/jake/workspace/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'
[2017-12-29T15:57:27,228][INFO ][logstash.pipelineaction.reload] Reloading pipeline {"pipeline.id"=>:main}
[2017-12-29T15:57:27,548][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x67f529f@/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:246 run>"}
/Users/jake/workspace/logstash/logstash-core/lib/logstash/filter_delegator.rb:23:in `initialize'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/plugins/plugin_factory.rb:88:in `plugin'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:112:in `plugin'
(eval):12:in `<eval>'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:84:in `eval'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:84:in `initialize'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:169:in `initialize'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline_action/reload.rb:48:in `execute'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:315:in `block in converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:312:in `block in converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:299:in `each'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:299:in `converge_state'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:105:in `block in execute'
/Users/jake/workspace/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/interval.rb:18:in `interval'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/agent.rb:94:in `execute'
/Users/jake/workspace/logstash/logstash-core/lib/logstash/runner.rb:343:in `block in execute'
/Users/jake/workspace/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'
[2017-12-29T15:57:27,641][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2017-12-29T15:57:27,675][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0x502f150b@/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:246 sleep>"}
[2017-12-29T15:57:27,680][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2017-12-29T15:57:33,125][WARN ][logstash.runner          ] SIGINT received. Shutting down.
stopping!
[2017-12-29T15:57:33,458][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x502f150b@/Users/jake/workspace/logstash/logstash-core/lib/logstash/pipeline.rb:246 run>"}

Process finished with exit code 0
@jakelandis
Contributor Author

Below is the code that calls the initialize method without a matching cleanup call. I suggest we just nuke it... It doesn't make much sense to block a reload because the new pipeline isn't reloadable; the prior pipeline is already checked before a reload is allowed.

      begin
        pipeline_validator =
          if @pipeline_config.settings.get_value("pipeline.java_execution")
            LogStash::JavaBasePipeline.new(@pipeline_config)
          else
            LogStash::BasePipeline.new(@pipeline_config)
          end
      rescue => e
        return LogStash::ConvergeResult::FailedAction.from_exception(e)
      end

      if !pipeline_validator.reloadable?
        return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the new pipeline is not reloadable")
      end

jakelandis added a commit to jakelandis/logstash that referenced this issue Dec 29, 2017
This change stops checking to see if the next pipeline may be reloaded. That check calls the plugin constructor without calling the effective destructor, which can result in resource leaks if resources are set up in the initialize method. Also see: logstash-plugins/logstash-filter-grok#132

fixes elastic#8902
@jakelandis
Contributor Author

After some discussion on PR #8903, rather than simply nuke the reload check, we should iterate through each plugin we instantiate and ensure that close is always called.

For bonus points we should also improve the logging when we can't reload.
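
One way to picture "ensure that close is always called" is a validation step that closes every plugin it touched in an ensure block. This is only a sketch under assumptions: FakePlugin and validate_reloadable are hypothetical names, not Logstash APIs, and the real pipeline classes are not modeled here.

```ruby
# Hypothetical plugin that records whether close was called.
class FakePlugin
  attr_reader :closed

  def initialize(reloadable: true)
    @reloadable = reloadable
    @closed = false
  end

  def reloadable?
    @reloadable
  end

  def close
    @closed = true
  end
end

# Hypothetical validator: reports whether all plugins are reloadable and
# closes every plugin it was given, even if the check itself raises.
def validate_reloadable(plugins)
  plugins.all?(&:reloadable?)
ensure
  plugins.each do |plugin|
    begin
      plugin.close
    rescue StandardError => e
      # One failing close should not prevent the remaining plugins from closing.
      warn "close failed for #{plugin.class}: #{e.message}"
    end
  end
end

plugins = [FakePlugin.new, FakePlugin.new(reloadable: false)]
ok = validate_reloadable(plugins)
puts "reloadable: #{ok}, all closed: #{plugins.all?(&:closed)}"
```

The ensure clause runs whether the check returns true, returns false, or raises, so instances created only for validation do not outlive the check.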

@IrlJidel
Contributor

When checking my custom plugins to see whether I had explicitly defined an initialize method instead of just register, I noticed that codecs appear to call register twice:

[INFO ][logstash.codecs.XXXXX ] registered XXXXX plugin {:id=>"e78f8f1aa72b11013cc9c02f3f40aca6925f1c9a-10"}
[INFO ][logstash.codecs.XXXXX ] registered XXXXX plugin {:id=>"e78f8f1aa72b11013cc9c02f3f40aca6925f1c9a-10"}

It appears the codec initialize method also calls register:

codecs/base.rb

  def initialize(params={})
    super
    config_init(@params)
    register if respond_to?(:register)
    setup_multi_encode!
  end
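
A small illustration of the consequence: when register is invoked from initialize, it runs once per instantiation. The sketch below uses a hypothetical SketchCodec class and a hypothetical register_calls counter; it assumes the codec object is constructed twice, which this thread shows for filters on reload but does not confirm as the cause of the duplicate codec log lines.

```ruby
# Hypothetical codec-like class that calls register from its constructor.
class SketchCodec
  class << self
    attr_accessor :register_calls
  end
  self.register_calls = 0

  def initialize(params = {})
    @params = params
    # Same shape as the codecs/base.rb excerpt: register runs during construction.
    register if respond_to?(:register)
  end

  def register
    self.class.register_calls += 1
    puts "registered SketchCodec plugin"
  end
end

# Assumption: the same configuration is instantiated twice.
SketchCodec.new
SketchCodec.new
puts "register calls: #{SketchCodec.register_calls}"
```

Under that assumption, two constructions produce two "registered" lines for what is logically one configured codec.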

@marioplumbarius

@jakelandis Any plans to dig into this?

@jakelandis
Contributor Author

@marioluan - Not in the next couple of weeks. We fixed the one identified leak in the grok filter by moving the logic out of the initialization.

However, the root cause here should still be addressed, since it is too easy a mistake to make and it breaks reasonable assumptions about how things work.
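
As a rough sketch of the workaround, resource setup can be deferred from initialize to a later lifecycle step. DeferredPlugin and worker_alive? are hypothetical names, and using register as the place to start the thread is an assumption made for illustration; the thread does not say where the grok logic was moved. The sketch also assumes that the validation-only instance is constructed but never registered.

```ruby
# Hypothetical plugin that defers resource setup until register.
class DeferredPlugin
  def initialize
    @worker = nil   # the constructor only records state; no thread is started
  end

  # Assumed lifecycle hook that runs only for instances that will really be used.
  def register
    @stop = Queue.new
    @worker = Thread.new { @stop.pop }
  end

  def close
    return unless @worker
    @stop << :stop
    @worker.join
    @worker = nil
  end

  def worker_alive?
    !@worker.nil? && @worker.alive?
  end
end

validator = DeferredPlugin.new   # validation-only instance: never registered, never closed
running   = DeferredPlugin.new
running.register
puts "validator worker alive: #{validator.worker_alive?}"
puts "running worker alive:   #{running.worker_alive?}"
running.close
```

This avoids the leak for the validation instance but, as noted above, it does not fix the underlying double instantiation.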

@IrlJidel
Contributor

IrlJidel commented Feb 6, 2018

@jakelandis any impact with codecs calling register twice?

@jakelandis
Contributor Author

@IrlJidel - possible, but unlikely. We shouldn't call register twice, but given the typical scope of a codec it is likely harmless. A codec wouldn't typically spin up new threads or consume external resources in its setup. (It could, it's just unlikely.)

I logged #9108 to address the oddity of calling register from initialize.


3 participants