simonmenke / actionpool forked from spox/actionpool

Easy thread pooling


ActionPool

ActionPool is just a simple thread pool. It allows for various constraints and resizing in a pretty easy and unobtrusive manner. You can set limits on how long tasks are worked on, as well as on the life of a thread. For things that like to use lots of threads, it can be helpful to reuse threads instead of constantly recreating them.
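To make the idea of thread reuse concrete, here is a minimal sketch in plain Ruby. It does not use ActionPool and is not how ActionPool is implemented; the helper name run_on_workers is made up for illustration. A fixed set of worker threads pulls jobs from a shared queue, so many jobs end up being served by only a few threads.

```ruby
require 'thread'

# Hypothetical helper, not part of ActionPool: run every job on a fixed
# set of worker threads and report which thread handled each job.
def run_on_workers(worker_count, jobs)
  queue = Queue.new
  jobs.each { |job| queue << job }
  worker_count.times { queue << :stop }   # one stop marker per worker

  seen = Queue.new
  workers = Array.new(worker_count) do
    Thread.new do
      # Each worker loops, so a single thread serves many jobs.
      while (job = queue.pop) != :stop
        job.call
        seen << Thread.current
      end
    end
  end
  workers.each(&:join)
  Array.new(seen.size) { seen.pop }
end

jobs = Array.new(20) { lambda { sleep(0.01) } }
threads_used = run_on_workers(3, jobs)
puts "Jobs run: #{threads_used.size}, distinct threads: #{threads_used.uniq.size}"
```

The number of distinct threads never exceeds the worker count, no matter how many jobs are queued.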

install (easy):

    gem install ActionPool

install (less easy):

    git clone http://github.com/spox/actionpool.git
    cd actionpool
    gem build *.gemspec
    gem install ./

install (less easy that's a little easier):

rip makes it easy to install directly from a github repository.

Documentation

rdocs

Example

Code:

    require 'actionpool'

    pool = ActionPool::Pool.new
    pool.process do
        sleep(2)
        raise 'Wakeup main thread'
    end
    20.times do
        pool.process do
            puts "Thread: #{Thread.current}"
            sleep(0.1)
        end
    end
    begin
        sleep
    rescue Exception => e
        puts "Thread pool woke me up: #{e}"
    end

Result:

    Thread: #<Thread:0x93ebeb8>
    Thread: #<Thread:0x93eb92c>
    Thread: #<Thread:0x93eb8a0>
    Thread: #<Thread:0x93eb814>
    Thread: #<Thread:0x93eb788>
    Thread: #<Thread:0x93eb670>
    Thread: #<Thread:0x93eb5e4>
    Thread: #<Thread:0x93eb558>
    Thread: #<Thread:0x93eb4cc>
    Thread: #<Thread:0x93ebeb8>
    Thread: #<Thread:0x93eb92c>
    Thread: #<Thread:0x93eb8a0>
    Thread: #<Thread:0x93eb814>
    Thread: #<Thread:0x93eb788>
    Thread: #<Thread:0x93eb670>
    Thread: #<Thread:0x93eb5e4>
    Thread: #<Thread:0x93eb558>
    Thread: #<Thread:0x93eb4cc>
    Thread: #<Thread:0x93eb92c>
    Thread: #<Thread:0x93eb8a0>
    Thread pool woke me up: Wakeup main thread

Important note

The worker threads in the ActionPool will catch all Exception objects that your block fails to catch. Instead of just eating these exceptions, the workers pass them back to the thread that created the pool (generally the main thread). This is important to note if you care about processing unexpected exceptions, and it is what allows the example code above to be woken up from its sleep.
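The behaviour described above can be imitated in plain Ruby with Thread#raise. This is only a sketch of the general technique, assuming nothing about ActionPool's internals; spawn_reporting_worker is a hypothetical name.

```ruby
# Hypothetical stand-in for a pool worker: rescue whatever the block
# raises and re-raise it in the thread that created the worker.
def spawn_reporting_worker(&task)
  creator = Thread.current
  Thread.new do
    begin
      task.call
    rescue Exception => e
      creator.raise(e)   # Thread#raise delivers the exception to `creator`
    end
  end
end

message = nil
begin
  spawn_reporting_worker do
    sleep(0.1)
    raise 'Wakeup main thread'
  end
  sleep   # would never return if the worker swallowed the exception
rescue Exception => e
  message = e.message
end
puts "Woken up by: #{message}"
```

Because the exception arrives asynchronously, the creating thread can be interrupted at any point, which is why the examples in this README wrap their sleep in a begin/rescue block.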

The internals

Probably more information than you really want

ActionPool has some simple settings that make things work. First, the pool has a minimum and maximum number of allowed threads. On initialization, the minimum number of threads is created and put into the pool. By default, this is 10 threads. As the number of tasks added to the pool increases, the pool will grow as needed. When there are more tasks in the pool than threads to process them, new threads are added to the pool until the maximum thread threshold is reached. Taking the example above, we can demonstrate this easily by adjusting our limits with the :min_threads and :max_threads options:

    require 'actionpool'

    pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 3)
    pool.process do
        sleep(10)
        raise 'Wakeup main thread'
    end
    20.times do
        pool.process do
            puts "Thread: #{Thread.current}"
            sleep(rand(0.0))
        end
    end
    begin
        sleep
    rescue Exception => e
        puts "Thread pool woke me up: #{e}"
    end

Which results in:

    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1080>
    Thread: #<Thread:0x86c1080>
    Thread: #<Thread:0x86c1080>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1080>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1080>
    Thread: #<Thread:0x86c1080>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1080>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1080>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1760>
    Thread: #<Thread:0x86c1080>
    Thread pool woke me up: Wakeup main thread

Our pool starts with a single thread that is occupied by the sleeping task waiting to raise an exception. As we begin to add new tasks, the pool grows to accommodate the growing number of tasks, until it reaches the maximum threshold of 3. At that point, the pool simply processes the tasks until the task list is empty.
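The growth rule can be sketched without ActionPool. The GrowingPool class below is a hypothetical, heavily simplified illustration, not ActionPool's actual implementation: it starts with the minimum number of workers and adds one whenever a job is submitted while others are still waiting, as long as the pool is below its maximum.

```ruby
require 'thread'

# Hypothetical, simplified pool (not ActionPool's implementation).
class GrowingPool
  attr_reader :workers

  def initialize(min, max)
    @max = max
    @queue = Queue.new
    @lock = Mutex.new
    @workers = []
    min.times { add_worker }
  end

  def process(&job)
    @queue << job
    @lock.synchronize do
      # Grow only while there is a backlog and room below the maximum.
      add_worker if @queue.size > 0 && @workers.size < @max
    end
  end

  # Queue one stop marker per worker, then wait for all of them to exit.
  def shutdown
    @workers.size.times { @queue << :stop }
    @workers.each(&:join)
  end

  private

  def add_worker
    @workers << Thread.new do
      while (job = @queue.pop) != :stop
        job.call
      end
    end
  end
end

pool = GrowingPool.new(1, 3)
done = Queue.new
20.times { |i| pool.process { sleep(0.01); done << i } }
pool.shutdown
puts "Jobs finished: #{done.size}, workers: #{pool.workers.size}"
```

The worker count depends on thread scheduling, but it always stays between the minimum and the maximum given to the constructor.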

The pool can also limit the amount of time a thread spends working on a given task. By default, a thread will work on a given task until the task is completed or the pool is shut down. However, as the following example shows with the :a_to option (set to 1 second here), it is very easy to limit this time so the pool does not get bogged down by long-running tasks:

    require 'actionpool'

    pool = ActionPool::Pool.new(:min_threads => 1, :max_threads => 1, :a_to => 1)
    pool.process do
        puts "#{Time.now}: I'm a long running task"
        sleep(100)
        raise 'Wakeup main thread'
    end
    pool.process do
        puts "#{Time.now}: Waiting for my turn"
        raise "I'm waking up the main thread"
    end
    begin
        sleep
    rescue Exception => e
        puts "Thread pool woke me up: #{e}"
    end

Results:

    2009-10-10 08:47:08 -0700: I'm a long running task
    2009-10-10 08:47:09 -0700: Waiting for my turn
    Thread pool woke me up: I'm waking up the main thread
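The source does not say how ActionPool enforces this limit. As a rough illustration of the same effect, the sketch below uses the standard library's Timeout module, which is a swapped-in technique; run_with_limit is a hypothetical helper.

```ruby
require 'timeout'

# Hypothetical single worker that abandons any job running longer than
# `limit` seconds, using the standard library's Timeout module.
def run_with_limit(jobs, limit)
  log = []
  worker = Thread.new do
    jobs.each do |name, job|
      begin
        Timeout.timeout(limit) { job.call }
        log << "#{name}: finished"
      rescue Timeout::Error
        log << "#{name}: abandoned after #{limit}s"
      end
    end
  end
  worker.join
  log
end

jobs = [
  ['long task',  lambda { sleep(5) }],
  ['short task', lambda { :done }]
]
log = run_with_limit(jobs, 0.2)
puts log
```

As in the ActionPool example, the short task gets its turn once the long one has used up its time limit, rather than waiting for it to finish.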

If you have a number of tasks you would like to schedule at once, it is easy with the add_jobs method:

    require 'actionpool'

    pool = ActionPool::Pool.new
    a = 0
    lock = Mutex.new
    tasks = [].fill(lambda{ lock.synchronize{ a += 1 } }, 0..19)
    pool.add_jobs(tasks)
    sleep(0.5)
    puts "Result: #{a}"

Results:

    Result: 20
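The example above relies on sleep(0.5) to give the workers time to finish. If you would rather wait for exactly as long as the batch takes, each task can signal its own completion. The sketch below shows that pattern with plain threads standing in for the pool, so it runs without ActionPool. The done queue is an addition for illustration and not part of the library.

```ruby
require 'thread'

a = 0
lock = Mutex.new
done = Queue.new

# Same shape as the tasks above, but each one also signals completion.
tasks = Array.new(20) do
  lambda do
    lock.synchronize { a += 1 }
    done << true
  end
end

# Plain threads stand in for the pool here; with ActionPool the tasks
# would be handed to the pool instead.
tasks.each { |task| Thread.new(&task) }

tasks.size.times { done.pop }   # blocks until every task has reported in
puts "Result: #{a}"
```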

Footer

I hope this library is useful. If you find any bugs, or need any help, you can find me on DALnet and Freenode.

License

    ActionPool is licensed under the LGPLv3
    Copyright (c) 2009 spox <spox@modspox.com>