Permalink
Browse files

Make it possible to register new readers on a parallel_io

  • Loading branch information...
1 parent 0b3af0a commit 0b850b5606f88ca8b5d53516d2fced263b906f63 @gdb committed Jan 12, 2014
Showing with 45 additions and 3 deletions.
  1. +3 −3 lib/rubysh/runner.rb
  2. +11 −0 lib/rubysh/subprocess/parallel_io.rb
  3. +31 −0 test/functional/lib/on_output.rb
View
@@ -1,6 +1,6 @@
module Rubysh
class Runner
- attr_accessor :command, :targets
+ attr_accessor :command, :targets, :parallel_io
def initialize(command)
@runner_state = :initialized
@@ -288,11 +288,11 @@ def prepare_io
if on_write = command.opts[:on_write]
@parallel_io.on_write(on_write)
else
- @parallel_io.on_write do |target_name, written, remaining|
+ @parallel_io.on_write do |target_name, data, remaining|
if data == Subprocess::ParallelIO::EOF
Rubysh.log.debug("EOF reached on #{target_name.inspect}")
else
- Rubysh.log.debug("Just wrote #{written.inspect} on #{target_name.inspect}")
+ Rubysh.log.debug("Just wrote #{data.inspect} on #{target_name.inspect}")
end
end
end
@@ -15,6 +15,14 @@ def initialize(readers, writers)
@writer_buffers = {}
end
+ def register_reader(reader, name)
+ @readers[reader] = name
+ end
+
+ def register_writer(writer, name)
+ @writers[writer] = name
+ end
+
def on_read(method=nil, &blk)
raise "Can't provide both method and block" if method && blk
@on_read = method || blk
@@ -86,6 +94,7 @@ def run_select_loop(timeout)
end
ready_readers, ready_writers, _ = selected
+ $stdout.puts "Stuff: #{ready_readers.inspect}, #{ready_writers.inspect} (total: #{@readers.inspect}"
ready_readers.each do |reader|
read_available(reader)
@@ -106,7 +115,9 @@ def consume_all_available
def read_available(reader)
begin
data = reader.read_nonblock(4096)
+ p data
rescue EOFError, Errno::EPIPE
+ p "done"
finalize_reader(reader)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
else
@@ -30,5 +30,36 @@ class ReadTest < FunctionalTest
assert_raises(Rubysh::Error::BaseError) {runner.read}
end
end
+
+ describe 'when registering a reader post-hoc' do
+ it 'successfully uses both the existing and new reader' do
+ buffers = {}
+ runner = Rubysh('sh', '-c', '
+echo stdout1
+read _
+echo stdout2
+',
+ Rubysh.>, Rubysh.<,
+ :on_read => Proc.new do |name, bytes|
+ (buffers[name] ||= '') << bytes
+ end
+ ).run_async
+ reader, writer = IO.pipe
+ runner.parallel_io.register_reader(reader, :pipe)
+
+ # run_once may be triggered by sigchld
+ runner.parallel_io.run_once until buffers.length > 0
+ assert_equal(nil, buffers[:pipe])
+ assert_equal("stdout1\n", buffers[:stdout])
+ buffers.clear
+
+ runner.write("stdin\n")
+ writer.write('pipe')
+
+ runner.parallel_io.run_once until buffers[:stdout]
+ assert_equal('pipe', buffers[:pipe])
+ assert_equal("stdout2\n", buffers[:stdout])
+ end
+ end
end
end

0 comments on commit 0b850b5

Please sign in to comment.