Skip to content

Commit

Permalink
Merge pull request #171 from grosser/grosser/isolation
Browse files Browse the repository at this point in the history
Grosser/isolation
  • Loading branch information
grosser committed Mar 27, 2016
2 parents b54b7f6 + 3a76a82 commit d5887c5
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 46 deletions.
21 changes: 11 additions & 10 deletions Readme.md
Expand Up @@ -18,23 +18,23 @@ results = Parallel.map(['a','b','c']) do |one_letter|
end

# 3 Processes -> finished after 1 run
results = Parallel.map(['a','b','c'], :in_processes=>3){|one_letter| ... }
results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| ... }

# 3 Threads -> finished after 1 run
results = Parallel.map(['a','b','c'], :in_threads=>3){|one_letter| ... }
results = Parallel.map(['a','b','c'], in_threads: 3) { |one_letter| ... }
```

Same can be done with `each`
```Ruby
Parallel.each(['a','b','c']){|one_letter| ... }
Parallel.each(['a','b','c']) { |one_letter| ... }
```
or `each_with_index` or `map_with_index`

Produce one item at a time with `lambda` (anything that responds to `.call`) or `Queue`.

```Ruby
items = [1,2,3]
Parallel.each(lambda{ items.pop || Parallel::Stop }){|number| ... }
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }
```


Expand All @@ -58,20 +58,20 @@ Try any of those to get working parallel AR

```Ruby
# reproducibly fixes things (spec/cases/map_with_ar.rb)
Parallel.each(User.all, :in_processes => 8) do |user|
Parallel.each(User.all, in_processes: 8) do |user|
user.update_attribute(:some_attribute, some_value)
end
User.connection.reconnect!

# maybe helps: explicitly use connection pool
Parallel.each(User.all, :in_threads => 8) do |user|
Parallel.each(User.all, in_threads: 8) do |user|
ActiveRecord::Base.connection_pool.with_connection do
user.update_attribute(:some_attribute, some_value)
end
end

# maybe helps: reconnect once inside every fork
Parallel.each(User.all, :in_processes => 8) do |user|
Parallel.each(User.all, in_processes: 8) do |user|
@reconnected ||= User.connection.reconnect! || true
user.update_attribute(:some_attribute, some_value)
end
Expand Down Expand Up @@ -101,7 +101,7 @@ end
```Ruby
# gem install ruby-progressbar

Parallel.map(1..50, :progress => "Doing stuff") { sleep 1 }
Parallel.map(1..50, progress: "Doing stuff") { sleep 1 }

# Doing stuff | ETA: 00:00:02 | ==================== | Time: 00:00:10
```
Expand All @@ -113,12 +113,13 @@ Use `:finish` or `:start` hook to get progress information.
They are called on the main process and protected with a mutex.

```Ruby
Parallel.map(1..100, :finish => lambda { |item, i, result| ... do something ... }) { sleep 1 }
Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 }
```

Tips
====
- [Benchmark/Test] Disable threading/forking with `:in_threads => 0` or `:in_processes => 0`, great to test performance or to debug parallel issues
- [Benchmark/Test] Disable threading/forking with `in_threads: 0` or `in_processes: 0`, great to test performance or to debug parallel issues
- [Isolation] Do not reuse previous worker processes: `isolation: true`

TODO
====
Expand Down
49 changes: 38 additions & 11 deletions lib/parallel.rb
Expand Up @@ -35,15 +35,16 @@ def initialize(read, write, pid)
@read, @write, @pid = read, write, pid
end

def close_pipes
read.close
write.close
def stop
close_pipes
wait # if it goes zombie, rather wait here to be able to debug
end

def wait
Process.wait(pid)
rescue Interrupt
# process died
# might be passed to started_processes and simultaneously closed by another thread
# when running in isolation mode, so we have to check if it is closed before closing
def close_pipes
read.close unless read.closed?
write.close unless write.closed?
end

def work(data)
Expand All @@ -61,6 +62,14 @@ def work(data)
raise result.exception if ExceptionWrapper === result
result
end

private

def wait
Process.wait(pid)
rescue Interrupt
# process died
end
end

class JobFactory
Expand Down Expand Up @@ -300,22 +309,31 @@ def work_in_threads(job_factory, options, &block)
end

def work_in_processes(job_factory, options, &blk)
workers = create_workers(job_factory, options, &blk)
workers = if options[:isolation]
[] # we create workers per job and not beforehand
else
create_workers(job_factory, options, &blk)
end
results = []
results_mutex = Mutex.new # arrays are not thread-safe
exception = nil

UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
in_threads(options) do |i|
worker = workers[i]
worker.thread = Thread.current

begin
loop do
break if exception
item, index = job_factory.next
break unless index

if options[:isolation]
worker = replace_worker(job_factory, workers, i, options, blk)
end

worker.thread = Thread.current

begin
result = with_instrumentation item, index, options do
worker.work(job_factory.pack(item, index))
Expand All @@ -332,15 +350,24 @@ def work_in_processes(job_factory, options, &blk)
end
end
ensure
worker.close_pipes
worker.wait # if it goes zombie, rather wait here to be able to debug
worker.stop if worker
end
end
end

handle_exception(exception, results)
end

def replace_worker(job_factory, workers, i, options, blk)
# old worker is no longer used ... stop it
worker = workers[i]
worker.stop if worker

# create a new replacement worker
running = workers - [worker]
workers[i] = worker(job_factory, options.merge(started_workers: running), &blk)
end

def create_workers(job_factory, options, &block)
workers = []
Array.new(options[:count]).each do
Expand Down
12 changes: 2 additions & 10 deletions spec/cases/closes_processes_at_runtime.rb
@@ -1,13 +1,5 @@
require './spec/cases/helper'
cmd = "ps uaxw|grep ruby|wc -l"

processes_before = `#{cmd}`.to_i
Parallel.each((0..10).to_a, :in_processes => 5) { |a| a*2 }
sleep 1
processes_after = `#{cmd}`.to_i

if processes_before == processes_after
print 'OK'
else
print "FAIL: before:#{processes_before} -- after:#{processes_after}"
process_diff do
Parallel.each((0..10).to_a, :in_processes => 5) { |a| a*2 }
end
11 changes: 4 additions & 7 deletions spec/cases/each_with_ar_sqlite.rb
Expand Up @@ -7,7 +7,7 @@
ActiveRecord::Schema.verbose = false
ActiveRecord::Base.establish_connection(
:adapter => "sqlite3",
:database => ":memory:"
:database => Tempfile.new("db").path
)

class User < ActiveRecord::Base
Expand All @@ -26,14 +26,11 @@ class User < ActiveRecord::Base

3.times { User.create!(:name => "X") }

print "Parent: "
puts User.first.name
puts "Parent: #{User.first.name}"

print "Parallel (#{in_worker_type}): "
Parallel.each([1], in_worker_type => 1) do
puts User.all.map(&:name).join
puts "Parallel (#{in_worker_type}): #{User.all.map(&:name).join}"
end

print "\nParent: "
puts User.first.name
puts "Parent: #{User.first.name}"

18 changes: 18 additions & 0 deletions spec/cases/helper.rb
@@ -1,2 +1,20 @@
require 'bundler/setup'
require 'parallel'

def process_diff
cmd = "ps uaxw|grep ruby|wc -l"

processes_before = `#{cmd}`.to_i

yield

sleep 1

processes_after = `#{cmd}`.to_i

if processes_before == processes_after
print 'OK'
else
print "FAIL: before:#{processes_before} -- after:#{processes_after}"
end
end
8 changes: 8 additions & 0 deletions spec/cases/map_isolation.rb
@@ -0,0 +1,8 @@
require './spec/cases/helper'

process_diff do
result = Parallel.map([1,2,3,4], in_processes: 2, isolation: true) do |i|
$i ||= i
end
puts result
end
21 changes: 13 additions & 8 deletions spec/parallel_spec.rb
Expand Up @@ -370,6 +370,11 @@ def cpus
l = Array.new(10_000){|i| i}
Parallel.map(l, {in_threads: 4}){|x| x+1}.should == l.map{|x| x+1}
end

it 'can work in isolation' do
out = `ruby spec/cases/map_isolation.rb`
out.should == "1\n2\n3\n4\nOK"
end
end

describe ".map_with_index" do
Expand Down Expand Up @@ -426,8 +431,8 @@ def cpus
end

worker_types.each do |type|
pending "works with SQLite in #{type}" do
`WORKER_TYPE=#{type} ruby spec/cases/each_with_ar_sqlite.rb`.should == "Parent: X\nParallel (in_#{type}): XXX\n\nParent: X\n"
it "works with SQLite in #{type}" do
`WORKER_TYPE=#{type} ruby spec/cases/each_with_ar_sqlite.rb 2>&1`.should == "Parent: X\nParallel (in_#{type}): XXX\nParent: X\n"
end

it "stops all workers when one fails in #{type}" do
Expand Down Expand Up @@ -468,19 +473,19 @@ def cpus

describe "progress" do
it "takes the title from :progress" do
`ruby spec/cases/progress.rb`.sub(/=+/, '==').strip.should == "Doing stuff: |==|"
`ruby spec/cases/progress.rb 2>&1`.sub(/=+/, '==').strip.should == "Doing stuff: |==|"
end

it "takes true from :progress" do
`TITLE=true ruby spec/cases/progress.rb`.sub(/=+/, '==').strip.should == "Progress: |==|"
`TITLE=true ruby spec/cases/progress.rb 2>&1`.sub(/=+/, '==').strip.should == "Progress: |==|"
end

it "works with :finish" do
`ruby spec/cases/progress_with_finish.rb`.strip.sub(/=+/, '==').gsub(/\n+/,"\n").should == "Doing stuff: |==|\n100"
`ruby spec/cases/progress_with_finish.rb 2>&1`.strip.sub(/=+/, '==').gsub(/\n+/,"\n").should == "Doing stuff: |==|\n100"
end

it "takes the title from :progress[:title] and passes options along" do
`ruby spec/cases/progress_with_options.rb`.should =~ /Reticulating Splines ;+ \d+ ;+/
`ruby spec/cases/progress_with_options.rb 2>&1`.should =~ /Reticulating Splines ;+ \d+ ;+/
end
end

Expand All @@ -489,11 +494,11 @@ def cpus
let(:result) { "ITEM-1\nITEM-2\nITEM-3\n" }

it "runs in threads" do
`ruby spec/cases/with_#{thing}.rb THREADS`.should == result
`ruby spec/cases/with_#{thing}.rb THREADS 2>&1`.should == result
end

it "runs in processs" do
`ruby spec/cases/with_#{thing}.rb PROCESSES`.should == result
`ruby spec/cases/with_#{thing}.rb PROCESSES 2>&1`.should == result
end

it "refuses to use progress" do
Expand Down

0 comments on commit d5887c5

Please sign in to comment.