Skip to content

Commit

Permalink
Call handler from inside the proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
daijiro committed Aug 15, 2013
1 parent 1091f1a commit 1adacde
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
2 changes: 1 addition & 1 deletion lib/droonga/plugin/handler_proxy.rb
Expand Up @@ -30,7 +30,7 @@ def initialize(*arguments)
command :proxy

def proxy(request, *arguments)
@proxy.handle(request)
@proxy.handle(request, arguments)
end
end
end
45 changes: 31 additions & 14 deletions lib/droonga/proxy.rb
Expand Up @@ -28,7 +28,7 @@ def initialize(worker, name)
@local = Regexp.new("^#{@name}")
end

def handle(message)
def handle(message, arguments)
case message
when Array
handle_incoming_message(message)
Expand All @@ -44,7 +44,7 @@ def handle_incoming_message(message)
components = planner.components
message = { "id" => id, "components" => components }
destinations.each do |destination, frequency|
dispatch(destination, message)
dispatch(message, destination)
end
end

Expand All @@ -63,16 +63,16 @@ def handle_internal_message(message)
collector.handle(message["input"], message["value"])
end

def dispatch(destination, message)
def dispatch(message, destination)
if local?(destination)
handle_internal_message(message)
else
post(farm_path(destination), message)
post(message, "to"=>farm_path(destination), "type"=>"proxy")
end
end

def post(route, message)
@worker.post(message, "to"=> route, "type"=>"proxy")
def post(message, destination)
@worker.post(message, destination)
end

def generate_id
Expand Down Expand Up @@ -164,7 +164,7 @@ def get_collector(id)
"route" => route,
"component" => component,
"n_of_inputs" => 0,
"values" => []
"value" => nil
}
tasks << task
(component["inputs"] || [nil]).each do |input|
Expand Down Expand Up @@ -235,13 +235,31 @@ def initialize(id, proxy, components, tasks, inputs)
def handle(name, value)
tasks = @inputs[name]
tasks.each do |task|
if name
task["values"] << value
task["n_of_inputs"] += 1
end
component = task["component"]
type = component["type"]
args = component["args"]
command = component["command"]
if command
message = {
"task"=>task,
"name"=>name,
"value"=>value
}
#todo: add_route and n_of_expects++ if it would run asynchronously
@proxy.post(message, command)
else
task["value"] ||= {}
task["value"][name] ||= []
task["value"][name] << value
end
task["n_of_inputs"] += 1 if name
return if task["n_of_inputs"] < component["n_of_expects"]
result = task["values"]
#the task is done
result = task["value"]
case type
when "send"
@proxy.post(result, args)
end
component["descendants"].each do |name, indices|
message = {
"id" => @id,
Expand All @@ -252,13 +270,12 @@ def handle(name, value)
dest = @components[index]
routes = dest["routes"]
routes.each do |route|
@proxy.dispatch(route, message)
@proxy.dispatch(message, route)
end
end
end
@n_dones += 1
@proxy.collectors.delete(@id) if @n_dones == @tasks.size
p @proxy.collectors.size
end
end
end
Expand Down

0 comments on commit 1adacde

Please sign in to comment.