Skip to content

Commit

Permalink
Add a first stab at an interactive read/write interface
Browse files Browse the repository at this point in the history
  • Loading branch information
gdb committed Jan 22, 2013
1 parent dde5fe2 commit dfa6fb7
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 15 deletions.
6 changes: 3 additions & 3 deletions lib/rubysh.rb
Expand Up @@ -118,15 +118,15 @@ def self.stderr
FD.new(2)
end

def self.>(target)
def self.>(target=:stdout)
Redirect.new(1, '>', target)
end

def self.>>(target)
def self.>>(target=:stdout)
Redirect.new(1, '>>', target)
end

def self.<(target)
def self.<(target=:stdin)
Redirect.new(0, '<', target)
end

Expand Down
5 changes: 4 additions & 1 deletion lib/rubysh/redirect.rb
Expand Up @@ -119,7 +119,10 @@ def prepare_target(runner)
:target_reading? => target_reading?,
:target => target_reading? ? pipe.reader : pipe.writer,
:complement => target_reading? ? pipe.writer : pipe.reader,
:buffer => []
:buffer => StringIO.new,
:target_name => target_name,
:read_pos => 0,
:subprocess_fd_number => Util.to_fileno(source)
}
end

Expand Down
109 changes: 101 additions & 8 deletions lib/rubysh/runner.rb
Expand Up @@ -14,10 +14,70 @@ def initialize(command)
prepare!
end

def data(target_name)
state = target_state(target_name)
raise Rubysh::Error::BaseError.new("Can only access data for readable FDs") unless state[:target_reading?]
state[:buffer].join
def write(bytes, target=0)
raise Rubysh::Error::AlreadyRunError.new("Can only write to a runner in runner_state :started, not #{@runner_state.inspect}") unless @runner_state == :started
state = target_state(target, false)
target_name = state[:target_name]
@parallel_io.write(target_name, bytes, false)
end

# A bit of an unothordox read interface, not sure if I like
# it. Also, the target/opts magic is probably too magical (and not
# consistent with write!)
#
# You can do:
#
# read: finish the subprocess, and read from FD 1 in the child
# read(:how => :partial): wait until there are bytes on FD 1, and
# then return what you can
# read(2, :how => :partial): Do the same for FD 2
# read(:stdout, :how => :partial): Do the same with whatever the named
# descriptor :stdout
# read(:how => :nonblock): Return whatever is immediately available
def read(target=nil, opts=nil)
raise Rubysh::Error::AlreadyRunError.new("Can only read from a runner in runner_state :started or :waited, not #{@runner_state.inspect}") unless @runner_state == :started || @runner_state == :waited

if target.kind_of?(Hash)
opts = target
target = nil
end
target ||= 1
opts ||= {}

# TODO: add a stringio
state = target_state(target, true)
target_name = state[:target_name]

# Be nice to people and validate the hash
valid_keys = [:how]
extra_keys = opts.keys - valid_keys
raise raise Rubysh::Error::BaseError.new("Unrecognized keys #{extra_keys.inspect}. (Valid keys: #{valid_keys.inspect}") if extra_keys.length > 0

case how = opts[:how]
when :partial
# Read until we get some bytes
@parallel_io.run_once until state[:buffer].length != state[:read_pos]
when :nonblock
@parallel_io.read_available(state[:target])
when nil
communicate if @runner_state == :started
else
raise Rubysh::Error::BaseError.new("Invalid read directive #{how.inspect}")
end

state[:buffer].pos = state[:read_pos]
bytes = state[:buffer].read
# Could also increment by bytes, but meh.
state[:read_pos] = state[:buffer].pos
bytes
end

def communicate
raise Rubysh::Error::AlreadyRunError.new("Can only communicate with a runner in runner_state :started, not #{@runner_state.inspect}") unless @runner_state == :started
writers.each do |io, target_name|
@parallel_io.close(target_name) unless io.closed?
end
wait
end

# Ruby's Process::Status. Has fun things like pid and signaled?
Expand Down Expand Up @@ -87,7 +147,7 @@ def to_s
def inspect
extras = []
valid_readers = readers.values.map(&:inspect).join(', ')
valid_writers = readers.values.map(&:inspect).join(', ')
valid_writers = writers.values.map(&:inspect).join(', ')

extras << "readers: #{valid_readers}" if valid_readers.length > 0
extras << "writers: #{valid_writers}" if valid_writers.length > 0
Expand All @@ -107,12 +167,40 @@ def state(object)
end

# Internal helpers
def target_state(target_name)
@targets[target_name] || raise(Rubysh::Error::BaseError.new("Invalid target: #{target_name.inspect} (valid targets are: #{@targets.keys.inspect})"))
def target_state(target_name, reading=nil)
case target_name
when Symbol
target_state = @targets[target_name]
raise Rubysh::Error::BaseError.new("Invalid target symbol: #{target_name.inspect} (valid target symbols are: #{@targets.keys.inspect})") unless target_state
when Fixnum
targets = targets_by_fd_numbers
target_state = targets[target_name]
raise Rubysh::Error::BaseError.new("Invalid target fd number: #{target_name.inspect} (valid target fd numbers are: #{targets.keys.inspect}})") unless target_state
else
raise Rubysh::Error::BaseError.new("Invalid type for target name: #{target_name.inspect} (#{target_name.class}). Valid types are Symbol and Fixnum.")
end

if reading.nil?
# No checking
elsif target_state[:target_reading?] && !reading
raise Rubysh::Error::BaseError.new("Trying to write to read pipe #{target_name}")
elsif !target_state[:target_reading?] && reading
raise Rubysh::Error::BaseError.new("Trying to read from write pipe #{target_name}")
end

target_state
end

private

def targets_by_fd_numbers
@targets.inject({}) do |hash, (_, target_state)|
fd_num = target_state[:subprocess_fd_number]
hash[fd_num] = target_state
hash
end
end

def do_wait
raise Rubysh::Error::AlreadyRunError.new("You must run parallel io before waiting. (Perhaps you want to use the 'run' method, which takes care of the plumbing for you?)") unless @runner_state == :parallel_io_ran
@command.wait(self)
Expand All @@ -136,11 +224,16 @@ def prepare!
def prepare_io
@parallel_io = Subprocess::ParallelIO.new(readers, writers)
@parallel_io.on_read do |target_name, data|
state = @targets[target_name]
buffer = state[:buffer]
if data == Subprocess::ParallelIO::EOF
Rubysh.log.debug("EOF reached on #{target_name.inspect}")
buffer.close_write
else
Rubysh.log.debug("Just read #{data.inspect} on #{target_name.inspect}")
@targets[target_name][:buffer] << data
# Seek to end
buffer.pos = buffer.length
buffer.write(data)
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/rubysh/subprocess/parallel_io.rb
Expand Up @@ -65,8 +65,6 @@ def run
end
end

private

def run_once
potential_readers = available_readers
potential_writers = available_writers
Expand Down Expand Up @@ -97,6 +95,8 @@ def read_available(reader)
end
end

private

def finalize_reader(reader)
@finished_readers.add(reader)
issue_reader_callback(reader, EOF)
Expand Down
2 changes: 1 addition & 1 deletion lib/rubysh/triple_less_than.rb
Expand Up @@ -8,7 +8,7 @@ def initialize(fd)
@fd = fd
end

def <(literal)
def <(literal=:stdin)
TripleLessThan.new(@fd, literal)
end

Expand Down

0 comments on commit dfa6fb7

Please sign in to comment.