Navigation Menu

Skip to content

Commit

Permalink
Detect message tag
Browse files Browse the repository at this point in the history
Conflicts:
	lib/droonga/engine_node.rb
  • Loading branch information
piroor committed Jan 6, 2015
1 parent 9c1dffd commit 11da7ec
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions lib/droonga/engine_node.rb
Expand Up @@ -30,17 +30,11 @@ def initialize(name, state, sender_role, loop)
@sender_role = sender_role

@buffer = ForwardBuffer.new(name)
@buffer.on_forward = lambda do |message, destination|
output(message, destination)
end

unless @name =~ /\A(.*):(\d+)\/([^.]+)\z/
raise "name format: hostname:port/tag"
end
host = $1
port = $2
tag = $3
@sender = FluentMessageSender.new(loop, host, port,
parsed_name = parse_node_name(@name)
@sender = FluentMessageSender.new(loop,
parsed_name[:host],
parsed_name[:port],
:buffering => true)
@sender.start
end
Expand Down Expand Up @@ -102,6 +96,17 @@ def on_change
end

private
def parse_node_name(name)
unless name =~ /\A(.*):(\d+)\/([^.]+)\z/
raise "name format: hostname:port/tag"
end
{
:host => $1,
:port => $2,
:tag => $3,
}
end

def role
if @state
@state["role"]
Expand Down Expand Up @@ -146,13 +151,14 @@ def output(message, destination)
command = destination["type"]
receiver = destination["to"]
arguments = destination["arguments"]
parsed_receiver = parse_node_name(receiver)

override_message = {
"type" => command,
}
override_message["arguments"] = arguments if arguments
message = message.merge(override_message)
output_tag = "#{tag}.message"
output_tag = "#{parsed_receiver[:tag]}.message"
log_info = "<#{receiver}>:<#{output_tag}>"
logger.trace("forward: start: #{log_info}")
@sender.send(output_tag, message)
Expand Down

0 comments on commit 11da7ec

Please sign in to comment.