Skip to content

Commit

Permalink
Merge pull request pycassa#189 from edevil/master
Browse files Browse the repository at this point in the history
Reduce critical section protected by global lock
  • Loading branch information
thobbs committed Feb 15, 2013
2 parents 9dd31f8 + 9a63234 commit 53454c1
Showing 1 changed file with 48 additions and 36 deletions.
84 changes: 48 additions & 36 deletions pycassa/pool.py
Expand Up @@ -512,53 +512,65 @@ def _put_conn(self, conn):
self._q.put_nowait(conn)
return conn

def _new_if_required(self, max_conns, check_empty_queue=False):
""" Creates new connection if there is room """
try:
self._pool_lock.acquire()
if (not check_empty_queue or self._q.empty()) and self._current_conns < max_conns:
new_conn = True
self._current_conns += 1
else:
new_conn = False
finally:
self._pool_lock.release()

if new_conn:
try:
return self._create_connection()
except:
try:
self._pool_lock.acquire()
self._current_conns -= 1
raise
finally:
self._pool_lock.release()
return None

def get(self):
""" Gets a connection from the pool. """
conn = None
if self._pool_threadlocal:
try:
conn = None
if self._tlocal.current:
conn = self._tlocal.current
if conn:
return conn
except AttributeError:
pass
try:
self._pool_lock.acquire()
if self._current_conns < self._pool_size:
# The pool was not prefilled, and we need to add connections to reach pool_size
conn = self._create_connection()
self._current_conns += 1

conn = self._new_if_required(self._pool_size)
if not conn:
# if queue is empty and max_overflow is not reached, create new conn
conn = self._new_if_required(self._max_conns, check_empty_queue=True)

if not conn:
# We will have to fetch from the queue, and maybe block
timeout = self.pool_timeout
if timeout == -1:
timeout = None

try:
conn = self._q.get(timeout=timeout)
except Queue.Empty:
self._notify_on_pool_max(pool_max=self._max_conns)
size_msg = "size %d" % (self._pool_size, )
if self._overflow_enabled:
size_msg += "overflow %d" % (self._max_overflow)
message = "ConnectionPool limit of %s reached, unable to obtain connection after %d seconds" \
% (size_msg, self.pool_timeout)
raise NoConnectionAvailable(message)
else:
try:
# We don't want to waste time blocking if overflow is not enabled; similarly,
# if we're not at the max overflow, we can fail quickly and create a new
# connection
timeout = self.pool_timeout
if timeout == -1:
timeout = None
block = self._current_conns >= self._max_conns
elif timeout == 0:
block = False
else:
block = self._current_conns >= self._max_conns

conn = self._q.get(block, timeout)
conn._checkout()
except Queue.Empty:
if self._current_conns < self._max_conns:
conn = self._create_connection()
self._current_conns += 1
else:
self._notify_on_pool_max(pool_max=self._max_conns)
size_msg = "size %d" % (self._pool_size, )
if self._overflow_enabled:
size_msg += "overflow %d" % (self._max_overflow)
message = "ConnectionPool limit of %s reached, unable to obtain connection after %d seconds" \
% (size_msg, self.pool_timeout)
raise NoConnectionAvailable(message)
finally:
self._pool_lock.release()
conn._checkout()

if self._pool_threadlocal:
self._tlocal.current = conn
Expand Down

0 comments on commit 53454c1

Please sign in to comment.