Skip to content

Commit

Permalink
use monitor and condition variable to reduce code
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshMcKin committed Feb 7, 2014
1 parent 12fb9af commit 29ec50d
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 54 deletions.
96 changes: 47 additions & 49 deletions lib/hot_tub/pool.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
require 'monitor'
module HotTub
class Pool
include HotTub::KnownClients
attr_reader :current_size, :last_activity

# Thread-safe lazy connection pool modeled after Queue
# Thread-safe lazy connection pool
#
# == Example Net::HTTP
# pool = HotTub::Pool.new(:size => 10) {
Expand Down Expand Up @@ -45,20 +46,21 @@ def initialize(opts={},&client_block)
@close = opts[:close] # => lambda {|clnt| clnt.close}
@clean = opts[:clean] # => lambda {|clnt| clnt.clean}
@never_block = (opts[:never_block].nil? ? true : opts[:never_block]) # Return new client if we run out
@client_block = client_block
@client_block = client_block

@pool = [] # stores available connection
@pool.taint
@register = [] # stores all connections at all times
@register.taint
@waiting = [] # waiting threads
@waiting.taint
@stale = [] # stale/orphan connections to be reaped
@stale.taint
@pool_mutex = Mutex.new
@orphans = [] # orphan connections to be reaped
@orphans.taint

@monitor = Monitor.new
@cond = @monitor.new_cond
@current_size = 0
@last_activity = Time.now
@fetching_client = false
@reaper = Thread.new {
reap_pool
}
Expand All @@ -82,7 +84,7 @@ def run(&block)
# but the close_client block ensures the client should be stale
# and the clean method should repairs those connections if they are called
def close_all
@pool_mutex.synchronize do
@monitor.synchronize do
while clnt = @register.pop
@pool.delete(clnt)
begin
Expand All @@ -109,28 +111,20 @@ def client
def push(clnt)
pushed = false
reap = false
@pool_mutex.synchronize do
@monitor.synchronize do
if @register.include?(clnt)
pushed = true
@register.delete(clnt)
@register << clnt
@pool << clnt
end
begin
t = @waiting.shift
t.wakeup if t
rescue ThreadError
retry
end
reap = _reap_pool?
@orphans << clnt unless pushed # orphan close
@cond.signal
end
@stale << clnt unless pushed # orphan close
nil # make sure never return the pool
ensure
begin
@reaper.wakeup if reap
rescue ThreadError
end
wake_reaper if reap
end

def alarm_time
Expand All @@ -149,49 +143,24 @@ def raise_alarm

# Safely pull client from pool, adding if allowed
def pop
@fetching_client = true # kill reap_pool
clnt = nil
@pool_mutex.synchronize do
@monitor.synchronize do
@last_activity = Time.now
alarm = alarm_time
while clnt.nil?
raise_alarm if raise_alarm?(alarm)
if (@pool.empty?)
if _add? || @never_block
clnt = _add(true)
else
@waiting.push Thread.current
@pool_mutex.sleep(@blocking_timeout)
@cond.wait(@blocking_timeout)
end
else
clnt = @pool.pop
end
end
@fetching_client = false
clnt
end
end

# _reap_pool? is volatile; and may cause be inaccurate
# if called outside @pool_mutex.synchronize {}
def _reap_pool?
(!@fetching_client && (@current_size > @size) && ((@last_activity + (600)) < Time.now))
end

# Remove extra connections from front of pool
def reap_pool
@pool_mutex.synchronize do
while true
while stale = @stale.pop
close_client(stale)
end
while _reap_pool? && clnt = @pool.shift
@register.delete(clnt)
@current_size -= 1
close_client(clnt)
end
@pool_mutex.sleep
end
end
clnt
end

# Only want to add a client if the pool is empty in keeping with
Expand All @@ -204,7 +173,6 @@ def _add?
# _add is volatile; and may cause threading issues
# if called outside @pool_mutex.synchronize {}
def _add(no_pool=false)
@last_activity = Time.now
@current_size += 1
nc = @client_block.call
HotTub.logger.info "Adding HotTub client: #{nc.class.name} to pool"
Expand All @@ -213,6 +181,36 @@ def _add(no_pool=false)
@pool << nc
nil
end

# _reap_pool? is volatile; and may cause be inaccurate
# if called outside @pool_mutex.synchronize {}
def _reap_pool?
((@current_size > @size) && ((@last_activity + (600)) < Time.now))
end

# Remove extra connections from front of pool
def reap_pool
@monitor.synchronize do
loop do
while orphan = @orphans.pop
close_client(orphan)
end
while _reap_pool? && clnt = @pool.shift
@register.delete(clnt)
@current_size -= 1
close_client(clnt)
end
@monitor.sleep
end
end
end

def wake_reaper
begin
@reaper.wakeup if reap
rescue ThreadError
end
end
end

class BlockingTimeout < StandardError;end
Expand Down
26 changes: 21 additions & 5 deletions spec/pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
@connection = connection
end
# returned to pool after work was done
@pool.instance_variable_get(:@pool).pop.should eql(@connection)
@pool.instance_variable_get(:@pool).pop.should eql(@connection)
end

it "should work" do
Expand Down Expand Up @@ -247,7 +247,7 @@

context 'Net::Http' do
before(:each) do
@pool = HotTub::Pool.new(:size => 10) {
@pool = HotTub::Pool.new(:size => 10) {
uri = URI.parse(HotTub::Server.url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = false
Expand All @@ -257,7 +257,7 @@
end
it "should work" do
result = nil
@pool.run{|clnt|
@pool.run{|clnt|
uri = URI.parse(HotTub::Server.url)
result = clnt.head(uri.path).code
}
Expand All @@ -270,9 +270,9 @@
lambda {
15.times.each do
threads << Thread.new do
@pool.run{|connection|
@pool.run{|connection|
uri = URI.parse(HotTub::Server.url)
Thread.current[:status] = connection.head(uri.path).code
Thread.current[:status] = connection.head(uri.path).code
}
end
end
Expand Down Expand Up @@ -301,4 +301,20 @@
end
end
end
# describe "benchmark" do
# @pool = HotTub::Pool.new {Object.new}
# now = Time.now
# threads = []
# 30000.times.each do
# threads << Thread.new do
# @pool.run{|conn|
# }
# end
# end
# sleep(0.001)
# threads.each do |t|
# t.join
# end
# puts Time.now - now
# end
end

0 comments on commit 29ec50d

Please sign in to comment.