Add a before_worker_exit callback to be called on graceful shutdown
A worker can trigger a graceful shutdown with `Process.kill(:TERM, Process.pid)`.
byroot committed Jun 29, 2023
1 parent 13b83f1 commit 0efdfcf
Showing 4 changed files with 63 additions and 17 deletions.
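
As the commit message describes, the new `before_worker_exit` hook runs in the worker process during graceful shutdown, just before it exits, and a worker can request its own graceful shutdown by sending itself `TERM`. A minimal configuration sketch, hedged (the log message is illustrative and not part of this commit):

```ruby
# pitchfork.conf.rb — sketch using the callback introduced by this commit.
worker_processes 2

before_worker_exit do |server, worker|
  # Runs in the worker during graceful shutdown, just before the process exits.
  $stderr.puts "worker pid=#{worker.pid} exiting"
end

# From application code running inside a worker, that worker can request its
# own graceful shutdown, as the commit message notes:
#   Process.kill(:TERM, Process.pid)
```
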
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
# Unreleased

- Implement `before_worker_exit` callback.
- Make each mold and worker a process group leader.
- Get rid of `Pitchfork::PrereadInput`.
- Treat `TERM` as graceful shutdown rather than quick shutdown.
5 changes: 5 additions & 0 deletions lib/pitchfork/configurator.rb
@@ -38,6 +38,7 @@ class Configurator
:after_mold_fork => lambda { |server, worker|
server.logger.info("mold gen=#{worker.generation} pid=#{$$} spawned")
},
:before_worker_exit => nil,
:after_worker_exit => lambda { |server, worker, status|
m = if worker.nil?
"repead unknown process (#{status.inspect})"
@@ -144,6 +145,10 @@ def after_worker_hard_timeout(*args, &block)
set_hook(:after_worker_hard_timeout, block_given? ? block : args[0], 2)
end

def before_worker_exit(*args, &block)
set_hook(:before_worker_exit, block_given? ? block : args[0], 2)
end

def after_worker_exit(*args, &block)
set_hook(:after_worker_exit, block_given? ? block : args[0], 3)
end
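
Given the arity argument passed to `set_hook` above, a reading of the configuration API (an interpretation of this diff, not separate documentation): `before_worker_exit` hooks receive two arguments (`server`, `worker`), while `after_worker_exit` hooks receive three (`server`, `worker`, `status`), and either a block or a callable may be supplied:

```ruby
# Hedged sketch of both hook forms; the log messages are illustrative.
before_worker_exit ->(server, worker) {
  server.logger.info("worker pid=#{worker.pid} gen=#{worker.generation} exiting")
}

after_worker_exit do |server, worker, status|
  server.logger.info("worker exited: #{status.inspect}")
end
```
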
47 changes: 31 additions & 16 deletions lib/pitchfork/http_server.rb
@@ -45,14 +45,15 @@ def inspect
end

def call(original_thread) # :nodoc:
@server.logger.error("worker=#{@worker.nr} pid=#{@worker.pid} timed out, exiting")
if @callback
@callback.call(@server, @worker, Info.new(original_thread, @rack_env))
begin
@server.logger.error("worker=#{@worker.nr} pid=#{@worker.pid} timed out, exiting")
if @callback
@callback.call(@server, @worker, Info.new(original_thread, @rack_env))
end
rescue => error
Pitchfork.log_error(@server.logger, "after_worker_timeout error", error)
end
rescue => error
Pitchfork.log_error(@server.logger, "after_worker_timeout error", error)
ensure
exit
@server.worker_exit(@worker)
end

def finished # :nodoc:
@@ -77,8 +78,8 @@ def extend_deadline(extra_time)
:listener_opts, :children,
:orig_app, :config, :ready_pipe,
:default_middleware, :early_hints
attr_writer :after_worker_exit, :after_worker_ready, :after_request_complete, :refork_condition,
:after_worker_timeout, :after_worker_hard_timeout
attr_writer :after_worker_exit, :before_worker_exit, :after_worker_ready, :after_request_complete,
:refork_condition, :after_worker_timeout, :after_worker_hard_timeout

attr_reader :logger
include Pitchfork::SocketHelper
@@ -313,7 +314,7 @@ def monitor_loop(sleep = true)
if REFORKING_AVAILABLE && @respawn && @children.molds.empty?
logger.info("No mold alive, shutting down")
@exit_status = 1
@sig_queue << :QUIT
@sig_queue << :TERM
@respawn = false
end

@@ -360,7 +361,7 @@ def monitor_loop(sleep = true)
logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} ready")
old_molds.each do |old_mold|
logger.info("Terminating old mold pid=#{old_mold.pid} gen=#{old_mold.generation}")
old_mold.soft_kill(:QUIT)
old_mold.soft_kill(:TERM)
end
else
logger.error("Unexpected message in sig_queue #{message.inspect}")
@@ -376,9 +377,9 @@ def stop(graceful = true)
limit = Pitchfork.time_now + timeout
until @children.workers.empty? || Pitchfork.time_now > limit
if graceful
soft_kill_each_child(:QUIT)
soft_kill_each_child(:TERM)
else
kill_each_child(:TERM)
kill_each_child(:INT)
end
if monitor_loop(false) == StopIteration
return StopIteration
@@ -388,6 +389,17 @@ def stop(graceful = true)
@promotion_lock.unlink
end

def worker_exit(worker)
if @before_worker_exit
begin
@before_worker_exit.call(self, worker)
rescue => error
Pitchfork.log_error(logger, "before_worker_exit error", error)
end
end
Process.exit
end

def rewindable_input
Pitchfork::HttpParser.input_class.method_defined?(:rewind)
end
@@ -529,6 +541,7 @@ def spawn_worker(worker, detach:)

after_fork_internal
worker_loop(worker)
worker_exit(worker)
end

worker
@@ -587,7 +600,7 @@ def wait_for_pending_workers
def maintain_worker_count
(off = @children.workers_count - worker_processes) == 0 and return
off < 0 and return spawn_missing_workers
@children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:QUIT) }
@children.each_worker { |w| w.nr >= worker_processes and w.soft_kill(:TERM) }
end

def restart_outdated_workers
@@ -604,7 +617,7 @@ def restart_outdated_workers
outdated_workers = @children.workers.select { |w| !w.exiting? && w.generation < @children.mold.generation }
outdated_workers.each do |worker|
logger.info("worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} restarting")
worker.soft_kill(:QUIT)
worker.soft_kill(:TERM)
end
end
end
@@ -714,7 +727,7 @@ def nuke_listeners!(readers)
def init_worker_process(worker)
worker.reset
worker.register_to_master(@control_socket[1])
# we'll re-trap :QUIT later for graceful shutdown iff we accept clients
# we'll re-trap :QUIT and :TERM later for graceful shutdown iff we accept clients
exit_sigs = [ :QUIT, :TERM, :INT ]
exit_sigs.each { |sig| trap(sig) { exit!(0) } }
exit!(0) if (@sig_queue & exit_sigs)[0]
@@ -732,6 +745,7 @@ def init_worker_process(worker)
readers = LISTENERS.dup
readers << worker
trap(:QUIT) { nuke_listeners!(readers) }
trap(:TERM) { nuke_listeners!(readers) }
readers
end

@@ -740,6 +754,7 @@ def init_mold_process(mold)
after_mold_fork.call(self, mold)
readers = [mold]
trap(:QUIT) { nuke_listeners!(readers) }
trap(:TERM) { nuke_listeners!(readers) }
readers
end
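
One behavioral note on the new `worker_exit` helper, read from the diff above: an exception raised by the `before_worker_exit` hook is rescued, logged as "before_worker_exit error" via `Pitchfork.log_error`, and `Process.exit` still runs, so a failing hook cannot keep the worker alive. A hedged sketch (the helper name is hypothetical):

```ruby
before_worker_exit do |server, worker|
  flush_pending_metrics!  # hypothetical application cleanup helper
  raise "cleanup failed"  # would be caught and logged by worker_exit; the worker exits anyway
end
```
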

27 changes: 26 additions & 1 deletion test/integration/test_configuration.rb
Expand Up @@ -13,16 +13,36 @@ def test_after_request_complete
request_count += 1
$stderr.puts "[after_request_complete] request_count=\#{request_count}"
end
before_worker_exit do |server, worker|
$stderr.puts "[before_worker_exit]"
end
CONFIG

assert_healthy("http://#{addr}:#{port}")
assert_stderr("[after_request_complete] request_count=1")
assert_healthy("http://#{addr}:#{port}")
assert_stderr("[after_request_complete] request_count=2")

assert_clean_shutdown(pid)
end

def test_before_worker_exit
addr, port = unused_port

pid = spawn_server(app: File.join(ROOT, "test/integration/env.ru"), config: <<~CONFIG)
listen "#{addr}:#{port}"
worker_processes 1
before_worker_exit do |server, worker|
$stderr.puts "[before_worker_exit]"
end
CONFIG

assert_healthy("http://#{addr}:#{port}")
assert_clean_shutdown(pid)
assert_stderr("[before_worker_exit]")
end

def test_soft_timeout
addr, port = unused_port

@@ -34,13 +54,18 @@ def test_soft_timeout
after_worker_timeout do |server, worker, timeout_info|
$stderr.puts "[after_worker_timeout]"
end
before_worker_exit do |server, worker|
$stderr.puts "[before_worker_exit]"
end
CONFIG

assert_healthy("http://#{addr}:#{port}/")

assert_equal false, healthy?("http://#{addr}:#{port}/?10")
assert_stderr("timed out, exiting")
assert_stderr("[after_worker_timeout]")
assert_stderr("[before_worker_exit]")

assert_clean_shutdown(pid)
end
