Permalink
Browse files

Initial generalized Celluloid::Worker

  • Loading branch information...
1 parent 6fafa9d commit 2cd3a30c13710e2afadea9666fa92de5405eaa2f @tarcieri tarcieri committed Apr 9, 2012
Showing with 102 additions and 4 deletions.
  1. +5 −3 CHANGES.md
  2. +7 −1 lib/celluloid.rb
  3. +61 −0 lib/celluloid/worker.rb
  4. +29 −0 spec/celluloid/worker_spec.rb
View
@@ -1,9 +1,11 @@
-0.10.1
-------
+HEAD
+----
+* Celluloid::Worker provides a background worker model
* Celluloid::Pool now provides round-robin distribution among actors
-* Celluloid::ThreadPool renamed to Celluloid::InternalPool to emphasize its...
+* Celluloid::ThreadPool renamed to Celluloid::InternalPool to emphasize its
internalness
* Support for asynchronously calling private methods inside actors
+* Future is now an instance method on all actors
* Async call exception logs now contain the failed method
0.10.0
View
@@ -288,6 +288,11 @@ def defer(&block)
# Celluloid::Future, which uses an Actor to run the block
Future.new(&block).value
end
+
+ # Handle calls to future within an actor itself
+ def future(meth, *args, &block)
+ Actor.future Thread.current[:actor].mailbox, meth, *args, &block
+ end
# Process async calls via method_missing
def method_missing(meth, *args, &block)
@@ -296,7 +301,7 @@ def method_missing(meth, *args, &block)
unbanged_meth = meth.to_s.sub(/!$/, '')
args.unshift unbanged_meth
- call = AsyncCall.new(@mailbox, :__send__, args, block)
+ call = AsyncCall.new(nil, :__send__, args, block)
begin
Thread.current[:actor].mailbox << call
rescue MailboxError
@@ -338,3 +343,4 @@ def method_missing(meth, *args, &block)
require 'celluloid/future'
require 'celluloid/group'
require 'celluloid/supervisor'
+require 'celluloid/worker'
View
@@ -0,0 +1,61 @@
+module Celluloid
+ # Manages a fixed-size pool of workers
+ module Worker
@seamusabshere

seamusabshere Apr 25, 2012

hey @tarcieri love this abstraction - we're already using it in production at http://impact.brighterplanet.com/ - when are you gonna release a gem that includes it?

@tarcieri

tarcieri Apr 25, 2012

Owner

Soon! There's still a bug in the current implementation which prevents creating a large number of futures on JRuby I'd like to get fixed first.

+ def self.included(klass)
+ klass.send :include, Celluloid
+ klass.send :extend, ClassMethods
+ end
+
+ module ClassMethods
+ def group(options = {})
+ Manager.new(self, options)
+ end
+ end
+
+ # Delegates work (i.e. methods) and supervises workers
+ class Manager
+ include Celluloid
+ trap_exit :crash_handler
+
+ def initialize(worker_class, options = {})
+ @size = options[:size] || Celluloid.cores
+ @args = options[:args]
+
+ @worker_class = worker_class
+ @idle = @size.times.map { worker_class.new(*@args) }
+ end
+
+ # Execute the given method within a worker
+ def execute(method, *args, &block)
+ wait :ready while @idle.empty?
+ worker = @idle.shift
+
+ begin
+ worker.send method, *args, &block
+ ensure
+ if worker.alive?
+ @idle << worker
+ signal :ready if @idle.size == 1
+ end
+ end
+ end
+
+ # Spawn a new worker for every crashed one
+ def crash_handler
+ @idle << worker_class.new(*args)
+ end
+
+ def respond_to?(method)
+ super || (@worker_class ? @worker_class.instance_methods.include?(method.to_sym) : false)
+ end
+
+ def method_missing(method, *args, &block)
+ if respond_to?(method)
+ future :execute, method, *args, &block
+ else
+ super
+ end
+ end
+ end
+ end
+end
@@ -0,0 +1,29 @@
+require 'spec_helper'
+
+describe Celluloid::Worker do
+ before do
+ class MyWorker
+ include Celluloid::Worker
+
+ def process(queue = nil)
+ if queue
+ queue << :done
+ else
+ :done
+ end
+ end
+ end
+ end
+
+ subject { MyWorker.group }
+
+ it "processes work units synchronously" do
+ subject.process.value.should == :done
+ end
+
+ it "processes work units asynchronously" do
+ queue = Queue.new
+ subject.process!(queue)
+ queue.pop.should == :done
+ end
+end

0 comments on commit 2cd3a30

Please sign in to comment.