Add pipeline.id to log lines #11087

Closed · wants to merge 1 commit
3 changes: 3 additions & 0 deletions config/jvm.options
@@ -76,3 +76,6 @@

# Entropy source for randomness
-Djava.security.egd=file:/dev/urandom

# Copy the logging context from parent threads to children
-Dlog4j2.isThreadContextMapInheritable=true
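
This flag is what lets a pipeline id placed in the Log4j thread context of a parent thread show up on the threads that parent spawns. A minimal standalone sketch of the behaviour it enables (not part of the PR; the class name and printed value are illustrative, and it assumes log4j-api is on the classpath):

import org.apache.logging.log4j.ThreadContext;

public class ThreadContextInheritanceSketch {
    public static void main(String[] args) throws InterruptedException {
        // Must be set before the ThreadContext class is first used; the PR sets it
        // in config/jvm.options instead so it applies to the whole JVM.
        System.setProperty("log4j2.isThreadContextMapInheritable", "true");

        ThreadContext.put("pipeline.id", "main");   // set on the parent thread
        Thread child = new Thread(() ->
            // Prints "main" with the inheritable map enabled, "null" without it.
            System.out.println(ThreadContext.get("pipeline.id")));
        child.start();
        child.join();
    }
}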
4 changes: 2 additions & 2 deletions config/log4j2.properties
@@ -4,7 +4,7 @@ name = LogstashPropertiesConfig
appender.console.type = Console
appender.console.name = plain_console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %m%n
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %m%n

appender.json_console.type = Console
appender.json_console.name = json_console
@@ -21,7 +21,7 @@ appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 1
appender.rolling.policies.time.modulate = true
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %-.10000m%n
appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c]%notEmpty{[%X{pipeline.id}]} %-.10000m%n
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
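For orientation: %X{pipeline.id} reads the pipeline id from the Log4j thread context map, and %notEmpty{...} suppresses the whole bracketed field when the current thread carries no pipeline.id (the agent thread, for example). A purely illustrative rendering under the new pattern, with made-up timestamps, logger names and messages:

[2019-09-17T10:15:00,123][INFO ][logstash.javapipeline    ][main] Pipeline started
[2019-09-17T10:15:01,456][INFO ][logstash.agent           ] Pipelines running
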
3 changes: 2 additions & 1 deletion logstash-core/lib/logstash/agent.rb
@@ -89,6 +89,7 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)

def execute
@thread = Thread.current # this var is implicitly used by Stud.stop?
LogStash::Util.set_thread_name("Agent thread")
logger.debug("Starting agent")

transition_to_running
@@ -307,7 +308,7 @@ def converge_state(pipeline_actions)

pipeline_actions.map do |action|
Thread.new(action, converge_result) do |action, converge_result|
java.lang.Thread.currentThread().setName("Converge #{action}");
LogStash::Util.set_thread_name("Converge #{action}")
# We execute every task we need to converge the current state of pipelines
# for every task we will record the action result, that will help us
# the results of all the task will determine if the converge was successful or not
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
@@ -8,6 +8,8 @@
require "logstash/compiler"
require "logstash/config/lir_serializer"

java_import org.apache.logging.log4j.ThreadContext

module LogStash; class JavaPipeline < JavaBasePipeline
include LogStash::Util::Loggable
attr_reader \
@@ -102,6 +104,7 @@ def start
@thread = Thread.new do
begin
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
ThreadContext.put("pipeline.id", pipeline_id)
run
@finished_run.make_true
rescue => e
@@ -236,6 +239,7 @@ def start_workers
pipeline_workers.times do |t|
thread = Thread.new do
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
ThreadContext.put("pipeline.id", pipeline_id)
org.logstash.execution.WorkerLoop.new(
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
@flushRequested, @flushing, @shutdownRequested, @drain_queue).run
@@ -305,6 +309,7 @@ def start_input(plugin)

def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
ThreadContext.put("pipeline.id", pipeline_id)
begin
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
11 changes: 9 additions & 2 deletions logstash-core/lib/logstash/pipeline.rb
@@ -12,6 +12,8 @@
require "logstash/filter_delegator"
require "logstash/compiler"

java_import org.apache.logging.log4j.ThreadContext

module LogStash; class BasePipeline < AbstractPipeline
include LogStash::Util::Loggable

@@ -172,7 +174,8 @@ def start

@thread = Thread.new do
begin
LogStash::Util.set_thread_name("pipeline.#{pipeline_id}")
LogStash::Util.set_thread_name("[#{pipeline_id}]-manager")
ThreadContext.put("pipeline.id", pipeline_id)
run
@finished_run.make_true
rescue => e
@@ -300,7 +303,8 @@ def start_workers

pipeline_workers.times do |t|
thread = Thread.new(batch_size, batch_delay, self) do |_b_size, _b_delay, _pipeline|
Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
LogStash::Util::set_thread_name("[#{pipeline_id}]>worker#{t}")
ThreadContext.put("pipeline.id", pipeline_id)
_pipeline.worker_loop(_b_size, _b_delay)
end
@worker_threads << thread
@@ -430,6 +434,7 @@ def start_input(plugin)

def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
ThreadContext.put("pipeline.id", pipeline_id)
begin
plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e
@@ -535,6 +540,8 @@ def start_flusher
raise "Attempted to start flusher on a stopped pipeline!" if stopped?

@flusher_thread = Thread.new do
LogStash::Util.set_thread_name("[#{pipeline_id}]-flusher-thread")
ThreadContext.put("pipeline.id", pipeline_id)
while Stud.stoppable_sleep(5, 0.1) { stopped? }
flush
break if stopped?
@@ -60,7 +60,10 @@ public IRubyObject start(final ThreadContext context) {
} else {
queueWriter = qw;
}
Thread t = new Thread(() -> input.start(queueWriter::push));
Thread t = new Thread(() -> {
org.apache.logging.log4j.ThreadContext.put("pipeline.id", pipeline.pipelineId().toString());
input.start(queueWriter::push);
});
t.setName(pipeline.pipelineId().asJavaString() + "_" + input.getName() + "_" + input.getId());
t.start();
return JavaObject.wrap(context.getRuntime(), t);
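The wrapping above shows the second half of the approach: when a thread is created explicitly, the pipeline id is put into the thread context from inside the Runnable itself, so the field is present even on threads that do not inherit the parent's map. A generic sketch of that pattern (not from the PR; the id value and thread name are illustrative, and the remove() in the finally block is an extra precaution for reused threads):

import org.apache.logging.log4j.ThreadContext;

public class PerThreadContextSketch {
    public static void main(String[] args) {
        String pipelineId = "main";                       // illustrative value
        Thread t = new Thread(() -> {
            ThreadContext.put("pipeline.id", pipelineId); // set on the new thread itself
            try {
                // ... the thread's real work, which logs, goes here ...
            } finally {
                ThreadContext.remove("pipeline.id");      // don't leak into reused threads
            }
        });
        t.setName(pipelineId + "_example_input");
        t.start();
    }
}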
12 changes: 12 additions & 0 deletions qa/integration/fixtures/pipeline_id_log_spec.yml
@@ -0,0 +1,12 @@
---
services:
- logstash
config: |-
input {
generator {
count => 4
}
}
output {
null {}
}
47 changes: 47 additions & 0 deletions qa/integration/specs/pipeline_id_log_spec.rb
@@ -0,0 +1,47 @@
require_relative '../framework/fixture'
require_relative '../framework/settings'
require_relative '../services/logstash_service'
require_relative '../framework/helpers'
require "logstash/devutils/rspec/spec_helper"
require "yaml"

describe "Test Logstash Pipeline id" do
before(:all) {
@fixture = Fixture.new(__FILE__)
# used in multiple LS tests
@ls = @fixture.get_service("logstash")
}

after(:all) {
@fixture.teardown
}

before(:each) {
# backup the application settings file -- logstash.yml
FileUtils.cp(@ls.application_settings_file, "#{@ls.application_settings_file}.original")
}

after(:each) {
@ls.teardown
# restore the application settings file -- logstash.yml
FileUtils.mv("#{@ls.application_settings_file}.original", @ls.application_settings_file)
}

let(:temp_dir) { Stud::Temporary.directory("logstash-pipelinelog-test") }
let(:config) { @fixture.config("root") }

it "should write logs with pipeline.id" do
pipeline_name = "custom_pipeline"
settings = {
"path.logs" => temp_dir,
"pipeline.id" => pipeline_name
}
IO.write(@ls.application_settings_file, settings.to_yaml)
@ls.spawn_logstash("-w", "1" , "-e", config)
@ls.wait_for_logstash
sleep 2 until @ls.exited?
plainlog_file = "#{temp_dir}/logstash-plain.log"
expect(File.exists?(plainlog_file)).to be true
expect(IO.read(plainlog_file) =~ /\[logstash.javapipeline\s*\]\[#{pipeline_name}\]/).to be > 0
end
end