Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support tls in syslog #2399

Merged
merged 4 commits into from May 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 12 additions & 6 deletions lib/fluent/plugin/in_syslog.rb
Expand Up @@ -74,7 +74,7 @@ class SyslogInput < Input
desc 'The prefix of the tag. The tag itself is generated by the tag prefix, facility level, and priority.'
config_param :tag, :string
desc 'The transport protocol used to receive logs.(udp, tcp)'
config_param :protocol_type, :enum, list: [:tcp, :udp], default: :udp
config_param :protocol_type, :enum, list: [:tcp, :udp], default: nil, deprecated: "use transport directive"
desc 'The message frame type.(traditional, octet_count)'
config_param :frame_type, :enum, list: [:traditional, :octet_count], default: :traditional

Expand Down Expand Up @@ -107,6 +107,11 @@ class SyslogInput < Input
config_param :with_priority, :bool, default: true
end

# overwrite server plugin to change default to :udp
config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
config_argument :protocol, :enum, list: [:tcp, :udp, :tls], default: :udp
end

def configure(conf)
compat_parameters_convert(conf, :parser)

Expand Down Expand Up @@ -141,12 +146,13 @@ def multi_workers_ready?
def start
super

log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
case @protocol_type
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type || @transport_config.protocol}"
case @protocol_type || @transport_config.protocol
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default @transport_config.protocol is tcp. Change default to udp.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch 🙏
f23566a

when :udp then start_udp_server
when :tcp then start_tcp_server
when :tls then start_tcp_server(tls: true)
else
raise "BUG: invalid protocol_type value:#{@protocol_type}"
raise "BUG: invalid transport value: #{@protocol_type || @transport_config.protocol}"
end
end

Expand All @@ -156,12 +162,12 @@ def start_udp_server
end
end

def start_tcp_server
def start_tcp_server(tls: false)
octet_count_frame = @frame_type == :octet_count

delimiter = octet_count_frame ? " " : @delimiter
delimiter_size = delimiter.size
server_create_connection(:in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_hostname) do |conn|
server_create_connection(tls ? :in_syslog_tls_server : :in_syslog_tcp_server, @port, bind: @bind, resolve_name: @resolve_hostname) do |conn|
conn.data do |data|
buffer = conn.buffer
buffer << data
Expand Down
37 changes: 33 additions & 4 deletions test/plugin/test_in_syslog.rb
Expand Up @@ -55,6 +55,35 @@ def test_configure_resolve_hostname(param)
end
end

data('Use protocol_type' => ['protocol_type tcp', :tcp, :udp],
'Use transport' => ["<transport tcp>\n </transport>", nil, :tcp],
'Use transport and protocol' => ["protocol_type udp\n<transport tcp>\n </transport>", :udp, :tcp])
def test_configure_protocol(param)
conf, proto_type, transport_proto_type = *param
d = create_driver([CONFIG, conf].join("\n"))

assert_equal(d.instance.protocol_type, proto_type)
assert_equal(d.instance.transport_config.protocol, transport_proto_type)
end

# For backward compat
def test_respect_protocol_type_than_transport
d = create_driver([CONFIG, "<transport tcp> \n</transport>", "protocol_type udp"].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
u = UDPSocket.new
u.connect('127.0.0.1', PORT)
tests.each {|test|
u.send(test['msg'], 0)
}
end

assert(d.events.size > 0)
compare_test_result(d.events, tests)
end


data(
ipv4: ['127.0.0.1', CONFIG, ::Socket::AF_INET],
ipv6: ['::1', IPv6_CONFIG, ::Socket::AF_INET6],
Expand Down Expand Up @@ -119,7 +148,7 @@ def test_msg_size_udp_for_large_msg
end

def test_msg_size_with_tcp
d = create_driver([CONFIG, 'protocol_type tcp'].join("\n"))
d = create_driver([CONFIG, "<transport tcp> \n</transport>"].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
Expand All @@ -135,7 +164,7 @@ def test_msg_size_with_tcp
end

def test_msg_size_with_same_tcp_connection
d = create_driver([CONFIG, 'protocol_type tcp'].join("\n"))
d = create_driver([CONFIG, "<transport tcp> \n</transport>"].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
Expand Down Expand Up @@ -289,7 +318,7 @@ def compare_test_result(events, tests, options = {})

sub_test_case 'octet counting frame' do
def test_msg_size_with_tcp
d = create_driver([CONFIG, 'protocol_type tcp', 'frame_type octet_count'].join("\n"))
d = create_driver([CONFIG, "<transport tcp> \n</transport>", 'frame_type octet_count'].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
Expand All @@ -305,7 +334,7 @@ def test_msg_size_with_tcp
end

def test_msg_size_with_same_tcp_connection
d = create_driver([CONFIG, 'protocol_type tcp', 'frame_type octet_count'].join("\n"))
d = create_driver([CONFIG, "<transport tcp> \n</transport>", 'frame_type octet_count'].join("\n"))
tests = create_test_case

d.run(expect_emits: 2) do
Expand Down