Navigation Menu

Skip to content

Commit

Permalink
Re-implement system.absorb-data based on AsyncCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 10, 2015
1 parent 2184e0c commit d02310c
Showing 1 changed file with 62 additions and 37 deletions.
99 changes: 62 additions & 37 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -14,15 +14,17 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "droonga/plugin"
require "droonga/plugin/async_command"
require "droonga/catalog/dataset"
require "droonga/serf"
require "droonga/node_name"

require "drndump/dumper"

module Droonga
module Plugins
module System
class AbsorbDataHandler < Droonga::Handler
class AbsorbDataHandler < AsyncCommand::Handler
action.synchronous = true

DEFAULT_MESSAGES_PER_SECOND = 100
Expand All @@ -33,62 +35,85 @@ def initialize
end
end

class DumpFailed < InternalServerError
def initialize(error)
super("source node returns an error.",
error)
class DataAbsorber
private
def prefix
"system.absorb-data"
end
end

def handle(message)
raise MissingHostParameter.new unless message.include?("host")
def error_name
"AbsorbFailure"
end

def error_message
"failed to absorb data"
end

def handle
dumper = Drndump::Dumper.new(dumper_params)

serf = Serf.new(my_node_name)
serf.set_tag("absorbing", true)

dumper = Drndump::Dumper.new(dumper_params(message))
dumper_error_message = dumper.run do |message|
@messenger.forward(message,
"to" => my_node_name,
"type" => message["type"])
forward("#{prefix}.progress")
end

serf = Serf.new(my_node_name)
serf.set_tag("absorbing", true)
serf.set_tag("absorbing", true)

error_message = dumper.run do |message|
@messenger.forward(message,
"to" => my_node_name,
"type" => message["type"])
if dumper_error_message
error(error_name, dumper_error_message)
end
end

serf.set_tag("absorbing", true)
private
def dumper_params
params = @request.request
{
:host => params["host"],
:port => params["port"] || NodeName::DEFAULT_PORT,
:tag => params["tag"] || NodeName::DEFAULT_TAG,
:dataset => params["dataset"] || Catalog::Dataset::DEFAULT_NAME,

raise DumpFailed.new(error_message) if error_message
:receiver_host => myself.host,
:receiver_port => 0,

true
end
:messages_per_second => params["messagesPerSecond"] || DEFAULT_MESSAGES_PER_SECOND,
}
end

private
def dumper_params(message)
{
:host => message["host"],
:port => message["port"] || NodeName::DEFAULT_PORT,
:tag => message["tag"] || NodeName::DEFAULT_TAG,
:dataset => message["dataset"] || Catalog::Dataset::DEFAULT_NAME,

:receiver_host => myself.host,
:receiver_port => 0,

:messages_per_second => message["messagesPerSecond"] || DEFAULT_MESSAGES_PER_SECOND,
}
def myself
@myself ||= NodeName.parse(my_node_name)
end

def my_node_name
@messenger.engine_state.name
end

def log_tag
"[#{Process.ppid}] data-absorber"
end
end

def myself
@myself ||= NodeName.parse(my_node_name)
def handle(message)
raise MissingHostParameter.new unless message.include?("host")
super
end

def my_node_name
@messenger.engine_state.name
private
def start(request)
absorber = DataAbsorber.new(loop, messenger, request)
absorber.start
end
end

define_single_step do |step|
step.name = "system.absorb-data"
step.handler = AbsorbDataHandler
step.collector = Collectors::Or
step.collector = Collectors::And
end
end
end
Expand Down

0 comments on commit d02310c

Please sign in to comment.