Permalink
Browse files

Add new on_read/on_write options

  • Loading branch information...
1 parent c1a6275 commit b42547a5a0154979a0108fdc6458b9fbf7cc32cc @gdb committed Jan 10, 2014
@@ -5,6 +5,10 @@ module Rubysh
#
# - freeze after initialize?
class BaseCommand
+ def opts
+ @opts || {}
+ end
+
def stringify_arg(arg)
case arg
when BaseCommand, BaseDirective
View
@@ -126,7 +126,9 @@ def prepare_target(runner)
:target_name => target_name,
:read_pos => 0,
:subprocess_fd_number => Util.to_fileno(source),
- :tee => @opts[:tee]
+ :tee => @opts[:tee],
+ :on_read => @opts[:on_read],
+ :on_write => @opts[:on_write],
}
end
View
@@ -36,6 +36,7 @@ def write(bytes, target=0)
# read(:how => :nonblock): Return whatever is immediately available
def read(target=nil, opts=nil)
raise Rubysh::Error::AlreadyRunError.new("Can only read from a runner in runner_state :started or :waited, not #{@runner_state.inspect}") unless @runner_state == :started || @runner_state == :waited
+ raise Rubysh::Error::BaseError.new("Can't read from a runner where :on_read has been provided") if command.opts[:on_read]
if target.kind_of?(Hash)
opts = target
@@ -263,28 +264,36 @@ def prepare!
# there.
def prepare_io
@parallel_io = Subprocess::PidAwareParallelIO.new(readers, writers, subprocesses)
- @parallel_io.on_read do |target_name, data|
- state = @targets[target_name]
- buffer = state[:buffer]
- if data == Subprocess::ParallelIO::EOF
- Rubysh.log.debug("EOF reached on #{target_name.inspect}")
- buffer.close_write
- else
- Rubysh.log.debug("Just read #{data.inspect} on #{target_name.inspect}")
- tee = state[:tee]
- tee.write(data) if tee
-
- # Seek to end
- buffer.pos = buffer.length
- buffer.write(data)
+ if on_read = command.opts[:on_read]
+ @parallel_io.on_read(on_read)
+ else
+ @parallel_io.on_read do |target_name, data|
+ state = @targets[target_name]
+ buffer = state[:buffer]
+ if data == Subprocess::ParallelIO::EOF
+ Rubysh.log.debug("EOF reached on #{target_name.inspect}")
+ buffer.close_write
+ else
+ Rubysh.log.debug("Just read #{data.inspect} on #{target_name.inspect}")
+ tee = state[:tee]
+ tee.write(data) if tee
+
+ # Seek to end
+ buffer.pos = buffer.length
+ buffer.write(data)
+ end
end
end
- @parallel_io.on_write do |target_name, written, remaining|
- if data == Subprocess::ParallelIO::EOF
- Rubysh.log.debug("EOF reached on #{target_name.inspect}")
- else
- Rubysh.log.debug("Just wrote #{written.inspect} on #{target_name.inspect}")
+ if on_write = command.opts[:on_write]
+ @parallel_io.on_write(on_write)
+ else
+ @parallel_io.on_write do |target_name, written, remaining|
+ if data == Subprocess::ParallelIO::EOF
+ Rubysh.log.debug("EOF reached on #{target_name.inspect}")
+ else
+ Rubysh.log.debug("Just wrote #{written.inspect} on #{target_name.inspect}")
+ end
end
end
end
View
@@ -123,7 +123,6 @@ def apply_directive(directive, is_parent)
def exec_program
begin
Kernel.exec([command, command], *args)
- raise Rubysh::Error::UnreachableError.new("This code should be unreachable. If you are seeing this exception, it means someone overrode Kernel.exec. That's not very nice of them.")
rescue Exception => e
msg = {
'message' => e.message,
@@ -135,6 +134,8 @@ def exec_program
# Note: atexit handlers will fire in this case. May want to do
# something about that.
exit(1)
+ else
+ raise Rubysh::Error::UnreachableError.new("This code should be unreachable. If you are seeing this exception, it means someone overrode Kernel.exec. That's not very nice of them.")
end
end
@@ -15,12 +15,14 @@ def initialize(readers, writers)
@writer_buffers = {}
end
- def on_read(&blk)
- @on_read = blk
+ def on_read(method=nil, &blk)
+ raise "Can't provide both method and block" if method && blk
+ @on_read = method || blk
end
- def on_write(&blk)
- @on_write = blk
+ def on_write(method=nil, &blk)
+ raise "Can't provide both method and block" if method && blk
+ @on_write = method || blk
end
def write(writer_name, data, close_on_complete=true)
View
@@ -3,7 +3,7 @@
require 'minitest/autorun'
require 'minitest/spec'
-require 'mocha'
+require 'mocha/setup'
$:.unshift(File.expand_path('../lib', File.dirname(__FILE__)))
@@ -0,0 +1,34 @@
+require File.expand_path('../_lib', File.dirname(__FILE__))
+
+module RubyshTest::Functional
+ class ReadTest < FunctionalTest
+ describe 'when using :on_read' do
+ it 'calls back as output is streamed' do
+ stdout = ''
+ stderr = ''
+
+ runner = Rubysh('ruby', '-e', 'puts "hi"; $stderr.puts "hullo there"; puts "hello"',
+ Rubysh.>, Rubysh.stderr > :stderr,
+ :on_read => Proc.new do |target_name, data|
+ case target_name
+ when :stdout then stdout << data unless data == Rubysh::Subprocess::ParallelIO::EOF
+ when :stderr then stderr << data unless data == Rubysh::Subprocess::ParallelIO::EOF
+ else
+ raise "Invalid name: #{target_name.inspect}"
+ end
+ end
+ ).run
+ assert_equal("hi\nhello\n", stdout)
+ assert_equal("hullo there\n", stderr)
+ end
+
+ it 'does not allow reading' do
+ runner = Rubysh('echo', 'hi',
+ Rubysh.>, Rubysh.stderr > :stderr,
+ :on_read => Proc.new {}
+ ).run
+ assert_raises(Rubysh::Error::BaseError) {runner.read}
+ end
+ end
+ end
+end

0 comments on commit b42547a

Please sign in to comment.