Navigation Menu

Skip to content

Commit

Permalink
Cleanup codes around confusing options
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 15, 2015
1 parent 0209025 commit be7210a
Showing 1 changed file with 70 additions and 48 deletions.
118 changes: 70 additions & 48 deletions bin/droonga-engine-join
Expand Up @@ -23,8 +23,9 @@ require "coolio"

require "droonga/engine/version"
require "droonga/path"
require "droonga/node_name"
require "droonga/catalog/dataset"
require "droonga/catalog_fetcher"
require "droonga/catalog_generator"
require "droonga/catalog_loader"
require "droonga/safe_file_writer"
require "droonga/data_absorber_client"
Expand All @@ -38,9 +39,12 @@ class JoinCommand
parse_options
trap_signals

puts "Start to join a new node #{@options[:host]}"
puts " to the cluster of #{@options["replica-source-host"]}"
puts "Start to join a new node #{joining_node.host}"
puts " to the cluster of #{source_node.host}"
puts " via #{@options["receiver-host"]} (this host)"
puts " port = #{joining_node.port}"
puts " tag = #{joining_node.tag}"
puts " dataset = #{dataset}"
puts ""
puts "Source Cluster ID: #{source_cluster_id}"
puts ""
Expand Down Expand Up @@ -76,33 +80,36 @@ class JoinCommand
option.on("no-copy", "Don't copy data from the source cluster.",
:default => false)

option.separator("Connections:")
option.separator("Target:")
option.on(:host=,
"Host name of the new node to be joined.",
:required => true)
option.on("replica-source-host=",
"Host name of the soruce node in the cluster to be connected.",
:required => true)
option.on("receiver-host=",
"Host name of this host.",
:default => Socket.gethostname)
option.on(:dataset=,
"Dataset name of for the node to be joined.",
:default => Droonga::CatalogGenerator::DEFAULT_DATASET)

option.on(:port=,
"Port number of the source cluster to be connected.",
:as => Integer,
:default => Droonga::CatalogGenerator::DEFAULT_PORT)
:default => Droonga::NodeName::DEFAULT_PORT)
option.on(:tag=,
"Tag name of the soruce cluster to be connected.",
:default => Droonga::CatalogGenerator::DEFAULT_TAG)
:default => Droonga::NodeName::DEFAULT_TAG)
option.on(:dataset=,
"Dataset name of for the node to be joined.",
:default => Droonga::Catalog::Dataset::DEFAULT_NAME)

option.separator("Connections:")
option.on("receiver-host=",
"Host name of this host.",
:default => Socket.gethostname)

option.separator("Miscellaneous:")
option.on("records-per-second=",
"Maximum number of records per second to be copied. " +
"'#{Droonga::Client::RateLimiter::NO_LIMIT}' means no limit.",
:as => Integer,
:default => Droonga::DataAbsorberClient::DEFAULT_MESSAGES_PER_SECOND)

option.separator("Miscellaneous:")
option.on("progress-interval-seconds=",
"Interval seconds to report progress.",
:as => Integer,
Expand All @@ -116,12 +123,20 @@ class JoinCommand
exit(false)
end

def dataset
@options[:dataset]
end

def joining_node
"#{@options[:host]}:#{@options[:port]}/#{@options[:tag]}"
@joining_node ||= NodeName.new(:host => @options[:host],
:port => @options[:port],
:tag => @options[:tag])
end

def source_node
"#{@options["replica-source-host"]}:#{@options[:port]}/#{@options[:tag]}"
@source_node ||= NodeName.new(:host => @options["replica-source-host"],
:port => @options[:port],
:tag => @options[:tag])
end

def source_cluster_id
Expand All @@ -142,11 +157,11 @@ class JoinCommand
end

def fetch_source_catalog
fetcher = Droonga::CatalogFetcher.new(:host => @options["replica-source-host"],
:port => @options[:port],
:tag => @options[:tag],
fetcher = Droonga::CatalogFetcher.new(:host => source_node.host,
:port => source_node.port,
:tag => source_node.tag,
:receiver_host => @options["receiver-host"])
fetcher.fetch(:dataset => @options[:dataset])
fetcher.fetch(:dataset => dataset)
end

def run_remote_command(target, command, options)
Expand All @@ -160,14 +175,21 @@ class JoinCommand

def prepare_absorber
absorber_options = {
:dataset => @options[:dataset],
:source_host => @options["replica-source-host"],
:destination_host => @options[:host],
:host => joining_node.host,
:port => joining_node.port,
:tag => joining_node.tag,
:dataset => dataset,

:source_host => source_node.host,
:source_port => source_node.port,
:source_tag => source_node.tag,
:source_dataset => dataset,

:receiver_host => @options["receiver-host"],
:port => @options[:port],
:tag => @options[:tag],

:messages_per_second => @options["records-per-second"],
:progress_interval_seconds => @options["progress-interval-seconds"],

:client_options => {
:backend => :coolio,
:loop => @loop,
Expand All @@ -179,8 +201,8 @@ class JoinCommand
def set_source_node_role
if absorber.source_node_suspendable?
puts("Changing role of the source node...")
run_remote_command(source_node, "change_role",
"node" => source_node,
run_remote_command(source_node.to_s, "change_role",
"node" => source_node.to_s,
"role" => Droonga::NodeMetadata::Role::ABSORB_SOURCE)
wait_until_restarted(source_node)
end
Expand All @@ -189,8 +211,8 @@ class JoinCommand

def set_joining_node_role
puts("Changing role of the joining node...")
run_remote_command(joining_node, "change_role",
"node" => joining_node,
run_remote_command(joining_node.to_s, "change_role",
"node" => joining_node.to_s,
"role" => Droonga::NodeMetadata::Role::ABSORB_DESTINATION)
wait_until_restarted(joining_node)
@joining_node_role_changed = true
Expand All @@ -199,30 +221,30 @@ class JoinCommand
def reset_source_node_role
if absorber.source_node_suspendable?
puts("Restoring role of the source node...")
run_remote_command(source_node, "change_role",
"node" => source_node,
run_remote_command(source_node.to_s, "change_role",
"node" => source_node.to_s,
"role" => Droonga::NodeMetadata::Role::SERVICE_PROVIDER)
wait_until_restarted(source_node)
wait_until_restarted(source_node.to_s)
end
@source_node_role_changed = false
end

def reset_joining_node_role
puts("Restoring role of the joining node...")
run_remote_command(joining_node, "change_role",
"node" => joining_node,
run_remote_command(joining_node.to_s, "change_role",
"node" => joining_node.to_s,
"role" => Droonga::NodeMetadata::Role::SERVICE_PROVIDER)
wait_until_restarted(joining_node)
wait_until_restarted(joining_node.to_s)
@joining_node_role_changed = false
end

def do_join
puts("Joining new replica to the cluster...")
run_remote_command(joining_node, "join",
"node" => joining_node,
run_remote_command(joining_node.to_s, "join",
"node" => joining_node.to_s,
"type" => "replica",
"source" => source_node,
"dataset" => @options[:dataset])
"source" => source_node.to_s,
"dataset" => dataset)
wait_until_restarted(joining_node)
end

Expand All @@ -249,25 +271,25 @@ class JoinCommand
end

def set_effective_message_timestamp
response = run_remote_command(source_node, "report_metadata",
"node" => source_node,
response = run_remote_command(source_node.to_s, "report_metadata",
"node" => source_node.to_s,
"key" => "last_processed_message_timestamp")
timestamp = response["value"]
if timestamp and not timestamp.empty?
puts "The timestamp of the last processed message in the source node: #{timestamp}"
puts "Setting effective message timestamp for the destination node..."
response = run_remote_command(joining_node, "accept_messages_newer_than",
"node" => joining_node,
response = run_remote_command(joining_node.to_s, "accept_messages_newer_than",
"node" => joining_node.to_s,
"timestamp" => timestamp)
end
end

def register_to_existing_nodes
puts("Register new node to existing hosts in the cluster...")
run_remote_command(source_node, "add_replicas",
run_remote_command(source_node.to_s, "add_replicas",
"cluster_id" => source_cluster_id,
"dataset" => @options[:dataset],
"hosts" => [@options[:host]])
"dataset" => dataset,
"hosts" => [joining_node.host])
wait_until_restarted(source_node)
@node_registered = true
end
Expand All @@ -276,8 +298,8 @@ class JoinCommand
puts("Unregister new node from existing hosts in the cluster...")
run_remote_command(source_node, "remove_replicas",
"cluster_id" => source_cluster_id,
"dataset" => @options[:dataset],
"hosts" => [@options[:host]])
"dataset" => dataset,
"hosts" => [joining_node.host])
wait_until_restarted(source_node)
@node_registered = false
end
Expand Down

0 comments on commit be7210a

Please sign in to comment.