Navigation Menu

Skip to content

Commit

Permalink
Wait until all existing nodes are restarted, after cluster informatio…
Browse files Browse the repository at this point in the history
…n is changed
  • Loading branch information
piroor committed Apr 21, 2015
1 parent 685cda8 commit 414de86
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
25 changes: 23 additions & 2 deletions bin/droonga-engine-join
Expand Up @@ -169,6 +169,27 @@ module Droonga
source_catalog.cluster_id
end

def all_nodes
existing_nodes + [joining_node]
end

def existing_nodes
@existing_nodes ||= prepare_existing_nodes
end

def prepare_existing_nodes
generator = Catalog::Generator.new
generator.load(raw_source_catalog)

dataset = generator.dataset_for_host(source_node.host)
other_hosts = dataset.replicas.hosts
other_hosts.collect do |host|
NodeName.new(:host => host,
:port => source_node.port,
:tag => source_node.tag)
end
end

def source_catalog
@source_catalog ||= parse_source_catalog
end
Expand Down Expand Up @@ -314,7 +335,7 @@ module Droonga

def register_to_existing_nodes
puts("Register new node to existing hosts in the cluster...")
source_node_serf.ensure_restarted do
source_node_serf.ensure_restarted(*existing_nodes) do
source_node_serf.send_query("add_replicas",
"cluster_id" => source_cluster_id,
"dataset" => dataset,
Expand All @@ -325,7 +346,7 @@ module Droonga

def unregister_from_existing_nodes
puts("Unregister new node from existing hosts in the cluster...")
source_node_serf.ensure_restarted do
source_node_serf.ensure_restarted(*existing_nodes) do
source_node_serf.send_query("remove_replicas",
"cluster_id" => source_cluster_id,
"dataset" => dataset,
Expand Down
23 changes: 17 additions & 6 deletions lib/droonga/serf.rb
Expand Up @@ -222,20 +222,31 @@ def cluster_id
CHECK_RESTARTED_INTERVAL = 3
CHECK_RESTARTED_TIMEOUT = 60 * 5

def ensure_restarted(&block)
def ensure_restarted(*nodes, &block)
nodes << @name.to_s if nodes.empty?

targets = nodes.collect do |node|
serf = self.class.new(node)
{
:serf => serf,
:previous_name => serf.get_tag(Tag.internal_node_name),
}
end

start_time = Time.now
previous_internal_name = get_tag(Tag.internal_node_name)
restarted = false

yield # the given operation must restart the service.

while Time.now - start_time < CHECK_RESTARTED_TIMEOUT
restarted = get_tag(Tag.internal_node_name) == previous_internal_name
break if restarted
targets.reject! do |target|
name = target[:serf].get_tag(Tag.internal_node_name)
name != target[:previous_name]
end
break if targets.empty?
sleep(CHECK_RESTARTED_INTERVAL)
end

restarted
targets.empty?
end

private
Expand Down

0 comments on commit 414de86

Please sign in to comment.