Skip to content

Commit

Permalink
a few more touch ups
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshMcKin committed Feb 9, 2014
1 parent d80f790 commit 4e05d4e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 58 deletions.
98 changes: 51 additions & 47 deletions lib/hot_tub/pool.rb
Expand Up @@ -17,8 +17,8 @@ class Pool
# pool.run {|clnt| puts clnt.head('/').code }
#
# HotTub::Pool defaults never_block to true, which means if we run out of
# connections simply create a new client to continue operations.
# The pool will grow and extra connections will be resued until activity dies down.
# clients simply create a new client to continue operations.
# The pool will grow and extra clients will be reused until activity dies down.
# If you would like to block and possibly throw an exception rather than temporarily
# grow the set :size, set :never_block to false; blocking_timeout defaults to 10 seconds.
#
Expand All @@ -41,11 +41,13 @@ class Pool
def initialize(opts={},&client_block)
raise ArgumentError, 'a block that initializes a new client is required' unless block_given?

@size = (opts[:size] || 5) # in seconds
@blocking_timeout = (opts[:blocking_timeout] || 10) # in seconds
@size = (opts[:size] || 5) # in seconds
@blocking_timeout = (opts[:blocking_timeout] || 10) # in seconds
@never_block = (opts[:never_block].nil? ? true : opts[:never_block]) # Return new client if we run out
@reap_timeout = (opts[:reap_timeout] || 600) # the interval to reap connections in seconds

@close = opts[:close] # => lambda {|clnt| clnt.close}
@clean = opts[:clean] # => lambda {|clnt| clnt.clean}
@never_block = (opts[:never_block].nil? ? true : opts[:never_block]) # Return new client if we run out
@client_block = client_block

@pool = [] # stores available clients
Expand All @@ -57,8 +59,9 @@ def initialize(opts={},&client_block)
@cond = @monitor.new_cond
@last_activity = Time.now
@reaper = Thread.new {
reap_pool
}
Thread.current["name"] = "pool_reaper"
reap
}
at_exit {close_all}
end

Expand All @@ -74,17 +77,17 @@ def run(&block)
push(clnt) if clnt
end

# Calls close on all connections and reset the pools
# Calls close on all clients and reset the pools
# Its possible clients may be returned to the pool after close_all,
# but the close_client block ensures the client should be stale
# and the clean method should repairs those connections if they are called
# and the clean method should repairs those clients if they are called
def close_all
@monitor.synchronize do
while (clnt = @pool.pop || clnt = @out.pop)
begin
close_client(clnt)
rescue => e
HotTub.logger.error "There was an error close one of your HotTub::Pool connections: #{e}"
HotTub.logger.error "There was an error close one of your HotTub::Pool clients: #{e}"
end
end
end
Expand All @@ -102,25 +105,23 @@ def client
# Safely add client back to pool, only if
# that clnt is registered
def push(clnt)
return false if clnt.nil?
reap = false
@monitor.synchronize do
@out.delete(clnt)
@pool << clnt
reap = _reap_pool?
@cond.signal
if clnt
@monitor.synchronize do
@out.delete(clnt)
@pool << clnt
@cond.signal
wake_reaper if _reap?
end
end
nil # make sure never return the pool
ensure
wake_reaper if reap
nil
end

def alarm_time
(Time.now + @blocking_timeout)
end

def raise_alarm?(alm_time)
(alm_time <= Time.now)
def raise_alarm?(time)
(time <= Time.now)
end

def raise_alarm
Expand All @@ -129,6 +130,18 @@ def raise_alarm
raise BlockingTimeout, message
end

# _empty? is volatile; and may cause be inaccurate
# if called outside @monitor.synchronize {}
def _empty?
@pool.empty?
end

# _available? is volatile; and may cause be inaccurate
# if called outside @monitor.synchronize {}
def _available?
!_empty?
end

# Safely pull client from pool, adding if allowed
def pop
clnt = nil
Expand All @@ -137,16 +150,11 @@ def pop
raise_alarm if raise_alarm?(alarm)
@monitor.synchronize do
@last_activity = Time.now
if (@pool.empty?)
if _add? || @never_block
clnt = _add(true)
else
@cond.wait(@blocking_timeout)
end
if (_available? || _add)
@out << clnt = @pool.pop
else
clnt = @pool.pop
@cond.wait(@blocking_timeout)
end
@out << clnt if clnt
end
end
clnt
Expand All @@ -158,37 +166,33 @@ def _current_size

# Only want to add a client if the pool is empty in keeping with
# a lazy model. _add? is volatile; and may cause be in accurate
# if called outside @pool_mutex.synchronize {}
# if called outside @monitor.synchronize {}
def _add?
(@pool.empty? && (@size > _current_size))
(_empty? && (@never_block || (@size > _current_size)))
end

# _add is volatile; and may cause threading issues
# if called outside @pool_mutex.synchronize {}
def _add(out=false)
# if called outside @monitor.synchronize {}
def _add
return false unless _add?
nc = @client_block.call
HotTub.logger.info "Adding HotTub client: #{nc.class.name} to pool"
if out
@out << nc
return nc
else
@pool << nc
end
nil
@pool << nc
true
end

# _reap_pool? is volatile; and may cause be inaccurate
# if called outside @pool_mutex.synchronize {}
def _reap_pool?
(!@pool.empty? && (_current_size > @size) && ((@last_activity + (600)) < Time.now))
# if called outside @monitor.synchronize {}
def _reap?
(_available? && (_current_size > @size) && ((@last_activity + (@reap_timeout)) < Time.now))
end

# Remove extra connections from front of pool
def reap_pool
# Remove and close extra clients after reap_timeout
def reap
reaped = []
loop do
@monitor.synchronize do
while _reap_pool?
while _reap?
reaped << @pool.shift
end
end
Expand Down
19 changes: 8 additions & 11 deletions spec/pool_spec.rb
Expand Up @@ -78,24 +78,21 @@
describe '#close_all' do
before(:each) do
@pool = HotTub::Pool.new(:size => 5) { MocClient.new }
5.times do
@pool.send(:_add)
end
end

it "should reset out" do
@pool.instance_variable_set(:@out, @pool.instance_variable_get(:@pool))
@pool.instance_variable_set(:@pool, [])
@pool.instance_variable_get(:@out).length.should eql(5)
@pool.send(:_current_size).should eql(5)
@pool.instance_variable_set(:@out, [MocClient.new,MocClient.new,MocClient.new])
@pool.instance_variable_get(:@out).length.should eql(3)
@pool.send(:_current_size).should eql(3)
@pool.close_all
@pool.instance_variable_get(:@out).length.should eql(0)
@pool.send(:_current_size).should eql(0)
end

it "should reset pool" do
@pool.send(:_current_size).should eql(5)
@pool.instance_variable_get(:@pool).length.should eql(5)
@pool.instance_variable_set(:@pool, [MocClient.new,MocClient.new,MocClient.new])
@pool.instance_variable_get(:@pool).length.should eql(3)
@pool.send(:_current_size).should eql(3)
@pool.close_all
@pool.instance_variable_get(:@pool).length.should eql(0)
@pool.send(:_current_size).should eql(0)
Expand Down Expand Up @@ -196,13 +193,13 @@
end
end

describe '#reap_pool' do
describe '#reap' do
context 'current_size is greater than :size' do
it "should remove a connection from the pool" do
pool = HotTub::Pool.new({:size => 1}) { MocClient.new }
pool.instance_variable_set(:@last_activity,(Time.now - 601))
pool.instance_variable_set(:@pool, [MocClient.new,MocClient.new])
pool.send(:_reap_pool?).should be_true
pool.send(:_reap?).should be_true
pool.instance_variable_get(:@reaper).wakeup # run the reaper thread
sleep(0.1) # let results
pool.send(:_current_size).should eql(1)
Expand Down

0 comments on commit 4e05d4e

Please sign in to comment.