Skip to content

Commit

Permalink
Add Parallel#find and Parallel#first_result #273
Browse files Browse the repository at this point in the history
In addtion to Parallel::Kill and Parallel::Break, a new Exception class
named Parallel::Return allows raising an Exception which wraps a result
value. That wrapped result will be unwrapped.

When a method expects a single result, the option `:return_one` must be
provided, because a given Enumeration might not contain a matching element or
result.
  • Loading branch information
ellcs committed Oct 6, 2020
1 parent 6d2a85f commit 21b385e
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 12 deletions.
64 changes: 52 additions & 12 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ class Break < StandardError
class Kill < StandardError
end

class Return < StandardError
attr_reader :return_value

def initialize(return_value)
@return_value = return_value
end
end

class UndumpableException < StandardError
attr_reader :backtrace
def initialize(original)
Expand Down Expand Up @@ -228,6 +236,24 @@ def each(array, options={}, &block)
map(array, options.merge(:preserve_results => false), &block)
end

def first_result(array, options = {}, &block)
raise "You must provide a block when calling #first_result" if block.nil?
map(array, options.merge({return_one: true})) do |*a|
if result = block.call(*a)
raise Parallel::Return.new(result)
end
end
end

def find(array, options = {}, &block)
raise "You must provide a block when calling #find" if block.nil?
map(array, options.merge({return_one: true})) do |*a|
if block.call(*a)
raise Parallel::Return.new(*a)
end
end
end

def any?(*args, &block)
raise "You must provide a block when calling #any?" if block.nil?
!each(*args) { |*a| raise Parallel::Kill if block.call(*a) }
Expand Down Expand Up @@ -271,14 +297,22 @@ def map(source, options = {}, &block)
add_progress_bar!(job_factory, options)

results = if size == 0
work_direct(job_factory, options, &block)
elsif method == :in_threads
work_in_threads(job_factory, options.merge(:count => size), &block)
else
work_in_processes(job_factory, options.merge(:count => size), &block)
end
work_direct(job_factory, options, &block)
elsif method == :in_threads
work_in_threads(job_factory, options.merge(:count => size), &block)
else
work_in_processes(job_factory, options.merge(:count => size), &block)
end
if results
options[:return_results] ? results : source
if options[:return_one]
if results.is_a?(Return)
results.return_value
end
elsif options[:return_results]
results
else
source
end
end
end

Expand Down Expand Up @@ -401,11 +435,9 @@ def work_in_processes(job_factory, options, &blk)
results_mutex.synchronize { results[index] = result } # arrays are not threads safe on jRuby
rescue StandardError => e
exception = e
# takes care of Kill and its subclasses
if Parallel::Kill === exception
(workers - [worker]).each do |w|
w.thread.kill if w.thread
UserInterruptHandler.kill(w.pid)
end
stop_workers(workers - [worker])
end
end
end
Expand All @@ -418,6 +450,13 @@ def work_in_processes(job_factory, options, &blk)
handle_exception(exception, results)
end

def stop_workers(workers_to_stop)
workers_to_stop.each do |w|
w.thread.kill if w.thread
UserInterruptHandler.kill(w.pid)
end
end

def replace_worker(job_factory, workers, i, options, blk)
options[:mutex].synchronize do
# old worker is no longer used ... stop it
Expand Down Expand Up @@ -485,7 +524,8 @@ def process_incoming_jobs(read, write, job_factory, options, &block)
end

def handle_exception(exception, results)
return nil if [Parallel::Break, Parallel::Kill].include? exception.class
return if [Parallel::Break, Parallel::Kill].include?(exception.class)
return exception if exception.class == Parallel::Return
raise exception if exception
results
end
Expand Down
12 changes: 12 additions & 0 deletions spec/cases/parallel_find.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require './spec/cases/helper'
STDOUT.sync = true # otherwise results can go weird...

result = ""
[{in_processes: 2}, {in_threads: 2}].each do |options|
x = ["bob", "alice", "ellcs", "grosser"]
result = Parallel.find(x, options) do |s|
s.include?("ll")
end
end

print result
13 changes: 13 additions & 0 deletions spec/cases/parallel_find_nothing.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
require './spec/cases/helper'
STDOUT.sync = true # otherwise results can go weird...

result = "i should be nil"
[{in_processes: 2}, {in_threads: 2}].each do |options|
x = ["bob", "alice", "ellcs", "grosser"]
result = Parallel.find(x, options) do |o|
# nothing matches
false
end
end

print result.inspect
12 changes: 12 additions & 0 deletions spec/cases/parallel_first_result.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require './spec/cases/helper'
STDOUT.sync = true # otherwise results can go weird...

result = ""
[{in_processes: 2}, {in_threads: 2}].each do |options|
x = ["bob", "alice", "ellcs", "grosser"]
result = Parallel.first_result(x, options) do |s|
s.include?("ll") && s
end
end

print result
13 changes: 13 additions & 0 deletions spec/cases/parallel_first_result_nothing.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
require './spec/cases/helper'
STDOUT.sync = true # otherwise results can go weird...

result = "i should be nil"
[{in_processes: 2}, {in_threads: 2}].each do |options|
x = ["bob", "alice", "ellcs", "grosser"]
result = Parallel.first_result(x, options) do |o|
# nothing matches
false
end
end

print result.inspect
20 changes: 20 additions & 0 deletions spec/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,26 @@ def cpus
end
end

describe ".find" do
it "returns nil if none is turthy" do
`ruby spec/cases/parallel_find_nothing.rb`.strip.should == 'nil'
end

it "returns false if all results are falsy" do
`ruby spec/cases/parallel_find.rb`.strip.should == 'ellcs'
end
end

describe ".first_result" do
it "returns nil if none is turthy" do
`ruby spec/cases/parallel_first_result_nothing.rb`.strip.should == 'nil'
end

it "returns false if all results are falsy" do
`ruby spec/cases/parallel_first_result.rb`.strip.should == 'ellcs'
end
end

describe ".all?" do
it "returns true if all results are truthy" do
`ruby spec/cases/all_true.rb`.split(',').should == ['true'] * 3 * 3
Expand Down

0 comments on commit 21b385e

Please sign in to comment.