Navigation Menu

Skip to content

Commit

Permalink
Use FluentMessageSender directly for other engine nodes
Browse files Browse the repository at this point in the history
Conflicts:
	lib/droonga/engine_node.rb
  • Loading branch information
piroor committed Jan 6, 2015
1 parent 03fb153 commit 2094f6b
Showing 1 changed file with 33 additions and 7 deletions.
40 changes: 33 additions & 7 deletions lib/droonga/engine_node.rb
Expand Up @@ -14,8 +14,8 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "droonga/loggable"
require "droonga/forwarder"
require "droonga/forward_buffer"
require "droonga/fluent_message_sender"
require "droonga/node_metadata"

module Droonga
Expand All @@ -27,34 +27,60 @@ def initialize(name, state, sender_role, loop)
@state = state
@sender_role = sender_role

@forwarder = Forwarder.new(loop, :buffering => true)
@buffer = ForwardBuffer.new(name, @forwarder)

unless @name =~ /\A(.*):(\d+)\/([^.]+)\z/
raise "name format: hostname:port/tag"
end
host = $1
port = $2
tag = $3
@sender = FluentMessageSender.new(loop, host, port,
:buffering => true)
@sender.start
end

def start
logger.trace("start: start")
@forwarder.start
@sender.resume
@buffer.start_forward if really_writable?
logger.trace("start: done")
end

def shutdown
logger.trace("shutdown: start")
@forwarder.shutdown
@sender.shutdown
logger.trace("shutdown: done")
end

def forward(message, destination)
if not really_writable?
@buffer.add(message, destination)
elsif @buffer.empty?
@forwarder.forward(message, destination)
@output(message, destination)
else
@buffer.add(message, destination)
@buffer.start_forward
end
end

def output(message, destination)
command = destination["type"]
receiver = destination["to"]
arguments = destination["arguments"]

override_message = {
"type" => command,
}
override_message["arguments"] = arguments if arguments
message = message.merge(override_message)
output_tag = "#{tag}.message"
log_info = "<#{receiver}>:<#{output_tag}>"
logger.trace("forward: start: #{log_info}")
@sender.send(output_tag, message)
logger.trace("forward: end")
end

def live?
@state.nil? or @state["live"]
end
Expand Down Expand Up @@ -123,11 +149,11 @@ def status
end
end

private
def on_change
@forwarder.resume
@sender.resume
end

private
def log_tag
"[#{Process.ppid}] engine-node"
end
Expand Down

0 comments on commit 2094f6b

Please sign in to comment.