Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for source_hostname_key and source_address_key on umatched syslog messages #2553

Merged
merged 2 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 10 additions & 3 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,21 @@ def start_tcp_server(tls: false)

private

def emit_unmatched(data, sock)
record = {"unmatched_line" => data}
record[@source_address_key] = sock.remote_addr if @source_address_key
record[@source_hostname_key] = sock.remote_host if @source_hostname_key
emit("#{@tag}.unmatched", Fluent::EventTime.now, record)
end

def message_handler(data, sock)
pri = nil
text = data
unless @parser_parse_priority
m = SYSLOG_REGEXP.match(data)
unless m
if @emit_unmatched_lines
emit("#{@tag}.unmatched", Fluent::EventTime.now, {"unmatched_line" => data})
emit_unmatched(data, sock)
end
log.warn "invalid syslog message: #{data.dump}"
return
Expand All @@ -218,7 +225,7 @@ def message_handler(data, sock)
@parser.parse(text) do |time, record|
unless time && record
if @emit_unmatched_lines
emit("#{@tag}.unmatched", Fluent::EventTime.now, {"unmatched_line" => text})
emit_unmatched(data, sock)
end
log.warn "failed to parse message", data: data
return
Expand All @@ -238,7 +245,7 @@ def message_handler(data, sock)
end
rescue => e
if @emit_unmatched_lines
emit("#{@tag}.unmatched", Fluent::EventTime.now, {"unmatched_line" => text})
emit_unmatched(data, sock)
end
log.error "invalid input", data: data, error: e
log.error_backtrace
Expand Down
62 changes: 56 additions & 6 deletions test/plugin/test_in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,31 @@ def create_test_case(large_message: false)
end
end

def test_emit_unmatched_lines
d = create_driver([CONFIG, 'emit_unmatched_lines true'].join("\n"))
tests = [
def create_unmatched_lines_test_case
[
# valid message
{'msg' => '<6>Sep 10 00:00:00 localhost logger: xxx', 'expected' => {'host'=>'localhost', 'ident'=>'logger', 'message'=>'xxx'}},
# missing priority
{'msg' => 'hello world', 'expected' => {'unmatched_line' => 'hello world'}},
# timestamp parsing failure
{'msg' => '<6>ZZZ 99 99:99:99 localhost logger: xxx', 'expected' => {'unmatched_line' => '<6>ZZZ 99 99:99:99 localhost logger: xxx'}},
]
end

def compare_unmatched_lines_test_result(events, tests, options = {})
events.each_index { |i|
tests[i]['expected'].each { |k,v|
assert_equal v, events[i][2][k], "No key <#{k}> in response or value mismatch"
}
assert_equal('syslog.unmatched', events[i][0], 'tag does not match syslog.unmatched') unless i==0
assert_equal(options[:address], events[i][2]['source_address'], 'response has no source_address or mismatch') if options[:address]
assert_equal(options[:hostname], events[i][2]['source_hostname'], 'response has no source_hostname or mismatch') if options[:hostname]
}
end

def test_emit_unmatched_lines
d = create_driver([CONFIG, 'emit_unmatched_lines true'].join("\n"))
tests = create_unmatched_lines_test_case

d.run(expect_emits: 3) do
u = UDPSocket.new
Expand All @@ -383,9 +398,44 @@ def test_emit_unmatched_lines
end

assert_equal tests.size, d.events.size
tests.size.times do |i|
assert_equal tests[i]['expected'], d.events[i][2]
assert_equal 'syslog.unmatched', d.events[i][0] unless i==0
compare_unmatched_lines_test_result(d.events, tests)
end

def test_emit_unmatched_lines_with_hostname
d = create_driver([CONFIG, 'emit_unmatched_lines true', 'source_hostname_key source_hostname'].join("\n"))
tests = create_unmatched_lines_test_case

hostname = nil
d.run(expect_emits: 3) do
u = UDPSocket.new
u.do_not_reverse_lookup = false
u.connect('127.0.0.1', PORT)
hostname = u.peeraddr[2]
tests.each {|test|
u.send(test['msg'], 0)
}
end

assert_equal tests.size, d.events.size
compare_unmatched_lines_test_result(d.events, tests, {hostname: hostname})
end

def test_emit_unmatched_lines_with_address
d = create_driver([CONFIG, 'emit_unmatched_lines true', 'source_address_key source_address'].join("\n"))
tests = create_unmatched_lines_test_case

address = nil
d.run(expect_emits: 3) do
u = UDPSocket.new
u.do_not_reverse_lookup = false
u.connect('127.0.0.1', PORT)
address = u.peeraddr[3]
tests.each {|test|
u.send(test['msg'], 0)
}
end

assert_equal tests.size, d.events.size
compare_unmatched_lines_test_result(d.events, tests, {address: address})
end
end