Skip to content

Commit

Permalink
Merge 73a28b7 into a6f1221
Browse files Browse the repository at this point in the history
  • Loading branch information
HoneyryderChuck committed Apr 12, 2016
2 parents a6f1221 + 73a28b7 commit e7e0b37
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 36 deletions.
1 change: 1 addition & 0 deletions celluloid.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Gem::Specification.new do |gem|
"examples/*"
]

gem.add_runtime_dependency 'timeout-extensions'
gem.require_path = "lib"

Celluloid::Sync::Gemspec[gem]
Expand Down
47 changes: 37 additions & 10 deletions lib/celluloid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
$CELLULOID_BACKPORTED = (ENV["CELLULOID_BACKPORTED"] != "false") unless defined?($CELLULOID_BACKPORTED)
Celluloid::Notices.backported if $CELLULOID_BACKPORTED

require 'timeout/extensions'

module Celluloid
# Expose all instance methods as singleton methods
extend self
Expand Down Expand Up @@ -401,18 +403,26 @@ def receive(timeout = nil, &block)
# Sleep letting the actor continue processing messages
def sleep(interval)
actor = Thread.current[:celluloid_actor]
if actor
actor.sleep(interval)
else
Kernel.sleep interval
end
return Kernel.sleep(interval) unless actor
sleeper = Sleeper.new(actor.timers, interval)
Celluloid.suspend(:sleeping, sleeper)
end

# Timeout on task suspension (eg Sync calls to other actors)
def timeout(duration)
Thread.current[:celluloid_actor].timeout(duration) do
yield
end
# Timeout on task suspension
# The second argument is the exception that will be raised on timeouts
def timeout(duration, klass = TaskTimeout)
bt = caller
task = Task.current
timers = Thread.current[:celluloid_actor].timers
timer = timers.after(duration) do
exception = klass.new("execution expired")
# WARNING: if the klass exception is not a Celluloid::Interruption, things won't fly!!!
exception.set_backtrace bt
task.resume exception
end unless duration.nil?
yield
ensure
timer.cancel if timer
end

# Run given block in an exclusive mode: all synchronous calls block the whole
Expand Down Expand Up @@ -455,6 +465,23 @@ def async(meth = nil, *args, &block)
def future(meth = nil, *args, &block)
Thread.current[:celluloid_actor].behavior_proxy.future meth, *args, &block
end


class Sleeper
def initialize(timers, interval)
@timers = timers
@interval = interval
end

def before_suspend(task)
@timers.after(@interval) { task.resume }
end

def wait
Kernel.sleep(@interval)
end
end

end

if defined?(JRUBY_VERSION) && JRUBY_VERSION == "1.7.3"
Expand Down
30 changes: 4 additions & 26 deletions lib/celluloid/actor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -245,38 +245,16 @@ def every(interval, &block)
end

def timeout(duration)
bt = caller
task = Task.current
timer = @timers.after(duration) do
exception = TaskTimeout.new("execution expired")
exception.set_backtrace bt
task.resume exception
end
yield
ensure
timer.cancel if timer
Celluloid::timeout(duration) { yield }
end
private :timeout

class Sleeper
def initialize(timers, interval)
@timers = timers
@interval = interval
end

def before_suspend(task)
@timers.after(@interval) { task.resume }
end

def wait
Kernel.sleep(@interval)
end
end

# Sleep for the given amount of time
def sleep(interval)
sleeper = Sleeper.new(@timers, interval)
Celluloid.suspend(:sleeping, sleeper)
Celluloid::sleep(interval)
end
private :sleep

# Handle standard low-priority messages
def handle_message(message)
Expand Down
2 changes: 2 additions & 0 deletions lib/celluloid/group/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def shutdown
def create
queue = Queue.new
thread = Thread.new do
Thread.current.timeout_handler = Celluloid.method(:timeout).to_proc
Thread.current.sleep_handler = Celluloid.method(:sleep).to_proc
while proc = queue.pop
begin
proc.call
Expand Down
31 changes: 31 additions & 0 deletions spec/shared/actor_examples.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,37 @@ class ExampleMailbox < Celluloid::Mailbox; end
end
end

context :timeouts do
let :actor_class do
Class.new do
include CelluloidSpecs.included_module

def name
sleep 0.5
:foo
end

def ask_name_with_timeout(other, duration)
Timeout::timeout(duration) { other.name }
end
end
end

it "allows timing out tasks, raising Celluloid::Task::TimeoutError" do
a1 = actor_class.new
a2 = actor_class.new

expect { a1.ask_name_with_timeout a2, 0.3 }.to raise_error(Celluloid::TaskTimeout)
end

it "does not raise when it completes in time" do
a1 = actor_class.new
a2 = actor_class.new

expect(a1.ask_name_with_timeout(a2, 0.6)).to be(:foo)
end
end

context :mailbox_size do
subject do
Class.new do
Expand Down

0 comments on commit e7e0b37

Please sign in to comment.