Navigation Menu

Skip to content

Commit

Permalink
Separate options to specify information for source and destination
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 15, 2015
1 parent 1edeebb commit 0209025
Showing 1 changed file with 91 additions and 46 deletions.
137 changes: 91 additions & 46 deletions bin/droonga-engine-absorb-data
Expand Up @@ -21,8 +21,8 @@ require "socket"
require "coolio"

require "droonga/engine/version"
require "droonga/catalog_generator"
require "droonga/path"
require "droonga/node_name"
require "droonga/data_absorber_client"
require "droonga/serf"
require "droonga/client"
Expand All @@ -37,12 +37,9 @@ class AbsorbDataCommand
assert_valid_options
trap_signals

puts "Start to absorb data from #{@options.source_host}"
puts " to #{@options.destination_host}"
puts "Start to absorb data from #{@options.soruce_dataset} at #{source_node.to_s}"
puts " to #{@options.dataset} at #{destination_host.to_s}"
puts " via #{@options.receiver_host} (this host)"
puts " dataset = #{@options.dataset}"
puts " port = #{@options.port}"
puts " tag = #{@options.tag}"
puts ""
puts "Absorbing..."

Expand All @@ -55,58 +52,88 @@ class AbsorbDataCommand
private
def parse_options
options = OpenStruct.new
options.port = Droonga::CatalogGenerator::DEFAULT_PORT
options.tag = Droonga::CatalogGenerator::DEFAULT_TAG
options.dataset = Droonga::CatalogGenerator::DEFAULT_DATASET

options.host = Socket.gethostname
options.port = Droonga::DataAbsorberClient::DEFAULT_PORT
options.tag = Droonga::DataAbsorberClient::DEFAULT_TAG
options.dataset = Droonga::DataAbsorberClient::DEFAULT_DATASET

options.source_host = Droonga::DataAbsorberClient::DEFAULT_HOST
options.source_port = Droonga::DataAbsorberClient::DEFAULT_PORT
options.source_tag = Droonga::DataAbsorberClient::DEFAULT_TAG
options.source_dataset = Droonga::DataAbsorberClient::DEFAULT_DATASET

options.receiver_host = Socket.gethostname
options.messages_per_second = Droonga::DataAbsorberClient::DEFAULT_MESSAGES_PER_SECOND

options.messages_per_second = Droonga::DataAbsorberClient::DEFAULT_MESSAGES_PER_SECOND
options.progress_interval_seconds = Droonga::DataAbsorberClient::DEFAULT_PROGRESS_INTERVAL_SECONDS

options.verbose = false

parser = OptionParser.new
parser.version = Droonga::Engine::VERSION

parser.separator("")
parser.separator("Connection:")
parser.on("--source-host=HOST",
"Host name of the source cluster to be connected.") do |host|
options.source_host = host
end
parser.on("--destination-host=HOST",
"Host name of this cluster to be connected.") do |host|
options.destination_host = host
end
parser.on("--receiver-host=HOST",
"Host name of this computer.",
"(#{options.receiver_host})") do |host|
options.receiver_host = host
parser.separator("Destination node:")
parser.on("--host=HOST",
"Host name of the destination node.") do |host|
options.host = host
end
parser.on("--port=PORT", Integer,
"Port number of the source cluster to be connected.",
"Port number of the destination node.",
"(#{options.port})") do |port|
options.port = port
end

parser.separator("")
parser.separator("Data:")
parser.on("--tag=TAG",
"Tag name to be used to communicate with Droonga system.",
parser.on("--tag=TAG", Integer,
"Tag name of the destination node.",
"(#{options.tag})") do |tag|
options.tag = tag
end
parser.on("--dataset=DATASET",
"Dataset to be absorbed.",
"Name of the destination dataset.",
"(#{options.dataset})") do |dataset|
options.dataset = dataset
end

parser.separator("")
parser.separator("Source node:")
parser.on("--source-host=HOST",
"Host name of the source node.",
"(#{options.source_host})") do |host|
options.source_host = host
end
parser.on("--source-port=PORT", Integer,
"Port number of the source node.",
"(#{options.source_port})") do |host|
options.source_host = host
end
parser.on("--source-tag=TAG",
"Tag name of the source node.",
"(#{options.source_tag})") do |tag|
options.source_tag = tag
end
parser.on("--dataset=DATASET",
"Name of the source dataset.",
"(#{options.source_dataset})") do |dataset|
options.source_dataset = dataset
end

parser.separator("")
parser.separator("Connection:")
parser.on("--receiver-host=HOST",
"Host name of this computer.",
"(#{options.receiver_host})") do |host|
options.receiver_host = host
end

parser.separator("")
parser.separator("Miscellaneous:")
parser.on("--records-per-second=N", Integer,
"Maximum number of records per second to be absorbed.",
"'#{Droonga::Client::RateLimiter::NO_LIMIT}' means no limit.",
"(#{options.messages_per_second})") do |n|
options.messages_per_second = n
end

parser.separator("")
parser.separator("Miscellaneous:")
parser.on("--progress-interval-seconds=N", Integer,
"Interval seconds to report progress.",
"(#{options.progress_interval_seconds})") do |n|
Expand All @@ -118,6 +145,13 @@ class AbsorbDataCommand
options.verbose = verbose
end

parser.separator("")
parser.separator("For backward compatibility:")
parser.on("--destination-host=HOST",
"Alias to \"--host\".") do |host|
options.host = host
end

parser.parse!(ARGV)
@options = options
end
Expand All @@ -126,17 +160,21 @@ class AbsorbDataCommand
unless @options.source_host
raise "You must specify the source host via --source-host option."
end
unless @options.destination_host
raise "You must specify the destination host via --destination-host option."
unless @options.host
raise "You must specify the destination host via --host option."
end
end

def source_node
"#{@options.source_host}:#{@options.port}/#{@options.tag}"
@source_node ||= NodeName.new(:host => @options.source_host,
:port => @options.source_port,
:tag => @options.source_tag)
end

def destination_node
"#{@options.destination_host}:#{@options.port}/#{@options.tag}"
@destination_node ||= NodeName.new(:host => @options.host,
:port => @options.port,
:tag => @options.tag)
end

def run_remote_command(target, command, options)
Expand All @@ -150,14 +188,21 @@ class AbsorbDataCommand

def prepare_absorber
absorber_options = {
:dataset => @options.dataset,
:source_host => @options.source_host,
:destination_host => @options.destination_host,
:host => @options.host,
:port => @options.port,
:tag => @options.tag,
:dataset => @options.dataset,

:source_host => @options.source_host,
:source_port => @options.source_port,
:source_tag => @options.source_tag,
:source_dataset => @options.source_dataset,

:receiver_host => @options.receiver_host,
:port => @options.port,
:tag => @options.tag,

:messages_per_second => @options.messages_per_second,
:progress_interval_seconds => @options.progress_interval_seconds,

:client_options => {
:backend => :coolio,
:loop => @loop,
Expand Down Expand Up @@ -185,15 +230,15 @@ class AbsorbDataCommand

puts ""

response = run_remote_command(source_node, "report_metadata",
"node" => source_node,
response = run_remote_command(source_node.to_s, "report_metadata",
"node" => source_node.to_s,
"key" => "last_processed_message_timestamp")
timestamp = response["value"]
if timestamp and not timestamp.empty?
puts "The timestamp of the last processed message in the source node: #{timestamp}"
puts "Setting effective message timestamp for the destination node..."
response = run_remote_command(destination_node, "accept_messages_newer_than",
"node" => destination_node,
response = run_remote_command(destination_node.to_s, "accept_messages_newer_than",
"node" => destination_node.to_s,
"timestamp" => timestamp)
end
true
Expand Down

0 comments on commit 0209025

Please sign in to comment.