Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 56 additions & 32 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -471,62 +471,86 @@ def work_in_ractors(job_factory, options)
raise ArgumentError, "pass the code you want to execute as `ractor: [ClassName, :method_name]`"
end

use_port = defined?(Ractor::Port)

# build
ractors = Array.new(options.fetch(:count)) do
Ractor.new do
loop do
got = receive
(klass, method_name), item, index = got
break if index == :break
begin
Ractor.yield [nil, klass.send(method_name, item), item, index]
rescue StandardError => e
Ractor.yield [e, nil, item, index]
end
end
end
ports = {} # port (ruby 4+) or ractor (ruby 3) => ractor
options.fetch(:count).times do
port, ractor = ractor_build(use_port)
ports[port] = ractor
end

# start
ractors.dup.each do |ractor|
if (set = job_factory.next)
item, index = set
ports.dup.each do |port, ractor|
if (job = job_factory.next)
item, index = job
instrument_start item, index, options
ractor.send [callback, item, index]
else
ractor.send([[nil, nil], nil, :break]) # stop the ractor
ractors.delete ractor
else # not enough work, `receive` would hang
ractor_stop ractor
ports.delete port
end
end

# replace with new items
while (set = job_factory.next)
item_next, index_next = set
done, (exception, result, item, index) = Ractor.select(*ractors)
# receive result and send new items to done ractors
while (job = job_factory.next)
# receive result
done_port, (exception, result, item_prev, index_prev) = Ractor.select(*ports.keys)
done_ractor = ports[done_port]
if exception
ractors.delete done
ports.delete done_port
break
end
instrument_finish item, index, result, options
results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }
ractor_result item_prev, index_prev, result, results, results_mutex, options

# send new
item_next, index_next = job
instrument_start item_next, index_next, options
done.send([callback, item_next, index_next])
done_ractor.send([callback, item_next, index_next])
end

# finish
ractors.each do |ractor|
(new_exception, result, item, index) = ractor.take
ports.each do |port, ractor|
(new_exception, result, item, index) = use_port ? port.receive : ractor.take
exception ||= new_exception
next if new_exception
instrument_finish item, index, result, options
results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }
ractor.send([[nil, nil], nil, :break]) # stop the ractor
ractor_result item, index, result, results, results_mutex, options
ractor_stop ractor
end

exception || results
end

def ractor_build(use_port)
args = use_port ? [Ractor::Port.new] : []
ractor = Ractor.new(*args) do |port|
loop do
(klass, method_name), item, index = receive
break if index == :break
begin
result = [nil, klass.send(method_name, item), item, index]
rescue StandardError => e
result = [e, nil, item, index]
end
if port
port.send result
else
Ractor.yield result
end
end
end
[use_port ? args.first : ractor, ractor]
end

def ractor_result(item, index, result, results, results_mutex, options)
instrument_finish item, index, result, options
results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }
end

def ractor_stop(ractor)
ractor.send([[nil, nil], nil, :break])
end

def work_in_processes(job_factory, options, &blk)
workers = create_workers(job_factory, options, &blk)
results = []
Expand Down
4 changes: 2 additions & 2 deletions spec/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
describe Parallel do
worker_types = ["threads"]
worker_types << "processes" if Process.respond_to?(:fork)
worker_types << "ractors" if defined?(Ractor) && RUBY_VERSION < "4.0.0" # TODO: need new code for ruby 4 ractors
worker_types << "ractors" if defined?(Ractor)

def time_taken
t = Time.now.to_f
Expand All @@ -31,7 +31,7 @@ def execute_start_and_kill(command, amount, signal = 'INT')
end

def without_ractor_warning(out)
out.sub(/.*Ractor is experimental.*\n/, "")
out.sub(/.*Ractor.*is experimental.*\n/, "")
end

describe ".processor_count" do
Expand Down
Loading