Navigation Menu

Skip to content

Commit

Permalink
serf: extract code for "serf agent" as a class
Browse files Browse the repository at this point in the history
"serf agent" is a special command. It runs serf as a service. Other
command runs serf as a command.
  • Loading branch information
kou committed Jan 5, 2015
1 parent 5e07a2f commit eb9aea2
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 181 deletions.
15 changes: 8 additions & 7 deletions lib/droonga/command/droonga_engine.rb
Expand Up @@ -341,7 +341,7 @@ def initialize(configuration)
end

def run
@serf = run_serf
start_serf
@service_runner = run_service
setup_initial_on_ready
@catalog_observer = run_catalog_observer
Expand Down Expand Up @@ -390,14 +390,16 @@ def trap_signals
def stop_gracefully
@command_runner.stop
@catalog_observer.stop
@serf.stop
@serf.leave
@serf_agent.stop
@service_runner.stop_gracefully
end

def stop_immediately
@command_runner.stop
@catalog_observer.stop
@serf.stop
@serf.leave
@serf_agent.stop
@service_runner.stop_immediately
end

Expand Down Expand Up @@ -426,10 +428,9 @@ def run_service
service_runner
end

def run_serf
serf = Serf.new(@loop, @configuration.engine_name)
serf.start
serf
def start_serf
@serf = Serf.new(@configuration.engine_name)
@serf_agent = @serf.run_agent(@loop)
end

def run_catalog_observer
Expand Down
2 changes: 1 addition & 1 deletion lib/droonga/command/remote.rb
Expand Up @@ -37,7 +37,7 @@ def initialize(serf_name, params)
@response = {
"log" => []
}
@serf = Serf.new(nil, @serf_name)
@serf = Serf.new(@serf_name)

@service_installation = ServiceInstallation.new
@service_installation.ensure_using_service_base_directory
Expand Down
204 changes: 31 additions & 173 deletions lib/droonga/serf.rb
Expand Up @@ -13,8 +13,6 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

require "English"

require "json"
require "coolio"
require "open3"
Expand All @@ -24,15 +22,13 @@
require "droonga/catalog_loader"
require "droonga/node_metadata"
require "droonga/serf_downloader"
require "droonga/serf_agent"
require "droonga/line_buffer"
require "droonga/safe_file_writer"
require "droonga/service_installation"

module Droonga
class Serf
# the port must be different from droonga-http-server's agent!
AGENT_PORT = 7946

class << self
def path
Droonga::Path.base + "serf"
Expand All @@ -41,61 +37,44 @@ def path

include Loggable

def initialize(loop, name)
# TODO: Don't allow nil for loop. It reduces nil checks and
# simplifies source code.
@loop = loop
def initialize(name)
@name = name
@agent = nil
@service_installation = ServiceInstallation.new
end

def start
logger.trace("start: start")
def run_agent(loop)
logger.trace("run_agent: start")
ensure_serf
retry_joins = []
detect_other_hosts.each do |other_host|
retry_joins.push("-retry-join", other_host)
end
@agent = run("agent",
"-node", @name,
"-bind", "#{extract_host(@name)}:#{port}",
"-event-handler", "droonga-engine-serf-event-handler",
"-log-level", log_level,
"-tag", "type=engine",
"-tag", "role=#{role}",
"-tag", "cluster_id=#{cluster_id}",
*retry_joins)
Thread.new do
sleep 1 # wait until the serf agent becomes running
update_cluster_state if @agent.running?
agent = Agent.new(loop, @serf,
extract_host(@name), agent_port, rpc_port,
"-node", @name,
"-event-handler", "droonga-engine-serf-event-handler",
"-log-level", log_level,
"-tag", "type=engine",
"-tag", "role=#{role}",
"-tag", "cluster_id=#{cluster_id}",
*retry_joins)
agent.on_ready = lambda do
update_cluster_state
end
logger.trace("start: done")
end

def running?
@agent and @agent.running?
end

def stop
logger.trace("stop: start")
run("leave").stop
@agent.stop
@agent = nil
logger.trace("stop: done")
agent.start
logger.trace("run_agent: done")
agent
end

def restart
logger.trace("restart: start")
stop
start
logger.trace("restart: done")
def leave
ensure_serf
run_once("leave")
end

def join(*hosts)
ensure_serf
nodes = hosts.collect do |host|
"#{host}:#{port}"
"#{host}:#{agent_port}"
end
run_once("join", *nodes)
end
Expand Down Expand Up @@ -203,16 +182,8 @@ def find_system_serf
nil
end

def run(command, *options)
process = SerfProcess.new(@loop, @serf, command,
"-rpc-addr", rpc_address,
*options)
process.start
process
end

def run_once(command, *options)
process = SerfProcess.new(@loop, @serf, command,
process = SerfProcess.new(@serf, command,
"-rpc-addr", rpc_address,
*options)
process.run_once
Expand Down Expand Up @@ -243,15 +214,19 @@ def log_level
end

def rpc_address
"#{extract_host(@name)}:7373"
"#{extract_host(@name)}:#{rpc_port}"
end

def rpc_port
7373
end

def node_metadata
@node_metadata ||= NodeMetadata.new
end

def port
AGENT_PORT
def agent_port
Agent::PORT
end

def detect_other_hosts
Expand All @@ -272,35 +247,10 @@ def log_tag
class SerfProcess
include Loggable

def initialize(loop, serf, command, *options)
@loop = loop
def initialize(serf, command, *options)
@serf = serf
@command = command
@options = options
@pid = nil
end

def start
capture_output do |output_write, error_write|
env = {}
spawn_options = {
:out => output_write,
:err => error_write,
}
@pid = spawn(env, @serf, @command, *@options, spawn_options)
end
end

def stop
return if @pid.nil?
Process.waitpid(@pid)
@output_io.close
@error_io.close
@pid = nil
end

def running?
not @pid.nil?
end

def run_once
Expand All @@ -311,98 +261,6 @@ def run_once
:status => status,
}
end

private
def capture_output
result = nil
output_read, output_write = IO.pipe
error_read, error_write = IO.pipe

begin
result = yield(output_write, error_write)
rescue
output_read.close unless output_read.closed?
output_write.close unless output_write.closed?
error_read.close unless error_read.closed?
error_write.close unless error_write.closed?
raise
end

output_line_buffer = LineBuffer.new
on_read_output = lambda do |data|
on_standard_output(output_line_buffer, data)
end
@output_io = Coolio::IO.new(output_read)
@output_io.on_read do |data|
on_read_output.call(data)
end
# TODO: Don't allow nil for loop. It reduces nil checks and
# simplifies source code.
@loop.attach(@output_io) if @loop

error_line_buffer = LineBuffer.new
on_read_error = lambda do |data|
on_error_output(error_line_buffer, data)
end
@error_io = Coolio::IO.new(error_read)
@error_io.on_read do |data|
on_read_error.call(data)
end
# TODO: Don't allow nil for loop. It reduces nil checks and
# simplifies source code.
@loop.attach(@error_io) if @loop

result
end

def on_standard_output(line_buffer, data)
line_buffer.feed(data) do |line|
line = line.chomp
case line
when /\A==> /
content = $POSTMATCH
logger.info(content)
when /\A /
content = $POSTMATCH
case content
when /\A(\d{4})\/(\d{2})\/(\d{2}) (\d{2}):(\d{2}):(\d{2}) \[(\w+)\] /
year, month, day = $1, $2, $3
hour, minute, second = $4, $5, $6
level = $7
content = $POSTMATCH
level = normalize_level(level)
logger.send(level, content)
else
logger.info(content)
end
else
logger.info(line)
end
end
end

def normalize_level(level)
level = level.downcase
case level
when "err"
"error"
else
level
end
end

def on_error_output(line_buffer, data)
line_buffer.feed(data) do |line|
line = line.chomp
logger.error(line.gsub(/\A==> /, ""))
end
end

def log_tag
tag = "serf"
tag << "[#{@pid}]" if @pid
tag
end
end
end
end

0 comments on commit eb9aea2

Please sign in to comment.