From 65fe02e34f71f0d28eccce111dd7203a9124b515 Mon Sep 17 00:00:00 2001 From: Michael Grosser Date: Sat, 26 Mar 2016 21:20:40 -0700 Subject: [PATCH 1/3] isolation --- Readme.md | 21 +++++----- lib/parallel.rb | 49 ++++++++++++++++++----- spec/cases/closes_processes_at_runtime.rb | 12 +----- spec/cases/helper.rb | 18 +++++++++ spec/cases/map_isolation.rb | 8 ++++ spec/parallel_spec.rb | 5 +++ 6 files changed, 82 insertions(+), 31 deletions(-) create mode 100644 spec/cases/map_isolation.rb diff --git a/Readme.md b/Readme.md index dee6922..8402a9a 100644 --- a/Readme.md +++ b/Readme.md @@ -18,15 +18,15 @@ results = Parallel.map(['a','b','c']) do |one_letter| end # 3 Processes -> finished after 1 run -results = Parallel.map(['a','b','c'], :in_processes=>3){|one_letter| ... } +results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| ... } # 3 Threads -> finished after 1 run -results = Parallel.map(['a','b','c'], :in_threads=>3){|one_letter| ... } +results = Parallel.map(['a','b','c'], in_threads: 3) { |one_letter| ... } ``` Same can be done with `each` ```Ruby -Parallel.each(['a','b','c']){|one_letter| ... } +Parallel.each(['a','b','c']) { |one_letter| ... } ``` or `each_with_index` or `map_with_index` @@ -34,7 +34,7 @@ Produce one item at a time with `lambda` (anything that responds to `.call`) or ```Ruby items = [1,2,3] -Parallel.each(lambda{ items.pop || Parallel::Stop }){|number| ... } +Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... } ``` @@ -58,20 +58,20 @@ Try any of those to get working parallel AR ```Ruby # reproducibly fixes things (spec/cases/map_with_ar.rb) -Parallel.each(User.all, :in_processes => 8) do |user| +Parallel.each(User.all, in_processes: 8) do |user| user.update_attribute(:some_attribute, some_value) end User.connection.reconnect! # maybe helps: explicitly use connection pool -Parallel.each(User.all, :in_threads => 8) do |user| +Parallel.each(User.all, in_threads: 8) do |user| ActiveRecord::Base.connection_pool.with_connection do user.update_attribute(:some_attribute, some_value) end end # maybe helps: reconnect once inside every fork -Parallel.each(User.all, :in_processes => 8) do |user| +Parallel.each(User.all, in_processes: 8) do |user| @reconnected ||= User.connection.reconnect! || true user.update_attribute(:some_attribute, some_value) end @@ -101,7 +101,7 @@ end ```Ruby # gem install ruby-progressbar -Parallel.map(1..50, :progress => "Doing stuff") { sleep 1 } +Parallel.map(1..50, progress: "Doing stuff") { sleep 1 } # Doing stuff | ETA: 00:00:02 | ==================== | Time: 00:00:10 ``` @@ -113,12 +113,13 @@ Use `:finish` or `:start` hook to get progress information. They are called on the main process and protected with a mutex. ```Ruby -Parallel.map(1..100, :finish => lambda { |item, i, result| ... do something ... }) { sleep 1 } +Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 } ``` Tips ==== - - [Benchmark/Test] Disable threading/forking with `:in_threads => 0` or `:in_processes => 0`, great to test performance or to debug parallel issues + - [Benchmark/Test] Disable threading/forking with `in_threads: 0` or `in_processes: 0`, great to test performance or to debug parallel issues + - [Isolation] Do not reuse previous worker processes: `isolation: true` TODO ==== diff --git a/lib/parallel.rb b/lib/parallel.rb index bce80c2..020df18 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -35,15 +35,16 @@ def initialize(read, write, pid) @read, @write, @pid = read, write, pid end - def close_pipes - read.close - write.close + def stop + close_pipes + wait # if it goes zombie, rather wait here to be able to debug end - def wait - Process.wait(pid) - rescue Interrupt - # process died + # might be passed to started_processes and simultaneously closed by another thread + # when running in isolation mode, so we have to check if it is closed before closing + def close_pipes + read.close unless read.closed? + write.close unless write.closed? end def work(data) @@ -61,6 +62,14 @@ def work(data) raise result.exception if ExceptionWrapper === result result end + + private + + def wait + Process.wait(pid) + rescue Interrupt + # process died + end end class JobFactory @@ -300,7 +309,11 @@ def work_in_threads(job_factory, options, &block) end def work_in_processes(job_factory, options, &blk) - workers = create_workers(job_factory, options, &blk) + workers = if options[:isolation] + [] # we create workers per job and not beforehand + else + create_workers(job_factory, options, &blk) + end results = [] results_mutex = Mutex.new # arrays are not thread-safe exception = nil @@ -308,7 +321,6 @@ def work_in_processes(job_factory, options, &blk) UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do in_threads(options) do |i| worker = workers[i] - worker.thread = Thread.current begin loop do @@ -316,6 +328,12 @@ def work_in_processes(job_factory, options, &blk) item, index = job_factory.next break unless index + if options[:isolation] + worker = replace_worker(job_factory, workers, i, options, blk) + end + + worker.thread = Thread.current + begin result = with_instrumentation item, index, options do worker.work(job_factory.pack(item, index)) @@ -332,8 +350,7 @@ def work_in_processes(job_factory, options, &blk) end end ensure - worker.close_pipes - worker.wait # if it goes zombie, rather wait here to be able to debug + worker.stop if worker end end end @@ -341,6 +358,16 @@ def work_in_processes(job_factory, options, &blk) handle_exception(exception, results) end + def replace_worker(job_factory, workers, i, options, blk) + # old worker is no longer used ... stop it + worker = workers[i] + worker.stop if worker + + # create a new replacement worker + running = workers - [worker] + workers[i] = worker(job_factory, options.merge(started_workers: running), &blk) + end + def create_workers(job_factory, options, &block) workers = [] Array.new(options[:count]).each do diff --git a/spec/cases/closes_processes_at_runtime.rb b/spec/cases/closes_processes_at_runtime.rb index d101a43..f45e86d 100644 --- a/spec/cases/closes_processes_at_runtime.rb +++ b/spec/cases/closes_processes_at_runtime.rb @@ -1,13 +1,5 @@ require './spec/cases/helper' -cmd = "ps uaxw|grep ruby|wc -l" -processes_before = `#{cmd}`.to_i -Parallel.each((0..10).to_a, :in_processes => 5) { |a| a*2 } -sleep 1 -processes_after = `#{cmd}`.to_i - -if processes_before == processes_after - print 'OK' -else - print "FAIL: before:#{processes_before} -- after:#{processes_after}" +process_diff do + Parallel.each((0..10).to_a, :in_processes => 5) { |a| a*2 } end diff --git a/spec/cases/helper.rb b/spec/cases/helper.rb index 6bf1bbe..ce9fc73 100644 --- a/spec/cases/helper.rb +++ b/spec/cases/helper.rb @@ -1,2 +1,20 @@ require 'bundler/setup' require 'parallel' + +def process_diff + cmd = "ps uaxw|grep ruby|wc -l" + + processes_before = `#{cmd}`.to_i + + yield + + sleep 1 + + processes_after = `#{cmd}`.to_i + + if processes_before == processes_after + print 'OK' + else + print "FAIL: before:#{processes_before} -- after:#{processes_after}" + end +end diff --git a/spec/cases/map_isolation.rb b/spec/cases/map_isolation.rb new file mode 100644 index 0000000..0bed5b7 --- /dev/null +++ b/spec/cases/map_isolation.rb @@ -0,0 +1,8 @@ +require './spec/cases/helper' + +process_diff do + result = Parallel.map([1,2,3,4], in_processes: 2, isolation: true) do |i| + $i ||= i + end + puts result +end diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 4e1b97a..5ec0755 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -370,6 +370,11 @@ def cpus l = Array.new(10_000){|i| i} Parallel.map(l, {in_threads: 4}){|x| x+1}.should == l.map{|x| x+1} end + + it 'can work in isolation' do + out = `ruby spec/cases/map_isolation.rb` + out.should == "1\n2\n3\n4\nOK" + end end describe ".map_with_index" do From de25018777a513c075073b3ba502cbecff43fa4b Mon Sep 17 00:00:00 2001 From: Michael Grosser Date: Sat, 26 Mar 2016 22:20:49 -0700 Subject: [PATCH 2/3] fix sqlite specs --- spec/cases/each_with_ar_sqlite.rb | 11 ++++------- spec/parallel_spec.rb | 4 ++-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/spec/cases/each_with_ar_sqlite.rb b/spec/cases/each_with_ar_sqlite.rb index 025ee85..be85a27 100644 --- a/spec/cases/each_with_ar_sqlite.rb +++ b/spec/cases/each_with_ar_sqlite.rb @@ -7,7 +7,7 @@ ActiveRecord::Schema.verbose = false ActiveRecord::Base.establish_connection( :adapter => "sqlite3", - :database => ":memory:" + :database => Tempfile.new("db").path ) class User < ActiveRecord::Base @@ -26,14 +26,11 @@ class User < ActiveRecord::Base 3.times { User.create!(:name => "X") } -print "Parent: " -puts User.first.name +puts "Parent: #{User.first.name}" -print "Parallel (#{in_worker_type}): " Parallel.each([1], in_worker_type => 1) do - puts User.all.map(&:name).join + puts "Parallel (#{in_worker_type}): #{User.all.map(&:name).join}" end -print "\nParent: " -puts User.first.name +puts "Parent: #{User.first.name}" diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index 5ec0755..a9ffed3 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -431,8 +431,8 @@ def cpus end worker_types.each do |type| - pending "works with SQLite in #{type}" do - `WORKER_TYPE=#{type} ruby spec/cases/each_with_ar_sqlite.rb`.should == "Parent: X\nParallel (in_#{type}): XXX\n\nParent: X\n" + it "works with SQLite in #{type}" do + `WORKER_TYPE=#{type} ruby spec/cases/each_with_ar_sqlite.rb`.should == "Parent: X\nParallel (in_#{type}): XXX\nParent: X\n" end it "stops all workers when one fails in #{type}" do From 3a76a82403cb4e38ba4d1bcde754b09ac34a348f Mon Sep 17 00:00:00 2001 From: Michael Grosser Date: Sat, 26 Mar 2016 22:31:11 -0700 Subject: [PATCH 3/3] stderr output is a failure reason --- spec/parallel_spec.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spec/parallel_spec.rb b/spec/parallel_spec.rb index a9ffed3..e031ee4 100644 --- a/spec/parallel_spec.rb +++ b/spec/parallel_spec.rb @@ -432,7 +432,7 @@ def cpus worker_types.each do |type| it "works with SQLite in #{type}" do - `WORKER_TYPE=#{type} ruby spec/cases/each_with_ar_sqlite.rb`.should == "Parent: X\nParallel (in_#{type}): XXX\nParent: X\n" + `WORKER_TYPE=#{type} ruby spec/cases/each_with_ar_sqlite.rb 2>&1`.should == "Parent: X\nParallel (in_#{type}): XXX\nParent: X\n" end it "stops all workers when one fails in #{type}" do @@ -473,19 +473,19 @@ def cpus describe "progress" do it "takes the title from :progress" do - `ruby spec/cases/progress.rb`.sub(/=+/, '==').strip.should == "Doing stuff: |==|" + `ruby spec/cases/progress.rb 2>&1`.sub(/=+/, '==').strip.should == "Doing stuff: |==|" end it "takes true from :progress" do - `TITLE=true ruby spec/cases/progress.rb`.sub(/=+/, '==').strip.should == "Progress: |==|" + `TITLE=true ruby spec/cases/progress.rb 2>&1`.sub(/=+/, '==').strip.should == "Progress: |==|" end it "works with :finish" do - `ruby spec/cases/progress_with_finish.rb`.strip.sub(/=+/, '==').gsub(/\n+/,"\n").should == "Doing stuff: |==|\n100" + `ruby spec/cases/progress_with_finish.rb 2>&1`.strip.sub(/=+/, '==').gsub(/\n+/,"\n").should == "Doing stuff: |==|\n100" end it "takes the title from :progress[:title] and passes options along" do - `ruby spec/cases/progress_with_options.rb`.should =~ /Reticulating Splines ;+ \d+ ;+/ + `ruby spec/cases/progress_with_options.rb 2>&1`.should =~ /Reticulating Splines ;+ \d+ ;+/ end end @@ -494,11 +494,11 @@ def cpus let(:result) { "ITEM-1\nITEM-2\nITEM-3\n" } it "runs in threads" do - `ruby spec/cases/with_#{thing}.rb THREADS`.should == result + `ruby spec/cases/with_#{thing}.rb THREADS 2>&1`.should == result end it "runs in processs" do - `ruby spec/cases/with_#{thing}.rb PROCESSES`.should == result + `ruby spec/cases/with_#{thing}.rb PROCESSES 2>&1`.should == result end it "refuses to use progress" do