Navigation Menu

Skip to content

Commit

Permalink
Use FluentMessageSender directly for other engine nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jan 6, 2015
1 parent d5f8d24 commit f6587af
Showing 1 changed file with 27 additions and 5 deletions.
32 changes: 27 additions & 5 deletions lib/droonga/engine_node.rb
Expand Up @@ -14,6 +14,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "droonga/forwarder"
require "droonga/fluent_message_sender"
require "droonga/node_metadata"

module Droonga
Expand All @@ -25,23 +26,44 @@ def initialize(name, state, sender_role, loop)
@state = state
@sender_role = sender_role

@forwarder = Forwarder.new(loop, :buffering => true)
unless @name =~ /\A(.*):(\d+)\/([^.]+)\z/
raise "name format: hostname:port/tag"
end
host = $1
port = $2
tag = $3
@sender = FluentMessageSender.new(loop, host, port,
:buffering => true)
@sender.start
end

def start
logger.trace("start: start")
@forwarder.start
@sender.resume
logger.trace("start: done")
end

def shutdown
logger.trace("shutdown: start")
@forwarder.shutdown
@sender.shutdown
logger.trace("shutdown: done")
end

def forward(message, destination)
@forwarder.forward(message, destination)
command = destination["type"]
receiver = destination["to"]
arguments = destination["arguments"]

override_message = {
"type" => command,
}
override_message["arguments"] = arguments if arguments
message = message.merge(override_message)
output_tag = "#{tag}.message"
log_info = "<#{receiver}>:<#{output_tag}>"
logger.trace("forward: start: #{log_info}")
@sender.send(output_tag, message)
logger.trace("forward: end")
end

def live?
Expand Down Expand Up @@ -101,7 +123,7 @@ def status
end

def on_change
@forwarder.resume
@sender.resume
end
end
end

0 comments on commit f6587af

Please sign in to comment.