From bc999d2464a341f7648bd13727b293d2a326e945 Mon Sep 17 00:00:00 2001 From: James Tucker Date: Tue, 24 May 2011 02:32:43 -0700 Subject: [PATCH] Initial introduction of a deferrable pool for resource pooling where the work can be encapsulated using deferrables --- lib/em/deferrable.rb | 1 + lib/em/deferrable/pool.rb | 97 ++++++++++++++++++++++++++++++ tests/test_deferrable_pool.rb | 107 ++++++++++++++++++++++++++++++++++ 3 files changed, 205 insertions(+) create mode 100644 lib/em/deferrable/pool.rb create mode 100644 tests/test_deferrable_pool.rb diff --git a/lib/em/deferrable.rb b/lib/em/deferrable.rb index ecb5febc6..18a6d316b 100644 --- a/lib/em/deferrable.rb +++ b/lib/em/deferrable.rb @@ -25,6 +25,7 @@ module EventMachine module Deferrable + autoload :Pool, 'em/deferrable/pool' # Specify a block to be executed if and when the Deferrable object receives # a status of :succeeded. See #set_deferred_status for more information. diff --git a/lib/em/deferrable/pool.rb b/lib/em/deferrable/pool.rb new file mode 100644 index 000000000..80cf6a38d --- /dev/null +++ b/lib/em/deferrable/pool.rb @@ -0,0 +1,97 @@ +# = EM::Deferrable::Pool +# +# A simple async resource pool based on a resource and work queue. Resources +# are enqueued and work waits for resources to become available. +# +# Example: +# +# EM.run do +# pool = EM::Deferrable::Pool.new +# spawn = lambda { pool.add EM::HttpRequest.new('http://example.org') } +# 10.times { spawn[] } +# done, scheduled = 0, 0 +# +# check = lambda do +# done += 1 +# if done >= scheduled +# EM.stop +# end +# end +# +# pool.on_error { |conn| spawn[] } +# +# 100.times do +# pool.perform do |conn| +# req = conn.get :path => '/', :keepalive => true +# +# req.callback do +# p [:success, conn.object_id, i, req.response.size] +# check[] +# end +# +# req.errback { check[] } +# +# req +# end +# end +# end +# +class EM::Deferrable::Pool + def initialize + @resources = EM::Queue.new + @removed = [] + end + + def add resource + @resources.push resource + end + alias requeue add + + def remove resource + @removed << resource + end + + def on_error *a, &b + @on_error = EM::Callback(*a, &b) + end + + def perform(*a, &b) + work = EM::Callback(*a, &b) + + @resources.pop do |resource| + if removed? resource + @removed.delete resource + reschedule work + else + process work, resource + end + end + end + alias reschedule perform + + def process work, resource + deferrable = work.call resource + if deferrable.kind_of?(EM::Deferrable) + completion deferrable, resource + else + raise ArgumentError, "deferrable expected from work" + end + end + + def completion deferrable, resource + deferrable.callback { requeue resource } + deferrable.errback { failure resource } + end + + def failure resource + if @on_error + @on_error.call resource + else + requeue resource + end + end + + def removed? resource + @removed.include? resource + end +end \ No newline at end of file diff --git a/tests/test_deferrable_pool.rb b/tests/test_deferrable_pool.rb new file mode 100644 index 000000000..cd2ba2ce2 --- /dev/null +++ b/tests/test_deferrable_pool.rb @@ -0,0 +1,107 @@ +class TestDeferrablePool < Test::Unit::TestCase + def pool + @pool ||= EM::Deferrable::Pool.new + end + + def go + EM.run { yield } + end + + def stop + EM.stop + end + + def deferrable + @deferrable ||= EM::DefaultDeferrable.new + end + + def test_supports_more_work_than_resources + ran = false + go do + pool.perform do + ran = true + deferrable + end + stop + end + assert_equal false, ran + go do + pool.add :resource + stop + end + assert_equal true, ran + end + + def test_reques_resources_on_error + pooled_res, pooled_res2 = nil + pool.add :res + go do + pool.perform do |res| + pooled_res = res + deferrable + end + stop + end + deferrable.fail + go do + pool.perform do |res| + pooled_res2 = res + deferrable + end + stop + end + assert_equal :res, pooled_res + assert_equal pooled_res, pooled_res2 + end + + def test_supports_custom_error_handler + eres = nil + pool.on_error do |res| + eres = res + end + performs = [] + pool.add :res + go do + pool.perform do |res| + performs << res + deferrable + end + pool.perform do |res| + performs << res + deferrable + end + deferrable.fail + stop + end + assert_equal :res, eres + # manual requeues required when error handler is installed: + assert_equal 1, performs.size + assert_equal :res, performs.first + end + + def test_catches_successful_deferrables + performs = [] + pool.add :res + go do + pool.perform { |res| performs << res; deferrable } + pool.perform { |res| performs << res; deferrable } + stop + end + assert_equal [:res], performs + deferrable.succeed + go { stop } + assert_equal [:res, :res], performs + end + + def test_prunes_locked_and_removed_resources + performs = [] + pool.add :res + deferrable.succeed + go do + pool.perform { |res| performs << res; pool.remove res; deferrable } + pool.perform { |res| performs << res; pool.remove res; deferrable } + stop + end + assert_equal [:res], performs + end +end \ No newline at end of file