Navigation Menu

Skip to content

Commit

Permalink
Add system.statistics.object.count.per-replica command
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 29, 2015
1 parent a5072dd commit 3fc6d75
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 3 deletions.
9 changes: 7 additions & 2 deletions lib/droonga/planner.rb
Expand Up @@ -22,12 +22,13 @@ class Planner
include Loggable
include ErrorMessages

attr_writer :write, :single_operation, :collector_class
attr_writer :write, :single_operation, :use_all_replicas, :collector_class

def initialize(dataset)
@dataset = dataset
@write = false
@single_operation = false
@use_all_replicas = false
@collector_class = nil
end

Expand Down Expand Up @@ -75,6 +76,10 @@ def single_operation?
@single_operation
end

def use_all_replicas?
write? or @use_all_replicas
end

def scatter(message, options={})
planner = DistributedCommandPlanner.new(@dataset, message)
scatter_options = {
Expand All @@ -95,7 +100,7 @@ def broadcast(message, options={})
broadcast_options = {
:write => write?,
}
if write?
if use_all_replicas?
broadcast_options[:replica] = "all"
elsif single_operation?
broadcast_options[:slice] = "random"
Expand Down
28 changes: 27 additions & 1 deletion lib/droonga/plugins/system/statistics.rb
Expand Up @@ -23,8 +23,11 @@ class StatisticsObjectCountHandler < Droonga::Handler
include DatabaseScanner

def handle(message)
counts(message.request["output"])
end

def counts(output)
counts = {}
output = message.request["output"]
if output and output.is_a?(Array)
if output.include?("tables")
counts["tables"] = n_tables
Expand All @@ -48,6 +51,29 @@ def handle(message)
step.handler = StatisticsObjectCountHandler
step.collector = Collectors::RecursiveSum
end

class StatisticsObjectCountPerReplicaHandler < StatisticsObjectCountHandler
def handle(message)
{
replica_name => counts(message.request["output"]),
}
end

def replica_name
my_node_name
end

def my_node_name
ENV["DROONGA_ENGINE_NAME"]
end
end

define_single_step do |step|
step.name = "system.statistics.object.count.per-replica"
step.use_all_replicas = true
step.handler = StatisticsObjectCountPerReplicaHandler
step.collector = Collectors::Sum
end
end
end
end
1 change: 1 addition & 0 deletions lib/droonga/single_step.rb
Expand Up @@ -34,6 +34,7 @@ def plan(message)
planner = Planner.new(@dataset)
planner.write = @definition.write?
planner.single_operation = @definition.single_operation?
planner.use_all_replicas = @definition.use_all_replicas?
planner.collector_class = @definition.collector_class

body = message["body"]
Expand Down
7 changes: 7 additions & 0 deletions lib/droonga/single_step_definition.rb
Expand Up @@ -19,6 +19,7 @@ class SingleStepDefinition
attr_accessor :handler
attr_accessor :collector
attr_writer :write, :single_operation
attr_writer :use_all_replicas
attr_writer :timeout_calculator
attr_accessor :inputs
attr_accessor :output
Expand All @@ -29,6 +30,7 @@ def initialize(plugin_module)
@collector = nil
@write = false
@single_operation = false
@use_all_replicas = false
@timeout_calculator = lambda do |step|
if step["timeout"]
return step["timeout"]
Expand All @@ -52,6 +54,11 @@ def single_operation?
@single_operation
end

def use_all_replicas?
return true if @write
@use_all_replicas
end

def timeout_for_step(step)
if @timeout_calculator
@timeout_calculator.call(step)
Expand Down

0 comments on commit 3fc6d75

Please sign in to comment.