Ensure pipeline metrics are cleared on the pipeline shutdown
edmocosta committed Jun 26, 2024
1 parent efa8378 commit ed01cba
Showing 2 changed files with 115 additions and 0 deletions.
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
@@ -455,6 +455,7 @@ def shutdown
# TODO: should we also check against calling shutdown multiple times concurrently?
stop_inputs
wait_for_shutdown
ensure
clear_pipeline_metrics
end # def shutdown

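For context, here is a minimal, self-contained sketch (not part of this commit) of the behaviour the new ensure clause provides: clear_pipeline_metrics now runs even when stop_inputs or wait_for_shutdown raises, so a failed or aborted shutdown can no longer leave stale metrics behind. The FakePipeline class and its method bodies below are illustrative placeholders, not Logstash code; only the control flow mirrors the diff.

# Illustrative sketch only; names mirror the diff, bodies are placeholders.
class FakePipeline
  def initialize
    @metrics = { "events.in" => 42 }
  end

  attr_reader :metrics

  def shutdown
    stop_inputs        # simulated failure below
    wait_for_shutdown
  ensure
    # runs on both the normal and the exceptional path
    clear_pipeline_metrics
  end

  private

  def stop_inputs
    raise "input crashed while stopping"
  end

  def wait_for_shutdown; end

  def clear_pipeline_metrics
    @metrics.clear
  end
end

pipeline = FakePipeline.new
pipeline.shutdown rescue nil
puts pipeline.metrics.inspect # => {} — cleared even though shutdown raised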
114 changes: 114 additions & 0 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
@@ -21,6 +21,7 @@
require_relative "../support/mocks_classes"
require_relative "../support/helpers"
require 'support/pipeline/pipeline_helpers'
require 'logstash/pipeline_action/reload'
require "stud/try"
require 'timeout'
require "thread"
@@ -75,6 +76,23 @@ def push_once
end
end

class DummyCrashingInput < LogStash::Inputs::Base
config_name "dummycrashinginput"

config :on_register, :validate => :boolean

def register
raise(LogStash::ConfigurationError, 'crashing input on register') if @on_register
end

def run(queue)
while !stop?
queue << LogStash::Event.new
sleep(0.5)
end
end
end

class DummyCodec < LogStash::Codecs::Base
config_name "dummycodec"
milestone 2
@@ -1407,4 +1425,100 @@ def flush(options)
end
end
end

context "#shutdown" do
subject(:pipeline) { mock_java_pipeline_from_string(config, pipeline_settings_obj, pipeline_metric) }
let(:pipeline_metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) }
let(:dummyinput_class) { DummyManualInputGenerator }
let(:dummyinput_config) { {'enable_metric' => true} }
let(:dummyinput) { dummyinput_class.new(dummyinput_config) }
let(:pipeline_id) { "shutdown" }
let(:config) { build_pipeline_string_config(dummyinput_config) }

def build_pipeline_string_config(dummyinput_config)
<<-"EOS"
input { dummyinput { #{ dummyinput_config.to_s.tr('{},', '') } } }
filter { dummyfilter {} }
output { dummyoutput {} }
EOS
end

before(:each) do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(::LogStash::Outputs::DummyOutput::new)
allow(DummyManualInputGenerator).to receive(:new).with(any_args).and_return(dummyinput)

allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(dummyinput_class)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)

allow(pipeline).to receive(:clear_pipeline_metrics).and_call_original
allow(pipeline).to receive(:stop_inputs).and_call_original
allow(pipeline).to receive(:wait_for_shutdown).and_return(double(new: nil))
end

context "of a running pipeline" do
let(:pipeline_settings) { { "pipeline.batch.size" => 1, "pipeline.workers" => 1, "pipeline.id" => pipeline_id, "metric.collect" => true } }

it "should clear the pipeline metrics" do
dummyinput.keep_running.make_true

expect { pipeline.start }.to_not raise_error

pipeline.shutdown

expect(pipeline).to have_received(:clear_pipeline_metrics)
expect(pipeline).to have_received(:stop_inputs)
expect(pipeline).to have_received(:wait_for_shutdown)
end
end

context "of a failed reloading pipeline" do
let(:agent) { double("agent") }
let(:dummyinput_class) { DummyCrashingInput }
let(:pipeline_settings) { {
"pipeline.id" => pipeline_id,
"pipeline.batch.size" => 1,
"pipeline.workers" => 1,
"metric.collect" => true,
"config.reload.automatic" => true
} }
let(:pipelines) do
registry = LogStash::PipelinesRegistry.new
registry.create_pipeline(pipeline_id.to_sym, pipeline) { true }
registry
end

# This test ensures that, even for failed pipeline reloads, the shutdown method clears
# the pipeline's metric store. If those metrics were not cleared, data from previous
# reload attempts would pile up.
it "should clear the pipeline metrics for every shutdown" do
expect { pipeline.start }.to_not raise_error

wait(10).for { pipeline.running? }

new_config_string = build_pipeline_string_config({'on_register' => 'true' })
reload_action = LogStash::PipelineAction::Reload.new(
mock_pipeline_config(pipeline_id, new_config_string, pipeline_settings),
pipeline_metric
)

# 1st failed attempt (finished_execution? => false)
action_result = reload_action.execute(agent, pipelines)
expect(action_result.successful?).to be_falsy
expect(pipeline).to have_received(:clear_pipeline_metrics)
expect(pipeline).to have_received(:stop_inputs)
expect(pipeline).to have_received(:wait_for_shutdown)

# 2nd failed attempt (finished_execution? => true, as the previous reload failed)
pipeline = pipelines.states.get(pipeline_id.to_sym).pipeline
allow(pipeline).to receive(:wait_for_shutdown).and_return(double(new: nil))
allow(pipeline).to receive(:clear_pipeline_metrics).and_call_original

action_result = reload_action.execute(agent, pipelines)
expect(action_result.successful?).to be_falsy
expect(pipeline).to have_received(:clear_pipeline_metrics)
end
end
end
end
