Navigation Menu

Skip to content

Commit

Permalink
Unify BufferedForwarder to EngineNode
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jan 6, 2015
1 parent ff421d5 commit 6bd1683
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 102 deletions.
88 changes: 0 additions & 88 deletions lib/droonga/buffered_forwarder.rb

This file was deleted.

14 changes: 1 addition & 13 deletions lib/droonga/cluster.rb
Expand Up @@ -85,7 +85,7 @@ def forward(message, destination)
receiver_node_name = receiver.match(/\A[^:]+:\d+\/[^.]+/).to_s
@engine_nodes.each do |node|
if node.name == receiver_node_name
node.forwarder.forward(message, destination)
node.forward(message, destination)
return true
end
end
Expand Down Expand Up @@ -142,18 +142,6 @@ def writable_nodes
end.collect(&:name)
end

def unwritable_node?(node_name)
case node_metadata.role
when NodeMetadata::Role::SERVICE_PROVIDER
absorb_source_nodes.include?(node_name) or
absorb_destination_nodes.include?(node_name)
when NodeMetadata::Role::ABSORB_SOURCE
absorb_destination_nodes.include?(node_name)
else
false
end
end

def on_change
@on_change.call if @on_change
end
Expand Down
48 changes: 48 additions & 0 deletions lib/droonga/engine_node.rb
Expand Up @@ -13,7 +13,9 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "droonga/loggable"
require "droonga/forwarder"
require "droonga/forward_buffer"
require "droonga/node_metadata"

module Droonga
Expand All @@ -26,6 +28,35 @@ def initialize(name, state, sender_role, loop)
@sender_role = sender_role

@forwarder = Forwarder.new(loop, :buffering => true)
@buffer = ForwardBuffer.new(name, @forwarder)
end

def start
logger.trace("start: start")
resume
logger.trace("start: done")
end

def shutdown
logger.trace("shutdown: start")
@forwarder.shutdown
logger.trace("shutdown: done")
end

def resume
@forwarder.resume
@buffer.start_forward if really_writable?
end

def forward(message, destination)
if not really_writable?
@buffer.add(message, destination)
elsif @buffer.empty?
@forwarder.forward(message, destination)
else
@buffer.add(message, destination)
@buffer.start_forward
end
end

def live?
Expand Down Expand Up @@ -74,6 +105,18 @@ def writable?
end
end

def really_writable?
return false unless writable?
case @sender_role
when NodeMetadata::Role::SERVICE_PROVIDER
service_provider?
when NodeMetadata::Role::ABSORB_SOURCE
not absorb_destination?
else
true
end
end

def status
if forwardable?
"active"
Expand All @@ -84,8 +127,13 @@ def status
end
end

private
def on_change
@forwarder.resume
end

def log_tag
"[#{Process.ppid}] engine-node"
end
end
end
2 changes: 1 addition & 1 deletion lib/droonga/engine_state.rb
Expand Up @@ -19,7 +19,7 @@

require "droonga/loggable"
require "droonga/event_loop"
require "droonga/buffered_forwarder"
require "droonga/forwarder"
require "droonga/replier"

module Droonga
Expand Down

0 comments on commit 6bd1683

Please sign in to comment.