Navigation Menu

Skip to content

Commit

Permalink
Use forwarders dedicated for remote engine nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jan 6, 2015
1 parent 1d1bccb commit 1522367
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
15 changes: 14 additions & 1 deletion lib/droonga/cluster.rb
Expand Up @@ -61,6 +61,7 @@ def reload
logger.info("cluster state not changed")
else
logger.info("cluster state changed")
engine_nodes.each(&:on_change)
on_change
end
end
Expand All @@ -69,6 +70,18 @@ def engine_nodes
@engine_nodes ||= create_engine_nodes
end

def forward(message, destination)
receiver = destination["to"]
receiver_node_name = receiver.match(/\A[^:]+:\d+\/[^.]+/).to_s
@engine_nodes.each do |node|
if node.name == receiver_node_name
node.forwarder.forward(message, destination)
return true
end
end
false
end

def all_nodes
if @catalog
@catalog.all_nodes
Expand Down Expand Up @@ -156,7 +169,7 @@ def all_node_names
def create_engine_nodes
all_node_names.collect do |name|
node_state = @state[name] || {}
EngineNode.new(name, node_state)
EngineNode.new(name, node_state, @loop)
end
end

Expand Down
13 changes: 10 additions & 3 deletions lib/droonga/dispatcher.rb
Expand Up @@ -115,6 +115,9 @@ def process_message(message)

def forward(message, destination)
logger.trace("forward start")
unless local?(destination)
return if @cluster.forward(message, destination)
end
@forwarder.forward(message, destination)
logger.trace("forward done")
end
Expand Down Expand Up @@ -179,9 +182,13 @@ def dispatch(message, destination)
if local?(destination)
process_internal_message(message)
else
@forwarder.forward(@message.merge("body" => message),
"type" => "dispatcher",
"to" => destination)
forward_message = @message.merge("body" => message)
forward_destination = {
"type" => "dispatcher",
"to" => destination,
}
@cluster.forward(forward_message, forward_destination) ||
@forwarder.forward(forward_message, forward_destination)
end
end

Expand Down
11 changes: 9 additions & 2 deletions lib/droonga/engine_node.rb
Expand Up @@ -13,15 +13,18 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "droonga/forwarder"
require "droonga/node_metadata"

module Droonga
class EngineNode
attr_reader :name
attr_reader :name, :forwarder

def initialize(name, state)
def initialize(name, state, loop)
@name = name
@state = state

@forwarder = Forwarder.new(loop, :buffering => true)
end

def live?
Expand Down Expand Up @@ -68,5 +71,9 @@ def writable_by?(sender_role)
false
end
end

def on_change
@forwarder.resume
end
end
end

0 comments on commit 1522367

Please sign in to comment.