Skip to content

Commit

Permalink
Make connection pool fair with respect to waiting threads.
Browse files Browse the repository at this point in the history
The core of this fix is a threadsafe, fair Queue class.  It is
very similar to Queue in stdlib except that it supports waiting
with a timeout.

The issue this solves is that if several threads are contending for
database connections, an unfair queue makes is possible that a thread
will timeout even while other threads successfully acquire and release
connections.  A fair queue means the thread that has been waiting the
longest will get the next available connection.

This includes a few test fixes to avoid test ordering issues that
cropped up during development of this patch.
  • Loading branch information
pmahoney committed May 25, 2012
1 parent 40cfcac commit 02b2335
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 38 deletions.
Expand Up @@ -2,7 +2,6 @@
require 'monitor'
require 'set'
require 'active_support/core_ext/module/deprecation'
require 'timeout'

module ActiveRecord
# Raised when a connection could not be obtained within the connection
Expand Down Expand Up @@ -70,6 +69,131 @@ module ConnectionAdapters
# after which the Reaper will consider a connection reapable. (default
# 5 seconds).
class ConnectionPool
# Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
# with which it shares a Monitor. But could be a generic Queue.
#
# The Queue in stdlib's 'thread' could replace this class except
# stdlib's doesn't support waiting with a timeout.
class Queue
def initialize(lock = Monitor.new)
@lock = lock
@cond = @lock.new_cond
@num_waiting = 0
@queue = []
end

# Test if any threads are currently waiting on the queue.
def any_waiting?
synchronize do
@num_waiting > 0
end
end

# Return the number of threads currently waiting on this
# queue.
def num_waiting
synchronize do
@num_waiting
end
end

# Add +element+ to the queue. Never blocks.
def add(element)
synchronize do
@queue.push element
@cond.signal
end
end

# If +element+ is in the queue, remove and return it, or nil.
def delete(element)
synchronize do
@queue.delete(element)
end
end

# Remove all elements from the queue.
def clear
synchronize do
@queue.clear
end
end

# Remove the head of the queue.
#
# If +timeout+ is not given, remove and return the head the
# queue if the number of available elements is strictly
# greater than the number of threads currently waiting (that
# is, don't jump ahead in line). Otherwise, return nil.
#
# If +timeout+ is given, block if it there is no element
# available, waiting up to +timeout+ seconds for an element to
# become available.
#
# Raises:
# - ConnectionTimeoutError if +timeout+ is given and no element
# becomes available after +timeout+ seconds,
def poll(timeout = nil)
synchronize do
if timeout
no_wait_poll || wait_poll(timeout)
else
no_wait_poll
end
end
end

private

def synchronize(&block)
@lock.synchronize(&block)
end

# Test if the queue currently contains any elements.
def any?
!@queue.empty?
end

# A thread can remove an element from the queue without
# waiting if an only if the number of currently available
# connections is strictly greater than the number of waiting
# threads.
def can_remove_no_wait?
@queue.size > @num_waiting
end

# Removes and returns the head of the queue if possible, or nil.
def remove
@queue.shift
end

# Remove and return the head the queue if the number of
# available elements is strictly greater than the number of
# threads currently waiting. Otherwise, return nil.
def no_wait_poll
remove if can_remove_no_wait?
end

# Waits on the queue up to +timeout+ seconds, then removes and
# returns the head of the queue.
def wait_poll(timeout)
@num_waiting += 1

t0 = Time.now
elapsed = 0
loop do
@cond.wait(timeout - elapsed)

return remove if any?

elapsed = Time.now - t0
raise ConnectionTimeoutError if elapsed >= timeout
end
ensure
@num_waiting -= 1
end
end

# Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
# A reaper instantiated with a nil frequency will never reap the
# connection pool.
Expand Down Expand Up @@ -100,21 +224,6 @@ def run
attr_accessor :automatic_reconnect, :checkout_timeout, :dead_connection_timeout
attr_reader :spec, :connections, :size, :reaper

class Latch # :nodoc:
def initialize
@mutex = Mutex.new
@cond = ConditionVariable.new
end

def release
@mutex.synchronize { @cond.broadcast }
end

def await
@mutex.synchronize { @cond.wait @mutex }
end
end

# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
# object which describes database connection information (e.g. adapter,
# host name, username, password, etc), as well as the maximum size for
Expand All @@ -137,9 +246,18 @@ def initialize(spec)
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

@latch = Latch.new
@connections = []
@automatic_reconnect = true

@available = Queue.new self
end

# Hack for tests to be able to add connections. Do not call outside of tests
def insert_connection_for_test!(c) #:nodoc:
synchronize do
@connections << c
@available.add c
end
end

# Retrieve the connection associated with the current thread, or call
Expand Down Expand Up @@ -197,6 +315,7 @@ def disconnect!
conn.disconnect!
end
@connections = []
@available.clear
end
end

Expand All @@ -211,6 +330,10 @@ def clear_reloadable_connections!
@connections.delete_if do |conn|
conn.requires_reloading?
end
@available.clear
@connections.each do |conn|
@available.add conn
end
end
end

Expand All @@ -234,23 +357,10 @@ def clear_stale_cached_connections! # :nodoc:
# Raises:
# - PoolFullError: no connection can be obtained from the pool.
def checkout
loop do
# Checkout an available connection
synchronize do
# Try to find a connection that hasn't been leased, and lease it
conn = connections.find { |c| c.lease }

# If all connections were leased, and we have room to expand,
# create a new connection and lease it.
if !conn && connections.size < size
conn = checkout_new_connection
conn.lease
end

return checkout_and_verify(conn) if conn
end

Timeout.timeout(@checkout_timeout, PoolFullError) { @latch.await }
synchronize do
conn = acquire_connection
conn.lease
checkout_and_verify(conn)
end
end

Expand All @@ -266,21 +376,24 @@ def checkin(conn)
end

release conn

@available.add conn
end
@latch.release
end

# Remove a connection from the connection pool. The connection will
# remain open and active but will no longer be managed by this pool.
def remove(conn)
synchronize do
@connections.delete conn
@available.delete conn

# FIXME: we might want to store the key on the connection so that removing
# from the reserved hash will be a little easier.
release conn

@available.add checkout_new_connection if @available.any_waiting?
end
@latch.release
end

# Removes dead connections from the pool. A dead connection can occur
Expand All @@ -293,11 +406,35 @@ def reap
remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
end
end
@latch.release
end

private

# Acquire a connection by one of 1) immediately removing one
# from the queue of available connections, 2) creating a new
# connection if the pool is not at capacity, 3) waiting on the
# queue for a connection to become available.
#
# Raises:
# - PoolFullError if a connection could not be acquired (FIXME:
# why not ConnectionTimeoutError?
def acquire_connection
if conn = @available.poll
conn
elsif @connections.size < @size
checkout_new_connection
else
t0 = Time.now
begin
@available.poll(@checkout_timeout)
rescue ConnectionTimeoutError
msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
[@checkout_timeout, Time.now - t0]
raise PoolFullError, msg
end
end
end

def release(conn)
thread_id = if @reserved_connections[current_connection_id] == conn
current_connection_id
Expand Down
Expand Up @@ -1339,6 +1339,9 @@ def test_custom_primary_key_on_new_record_should_fetch_with_query
author = Author.new(:name => "David")
assert !author.essays.loaded?

# cache metadata in advance to avoid extra sql statements executed while testing
Essay.first

assert_queries 1 do
assert_equal 1, author.essays.size
end
Expand Down
Expand Up @@ -36,7 +36,7 @@ def test_expire_mutates_in_use

def test_close
pool = ConnectionPool.new(ConnectionSpecification.new({}, nil))
pool.connections << adapter
pool.insert_connection_for_test! adapter
adapter.pool = pool

# Make sure the pool marks the connection in use
Expand Down

0 comments on commit 02b2335

Please sign in to comment.