Skip to content

Commit

Permalink
Add multi queue support
Browse files Browse the repository at this point in the history
  • Loading branch information
u-ichi committed Feb 25, 2013
1 parent 234852f commit 5df4b32
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
21 changes: 12 additions & 9 deletions lib/fluent/plugin/in_nats.rb
Expand Up @@ -52,18 +52,21 @@ def shutdown
end

def run
queues = @queue.split(',')
EM.next_tick {
@nats_conn = NATS.connect(@nats_config) {
@nats_conn.subscribe(@queue) do |msg, reply, sub|
tag = "#{@tag}.#{sub}"
begin
msg_json = JSON.parse(msg)
rescue JSON::ParserError => e
$log.error "Failed parsing JSON #{e.inspect}. Passing as a normal string"
msg_json = msg
queues.each do |queue|
@nats_conn.subscribe(queue) do |msg, reply, sub|
tag = "#{@tag}.#{sub}"
begin
msg_json = JSON.parse(msg)
rescue JSON::ParserError => e
$log.error "Failed parsing JSON #{e.inspect}. Passing as a normal string"
msg_json = msg
end
time = Engine.now
Engine.emit(tag, time, msg_json)
end
time = Engine.now
Engine.emit(tag, time, msg_json)
end
}
}
Expand Down
15 changes: 12 additions & 3 deletions test/plugin/in_nats.rb
@@ -1,6 +1,6 @@
require 'test/unit'
require 'fluent/test'
require 'lib/fluent/plugin/in_nats'
require 'lib/fluent/plugin/in_nats.rb'
require 'nats/client'
require 'test_helper'

Expand All @@ -12,7 +12,7 @@ class NATSInputTest < Test::Unit::TestCase
host localhost
user nats
password nats
queue fluent.>
queue fluent.>,fluent2.>
]


Expand All @@ -26,7 +26,7 @@ def test_configure
assert_equal 'localhost', d.instance.host
assert_equal 'nats', d.instance.user
assert_equal 'nats', d.instance.password
assert_equal 'fluent.>', d.instance.queue
assert_equal 'fluent.>,fluent2.>', d.instance.queue
end

def test_emit_with_credentials
Expand All @@ -37,6 +37,7 @@ def test_emit_with_credentials

d.expect_emit "nats.fluent.test1", time, {"message"=>'nats', "fluent_timestamp"=>time}
d.expect_emit "nats.fluent.test2", time, {"message"=>'nats', "fluent_timestamp"=>time}
d.expect_emit "nats.fluent2.test1", time, {"message"=>'nats', "fluent_timestamp"=>time}

uri = "nats://#{d.instance.user}:#{d.instance.password}@#{d.instance.host}:#{d.instance.port}"

Expand All @@ -58,6 +59,7 @@ def test_emit_without_credentials

d.expect_emit "nats.fluent.test1", time, {"message"=>'nats', "fluent_timestamp"=>time}
d.expect_emit "nats.fluent.test2", time, {"message"=>'nats', "fluent_timestamp"=>time}
d.expect_emit "nats.fluent2.test1", time, {"message"=>'nats', "fluent_timestamp"=>time}

uri = "nats://#{d.instance.host}:#{d.instance.port}"

Expand All @@ -78,6 +80,7 @@ def test_emit_without_fluent_timestamp
Fluent::Engine.now = time

d.expect_emit "nats.fluent.test1", time, {"message"=>'nats'}
d.expect_emit "nats.fluent2.test1", time, {"message"=>'nats'}

uri = "nats://#{d.instance.host}:#{d.instance.port}"
start_nats(uri)
Expand All @@ -98,6 +101,8 @@ def test_emit_arrays

d.expect_emit "nats.fluent.empty_array", time, []
d.expect_emit "nats.fluent.string_array", time, %w(one two three)
d.expect_emit "nats.fluent2.empty_array", time, []
d.expect_emit "nats.fluent2.string_array", time, %w(one two three)

uri = "nats://#{d.instance.host}:#{d.instance.port}"
start_nats(uri)
Expand All @@ -117,6 +122,7 @@ def test_empty_publish_string
Fluent::Engine.now = time

d.expect_emit "nats.fluent.nil", time, nil
d.expect_emit "nats.fluent2.nil", time, nil

uri = "nats://#{d.instance.host}:#{d.instance.port}"
start_nats(uri)
Expand All @@ -140,6 +146,7 @@ def test_regular_publish_string
Fluent::Engine.now = time

d.expect_emit "nats.fluent.string", time, "Lorem ipsum dolor sit amet"
d.expect_emit "nats.fluent2.string", time, "Lorem ipsum dolor sit amet"

uri = "nats://#{d.instance.host}:#{d.instance.port}"
start_nats(uri)
Expand Down Expand Up @@ -169,3 +176,5 @@ def send(uri, tag, msg)
}
end
end


0 comments on commit 5df4b32

Please sign in to comment.