Navigation Menu

Skip to content

Commit

Permalink
Format Serf output
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Jun 25, 2014
1 parent d808ad9 commit d5b4dcb
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 18 deletions.
2 changes: 1 addition & 1 deletion lib/droonga/command/droonga_engine.rb
Expand Up @@ -132,7 +132,7 @@ def address_family
end

def log_level
ENV["DROONGA_LOG_LEVEL"] || Logger::Level.default_label
ENV["DROONGA_LOG_LEVEL"] || Logger::Level.default
end

def daemon?
Expand Down
16 changes: 10 additions & 6 deletions lib/droonga/logger.rb
Expand Up @@ -49,12 +49,12 @@ def label(level)
LABELS[level]
end

def default
WARN
def value(label)
LABELS.index(label.to_s)
end

def default_label
label(default)
def default
ENV["DROONGA_LOG_LEVEL"] || label(WARN)
end
end
end
Expand All @@ -73,15 +73,19 @@ def default_output=(output)
def initialize(options={})
@output = options[:output] || self.class.default_output
@tag = options[:tag]
self.level = ENV["DROONGA_LOG_LEVEL"] || Level.default_label
self.level = options[:level] || Level.default
end

def level
Level.label(@level)
end

def level=(level)
@level = Level::LABELS.index(level.to_s)
if level.is_a?(Numeric)
@level = level
else
@level = Level.value(level)
end
end

def trace(message, data={})
Expand Down
154 changes: 143 additions & 11 deletions lib/droonga/serf.rb
Expand Up @@ -13,10 +13,13 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

require "English"

require "droonga/path"
require "droonga/loggable"
require "droonga/catalog_loader"
require "droonga/serf_downloader"
require "droonga/line_buffer"

module Droonga
class Serf
Expand All @@ -31,7 +34,7 @@ def path
def initialize(loop, name)
@loop = loop
@name = name
@pid = nil
@agent = nil
end

def start
Expand All @@ -43,23 +46,24 @@ def start
detect_other_hosts.each do |other_host|
retry_joins.push("-retry-join", other_host)
end
@pid = run("agent",
"-node", @name,
"-bind", extract_host(@name),
"-event-handler", "droonga-engine-serf-event-handler",
*retry_joins)
@agent = run("agent",
"-node", @name,
"-bind", extract_host(@name),
"-event-handler", "droonga-engine-serf-event-handler",
"-log-level", log_level,
*retry_joins)
logger.trace("start: done")
end

def running?
not @pid.nil?
@agent and @agent.running?
end

def shutdown
logger.trace("shutdown: start")
Process.waitpid(run("leave"))
Process.waitpid(@pid)
@pid = nil
run("leave").shutdown
@agent.shutdown
@agent = nil
logger.trace("shutdown: done")
end

Expand All @@ -85,13 +89,29 @@ def find_system_serf
end

def run(command, *options)
spawn(@serf, command, "-rpc-addr", rpc_address, *options)
process = SerfProcess.new(@loop, @serf, command,
"-rpc-addr", rpc_address,
*options)
process.start
process
end

def extract_host(node_name)
node_name.split(":").first
end

def log_level
level = Logger::Level.default
case level
when "trace", "debug", "info", "warn"
level
when "error", "fatal"
"err"
else
level # Or error?
end
end

def rpc_address
"#{extract_host(@name)}:7373"
end
Expand All @@ -110,5 +130,117 @@ def detect_other_hosts
def log_tag
"serf"
end

class SerfProcess
include Loggable

def initialize(loop, serf, command, *options)
@loop = loop
@serf = serf
@command = command
@options = options
@pid = nil
end

def start
capture_output do |output_write, error_write|
env = {}
spawn_options = {
:out => output_write,
:err => error_write,
}
@pid = spawn(env, @serf, @command, *@options, spawn_options)
end
end

def shutdown
return if @pid.nil?
Process.waitpid(@pid)
@output_io.close
@error_io.close
@pid = nil
end

def running?
not @pid.nil?
end

private
def capture_output
result = nil
output_read, output_write = IO.pipe
error_read, error_write = IO.pipe

begin
result = yield(output_write, error_write)
rescue
output_read.close unless output_read.closed?
output_write.close unless output_write.closed?
error_read.close unless error_read.closed?
error_write.close unless error_write.closed?
raise
end

output_line_buffer = LineBuffer.new
on_read_output = lambda do |data|
on_standard_output(output_line_buffer, data)
end
@output_io = Coolio::IO.new(output_read)
@output_io.on_read do |data|
on_read_output.call(data)
end
@loop.attach(@output_io)

error_line_buffer = LineBuffer.new
on_read_error = lambda do |data|
on_error_output(error_line_buffer, data)
end
@error_io = Coolio::IO.new(error_read)
@error_io.on_read do |data|
on_read_error.call(data)
end
@loop.attach(@error_io)

result
end

def on_standard_output(line_buffer, data)
line_buffer.feed(data) do |line|
line = line.chomp
case line
when /\A==> /
content = $POSTMATCH
logger.info(content)
when /\A /
content = $POSTMATCH
case content
when /\A(\d{4})\/(\d{2})\/(\d{2}) (\d{2}):(\d{2}):(\d{2}) \[(\w+)\] /
year, month, day = $1, $2, $3
hour, minute, second = $4, $5, $6
level = $7
content = $POSTMATCH
logger.send(level.downcase, content)
else
logger.info(content)
end
else
logger.info(line)
end
end
end

def on_error_output(line_buffer, data)
line_buffer.feed(data) do |line|
line = line.chomp
logger.error(line.gsub(/\A==> /, ""))
end
end

def log_tag
tag = "serf"
tag << "[#{@pid}]" if @pid
tag
end
end
end
end

0 comments on commit d5b4dcb

Please sign in to comment.