diff --git a/lib/hot_tub/pool.rb b/lib/hot_tub/pool.rb index c0e8f02..e07e99a 100644 --- a/lib/hot_tub/pool.rb +++ b/lib/hot_tub/pool.rb @@ -17,8 +17,8 @@ class Pool # pool.run {|clnt| puts clnt.head('/').code } # # HotTub::Pool defaults never_block to true, which means if we run out of - # connections simply create a new client to continue operations. - # The pool will grow and extra connections will be resued until activity dies down. + # clients simply create a new client to continue operations. + # The pool will grow and extra clients will be reused until activity dies down. # If you would like to block and possibly throw an exception rather than temporarily # grow the set :size, set :never_block to false; blocking_timeout defaults to 10 seconds. # @@ -41,11 +41,13 @@ class Pool def initialize(opts={},&client_block) raise ArgumentError, 'a block that initializes a new client is required' unless block_given? - @size = (opts[:size] || 5) # in seconds - @blocking_timeout = (opts[:blocking_timeout] || 10) # in seconds + @size = (opts[:size] || 5) # in seconds + @blocking_timeout = (opts[:blocking_timeout] || 10) # in seconds + @never_block = (opts[:never_block].nil? ? true : opts[:never_block]) # Return new client if we run out + @reap_timeout = (opts[:reap_timeout] || 600) # the interval to reap connections in seconds + @close = opts[:close] # => lambda {|clnt| clnt.close} @clean = opts[:clean] # => lambda {|clnt| clnt.clean} - @never_block = (opts[:never_block].nil? ? true : opts[:never_block]) # Return new client if we run out @client_block = client_block @pool = [] # stores available clients @@ -57,8 +59,9 @@ def initialize(opts={},&client_block) @cond = @monitor.new_cond @last_activity = Time.now @reaper = Thread.new { - reap_pool - } + Thread.current["name"] = "pool_reaper" + reap + } at_exit {close_all} end @@ -74,17 +77,17 @@ def run(&block) push(clnt) if clnt end - # Calls close on all connections and reset the pools + # Calls close on all clients and reset the pools # Its possible clients may be returned to the pool after close_all, # but the close_client block ensures the client should be stale - # and the clean method should repairs those connections if they are called + # and the clean method should repairs those clients if they are called def close_all @monitor.synchronize do while (clnt = @pool.pop || clnt = @out.pop) begin close_client(clnt) rescue => e - HotTub.logger.error "There was an error close one of your HotTub::Pool connections: #{e}" + HotTub.logger.error "There was an error close one of your HotTub::Pool clients: #{e}" end end end @@ -102,25 +105,23 @@ def client # Safely add client back to pool, only if # that clnt is registered def push(clnt) - return false if clnt.nil? - reap = false - @monitor.synchronize do - @out.delete(clnt) - @pool << clnt - reap = _reap_pool? - @cond.signal + if clnt + @monitor.synchronize do + @out.delete(clnt) + @pool << clnt + @cond.signal + wake_reaper if _reap? + end end - nil # make sure never return the pool - ensure - wake_reaper if reap + nil end def alarm_time (Time.now + @blocking_timeout) end - def raise_alarm?(alm_time) - (alm_time <= Time.now) + def raise_alarm?(time) + (time <= Time.now) end def raise_alarm @@ -129,6 +130,18 @@ def raise_alarm raise BlockingTimeout, message end + # _empty? is volatile; and may cause be inaccurate + # if called outside @monitor.synchronize {} + def _empty? + @pool.empty? + end + + # _available? is volatile; and may cause be inaccurate + # if called outside @monitor.synchronize {} + def _available? + !_empty? + end + # Safely pull client from pool, adding if allowed def pop clnt = nil @@ -137,16 +150,11 @@ def pop raise_alarm if raise_alarm?(alarm) @monitor.synchronize do @last_activity = Time.now - if (@pool.empty?) - if _add? || @never_block - clnt = _add(true) - else - @cond.wait(@blocking_timeout) - end + if (_available? || _add) + @out << clnt = @pool.pop else - clnt = @pool.pop + @cond.wait(@blocking_timeout) end - @out << clnt if clnt end end clnt @@ -158,37 +166,33 @@ def _current_size # Only want to add a client if the pool is empty in keeping with # a lazy model. _add? is volatile; and may cause be in accurate - # if called outside @pool_mutex.synchronize {} + # if called outside @monitor.synchronize {} def _add? - (@pool.empty? && (@size > _current_size)) + (_empty? && (@never_block || (@size > _current_size))) end # _add is volatile; and may cause threading issues - # if called outside @pool_mutex.synchronize {} - def _add(out=false) + # if called outside @monitor.synchronize {} + def _add + return false unless _add? nc = @client_block.call HotTub.logger.info "Adding HotTub client: #{nc.class.name} to pool" - if out - @out << nc - return nc - else - @pool << nc - end - nil + @pool << nc + true end # _reap_pool? is volatile; and may cause be inaccurate - # if called outside @pool_mutex.synchronize {} - def _reap_pool? - (!@pool.empty? && (_current_size > @size) && ((@last_activity + (600)) < Time.now)) + # if called outside @monitor.synchronize {} + def _reap? + (_available? && (_current_size > @size) && ((@last_activity + (@reap_timeout)) < Time.now)) end - # Remove extra connections from front of pool - def reap_pool + # Remove and close extra clients after reap_timeout + def reap reaped = [] loop do @monitor.synchronize do - while _reap_pool? + while _reap? reaped << @pool.shift end end diff --git a/spec/pool_spec.rb b/spec/pool_spec.rb index 78ad19b..16d49b1 100644 --- a/spec/pool_spec.rb +++ b/spec/pool_spec.rb @@ -78,24 +78,21 @@ describe '#close_all' do before(:each) do @pool = HotTub::Pool.new(:size => 5) { MocClient.new } - 5.times do - @pool.send(:_add) - end end it "should reset out" do - @pool.instance_variable_set(:@out, @pool.instance_variable_get(:@pool)) - @pool.instance_variable_set(:@pool, []) - @pool.instance_variable_get(:@out).length.should eql(5) - @pool.send(:_current_size).should eql(5) + @pool.instance_variable_set(:@out, [MocClient.new,MocClient.new,MocClient.new]) + @pool.instance_variable_get(:@out).length.should eql(3) + @pool.send(:_current_size).should eql(3) @pool.close_all @pool.instance_variable_get(:@out).length.should eql(0) @pool.send(:_current_size).should eql(0) end it "should reset pool" do - @pool.send(:_current_size).should eql(5) - @pool.instance_variable_get(:@pool).length.should eql(5) + @pool.instance_variable_set(:@pool, [MocClient.new,MocClient.new,MocClient.new]) + @pool.instance_variable_get(:@pool).length.should eql(3) + @pool.send(:_current_size).should eql(3) @pool.close_all @pool.instance_variable_get(:@pool).length.should eql(0) @pool.send(:_current_size).should eql(0) @@ -196,13 +193,13 @@ end end - describe '#reap_pool' do + describe '#reap' do context 'current_size is greater than :size' do it "should remove a connection from the pool" do pool = HotTub::Pool.new({:size => 1}) { MocClient.new } pool.instance_variable_set(:@last_activity,(Time.now - 601)) pool.instance_variable_set(:@pool, [MocClient.new,MocClient.new]) - pool.send(:_reap_pool?).should be_true + pool.send(:_reap?).should be_true pool.instance_variable_get(:@reaper).wakeup # run the reaper thread sleep(0.1) # let results pool.send(:_current_size).should eql(1)