Navigation Menu

Skip to content

Commit

Permalink
serf: raise an error when serf exits with failure
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jan 5, 2015
1 parent 7f241be commit 1f6a143
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 35 deletions.
7 changes: 2 additions & 5 deletions bin/droonga-engine-absorb-data
Expand Up @@ -132,11 +132,8 @@ class AbsorbDataCommand
end

def run_remote_command(target, command, options)
serf = Droonga::Serf.new(nil, target)
result = serf.send_query(command, options)
#puts result[:result]
puts result[:error] unless result[:error].empty?
result[:response]
serf = Droonga::Serf.new(target)
serf.send_query(command, options)
end

def absorber
Expand Down
7 changes: 2 additions & 5 deletions bin/droonga-engine-join
Expand Up @@ -125,11 +125,8 @@ class JoinCommand
end

def run_remote_command(target, command, options)
serf = Droonga::Serf.new(nil, target)
result = serf.send_query(command, options)
#puts(result[:result])
puts(result[:error]) unless result[:error].empty?
result[:response]
serf = Droonga::Serf.new(target)
serf.send_query(command, options)
end

def absorber
Expand Down
7 changes: 2 additions & 5 deletions bin/droonga-engine-unjoin
Expand Up @@ -133,11 +133,8 @@ class UnjoinCommand
end

def run_remote_command(target, command, options)
serf = Droonga::Serf.new(nil, target)
result = serf.send_query(command, options)
puts(result[:result])
puts(result[:error]) unless result[:error].empty?
result[:response]
serf = Droonga::Serf.new(target)
serf.send_query(command, options)
end

def do_unjoin
Expand Down
31 changes: 17 additions & 14 deletions lib/droonga/serf.rb
Expand Up @@ -79,22 +79,25 @@ def join(*hosts)
def send_query(query, payload)
options = ["-format", "json"] + additional_options_from_payload(payload)
options += [query, JSON.generate(payload)]
result = run_command("query", *options)
result[:result] = JSON.parse(result[:result])
if payload["node"]
responses = result[:result]["Responses"]
response = responses[payload["node"]]
raw_serf_response = run_command("query", *options)
serf_response = JSON.parse(raw_serf_response)

node = payload["node"]
if node
responses = serf_response["Responses"]
response = responses[node]
if response.is_a?(String)
begin
result[:response] = JSON.parse(response)
JSON.parse(response)
rescue JSON::ParserError
result[:response] = response
response
end
else
result[:response] = response
response
end
else
response
end
result
end

def update_cluster_state
Expand All @@ -108,12 +111,12 @@ def update_cluster_state
end

def current_cluster_state
nodes = {}
result = run_command("members", "-format", "json")
result[:result] = JSON.parse(result[:result])
members = result[:result]
raw_response = run_command("members", "-format", "json")
response = JSON.parse(raw_response)

current_cluster_id = cluster_id
members["members"].each do |member|
nodes = {}
response["members"].each do |member|
foreign = member["tags"]["cluster_id"] != current_cluster_id
next if foreign

Expand Down
28 changes: 22 additions & 6 deletions lib/droonga/serf_command.rb
Expand Up @@ -20,6 +20,20 @@
module Droonga
class Serf
class Command
class Failed < Error
attr_reader :command_line, :exit_status, :output, :error
def initialize(command_line, exit_status, output, error)
@command_line = command_line
@exit_status = exit_status
@output = output
@error = error
message = "Failed to run serf: (#{@exit_status}): "
message << "#{@error}[#{@output}]: "
message << @command_line..join(" ")
super(message)
end
end

include Loggable

def initialize(serf, command, *options)
Expand All @@ -29,12 +43,14 @@ def initialize(serf, command, *options)
end

def run
stdout, stderror, status = Open3.capture3(@serf, @command, *@options, :pgroup => true)
{
:result => stdout,
:error => stderror,
:status => status,
}
command_line = [@serf, @command] + @options
stdout, stderror, status = Open3.capture3(*command_line,
:pgroup => true)
unless status.success?
raise Failed.new(command_line, status.to_i, stdout, stderror)
end
logger.error("run: #{stderror}") unless stderror.empty?
stdout
end

def log_tag
Expand Down

0 comments on commit 1f6a143

Please sign in to comment.