Navigation Menu

Skip to content

Commit

Permalink
Build and dispatch one way message for dead nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jun 26, 2014
1 parent 8a0a0e3 commit d10b736
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions lib/droonga/dispatcher.rb
Expand Up @@ -170,19 +170,40 @@ def dispatch(message, destination)

def dispatch_steps(steps)
id = @engine_state.generate_id

one_way_steps = []
have_dead_nodes = not @engine_state.dead_nodes.empty?

steps.each do |step|
dataset = @catalog.dataset(step["dataset"])
if dataset
if have_dead_nodes and write_step?(step)
routes = dataset.get_routes(step, @engine_state.dead_nodes)
unless routes.empty?
one_way_step = Marshal.load(Marshal.dump(step))
one_way_step["routes"] = routes
one_way_steps << one_way_step
end
end
routes = dataset.get_routes(step, @engine_state.live_nodes)
step["routes"] = routes
else
step["routes"] ||= [id]
end
end

dispatch_message = { "id" => id, "steps" => steps }
get_destinations(steps).each do |destination|
dispatch(dispatch_message, destination)
end

unless one_way_steps.empty?
dispatch_message = { "id" => @engine_state.generate_id,
"steps" => one_way_steps }
get_destinations(one_way_steps).each do |destination|
dispatch(dispatch_message, destination)
end
end
end

def get_destinations(steps)
Expand Down Expand Up @@ -217,6 +238,8 @@ def local?(route)
end

def write_step?(step)
return false unless step["dataset"]

step_runner = @step_runners[step["dataset"]]
return false unless step_runner

Expand Down

0 comments on commit d10b736

Please sign in to comment.