Skip to content

Commit

Permalink
Support graceful shutdown and restart
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed May 27, 2014
1 parent e93e11d commit aec8070
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 71 deletions.
175 changes: 107 additions & 68 deletions lib/droonga/command/droonga_engine.rb
Expand Up @@ -213,17 +213,15 @@ def run_main_loop
service_runner = nil
trap(:INT) do
serf.shutdown if serf
service_runner.stop_immedieate if service_runner
raw_loop.stop
service_runner.stop_immediately if service_runner
end
trap(Signals::GRACEFUL_STOP) do
serf.shutdown if serf
service_runner.stop_graceful if service_runner
end
trap(Signals::IMMEDIATE_STOP) do
serf.shutdown if serf
service_runner.stop_immediate if service_runner
raw_loop.stop
service_runner.stop_immediately if service_runner
end
trap(Signals::GRACEFUL_RESTART) do
old_service_runner = service_runner
Expand All @@ -235,7 +233,7 @@ def run_main_loop
trap(Signals::IMMEDIATE_RESTART) do
old_service_runner = service_runner
service_runner = run_service(raw_loop)
old_service_runner.stop_immediate
old_service_runner.stop_immediately
end

serf = run_serf(raw_loop)
Expand Down Expand Up @@ -284,11 +282,8 @@ def on_ready=(callback)
@on_ready = callback
end

def on_finish=(callback)
@on_finish = callback
end

def run
control_write_in, control_write_out = IO.pipe
control_read_in, control_read_out = IO.pipe
listen_fd = @configuration.listen_socket.fileno
heartbeat_fd = @configuration.heartbeat_socket.fileno
Expand All @@ -299,66 +294,66 @@ def run
"#{$0}-service",
"--listen-fd", listen_fd.to_s,
"--heartbeat-fd", heartbeat_fd.to_s,
"--control-read-fd", control_write_in.fileno.to_s,
"--control-write-fd", control_read_out.fileno.to_s,
*@configuration.to_command_line,
]
options = {
listen_fd => listen_fd,
heartbeat_fd => heartbeat_fd,
control_write_in => control_write_in,
control_read_out => control_read_out,
}
if @log_output
options[:out] = @log_output
options[:err] = @log_output
end
@pid = spawn(env, *command_line, options)
control_write_in.close
control_read_out.close
attach_control_write_out(control_write_out)
attach_control_read_in(control_read_in)
end

def stop_graceful
stop(Signals::GRACEFUL_STOP)
@control_write_out.write("stop-graceful\n")
end

def stop_immedieate
stop(Signals::IMMEDIATE_STOP)
def stop_immediately
@control_write_out.write("stop-immediately\n")
end

def success?
@success
end

private
def stop(signal)
return if @pid.nil?

pid = @pid
Process.kill(signal, pid)
@pid = nil
@stop_timer = Coolio::TimerWatcher.new(0.5, true)
on_timer = lambda do
_, status = Process.waitpid2(pid, Process::WNOHANG)
if status
@success = status.success?
@stop_timer.detach
end
end
@stop_timer.on_timer do
on_timer.call
end
@raw_loop.attach(@stop_timer)
def on_ready
@on_ready.call if @on_ready
end

def on_finish
_, status = Process.waitpid2(@pid)
@success = status.success?
@control_write_out.close
@control_read_in.close
end

def attach_control_write_out(control_write_out)
@control_write_out = Coolio::IO.new(control_write_out)
@raw_loop.attach(@control_write_out)
end

def attach_control_read_in(control_read_in)
@control_read_in = Coolio::IO.new(control_read_in)
on_read = lambda do |data|
# TODO: should buffer data to handle half line received case
data.each_line do |line|
case line
when "ready\n"
@on_ready.call if @on_ready
on_ready
when "finish\n"
on_finish
end
end
end
Expand All @@ -382,25 +377,40 @@ def initialize
@engine_name = nil
@listen_fd = nil
@heartbeat_fd = nil
@contrtol_fd = nil
@contrtol_read_fd = nil
@contrtol_write_fd = nil
@contrtol_write_closed = false
end

def run(command_line_arguments)
create_new_process_group

parse_command_line_arguments!(command_line_arguments)
PluginLoader.load_all

control_write_io = IO.new(@control_write_fd)
begin
run_services
rescue
logger.exception("failed to run services", $!)
ensure
shutdown_services
unless @control_write_closed
control_write_io.write("finish\n")
control_write_io.close
end
end

true
end

private
def create_new_process_group
begin
Process.setsid
rescue SystemCallError, NotImplementedError
end
end

def parse_command_line_arguments!(command_line_arguments)
parser = OptionParser.new
add_internal_options(parser)
Expand All @@ -422,6 +432,10 @@ def add_internal_options(parser)
"Use FD as the heartbeat file descriptor") do |fd|
@heartbeat_fd = fd
end
parser.on("--control-read-fd=FD", Integer,
"Use FD to read control messages from the service") do |fd|
@control_read_fd = fd
end
parser.on("--control-write-fd=FD", Integer,
"Use FD to write control messages from the service") do |fd|
@control_write_fd = fd
Expand All @@ -433,27 +447,18 @@ def host
end

def run_services
@stopping = false
@engine = nil
@receiver = nil
raw_loop = Coolio::Loop.default
@loop = EventLoop.new(raw_loop)
@loop = Coolio::Loop.default

run_internal_message_receiver
run_engine
run_receiver
setup_signals
run_control_io
@loop.run
end

def shutdown_services
shutdown_control_io
shutdown_receiver
shutdown_internal_message_receiver
shutdown_engine
@loop = nil
end

def run_internal_message_receiver
@internal_message_receiver = create_internal_message_receiver
host, port = @internal_message_receiver.start
Expand All @@ -478,12 +483,6 @@ def run_engine
@engine.start
end

def shutdown_engine
return if @engine.nil?
@engine, engine = nil, @engine
engine.shutdown
end

def run_receiver
@receiver = create_receiver
@receiver.start
Expand All @@ -496,17 +495,59 @@ def shutdown_receiver
end

def run_control_io
@control_read = Coolio::IO.new(IO.new(@control_read_fd))
@control_read_fd = nil
on_read = lambda do |data|
# TODO: should buffer data to handle half line received case
data.each_line do |line|
case line
when "stop-graceful\n"
stop_graceful
when "stop-immediately\n"
stop_immediately
end
end
end
@control_read.on_read do |data|
on_read.call(data)
end
read_on_close = lambda do
if @control_read
@control_read = nil
stop_immediately
end
end
@control_read.on_close do
read_on_close.call
end
@loop.attach(@control_read)

@control_write = Coolio::IO.new(IO.new(@control_write_fd))
@control_write_fd = nil
write_on_close = lambda do
if @control_write
@control_write = nil
stop_immediately
end
@control_write_closed = true
end
@control_write.on_close do
write_on_close.call
end
@loop.attach(@control_write)

@control_write.write("ready\n")
end

def shutdown_control_io
return if @control_write.nil?
@control_write, control_write = nil, @control_write
control_write.close
if @control_write
@control_write, control_write = nil, @control_write
control_write.detach
end
if @control_read
@control_read, control_read = nil, @control_read
control_read.close
end
end

def create_receiver
Expand Down Expand Up @@ -541,27 +582,25 @@ def on_message(tag, time, record)
@engine.process(message)
end

def setup_signals
trap(Signals::GRACEFUL_STOP) do
stop_graceful
end
trap(Signals::IMMEDIATE_STOP) do
stop_immediate
end
trap(:INT) do
stop_immediate
trap(:INT, "DEFAULT")
def stop_graceful
return if @stopping
@stopping = true
shutdown_receiver
@engine.stop_graceful do
shutdown_control_io
shutdown_internal_message_receiver
end
end

def stop_graceful
# It may be called after stop_graceful.
def stop_immediately
shutdown_control_io
shutdown_receiver if @receiver
shutdown_internal_message_receiver
@engine.stop_immediately
@loop.stop
end

def stop_immediate
shutdown_services
end

def log_tag
"service"
end
Expand Down
25 changes: 22 additions & 3 deletions lib/droonga/engine.rb
Expand Up @@ -59,14 +59,33 @@ def start
logger.trace("start: done")
end

def shutdown
logger.trace("shutdown: start")
def stop_graceful
logger.trace("stop_graceful: start")
@catalog_observer.stop
@live_nodes_list_observer.stop
on_finish = lambda do
output_last_processed_timestamp
@dispatcher.shutdown
@state.shutdown
yield
end
if @state.have_session?
@state.on_finish = on_finish
else
on_finish.call
end
logger.trace("stop_graceful: done")
end

# It may be called after stop_graceful.
def stop_immediately
logger.trace("stop_immediately: start")
output_last_processed_timestamp
@catalog_observer.stop
@live_nodes_list_observer.stop
@dispatcher.shutdown
@state.shutdown
logger.trace("shutdown: done")
logger.trace("stop_immediately: done")
end

def process(message)
Expand Down
9 changes: 9 additions & 0 deletions lib/droonga/engine_state.rb
Expand Up @@ -31,6 +31,7 @@ class EngineState
attr_reader :internal_name
attr_reader :forwarder
attr_reader :replier
attr_accessor :on_finish
def initialize(loop, name, internal_name)
@loop = loop
@name = name
Expand All @@ -39,6 +40,7 @@ def initialize(loop, name, internal_name)
@current_id = 0
@forwarder = Forwarder.new(@loop)
@replier = Replier.new(@forwarder)
@on_finish = nil
end

def start
Expand Down Expand Up @@ -86,6 +88,13 @@ def register_session(id, session)

def unregister_session(id)
@sessions.delete(id)
unless have_session?
@on_finish.call if @on_finish
end
end

def have_session?
not @sessions.empty?
end

private
Expand Down

0 comments on commit aec8070

Please sign in to comment.