Navigation Menu

Skip to content

Commit

Permalink
Use features of serf module directly
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 21, 2015
1 parent 63022e2 commit f1f606b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 49 deletions.
20 changes: 10 additions & 10 deletions bin/droonga-engine-absorb-data
Expand Up @@ -182,14 +182,14 @@ module Droonga
:tag => @options.tag)
end

def run_remote_command(target, command, options)
serf = Serf.new(target, :verbose => @options.verbose)
serf.send_query(command, options)
def source_node_serf
@source_node_serf ||= Serf.new(source_node.to_s,
:verbose => @options.verbose)
end

def tag_value(target, tag)
serf = Serf.new(target, :verbose => @options[:verbose])
serf.get_tag(tag)
def destination_node_serf
@destination_node_serf ||= Serf.new(destination_node.to_s,
:verbose => @options.verbose)
end

def absorber
Expand Down Expand Up @@ -239,7 +239,7 @@ module Droonga

puts ""

timestamp = tag_value(source_node.to_s, "last-processed-message-timestamp")
timestamp = source_node_serf.last_processed_message_timestamp
unless timestamp
$stderr.puts("Couldn't get the time stamp of " +
"the last processed message from the source node.")
Expand All @@ -248,9 +248,9 @@ module Droonga
if timestamp and not timestamp.empty?
puts "The timestamp of the last processed message in the source node: #{timestamp}"
puts "Setting effective message timestamp for the destination node..."
response = run_remote_command(destination_node.to_s, "accept_messages_newer_than",
"node" => destination_node.to_s,
"timestamp" => timestamp)
response = destination_node_serf.send_query("accept_messages_newer_than",
"node" => destination_node.to_s,
"timestamp" => timestamp)
end
true
end
Expand Down
78 changes: 39 additions & 39 deletions bin/droonga-engine-join
Expand Up @@ -154,6 +154,16 @@ module Droonga
:tag => @options[:tag])
end

def source_node_serf
@source_node_serf ||= Serf.new(source_node.to_s,
:verbose => @options.verbose)
end

def joining_node_serf
@joining_node_serf ||= Serf.new(joining_node.to_s,
:verbose => @options.verbose)
end

def source_cluster_id
source_catalog.cluster_id
end
Expand All @@ -179,16 +189,6 @@ module Droonga
fetcher.fetch(:dataset => dataset)
end

def run_remote_command(target, command, options)
serf = Serf.new(target, :verbose => @options[:verbose])
serf.send_query(command, options)
end

def tag_value(target, tag)
serf = Serf.new(target, :verbose => @options[:verbose])
serf.get_tag(tag)
end

def absorber
@absorber ||= prepare_absorber
end
Expand Down Expand Up @@ -221,50 +221,50 @@ module Droonga
def set_source_node_role
if absorber.source_node_suspendable?
puts("Changing role of the source node...")
run_remote_command(source_node.to_s, "change_role",
"node" => source_node.to_s,
"role" => NodeMetadata::Role::ABSORB_SOURCE)
source_node_serf.send_query("change_role",
"node" => source_node.to_s,
"role" => NodeMetadata::Role::ABSORB_SOURCE)
wait_until_restarted(source_node)
end
@source_node_role_changed = true
end

def set_joining_node_role
puts("Changing role of the joining node...")
run_remote_command(joining_node.to_s, "change_role",
"node" => joining_node.to_s,
"role" => NodeMetadata::Role::ABSORB_DESTINATION)
joining_node_serf.send_query("change_role",
"node" => joining_node.to_s,
"role" => NodeMetadata::Role::ABSORB_DESTINATION)
wait_until_restarted(joining_node)
@joining_node_role_changed = true
end

def reset_source_node_role
if absorber.source_node_suspendable?
puts("Restoring role of the source node...")
run_remote_command(source_node.to_s, "change_role",
"node" => source_node.to_s,
"role" => NodeMetadata::Role::SERVICE_PROVIDER)
source_node_serf.send_query("change_role",
"node" => source_node.to_s,
"role" => NodeMetadata::Role::SERVICE_PROVIDER)
wait_until_restarted(source_node.to_s)
end
@source_node_role_changed = false
end

def reset_joining_node_role
puts("Restoring role of the joining node...")
run_remote_command(joining_node.to_s, "change_role",
"node" => joining_node.to_s,
"role" => NodeMetadata::Role::SERVICE_PROVIDER)
joining_node_serf.send_query("change_role",
"node" => joining_node.to_s,
"role" => NodeMetadata::Role::SERVICE_PROVIDER)
wait_until_restarted(joining_node.to_s)
@joining_node_role_changed = false
end

def do_join
puts("Joining new replica to the cluster...")
run_remote_command(joining_node.to_s, "join",
"node" => joining_node.to_s,
"type" => "replica",
"source" => source_node.to_s,
"dataset" => dataset)
joining_node_serf.send_query("join",
"node" => joining_node.to_s,
"type" => "replica",
"source" => source_node.to_s,
"dataset" => dataset)
wait_until_restarted(joining_node)
end

Expand All @@ -291,7 +291,7 @@ module Droonga
end

def set_effective_message_timestamp
timestamp = tag_value(source_node.to_s, "last-processed-message-timestamp")
timestamp = source_node_serf.last_processed_message_timestamp
unless timestamp
$stderr.puts("Couldn't get the time stamp of " +
"the last processed message from the source node.")
Expand All @@ -300,28 +300,28 @@ module Droonga
if timestamp and not timestamp.empty?
puts "The timestamp of the last processed message in the source node: #{timestamp}"
puts "Setting effective message timestamp for the destination node..."
response = run_remote_command(joining_node.to_s, "accept_messages_newer_than",
"node" => joining_node.to_s,
"timestamp" => timestamp)
response = joining_node_serf.send_query("accept_messages_newer_than",
"node" => joining_node.to_s,
"timestamp" => timestamp)
end
end

def register_to_existing_nodes
puts("Register new node to existing hosts in the cluster...")
run_remote_command(source_node.to_s, "add_replicas",
"cluster_id" => source_cluster_id,
"dataset" => dataset,
"hosts" => [joining_node.host])
source_node_serf.send_query("add_replicas",
"cluster_id" => source_cluster_id,
"dataset" => dataset,
"hosts" => [joining_node.host])
wait_until_restarted(source_node)
@node_registered = true
end

def unregister_from_existing_nodes
puts("Unregister new node from existing hosts in the cluster...")
run_remote_command(source_node, "remove_replicas",
"cluster_id" => source_cluster_id,
"dataset" => dataset,
"hosts" => [joining_node.host])
source_node_serf.send_query("remove_replicas",
"cluster_id" => source_cluster_id,
"dataset" => dataset,
"hosts" => [joining_node.host])
wait_until_restarted(source_node)
@node_registered = false
end
Expand Down

0 comments on commit f1f606b

Please sign in to comment.