Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

em-posix-spawn: Use SIGCLD to detect child termination

When only stdout and stderr are used to determine if a child has
terminated, it may spawn children that inherit stdout and stderr and
leave them open. This leaves the original child unreaped if only these
output streams are used to detect termination.

Change-Id: I8abf0b58a62364317609beb36ea71f5f55d0ab8f
  • Loading branch information...
commit 1240c42cd288da170e43b53b726282fb6f0cc744 1 parent 47c51e2
@pietern pietern authored
Showing with 88 additions and 26 deletions.
  1. +88 −26 em-posix-spawn/lib/em/posix/spawn/child.rb
View
114 em-posix-spawn/lib/em/posix/spawn/child.rb
@@ -45,7 +45,9 @@ def initialize(*args)
@timeout = @options.delete(:timeout)
@max = @options.delete(:max)
@options.delete(:chdir) if @options[:chdir].nil?
- exec!
+
+ @sigcld = SignalHandler.instance
+ sigcld.synchronize { exec! }
end
# All data written to the child process's stdout stream as a String.
@@ -65,24 +67,73 @@ def success?
@status && @status.success?
end
+ # Determine if the process has already terminated.
+ def terminated?
+ !! @status
+ end
+
# Send the SIGTERM signal to the process.
#
# Returns the Process::Status object obtained by reaping the process.
def kill
+ @timer.cancel if @timer
::Process.kill('TERM', @pid) rescue nil
- reap
end
private
- # Wait for the child process to exit
- #
- # Returns the Process::Status object obtained by reaping the process.
- def reap
- @timer.cancel if @timer
- ::Process::waitpid(@pid)
- @runtime = Time.now - @start
- @status = $?
+ attr_reader :sigcld
+
+ class SignalHandler
+
+ def self.instance
+ @instance ||= begin
+ new.tap { |instance|
+ prev_handler = Signal.trap("CLD") {
+ instance.signal
+ prev_handler.call if prev_handler
+ }
+ }
+ end
+ end
+
+ def initialize
+ @pid_to_trigger_pipe = {}
+ @pid_to_process_status = {}
+ @paused = false
+ @pending = 0
+ end
+
+ def synchronize
+ @paused = true
+ yield
+
+ ensure
+ @paused = false
+ @pending.times { signal }
+ @pending = 0
+ end
+
+ def pid_to_io(pid)
+ r, w = IO.pipe
+ @pid_to_trigger_pipe[pid] = w
+ r
+ end
+
+ def pid_to_process_status(pid)
+ @pid_to_process_status.delete(pid)
+ end
+
+ def signal
+ if @paused
+ @pending += 1
+ else
+ pid = ::Process.wait(-1)
+ @pid_to_process_status[pid] = $?
+ w = @pid_to_trigger_pipe.delete(pid)
+ w.close if w
+ end
+ end
end
# Execute command, write input, and read output. This is called
@@ -104,34 +155,25 @@ def exec!
# keep track of open fds
in_flight = [cin, cout, cerr].compact
- finalize = lambda { |io|
- in_flight.delete(io)
- if in_flight.empty?
- @out = cout.buffer
- @err = cerr.buffer
-
- reap
- set_deferred_success
- end
- }
-
in_flight.each { |io|
# force binary encoding
io.force_encoding
# register finalize hook
- io.callback { finalize.call(io) }
+ io.callback { in_flight.delete(io) }
}
+ failure = nil
+
# keep track of max output
max = @max
if max && max > 0
check_buffer_size = lambda {
if cout.buffer.size + cerr.buffer.size > max
+ failure = MaximumOutputExceeded
in_flight.each(&:close)
-
+ in_flight.clear
kill
- set_deferred_failure MaximumOutputExceeded
end
}
@@ -143,12 +185,32 @@ def exec!
timeout = @timeout
if timeout && timeout > 0
@timer = Timer.new(timeout) {
+ failure = TimeoutExceeded
in_flight.each(&:close)
-
+ in_flight.clear
kill
- set_deferred_failure TimeoutExceeded
}
end
+
+ # watch sigcld trigger pipe
+ csigcld = EM.watch sigcld.pid_to_io(@pid), ReadableStream, ''
+ csigcld.notify_readable = true
+ csigcld.callback {
+ in_flight.each(&:close)
+ in_flight.clear
+
+ @timer.cancel if @timer
+ @runtime = Time.now - @start
+ @status = sigcld.pid_to_process_status(@pid)
+ @out = cout.buffer
+ @err = cerr.buffer
+
+ if failure
+ set_deferred_failure failure
+ else
+ set_deferred_success
+ end
+ }
end
class Stream < Connection
Please sign in to comment.
Something went wrong with that request. Please try again.