Permalink
Browse files

Make multi-stage pipelines work

  • Loading branch information...
1 parent 9444a36 commit e46660d318b66de6a348b64d08ab6ab10bbc0d0d @gdb committed Sep 8, 2012
View
@@ -1,2 +1,10 @@
#!/usr/bin/env rake
-require "bundler/gem_tasks"
+# require 'bundler/gem_tasks'
+require 'rake/testtask'
+
+Rake::TestTask.new do |t|
+ t.libs = ["lib"]
+ # t.warning = true
+ t.verbose = true
+ t.test_files = FileList['test/**/*.rb']
+end
View
@@ -7,7 +7,7 @@
require 'rubysh/command'
require 'rubysh/error'
require 'rubysh/fd'
-require 'rubysh/pipe'
+require 'rubysh/pipeline'
require 'rubysh/redirect'
require 'rubysh/subprocess'
@@ -19,7 +19,7 @@
# => Command: ls /tmp | grep myfile
# Rubysh('ls', '/tmp', Rubysh.stderr > Rubysh.stdout)
# => Command: ls /tmp 2>&1
-# Rubysh('ls', '/tmp', Rubysh.> '/tmp/outfile.txt')
+# Rubysh('ls', '/tmp', Rubysh.>('/tmp/outfile.txt'))
# => Command: ls /tmp > /tmp/outfile.txt
#
# TODO:
@@ -63,11 +63,10 @@ def self.Command(*args)
Command.new(*args)
end
- def self.Pipe(*args)
- Pipe.new(*args)
+ def self.Pipeline(*args)
+ Pipeline.new(*args)
end
- # External API methods
def self.stdin
FD.new(0)
end
@@ -80,6 +79,25 @@ def self.stderr
FD.new(2)
end
+ def self.>(target)
+ Redirect.new(1, '>', target)
+ end
+
+ def self.>>(target)
+ Redirect.new(1, '>>', target)
+ end
+
+ def self.<(target)
+ Redirect.new(0, '<', target)
+ end
+
+ # TODO: not sure exactly how this should work.
+ #
+ # Hack to implement <<<
+ # def self.<<
+ # TripleLessThan.new
+ # end
+
# Internal utility methods
def self.log
unless @log
@@ -22,15 +22,15 @@ def inspect
to_s
end
- def |(other)
- Pipe.new(self, other)
- end
-
def run
run_async
wait
end
+ def |(other)
+ raise NotImplementedError.new("Override in subclass")
+ end
+
def initialize(args)
raise NotImplementedError.new("Override in subclass")
end
View
@@ -1,18 +1,19 @@
module Rubysh
class Command < BaseCommand
- attr_accessor :args, :extra_opts, :subprocess
+ attr_accessor :args, :extra_directives, :subprocess
def initialize(args)
@args = args
@subprocess = nil
# From things like pipe, where context dictates some properties
# of how this command is run.
- @extra_opts = []
+ @extra_directives = []
+ @extra_post_fork = []
end
- def add_opt(opt)
- @extra_opts << opt
+ def add_directive(directive)
+ @extra_directives << directive
end
def stringify
@@ -22,43 +23,54 @@ def stringify
end
def run_async
- instantiate_subprocess unless @subprocess
+ instantiate_subprocess
@subprocess.run
end
def wait
@subprocess.wait
end
+ def |(other)
+ Pipeline.new([self, other])
+ end
+
+ def post_fork(&blk)
+ @extra_post_fork << blk
+ end
+
def stdout=(value)
- opt = FD.new(:stdout) > value
- add_opt(opt)
+ directive = FD.new(:stdout) > value
+ add_directive(directive)
end
def stdin=(value)
- opt = FD.new(:stdin) < value
- add_opt(opt)
+ directive = FD.new(:stdin) < value
+ add_directive(directive)
end
def status
subprocess.status
end
+ # This whole instantiation thing is kind of janky.
def instantiate_subprocess
- opts = []
+ return @subprocess if @subprocess
+ directives = []
args = @args.map do |arg|
case arg
when BaseCommand
raise NotImplementedError.new('Not ready for subshells yet')
when Redirect
- opts << arg
+ directives << arg
nil
else
arg
end
end.compact
- opts += @extra_opts
- @subprocess = Subprocess.new(args, opts)
+ directives += @extra_directives
+ post_forks = @extra_post_fork
+ @subprocess = Subprocess.new(args, directives, post_forks)
end
end
end
View
@@ -24,7 +24,7 @@ def >(target)
end
def <(target)
- Redirect.new(self, '>', target)
+ Redirect.new(self, '<', target)
end
def to_s
View
@@ -1,41 +1,4 @@
module Rubysh
- class Pipe < BaseCommand
- attr_accessor :left, :right
-
- def initialize(left, right)
- @left = left
- @right = right
-
- @pipe = Subprocess::PipeWrapper.new
- setup_pipe
- end
-
- def setup_pipe
- @left.stdout = @pipe.writer
- @right.stdin = @pipe.reader
- end
-
- def close_pipe
- @pipe.close
- end
-
- def stringify
- "#{left.stringify} | #{right.stringify}"
- end
-
- def run_async
- @left.run_async
- @right.run_async
- close_pipe
- end
-
- def wait
- # It's likely we should actually wait for these in parallel; I'm
- # not really sure right now. Might be tricky to avoid waiting
- # for other processes run by this program (could probably use
- # process groups for that?)
- @left.wait
- @right.wait
- end
+ class Pipe < Directive
end
end
View
@@ -0,0 +1,64 @@
+module Rubysh
+ class Pipeline < BaseCommand
+ attr_accessor :pipeline
+
+ def initialize(pipeline)
+ @pipeline = pipeline
+ end
+
+ def instantiate_subprocess
+ @pipeline.each {|cmd| cmd.instantiate_subprocess}
+ end
+
+ def pipeline_pairs
+ @pipeline[0...-1].zip(@pipeline[1..-1])
+ end
+
+ def stringify
+ @pipeline.map {|cmd| cmd.stringify}.join(' | ')
+ end
+
+ def |(other)
+ self.class.new(pipeline + [other])
+ end
+
+ def run_async
+ return unless @pipeline.length > 0
+
+ last_pipe = nil
+
+ pipeline_pairs.each do |left, right|
+ # TODO: maybe create an object to represent the pipe
+ # relationship, instead of manually assembling here.
+ #
+ # Don't want to have more than 2 pipes open at a time, so need
+ # to #run_async and #close here.
+ pipe = Subprocess::PipeWrapper.new
+ setup_pipe(pipe, left, right)
+
+ left.run_async
+ last_pipe.close if last_pipe
+ last_pipe = pipe
+ end
+
+ @pipeline[-1].run_async
+ last_pipe.close if last_pipe
+ end
+
+ def setup_pipe(pipe, left, right)
+ left.stdout = pipe.writer
+ left.post_fork {pipe.write_only}
+
+ right.stdin = pipe.reader
+ right.post_fork {pipe.read_only}
+ end
+
+ def wait
+ # It's likely we should actually wait for these in parallel; I'm
+ # not really sure right now. Might be tricky to avoid waiting
+ # for other processes run by this program (could probably use
+ # process groups for that?)
+ @pipeline.each {|cmd| cmd.wait}
+ end
+ end
+end
Oops, something went wrong.

0 comments on commit e46660d

Please sign in to comment.