Navigation Menu

Skip to content

Commit

Permalink
Detect suitable slice for requests like system.status correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 15, 2015
1 parent 909291e commit d24dc1e
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 33 deletions.
16 changes: 12 additions & 4 deletions lib/droonga/catalog/slices_volume.rb
Expand Up @@ -39,8 +39,16 @@ def slices
@slices ||= create_slices
end

def select_slices(range=0..-1)
slices.sort_by(&:label)[range]
def select_slices(how=:all, range=0..-1)
selected = slices.sort_by(&:label)[range]
case how
when :random
[selected.sample]
when :all
selected
else
selected
end
end

def choose_slice(record)
Expand Down Expand Up @@ -76,13 +84,13 @@ def compute_routes(message, active_nodes)
slices = []
case message["type"]
when "broadcast"
slices = select_slices
slices = select_slices(:all)
when "scatter"
record = message["record"]
if record
slices = [choose_slice(record)]
else
slices = select_slices
slices = select_slices(message["slice"].to_sym)
end
end
slices.each do |slice|
Expand Down
24 changes: 10 additions & 14 deletions lib/droonga/planner.rb
Expand Up @@ -22,12 +22,12 @@ class Planner
include Loggable
include ErrorMessages

attr_writer :write, :collector_class
attr_writer :write, :single_operation, :collector_class

def initialize(dataset)
@dataset = dataset
@write = false
@specified_random = nil
@single_operation = false
@collector_class = nil
end

Expand All @@ -42,28 +42,21 @@ def plan(message, params={})
}
end

if options[:record] or random?
if options[:record] or single_operation?
scatter(message, options)
else
broadcast(message, options)
end
end

def random=(value)
@specified_random = value
end

private
def write?
@write
end

def random?
if @specified_random.nil?
not write?
else
@specified_random
end
def single_operation?
return false if write?
@single_operation
end

def scatter(message, options={})
Expand All @@ -72,7 +65,10 @@ def scatter(message, options={})
:write => write?,
:record => options[:record],
}
scatter_options[:replica] = "random" if random?
if single_operation?
scatter_options[:slice] = "random"
scatter_options[:replica] = "random"
end
planner.scatter(scatter_options)
planner.reduce(options[:reduce])
planner.plan
Expand Down
2 changes: 1 addition & 1 deletion lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -329,7 +329,7 @@ def start(request)

define_single_step do |step|
step.name = "system.absorb-data"
step.random = true
step.single_operation = true
step.handler = AbsorbDataHandler
step.collector = Collectors::And
end
Expand Down
2 changes: 1 addition & 1 deletion lib/droonga/plugins/system/status.rb
Expand Up @@ -44,7 +44,7 @@ def engine_state

define_single_step do |step|
step.name = "system.status"
step.random = true
step.single_operation = true
step.handler = StatusHandler
step.collector = Collectors::Or
end
Expand Down
2 changes: 1 addition & 1 deletion lib/droonga/single_step.rb
Expand Up @@ -33,7 +33,7 @@ def plan(message)
# XXX: Re-implement me.
planner = Planner.new(@dataset)
planner.write = @definition.write?
planner.random = @definition.random?
planner.single_operation = @definition.single_operation?
planner.collector_class = @definition.collector_class

body = message["body"]
Expand Down
17 changes: 5 additions & 12 deletions lib/droonga/single_step_definition.rb
Expand Up @@ -18,7 +18,7 @@ class SingleStepDefinition
attr_accessor :name
attr_accessor :handler
attr_accessor :collector
attr_writer :write
attr_writer :write, :single_operation
attr_writer :timeout_seconds_calculator
attr_accessor :inputs
attr_accessor :output
Expand All @@ -28,7 +28,7 @@ def initialize(plugin_module)
@handler = nil
@collector = nil
@write = false
@user_defined_random = nil
@single_operation = false
@timeout_seconds_calculator = lambda do |step|
if step["timeout"]
return step["timeout"]
Expand All @@ -47,16 +47,9 @@ def write?
@write
end

def random=(value)
@user_defined_random = value
end

def random?
if @user_defined_random.nil?
not write?
else
@user_defined_random
end
def single_operation?
return false if @write
@single_operation
end

def timeout_seconds_for_step(step)
Expand Down

0 comments on commit d24dc1e

Please sign in to comment.