Navigation Menu

Skip to content

Commit

Permalink
Implement system.absorb-data command
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 10, 2015
1 parent 930b1b2 commit 67ce4de
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -14,16 +14,65 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "droonga/plugin"
require "droonga/catalog/dataset"
require "droonga/node_name"

require "drndump/dumper"

module Droonga
module Plugins
module System
class AbsorbDataHandler < Droonga::Handler
action.synchronous = true

class MissingHostParameter < BadRequest
def initialize
super("\"host\" must be specified.")
end
end

class DumpFailed < InternalServerError
def initialize(error)
super("source node returns an error.",
error)
end
end

def handle(message)
raise MissingHostParameter.new unless message.include?("host")

dumper = Drndump::Dumper.new(dumper_params(message))
error_message = dumper.run do |message|
@messenger.forward(message,
"to" => my_node_name,
"type" => message["type"])
end

raise DumpFailed.new(error_message) if error_message

true
end

private
def dumper_params(message)
{
:host => message["host"],
:port => message["port"] ||= NodeName::DEFAULT_PORT,
:tag => message["tag"] ||= NodeName::DEFAULT_TAG,
:dataset => message["dataset"] ||= Catalog::Dataset::DEFAULT_NAME,

:receiver_host => myself.host,
:receiver_port => 0,
}
end

def myself
@myself ||= NodeName.parse(my_node_name)
end

def my_node_name
@messenger.engine_state.name
end
end

define_single_step do |step|
Expand Down

0 comments on commit 67ce4de

Please sign in to comment.