Navigation Menu

Skip to content

Commit

Permalink
Put Serf related implementations under a directory
Browse files Browse the repository at this point in the history
Conflicts:
	lib/droonga/serf.rb
	lib/droonga/serf/agent.rb
	lib/droonga/serf/command.rb
  • Loading branch information
piroor committed Jan 6, 2015
1 parent ce1a8d0 commit 85bdec9
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 3 deletions.
6 changes: 4 additions & 2 deletions lib/droonga/serf.rb
Expand Up @@ -23,7 +23,9 @@
require "droonga/loggable"
require "droonga/catalog_loader"
require "droonga/node_metadata"
require "droonga/serf_downloader"
require "droonga/serf/downloader"
require "droonga/serf/agent"
require "droonga/serf/command"
require "droonga/line_buffer"
require "droonga/safe_file_writer"
require "droonga/service_installation"
Expand Down Expand Up @@ -192,7 +194,7 @@ def ensure_serf
serf_path = self.class.path
@serf = serf_path.to_s
return if serf_path.executable?
downloader = SerfDownloader.new(serf_path)
downloader = Downloader.new(serf_path)
downloader.download
end

Expand Down
211 changes: 211 additions & 0 deletions lib/droonga/serf/agent.rb
@@ -0,0 +1,211 @@
# Copyright (C) 2014-2015 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "English"

require "coolio"

require "droonga/loggable"

module Droonga
class Serf
class Agent
# the port must be different from droonga-http-server's agent!
PORT = 7946

include Loggable

MAX_N_READ_CHECKS = 10

attr_writer :on_ready
attr_writer :on_failure
def initialize(loop, serf, host, bind_port, rpc_port, *options)
@loop = loop
@serf = serf
@host = host
@bind_port = bind_port
@rpc_port = rpc_port
@options = options
@pid = nil
@on_ready = nil
@on_failure = nil
@n_ready_checks = 0
end

def start
capture_output do |output_write, error_write|
env = {}
spawn_options = {
:out => output_write,
:err => error_write,
}
@pid = spawn(env, @serf, "agent",
"-bind", "#{@host}:#{@bind_port}",
"-rpc-addr", "#{@host}:#{@rpc_port}",
"-log-level", serf_log_level,
*@options, spawn_options)
end
start_ready_check
end

def stop
return if @pid.nil?
Process.waitpid(@pid)
@output_io.close
@error_io.close
@pid = nil
end

def running?
not @pid.nil?
end

private
def serf_log_level
level = Logger::Level.default
case level
when "trace", "debug", "info", "warn"
level
when "error", "fatal"
"err"
else
level # Or error?
end
end

def capture_output
result = nil
output_read, output_write = IO.pipe
error_read, error_write = IO.pipe

begin
result = yield(output_write, error_write)
rescue
output_read.close unless output_read.closed?
output_write.close unless output_write.closed?
error_read.close unless error_read.closed?
error_write.close unless error_write.closed?
raise
end

output_line_buffer = LineBuffer.new
on_read_output = lambda do |data|
on_standard_output(output_line_buffer, data)
end
@output_io = Coolio::IO.new(output_read)
@output_io.on_read do |data|
on_read_output.call(data)
end
@loop.attach(@output_io)

error_line_buffer = LineBuffer.new
on_read_error = lambda do |data|
on_error_output(error_line_buffer, data)
end
@error_io = Coolio::IO.new(error_read)
@error_io.on_read do |data|
on_read_error.call(data)
end
@loop.attach(@error_io)

result
end

def on_standard_output(line_buffer, data)
line_buffer.feed(data) do |line|
line = line.chomp
case line
when /\A==> /
content = $POSTMATCH
logger.info(content)
when /\A /
content = $POSTMATCH
case content
when /\A(\d{4})\/(\d{2})\/(\d{2}) (\d{2}):(\d{2}):(\d{2}) \[(\w+)\] /
year, month, day = $1, $2, $3
hour, minute, second = $4, $5, $6
level = $7
content = $POSTMATCH
level = normalize_level(level)
logger.send(level, content)
else
logger.info(content)
end
else
logger.info(line)
end
end
end

def normalize_level(level)
level = level.downcase
case level
when "err"
"error"
else
level
end
end

def on_error_output(line_buffer, data)
line_buffer.feed(data) do |line|
line = line.chomp
logger.error(line.gsub(/\A==> /, ""))
end
end

def start_ready_check
@n_ready_checks += 1

checker = Coolio::TCPSocket.connect(@host, @bind_port)

on_connect = lambda do
@on_ready.call if @on_ready
checker.close
end
checker.on_connect do
on_connect.call
end

on_connect_failed = lambda do
if @n_ready_checks >= MAX_N_READ_CHECKS
@on_failure.call if @on_failure
else
timer = Coolio::TimerWatcher.new(1)
on_timer = lambda do
start_ready_check
timer.detach
end
timer.on_timer do
on_timer.call
end
@loop.attach(timer)
end
end
checker.on_connect_failed do
on_connect_failed.call
end

@loop.attach(checker)
end

def log_tag
tag = "serf-agent"
tag << "[#{@pid}]" if @pid
tag
end
end
end
end
61 changes: 61 additions & 0 deletions lib/droonga/serf/command.rb
@@ -0,0 +1,61 @@
# Copyright (C) 2014-2015 Droonga Project
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 2.1 as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "open3"

require "droonga/loggable"

module Droonga
class Serf
class Command
class Failure < Error
attr_reader :command_line, :exit_status, :output, :error
def initialize(command_line, exit_status, output, error)
@command_line = command_line
@exit_status = exit_status
@output = output
@error = error
message = "Failed to run serf: (#{@exit_status}): "
message << "#{@error.strip}[#{@output.strip}]: "
message << @command_line.join(" ")
super(message)
end
end

include Loggable

def initialize(serf, command, *options)
@serf = serf
@command = command
@options = options
end

def run
command_line = [@serf, @command] + @options
stdout, stderror, status = Open3.capture3(*command_line,
:pgroup => true)
unless status.success?
raise Failure.new(command_line, status.to_i, stdout, stderror)
end
logger.error("run: #{stderror}") unless stderror.empty?
stdout
end

def log_tag
"serf[#{@command}]"
end
end
end
end
Expand Up @@ -24,7 +24,7 @@
require "droonga/loggable"

module Droonga
class SerfDownloader
class Serf::Downloader
include Loggable

class DownloadFailed < StandardError
Expand Down

0 comments on commit 85bdec9

Please sign in to comment.