WorkerQueue - simpler parrallel worker #310

Open
wants to merge 4 commits into
from

Conversation

Projects
None yet
3 participants
Contributor

funny-falcon commented Mar 26, 2012

EM::WorkerQueue is based on EM::Iterator and EM::Queue ideas, but has simpler workflow

worker = EM::WorkerQueue.new(:concurency=>4){|v, iter| 
   EM.add_timer(1){ puts v; iter.done}
}
10.times{|i| worker.push i}

Also it supplies pull interface

queue = Queue.new
worker = EM::WorkerQueue.new(:concurency=>4){|v, iter| 
   EM.add_timer(1){ puts v; iter.done}
}
worker.on_empty{ queue.pop{|v| worker.push(v) } }
10.times{|i| queue.push i}

EM::Iterator could be implemented on top of EM::WorkerQueue, such implementation provided

I've just tried using your WorkerQueue and got a little confused about that concurrency parameter.

My intuition tells me that it defines the number of threads/processes, which work in parallel on the tasks stored in the WorkerQueue. Thus, given the following code:

class A
  # these will be objects, which should get process concurrently
  def process()
    # do something
  end
end

@a_list_of_As = []
EM.run do
  process_proc = proc do |param|
    param.process()
  end
  on_empty_proc = proc do 
    EM.stop()
  end
  queue = EM::WorkerQueue.new( process_proc,
                               { :on_empty => on_empty_proc,
                                 :concurrency => 1 }
                             )
  @a_list_of_As.each do |param|
    queue.push( param )
  end
  queue.close
  queue.run
end

I expect, that the list of As is processed sequentially. And when I set :concurrency => @a_list_of_As.length all A's #process are called concurrently.

However, the above code will only execute the very first A and then stops the EM.

I'm probably misreading and misinterpreting your WorkerQueue. What I'm doing wrong? Can you enlighten me?

Contributor

funny-falcon commented Jun 30, 2012

process_proc should be written as:

process_proc = proc do |param, work|
  param.process()
  work.done
end

ie, you ought to call #done and second argument passed to process_call

Thanks for the fast reply. Though this does not fix the issue with the :concurrency parameter. If set to 1 only the first A is processed. If set to @a_list_of_As.length they get processed sequentially and not concurrently.

Just to make sure, I've set it up correctly:
I'm using Ruby 1.9.2 via RVM, checked out your branch of this pull request (iter_lazy), did a rake compile:rubyeventmachine and added gem "eventmachine", "1.0.0.beta.4", :path => "path/to/git/root/of/eventmachine" to my Gemfile. bundle install tells me, that's using the correct Gem.

Contributor

funny-falcon commented Jun 30, 2012

Well, you should use on_done instead of on_empty:

  • on_empty is for feeding empty queue, so that it could do more work,
  • on_done is for reacting when closed queue becomes empty.

on_empty - is a "pull" interface for WorkerQueue, alternative to intentional #push-ing, so that you could
react, when there is free worker, to make a #push at that moment.

Secondly, WorkerQueue is for executing inside of EventMachine event loop, not outside. So that, jobs will be
processed concurrently only if you use EventMachine aware jobs (em-http-request, for example).
If you wish to use threads, then simply use EM.defer , or make a worker pool by itself using ::Queue .

(I have a supposition, that my EM::WorkerQueue is somewhat similar to EM::Pool, but with more convenient interface)

Contributor

funny-falcon commented Jun 30, 2012

Try to run this:

require 'eventmachine'
require 'em/worker_queue'

class A < Struct.new(:i)
  # these will be objects, which should get process concurrently
  def process(work)
    puts "Start #{i}"
    EM.add_timer(0.6) {
      puts "Stop #{i}"
      work.done
    }
  end
end

@a_list_of_As = (1..10).map{|i| A.new(i)}
EM.run do
  process_proc = proc do |param, work|
    param.process(work)
  end
  on_done_proc = proc do 
    puts "HO"
    EM.stop()
  end
  queue = EM::WorkerQueue.new( process_proc,
                               { :on_done => on_done_proc,
                                 :concurrency => 3 }
                             )
  @a_list_of_As.each do |param|
    queue.push( param )
  end
  queue.close
  queue.run
end

Yeah, that works. Thanks a lot. Will try to integrate that into my current program. Maybe you find some time to extend the documentation of the WorkerQueue class. Might be helpful for future users.

Contributor

funny-falcon commented Jul 1, 2012

Example with pull interface on_empty:

require 'eventmachine'
require 'em/worker_queue'

class A < Struct.new(:i)
  # these will be objects, which should get process concurrently
  def process(work)
    puts "Start #{i}"
    EM.add_timer(0.6) {
      puts "Stop #{i}"
      work.done
    }
  end
end

@a_list_of_As = (1..10).map{|i| A.new(i)}
EM.run do
  process_proc = proc do |param, work|
    param.process(work)
  end
  on_empty_proc = proc do |que|
    puts "on_empty"
    2.times do 
      (job = @a_list_of_As.shift) ? 
        que.push(job) :
        que.close
    end
  end
  on_done_proc = proc do 
    puts "HO"
    EM.stop()
  end
  queue = EM::WorkerQueue.new( process_proc,
                               { :on_empty => on_empty_proc,
                                 :on_done => on_done_proc,
                                 :concurrency => 3 }
                             )
end
Contributor

funny-falcon commented Sep 6, 2012

I've fixed errors, Iterator#map and Iterator#inject

funny-falcon added some commits Jan 8, 2015

worker queue
Queue specialized for running callback on each pushed value concerning
concurrency parameter.
Contributor

funny-falcon commented Jan 8, 2015

I've updated branch: collapse many commits, ensure iterator tests are passing, add with_object method to iterator.

@sodabrew sodabrew added this to the v1.2.0 milestone Feb 2, 2015

@sodabrew sodabrew modified the milestones: v1.2.0, v1.2.1 Feb 25, 2016

@sodabrew sodabrew modified the milestones: v1.2.1, v1.3.0 Nov 17, 2016

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment