Navigation Menu

Skip to content

Commit

Permalink
Don't save "absorbing" tag as a part of cluster state to suppress res…
Browse files Browse the repository at this point in the history
…tarting
  • Loading branch information
piroor committed Apr 9, 2015
1 parent 9eb1c5a commit 235aae4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 24 deletions.
11 changes: 6 additions & 5 deletions bin/droonga-engine-absorb-data
Expand Up @@ -136,9 +136,11 @@ class AbsorbDataCommand
serf.send_query(command, options)
end

def current_cluster_state(target)
def current_member_state(target)
serf = Droonga::Serf.new(target, :verbose => @options.verbose)
serf.current_cluster_state(:node => target)
serf.current_members.select do |member|
member["name"] == target
end.first
end

def absorber
Expand Down Expand Up @@ -170,9 +172,8 @@ class AbsorbDataCommand
last_progress = ""
while true
sleep(3)
state = current_cluster_state(source_node)
if state[destination_node].nil? or
not state[destination_node]["absorbing"]
state = current_member_state(destination_node)
if state.nil? or state["tags"]["absorbing"] != "true"
break
end

Expand Down
11 changes: 6 additions & 5 deletions bin/droonga-engine-join
Expand Up @@ -141,9 +141,11 @@ class JoinCommand
serf.send_query(command, options)
end

def current_cluster_state(target)
def current_member_state(target)
serf = Droonga::Serf.new(target, :verbose => @options[:verbose])
serf.current_cluster_state(:node => target)
serf.current_members.select do |member|
member["name"] == target
end.first
end

def absorber
Expand Down Expand Up @@ -229,9 +231,8 @@ class JoinCommand
last_progress = ""
while true
sleep(3)
state = current_cluster_state(source_node)
if state[joining_node].nil? or
not state[joining_node]["absorbing"]
state = current_member_state(joining_node)
if state.nil? or state["tags"]["absorbing"] != "true"
break
end
progress = absorber.report_progress(@start_time_in_seconds)
Expand Down
20 changes: 6 additions & 14 deletions lib/droonga/serf.rb
Expand Up @@ -111,24 +111,17 @@ def update_cluster_state
end
end

def current_cluster_state(options={})
def current_members
raw_response = run_command("members", "-format", "json")
response = JSON.parse(raw_response)
response["members"]
end

current_cluster_id = nil
if options[:node]
response["members"].each do |member|
next if member["name"] != options[:node]
current_cluster_id = member["tags"]["cluster_id"]
break
end
else
current_cluster_id = cluster_id
end

def current_cluster_state
current_cluster_id = cluster_id
nodes = {}
unprocessed_messages_existence = {}
response["members"].each do |member|
current_members.each do |member|
foreign = member["tags"]["cluster_id"] != current_cluster_id
next if foreign

Expand All @@ -144,7 +137,6 @@ def current_cluster_state(options={})
"role" => member["tags"]["role"],
"accept_messages_newer_than" => member["tags"]["accept-messages-newer-than"],
"live" => member["status"] == "alive",
"absorbing" => member["tags"]["absorbing"] == "true",
}
end
unprocessed_messages_existence.each do |node_name, have_messages|
Expand Down

0 comments on commit 235aae4

Please sign in to comment.