From 6aee485882145a306e89446b30cedf254e85d9eb Mon Sep 17 00:00:00 2001 From: Michael Grosser Date: Thu, 2 Apr 2026 18:09:15 -0700 Subject: [PATCH] ruby 4 ractors --- lib/parallel.rb | 88 +++++++++++++++++++++++++++---------------- spec/parallel_spec.rb | 4 +- 2 files changed, 58 insertions(+), 34 deletions(-) diff --git a/lib/parallel.rb b/lib/parallel.rb index 491a85d..f8494d9 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -471,62 +471,86 @@ def work_in_ractors(job_factory, options) raise ArgumentError, "pass the code you want to execute as `ractor: [ClassName, :method_name]`" end + use_port = defined?(Ractor::Port) + # build - ractors = Array.new(options.fetch(:count)) do - Ractor.new do - loop do - got = receive - (klass, method_name), item, index = got - break if index == :break - begin - Ractor.yield [nil, klass.send(method_name, item), item, index] - rescue StandardError => e - Ractor.yield [e, nil, item, index] - end - end - end + ports = {} # port (ruby 4+) or ractor (ruby 3) => ractor + options.fetch(:count).times do + port, ractor = ractor_build(use_port) + ports[port] = ractor end # start - ractors.dup.each do |ractor| - if (set = job_factory.next) - item, index = set + ports.dup.each do |port, ractor| + if (job = job_factory.next) + item, index = job instrument_start item, index, options ractor.send [callback, item, index] - else - ractor.send([[nil, nil], nil, :break]) # stop the ractor - ractors.delete ractor + else # not enough work, `receive` would hang + ractor_stop ractor + ports.delete port end end - # replace with new items - while (set = job_factory.next) - item_next, index_next = set - done, (exception, result, item, index) = Ractor.select(*ractors) + # receive result and send new items to done ractors + while (job = job_factory.next) + # receive result + done_port, (exception, result, item_prev, index_prev) = Ractor.select(*ports.keys) + done_ractor = ports[done_port] if exception - ractors.delete done + ports.delete done_port break end - instrument_finish item, index, result, options - results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) } + ractor_result item_prev, index_prev, result, results, results_mutex, options + # send new + item_next, index_next = job instrument_start item_next, index_next, options - done.send([callback, item_next, index_next]) + done_ractor.send([callback, item_next, index_next]) end # finish - ractors.each do |ractor| - (new_exception, result, item, index) = ractor.take + ports.each do |port, ractor| + (new_exception, result, item, index) = use_port ? port.receive : ractor.take exception ||= new_exception next if new_exception - instrument_finish item, index, result, options - results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) } - ractor.send([[nil, nil], nil, :break]) # stop the ractor + ractor_result item, index, result, results, results_mutex, options + ractor_stop ractor end exception || results end + def ractor_build(use_port) + args = use_port ? [Ractor::Port.new] : [] + ractor = Ractor.new(*args) do |port| + loop do + (klass, method_name), item, index = receive + break if index == :break + begin + result = [nil, klass.send(method_name, item), item, index] + rescue StandardError => e + result = [e, nil, item, index] + end + if port + port.send result + else + Ractor.yield result + end + end + end + [use_port ? args.first : ractor, ractor] + end + + def ractor_result(item, index, result, results, results_mutex, options) + instrument_finish item, index, result, options + results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) } + end + + def ractor_stop(ractor) + ractor.send([[nil, nil], nil, :break]) + end + def work_in_processes(job_factory, options, &blk) workers = create_workers(job_factory, options, &blk) results = [] diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 6e4e208..d2093fa 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -4,7 +4,7 @@ describe Parallel do worker_types = ["threads"] worker_types << "processes" if Process.respond_to?(:fork) - worker_types << "ractors" if defined?(Ractor) && RUBY_VERSION < "4.0.0" # TODO: need new code for ruby 4 ractors + worker_types << "ractors" if defined?(Ractor) def time_taken t = Time.now.to_f @@ -31,7 +31,7 @@ def execute_start_and_kill(command, amount, signal = 'INT') end def without_ractor_warning(out) - out.sub(/.*Ractor is experimental.*\n/, "") + out.sub(/.*Ractor.*is experimental.*\n/, "") end describe ".processor_count" do