diff --git a/lib/parallel.rb b/lib/parallel.rb index 48a99fd..163a2d4 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -14,6 +14,14 @@ class Break < StandardError class Kill < StandardError end + class Return < StandardError + attr_reader :return_value + + def initialize(return_value) + @return_value = return_value + end + end + class UndumpableException < StandardError attr_reader :backtrace def initialize(original) @@ -228,6 +236,24 @@ def each(array, options={}, &block) map(array, options.merge(:preserve_results => false), &block) end + def first_result(array, options = {}, &block) + raise "You must provide a block when calling #first_result" if block.nil? + map(array, options.merge({return_one: true})) do |*a| + if result = block.call(*a) + raise Parallel::Return.new(result) + end + end + end + + def find(array, options = {}, &block) + raise "You must provide a block when calling #find" if block.nil? + map(array, options.merge({return_one: true})) do |*a| + if block.call(*a) + raise Parallel::Return.new(*a) + end + end + end + def any?(*args, &block) raise "You must provide a block when calling #any?" if block.nil? !each(*args) { |*a| raise Parallel::Kill if block.call(*a) } @@ -271,14 +297,22 @@ def map(source, options = {}, &block) add_progress_bar!(job_factory, options) results = if size == 0 - work_direct(job_factory, options, &block) - elsif method == :in_threads - work_in_threads(job_factory, options.merge(:count => size), &block) - else - work_in_processes(job_factory, options.merge(:count => size), &block) - end + work_direct(job_factory, options, &block) + elsif method == :in_threads + work_in_threads(job_factory, options.merge(:count => size), &block) + else + work_in_processes(job_factory, options.merge(:count => size), &block) + end if results - options[:return_results] ? results : source + if options[:return_one] + if results.is_a?(Return) + results.return_value + end + elsif options[:return_results] + results + else + source + end end end @@ -401,11 +435,9 @@ def work_in_processes(job_factory, options, &blk) results_mutex.synchronize { results[index] = result } # arrays are not threads safe on jRuby rescue StandardError => e exception = e + # takes care of Kill and its subclasses if Parallel::Kill === exception - (workers - [worker]).each do |w| - w.thread.kill if w.thread - UserInterruptHandler.kill(w.pid) - end + stop_workers(workers - [worker]) end end end @@ -418,6 +450,13 @@ def work_in_processes(job_factory, options, &blk) handle_exception(exception, results) end + def stop_workers(workers_to_stop) + workers_to_stop.each do |w| + w.thread.kill if w.thread + UserInterruptHandler.kill(w.pid) + end + end + def replace_worker(job_factory, workers, i, options, blk) options[:mutex].synchronize do # old worker is no longer used ... stop it @@ -485,7 +524,8 @@ def process_incoming_jobs(read, write, job_factory, options, &block) end def handle_exception(exception, results) - return nil if [Parallel::Break, Parallel::Kill].include? exception.class + return if [Parallel::Break, Parallel::Kill].include?(exception.class) + return exception if exception.class == Parallel::Return raise exception if exception results end diff --git a/spec/cases/parallel_find.rb b/spec/cases/parallel_find.rb new file mode 100644 index 0000000..bca996b --- /dev/null +++ b/spec/cases/parallel_find.rb @@ -0,0 +1,12 @@ +require './spec/cases/helper' +STDOUT.sync = true # otherwise results can go weird... + +result = "" +[{in_processes: 2}, {in_threads: 2}].each do |options| + x = ["bob", "alice", "ellcs", "grosser"] + result = Parallel.find(x, options) do |s| + s.include?("ll") + end +end + +print result diff --git a/spec/cases/parallel_find_nothing.rb b/spec/cases/parallel_find_nothing.rb new file mode 100644 index 0000000..9cea032 --- /dev/null +++ b/spec/cases/parallel_find_nothing.rb @@ -0,0 +1,13 @@ +require './spec/cases/helper' +STDOUT.sync = true # otherwise results can go weird... + +result = "i should be nil" +[{in_processes: 2}, {in_threads: 2}].each do |options| + x = ["bob", "alice", "ellcs", "grosser"] + result = Parallel.find(x, options) do |o| + # nothing matches + false + end +end + +print result.inspect diff --git a/spec/cases/parallel_first_result.rb b/spec/cases/parallel_first_result.rb new file mode 100644 index 0000000..31ebca6 --- /dev/null +++ b/spec/cases/parallel_first_result.rb @@ -0,0 +1,12 @@ +require './spec/cases/helper' +STDOUT.sync = true # otherwise results can go weird... + +result = "" +[{in_processes: 2}, {in_threads: 2}].each do |options| + x = ["bob", "alice", "ellcs", "grosser"] + result = Parallel.first_result(x, options) do |s| + s.include?("ll") && s + end +end + +print result diff --git a/spec/cases/parallel_first_result_nothing.rb b/spec/cases/parallel_first_result_nothing.rb new file mode 100644 index 0000000..1b9155e --- /dev/null +++ b/spec/cases/parallel_first_result_nothing.rb @@ -0,0 +1,13 @@ +require './spec/cases/helper' +STDOUT.sync = true # otherwise results can go weird... + +result = "i should be nil" +[{in_processes: 2}, {in_threads: 2}].each do |options| + x = ["bob", "alice", "ellcs", "grosser"] + result = Parallel.first_result(x, options) do |o| + # nothing matches + false + end +end + +print result.inspect diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index e9c53d0..dbe46bb 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -511,6 +511,26 @@ def cpus end end + describe ".find" do + it "returns nil if none is turthy" do + `ruby spec/cases/parallel_find_nothing.rb`.strip.should == 'nil' + end + + it "returns false if all results are falsy" do + `ruby spec/cases/parallel_find.rb`.strip.should == 'ellcs' + end + end + + describe ".first_result" do + it "returns nil if none is turthy" do + `ruby spec/cases/parallel_first_result_nothing.rb`.strip.should == 'nil' + end + + it "returns false if all results are falsy" do + `ruby spec/cases/parallel_first_result.rb`.strip.should == 'ellcs' + end + end + describe ".all?" do it "returns true if all results are truthy" do `ruby spec/cases/all_true.rb`.split(',').should == ['true'] * 3 * 3