Navigation Menu

Skip to content

Commit

Permalink
Forward buffered messages correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jan 6, 2015
1 parent 66241e3 commit 8ede242
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
5 changes: 4 additions & 1 deletion lib/droonga/engine_node.rb
Expand Up @@ -27,7 +27,10 @@ def initialize(name, state, sender_role, loop)
@state = state
@sender_role = sender_role

@buffer = ForwardBuffer.new(name, @forwarder)
@buffer = ForwardBuffer.new(name)
@buffer.on_forward = lambda do |message, destination|
output(message, destination)
end

unless @name =~ /\A(.*):(\d+)\/([^.]+)\z/
raise "name format: hostname:port/tag"
Expand Down
14 changes: 10 additions & 4 deletions lib/droonga/forward_buffer.rb
Expand Up @@ -29,8 +29,10 @@ class ForwardBuffer

SUFFIX = ".msgpack"

def initialize(node_name, forwarder)
@forwarder = forwarder
attr_writer :on_forward

def initialize(node_name)
@on_forward = nil

@packer = MessagePack::Packer.new
@unpacker = MessagePack::Unpacker.new
Expand Down Expand Up @@ -73,8 +75,8 @@ def forward(buffered_message_path)
@unpacker.feed(file_contents)
buffered_message = @unpacker.read
@unpacker.reset
@forwarder.forward(buffered_message["message"],
buffered_message["destination"])
on_forward(buffered_message["message"],
buffered_message["destination"])
FileUtils.rm_f(buffered_message_path.to_s)
logger.trace("forward: done (#{buffered_message_path})")
end
Expand All @@ -83,6 +85,10 @@ def file_path(time_stamp=Time.now)
@data_directory + "#{time_stamp.iso8601(6)}#{SUFFIX}"
end

def on_forward(message, destination)
@on_forward.call(message, destination) if @on_forward
end

def log_tag
"[#{Process.ppid}] forward-buffer"
end
Expand Down

0 comments on commit 8ede242

Please sign in to comment.