diff --git a/lib/fluent/plugin/in_nats.rb b/lib/fluent/plugin/in_nats.rb index 4d51fa6..193589a 100644 --- a/lib/fluent/plugin/in_nats.rb +++ b/lib/fluent/plugin/in_nats.rb @@ -52,18 +52,21 @@ def shutdown end def run + queues = @queue.split(',') EM.next_tick { @nats_conn = NATS.connect(@nats_config) { - @nats_conn.subscribe(@queue) do |msg, reply, sub| - tag = "#{@tag}.#{sub}" - begin - msg_json = JSON.parse(msg) - rescue JSON::ParserError => e - $log.error "Failed parsing JSON #{e.inspect}. Passing as a normal string" - msg_json = msg + queues.each do |queue| + @nats_conn.subscribe(queue) do |msg, reply, sub| + tag = "#{@tag}.#{sub}" + begin + msg_json = JSON.parse(msg) + rescue JSON::ParserError => e + $log.error "Failed parsing JSON #{e.inspect}. Passing as a normal string" + msg_json = msg + end + time = Engine.now + Engine.emit(tag, time, msg_json) end - time = Engine.now - Engine.emit(tag, time, msg_json) end } } diff --git a/test/plugin/in_nats.rb b/test/plugin/in_nats.rb index cdabef1..81c1309 100644 --- a/test/plugin/in_nats.rb +++ b/test/plugin/in_nats.rb @@ -1,6 +1,6 @@ require 'test/unit' require 'fluent/test' -require 'lib/fluent/plugin/in_nats' +require 'lib/fluent/plugin/in_nats.rb' require 'nats/client' require 'test_helper' @@ -12,7 +12,7 @@ class NATSInputTest < Test::Unit::TestCase host localhost user nats password nats - queue fluent.> + queue fluent.>,fluent2.> ] @@ -26,7 +26,7 @@ def test_configure assert_equal 'localhost', d.instance.host assert_equal 'nats', d.instance.user assert_equal 'nats', d.instance.password - assert_equal 'fluent.>', d.instance.queue + assert_equal 'fluent.>,fluent2.>', d.instance.queue end def test_emit_with_credentials @@ -37,6 +37,7 @@ def test_emit_with_credentials d.expect_emit "nats.fluent.test1", time, {"message"=>'nats', "fluent_timestamp"=>time} d.expect_emit "nats.fluent.test2", time, {"message"=>'nats', "fluent_timestamp"=>time} + d.expect_emit "nats.fluent2.test1", time, {"message"=>'nats', "fluent_timestamp"=>time} uri = "nats://#{d.instance.user}:#{d.instance.password}@#{d.instance.host}:#{d.instance.port}" @@ -58,6 +59,7 @@ def test_emit_without_credentials d.expect_emit "nats.fluent.test1", time, {"message"=>'nats', "fluent_timestamp"=>time} d.expect_emit "nats.fluent.test2", time, {"message"=>'nats', "fluent_timestamp"=>time} + d.expect_emit "nats.fluent2.test1", time, {"message"=>'nats', "fluent_timestamp"=>time} uri = "nats://#{d.instance.host}:#{d.instance.port}" @@ -78,6 +80,7 @@ def test_emit_without_fluent_timestamp Fluent::Engine.now = time d.expect_emit "nats.fluent.test1", time, {"message"=>'nats'} + d.expect_emit "nats.fluent2.test1", time, {"message"=>'nats'} uri = "nats://#{d.instance.host}:#{d.instance.port}" start_nats(uri) @@ -98,6 +101,8 @@ def test_emit_arrays d.expect_emit "nats.fluent.empty_array", time, [] d.expect_emit "nats.fluent.string_array", time, %w(one two three) + d.expect_emit "nats.fluent2.empty_array", time, [] + d.expect_emit "nats.fluent2.string_array", time, %w(one two three) uri = "nats://#{d.instance.host}:#{d.instance.port}" start_nats(uri) @@ -117,6 +122,7 @@ def test_empty_publish_string Fluent::Engine.now = time d.expect_emit "nats.fluent.nil", time, nil + d.expect_emit "nats.fluent2.nil", time, nil uri = "nats://#{d.instance.host}:#{d.instance.port}" start_nats(uri) @@ -140,6 +146,7 @@ def test_regular_publish_string Fluent::Engine.now = time d.expect_emit "nats.fluent.string", time, "Lorem ipsum dolor sit amet" + d.expect_emit "nats.fluent2.string", time, "Lorem ipsum dolor sit amet" uri = "nats://#{d.instance.host}:#{d.instance.port}" start_nats(uri) @@ -169,3 +176,5 @@ def send(uri, tag, msg) } end end + +