Navigation Menu

Skip to content

Commit

Permalink
Do absorb via serf event
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Jun 28, 2014
1 parent 7061db7 commit 224ac0a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 49 deletions.
60 changes: 18 additions & 42 deletions bin/droonga-engine-absorb-data
Expand Up @@ -21,11 +21,12 @@ require "open3"

require "droonga/engine/version"
require "droonga/catalog_generator"
require "droonga/data_absorber"
require "droonga/serf"

options = OpenStruct.new
options.drndump = "drndump"
options.client = "droonga-request"
options.port = Droonga::CatalogGenerator::DEFAULT_PORT
options.tag = Droonga::CatalogGenerator::DEFAULT_TAG
options.dataset = Droonga::CatalogGenerator::DEFAULT_DATASET
parser = OptionParser.new
parser.version = Droonga::Engine::VERSION

Expand All @@ -41,66 +42,41 @@ parser.on("--destination-host=HOST",
end
parser.on("--port=PORT", Integer,
"Port number of the source cluster to be connected.",
"(#{Droonga::CatalogGenerator::DEFAULT_PORT})") do |port|
"(#{options.port})") do |port|
options.port = port
end

parser.separator("")
parser.separator("Data:")
parser.on("--tag=TAG",
"Tag name to be used to communicate with Droonga system.",
"(#{Droonga::CatalogGenerator::DEFAULT_TAG})") do |tag|
"(#{options.tag})") do |tag|
options.tag = tag
end
parser.on("--dataset=DATASET",
"Dataset to be absorbed.",
"(#{Droonga::CatalogGenerator::DEFAULT_DATASET})") do |dataset|
"(#{options.dataset})") do |dataset|
options.dataset = dataset
end

parser.separator("")
parser.separator("Droonga protocol:")
parser.on("--receiver-host=HOST",
"Host name of this node to be received a response from clusters.") do |host|
options.destination_host = host
end
parser.on("--receiver-port=PORT", Integer,
"Port number of this node to be received a response from clusters.") do |port|
options.receiver_port = port
end

parser.separator("")
parser.separator("Commands:")
parser.on("--drndump=PATH",
"Path to the drndump command.",
"(#{options.client})") do |path|
options.drndump = path
end
parser.on("--droonga-request=PATH",
"Path to the droonga-request command.",
"(#{options.client})") do |path|
options.client = path
end

parser.parse!(ARGV)

unless options.source_host
raise "You must specify the source host via --source-host option."
end
unless options.destination_host
raise "You must specify the destination host (this node) " +
"via --destination-host option."
raise "You must specify the destination host via --destination-host option."
end

Droonga::DataAbsorber.absorb(:drndump => options.drndump,
:client => options.client,
:dataset => options.dataset,
:source_host => options.source_host,
:destination_host => options.destination_host,
:port => options.port,
:tag => options.tag,
:receiver_port => options.receiver_port) do |dump|
puts dump
end
destination_node = "#{options.destination_host}:#{options.port}/#{options.tag}"

puts "Absorbing data..."
Droonga::Serf.send_query(destination_node, "absorb_data",
"node" => destination_node,
"source" => options.source_host,
"port" => options.port,
"tag" => options.tag,
"dataset" => options.dataset)
puts "Done."

exit(true)
23 changes: 16 additions & 7 deletions lib/droonga/command/serf_event_handler.rb
Expand Up @@ -191,18 +191,27 @@ def create_current_catalog_generator
end

def absorb_data
dataset = @payload["dataset"]
return unless dataset
return unless event_for_me?

soruce = @payload["soruce"]
return unless soruce

current_catalog = JSON.parse(Path.catalog.read)
generator = CatalogGenerator.new
generator.load(current_catalog)
dataset_name = @payload["dataset"]
port = @payload["port"]
tag = @payload["port"]

if dataset_name.nil? or port.nil? or tag.nil?
current_catalog = JSON.parse(Path.catalog.read)
generator = CatalogGenerator.new
generator.load(current_catalog)

port = @payload["port"] || generator.datasets[dataset].replicas.port
tag = @payload["tag"] || generator.datasets[dataset].replicas.tag
dataset = generator.dataset_for_host(soruce)
return unless dataset

dataset_name = dataset.name
port = dataset.replicas.port
tag = dataset.replicas.tag
end

DataAbsorber.absorb(:dataset => dataset,
:source_host => source,
Expand Down

0 comments on commit 224ac0a

Please sign in to comment.