diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 85f182fdbbc..3e1aad5f1f9 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -455,6 +455,7 @@ def shutdown # TODO: should we also check against calling shutdown multiple times concurrently? stop_inputs wait_for_shutdown + ensure clear_pipeline_metrics end # def shutdown diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 22915b85501..59700c86444 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -21,6 +21,7 @@ require_relative "../support/mocks_classes" require_relative "../support/helpers" require 'support/pipeline/pipeline_helpers' +require 'logstash/pipeline_action/reload' require "stud/try" require 'timeout' require "thread" @@ -75,6 +76,23 @@ def push_once end end +class DummyCrashingInput < LogStash::Inputs::Base + config_name "dummycrashinginput" + + config :on_register, :validate => :boolean + + def register + raise(LogStash::ConfigurationError, 'crashing input on register') if @on_register + end + + def run(queue) + while !stop? + queue << LogStash::Event.new + sleep(0.5) + end + end +end + class DummyCodec < LogStash::Codecs::Base config_name "dummycodec" milestone 2 @@ -1407,4 +1425,100 @@ def flush(options) end end end + + context "#shutdown" do + subject(:pipeline) { mock_java_pipeline_from_string(config, pipeline_settings_obj, pipeline_metric) } + let(:pipeline_metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) } + let(:dummyinput_class) { DummyManualInputGenerator } + let(:dummyinput_config) { {'enable_metric' => true} } + let(:dummyinput) { dummyinput_class.new(dummyinput_config) } + let(:pipeline_id) { "shutdown" } + let(:config) { build_pipeline_string_config(dummyinput_config) } + + def build_pipeline_string_config(dummyinput_config) + <<-"EOS" + input { dummyinput { #{ dummyinput_config.to_s.tr('{},', '') } } } + filter { dummyfilter {} } + output { dummyoutput {} } + EOS + end + + before(:each) do + allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(::LogStash::Outputs::DummyOutput::new) + allow(DummyManualInputGenerator).to receive(:new).with(any_args).and_return(dummyinput) + + allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(dummyinput_class) + allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain) + allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter) + allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput) + + allow(pipeline).to receive(:clear_pipeline_metrics).and_call_original + allow(pipeline).to receive(:stop_inputs).and_call_original + allow(pipeline).to receive(:wait_for_shutdown).and_return(double(new: nil)) + end + + context "of a running pipeline" do + let(:pipeline_settings) { { "pipeline.batch.size" => 1, "pipeline.workers" => 1, "pipeline.id" => pipeline_id, "metric.collect" => true } } + + it "should clear the pipeline metrics" do + dummyinput.keep_running.make_true + + expect { pipeline.start }.to_not raise_error + + pipeline.shutdown + + expect(pipeline).to have_received(:clear_pipeline_metrics) + expect(pipeline).to have_received(:stop_inputs) + expect(pipeline).to have_received(:wait_for_shutdown) + end + end + + context "of a failed reloading pipeline" do + let(:agent) { double("agent") } + let(:dummyinput_class) { DummyCrashingInput } + let(:pipeline_settings) { { + "pipeline.id" => pipeline_id, + "pipeline.batch.size" => 1, + "pipeline.workers" => 1, + "metric.collect" => true, + "config.reload.automatic" => true + } } + let(:pipelines) do + registry = LogStash::PipelinesRegistry.new + registry.create_pipeline(pipeline_id.to_sym, pipeline) { true } + registry + end + + # This test ensure that even for failed pipeline reloads, the shutdown method cleans + # the pipeline's metric store. If those metrics are not cleaned, it would pilling up + # data from previous reloads attempts. + it "should clear the pipeline metrics for every shutdown" do + expect { pipeline.start }.to_not raise_error + + wait(10).for { pipeline.running? } + + new_config_string = build_pipeline_string_config({'on_register' => 'true' }) + reload_action = LogStash::PipelineAction::Reload.new( + mock_pipeline_config(pipeline_id, new_config_string, pipeline_settings), + pipeline_metric + ) + + # 1st failed attempt (finished_execution? = > false) + action_result = reload_action.execute(agent, pipelines) + expect(action_result.successful?).to be_falsy + expect(pipeline).to have_received(:clear_pipeline_metrics) + expect(pipeline).to have_received(:stop_inputs) + expect(pipeline).to have_received(:wait_for_shutdown) + + # 2nd failed attempt (finished_execution? = > true, as the previous reload failed) + pipeline = pipelines.states.get(pipeline_id.to_sym).pipeline + allow(pipeline).to receive(:wait_for_shutdown).and_return(double(new: nil)) + allow(pipeline).to receive(:clear_pipeline_metrics).and_call_original + + action_result = reload_action.execute(agent, pipelines) + expect(action_result.successful?).to be_falsy + expect(pipeline).to have_received(:clear_pipeline_metrics) + end + end + end end